# Supervised Classification Analysis of Single-cell RNA-seq Data

**Goal:** Train a classifier (LogisticRegression & LightGBM) for each cell in pancreas_islet.h5ad based on its disease label, and output prediction results and evaluation report.

In [None]:
import scanpy as sc
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import classification_report, accuracy_score, confusion_matrix, f1_score
import lightgbm as lgb
import matplotlib.pyplot as plt
import seaborn as sns
from tqdm import tqdm
import time
import json
import nbformat as nbf
import os
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import roc_curve, auc
from itertools import cycle
from sklearn.preprocessing import label_binarize
import warnings
import matplotlib
import joblib
import shap
import umap
warnings.filterwarnings('ignore')
# Set matplotlib backend to avoid font issues
matplotlib.use('Agg')

# Set random seed for reproducibility
np.random.seed(42)
# Configure font settings
plt.rcParams['font.family'] = ['Arial', 'DejaVu Sans', 'Liberation Sans', 'sans-serif']
plt.rcParams['font.sans-serif'] = ['Arial', 'DejaVu Sans', 'Liberation Sans', 'sans-serif']
plt.rcParams['axes.unicode_minus'] = False
plt.rcParams['figure.max_open_warning'] = 0


In [None]:
# Load data
print("Loading data...")
if os.path.exists("preprocessed_pancreas_islet.h5ad"):
    print("Loading preprocessed data...")
    adata = sc.read_h5ad("preprocessed_pancreas_islet.h5ad")
else:
    print("Loading raw data and preprocessing...")
    adata = sc.read_h5ad("pancreas_islet.h5ad")
    sc.pp.filter_genes(adata, min_cells=20)
    sc.pp.highly_variable_genes(adata, n_top_genes=3000)
    adata = adata[:, adata.var.highly_variable]
    sc.pp.normalize_total(adata, target_sum=1e4)
    sc.pp.log1p(adata)
    sc.pp.scale(adata, max_value=10)
    adata.write_h5ad("preprocessed_pancreas_islet.h5ad")
    print("Preprocessing completed, data saved!")

# Print basic data information
print(f"Data shape: {adata.shape}")
print(f"Available annotations: {list(adata.obs.columns)}")

# Verify disease label exists
if 'disease' not in adata.obs.columns:
    raise ValueError("Data does not contain 'disease' label, please check the dataset")

# Find donor identifier columns
donor_columns = []
for col in adata.obs.columns:
    if 'donor' in col.lower() or 'individual' in col.lower() or 'subject' in col.lower():
        donor_columns.append(col)

if donor_columns:
    print(f"Found possible donor-related columns: {donor_columns}")
    donor_id_column = donor_columns[0]
else:
    # If no clear donor column is found, try to find other possible grouping columns
    possible_columns = [col for col in adata.obs.columns 
                        if any(k in col.lower() for k in ['batch', 'sample', 'id', 'group'])]
    if possible_columns:
        donor_id_column = possible_columns[0]
    else:
        # If still no suitable grouping column is found, create random grouping
        print("No suitable grouping column found, using random grouping")
        adata.obs['random_group'] = np.random.randint(0, 10, size=adata.shape[0])
        donor_id_column = 'random_group'
        
print(f"Using '{donor_id_column}' as donor identifier")

# Print disease category statistics
print("\nDisease category statistics:")
print(adata.obs['disease'].value_counts())

In [None]:
# Data preprocessing
print("\nPreprocessing data...")

# Preprocess data according to requirements
with tqdm(total=5, desc="Progress of preprocessing") as pbar:
    # Filter genes
    sc.pp.filter_genes(adata, min_cells=20)
    pbar.update(1)
    
    # Select highly variable genes
    sc.pp.highly_variable_genes(adata, n_top_genes=3000)
    adata = adata[:, adata.var.highly_variable]
    pbar.update(1)
    
    # Normalize
    sc.pp.normalize_total(adata, target_sum=1e4)
    pbar.update(1)
    
    # Logarithmic transformation
    sc.pp.log1p(adata)
    pbar.update(1)
    
    # Scaling
    sc.pp.scale(adata, max_value=10)
    pbar.update(1)

print(f"Shape of preprocessed data: {adata.shape}")

In [None]:
# Split data based on donor_id (80%/20%)
print(f"\nSplitting data based on {donor_id_column}...")
donors = adata.obs[donor_id_column].unique()
train_donors, test_donors = train_test_split(donors, test_size=0.2, random_state=42, stratify=donors)

# Create training set and test set
train_mask = adata.obs[donor_id_column].isin(train_donors)
test_mask = adata.obs[donor_id_column].isin(test_donors)

train_adata = adata[train_mask]
test_adata = adata[test_mask]

print(f"Number of cells in training set: {train_adata.shape[0]}")
print(f"Number of cells in test set: {test_adata.shape[0]}")

# Extract feature matrix and labels
X_train = train_adata.X
y_train = train_adata.obs['disease'].astype(str).values  # Convert to string array
X_test = test_adata.X
y_test = test_adata.obs['disease'].astype(str).values    # Convert to string array

# Ensure X is a dense matrix (if sparse)
if hasattr(X_train, 'toarray'):
    X_train = X_train.toarray()
    X_test = X_test.toarray()

In [None]:
# Check and load pre-trained models
print("Checking if pre-trained models exist...")

if os.path.exists('lightgbm_model.txt') and os.path.exists('logistic_regression_model.pkl'):
    print("Pre-trained models found, loading...")
    
    # Load LightGBM model
    gbm = lgb.Booster(model_file='lightgbm_model.txt')
    print("LightGBM model loaded")
    
    # Load Logistic Regression model and preprocessors
    lr = joblib.load('logistic_regression_model.pkl')
    label_encoder = joblib.load('label_encoder.pkl')
    imputer = joblib.load('imputer.pkl')
    print("Logistic Regression model loaded")
    
    # Get original classes
    original_classes = label_encoder.classes_
    print(f"Loaded class mapping: {dict(zip(original_classes, range(len(original_classes))))}")
    
    # Apply saved preprocessors
    X_train = imputer.transform(X_train)
    X_test = imputer.transform(X_test)
    
    # Perform label encoding
    y_train_encoded = label_encoder.transform(y_train)
    y_test_encoded = label_encoder.transform(y_test)
    
    print("Models loaded, ready for prediction and evaluation")
else:
    print("Pre-trained models not found, need to retrain...")
    # Continue with training code


In [None]:
# Train Logistic Regression model
print("\nTraining Logistic Regression model...")
print(f"Size of training data: {X_train.shape}")

# If data is too large, can take a sample for training
if X_train.shape[0] > 10000:
    from sklearn.utils import resample
    X_train_sample, y_train_sample = resample(
        X_train, y_train, n_samples=10000, random_state=42, stratify=y_train
    )
    print(f"Data size is large, using 10000 samples for training...")
    print(f"Size of sampled training data: {X_train_sample.shape}")
else:
    X_train_sample, y_train_sample = X_train, y_train

# Check and fill NaN, to prevent Logistic Regression from crashing
imputer = SimpleImputer(strategy='constant', fill_value=0)
X_train_sample = imputer.fit_transform(X_train_sample)
X_test = imputer.transform(X_test)

# Train model
start_time = time.time()
with tqdm(total=100, desc="Training progress") as pbar:
    lr = LogisticRegression(multi_class="multinomial", solver="saga", max_iter=200, n_jobs=-1, random_state=42)
    pbar.update(10)
    lr.fit(X_train_sample, y_train_sample)
    pbar.update(90)
end_time = time.time()
print(f"Training completed! Time taken: {end_time - start_time:.2f} seconds")

# Predict
y_pred_lr = lr.predict(X_test)
y_pred_proba_lr = lr.predict_proba(X_test)

# Evaluate Logistic Regression
print("\nEvaluation of Logistic Regression model:")
lr_accuracy = accuracy_score(y_test, y_pred_lr)
lr_f1_macro = f1_score(y_test, y_pred_lr, average='macro')
print(f"Accuracy: {lr_accuracy:.4f}")
print(f"Macro-F1: {lr_f1_macro:.4f}")
print("\nClassification report:")
print(classification_report(y_test, y_pred_lr))

In [None]:
# Train LightGBM model
print("\nTraining LightGBM model...")

# Get class labels
classes = np.unique(y_train)
num_classes = len(classes)

# Create dataset
lgb_train = lgb.Dataset(X_train, y_train)  # Use full dataset
lgb_test = lgb.Dataset(X_test, y_test, reference=lgb_train)

# Set parameters
params = {
    'objective': 'multiclass',
    'num_class': num_classes,
    'metric': 'multi_logloss',
    'boosting_type': 'gbdt',
    'num_leaves': 31,
    'learning_rate': 0.05,
    'feature_fraction': 0.9,
    'num_threads': -1,  # Use all CPUs
    'verbose': -1
}

# Train model
print("Starting LightGBM training...")
gbm = lgb.train(
    params,
    lgb_train,
    num_boost_round=100,
    valid_sets=[lgb_train, lgb_test]
)

# Save LightGBM model
print("Saving LightGBM model...")
gbm.save_model('lightgbm_model.txt')
print(" LightGBM model saved as 'lightgbm_model.txt'")

# Predict
y_pred_proba_lgb = gbm.predict(X_test, num_iteration=gbm.best_iteration)
y_pred_lgb = np.argmax(y_pred_proba_lgb, axis=1)

# Convert predictions back to original labels
y_pred_lgb = label_encoder.inverse_transform(y_pred_lgb)

# Evaluate LightGBM
print("\nEvaluation of LightGBM model:")
lgb_accuracy = accuracy_score(y_test, y_pred_lgb)
lgb_f1_macro = f1_score(y_test, y_pred_lgb, average='macro')
print(f"Accuracy: {lgb_accuracy:.4f}")
print(f"Macro-F1: {lgb_f1_macro:.4f}")
print("\nClassification report:")
print(classification_report(y_test, y_pred_lgb))

# Compare two models
print("\nModel comparison:")
print(f"Logistic Regression - Accuracy: {lr_accuracy:.4f}, Macro-F1: {lr_f1_macro:.4f}")
print(f"LightGBM - Accuracy: {lgb_accuracy:.4f}, Macro-F1: {lgb_f1_macro:.4f}")

In [None]:
# Create and save prediction results CSV
print("\nCreating prediction results...")
cell_indices = range(len(test_adata.obs.index))
predictions_df = pd.DataFrame()
predictions_df['cell_index'] = cell_indices

# Ensure class names are consistent
required_classes = ['normal', 'type 1 diabetes mellitus', 'type 2 diabetes mellitus', 'endocrine pancreas disorder']

# Check if all required classes exist
missing_classes = set(required_classes) - set(original_classes)
if missing_classes:
    print(f"Warning: Some required classes are missing in data: {missing_classes}")
    print(f"Actual classes: {original_classes}")

# Add probability columns for each class (in the specified order)
for cls in required_classes:
    if cls in original_classes:
        cls_idx = np.where(original_classes == cls)[0][0]
        predictions_df[f'p_{cls}'] = lr.predict_proba(X_test)[:, cls_idx]
    else:
        # If class doesn't exist, fill with 0
        predictions_df[f'p_{cls}'] = 0

# Save prediction results
predictions_df.to_csv('predictions.csv', index=False)
print("Prediction results saved to 'predictions.csv'")

# Create confusion matrix plot
plt.figure(figsize=(10, 8))
cm = confusion_matrix(y_test, lr.predict(X_test))
sns.heatmap(cm, annot=True, fmt='g', cmap='Blues')
plt.xlabel('Predicted Labels')
plt.ylabel('True Labels')
plt.title('Confusion Matrix for Logistic Regression')
plt.tight_layout()
plt.savefig('metrics.png')
print("Confusion matrix plot saved to 'metrics.png'")

In [None]:
# UMAP Visualization
import umap
from sklearn.preprocessing import LabelEncoder

# Create UMAP embeddings
print("Creating UMAP embeddings...")
reducer = umap.UMAP(n_neighbors=15, min_dist=0.1, random_state=42)
X_test_umap = reducer.fit_transform(X_test)

# Prepare predictions
y_pred_lr = lr.predict(X_test)
y_pred_lgb_labels = label_encoder.inverse_transform(np.argmax(gbm.predict(X_test), axis=1))

# Create visualization
fig, axes = plt.subplots(1, 3, figsize=(18, 5))

# True labels
for i, disease in enumerate(np.unique(y_test)):
    mask = y_test == disease
    axes[0].scatter(X_test_umap[mask, 0], X_test_umap[mask, 1], 
                   label=disease, alpha=0.7, s=10)
axes[0].set_title('True Labels')
axes[0].legend(bbox_to_anchor=(1.05, 1), loc='upper left')

# LightGBM predictions
for i, disease in enumerate(np.unique(y_test)):
    mask = y_pred_lgb_labels == disease
    axes[1].scatter(X_test_umap[mask, 0], X_test_umap[mask, 1], 
                   label=disease, alpha=0.7, s=10)
axes[1].set_title('LightGBM Predictions')
axes[1].legend(bbox_to_anchor=(1.05, 1), loc='upper left')

# Logistic Regression predictions
for i, disease in enumerate(np.unique(y_test)):
    mask = y_pred_lr == disease
    axes[2].scatter(X_test_umap[mask, 0], X_test_umap[mask, 1], 
                   label=disease, alpha=0.7, s=10)
axes[2].set_title('Logistic Regression Predictions')
axes[2].legend(bbox_to_anchor=(1.05, 1), loc='upper left')

plt.tight_layout()
plt.savefig('umap_visualization.png', dpi=300, bbox_inches='tight')
plt.show()
print("UMAP visualization saved to 'umap_visualization.png'")