In [None]:
import os
import warnings

import numpy as np
import pandas as pd
from confidenceinterval import roc_auc_score, precision_score, recall_score, f1_score, accuracy_score, tnr_score
from joblib import Parallel, delayed
from scipy.stats import ttest_ind
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.compose import ColumnTransformer
from sklearn.ensemble import RandomForestClassifier
from sklearn.exceptions import ConvergenceWarning
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import roc_curve
from sklearn.model_selection import LeaveOneOut, GridSearchCV
from sklearn.neural_network import MLPClassifier
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, FunctionTransformer
from sklearn.svm import SVC
from xgboost import XGBClassifier
# Silence UserWarning and ConvergenceWarning in this (parent) process; process_fold
# installs the same filters again because it is executed inside joblib workers.
warnings.filterwarnings("ignore", category=UserWarning)
warnings.filterwarnings("ignore", category=ConvergenceWarning)
# Do not report floating-point "invalid" operations (e.g. 0/0 producing NaN).
np.seterr(invalid='ignore')

# NOTE(review): PYTHONHASHSEED is read at interpreter start-up, so assigning it here
# cannot re-seed hashing in the already-running kernel; presumably it is meant for
# processes launched afterwards, which inherit the environment -- confirm.
os.environ['PYTHONHASHSEED'] = '0'
# Seed handed to every classifier via get_classifiers_and_param_sets(random_state).
random_state = 42

In [22]:
# === FILE PATHS ===
project_path = '/Users/labneuro2/Documents/lab/AI_MBS/AI_MBS'

# === LOAD DATA ===
# Every table is sorted by its subject column so that row i is meant to describe the
# same subject in all matrices. NOTE(review): this assumes the three files contain
# exactly the same subjects with comparable IDs -- only row counts are verified below.
df_features_reg = pd.read_excel(f'{project_path}/local_measures_combined_atlas.xlsx') \
    .sort_values(by='Subject').reset_index(drop=True)

# Regional measures: columns whose prefix (text before the first '_') is fALFF, ALFF or ReHo
X_reg = df_features_reg[[col for col in df_features_reg.columns if col.split('_')[0] in ['fALFF','ALFF','ReHo']]].to_numpy()

df_clinic = pd.read_excel(f'{project_path}/clinical_data.xlsx') \
    .sort_values(by='subject')

clinic_cols = ['age_y','female_gender','type_2_diabetes','hypertension','hypothyroidism','depression',
               'BMI_kgm2','waist_cm','hips_cm','waist_hip_ratio']
binary_cols = ['female_gender','type_2_diabetes','hypertension','hypothyroidism','depression']
# Clinical predictors are taken from the rows flagged post_MBS == 0
X_clinic = df_clinic.loc[df_clinic['post_MBS'] == 0][clinic_cols].to_numpy()

df_features_corr = pd.read_csv(f'{project_path}/FC_combined_atlas.csv') \
    .sort_values(by='Subject').reset_index(drop=True)

# Connectivity features: every column whose name contains '_to_'
X_corr = df_features_corr[[col for col in df_features_corr.columns if '_to_' in col]].to_numpy()


# === LABELS: weight loss success (>50% overweight lost within a year) ===
# Labels come from the rows flagged post_MBS == 1; a value below -50 is coded as 1.
y = (df_clinic.loc[df_clinic['post_MBS'] == 1]['overweight_delta_proc_to_baseline_kg'] < -50).astype(int).to_numpy()

# === SANITY CHECK ===
# Features and labels are built from different files / row subsets. If y were longer
# than X, leave-one-out indexing would silently pair subjects with the wrong labels,
# so fail loudly here instead.
_row_counts = {
    'X_reg': X_reg.shape[0],
    'X_corr': X_corr.shape[0],
    'X_clinic': X_clinic.shape[0],
    'y': y.shape[0],
}
if len(set(_row_counts.values())) != 1:
    raise ValueError(f"Feature matrices and labels disagree on the number of subjects: {_row_counts}")

# === COMBINE FEATURES ===
X_fmri = np.hstack((X_corr, X_reg))
X_all = np.hstack((X_fmri, X_clinic))

X_reg_clinic = np.hstack((X_reg, X_clinic))
X_corr_clinic = np.hstack((X_corr, X_clinic))


In [25]:
# FeatureSelector selects top-k features based on t-test statistics between two classes
class FeatureSelector(BaseEstimator, TransformerMixin):
    """Keep the k columns with the largest absolute two-sample t statistic.

    Parameters
    ----------
    k : int, default=10
        Number of columns to keep. If k exceeds the number of columns, all
        columns are kept.

    Attributes
    ----------
    selected_features_ : ndarray of int or None
        Column indices chosen by ``fit`` (None until fitted).
    """

    def __init__(self, k=10):
        self.k = k
        self.selected_features_ = None

    def fit(self, X, y):
        """Rank columns of X by |t| between the rows with y == 0 and y == 1.

        Parameters
        ----------
        X : ndarray of shape (n_samples, n_features)
        y : array-like of shape (n_samples,) with values 0 / 1

        Returns
        -------
        self
        """
        y = np.asarray(y)
        t_values, _ = ttest_ind(X[y == 0], X[y == 1], axis=0)
        scores = np.abs(np.asarray(t_values, dtype=float))
        # A column that is constant in both groups yields t = NaN. argsort places
        # NaN last, so without this step such uninformative columns would be
        # picked first by the [-k:] slice. Rank them lowest instead.
        scores[np.isnan(scores)] = -np.inf
        self.selected_features_ = np.argsort(scores)[-self.k:]
        return self

    def transform(self, X):
        """Return only the selected columns of X.

        Raises
        ------
        ValueError
            If ``fit`` has not been called yet.
        """
        if self.selected_features_ is None:
            raise ValueError("The FeatureSelector has not been fitted yet.")
        return X[:, self.selected_features_]

# DynamicPreprocessor scales only continuous features, leaves binary features unchanged
class DynamicPreprocessor(BaseEstimator, TransformerMixin):
    """Standardize non-binary columns and pass 0/1 columns through untouched.

    Attributes
    ----------
    binary_mask_ : ndarray of bool
        True for columns whose training values are all 0 or 1.
    continuous_features_, binary_features_ : ndarray of int
        Column indices of the two groups.
    scaler_ : StandardScaler or None
        Scaler fitted on the continuous columns; None when there are none.
    """

    def fit(self, X, y=None):
        """Detect binary columns on X and fit the scaler on the remaining ones."""
        X = np.asarray(X)
        # A column counts as binary when it contains nothing but 0 and 1. This also
        # covers a 0/1 indicator that happens to be constant inside one training
        # fold, which would otherwise be treated as continuous in that fold only.
        self.binary_mask_ = np.array(
            [bool(np.isin(X[:, i], (0, 1)).all()) for i in range(X.shape[1])],
            dtype=bool,
        )
        self.continuous_features_ = np.where(~self.binary_mask_)[0]
        self.binary_features_ = np.where(self.binary_mask_)[0]
        # StandardScaler refuses to fit on zero columns, so skip it in that case
        if self.continuous_features_.size:
            self.scaler_ = StandardScaler().fit(X[:, self.continuous_features_])
        else:
            self.scaler_ = None
        return self

    def transform(self, X):
        """Return a float copy of X with the continuous columns standardized."""
        # Cast to float: writing scaled values into an integer copy would truncate them
        X_transformed = np.array(X, dtype=float)
        if self.scaler_ is not None:
            X_transformed[:, self.continuous_features_] = self.scaler_.transform(
                X_transformed[:, self.continuous_features_]
            )
        return X_transformed

In [26]:
def process_fold(train_idx, test_idx, X, y, option):
    """Tune and evaluate one outer cross-validation fold.

    Parameters
    ----------
    train_idx, test_idx : index arrays
        Row indices of the training samples and of the held-out sample(s).
    X : ndarray of shape (n_samples, n_features)
    y : ndarray of shape (n_samples,)
    option : dict
        Must provide 'param_grid' (grid for GridSearchCV), 'cv' (inner CV
        setting) and 'classifier' (estimator used as the last pipeline step).

    Returns
    -------
    tuple
        (true label, predicted probability of class 1) for the first held-out
        sample; callers use leave-one-out, so there is exactly one.
    """
    # Suppress warnings for cleaner output (repeated here because this function
    # is dispatched through joblib.Parallel by run_loo_with_options)
    warnings.filterwarnings("ignore", category=UserWarning)
    warnings.filterwarnings("ignore", category=ConvergenceWarning)

    # Extract parameters for this fold
    param_grid = option['param_grid']
    cv = option['cv']
    clf = option['classifier']

    # Split data into training and test sets for this fold
    X_train, X_test = X[train_idx], X[test_idx]
    y_train, y_test = y[train_idx], y[test_idx]

    # Build pipeline: feature selection, scaling, then classification.
    # Selection and scaling are fitted inside the pipeline, i.e. on training data only.
    pipeline = Pipeline([
        ('selector', FeatureSelector()),
        ('scaler', DynamicPreprocessor()),
        ('classifier', clf)
    ])

    # Grid search on the training data; with the default refit=True the best
    # configuration is refitted on all of X_train.
    best_pipeline = GridSearchCV(pipeline, param_grid, cv=cv, scoring='roc_auc', n_jobs=-1)
    best_pipeline.fit(X_train, y_train)

    # predict_proba on the search object runs the refitted selector, scaler and
    # classifier in sequence, so the steps need not be applied by hand.
    y_pred_proba = best_pipeline.predict_proba(X_test)[:, 1]

    # Return true label and predicted probability for this fold
    return y_test[0], y_pred_proba[0]

In [27]:
def run_loo_with_options(X, y, options):
    """Evaluate every option with leave-one-out cross-validation.

    For each option the held-out predictions of all folds are pooled, the
    metrics and their 95% confidence intervals are computed on the pooled
    predictions, a short summary is printed, and one result dict is appended
    to the returned list.
    """
    results = []
    report_order = ('accuracy', 'auc', 'sensitivity', 'specificity', 'precision', 'f1')

    for option in options:
        # One job per left-out sample, dispatched in parallel
        fold_jobs = (
            delayed(process_fold)(train_idx, test_idx, X, y, option)
            for train_idx, test_idx in LeaveOneOut().split(X, y)
        )
        fold_results = Parallel(n_jobs=-1)(fold_jobs)

        # Pool the per-fold (true label, probability) pairs
        y_true_all = np.array([truth for truth, _ in fold_results])
        y_pred_proba_all = np.array([proba for _, proba in fold_results])
        y_pred = (y_pred_proba_all >= .5).astype(int)

        # (result key, scoring function, predictions it is evaluated on);
        # AUC uses probabilities, everything else the thresholded labels
        scorers = [
            ('auc', roc_auc_score, y_pred_proba_all),
            ('accuracy', accuracy_score, y_pred),
            ('sensitivity', recall_score, y_pred),
            ('specificity', tnr_score, y_pred),
            ('precision', precision_score, y_pred),
            ('f1', f1_score, y_pred),
        ]
        values, intervals = {}, {}
        for label, scorer, predictions in scorers:
            values[label], intervals[label] = scorer(y_true_all, predictions, confidence_level=0.95)

        # Assemble the record: settings, rounded point estimates, rounded CIs, raw probabilities
        entry = {
            'model': option['classifier'],
            'param_grid': option['param_grid'],
            'cv': option['cv'],
        }
        entry.update((label, round(values[label], 4)) for label in report_order)
        entry.update(
            (label + '_ci', [round(bound, 4) for bound in intervals[label]])
            for label in report_order
        )
        entry['y_pred_proba'] = y_pred_proba_all
        results.append(entry)

        # Print summary for this option
        print(f"Option: {entry['model']}, params {entry['param_grid']}")
        print(f" ACC: {entry['accuracy']:.3f}, AUC: {entry['auc']:.3f}")
    return results

In [28]:
# Lookup table: fragment expected in an estimator's repr() -> display name for reports
MODEL_NAME_MAPPING = {
    "SVC": "Support Vector Machine",
    "LogisticRegression": "Logistic Regression",
    "RandomForest": "Random Forest",
    "XGBClassifier": "Extreme Gradient Boosting",
    "MLPClassifier": "Multi-Layer Perceptron",
}

# Resolve a display name by searching the estimator's repr() for a known fragment
def map_model_name(classifier):
    """Return the display name of the first MODEL_NAME_MAPPING key found in repr(classifier).

    Keys are tried in the mapping's insertion order; "Unknown Model" is
    returned when none of them occurs in the repr.
    """
    text = repr(classifier)  # e.g. "SVC(probability=True, random_state=42)"
    matches = (label for fragment, label in MODEL_NAME_MAPPING.items() if fragment in text)
    return next(matches, "Unknown Model")

# Render a hyperparameter grid as one "Name: values" line per parameter
def format_hyperparameters(param_grid):
    """Turn a parameter grid dict into a newline-separated, human-readable string.

    The "classifier__" prefix is removed from each key, remaining underscores
    become spaces and the first letter is upper-cased. List values are joined
    with ", "; any other value is rendered as-is.
    """
    lines = []
    for raw_name, candidates in param_grid.items():
        label = raw_name.replace("classifier__", "").replace("_", " ").capitalize()
        if isinstance(candidates, list):
            shown = ", ".join(str(candidate) for candidate in candidates)
        else:
            shown = candidates
        lines.append(f"{label}: {shown}")
    return "\n".join(lines)

# Fold the "<metric>_ci" column into the metric column as "value\n(ci)" text
def merge_metric_ci(df, metric):
    """Combine a metric column with its confidence-interval column, in place.

    The metric column is overwritten with "<value>\\n(<ci>)" strings and the
    "<metric>_ci" column is removed from df. Nothing is returned.
    """
    ci_column = metric + "_ci"
    value_text = df[metric].astype(str)
    ci_text = df[ci_column].astype(str)
    df[metric] = value_text + "\n(" + ci_text + ")"
    df.drop(columns=[ci_column], inplace=True)

# Build the report header for a metric column: capitalized name plus a CI note
def format_metric_column_name(metric):
    """Return the metric name capitalized, followed by a "(95% CI)" line."""
    title = metric.capitalize()
    return title + "\n(95% CI)"

# Metric keys to process when formatting results. Each name is a key of the result
# dicts built by run_loo_with_options and has a companion "<name>_ci" entry, which
# merge_metric_ci folds into the metric column.
metrics = ["accuracy", "auc", "sensitivity", "specificity", "precision", "f1"]

In [29]:
def get_classifiers_and_param_sets(random_state):
    """Build the candidate classifiers and their hyperparameter grids.

    Parameters
    ----------
    random_state : int
        Seed passed to every classifier constructor.

    Returns
    -------
    dict
        Maps a short model key ('svm', 'logistic_regression', 'random_forest',
        'xgboost', 'mlp') to a dict with:
        - 'classifier': an unfitted estimator instance
        - 'param_sets': list of grids; keys carry the 'classifier__' prefix so
          they address the 'classifier' step of a Pipeline.

    Notes
    -----
    MLPClassifier comes from sklearn.neural_network and must be imported at the
    top of the notebook; without that import this function raises NameError.
    """
    classifiers = {
        'svm': {
            # probability=True is required for predict_proba
            'classifier': SVC(probability=True, random_state=random_state),
            'param_sets': [
                {'classifier__C': [0.01, 0.1, 1], 'classifier__kernel': ['linear']},
                {'classifier__C': [0.01, 0.1, 1], 'classifier__kernel': ['rbf']},
                {'classifier__kernel': ['rbf'], 'classifier__gamma': ['scale', 'auto', 0.01, 0.1]},
                {'classifier__kernel': ['rbf'], 'classifier__gamma': ['scale', 'auto', 0.001, 0.01]},
                {'classifier__C': [0.01, 0.1, 1], 'classifier__kernel': ['rbf'], 'classifier__gamma': ['scale', 'auto', 0.01, 0.1]},
                {'classifier__C': [0.01, 0.1, 1], 'classifier__kernel': ['rbf'], 'classifier__gamma': ['scale', 'auto', 0.001, 0.01]},
            ]
        },
        'logistic_regression': {
            'classifier': LogisticRegression(random_state=random_state),
            'param_sets': [
                # l1 and elasticnet grids name their solver explicitly (liblinear / saga)
                {'classifier__penalty': ['l2'], 'classifier__C': [0.1, 1, 10]},
                {'classifier__penalty': ['l1'], 'classifier__C': [0.1, 1, 10], 'classifier__solver': ['liblinear']},
                {'classifier__penalty': ['elasticnet'], 'classifier__C': [0.1, 1, 10], 'classifier__solver': ['saga'], 'classifier__l1_ratio': [0.1, 0.5, 0.9]},
                {'classifier__penalty': ['l2'], 'classifier__C': [0.01, 0.1, 1]},
                {'classifier__penalty': ['l1'], 'classifier__C': [0.01, 0.1, 1], 'classifier__solver': ['liblinear']},
                {'classifier__penalty': ['elasticnet'], 'classifier__C': [0.01, 0.1, 1], 'classifier__solver': ['saga'], 'classifier__l1_ratio': [0.1, 0.5, 0.9]},
            ]
        },
        'random_forest': {
            'classifier': RandomForestClassifier(random_state=random_state),
            'param_sets': [
                {'classifier__n_estimators': [50, 100, 200], 'classifier__max_depth': [5, 10, None]},
                {'classifier__n_estimators': [10, 50, 100], 'classifier__max_depth': [5, 10, None]},
                {'classifier__n_estimators': [50, 100, 200], 'classifier__max_depth': [5, 10, None], 'classifier__min_samples_leaf': [1, 2, 4]},
                {'classifier__n_estimators': [10, 50, 100], 'classifier__max_depth': [5, 10, None], 'classifier__min_samples_leaf': [1, 2, 4]},
            ]
        },
        'xgboost': {
            'classifier': XGBClassifier(eval_metric='logloss', random_state=random_state),
            'param_sets': [
                {'classifier__learning_rate': [0.01, 0.1, 0.3]},
                {'classifier__booster': ['gbtree', 'dart']},
                {'classifier__learning_rate': [0.01, 0.1, 0.3], 'classifier__booster': ['gbtree', 'dart']},
            ]
        },
        'mlp': {
            'classifier': MLPClassifier(random_state=random_state),
            'param_sets': [
                # Each grid extends the previous one with a deeper architecture
                {'classifier__hidden_layer_sizes': [(50,), (100,)]},
                {'classifier__hidden_layer_sizes': [(50,), (100,), (100, 50)]},
                {'classifier__hidden_layer_sizes': [(50,), (100,), (100, 50), (100, 50, 50)]},
                {'classifier__hidden_layer_sizes': [(50,), (100,), (100, 50), (100, 50, 50), (100, 100, 50, 50)]},
                {'classifier__hidden_layer_sizes': [(50,), (100,), (100, 50), (100, 50, 50), (100, 100, 50, 50), (100, 100, 100, 50, 50)]},
            ]
        }
    }
    return classifiers


In [31]:
# This function generates a list of option dictionaries for model selection.
# Each option specifies a classifier, its hyperparameter grid, and feature selection range.
def generate_options_with_classifiers():
    """Expand every classifier / parameter-set pair into a run option.

    Reads the notebook-level ``random_state`` and calls
    get_classifiers_and_param_sets with it.

    Returns
    -------
    list of dict
        One dict per (classifier, param_set, k-range) combination with keys:
        'param_grid' (the parameter set plus 'selector__k'), 'cv' (10),
        'classifier' (the estimator instance, shared by all options of the
        same model) and 'name' ("<model key>_t_stat").
    """
    t_stat_ranges = [np.arange(10, 21, 1)]  # Range of top-k features to select (from 10 to 20)

    options = []
    classifiers = get_classifiers_and_param_sets(random_state)  # Get classifiers and their parameter sets
    for name, classifier_info in classifiers.items():
        for param_set in classifier_info['param_sets']:
            for t_range in t_stat_ranges:
                options.append({
                    'param_grid': {
                        **param_set,
                        # .tolist() yields plain Python ints; list(ndarray) would keep
                        # NumPy scalars, which show up as np.int64(...) when the grid
                        # is printed under NumPy 2
                        'selector__k': np.asarray(t_range).tolist(),
                    },
                    'cv': 10,
                    'classifier': classifier_info['classifier'],
                    'name': f"{name}_t_stat",
                })
    return options

options = generate_options_with_classifiers()

In [None]:
# Feature sets to evaluate, matched by position with the tag used in each output file name
inputs = [X_clinic, X_corr, X_reg, X_all, X_fmri, X_reg_clinic, X_corr_clinic]
inputs_names = ['clinic', 'corr', 'reg', 'all', 'fmri', 'reg_clinic', 'FC_clinic']

# Run the full model comparison once per feature set and export one report each
for Xname, X in zip(inputs_names, inputs):
    results = run_loo_with_options(X, y, options)
    df_res = pd.DataFrame(results)

    # Human-readable model name, derived from each estimator's repr()
    df_res["Model"] = [map_model_name(estimator) for estimator in df_res["model"]]

    # Multi-line description of the searched grid
    df_res["Hyperparameter Grid"] = [format_hyperparameters(grid) for grid in df_res["param_grid"]]

    # Fold every "<metric>_ci" column into its metric column (mutates df_res)
    for metric in metrics:
        merge_metric_ci(df_res, metric)

    # Rename metric columns to their Word-style headers
    column_mapping = dict(zip(metrics, map(format_metric_column_name, metrics)))
    df_res = df_res.rename(columns=column_mapping)

    # Keep only the report columns, in display order
    df_res = df_res[["Model", "Hyperparameter Grid", *column_mapping.values()]]

    # One workbook per feature set
    output_file = f"{project_path}/formatted_models_result_{Xname}.xlsx"
    df_res.to_excel(output_file, index=False)
