# Re-analysis for "A novel cortical biomarker signature predicts individual pain sensitivity"

libaries

In [42]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.experimental import enable_iterative_imputer
from sklearn.impute import IterativeImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.svm import SVC
from sklearn.neural_network import MLPClassifier
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.utils import shuffle
from sklearn.metrics import accuracy_score, roc_auc_score
import os
import seaborn as sns
import warnings
warnings.filterwarnings("ignore")

set your basepath

In [43]:
# Define base path
basepath = "/home/ole/projects/PAF_reanalysis"

## Main analysis

This does the following: \
\
(1) loads in complete dataset with PAF, CME and class for all subjects. \
(2) defines models and its parameter space \
(3) splits data in independent training and test set \
(4) uses gridsearch crossvalidation in training data to fit model \
(5) uses trained models to predict data of test set \
(6) reports accuracy and AUC for both, training and test set  \

This is repeatedly done to avoid an over/underestimation of the final metrics due to the relatively low sample size for a machine learning pipeline.

Number of repetitions is set by parameter repetitions.

set number of repetitions 

In [1]:
repetitions = 10

Pipeline

In [44]:
# Load data
paf_file = os.path.join(basepath, "data/PAF_all.xlsx")
cme_file = os.path.join(basepath, "data/map_volume_all.xlsx")
class_file = os.path.join(basepath, "data/class_IDs_all.xlsx")

df_paf = pd.read_excel(paf_file)
df_cme = pd.read_excel(cme_file)
df_class = pd.read_excel(class_file)

# Calculate CME values
df_cme["CME"] = (df_cme.Volume_Day5 - df_cme.Volume_Day0).apply(lambda x: 1 if x > 0 else 0)

# Merge data on ID
data = df_class.merge(df_paf, on="ID", how="inner").merge(df_cme[["ID", "CME"]], on="ID", how="inner")

# Define predictors and target
X = data[["sensorimotor_paf", "CME"]]
y = data["class"]

# Models and hyperparameters
models_and_params = {
    "LogisticRegression": (
        LogisticRegression(),
        {
            'model__C': np.logspace(-3, 3, 30),
            'model__solver': ['newton-cg', 'lbfgs'],
            'model__max_iter': [200, 400, 2000, 5000]
        }
    ),
    "RandomForest": (
        RandomForestClassifier(),
        {
            'model__n_estimators': [300, 500, 1000],
            'model__max_depth': [None, 5, 10],
            'model__min_samples_split': [2, 5, 10],
            'model__bootstrap': [True, False]
        }
    ),
    "GradientBoosting": (
        GradientBoostingClassifier(),
        {
            'model__learning_rate': [0.01, 0.1, 0.5],
            'model__max_depth': [None, 2, 5],
            'model__min_samples_split': [2, 5, 10],
            'model__n_estimators': [100, 200]
        }
    ),
    "SVC": (
        SVC(probability=True),
        {
            'model__C': [0.01, 0.1, 1, 10],
            "model__kernel": ["linear", "rbf", "poly"],
            'model__gamma': ['scale', 'auto']
            
        }
    ),
    "MLPClassifier": (
        MLPClassifier(),
        {
            'model__alpha': [1e-4, 1e-3, 1e-2, 1e-1],
            'model__hidden_layer_sizes': [(100,), (100, 100), (100, 100, 100)],
            'model__max_iter': [5000]
        }
    )
}

# Function to perform analysis
def run_analysis(random_seeds, n_runs):
    results = []

    for seed in random_seeds:
        print(f"Running analysis with seed {seed}...")

        # Shuffle data
        X_shuffled, y_shuffled = shuffle(X, y, random_state=seed)

        # Split data into training and test sets
        X_train, X_test, y_train, y_test = train_test_split(X_shuffled, y_shuffled, test_size=0.35, random_state=seed)

        for model_name, (model, param_grid) in models_and_params.items():

            # Create pipeline
            pipeline = Pipeline([
                ("imputer", IterativeImputer(max_iter=100, random_state=seed)),
                ("scaler", StandardScaler()),
                ("model", model)
            ])

            # Perform grid search
            search = GridSearchCV(
                pipeline, param_grid=param_grid,
                cv=5, scoring="accuracy", verbose=1, n_jobs=-1
            )
            search.fit(X_train, y_train)

            # Get the best model
            best_model = search.best_estimator_

            # Evaluate the model on training and test sets
            for dataset, X_eval, y_eval, label in zip(
                ["train", "test"],
                [X_train, X_test],
                [y_train, y_test],
                ["Training", "Test"]
            ):
                y_pred = best_model.predict(X_eval)
                y_pred_proba = best_model.predict_proba(X_eval)[:, 1]
                accuracy = accuracy_score(y_eval, y_pred)
                auc = roc_auc_score(y_eval, y_pred_proba)
                results.append({
                    "Seed": seed,
                    "Model": model_name,
                    "Dataset": label,
                    "Accuracy": accuracy,
                    "AUC": auc
                })

    return pd.DataFrame(results)

# Run analysis
random_seeds = np.random.randint(0, 10000, size=repetitions)
n_runs = repetitions
results_df = run_analysis(random_seeds, n_runs=n_runs)

# Aggregate results
summary = results_df.groupby(["Model", "Dataset"]).mean()[["Accuracy", "AUC"]].reset_index()

# Save results
results_df.to_csv(os.path.join(basepath, "results/results_all_runs.csv"), index=False)
summary.to_csv(os.path.join(basepath, "results/summary_results.csv"), index=False)

print("Analysis complete. Results saved.")


Running analysis with seed 8928...
Fitting 5 folds for each of 240 candidates, totalling 1200 fits
Fitting 5 folds for each of 54 candidates, totalling 270 fits
Fitting 5 folds for each of 54 candidates, totalling 270 fits
Fitting 5 folds for each of 8 candidates, totalling 40 fits
Fitting 5 folds for each of 12 candidates, totalling 60 fits
Running analysis with seed 5111...
Fitting 5 folds for each of 240 candidates, totalling 1200 fits
Fitting 5 folds for each of 54 candidates, totalling 270 fits
Fitting 5 folds for each of 54 candidates, totalling 270 fits
Fitting 5 folds for each of 8 candidates, totalling 40 fits
Fitting 5 folds for each of 12 candidates, totalling 60 fits
Running analysis with seed 1991...
Fitting 5 folds for each of 240 candidates, totalling 1200 fits
Fitting 5 folds for each of 54 candidates, totalling 270 fits
Fitting 5 folds for each of 54 candidates, totalling 270 fits
Fitting 5 folds for each of 8 candidates, totalling 40 fits
Fitting 5 folds for each of 1

## Create plots for accuracy and AUC for all models 
\
Plot will only display the average for both metrics across all repetitions of the pipeline

In [41]:
# Define the order of the Dataset categories
hue_order = ["Training", "Test"]

# Melt the DataFrame for easier plotting
plot_df = summary.melt(id_vars=["Model", "Dataset"], value_vars=["Accuracy", "AUC"], 
                       var_name="Metric", value_name="Score")

# Create the bar plot for Accuracy and AUC
for metric in ["Accuracy", "AUC"]:
    plt.figure(figsize=(12, 10))
    sns.barplot(
        data=plot_df[plot_df["Metric"] == metric],
        x="Score",
        y="Model",
        hue="Dataset",
        hue_order=hue_order,  # Ensure Training is always left and Test is right
        palette={"Training": "#9fc8c8", "Test": "#298c8c"}  # Adjust colors
    )

    # Add values on bars
    for container in plt.gca().containers:
        labels = [f"{v.get_width():.2f}" for v in container]
        plt.gca().bar_label(container, labels=labels, label_type='edge', fontsize=14)

    # Add title and labels
    plt.title(f"{metric} by Model", fontsize=20, fontweight="bold")
    plt.xlabel(metric, fontsize=16)
    plt.ylabel("Model", fontsize=16)
    plt.xticks(fontsize=14)
    plt.yticks(fontsize=14)
    plt.legend(title="Dataset", fontsize=14, title_fontsize=16, loc="lower right")

    # Remove grid lines
    plt.gca().grid(False)
    plt.gca().spines["left"].set_linewidth(0.5)
    plt.gca().spines["bottom"].set_linewidth(0.5)

    # Save the figure as an SVG
    filename = os.path.join(basepath, f"figures/{metric}_by_Model.svg")
    plt.tight_layout()
    plt.savefig(filename, format="svg")
    plt.close()  # Close the plot to avoid overlap

    print(f"Saved {metric} plot as SVG: {filename}")


Saved Accuracy plot as SVG: /home/ole/projects/PAF_reanalysis/figures/Accuracy_by_Model.svg
Saved AUC plot as SVG: /home/ole/projects/PAF_reanalysis/figures/AUC_by_Model.svg


In [3]:
import pandas as pd
import numpy as np
from sklearn.mixture import GaussianMixture
from scipy.optimize import minimize
import os
from sklearn.preprocessing import LabelEncoder


In [17]:
import subprocess
import os

# Define the basepath and the script path
basepath = "/home/ole/projects/PAF_reanalysis"
script_path = os.path.join(basepath, "run_LGM.R")

# Run the R script using subprocess
result = subprocess.run(["Rscript", script_path], capture_output=True, text=True)

# Print the output and errors
print("Output:")
print(result.stdout)
print("Errors:")
print(result.stderr)

Output:

Errors:



In [16]:
result

CompletedProcess(args=['Rscript', '/home/ole/projects/PAF_reanalysis/run_LGM.R'], returncode=0, stdout='', stderr='')

In [46]:
# Necessary imports
import os
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.pipeline import Pipeline
from sklearn.impute import IterativeImputer
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score, roc_auc_score
import subprocess

# Define base path
basepath = "/home/ole/projects/PAF_reanalysis"

# how many times we do the ML pipeline (increase robustness)
repetitions = 100

# File paths for input data
paf_file = os.path.join(basepath, "data/PAF_all.xlsx")
cme_file = os.path.join(basepath, "data/map_volume_all.xlsx")
class_file = os.path.join(basepath, "data/class_IDs_all.xlsx")
yawn_pain_file = os.path.join(basepath, "data/yawn_pain_all.csv")
chew_pain_file = os.path.join(basepath, "data/chew_pain_all.csv")

# Load data from Excel and CSV files
df_paf = pd.read_excel(paf_file)
df_cme = pd.read_excel(cme_file)

# Calculate CME values: 1 for increase in volume, 0 otherwise
df_cme["CME"] = (df_cme["Volume_Day5"] - df_cme["Volume_Day0"]).apply(lambda x: 1 if x > 0 else 0)

# Load yawn and chew pain data, combining them
df_yawn_pain = pd.read_csv(yawn_pain_file)
df_chew_pain = pd.read_csv(chew_pain_file)
df_all_pain = df_chew_pain.add(df_yawn_pain, fill_value=0)
df_all_pain["ID"] = df_yawn_pain["ID"]

# Models and hyperparameters
models_and_params = {
    "LogisticRegression": (
        LogisticRegression(),
        {
            'model__penalty': ['l2', 'elasticnet'],
            'model__C': np.logspace(-3, 3, 30),
            'model__solver': ['saga'],  # Supports elasticnet
            'model__l1_ratio': np.linspace(0.1, 1.0, 10),
            'model__max_iter': [200, 400, 2000]
        }
    ),
    "RandomForest": (
        RandomForestClassifier(),
        {
            'model__n_estimators': [500, 1000, 1500],
            'model__max_depth': [None, 5, 10],
            'model__min_samples_split': [2, 5, 10],
            'model__max_features': ['sqrt', 'log2', None],
            'model__bootstrap': [True, False]
        }
    ),
    "GradientBoosting": (
        GradientBoostingClassifier(),
        {
            'model__learning_rate': [0.01, 0.1, 0.5],
            'model__subsample': [0.8, 1.0],
            'model__max_depth': [None, 2, 5],
            'model__min_samples_split': [2, 5, 10],
            'model__max_features': ['sqrt', 'log2', None],
            'model__n_estimators': [100, 200]
        }
    ),
    "SVC": (
        SVC(probability=True),
        {
            'model__C': np.logspace(-4, 3, 20),
            'model__kernel': ['linear', 'rbf', 'poly', 'sigmoid'],
            'model__gamma': np.logspace(-4, 1, 10),
            'model__degree': [2, 3, 4]
        }
    ),
    "MLPClassifier": (
        MLPClassifier(),
        {
            'model__alpha': [1e-4, 1e-3, 1e-2, 1e-1],
            'model__hidden_layer_sizes': [(50,), (100,), (50, 50), (100, 100, 100)],
            'model__max_iter': [5000],
            'model__learning_rate_init': [0.0001, 0.001, 0.01],
            'model__activation': ['relu', 'tanh', 'logistic']
        }
    )
}

# Initialize results list and random seeds
results = []
random_seeds = np.random.randint(0, 10000, size=repetitions)

# Loop through random seeds for cross-validation
for seed in random_seeds:
    # Split the combined pain data into training and testing sets
    X_train, X_test = train_test_split(
        df_all_pain, test_size=1/3, random_state=seed
    )

    # Save split data to CSV files for further processing
    X_train.to_csv(os.path.join(basepath, "data/X_train.csv"), index=False)
    X_test.to_csv(os.path.join(basepath, "data/X_test.csv"), index=False)

    # Execute R script for LGM model
    script_path = os.path.join(basepath, "run_LGM.R")
    subprocess.run(["Rscript", script_path], capture_output=True, text=True)

    # Load classification results from R script
    df_ID_LGM_train = pd.read_csv(os.path.join(basepath, "data/Y_train.csv"))
    df_ID_LGM_test = pd.read_csv(os.path.join(basepath, "data/Y_test.csv"))

    # Merge PAF and CME data with classification results for training and testing
    data_train = df_ID_LGM_train.merge(df_paf, on="ID", how="inner").merge(
        df_cme[["ID", "CME"]], on="ID", how="inner"
    )
    data_test = df_ID_LGM_test.merge(df_paf, on="ID", how="inner").merge(
        df_cme[["ID", "CME"]], on="ID", how="inner"
    )

    # Extract features (X) and target (Y) for model training and testing
    X_train, X_test = data_train[["sensorimotor_paf", "CME"]], data_test[["sensorimotor_paf", "CME"]]
    Y_train, Y_test = data_train["class"], data_test["class"]

    # Iterate over different models and their parameter grids
    for model_name, (model, param_grid) in models_and_params.items():
        # Create a machine learning pipeline
        pipeline = Pipeline([
            ("imputer", IterativeImputer(max_iter=100, random_state=seed)),
            ("scaler", StandardScaler()),
            ("model", model)
        ])

        # Perform grid search cross-validation
        search = GridSearchCV(
            pipeline, param_grid=param_grid, cv=5,
            scoring="accuracy", verbose=1, n_jobs=-1
        )
        search.fit(X_train, Y_train)

        # Evaluate the best model on training and testing datasets
        best_model = search.best_estimator_
        for dataset, X_eval, y_eval, label in zip(
            ["train", "test"], [X_train, X_test], [Y_train, Y_test], ["Training", "Test"]
        ):
            y_pred = best_model.predict(X_eval)
            y_pred_proba = best_model.predict_proba(X_eval)[:, 1]
            accuracy = accuracy_score(y_eval, y_pred)
            auc = roc_auc_score(y_eval, y_pred_proba)
            results.append({
                "Seed": seed,
                "Model": model_name,
                "Dataset": label,
                "Accuracy": accuracy,
                "AUC": auc
            })

# Convert results to DataFrame for aggregation and saving
results_df = pd.DataFrame(results)

# Aggregate results by model and dataset
summary = results_df.groupby(["Model", "Dataset"]).mean()[["Accuracy", "AUC"]].reset_index()

# Save detailed and summary results to CSV files
results_path = os.path.join(basepath, "results")
os.makedirs(results_path, exist_ok=True)
results_df.to_csv(os.path.join(results_path, "results_all_runs_new.csv"), index=False)
summary.to_csv(os.path.join(results_path, "summary_results_new.csv"), index=False)

print("Analysis complete. Results saved.")


Fitting 5 folds for each of 240 candidates, totalling 1200 fits
Fitting 5 folds for each of 54 candidates, totalling 270 fits
Fitting 5 folds for each of 54 candidates, totalling 270 fits
Fitting 5 folds for each of 24 candidates, totalling 120 fits
Fitting 5 folds for each of 12 candidates, totalling 60 fits
Fitting 5 folds for each of 240 candidates, totalling 1200 fits
Fitting 5 folds for each of 54 candidates, totalling 270 fits
Fitting 5 folds for each of 54 candidates, totalling 270 fits
Fitting 5 folds for each of 24 candidates, totalling 120 fits
Fitting 5 folds for each of 12 candidates, totalling 60 fits
Fitting 5 folds for each of 240 candidates, totalling 1200 fits
Fitting 5 folds for each of 54 candidates, totalling 270 fits
Fitting 5 folds for each of 54 candidates, totalling 270 fits
Fitting 5 folds for each of 24 candidates, totalling 120 fits
Fitting 5 folds for each of 12 candidates, totalling 60 fits
Fitting 5 folds for each of 240 candidates, totalling 1200 fits
Fit

Unnamed: 0,Seed,Model,Dataset,Accuracy,AUC
6,42,SVC,Training,0.7625,0.215
7,42,SVC,Test,0.585366,0.390805
16,42,SVC,Training,0.7125,0.24375
17,42,SVC,Test,0.611111,0.295666
26,42,SVC,Training,0.7,0.2475
27,42,SVC,Test,0.652174,0.328846
36,42,SVC,Training,0.6375,0.743125
37,42,SVC,Test,0.666667,0.688889


In [45]:
# Models and hyperparameters
models_and_params = {
    "LogisticRegression": (
        LogisticRegression(),
        {
            'model__C': np.logspace(-3, 3, 30),
            'model__solver': ['newton-cg', 'lbfgs'],
            'model__max_iter': [200, 400, 2000, 5000]
        }
    ),
    "RandomForest": (
        RandomForestClassifier(),
        {
            'model__n_estimators': [300, 500, 1000],
            'model__max_depth': [None, 5, 10],
            'model__min_samples_split': [2, 5, 10],
            'model__bootstrap': [True, False]
        }
    ),
    "GradientBoosting": (
        GradientBoostingClassifier(),
        {
            'model__learning_rate': [0.01, 0.1, 0.5],
            'model__max_depth': [None, 2, 5],
            'model__min_samples_split': [2, 5, 10],
            'model__n_estimators': [100, 200]
        }
    ),
    "SVC": (
        SVC(probability=True),
        {
            'model__C': [0.01, 0.1, 1, 10],
            'model__gamma': ['scale', 'auto']
        }
    ),
    "MLPClassifier": (
        MLPClassifier(),
        {
            'model__alpha': [1e-4, 1e-3, 1e-2, 1e-1],
            'model__hidden_layer_sizes': [(100,), (100, 100), (100, 100, 100)],
            'model__max_iter': [5000]
        }
    )
}

In [193]:
def run_analysis(random_seeds, n_runs):
    results = []

    for seed in random_seeds:
        print(f"Running analysis with seed {seed}...")

        # Shuffle data
        X_shuffled, y_shuffled = shuffle(X, y, random_state=seed)

        # Split data into training and test sets
        X_train, X_test, y_train, y_test = train_test_split(X_shuffled, y_shuffled, test_size=0.35, random_state=seed)

        for model_name, (model, param_grid) in models_and_params.items():

            # Create pipeline
            pipeline = Pipeline([
                ("imputer", IterativeImputer(max_iter=100, random_state=seed)),
                ("scaler", StandardScaler()),
                ("model", model)
            ])

            # Perform grid search
            search = GridSearchCV(
                pipeline, param_grid=param_grid,
                cv=5, scoring="accuracy", verbose=1, n_jobs=-1
            )
            search.fit(X_train, y_train)

            # Get the best model
            best_model = search.best_estimator_

            # Evaluate the model on training and test sets
            for dataset, X_eval, y_eval, label in zip(
                ["train", "test"],
                [X_train, X_test],
                [y_train, y_test],
                ["Training", "Test"]
            ):
                y_pred = best_model.predict(X_eval)
                y_pred_proba = best_model.predict_proba(X_eval)[:, 1]
                accuracy = accuracy_score(y_eval, y_pred)
                auc = roc_auc_score(y_eval, y_pred_proba)
                results.append({
                    "Seed": seed,
                    "Model": model_name,
                    "Dataset": label,
                    "Accuracy": accuracy,
                    "AUC": auc
                })

    return pd.DataFrame(results)

In [194]:
# Load data
paf_file = os.path.join(basepath, "data/PAF_all.xlsx")
cme_file = os.path.join(basepath, "data/map_volume_all.xlsx")
class_file = os.path.join(basepath, "data/class_IDs_all.xlsx")

df_paf = pd.read_excel(paf_file)
df_cme = pd.read_excel(cme_file)

# Calculate CME values
df_cme["CME"] = (df_cme.Volume_Day5 -
                 df_cme.Volume_Day0).apply(lambda x: 1 if x > 0 else 0)


# load in yawn and chew pain data for all participants
yawn_pain_file = os.path.join(basepath, "data/yawn_pain_all.csv")
df_yawn_pain = pd.read_csv(yawn_pain_file)
chew_pain_file = os.path.join(basepath, "data/chew_pain_all.csv")
df_chew_pain = pd.read_csv(chew_pain_file)
df_all_pain = df_chew_pain.add(df_yawn_pain, fill_value=0)
df_all_pain["ID"] = df_yawn_pain["ID"]

# perform data split and LGM model
df_ID_LGM_train, df_ID_LGM_test = perform_LGM_classification(
    df_all_pain, random_state=2)

data_train = df_ID_LGM_train.merge(df_paf, on="ID", how="inner").merge(
    df_cme[["ID", "CME"]], on="ID", how="inner")
data_test = df_ID_LGM_test.merge(df_paf, on="ID", how="inner").merge(
    df_cme[["ID", "CME"]], on="ID", how="inner")

X_train, X_test = data_train[["sensorimotor_paf",
                              "CME"]], data_test[["sensorimotor_paf", "CME"]]
Y_train, Y_test = data_train["class"], data_test["class"]

results = []
# do the models
for model_name, (model, param_grid) in models_and_params.items():

    # Create pipeline
    pipeline = Pipeline([
        ("imputer", IterativeImputer(max_iter=100, random_state=42)),
        ("scaler", StandardScaler()),
        ("model", model)
    ])

    # Perform grid search
    search = GridSearchCV(
        pipeline, param_grid=param_grid,
        cv=5, scoring="accuracy", verbose=1, n_jobs=-1
    )
    search.fit(X_train, Y_train)

    # Get the best model
    best_model = search.best_estimator_

    # Evaluate the model on training and test sets
    for dataset, X_eval, y_eval, label in zip(
        ["train", "test"],
        [X_train, X_test],
        [Y_train, Y_test],
        ["Training", "Test"]
    ):
        y_pred = best_model.predict(X_eval)
        y_pred_proba = best_model.predict_proba(X_eval)[:, 1]
        accuracy = accuracy_score(y_eval, y_pred)
        auc = roc_auc_score(y_eval, y_pred_proba)
        results.append({
            "Seed": 42,
            "Model": model_name,
            "Dataset": label,
            "Accuracy": accuracy,
            "AUC": auc
        })


print(results)


Fitting 5 folds for each of 240 candidates, totalling 1200 fits
Fitting 5 folds for each of 54 candidates, totalling 270 fits
Fitting 5 folds for each of 54 candidates, totalling 270 fits
Fitting 5 folds for each of 8 candidates, totalling 40 fits
Fitting 5 folds for each of 12 candidates, totalling 60 fits
[{'Seed': 42, 'Model': 'LogisticRegression', 'Dataset': 'Training', 'Accuracy': 0.575, 'AUC': 0.5799999999999998}, {'Seed': 42, 'Model': 'LogisticRegression', 'Dataset': 'Test', 'Accuracy': 0.5, 'AUC': 0.44499999999999995}, {'Seed': 42, 'Model': 'RandomForest', 'Dataset': 'Training', 'Accuracy': 0.7, 'AUC': 0.7734375}, {'Seed': 42, 'Model': 'RandomForest', 'Dataset': 'Test', 'Accuracy': 0.55, 'AUC': 0.5549999999999999}, {'Seed': 42, 'Model': 'GradientBoosting', 'Dataset': 'Training', 'Accuracy': 0.65, 'AUC': 0.7331249999999999}, {'Seed': 42, 'Model': 'GradientBoosting', 'Dataset': 'Test', 'Accuracy': 0.55, 'AUC': 0.54125}, {'Seed': 42, 'Model': 'SVC', 'Dataset': 'Training', 'Accurac

In [190]:
results

[{'Seed': 42,
  'Model': 'LogisticRegression',
  'Dataset': 'Training',
  'Accuracy': 0.575,
  'AUC': 0.5799999999999998},
 {'Seed': 42,
  'Model': 'LogisticRegression',
  'Dataset': 'Test',
  'Accuracy': 0.5,
  'AUC': 0.44499999999999995},
 {'Seed': 42,
  'Model': 'RandomForest',
  'Dataset': 'Training',
  'Accuracy': 0.7375,
  'AUC': 0.8218750000000001},
 {'Seed': 42,
  'Model': 'RandomForest',
  'Dataset': 'Test',
  'Accuracy': 0.55,
  'AUC': 0.505},
 {'Seed': 42,
  'Model': 'GradientBoosting',
  'Dataset': 'Training',
  'Accuracy': 0.65,
  'AUC': 0.7331249999999999},
 {'Seed': 42,
  'Model': 'GradientBoosting',
  'Dataset': 'Test',
  'Accuracy': 0.55,
  'AUC': 0.54125},
 {'Seed': 42,
  'Model': 'SVC',
  'Dataset': 'Training',
  'Accuracy': 0.575,
  'AUC': 0.36812500000000004},
 {'Seed': 42,
  'Model': 'SVC',
  'Dataset': 'Test',
  'Accuracy': 0.525,
  'AUC': 0.46499999999999997},
 {'Seed': 42,
  'Model': 'MLPClassifier',
  'Dataset': 'Training',
  'Accuracy': 0.6375,
  'AUC': 0.663

In [159]:
### load in yawn and chew pain data for all participants
yawn_pain_file = os.path.join(basepath, "data/yawn_pain_all.csv")
df_yawn_pain = pd.read_csv(yawn_pain_file)
chew_pain_file = os.path.join(basepath, "data/chew_pain_all.csv")
df_chew_pain = pd.read_csv(chew_pain_file)
df_all_pain = df_chew_pain.add(df_yawn_pain, fill_value=0)
df_all_pain["ID"] = df_yawn_pain["ID"]

### Do LGM for classification (in train data)
# Split the data into train and test TODO: do it many times!
X_train, X_test = train_test_split(df_all_pain, test_size=1/3, random_state=1)

# transform to long format
X_train_long = X_train.melt(id_vars=["ID"], var_name="t", value_name="y")

# Encode the 't' column
label_encoder = LabelEncoder()
X_train_long["t"] = label_encoder.fit_transform(X_train_long["t"])

# Initial model with one group
lcga1 = GaussianMixture(n_components=1, covariance_type="full", max_iter=100, random_state=42)
lcga2 = GaussianMixture(n_components=2, covariance_type="full", max_iter=100, random_state=42)

# Fit the models
lcga1.fit(X_train_long[["y", "t"]])
lcga2.fit(X_train_long[["y", "t"]])

# Get class probabilities and IDs
probs = lcga2.predict_proba(X_train_long[["y", "t"]])
X_train_long["class"] = np.argmax(probs, axis=1)

# Ensure sufficient IDs for slicing
class_1_ID = X_train_long[X_train_long["class"] == 0]["ID"].unique()[:40]
class_2_ID = X_train_long[X_train_long["class"] == 1]["ID"].unique()[60:100]

# Continue with classA/classB comparison and DataFrame creation
classA = X_train[X_train["ID"].isin(class_1_ID)]
classB = X_train[X_train["ID"].isin(class_2_ID)]

if classA.iloc[:, 1:11].mean().mean() < classB.iloc[:, 1:11].mean().mean():
    ID_low_LGM = class_1_ID
    ID_high_LGM = class_2_ID
else:
    ID_low_LGM = class_2_ID
    ID_high_LGM = class_1_ID

# Get class df
df_ID_LGM_train = pd.DataFrame({
    "ID": np.concatenate([ID_low_LGM, ID_high_LGM]),
    "class": [0] * len(ID_low_LGM) + [1] * len(ID_high_LGM)
})

# Now apply model to test data
# transform to long format
X_test_long = X_test.melt(id_vars=["ID"], var_name="t", value_name="y")

# Encode the 't' column
label_encoder = LabelEncoder()
X_test_long["t"] = label_encoder.fit_transform(X_test_long["t"])
# Predict test classes
test_probs = lcga2.predict_proba(X_test_long[["y", "t"]])
X_test_long["class"] = np.argmax(test_probs, axis=1)

# Ensure sufficient IDs for slicing
class_1_ID = X_test_long[X_test_long["class"] == 0]["ID"].unique()[:20]
class_2_ID = X_test_long[X_test_long["class"] == 1]["ID"].unique()[30:50]

# Continue with classA/classB comparison and DataFrame creation
classA = X_test[X_test["ID"].isin(class_1_ID)]
classB = X_test[X_test["ID"].isin(class_2_ID)]

if classA.iloc[:, 1:11].mean().mean() < classB.iloc[:, 1:11].mean().mean():
    ID_low_LGM = class_1_ID
    ID_high_LGM = class_2_ID
else:
    ID_low_LGM = class_2_ID
    ID_high_LGM = class_1_ID

df_ID_LGM_test = pd.DataFrame({
    "ID": np.concatenate([ID_low_LGM, ID_high_LGM]),
    "class": [0] * len(ID_low_LGM) + [1] * len(ID_high_LGM)
})

len(df_ID_LGM_train)

80

In [37]:
yawn_pain_file = os.path.join(basepath, "data/yawn_pain_all.csv")
df_yawn_pain = pd.read_csv(yawn_pain_file)

chew_pain_file = os.path.join(basepath, "data/chew_pain_all.csv")
df_chew_pain = pd.read_csv(chew_pain_file)


df_all_pain = df_chew_pain.add(df_yawn_pain, fill_value=0)

df_all_pain["ID"] = df_yawn_pain["ID"]

df_all_pain_long = df_all_pain.melt(id_vars=["ID"], 
                                                var_name="t", 
                                                value_name="y")

# Fit latent class growth models
# Initial model with one group
# Encode the 't' column
label_encoder = LabelEncoder()
df_all_pain_long["t"] = label_encoder.fit_transform(df_all_pain_long["t"])
lcga1 = GaussianMixture(n_components=1, covariance_type="full", max_iter=100)
lcga1.fit(df_all_pain_long[["y", "t"]])

# Optimize with two groups
lcga2 = GaussianMixture(n_components=2, covariance_type="full", max_iter=100)
lcga2.fit(df_all_pain_long[["y", "t"]])


# Get class probabilities and IDs
probs = lcga2.predict_proba(df_all_pain_long[["y", "t"]])
df_all_pain_long["class"] = np.argmax(probs, axis=1)

# Classify IDs
class_1_ID = df_all_pain_long[df_all_pain_long["class"] == 0]["ID"].unique()[:40]
class_2_ID = df_all_pain_long[df_all_pain_long["class"] == 1]["ID"].unique()[60:100]

classA = all_pain_df_train[all_pain_df_train["ID"].isin(class_1_ID)]
classB = all_pain_df_train[all_pain_df_train["ID"].isin(class_2_ID)]

if classA.iloc[:, 1:11].mean().mean() < classB.iloc[:, 1:11].mean().mean():
    ID_low_LGM = class_1_ID
    ID_high_LGM = class_2_ID
else:
    ID_low_LGM = class_2_ID
    ID_high_LGM = class_1_ID


df_ID_LGM_train = pd.DataFrame({"low": ID_low_LGM, "high": ID_high_LGM})



Unnamed: 0,low,high
0,14,94
1,27,95
2,31,98
3,65,99
4,78,100
5,83,101
6,86,102
7,87,103
8,96,104
9,97,105


In [None]:
# this version includes Latent growth model for classification

# Load data
paf_file = os.path.join(basepath, "data/PAF_all.xlsx")
cme_file = os.path.join(basepath, "data/map_volume_all.xlsx")
yawn_pain_file = os.path.join(basepath, "data/yawn_pain_all.csv")
chew_pain_file = os.path.join(basepath, "data/chew_pain_all.csv")


df_paf = pd.read_excel(paf_file)
df_cme = pd.read_excel(cme_file)
df_yawn_pain = pd.read_csv(yawn_pain_file)
df_chew_pain = pd.read_csv(chew_pain_file)

# Calculate pain ratings



# Calculate CME values
df_cme["CME"] = (df_cme.Volume_Day5 - df_cme.Volume_Day0).apply(lambda x: 1 if x > 0 else 0)

# Merge data on ID
data = df_class.merge(df_paf, on="ID", how="inner").merge(df_cme[["ID", "CME"]], on="ID", how="inner")

# Define predictors and target
X = data[["sensorimotor_paf", "CME"]]
y = data["class"]

# Models and hyperparameters
models_and_params = {
    "LogisticRegression": (
        LogisticRegression(),
        {
            'model__C': np.logspace(-3, 3, 30),
            'model__solver': ['newton-cg', 'lbfgs'],
            'model__max_iter': [200, 400, 2000, 5000]
        }
    ),
    "RandomForest": (
        RandomForestClassifier(),
        {
            'model__n_estimators': [300, 500, 1000],
            'model__max_depth': [None, 5, 10],
            'model__min_samples_split': [2, 5, 10],
            'model__bootstrap': [True, False]
        }
    ),
    "GradientBoosting": (
        GradientBoostingClassifier(),
        {
            'model__learning_rate': [0.01, 0.1, 0.5],
            'model__max_depth': [None, 2, 5],
            'model__min_samples_split': [2, 5, 10],
            'model__n_estimators': [100, 200]
        }
    ),
    "SVC": (
        SVC(probability=True),
        {
            'model__C': [0.01, 0.1, 1, 10],
            'model__gamma': ['scale', 'auto']
        }
    ),
    "MLPClassifier": (
        MLPClassifier(),
        {
            'model__alpha': [1e-4, 1e-3, 1e-2, 1e-1],
            'model__hidden_layer_sizes': [(100,), (100, 100), (100, 100, 100)],
            'model__max_iter': [5000]
        }
    )
}

# Function to perform analysis
def run_analysis(random_seeds, n_runs):
    results = []

    for seed in random_seeds:
        print(f"Running analysis with seed {seed}...")

        # Shuffle data
        X_shuffled, y_shuffled = shuffle(X, y, random_state=seed)

        # Split data into training and test sets
        X_train, X_test, y_train, y_test = train_test_split(X_shuffled, y_shuffled, test_size=0.35, random_state=seed)

        for model_name, (model, param_grid) in models_and_params.items():

            # Create pipeline
            pipeline = Pipeline([
                ("imputer", IterativeImputer(max_iter=100, random_state=seed)),
                ("scaler", StandardScaler()),
                ("model", model)
            ])

            # Perform grid search
            search = GridSearchCV(
                pipeline, param_grid=param_grid,
                cv=5, scoring="accuracy", verbose=1, n_jobs=-1
            )
            search.fit(X_train, y_train)

            # Get the best model
            best_model = search.best_estimator_

            # Evaluate the model on training and test sets
            for dataset, X_eval, y_eval, label in zip(
                ["train", "test"],
                [X_train, X_test],
                [y_train, y_test],
                ["Training", "Test"]
            ):
                y_pred = best_model.predict(X_eval)
                y_pred_proba = best_model.predict_proba(X_eval)[:, 1]
                accuracy = accuracy_score(y_eval, y_pred)
                auc = roc_auc_score(y_eval, y_pred_proba)
                results.append({
                    "Seed": seed,
                    "Model": model_name,
                    "Dataset": label,
                    "Accuracy": accuracy,
                    "AUC": auc
                })

    return pd.DataFrame(results)

# Run analysis
random_seeds = np.random.randint(0, 10000, size=repetitions)
n_runs = repetitions
results_df = run_analysis(random_seeds, n_runs=n_runs)

# Aggregate results
summary = results_df.groupby(["Model", "Dataset"]).mean()[["Accuracy", "AUC"]].reset_index()

# Save results
results_df.to_csv(os.path.join(basepath, "results/results_all_runs.csv"), index=False)
summary.to_csv(os.path.join(basepath, "results/summary_results.csv"), index=False)

print("Analysis complete. Results saved.")
