### DBS-ON/OFF states classification

#### Libraries

In [None]:
import matplotlib.pyplot as plt
import nibabel as nib
import nilearn
import numpy as np
import pandas as pd
import seaborn as sns
from functions import *
from sklearn.decomposition import PCA
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
from sklearn.ensemble import GradientBoostingClassifier, RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, f1_score, recall_score, roc_auc_score
from sklearn.model_selection import GridSearchCV, GroupKFold
from sklearn.naive_bayes import GaussianNB
from sklearn.neighbors import KNeighborsClassifier
from sklearn.pipeline import Pipeline
from sklearn.svm import SVC
from sklearn.tree import DecisionTreeClassifier
from xgboost import XGBClassifier

#### File paths and parameters

In [None]:
data_dir = "../data"
mask_img = nib.load("../data/msk/sum_80_bin.nii")
measures = ["ALFF", "fALFF", "ECM_add", "ECM_deg", "ECM_norm", "ECM_rank", "GCOR", "ICC", "IHC", "LCOR"]
output_dir = "../results/scores/"

#### Run optimization with Grid Search

In [None]:
# Grid space
models = {
    "Logistic Regression": {
        "model": LogisticRegression(random_state=124, max_iter=5000),
        "params": {"clf__C": np.logspace(-2, 2, 5), "clf__solver": ["lbfgs", "liblinear", "saga"]},
    },
    "LDA": {
        "model": LinearDiscriminantAnalysis(),
        "params": {
            "clf__tol": [1e-4, 1e-3, 1e-2],
        },
    },
    "Naive Bayes": {"model": GaussianNB(), "params": {"clf__var_smoothing": np.logspace(0, -9, num=10)}},
    "Decision Tree": {
        "model": DecisionTreeClassifier(random_state=124),
        "params": {"clf__criterion": ["gini", "entropy", "log_loss"], "clf__max_depth": [3, 5, 10]},
    },
    "Random Forest": {
        "model": RandomForestClassifier(random_state=124),
        "params": {
            "clf__n_estimators": [100, 1000],
            "clf__criterion": ["gini", "entropy", "log_loss"],
            "clf__max_depth": [3, 5, 10],
        },
    },
    "XGBoost": {
        "model": XGBClassifier(random_state=124),
        "params": {
            "clf__n_estimators": [100, 1000],
            "clf__max_depth": [3, 5, 10],
            "clf__learning_rate": [0.05, 0.1],
        },
    },
    "Gradient Boosting": {
        "model": GradientBoostingClassifier(random_state=124),
        "params": {
            "clf__n_estimators": [100, 1000],
            "clf__max_depth": [3, 5, 10],
            "clf__learning_rate": [0.05, 0.1],
        },
    },
    "SVC": {
        "model": SVC(probability=True, random_state=124),
        "params": {"clf__C": np.logspace(-2, 2, 5), "clf__gamma": ["scale", "auto"]},
    },
    "KNN": {
        "model": KNeighborsClassifier(),
        "params": {"clf__n_neighbors": list(range(3, 11, 2)), "clf__weights": ["uniform", "distance"]},
    },
}

results_tr = {}
results_ts = {}
cv_scores = {}
feature_importances = {}

# Run per model/map
for measure in measures:
    print(f"Processing measure: {measure}")
    results_tr[measure] = []
    results_ts[measure] = []
    cv_scores[measure] = []
    feature_importances[measure] = {}

    # Load data
    on_features_train = np.array(pd.read_csv(f"{data_dir}/data_train/{measure}_ON.csv"))
    off_features_train = np.array(pd.read_csv(f"{data_dir}/data_train/{measure}_OFF.csv"))
    on_features_test = np.array(pd.read_csv(f"{data_dir}/data_test/{measure}_ON.csv"))
    off_features_test = np.array(pd.read_csv(f"{data_dir}/data_test/{measure}_OFF.csv"))

    X_tr = np.concatenate((on_features_train, off_features_train), axis=0)
    y_tr = np.concatenate([np.zeros(len(on_features_train)), np.ones(len(off_features_train))])
    X_ts = np.concatenate((on_features_test, off_features_test), axis=0)
    y_ts = np.concatenate([np.zeros(len(on_features_test)), np.ones(len(off_features_test))])

    groups = np.concatenate((range(len(on_features_train)), range(len(off_features_train))))

    for name, config in models.items():
        print(f"\nRunning GridSearchCV for {name}...")

        pipe = Pipeline([("pca", PCA()), ("clf", config["model"])])

        param_grid = [{"pca": [PCA(n_components=0.99), "passthrough"], **config["params"]}]

        group_kfold = GroupKFold(n_splits=5)
        scoring = {"roc_auc": "roc_auc", "accuracy": "accuracy", "f1": "f1", "recall": "recall"}

        grid = GridSearchCV(pipe, param_grid, cv=group_kfold, scoring=scoring, refit="accuracy", n_jobs=-1)
        grid.fit(X_tr, y_tr, groups=groups)

        best_model = grid.best_estimator_
        y_pred = best_model.predict(X_ts)
        y_proba = best_model.predict_proba(X_ts)[:, 1]

        # Compute evaluation metrics
        roc_auc = roc_auc_score(y_ts, y_proba)
        accuracy = accuracy_score(y_ts, y_pred)
        f1 = f1_score(y_ts, y_pred)
        recall = recall_score(y_ts, y_pred)

        clf = best_model.named_steps["clf"]
        pca_step = best_model.named_steps.get("pca", None)

        n_features = pca_step.n_components_ if isinstance(pca_step, PCA) else X_tr.shape[1]

        if isinstance(clf, (LogisticRegression, LinearDiscriminantAnalysis)):
            model_coef = clf.coef_[0]

            if isinstance(pca_step, PCA):
                loadings = pca_step.components_.T * np.sqrt(pca_step.explained_variance_)
                importance = np.dot(loadings, model_coef)
            else:
                importance = model_coef

            feature_importances[measure][name] = importance

        results_ts[measure].append({
            "Model": name,
            "ROC_AUC": roc_auc,
            "Accuracy": accuracy,
            "Recall": recall,
            "F1": f1,
            "num_features": n_features,
        })

        # Save metrics
        cv_scores[measure].append({"Model": name, "CV_Score": grid.best_score_, "Best_Params": grid.best_params_})

        best_idx = grid.best_index_
        results_tr[measure].append({
            "Model": name,
            "ROC_AUC": grid.cv_results_["mean_test_roc_auc"][best_idx],
            "Accuracy": grid.cv_results_["mean_test_accuracy"][best_idx],
            "Recall": grid.cv_results_["mean_test_recall"][best_idx],
            "F1": grid.cv_results_["mean_test_f1"][best_idx],
            "num_features": n_features,
        })

    # Save test results
    with pd.ExcelWriter(output_dir + "classification_performance_test_rev.xlsx", engine="xlsxwriter") as writer:
        for m in results_ts:
            df_results = pd.DataFrame(results_ts[m])
            df_results.to_excel(writer, sheet_name=m, index=False)

    # Save feature importances
    with pd.ExcelWriter(
        output_dir + "classification_features_importances_test_rev.xlsx", engine="xlsxwriter"
    ) as feature_writer:
        for m in feature_importances:
            if feature_importances[m]:
                df_feat = pd.DataFrame.from_dict(feature_importances[m], orient="index").transpose()
                df_feat.to_excel(feature_writer, sheet_name=m, index=False)

    # Save CV results
    with pd.ExcelWriter(
        output_dir + "classification_performance_train_cv_5_rev.xlsx", engine="xlsxwriter"
    ) as cv_writer:
        for m in results_tr:
            df_cv = pd.DataFrame(results_tr[m])
            df_cv.to_excel(cv_writer, sheet_name=m, index=False)

    # Save best params
    with pd.ExcelWriter(output_dir + "classification_best_parameters.xlsx", engine="xlsxwriter") as cv_score_writer:
        for m in cv_scores:
            df_cv_best = pd.DataFrame(cv_scores[m])
            df_cv_best.to_excel(cv_score_writer, sheet_name=m, index=False)

### Correlation with clinical scores

#### Load data

In [None]:
train_scores = pd.read_excel("../data/clida/clida.xlsx", sheet_name=0)
test_scores = pd.read_excel("../data/clida/clida.xlsx", sheet_name=1)

#### Load atlas

In [None]:
atlas_path = "../data/msk/rAAL.nii"
label_txt_path = "../data/msk/aal_labels.txt"

atlas_img = nib.load(atlas_path)
atlas_data = atlas_img.get_fdata().astype(int)
roi_ids = np.unique(atlas_data)[1:]

label_map = pd.read_csv(label_txt_path, sep=r"\s+", header=None, names=["Region", "Index"])
label_map["Index"] = label_map["Index"].astype(int)
roi_dict = dict(zip(label_map["Index"], label_map["Region"], strict=False))

#### Compute correlations with regional importance

In [None]:
feature_importances_path = output_dir + "classification_features_importances_test_rev.xlsx"
measures = ["ECM_deg", "ECM_norm", "ECM_rank", "GCOR"]

results_corr = {}

for measure in measures:
    print(f"\nProcessing measure: {measure}")
    df_feat = pd.read_excel(feature_importances_path, sheet_name=measure)

    on_features_train = np.array(pd.read_csv(f"{data_dir}/data_train/{measure}_ON.csv"))
    off_features_train = np.array(pd.read_csv(f"{data_dir}/data_train/{measure}_OFF.csv"))

    for model_name in df_feat.columns:
        importance_scores = df_feat[model_name].to_numpy()

        on_weighted = on_features_train * importance_scores
        off_weighted = off_features_train * importance_scores

        roi_df_on = []
        roi_df_off = []

        for subj_idx in range(on_weighted.shape[0]):
            subj_img_on = nilearn.masking.unmask(on_weighted[subj_idx], mask_img)
            subj_img_off = nilearn.masking.unmask(off_weighted[subj_idx], mask_img)

            subj_data_on = subj_img_on.get_fdata()
            subj_data_off = subj_img_off.get_fdata()

            row_on = {}
            row_off = {}

            for roi in roi_ids:
                roi_vox_on = subj_data_on[atlas_data == roi]
                roi_vox_off = subj_data_off[atlas_data == roi]

                row_on[roi_dict[roi]] = roi_vox_on.mean() if roi_vox_on.size > 0 else np.nan
                row_off[roi_dict[roi]] = roi_vox_off.mean() if roi_vox_off.size > 0 else np.nan

            roi_df_on.append(row_on)
            roi_df_off.append(row_off)

        roi_df_on = pd.DataFrame(roi_df_on)
        roi_df_off = pd.DataFrame(roi_df_off)

        roi_df_on["score_on"] = train_scores["MDSUPDRSIIIDBSON"][: len(roi_df_on)].to_numpy()
        roi_df_off["score_off"] = train_scores["MDSUPDRSIIIDBSOFF"][: len(roi_df_off)].to_numpy()

        roi_df_on = roi_df_on.dropna(subset=["score_on"])
        roi_df_off = roi_df_off.dropna(subset=["score_off"])

        score_on = roi_df_on["score_on"].to_numpy()
        score_off = roi_df_off["score_off"].to_numpy()
        score_diff = score_off - score_on

        corr_on_on = roi_df_on.drop(columns=["score_on"]).corrwith(roi_df_on["score_on"])
        corr_off_off = roi_df_off.drop(columns=["score_off"]).corrwith(roi_df_off["score_off"])
        corr_on_diff = roi_df_on.drop(columns=["score_on"]).corrwith(pd.Series(score_diff, index=roi_df_on.index))
        corr_off_diff = roi_df_off.drop(columns=["score_off"]).corrwith(pd.Series(score_diff, index=roi_df_off.index))

        corr_df = pd.DataFrame({
            "DBS-ON to MDSUPDRSIII-ON": corr_on_on,
            "DBS-OFF to MDSUPDRSIII-OFF": corr_off_off,
            "DBS-ON to MDSUPDRSIII-DIFF": corr_on_diff,
            "DBS-OFF to MDSUPDRSIII-DIFF": corr_off_diff,
        })

        results_corr[measure, model_name] = corr_df


with pd.ExcelWriter("../results/scores/correlations_train.xlsx") as writer:
    for (measure, algo), df in results_corr.items():
        sheet_name = f"{measure}_{algo}"
        df.to_excel(writer, sheet_name=sheet_name, index=True)

#### Visualize with heatmaps

In [None]:
models = ["Logistic Regression", "LDA"]

fig, axes = plt.subplots(nrows=2, ncols=4, figsize=(18, 12), sharex=True, sharey=False)
all_values = []

for measure in measures:
    for model_name in models:
        corr_df = results_corr[measure, model_name]
        corr_df["abs_max"] = corr_df.abs().max(axis=1)
        top50 = corr_df.sort_values("abs_max", ascending=False).head(20)
        corr_plot_top = top50.drop(columns=["abs_max"])
        all_values.append(corr_plot_top.values)

all_values = np.concatenate(all_values).flatten()
vmin, vmax = all_values.min(), all_values.max()

for col, measure in enumerate(measures):
    for row, model_name in enumerate(models):
        ax = axes[row, col]

        corr_df = results_corr[measure, model_name]
        corr_df["abs_max"] = corr_df.abs().max(axis=1)
        top50 = corr_df.sort_values("abs_max", ascending=False).head(20)
        corr_plot_top = top50.drop(columns=["abs_max"])

        sns.heatmap(
            corr_plot_top,
            ax=ax,
            cmap="PuOr_r",
            vmin=vmin,
            vmax=vmax,
            cbar=False,
        )

        if row == 0:
            ax.set_title(measure, fontsize=16)

        if col == 0:
            ax.set_ylabel(model_name, fontsize=16)

        ax.set_xlabel("")
        ax.set_yticklabels([lbl.get_text().replace("_", " ") for lbl in ax.get_yticklabels()])

cbar_ax = fig.add_axes([0.92, 0.15, 0.015, 0.7])
norm = plt.cm.colors.Normalize(vmin=vmin, vmax=vmax)
sm = plt.cm.ScalarMappable(cmap="PuOr_r", norm=norm)
sm.set_array([])
fig.colorbar(sm, cax=cbar_ax)

plt.tight_layout(rect=[0, 0, 0.9, 1])
plt.savefig("correlations_train.svg")
plt.show()