### Import packages

In [None]:
from statistics import stdev

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from lime import lime_tabular
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.calibration import calibration_curve
from sklearn.ensemble import GradientBoostingClassifier, RandomForestClassifier
from sklearn.feature_selection import RFECV
from sklearn.inspection import (
    PartialDependenceDisplay,
)
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import (
    auc,
    brier_score_loss,
    confusion_matrix,
    make_scorer,
    roc_auc_score,
    roc_curve,
)
from sklearn.model_selection import (
    GridSearchCV,
    RepeatedStratifiedKFold,
    train_test_split,
)
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import PowerTransformer, StandardScaler
from sklearn.svm import SVC
from sklearn.utils import resample


### Define evaluation functions

In [None]:
def calculate_metric_statistics(scores):
    """
    Calculate the mean and confidence interval of a list of scores.

    Args:
    scores (list of float): A list of scores.

    Returns:
    tuple: A tuple containing the mean score, lower bound of the confidence interval,
    and upper bound of the confidence interval.
    """
    # Constants for the percentiles
    LOWER_PERCENTILE = 2.5
    UPPER_PERCENTILE = 97.5

    # Sort the scores for percentile calculation
    sorted_scores = np.sort(scores)

    # Calculate mean of the scores
    mean_score = round(np.mean(sorted_scores), 2)

    # Calculate the lower and upper bounds of the confidence interval
    lower_ci = round(np.percentile(sorted_scores, LOWER_PERCENTILE), 2)
    upper_ci = round(np.percentile(sorted_scores, UPPER_PERCENTILE), 2)

    return mean_score, lower_ci, upper_ci

def DCA(y_true, y_pred_prob, model_dict, model_name, ax):
    """
    Perform Decision Curve Analysis (DCA) and plot the results.

    Args:
    y_true (array-like): True binary labels.
    y_pred_prob (array-like): Predicted probabilities.
    model_dict (dict): Dictionary mapping model names to display names.
    model_name (str): Name of the model to analyze.
    ax (matplotlib.axes.Axes): Axes object to plot on.

    Returns:
    matplotlib.axes.Axes: The axes object with the DCA plot.
    """
    # Define threshold group
    thresh_group = np.arange(0, 1, 0.05)

    # Initialize arrays to store standardized net benefits
    std_nb_model = np.array([])
    std_nb_all_model = np.array([])

    # Calculate the incidence of the condition in the dataset
    mcid_incidence = np.sum(y_true == 1)
    n = len(y_true)

    for thresh in thresh_group:
        # Predicted classes based on the threshold
        y_pred = y_pred_prob >= thresh

        # Calculate confusion matrix components
        tn, fp, fn, tp = confusion_matrix(y_true, y_pred).ravel()

        # Calculate net benefit
        nb = (tp / n) - (fp / n) * (thresh / (1 - thresh))
        std_nb = nb / (mcid_incidence / n) if tp != 0 and not np.isnan(nb) else 0
        std_nb_model = np.append(std_nb_model, std_nb)

        # Net benefit for 'treat all' strategy
        _, _, _, tp_all = confusion_matrix(y_true, [1] * len(y_true)).ravel()
        nb_all = (tp_all / n) - (1 - (tp_all / n)) * (thresh / (1 - thresh))
        std_nb_all = nb_all / (mcid_incidence / n) if tp_all != 0 and not np.isnan(nb_all) else 0
        std_nb_all_model = np.append(std_nb_all_model, std_nb_all)

    # Plotting the DCA curve
    ax.plot(thresh_group, std_nb_model, color="blue", label=f"{model_dict[model_name]}")
    ax.plot(thresh_group, std_nb_all_model, color="grey", label="Treat all")
    ax.plot((0, 1), (0, 0), color="grey", linestyle=":", label="Treat none")

    # Setting plot aesthetics
    ax.set_ylim(-0.15, 1.15)
    ax.set_xlim(0, 0.8)
    ax.set_xlabel("Threshold Probability", size=12)
    ax.set_ylabel("Standardized Net Benefit", size=12)
    ax.set_title("Decision Curve Analysis", pad=25, size=15, fontweight="bold")
    ax.grid(True)
    ax.spines["right"].set_color((0.8, 0.8, 0.8))
    ax.spines["top"].set_color((0.8, 0.8, 0.8))
    ax.legend(loc="upper right")

    return ax

def lowess(x, y, f):
    """
    Perform basic LOWESS (Locally Weighted Scatterplot Smoothing) with a linear model.

    Note:
    - This implementation is not robust, meaning no iteration and only normally distributed errors.
    - It uses only linear polynomials (degree = 1).

    Args:
        x (array-like): Independent variable.
        y (array-like): Dependent variable.
        f (float): Smoothing parameter, representing the fraction of data points used
                   for each local regression.

    Returns:
        tuple: Two arrays containing the smoothed values of y and their standard errors.
    """
    # Calculate the effective width after applying the reduction factor
    xwidth = f * (x.max() - x.min())

    # Number of observations
    N = len(x)

    # Ensure the data is sorted according to x
    order = np.argsort(x)

    # Initialize arrays for smoothed values and their standard errors
    y_sm = np.zeros_like(y)
    y_stderr = np.zeros_like(y)

    # Define the weighting function with clipping
    tricube = lambda d: np.clip((1 - np.abs(d) ** 3) ** 3, 0, 1)

    # Perform regression for each observation
    for i in range(N):
        # Compute weights for each observation
        dist = np.abs((x[order][i] - x[order])) / xwidth
        w = tricube(dist)

        # Form linear system with the weights
        A = np.stack([w, x[order] * w]).T
        b = w * y[order]
        ATA = A.T.dot(A)
        ATb = A.T.dot(b)

        # Solve the linear system
        sol = np.linalg.solve(ATA, ATb)

        # Predict the value for the current observation
        yest = A[i].dot(sol)
        place = order[i]
        y_sm[place] = yest

        # Compute the variance
        sigma2 = np.sum((A.dot(sol) - y[order]) ** 2) / N

        # Calculate the standard error for the current observation
        y_stderr[place] = np.sqrt(sigma2 * A[i].dot(np.linalg.inv(ATA)).dot(A[i]))

    return y_sm, y_stderr

def plot_cal_curve(ax, y_actual, y_pred_prob):
    """
    Plot a calibration curve with LOWESS smoothing on the given axis.

    Args:
        ax (matplotlib.axes.Axes): The matplotlib axis to plot on.
        y_actual (array-like): Actual binary labels.
        y_pred_prob (array-like): Predicted probabilities.

    Returns:
        matplotlib.axes.Axes: The axis with the calibration plot.
    """
    # Calculate the calibration curve
    y, x = calibration_curve(y_actual, y_pred_prob, n_bins=20)

    # Apply LOWESS smoothing
    y_sm, y_std = lowess(x, y, f=1 / 3)
    order = np.argsort(x)

    # Plot the smoothed calibration curve
    ax.plot(x[order], y_sm[order], color="black", label="Model", linewidth=0.5)
    ax.plot([0, 1], [0, 1], color="red", linestyle="-", linewidth=0.5)

    # Histograms for positive and negative predicted probabilities
    y_pred_prob_positive = y_pred_prob[y_actual == 1]
    y_pred_prob_negative = y_pred_prob[y_actual == 0]
    weights_positive = np.ones_like(y_pred_prob_positive) / len(y_pred_prob)
    ax.hist(
        y_pred_prob_positive,
        weights=weights_positive,
        bins=np.maximum(10, 100),
        alpha=0.4,
        color="green",
    )
    weights_negative = -np.ones_like(y_pred_prob_negative) / len(y_pred_prob)
    ax.hist(
        y_pred_prob_negative,
        weights=weights_negative,
        bins=np.maximum(10, 100),
        alpha=0.4,
        color="red",
    )

    # Fill between for confidence interval
    ax.fill_between(
        x[order],
        y_sm[order] - y_std[order],
        y_sm[order] + y_std[order],
        alpha=0.3,
        color="grey",
    )

    # Set plot lables and title
    ax.set_xlabel("Predicted probability", size=12)
    ax.set_ylabel("Observed proportion", size=12)
    ax.set_title(label="Calibration plot", pad=25, size=15, fontweight="bold")
    
    # Set plot limits and ticks
    ax.set_ylim(-0.1, 1.1)
    ax.set_yticks([0.0, 0.2, 0.4, 0.6, 0.8, 1])

    return ax


def test_model_performance_training(X_train, y_train, model_pipeline, model_name):
    """
    Evaluate model performance on training data using cross-validation.

    Args:
        X_train (DataFrame): Training feature dataset.
        y_train (Series): Training target variable.
        model_pipeline: Machine learning model pipeline.
        model_name (str): Name identifier for the model.

    Returns:
        DataFrame: A DataFrame containing the model name and its performance metrics.
    """
    
    model_dict = {
        "rf": "Random Forest",
        "sgb": "Stochastic Gradient Boosting",
        "nn": "Neural Network",
        "svm": "Support Vector Machine",
        "plr": "Penalized Logistic Regression",
    }

    kf = RepeatedStratifiedKFold(n_splits=10, n_repeats=5, random_state=42)
    
    # Initialize lists to store performance metrics
    auc_scores = []
    calibration_intercepts = []
    calibration_slopes = []
    brier_scores = []

    # Cross-validation loop
    for train_index, test_index in kf.split(X_train, y_train):
        # Split data
        X_train_fold, X_test_fold = X_train.iloc[train_index], X_train.iloc[test_index]
        y_train_fold, y_test_fold = y_train.iloc[train_index], y_train.iloc[test_index]

        # Fit model
        model_pipeline.fit(X_train_fold, y_train_fold.values.ravel())

        # Predicted probabilities
        y_pred_prob = model_pipeline.predict_proba(X_test_fold)[:, 1]

        # Calculcate and store performance metrics
        auc_scores.append(roc_auc_score(y_test_fold, y_pred_prob))
        fop, mpv = calibration_curve(
            y_test_fold, y_pred_prob, n_bins=20, strategy="uniform"
        )
        slope, intercept = np.polyfit(mpv, fop, 1)
        calibration_slopes.append(slope)
        calibration_intercepts.append(intercept)
        brier_scores.append(brier_score_loss(y_test_fold, y_pred_prob))

    # Calculate means
    auc_stat = calculate_metric_statistics(auc_scores)
    intercept_stat = calculate_metric_statistics(calibration_intercepts)
    slope_stat = calculate_metric_statistics(calibration_slopes)
    brier_stat = calculate_metric_statistics(brier_scores)

    # Compile results
    results_df = pd.DataFrame(
        {
            "model_name": [model_dict[model_name]],
            "auc_score": [f"{auc_stat[0]} ({auc_stat[1]} - {auc_stat[2]})"],
            "calibration_intercept": [
                f"{intercept_stat[0]} ({intercept_stat[1]} - {intercept_stat[2]})"
            ],
            "calibration_slope": [
                f"{slope_stat[0]} ({slope_stat[1]} - {slope_stat[2]})"
            ],
            "brier_score": [f"{brier_stat[0]} ({brier_stat[1]} - {brier_stat[2]})"],
        }
    )

    return results_df


def model_performance_testing(X_test,y_test,model,model_name,save=False,save_path=None,
                              model_to_save=None,figure=None):
    """
    Evaluate the performance of a model on test data, with options for bootstrapping, plotting, and saving results.

    Args:
        X_test (DataFrame): Test feature dataset.
        y_test (Series): Test target variable.
        model: Trained machine learning model.
        model_name (str): Name identifier for the model.
        save (bool): Whether to save plots.
        save_path (str): Directory path to save plots.
        model_to_save (list): List of models to save.
        figure (bool): Whether to create and display figures.

    Returns:
        DataFrame, array: A DataFrame containing performance metrics and an array of predicted probabilities.
    """
      
    model_dict = {
        "rf": "Random Forest",
        "sgb": "Stochastic Gradient Boosting",
        "nn": "Neural Network",
        "svm": "Support Vector Machine",
        "plr": "Penalized Logistic Regression",
    }

    y_pred_prob = model.predict_proba(X_test)[:, 1]

    # Parameters for bootstrap
    n_bootstraps = 2000
    n_bins = 20

    # Initialize lists for storing perfromance metrics
    auc_scores = []
    calibration_intercepts = []
    calibration_slopes = []
    brier_scores = []

    # Bootstrap loop
    for i in range(n_bootstraps):
        while True:
            X_boot, y_boot = resample(X_test, y_test, n_samples=len(y_test))
            # Check if the resampled set has more than one unique class
            if len(np.unique(y_boot)) > 1:
                break  # Exit the loop if the sample is valid

        y_pred_boot = model.predict_proba(X_boot)[:, 1]

        # Compute and score metrics
        auc_scores.append(roc_auc_score(y_boot, y_pred_boot))
        fop, mpv = calibration_curve(
            y_boot, y_pred_boot, n_bins=n_bins, strategy="uniform"
        )
        slope, intercept = np.polyfit(mpv, fop, 1)
        calibration_slopes.append(slope)
        calibration_intercepts.append(intercept)
        brier_scores.append(brier_score_loss(y_boot, y_pred_boot))

    # Calculate means of score metrics
    auc_stat = calculate_metric_statistics(auc_scores)
    intercept_stat = calculate_metric_statistics(calibration_intercepts)
    slope_stat = calculate_metric_statistics(calibration_slopes)
    brier_stat = calculate_metric_statistics(brier_scores)

    if figure == True:
        # Create a figure with a single row of three subplots
        fig, (ax1, ax2, ax3) = plt.subplots(
            1, 3, figsize=(30, 10)
        )  # Adjust the figsize as needed

        # Plot ROC curve
        fpr, tpr, thresholds = roc_curve(y_test, y_pred_prob)
        roc_auc = auc(fpr, tpr)

        ax1.plot(
            fpr, tpr, label=f"model_dict[model_name] (AUC = {roc_auc:.2f})", color="red"
        )
        ax1.grid(visible=True, which="major")
        ax1.set_xlim([0.0, 1.0])
        ax1.set_ylim([0.0, 1.05])
        ax1.set_xlabel("False Positive Rate")
        ax1.set_ylabel("True Positive Rate")
        ax1.set_title("Receiver Operating Characteristic (ROC)")
        ax1.legend(loc="lower right")

        # Plot calibration and dca curve
        ax2 = plot_cal_curve(ax2, y_test, y_pred_prob)
        ax3 = DCA(y_test, y_pred_prob, model_dict, model_name, ax3)

        # Plot figures
        plt.tight_layout()
        plt.show()

        if save and model_name in model_to_save:
            # Save the ROC plot
            fig_roc, ax_roc = plt.subplots(figsize=(10, 7))
            ax_roc.plot(
                fpr,
                tpr,
                label=f"{model_dict[model_name]} (AUC = {roc_auc:.2f})",
                color="red",
            )
            ax_roc.grid(visible=True, which="major")
            ax_roc.set_xlim([0.0, 1.0])
            ax_roc.set_ylim([0.0, 1.05])
            ax_roc.set_xlabel("False Positive Rate")
            ax_roc.set_ylabel("True Positive Rate")
            ax_roc.set_title("Receiver Operating Characteristic (ROC)")
            ax_roc.legend(loc="lower right")
            fig_roc.savefig(f"{model_dict[model_name]}_ROC.png", dpi=300)

            # Save the Calibration plot
            fig_cal, ax_cal = plt.subplots(figsize=(10, 7))
            ax_cal = plot_cal_curve(ax_cal, y_test, y_pred_prob)
            fig_cal.savefig(f"{model_dict[model_name]}_Calibration_curve.png", dpi=300)

            # Save the Decision Curve plot
            fig_dc, ax_dc = plt.subplots(figsize=(10, 7))
            ax_dc = DCA(y_test, y_pred_prob, model_dict, model_name, ax_dc)
            fig_dc.savefig(f"{model_dict[model_name]}_Decision_Curve.png", dpi=300)

    # Compile results
    results_df = pd.DataFrame(
        {
            "model_name": [model_dict[model_name]],
            "auc_score": [f"{auc_stat[0]} ({auc_stat[1]} - {auc_stat[2]})"],
            "calibration_intercept": [
                f"{intercept_stat[0]} ({intercept_stat[1]} - {intercept_stat[2]})"
            ],
            "calibration_slope": [
                f"{slope_stat[0]} ({slope_stat[1]} - {slope_stat[2]})"
            ],
            "brier_score": [f"{brier_stat[0]} ({brier_stat[1]} - {brier_stat[2]})"],
        }
    )

    return results_df, y_pred_prob

### Define preprocessing methods

In [None]:
def pipeline_importance_getter(pipe):
    """Get feature importances from a pipeline's named step."""
    if hasattr(pipe.named_steps["classifier"], "feature_importances_"):
        return pipe.named_steps["classifier"].feature_importances_
    elif hasattr(pipe.named_steps["classifier"], "coef_"):
        return abs(pipe.named_steps["classifier"].coef_[0])  # For logistic regression
    else:
        raise ValueError(
            'The classifier does not expose "coef_" or "feature_importances_" attributes'
        )


class CustomPowerTransformer(BaseEstimator, TransformerMixin):
    def __init__(self, columns, all_features):
        """
        Initialize the CustomPowerTransformer.

        Parameters:
        - columns: list of feature names to power-transform
        - all_features: list of all feature names
        """
        self.columns = columns
        self.all_features = all_features

    def fit(self, X, y=None):
        """
        Fit the transformer.
        Here we compute the names for columns to power-transform and fit the PowerTransformer
        and StandardScaler on the appropriate data.
        """
        # Determine indices of columns to power-transform
        self.column_names = self.all_features[: X.shape[1]]
        self.column_indices = [
            i
            for i, col_name in enumerate(self.column_names)
            if col_name in self.columns
        ]

        # Convert X to numpy array if it's a DataFrame
        if isinstance(X, pd.DataFrame):
            X = X.values

        # Fit the power transformer to the specified columns
        self.power_transformer = PowerTransformer(
            method="yeo-johnson", standardize=False
        )
        self.power_transformer.fit(X[:, self.column_indices])

        # Transform the specified columns
        X_transformed = X.copy()
        X_transformed[:, self.column_indices] = self.power_transformer.transform(
            X[:, self.column_indices]
        )

        # Fit the scaler to the transformed data
        self.scaler = StandardScaler().fit(X_transformed)

        return self

    def transform(self, X, y=None):
        """
        Apply power transformation to specified columns and then scale all columns.
        """
        # Convert X to numpy array if it's a DataFrame
        if isinstance(X, pd.DataFrame):
            X = X.values

        # Apply power transformation to specified columns
        X_transformed = X.copy()
        X_transformed[:, self.column_indices] = self.power_transformer.transform(
            X[:, self.column_indices]
        )

        # Scale all the columns
        return self.scaler.transform(X_transformed)

### Initialize data

In [None]:
df = pd.read_excel("Test_QOL_Data_imputed_EQ5DIndexes.xlsx")

df["binary_ASIA"] = df["ASIA"].apply(lambda x: 0 if x <= 3 else 1)
df["binary_ASA"] = df["All_ASA"].apply(lambda x: 0 if x <= 2 else 1)
df["binary_KPS"] = df["KPS"].apply(lambda x: 0 if x >= 80 else 1)
df["Opioid"] = df["Analgesic"].apply(lambda x: 1 if x > 1 else 0)

# Create new variable with categories
df["grouped_KPS"] = pd.cut(
    df["KPS"], bins=[-1, 40, 80, 100], labels=[0, 1, 2], right=True, include_lowest=True
)

# Adjust for the middle bin
df.loc[df["KPS"] == 80, "grouped_KPS"] = 2

features = [
    "Age",
    "BMI",
    "Gender",
    "CCI_YN",
    "binary_KPS",
    "Functional_Stat_1",
    "binary_ASIA",
    "binary_ASA",
    "Katagiri_Group",
    "Visceral",
    "Brain",
    "Liver",
    "Lung",
    "Path_Fract",
    "Nr_Skel_Met",
    "Nr_Spine_Met",
    "Nr_Ex_Spin_Foci",
    "Pre_Chem",
    "Opioid",
    "WBC",
    "Hemo",
    "Platelet",
    "Creat",
    "CHI+RT",
    "B_Mob",
    "B_Sel",
    "B_Usu",
    "B_Dis",
    "B_Anx",
    "B_Index",
]

pred_features = [
    "Age",
    "BMI",
    "Gender",
    "CCI_YN",
    "binary_KPS",
    "Functional_Stat_1",
    "binary_ASIA",
    "binary_ASA",
    "Katagiri_Group",
    "Visceral",
    "Brain",
    "Liver",
    "Lung",
    "Path_Fract",
    "Nr_Skel_Met",
    "Nr_Spine_Met",
    "Nr_Ex_Spin_Foci",
    "Pre_Chem",
    "Opioid",
    "WBC",
    "Hemo",
    "Platelet",
    "Creat",
    "B_Index",
]

numerical_features = ["Age", "BMI", "WBC", "Hemo", "Platelet", "Creat", "B_Index"]

# MCID recalculation
std_baseline = stdev(df["B_Index"])
print("MCID, 0.5 of SD = ", round(0.5 * std_baseline, 2))
MCID = round(0.5 * std_baseline, 2)
df["MCID_Result"] = (df["M3_Index"] - df["B_Index"] >= MCID).astype(int)

# Separate the features (X) and the target variable (y)
X = df[features]
y = df["MCID_Result"]

# Perform the stratified split
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, shuffle=True, stratify=y, random_state=42
)

In [None]:
df["MCID_Result"].value_counts()

In [None]:
X_train["Test"] = 0
X_test["Test"] = 1
X_train.to_excel("X_train_fortable.xlsx")
X_test.to_excel("X_test_fortable.xlsx")

In [None]:
df

In [None]:
y_train_value_counts = y_train.value_counts()
y_test_value_counts = y_test.value_counts()

print("Training set counts:")
print(y_train_value_counts)

print("\nTest set counts:")
print(y_test_value_counts)

In [None]:
power_and_scale_transformer_svm = CustomPowerTransformer(
    columns=numerical_features, all_features=pred_features
)
power_and_scale_transformer_plr = CustomPowerTransformer(
    columns=numerical_features, all_features=pred_features
)

In [None]:
feature_pipelines = {
    "rf": Pipeline(
        [("classifier", RandomForestClassifier(n_estimators=100, n_jobs=-1))]
    ),
    "plr": Pipeline(
        [
            ("preprocessor", power_and_scale_transformer_plr),
            (
                "classifier",
                LogisticRegression(
                    penalty="elasticnet", solver="saga", l1_ratio=0.5, max_iter=10000
                ),
            ),
        ]
    ),
}

# Step 2: Use these pipelines in RFECV
rskf = RepeatedStratifiedKFold(n_splits=10, n_repeats=5, random_state=42)

rfecv_rf_selector = RFECV(
    estimator=feature_pipelines["rf"],
    step=1,
    cv=rskf,
    importance_getter=pipeline_importance_getter,
    n_jobs=-1,
    verbose=1,
)
rfecv_plr_selector = RFECV(
    estimator=feature_pipelines["plr"],
    step=1,
    cv=rskf,
    importance_getter=pipeline_importance_getter,
    n_jobs=-1,
    verbose=1,
)

rfecv_rf_selector.fit(X_train[pred_features], y_train)
rfecv_plr_selector.fit(X_train[pred_features], y_train)

# Get optimal feature masks
rf_features_mask = rfecv_rf_selector.support_
plr_features_mask = rfecv_plr_selector.support_

# Retrieve optimal features names
rf_optimal_features = [
    feature for feature, boolean in zip(pred_features, rf_features_mask) if boolean
]
print(rf_features_mask)
plr_optimal_features = [
    feature for feature, boolean in zip(pred_features, plr_features_mask) if boolean
]
print(plr_features_mask)

print("Optimal features for Random Forest:", rf_optimal_features)
print("Optimal features for Penalized Logistic Regression:", plr_optimal_features)

In [None]:
optimal_rf = [
    "Age",
    "BMI",
    "Gender",
    "CCI_YN",
    "binary_KPS",
    "binary_ASA",
    "Katagiri_Group",
    "Path_Fract",
    "Nr_Spine_Met",
    "Nr_Ex_Spin_Foci",
    "Pre_Chem",
    "Opioid",
    "WBC",
    "Hemo",
    "Platelet",
    "Creat",
    "B_Index",
]
optimal_plr = ["Katagiri_Group", "B_Index"]

### Train model

In [None]:
pipelines = {
    "rf": Pipeline([("classifier", RandomForestClassifier())]),
    "sgb": Pipeline([("classifier", GradientBoostingClassifier())]),
    "svm": Pipeline(
        [
            ("scaler", power_and_scale_transformer_svm),
            ("classifier", SVC(kernel="rbf", probability=True)),
        ]
    ),
    "plr": Pipeline(
        [
            ("preprocessor", power_and_scale_transformer_plr),
            (
                "classifier",
                LogisticRegression(penalty="elasticnet", solver="saga", max_iter=10000),
            ),
        ]
    ),
}

# Using StratifiedKFold in GridSearchCV
cv_method = RepeatedStratifiedKFold(n_splits=10, n_repeats=3, random_state=42)

# Grid Search for Hyperparameter Tuning
param_grid_rf = {
    'classifier__n_estimators': [50, 100, 200, 300, 400, 500],
    'classifier__max_depth': [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, None],
    'classifier__max_features': [sqrt, log2, None],
    'classifier__min_samples_split': [2, 3, 4, 5, 6, 7, 8, 9, 10],
    'classifier__min_samples_leaf': [1, 2, 3, 4, 5, 6, 7, 8, 10]
}

param_grid_sgb = {
    'classifier__loss': ["log_loss", "exponential"],
    'classifier__n_estimators': [25, 50, 75, 100, 150, 200],
    'classifier__learning_rate': [0.001, 0.01, 0.1],
    'classifier__max_depth': [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, None],
    'classifier__subsample': [0.5, 0.6, 0.7, 0.8, 0.9],
    'classifier__max_features': [sqrt, log2, None],
    'classifier__min_samples_split': [2, 3, 4, 5, 6, 7, 8, 9, 10],
    'classifier__min_samples_leaf': [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
}

param_grid_svm = {  # Non-linear SVM
    "classifier__C": [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1],
    "classifier__gamma": ["scale", "auto"],
}

param_grid_plr = {
    "classifier__C": [0.001, 0.01, 0.01, 0.1, 1, 10, 100, 1000],
    "classifier__l1_ratio": [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9],
}

# Define multiple scorers
scorers = {
    "roc_auc": "roc_auc",
    "neg_brier_score": make_scorer(
        brier_score_loss, needs_proba=True, greater_is_better=False
    ),
}

summary = []

for model_name, model_pipeline in pipelines.items():
    if model_name == "plr":
        # Optimize hyperparameters with GridSearchCV using multiple metrics
        grid_search = GridSearchCV(
            model_pipeline,
            eval(f"param_grid_{model_name}"),
            cv=cv_method,
            scoring=scorers,
            refit="roc_auc",
            verbose=1,
            n_jobs=-1,
        )
        grid_search.fit(X_train[optimal_plr], y_train)

        # Get mean test scores
        roc_auc = grid_search.cv_results_["mean_test_roc_auc"][grid_search.best_index_]
        brier = -grid_search.cv_results_["mean_test_neg_brier_score"][
            grid_search.best_index_
        ]  # Convert back to positive Brier score

        # Store results in a summary list
        summary.append(
            {
                "Model": model_name,
                "Best Hyperparameters": grid_search.best_params_,
                "Mean ROC AUC": round(roc_auc, 2),
                "Mean Brier Score": round(brier, 2),
            }
        )

    else:
        # Optimize hyperparameters with GridSearchCV using multiple metrics
        grid_search = GridSearchCV(
            model_pipeline,
            eval(f"param_grid_{model_name}"),
            cv=cv_method,
            scoring=scorers,
            refit="roc_auc",
            verbose=1,
            n_jobs=-1,
        )
        grid_search.fit(X_train[optimal_rf], y_train)

        # Get mean test scores
        roc_auc = grid_search.cv_results_["mean_test_roc_auc"][grid_search.best_index_]
        brier = -grid_search.cv_results_["mean_test_neg_brier_score"][
            grid_search.best_index_
        ]  # Convert back to positive Brier score

        # Store results in a summary list
        summary.append(
            {
                "Model": model_name,
                "Best Hyperparameters": grid_search.best_params_,
                "Mean ROC AUC": round(roc_auc, 2),
                "Mean Brier Score": round(brier, 2),
            }
        )

# Convert the summary list to a DataFrame for better visualization
summary_df = pd.DataFrame(summary)
summary_df

### Stratified 10-fold cross-validation repeated 5 times for training performance

In [None]:
# Evaluate the models on training data
result_df = pd.DataFrame()
all_results = pd.DataFrame()

for model_name, model_pipeline in pipelines.items():
    # Get the best parameters for the model
    best_params = [
        item["Best Hyperparameters"] for item in summary if item["Model"] == model_name
    ][0]

    # Strip 'classifier__' prefix from the parameter names
    stripped_params = {
        key.replace("classifier__", ""): value for key, value in best_params.items()
    }

    # Update the model with the best parameters
    model_pipeline.named_steps["classifier"].set_params(**stripped_params)

    if model_name == "plr":
        X_train_model = X_train[optimal_plr]
    else:
        X_train_model = X_train[optimal_rf]

    result_df = test_model_performance_training(
        X_train_model, y_train, model_pipeline, model_name
    )
    all_results = pd.concat([all_results, result_df], ignore_index=True)

all_results

### Test models

In [None]:
# Store the best models
best_models = {}

for model_name, model_pipeline in pipelines.items():
    # Get the best parameters for the model
    best_params = [
        item["Best Hyperparameters"] for item in summary if item["Model"] == model_name
    ][0]

    # Strip 'classifier__' prefix from the parameter names
    stripped_params = {
        key.replace("classifier__", ""): value for key, value in best_params.items()
    }

    # Update the model with the best parameters
    model_pipeline.named_steps["classifier"].set_params(**stripped_params)

    # Determine the right feature set based on the model
    if model_name == "plr":
        X_train_model = X_train[optimal_plr]
    else:
        X_train_model = X_train[optimal_rf]

    # Fit the model on the full training data
    model_pipeline.fit(X_train_model, y_train)

    # Store the trained model
    best_models[model_name] = model_pipeline

# Evaluate the models on test data
results_df = pd.DataFrame()
all_results = pd.DataFrame()
probabilities = {}

# Evaluate the models on test data
for model_name, model in best_models.items():
    # Determine the right feature set based on the model
    if model_name == "plr":
        X_test_model = X_test[optimal_plr]
    else:
        X_test_model = X_test[optimal_rf]

    # Get the performance statistics for the model
    figure = True
    save = True
    model_to_save = ["plr", "sgb"]
    results_df, model_probs = model_performance_testing(
        X_test_model,
        y_test,
        model=model,
        model_name=model_name,
        figure=figure,
        save=save,
        model_to_save=model_to_save,
    )
    all_results = pd.concat([all_results, results_df], ignore_index=True)

    if model_name in ["plr", "sgb"]:
        probabilities[model_name] = model_probs

all_results

### Null-model for Brier score

In [None]:
# Extracting the actual outcomes
actual_outcomes = df["MCID_Result"].values

# Calculating the baseline probability
baseline_probability = np.mean(actual_outcomes)

# Calculating the Brier score
brier_score = np.mean((baseline_probability - actual_outcomes) ** 2)

# Displaying the results
print(f"Baseline Probability: {baseline_probability:.2f}")
print(f"Brier Score: {brier_score:.2f}")

### Plots

In [None]:
feature_importances = best_models["sgb"].steps[-1][1].feature_importances_

# Get sorted indices of feature importances
indices = np.argsort(feature_importances)[::-1]

# Plot feature importances
plt.figure(figsize=(10, 8))
plt.title("Feature Importances")
plt.barh(
    range(len(optimal_rf)), feature_importances[indices], color="b", align="center"
)
plt.yticks(range(len(optimal_rf)), np.array(optimal_rf)[indices])
plt.xlabel("Importance")
plt.ylabel("Features")
plt.gca().invert_yaxis()  # To display the highest importance at the top
plt.tight_layout()  # Adjusts plot to ensure everything fits without overlapping
plt.show()

In [None]:
fig, (ax1, ax2) = plt.subplots(
    1, 2, figsize=(10, 6)
)  # Adjust for the number of features and models


pd_features = ["Katagiri_Group", "B_Index"]
categorical_features = ["Katagiri_Group"]

sgb_disp = PartialDependenceDisplay.from_estimator(
    best_models["sgb"],
    X_test[optimal_rf],
    features=pd_features,
    kind="average",
    ax=[ax1, ax2],
    line_kw={"label": "Stochastic Gradient Boosting"},
)

plr_disp = PartialDependenceDisplay.from_estimator(
    best_models["plr"],
    X_test[optimal_plr],
    features=pd_features,
    kind="average",
    ax=[ax1, ax2],
    line_kw={"label": "Penalized Logistic Regression", "color": "red"},
)

ax1.set_ylim(0, 1)
ax1.set_ylabel("Predicted probability")
ax2.set_ylim(0, 1)
ax2.set_ylabel("Predicted probability")

plt.show()

In [None]:
# Load the sgb
model = best_models["plr"].steps[-1][1]

predict_plr = lambda x: model.predict_proba(x).astype(float)

# Create a LIME explainer object for the random forest model
explainer = lime_tabular.LimeTabularExplainer(
    training_data=X_train[optimal_plr].to_numpy(),
    feature_names=optimal_plr,
    class_names=["No MCID", "MCID"],
    discretize_continuous=True,
    categorical_features=["Katagiri_Group"],
)

exp = explainer.explain_instance(
    X_test[optimal_plr].to_numpy()[2], predict_plr, num_features=6
)
exp.show_in_notebook(show_all=False)

In [None]:
exp = explainer.explain_instance(
    X_test[optimal_plr].to_numpy()[3], predict_plr, num_features=6
)
exp.show_in_notebook(show_all=False)

In [None]:
cases = [2, 3]
for case in cases:
    exp = explainer.explain_instance(
        X_test[optimal_plr].to_numpy()[case], predict_plr, num_features=6
    )
    test = exp.as_pyplot_figure()