In [None]:
import os 
import pandas as pd 
import numpy as np 
import matplotlib.pyplot as plt
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, roc_auc_score
from sklearn.metrics import roc_curve, auc, det_curve
from sklearn.model_selection import cross_val_predict, StratifiedKFold, GridSearchCV
from scipy.interpolate import interp1d

from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler, label_binarize
from sklearn.pipeline import make_pipeline
from sklearn.impute import KNNImputer

from sklearn.metrics import (
    accuracy_score,
    roc_auc_score,
    precision_score,
    recall_score,
    f1_score,
    precision_recall_curve,
    auc,
    confusion_matrix
)



In [None]:
DATA_ROOT = '../data'
clincopathology = pd.read_csv(os.path.join(DATA_ROOT, 'clincopathology_features.csv'))
clinical = pd.read_csv(os.path.join(DATA_ROOT, 'clinical_features.csv'))
pathology = pd.read_csv(os.path.join(DATA_ROOT, 'pathology_features.csv'))

In [None]:
clinical_features = clinical.drop(columns=['pid', 'diagnosis'])
pathology_features = pathology.drop(columns=['pid', 'diagnosis'])
clincopathology_features = clincopathology.drop(columns=['pid', 'diagnosis'])
outcomes = clinical['diagnosis']

## Multiclass Logistic Regression with nested CV, return best outcome with the best hyperparameter

In [None]:
def nested_multiclass_lr_cv(features, outcomes):
    # outer loop: 10-fold cv 
    outer_cv = StratifiedKFold(n_splits=10, shuffle=True, random_state=42)

    # define hyperparams for grid search 
    param_grid = {
        'logisticregression__C': [0.1, 1, 10],  # regularization strength
        'logisticregression__penalty': ['l1', 'l2'],
    }

    # Initialize metrics lists
    outer_f1_scores = []  # weighted F1 score per fold (weighted by the number of samples)
    class_f1_scores = {cls: [] for cls in np.unique(outcomes)}  # per-class F1 scores

    # track predictions for confusion matrix 
    all_y_true = []
    all_y_pred = []

    # define outer cv loop for performance evaluation
    for train_index, test_index in outer_cv.split(features, outcomes):
        X_train, X_test = features.iloc[train_index], features.iloc[test_index]
        y_train, y_test = outcomes.iloc[train_index], outcomes.iloc[test_index]

        # define inner cv loop for hyperparam tuning 
        inner_cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)

        # define model pipeline 
        pipeline = make_pipeline(
            KNNImputer(n_neighbors=5, weights='distance'),
            StandardScaler(),
            LogisticRegression( # multinomial lr automatically chosen 
                solver="saga",  # supports both l1 and l2 and multinomidal
                max_iter=500,
                random_state=42
            )
        )

        # grid search for inner cv loop 
        grid_search = GridSearchCV(pipeline, param_grid, cv=inner_cv, scoring='f1_weighted', n_jobs=-1)
        grid_search.fit(X_train, y_train)

        # re-train model using X_train and the best hyperparameters found during grid search 
        best_model = grid_search.best_estimator_

        # get predictions on X_test
        y_pred = best_model.predict(X_test)
        y_pred_proba = best_model.predict_proba(X_test)

        # track predictions and true labels for entire dataset tracking
        # (used to contruct a confusion matrix after the outer cv loop) 
        all_y_true.extend(y_test)
        all_y_pred.extend(y_pred)

        # fold f1-scores 
        fold_class_f1_scores = f1_score(y_test, y_pred, average=None, labels=np.unique(outcomes))
        for cls, f1 in zip(np.unique(outcomes), fold_class_f1_scores):
            class_f1_scores[cls].append(f1)

        fold_weighted_f1 = f1_score(y_test, y_pred, average='weighted') # weighted sum of per-class f1-score, weighted by number of samples
        outer_f1_scores.append(fold_weighted_f1)

    conf_matrix = confusion_matrix(all_y_true, all_y_pred, labels=np.unique(outcomes))
    report = classification_report(all_y_true, all_y_pred, target_names=[str(cls) for cls in np.unique(outcomes)])

    print("Weighted F1 Scores across folds:", outer_f1_scores)
    print(f"Average Weighted F1 Score (Sd): {np.mean(outer_f1_scores):.3f} ({np.std(outer_f1_scores):.3f})")
    print("Per-Class Average F1 Scores (SD):")
    for cls in class_f1_scores:
        print(f"Class {cls}: {np.mean(class_f1_scores[cls]):.3f} ({np.std(class_f1_scores[cls]):.3f})")
    print("\nConfusion Matrix:")
    print(conf_matrix)
    print("\nClassification Report:")
    print(report)

    return conf_matrix

In [None]:
def nested_multiclass_lr_cv_late_fusion_with_weights(clinical_features, pathology_features, outcomes):
    # outer loop: 10-fold cv 
    outer_cv = StratifiedKFold(n_splits=10, shuffle=True, random_state=42)

    # define hyperparam for grid search 
    param_grid = {
        'logisticregression__C': [0.1, 1, 10],  # reg strength
        'logisticregression__penalty': ['l1', 'l2'],
        'fusion_weight': [[0.5, 0.5], [0.6, 0.4], [0.4, 0.6], [0.7, 0.3], [0.3, 0.7]]  # weighing two single-modality models 
    }

    # track metrics 
    outer_f1_scores = []  # Weighted F1 score per fold (weighted by the number of samples)
    class_f1_scores = {cls: [] for cls in np.unique(outcomes)}  # Per-class F1 scores

    # track predictions for confusion matrix 
    all_y_true = []
    all_y_pred = []

    for train_index, test_index in outer_cv.split(clinical_features, outcomes):
        X_train_clinical, X_test_clinical = clinical_features.iloc[train_index], clinical_features.iloc[test_index]
        X_train_pathology, X_test_pathology = pathology_features.iloc[train_index], pathology_features.iloc[test_index]
        y_train, y_test = outcomes.iloc[train_index], outcomes.iloc[test_index]

        # inner cv
        inner_cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)

        # clinical-only model pipeline 
        clinical_pipeline = make_pipeline(
            KNNImputer(n_neighbors=5, weights='distance'),
            StandardScaler(),
            LogisticRegression(
                solver="saga",
                max_iter=500,
                random_state=42
            )
        )

        # pathology-only model pipeline 
        pathology_pipeline = make_pipeline(
            KNNImputer(n_neighbors=5, weights='distance'),
            StandardScaler(),
            LogisticRegression(
                solver="saga",
                max_iter=500,
                random_state=42
            )
        )

        # git
        clinical_pipeline.fit(X_train_clinical, y_train)
        pathology_pipeline.fit(X_train_pathology, y_train)

        # get pred prob
        clinical_proba = clinical_pipeline.predict_proba(X_test_clinical)
        pathology_proba = pathology_pipeline.predict_proba(X_test_pathology)

        # inner loop grid search 
        best_score = -np.inf
        best_weight = None
        best_params = None

        for fusion_weight in param_grid['fusion_weight']:
            for penalty in ['l1', 'l2']:
                for C in [0.1, 1, 10]:
                    # Apply weight combination
                    combined_proba = fusion_weight[0] * clinical_proba + fusion_weight[1] * pathology_proba
                    unique_classes = np.unique(outcomes)
                    predicted_indices = np.argmax(combined_proba, axis=1)
                    y_pred = unique_classes[predicted_indices]

                    # eval by f1 score 
                    fold_score = f1_score(y_test, y_pred, average='weighted')

                    if fold_score > best_score:
                        best_score = fold_score
                        best_weight = fusion_weight
                        best_params = {'C': C, 'penalty': penalty}

        # final predictions using the best hyperparams 
        combined_proba = best_weight[0] * clinical_proba + best_weight[1] * pathology_proba
        unique_classes = np.unique(outcomes)
        predicted_indices = np.argmax(combined_proba, axis=1)
        y_pred = unique_classes[predicted_indices]

        # track predictions 
        all_y_true.extend(y_test)
        all_y_pred.extend(y_pred)

        # Fold F1 scores
        fold_class_f1_scores = f1_score(y_test, y_pred, average=None, labels=np.unique(outcomes))
        for cls, f1 in zip(np.unique(outcomes), fold_class_f1_scores):
            class_f1_scores[cls].append(f1)

        fold_weighted_f1 = f1_score(y_test, y_pred, average='weighted')
        outer_f1_scores.append(fold_weighted_f1)

    # Calculate evaluation metrics
    conf_matrix = confusion_matrix(all_y_true, all_y_pred, labels=np.unique(outcomes))
    report = classification_report(
        all_y_true, all_y_pred,
        target_names=[str(cls) for cls in np.unique(outcomes)]
    )

    print("Weighted F1 Scores across folds:", outer_f1_scores)
    print(f"Average Weighted F1 Score (SD): {np.mean(outer_f1_scores):.3f} ({np.std(outer_f1_scores):.3f})")
    print("Per-Class Average F1 Scores (SD):")
    for cls in class_f1_scores:
        print(f"Class {cls}: {np.mean(class_f1_scores[cls]):.3f} ({np.std(class_f1_scores[cls]):.3f})")
    print("\nConfusion Matrix:")
    print(conf_matrix)
    print("\nClassification Report:")
    print(report)

    return conf_matrix

In [None]:
c_conf = nested_multiclass_lr_cv(clinical_features, outcomes)

In [None]:
p_conf = nested_multiclass_lr_cv(pathology_features, outcomes)

In [None]:
ef_conf = nested_multiclass_lr_cv(clincopathology_features, outcomes) # early-fusion confusion matrix

In [None]:
lf_matrix = nested_multiclass_lr_cv_late_fusion(clinical_features, pathology_features, outcomes)

## Plot Confusion Matrix

In [None]:
def plot_confusion_matrix(conf_matrix, class_names, title=""):
    """
    Plots a confusion matrix using matplotlib.

    Parameters:
    - conf_matrix (array-like): Confusion matrix (2D array).
    - class_names (list): List of class names corresponding to the confusion matrix indices.
    """
    fig, ax = plt.subplots(figsize=(8,8))
    im = ax.imshow(conf_matrix, interpolation='nearest', cmap='Blues')
    # ax.figure.colorbar(im, ax=ax)

    ax.set(xticks=np.arange(conf_matrix.shape[1]),
           yticks=np.arange(conf_matrix.shape[0]),
           xticklabels=class_names, yticklabels=class_names)
    ax.set_title(title, fontsize=16)
    ax.set_xlabel("Predicted Label", fontsize=16)
    ax.set_ylabel("True Label", fontsize=16)

    # Rotate the tick labels and set their alignment
    plt.setp(ax.get_xticklabels(), rotation=45, ha="right", rotation_mode="anchor")

    # Loop over data dimensions and create text annotations.
    thresh = conf_matrix.max() / 2.
    for i in range(conf_matrix.shape[0]):
        for j in range(conf_matrix.shape[1]):
            ax.text(j, i, format(conf_matrix[i, j], 'd'),
                    ha="center", va="center",
                    color="white" if conf_matrix[i, j] > thresh else "black")
    fig.tight_layout()
    plt.show()

In [None]:
class_names = [str(cls) for cls in np.unique(outcomes)]
class_names_str = [
    'psoriasis',
    'seboreic dermatitis',
    'lichen planus',
    'pityriasis rosea',
    'cronic dermatitis',
    'pityriasis rubra pilaris',
]


# Plot the confusion matrix
plot_confusion_matrix(c_conf, class_names_str, 'Clinical-only model')

In [None]:
plot_confusion_matrix(ef_conf, class_names_str, 'Early-fusion model')

In [None]:
plot_confusion_matrix(lf_matrix, class_names_str, 'late-fusion model')