In [1]:
import pandas as pd
import numpy as np
import pyreadr
from numpy import loadtxt
from numpy import sort
from xgboost import XGBClassifier,XGBRFClassifier
from sklearn.model_selection import train_test_split, cross_validate, KFold
from sklearn.metrics import accuracy_score, roc_auc_score, f1_score, precision_score, recall_score, confusion_matrix,roc_curve
from sklearn.feature_selection import SelectFromModel

#plot the knee point
import matplotlib.pyplot as plt

# Dataset versions to process; each value is interpolated into the
# "Rdata/external_training_test_{version}.rda" file name by the training loop.
versions = [1, 3, 4]

# Number of top-importance features to keep for each version
# (paired positionally with `versions`).
n_features = [27, 29, 30]

# Line colour used for each version in the plots, keyed by version number.
colors = {
    1: "#F28147",
    3: "#9FD4AE",
    4: "#5560AC",
}


In [3]:
def cross_validate_xgboost_model( X_train, y_train, n_splits=10, suppress_output=False, validation_set_prediction = False):
    """Run shuffled K-fold cross-validation of an XGBoost binary classifier.

    A fresh ``XGBClassifier`` is trained for every fold, so the model returned
    under ``"model"`` really is the one fitted on the best-scoring fold (highest
    validation ROC AUC) and is not overwritten by later folds.

    Parameters
    ----------
    X_train : pandas.DataFrame
        Feature matrix; rows are selected positionally with ``.iloc``.
    y_train : pandas.Series
        Binary labels aligned positionally with ``X_train``.
    n_splits : int, default 10
        Number of folds passed to ``KFold`` (shuffled, ``random_state=6``).
    suppress_output : bool, default False
        When True, the per-fold metric line is not printed.
    validation_set_prediction : bool, default False
        When True, the out-of-fold positive-class probabilities are collected
        and returned under ``"valid_set_predict"``.

    Returns
    -------
    dict
        ``"model"``: classifier from the fold with the highest ROC AUC;
        ``"accuracy"``, ``"roc_auc"``, ``"recall"``, ``"specificity"``: lists
        with one value per fold;
        ``"fpr"``, ``"tpr"``: ROC curve of the best fold;
        ``"valid_set_predict"``: DataFrame of out-of-fold probabilities indexed
        like ``y_train`` when ``validation_set_prediction`` is True, else None.
    """
    params = {"objective": "binary:logistic",
                "eval_metric": "auc",
                "eta":0.1,
                "max_depth":20,
                "lambda": 0.0003, "alpha": 0.0003, "nthread" :10}

    kf = KFold(n_splits=n_splits, random_state=6, shuffle=True)

    # Per-fold metrics.
    accuracy_list = []
    roc_auc_list = []
    recall_list = []
    specificity_list = []

    # Best fold so far; -inf guarantees the first fold is always recorded, so
    # model/fpr/tpr are defined whenever at least one fold ran.
    best_model = None
    best_roc_auc = float("-inf")
    fpr, tpr = None, None

    # Out-of-fold probability frames, concatenated once after the loop.
    valid_set_frames = []

    for fold, (train_index, valid_index) in enumerate(kf.split(X_train)):
        X_train_fold, X_valid_fold = X_train.iloc[train_index], X_train.iloc[valid_index]
        y_train_fold, y_valid_fold = y_train.iloc[train_index], y_train.iloc[valid_index]

        # New estimator per fold: refitting a shared instance would make every
        # stored "best model" reference point at the last fold's fit.
        model = XGBClassifier(**params)
        model.fit(X_train_fold, y_train_fold.values)

        y_pred = model.predict(X_valid_fold)
        y_pred_proba = model.predict_proba(X_valid_fold)[:,1]

        accuracy = accuracy_score(y_valid_fold, y_pred)
        roc_auc = roc_auc_score(y_valid_fold, y_pred_proba)
        f1 = f1_score(y_valid_fold, y_pred)
        precision = precision_score(y_valid_fold, y_pred)
        recall = recall_score(y_valid_fold, y_pred)

        # Specificity = TN / (TN + FP), taken from the 2x2 confusion matrix.
        tn, fp, fn, tp = confusion_matrix(y_valid_fold, y_pred).ravel()
        specificity = tn / (tn+fp)

        if validation_set_prediction:
            valid_set_frames.append(pd.DataFrame(y_pred_proba, index=y_valid_fold.index))

        # Strict '>' keeps the earliest fold on ties, as the original comparison did.
        if roc_auc > best_roc_auc:
            best_roc_auc = roc_auc
            best_model = model
            fpr, tpr, _ = roc_curve(y_valid_fold, y_pred_proba)

        accuracy_list.append(accuracy)
        roc_auc_list.append(roc_auc)
        recall_list.append(recall)
        specificity_list.append(specificity)

        if not suppress_output:
            print(f"Fold: {fold}, Accuracy: {accuracy}, ROC AUC: {roc_auc}, F1: {f1}, Precision: {precision}, Recall: {recall}, Specificity: {specificity}")

    valid_set_predict = pd.concat(valid_set_frames, axis=0) if valid_set_frames else None

    return {"model": best_model, "accuracy": accuracy_list, "roc_auc": roc_auc_list, "recall": recall_list, "specificity": specificity_list, "fpr": fpr, "tpr": tpr, "valid_set_predict": valid_set_predict}


In [2]:
def predict_model(results, X_test, y_test):
    """Score the model stored in ``results["model"]`` on a held-out set.

    The test frame is restricted to the feature names reported by the model's
    booster before predicting. The shape of that frame and the predicted
    labels are printed.

    Returns a dict with ``accuracy``, ``f1``, ``precision``, ``recall`` and
    ``specificity`` multiplied by 100 (no rounding is applied), ``roc_auc``
    left on its native 0-1 scale, and the ROC curve arrays ``fpr`` / ``tpr``.
    """
    fitted = results["model"]

    # Keep only the columns the booster lists as its feature names.
    feature_names = fitted.get_booster().feature_names
    X_eval = X_test[feature_names]
    print(f"test_ds shape: {X_eval.shape}")

    predicted_labels = fitted.predict(X_eval)
    print(predicted_labels)
    positive_scores = fitted.predict_proba(X_eval)[:,1]

    # ROC curve points for plotting by the caller.
    fpr, tpr, _ = roc_curve(y_test, positive_scores)

    # Specificity = TN / (TN + FP) from the 2x2 confusion matrix.
    tn, fp, fn, tp = confusion_matrix(y_test, predicted_labels).ravel()
    specificity = tn / (tn+fp)

    return {
        "accuracy": accuracy_score(y_test, predicted_labels)*100,
        "roc_auc": roc_auc_score(y_test, positive_scores),
        "f1": f1_score(y_test, predicted_labels)*100,
        "precision": precision_score(y_test, predicted_labels)*100,
        "recall": recall_score(y_test, predicted_labels)*100,
        "specificity": specificity*100,
        "fpr": fpr,
        "tpr": tpr,
    }


In [None]:
# For each dataset version: load the train/external-test split, rank features
# with a cross-validated model on all features, keep the top `n_feature`
# features, re-run cross-validation on them (internal), then score the
# resulting model on the external set. Everything needed for plotting is
# collected in AUC_results[version].
AUC_results = {}

# Integer encoding of the ALS_status labels.
label_map = {'case': 1, 'control': 0}

for version, n_feature in zip(versions, n_features):
    # `colors` is a dict keyed by version; zipping over it would yield its keys
    # (the version numbers), so the colour is looked up explicitly.
    color = colors[version]

    print(f"version: {version}")

    filename = f"Rdata/external_training_test_{version}.rda"

    training_test = pyreadr.read_r(filename)
    train1 = training_test["um_data"]
    test1 = training_test["external_data"]

    print(f"train1 shape: {train1.shape}, test1 shape: {test1.shape}")

    # Explicit map + cast instead of .replace(): gives an integer dtype for
    # both string and categorical label columns, and fails loudly if a label
    # other than 'case'/'control' is present.
    X1_train = train1.drop(["Row.names","ALS_status"],axis=1)
    y1_train = train1["ALS_status"].map(label_map).astype(int)

    X1_test = test1.drop(["Row.names","ALS_status"],axis=1)
    y1_test = test1["ALS_status"].map(label_map).astype(int)

    #whole: cross-validate on every feature to obtain feature importances
    whole_results = cross_validate_xgboost_model(X1_train, y1_train, n_splits=10)
    best_model_whole = whole_results["model"]

    # Importance of the n_feature-th most important feature; features at or
    # above it are kept (ties can select more than n_feature columns, which is
    # why the actual count is printed below).
    thresholds = sort(best_model_whole.feature_importances_)
    thresh = thresholds[-n_feature]

    #internal: cross-validate again on the selected features only
    selection = SelectFromModel(estimator=best_model_whole, threshold=thresh, prefit=True)
    col_index = X1_train.columns[selection.get_support()]
    select_X_train = selection.transform(X1_train)
    select_X_train = pd.DataFrame(select_X_train, columns=col_index)
    print(f"thresh: {thresh}, n={select_X_train.shape[1]}")
    # train model
    internal_results = cross_validate_xgboost_model(select_X_train, y1_train, n_splits=10, suppress_output=True)
    feature_importance = internal_results["model"].get_booster().get_score(importance_type='weight')
    feature_importance = pd.DataFrame(feature_importance.items(), columns=['feature', 'importance'])
    # The reported AUC is the mean over folds, while the stored ROC curve is
    # the single fpr/tpr pair returned by the cross-validation helper.
    internal_AUC = np.mean(internal_results["roc_auc"])
    internal_ROC = pd.DataFrame({"fpr":internal_results["fpr"],"tpr":internal_results["tpr"]})
    print(f"internal AUC:{internal_AUC}")

    AUC_results[version] = {}
    AUC_results[version]["feature"] = feature_importance
    AUC_results[version]["internal"] = {"AUC":internal_AUC, "ROC":internal_ROC, "label":f"internal {n_feature}", "color":color}

    #external: score the model returned by the internal cross-validation
    external_results = predict_model(internal_results, X1_test, y1_test)
    external_AUC = external_results["roc_auc"]
    external_ROC = pd.DataFrame({"fpr":external_results["fpr"],"tpr":external_results["tpr"]})
    print(f"external AUC:{external_AUC}")
    AUC_results[version]["external"] = {"AUC":external_AUC, "ROC":external_ROC, "label":f"external {n_feature}", "color":color}


In [None]:
import matplotlib.pyplot as plt

# Line colour per dataset version.
colors = {1: "#F28147", 3: "#9FD4AE", 4: "#5560AC"}

# One colour per version: dashed line = internal cross-validation ROC,
# solid line (matplotlib default style) = external test ROC.
for version in versions:
    version_results = AUC_results[version]
    internal_entry = version_results["internal"]
    internal_roc = internal_entry["ROC"]
    external_entry = version_results["external"]
    external_roc = external_entry["ROC"]

    # Legend text is "<label>: <AUC as a percentage with two decimals>%".
    plt.plot(
        internal_roc["fpr"],
        internal_roc["tpr"],
        label=f"{internal_entry['label']}: {internal_entry['AUC']*100:.2f}%",
        color=colors[version],
        linestyle='--',
    )
    plt.plot(
        external_roc["fpr"],
        external_roc["tpr"],
        label=f"{external_entry['label']}: {external_entry['AUC']*100:.2f}%",
        color=colors[version],
    )

plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC curve')
# Build the legend from the labelled curves drawn above.
plt.legend(loc="lower right")
# Chance-level diagonal (unlabelled).
plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
plt.show()


In [None]:
# Precision-recall curves, one line per dataset version, drawn on a single
# square figure. The figure is created BEFORE plotting so the curves, labels
# and legend all land on it; previously the plotting code ran twice (once on an
# implicit figure, once on the 6x6 figure), reading every CSV twice and
# producing a duplicate plot.
plt.figure(figsize=(6, 6))
for version in versions:
    precision_recall_df = pd.read_csv(f"precision_recall_curve_{version}.csv")
    plt.plot(precision_recall_df["recall"], precision_recall_df["precision"], label=f"version {version}", color=colors[version])
plt.xlabel('Recall')
plt.ylabel('Precision')
plt.title('Precision-Recall curve')
plt.legend(loc="lower left")

# Same scale on both axes.
plt.gca().set_aspect('equal', adjustable='box')
plt.show()