# Cross-validation
This notebook runs the complete cross-validation pipeline, including imputation,
feature selection, preprocessing, modeling and evaluation.

Each process is abstracted and encapsulated using `sklearn.pipeline`.

In [2]:
from sklearn.preprocessing import RobustScaler
from sklearn.base import BaseEstimator, ClassifierMixin
from sklearn.compose import ColumnTransformer
from sklearn.linear_model import LassoCV
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.feature_selection import SelectFromModel
from sklearn.pipeline import Pipeline
import numpy as np
import pandas as pd
import shap
import dataclasses
import matplotlib.pyplot as plt
import seaborn as sns

%load_ext autoreload
%autoreload 2

In [3]:
from cycler import cycler
# Qualitative palette (seven hex colours) installed as matplotlib's default
# colour cycle, so every subsequent line plot draws from it in this order.
COLORS = [
    "#F27970", "#BB9727", "#54B345", "#32B897",
    "#05B9E2", "#8983BF", "#C76DA2",
]
plt.rcParams["axes.prop_cycle"] = cycler(color=COLORS)


In [4]:
# Read the dataset from a CSV file located in the notebook's working directory.
# NOTE(review): the recorded output of this cell is a FileNotFoundError, so
# "dataset.csv" has to be placed next to the notebook before running it.
dataset = pd.read_csv("dataset.csv")
dataset

FileNotFoundError: [Errno 2] No such file or directory: 'dataset.csv'

In [None]:
# Separate the predictors from the target: every column except "cls" is a
# feature, "cls" itself is the label.
features = dataset.columns.tolist()
features.remove("cls")

X, y = dataset[features], dataset["cls"]

# Column groups referenced by the preprocessing helpers.
numerical_cols = [
    'hba1c',
    'fasting_glucose',
    'ldl_c',
    'hdl_c',
    'potassium',
    'age',
]
categorical_cols = ['sex']

# Robust-scale the numerical columns
def scale(X):
    """Fit a ``RobustScaler`` on the numerical columns of ``X`` and transform them.

    Only the columns named in the module-level ``numerical_cols`` are handed to
    the transformer. No ``remainder`` is configured, so scikit-learn's default
    for ``ColumnTransformer`` applies to every other column.

    Args:
        X: Table containing (at least) the columns in ``numerical_cols``.

    Returns:
        tuple: ``(X_scaled, pipeline)`` - the transformed data and the fitted
        ``ColumnTransformer``, which can be reused on held-out data.
    """
    column_transformer = ColumnTransformer(
        transformers=[('num', RobustScaler(), numerical_cols)],
    )
    return column_transformer.fit_transform(X), column_transformer


## 1. Imputation

In [None]:
from sklearn.ensemble import RandomForestRegressor
from sklearn.experimental import enable_iterative_imputer
from sklearn.impute import IterativeImputer

def impute(X, n_imputation):
    """Fill missing values with an iterative, random-forest based imputer.

    Args:
        X: Data containing missing values.
        n_imputation: Passed to ``IterativeImputer`` as ``max_iter``.

    Returns:
        tuple: ``(X_imputed, imputer)`` - the completed data and the fitted
        ``IterativeImputer`` for reuse on held-out data.
    """
    print(f"the iteration of imputation is {n_imputation}")
    imputer = IterativeImputer(
        estimator=RandomForestRegressor(),
        verbose=1,
        max_iter=n_imputation,
    )
    return imputer.fit_transform(X), imputer

## 2. Feature selection and preprocessing
We apply feature selection using a Lasso whose regularization strength is tuned by an inner
(nested) cross-validated search over a grid of alphas, in order to locate the optimal features.

In [None]:
def select_features(X, y, feature_names=None):
    """Select features with a cross-validated Lasso.

    A ``LassoCV`` (5-fold) is fitted on ``X``/``y`` and wrapped in a prefit
    ``SelectFromModel``. The returned names are read from the selector's own
    support mask, so they always describe exactly the columns of
    ``X_selected`` (the selector applies a small threshold, which a plain
    ``coef != 0`` test does not reproduce).

    Args:
        X: Feature matrix (array or DataFrame).
        y: Target values.
        feature_names: Optional names of the columns of ``X``, in column
            order. When omitted, ``X.columns`` is used if ``X`` has it;
            otherwise the module-level ``features`` list is used as before.
            NOTE(review): that last fallback is only correct when the columns
            of ``X`` are in the same order as ``features`` - pass the names
            explicitly when ``X`` has been through a column-reordering step.

    Returns:
        tuple: ``(X_selected, features_selected, select_model, alpha)`` - the
        reduced matrix, the names of the kept columns, the prefit selector and
        the regularization strength chosen by ``LassoCV``.
    """
    lasso = LassoCV(cv=5)
    lasso.fit(X, y)
    select_model = SelectFromModel(lasso, prefit=True)

    if feature_names is None:
        feature_names = list(X.columns) if hasattr(X, "columns") else features

    # Use the selector's mask as the single source of truth for what is kept.
    kept_positions = np.flatnonzero(select_model.get_support())
    features_selected = [feature_names[i] for i in kept_positions]

    alpha = lasso.alpha_
    X_selected = select_model.transform(X)
    print("feature selected: ", features_selected)
    return X_selected, features_selected, select_model, alpha

In [None]:
import warnings
# NOTE(review): this silences every warning for the rest of the session, which
# presumably also hides convergence / failed-fit warnings raised during the
# grid searches below - consider narrowing the filter to specific categories.
warnings.filterwarnings("ignore")

## 3. Model development

In [None]:
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import StratifiedKFold
from sklearn.base import BaseEstimator, ClassifierMixin
from sklearn.neural_network import MLPClassifier
from sklearn.svm import SVC
from sklearn.metrics import hamming_loss, f1_score, average_precision_score, precision_recall_curve
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import confusion_matrix
import itertools
from sklearn.preprocessing import LabelBinarizer
from sklearn.metrics import roc_auc_score, roc_curve
from xgboost import XGBClassifier

In [None]:
from sklearn.base import BaseEstimator, ClassifierMixin
from pydantic import BaseModel
from typing import Any, Dict, List, Tuple
from sklearn.model_selection import KFold
import tensorflow as tf

class TFDNN(BaseEstimator, ClassifierMixin):
    '''
    sklearn compatible version of TensorFlow DNN.

    The network is a stack of fully connected ReLU layers (16-32-64-32-16
    units), one dropout layer and a 4-unit softmax output. It is compiled with
    the Adam optimizer, categorical cross-entropy loss and a weighted F1 metric.

    Args:
        p: Value forwarded unchanged as ``rate`` to the ``Dropout`` layer.
    '''

    def __init__(self, p:float=1.0) -> None:
        # NOTE(review): `p` is used directly as the dropout *rate*. Confirm the
        # default of 1.0 is intended; if `p` was meant as a keep-probability the
        # layer should receive `1 - p` instead.
        self.p = p
        # Create the Sequential model
        self.model = tf.keras.Sequential([
            # Input layer with a flexible input shape and first hidden layer with 16 units
            tf.keras.layers.Dense(16, activation='relu'),
            tf.keras.layers.Dense(32, activation='relu'),
            tf.keras.layers.Dense(64, activation='relu'),
            tf.keras.layers.Dense(32, activation='relu'),
            tf.keras.layers.Dense(16, activation='relu'),
            tf.keras.layers.Dropout(rate=p),
            tf.keras.layers.Dense(4, activation='softmax')
        ])
        self.model.compile(
            optimizer="adam",
            loss=tf.keras.losses.CategoricalCrossentropy(),
            metrics=[tf.keras.metrics.F1Score(average="weighted")],
        )

    def fit(self, X, y):
        '''
        Train the network for 10 epochs with a batch size of 32.

        Args:
            X: Training features.
            y: Training targets, passed to Keras as-is. The model is compiled
                with categorical cross-entropy, so presumably one-hot encoded
                labels are expected - confirm with the caller.

        Returns:
            TFDNN: ``self``, following the scikit-learn estimator convention.
        '''
        self.model.fit(X, y, batch_size=32, epochs=10)
        return self

    def predict_proba(self, X):
        '''
        Return the softmax output of the network for ``X``.

        Named ``predict_proba`` so that scikit-learn style callers can use it.
        '''
        return self.model(X)

    # Backward-compatible alias for the method's original name.
    predict_prob = predict_proba

    def predict(self, X):
        '''
        Return the index of the most probable class for every row of ``X``.
        '''
        # Predict on the argument itself (previously an undefined global was used).
        return tf.argmax(self.model.predict(X), axis=1)
    

In [None]:
def plot_confusion_matrix(y_pred, y, save_path, classes):
    '''
    Plot the row-normalised confusion matrix of a multi-class classification
    and save it as ``figures/<save_path>.svg``.

    Args:
        y_pred (List[int]): Predicted labels by the model, categorized by 0, 1, 2, 3.
        y (List[int]): Ground truth labels, categorized by 0, 1, 2, 3.
        save_path (str): File stem (without extension) of the saved plot.
        classes (List[str]): List of class names corresponding to each label.
    '''
    import os

    # Compute confusion matrix
    cm = confusion_matrix(y, y_pred)
    plt.figure(figsize=(8, 6))
    # Normalize each row by its number of true samples. A class that occurs only
    # in the predictions has an all-zero row; keep it at 0 instead of 0/0 = NaN.
    row_totals = cm.sum(axis=1)[:, np.newaxis]
    cm = np.divide(
        cm.astype('float'),
        row_totals,
        out=np.zeros(cm.shape, dtype=float),
        where=row_totals != 0,
    )

    plt.imshow(cm, interpolation='nearest', cmap="Pastel1")
    plt.title('Confusion matrix')
    plt.colorbar()
    tick_marks = np.arange(len(classes))
    plt.xticks(tick_marks, classes, rotation=45)
    plt.yticks(tick_marks, classes)

    # Annotate every cell; switch the text colour on dark cells for legibility.
    fmt = '.2f'
    thresh = cm.max() / 2.
    for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
        plt.text(j, i, format(cm[i, j], fmt),
                 horizontalalignment="center",
                 color="white" if cm[i, j] > thresh else "black")

    plt.ylabel('True label')
    plt.xlabel('Predicted label')
    plt.tight_layout()

    # Make sure the output directory exists before writing the figure.
    out_path = "figures/" + save_path + ".svg"
    os.makedirs(os.path.dirname(out_path), exist_ok=True)
    plt.savefig(out_path, format="svg", dpi=1400)

def plot_auc(y_pred, y, save_path, classes):
    '''
    Plot the one-versus-rest ROC curve of every class and save the figure as
    ``figures/<save_path>.svg``.

    Args:
        y_pred: Predicted probability of each class, indexed as ``y_pred[:, i]``
            for class ``i``.
        y (List[int]): Ground truth labels, categorized by 0, 1, 2, 3.
        save_path (str): File stem (without extension) of the saved plot.
        classes (List[str]): List of class names corresponding to each label.
    '''
    import os

    # One-hot encode the ground truth so each class can be scored against the rest.
    lb = LabelBinarizer()
    lb.fit(y)
    y_bin = lb.transform(y)

    plt.figure(figsize=(8, 6))
    for i in range(len(classes)):
        auc = roc_auc_score(y_bin[:, i], y_pred[:, i])
        fpr, tpr, _ = roc_curve(y_bin[:, i], y_pred[:, i])
        plt.plot(fpr, tpr, lw=2, label='%s (AUC = %0.2f)' % (classes[i], auc))

    # Diagonal reference line.
    plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.05])
    plt.xlabel('1 - Specificity')
    plt.ylabel('Sensitivity')
    plt.title('One-vs-Rest ROC AUC Curve')
    plt.legend(loc="lower right")

    # Make sure the output directory exists before writing the figure.
    out_path = "figures/" + save_path + ".svg"
    os.makedirs(os.path.dirname(out_path), exist_ok=True)
    plt.savefig(out_path, dpi=1400, format="svg")




In [None]:
# Candidate models for the grid search: a short name (also the key under which
# metrics are stored), the estimator class, and its hyper-parameter grid.
models = [
    {
        "name": "lr",
        "estimator": LogisticRegression,
        "params": {"penalty": ["l1", "l2"], "C":[0.01, 0.1, 1, 10, 100, 1000],'solver':['liblinear']}
    },
    {
        "name": "svm",
        "estimator": SVC,
        # 'rbf' was misspelled as 'rfb'; an invalid kernel makes every such fit
        # fail, so the RBF kernel was never really part of the search.
        # probability=True is needed because evaluation calls predict_proba.
        "params": {"C":[0.01, 0.1, 1, 10, 100, 1000],'kernel': ['rbf', 'linear'], "probability":[True]}
    },
    {
        "name": "rfc",
        "estimator": RandomForestClassifier,
        "params": {"n_estimators": [10, 100, 1000], "criterion": ["gini", "entropy", "log_loss"]}
    },
    {
        "name": "xgboost",
        "estimator": XGBClassifier,
        # The grid listed 100 twice; 1000 mirrors the random-forest grid above.
        # NOTE(review): confirm 1000 was the intended third value.
        "params": {"learning_rate": [1e-1, 1e-2, 1e-3,], "max_depth":[3, 4, 6], "n_estimators": [10, 100, 1000]}
    },
    {
        "name": "dnn",
        "estimator": MLPClassifier,
        "params": {"hidden_layer_sizes":[[16,32,64,32,16], [16,32,32]],"learning_rate_init": [1e-2, 1e-3, 1e-4, 1e-5]}
    }

]

# Containers filled in by the cross-validation run: per-fold bookkeeping at the
# top level plus one metric store per model (keyed by the model's short name).
result = {
    "alpha": [],
    "features_selected": [],
    "test_set": [],
    "training_set": [],
}
for model in models:
    # Scalar metrics first, then the per-class series (suffix 0-3), then the
    # list of fitted estimators.
    store = {key: [] for key in ("hamming_loss", "f1_macro", "f1_weighted")}
    store.update({
        f"{prefix}{class_idx}": []
        for prefix in ("precision", "recall", "average_precision", "tpr", "fpr", "auc")
        for class_idx in range(4)
    })
    store["cache"] = []
    result[model["name"]] = store
classes = ["g0", "g1", "g2", "g3"]

def train(X, y, models):
    """Grid-search every candidate model on the given training data.

    Each search uses 5-fold cross-validation and the weighted F1 score, which
    takes class imbalance into account.

    Args:
        X: Training features.
        y: Training labels.
        models: List of dicts with the keys "name", "estimator" (a class that
            is instantiated without arguments) and "params" (the grid).

    Returns:
        dict: For every model name, a dict holding the refitted best
        "estimator" and its best "params".
    """
    def _search(spec):
        # Run one grid search and keep only what the caller needs from it.
        search = GridSearchCV(
            spec["estimator"](),
            cv=5,
            param_grid=spec["params"],
            scoring="f1_weighted",
        )
        search.fit(X, y)
        return {"estimator": search.best_estimator_, "params": search.best_params_}

    return {spec["name"]: _search(spec) for spec in models}

def evaluate(estimator, X_test, y_test, eval_result, iter_num):
    """Score a fitted estimator on one held-out fold and record the metrics.

    Appends to the lists in ``eval_result``: Hamming loss, macro and weighted
    F1, the estimator itself ("cache") and, for every class index, the
    one-vs-rest ROC curve, AUC, precision-recall curve and average precision.
    A confusion-matrix plot and a ROC plot are also written, named after the
    estimator's class and ``iter_num``.

    Args:
        estimator: Fitted classifier exposing ``predict`` and ``predict_proba``.
        X_test: Held-out features.
        y_test: Held-out labels.
        eval_result: Per-model metric store that is appended to in place.
        iter_num: Fold number, used only in the plot file names.
    """
    predicted_labels = estimator.predict(X_test)
    predicted_probs = estimator.predict_proba(X_test)

    eval_result["hamming_loss"].append(
        hamming_loss(y_true=y_test, y_pred=predicted_labels)
    )

    # One-hot ground truth for the one-vs-rest curves below.
    one_hot_truth = LabelBinarizer().fit(y_test).transform(y_test)

    macro_f1 = f1_score(y_true=y_test, y_pred=predicted_labels, average='macro')
    weighted_f1 = f1_score(y_true=y_test, y_pred=predicted_labels, average='weighted')
    eval_result["f1_macro"].append(macro_f1)
    eval_result["f1_weighted"].append(weighted_f1)
    eval_result["cache"].append(estimator)

    # plot metrics
    file_stem = estimator.__class__.__name__ + f"-Iter-{iter_num}"
    plot_confusion_matrix(predicted_labels, y_test, file_stem + "-ConfusionMatrix", classes)
    plot_auc(predicted_probs, y_test, file_stem + "-AUC", classes)

    for class_idx in range(len(classes)):
        truth = one_hot_truth[:, class_idx]
        # Probability assigned to this class for every test row.
        scores = [row[class_idx] for row in predicted_probs]

        fpr, tpr, _ = roc_curve(truth, scores)
        eval_result[f"fpr{class_idx}"].append(fpr)
        eval_result[f"tpr{class_idx}"].append(tpr)
        eval_result[f"auc{class_idx}"].append(roc_auc_score(truth, scores))

        precision, recall, _ = precision_recall_curve(truth, scores)
        average_precision = average_precision_score(truth, scores)
        eval_result[f"precision{class_idx}"].append(precision)
        eval_result[f"recall{class_idx}"].append(recall)
        eval_result[f"average_precision{class_idx}"].append(average_precision)
        

def cross_validate(X, y, n_splits):
    '''
    Cross-validate the models.

    For every stratified fold the training part is scaled, imputed and reduced
    by feature selection, every model in the module-level ``models`` list is
    grid-searched on it, and the best estimators are evaluated on the held-out
    part after it went through the same fitted preprocessing steps. Everything
    is recorded in the module-level ``result`` dict.

    Args:
        X (pd.DataFrame): Features.
        y (pd.Series): Labels.
        n_splits (int): Number of stratified folds.
    '''
    kf = StratifiedKFold(n_splits=n_splits)

    # Imputer iterations = overall share of missing cells in percent (floored).
    # The value does not depend on the fold, so compute it once.
    n_imputation = int(dataset[features].isnull().mean().mean() * 100 // 1)

    for fold, (train_index, test_index) in enumerate(kf.split(X, y)):
        print("For the iteration: {}".format(fold), "#" * 30)
        # kf.split yields positional indices, so select rows by position; label
        # based .loc is only equivalent for a default 0..n-1 index.
        X_train, y_train = X.iloc[train_index], y.iloc[train_index]
        X_test, y_test = X.iloc[test_index], y.iloc[test_index]

        X_train_scaled, scaler = scale(X_train)
        X_train_imputed, imputer = impute(X_train_scaled, n_imputation)
        X_train_selected, _, selector, alpha = select_features(X_train_imputed, y_train)

        # Resolve the selected names from the fitted steps themselves so they
        # always match the columns of X_train_selected: the scaler outputs only
        # its own columns (prefixed "<step>__"), and the imputer may drop
        # columns, so the dataset's column order cannot be relied upon.
        scaled_names = [name.split("__", 1)[-1] for name in scaler.get_feature_names_out()]
        imputed_names = imputer.get_feature_names_out(scaled_names)
        features_selected = [
            str(name) for name, keep in zip(imputed_names, selector.get_support()) if keep
        ]

        best_estimators = train(X_train_selected, y_train, models)
        result["features_selected"].append(features_selected)
        result["alpha"].append(alpha)

        ### evaluate
        # Apply the preprocessing fitted on the training fold to the test fold.
        for process in [scaler, imputer, selector]:
            X_test = process.transform(X_test)

        # get the reference of the current test_set
        result["test_set"].append((X_test, y_test))
        result["training_set"].append((X_train_selected, y_train))
        # evaluate the metrics for each iteration
        for name, value in best_estimators.items():
            evaluate(estimator=value["estimator"], X_test=X_test, y_test=y_test, eval_result=result[name], iter_num=fold)


cross_validate(X.head(500), y.head(500), 2)

In [1]:
def plot_recall_precision_curve():
    """Placeholder for plotting the precision-recall curves.

    The cell only contained the bare ``def`` line (no colon, no body), which is
    a syntax error and stops the notebook from running top to bottom. The stub
    keeps the name defined and fails loudly if it is called before being
    implemented.

    Raises:
        NotImplementedError: Always.
    """
    raise NotImplementedError("plot_recall_precision_curve is not implemented yet")

NameError: name 'result' is not defined

## Housekeeping

Collect the per-model metrics and write them to disk.

In [None]:
import scipy.stats as st 


def CI(samples, alpha = 0.95):
    """Return the mean of ``samples`` with a normal-approximation confidence interval.

    Args:
        samples: Sequence of numeric values (e.g. one metric over the CV folds).
        alpha: Confidence level of the interval (0.95 gives a 95% interval).

    Returns:
        list[str]: ``[lower, mean, upper]``, each formatted as a percentage
        rounded to two decimals, e.g. ``"8.68%"``.

    Notes:
        The interval uses the normal distribution with the standard error of
        the mean. NOTE(review): with only a handful of folds a Student-t
        interval would be wider; confirm the normal approximation is intended.
    """
    values = np.asarray(samples, dtype=float)
    mean = np.mean(values)
    scale = st.sem(values)
    if scale == 0:
        # All samples are identical: the interval collapses onto the mean
        # (a zero scale would otherwise yield NaN bounds).
        left = right = mean
    else:
        left, right = st.norm.interval(confidence = alpha, loc = mean, scale = scale)
    return ["{}%".format(round(bound * 100, 2)) for bound in (left, mean, right)]


# Summarise each model's cross-validation metrics as [lower, mean, upper]
# strings (see CI) and write the table to disk.
model_names = [item["name"] for item in models]
performances = {"model": []}
performances.update({
    metric: []
    for metric in ("f1_macro", "f1_weighted", "hamming_loss", "auc0", "auc1", "auc2", "auc3")
})

for name in model_names:
    performances["model"].append(name)
    for metric, column in performances.items():
        if metric != "model":
            column.append(CI(result[name][metric]))

performances_df = pd.DataFrame(performances)
performances_df.to_csv("csv/model_performace.csv")

In [None]:
# Show the summary table: one row per model, one column per metric.
performances_df

## Model Selection
First select the feature set, then compare model performance (over cross-validation) to find the best model.

### Select the optimal feature set

In [None]:
# Pick the fold whose Lasso feature selection chose the smallest alpha.
# NOTE(review): the original comment was left unfinished ("need to check the");
# confirm that the smallest alpha is the intended selection criterion.
optimum_index = np.argmin(result["alpha"])

### Save model

In [None]:
import pickle
# Take the random forest ("rfc") as an example: fetch the estimator that was
# cached for the fold at optimum_index.
optimal_model = result["rfc"]["cache"][optimum_index]

def save_model(file_name: str, model):
    """Pickle ``model`` to ``models/<file_name>``.

    The target directory is created when it does not exist yet, so the call
    also works in a fresh checkout.

    Args:
        file_name: Name of the file inside the ``models`` directory.
        model: Any picklable object, e.g. a fitted estimator.
    """
    import os

    target = "models/" + file_name
    os.makedirs(os.path.dirname(target), exist_ok=True)
    with open(target, 'wb') as f:
        pickle.dump(model, f)
# Persist the example model as models/rfc.pkl.
save_model("rfc.pkl", optimal_model)

## Interpretation



### Shapley Values

In [None]:
import shap

In [None]:
# need to choose the best model, here just use lr as an example
estimator = result["lr"]["cache"][optimum_index]
# Held-out data of the same fold, stored by cross_validate after it was passed
# through the fitted scaler, imputer and selector.
X_test_optimal, y_test_optimal = result["test_set"][optimum_index]
# Re-attach the selected feature names as column labels.
X_test_optimal = pd.DataFrame(X_test_optimal, columns=result["features_selected"][optimum_index])
# The fold's test set is handed to KernelExplainer as its data argument.
explainer = shap.KernelExplainer(estimator.predict_proba, X_test_optimal)

In [None]:
# Compute the SHAP values for the rows of the selected fold's test set.
shap_values = explainer(X_test_optimal)

In [None]:
# Show the features kept by the Lasso selection in the chosen fold.
result["features_selected"][optimum_index]

In [None]:
classes = ["g0", "g1", "g2", "g3"]
# One slice of the SHAP values per class (presumably the last axis indexes the class).
values_list = [shap_values[:, :, cls_ind] for cls_ind in range(len(classes))]
# Bar-type summary plot over the selected features; show=False keeps the
# figure open so it can be resized below.
shap.summary_plot(values_list, feature_names=result["features_selected"][optimum_index],plot_type="bar", show=False, plot_size=0.5)
plt.gcf().set_size_inches(10, 2)
plt.legend().set_visible(False)

In [None]:
def plot_shap_beeswarm(values_list, features):
    """Draw a 2x2 grid of SHAP beeswarm plots and save it as figures/beeswarm.svg.

    Args:
        values_list: SHAP values per class; the first four entries are plotted,
            one per panel.
        features: Selected feature names; only their count is used, to blank
            out the y tick labels of the right-hand panels.
    """
    # Setting figure size and DPI for high-resolution outputs
    fig = plt.figure(figsize=(12, 42))
    
    for i in range(4):
        plt.subplot(2, 2, i + 1)
        # Generate SHAP beeswarm plot
        # The colour bar is only drawn on odd-indexed panels (the right column).
        # NOTE(review): relies on beeswarm(show=False) returning an object with
        # an `.axes` attribute that supports set_xlabel - confirm for the
        # installed shap version.
        ax = shap.plots.beeswarm(values_list[i], show=False, color_bar= i%2 != 0, plot_size=1, color_bar_label="").axes
        
        # Set axis labels and title
        # Panels are labelled "Group 1".."Group 4" (1-based).
        ax.set_xlabel(f"SHAP Value (Group {i + 1})", fontsize=10)
        
        # Set y-axis labels
        # Right-column panels get empty tick labels.
        if i % 2 != 0:
            ax.set_yticklabels(["" for _ in features])

        # Adjusting tick parameters for better visibility
        ax.tick_params(axis='both', which='major', labelsize=10)
        plt.subplots_adjust(left=0.3, right=0.9, top=0.95, bottom=0.05, hspace=1)
    # Adjust layout to prevent overlapping
    plt.tight_layout()
    plt.savefig("figures/beeswarm.svg", format="svg", dpi=1400)

# Plot the per-class SHAP values for the features selected in the chosen fold.
plot_shap_beeswarm(values_list, result["features_selected"][optimum_index])
        

Color is used to display the original value of a feature.

### Tree-based Interpretation 

This is left blank for now.

"
To select the lower probability threshold for the very high group, all possible predicted probabilities
were with a positive likelihood ratio greater than or equal to 10 were selected. Any of these could
arbitrarily be used as the threshold, we have selected to use the lowest value to maximize the
number of patients classed into this risk group.
Next, possible probability thresholds greater than or equal to the very high group threshold were
removed, and the selection process was repeated to find the threshold for the high risk group, this
time looking for a positive likelihood ratio of 5 or greater. The lowest such value was selected.
For the very low risk group, probability thresholds less than the threshold for the high risk group,
with negative likelihood ratio of 0.1 or less were selected, and the highest such value was used as
the upper probability threshold for the very low risk group. Finally, for the low risk group, values
greater than the very low risk group threshold, smaller than the high risk group threshold, with
negative likelihood ratios of 0.2 or less were selected, and the highest value was used as the upper
probability threshold for the low risk group.
"

--Montgomery-Csobán, T., Kavanagh, K., Murray, P., Robertson, C., Barry, S. J., Ukah, U. V., ... & Widmer, M. (2024). Machine learning-enabled maternal risk assessment for women with pre-eclampsia (the PIERS-ML model): a modelling study. The Lancet Digital Health, 6(4), e238-e250.

Based on this description, the algorithm can be summarized in the following steps:
1. Calculate LR+ and LR- for each group.