In [None]:
import pandas as pd
import numpy as np
from sklearn.calibration import calibration_curve
from sklearn.model_selection import GridSearchCV
from sklearn.model_selection import RepeatedKFold, StratifiedKFold
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.feature_selection import RFECV
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import classification_report, roc_auc_score, RocCurveDisplay, confusion_matrix, roc_curve, auc
from sklearn.impute import SimpleImputer
from lightgbm import LGBMClassifier
import matplotlib.lines as mlines
import roc_utils as ru
import itertools
import warnings
import matplotlib
import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec
from mlxtend.plotting import plot_decision_regions
import lightgbm.plotting as lgb_plotting
import shap

In [None]:
warnings.filterwarnings(action = 'ignore', category  = DeprecationWarning)
matplotlib.rc('xtick', labelsize=20) 
matplotlib.rc('ytick', labelsize=20) 

In [None]:
def compute_roc_ci(y_true, probability, n_boot = 1000, show_plot = False):
    y_true = y_true.values
    y_pred = probability[:,1]
    rng_seed = 42  # control reproducibility
    bootstrapped_scores = []
    fpr_values = []
    tpr_values = []
    rng = np.random.RandomState(rng_seed)

    for i in range(n_boot):
         indices = rng.randint(0, len(y_true), len(y_true))
         score = roc_auc_score(y_true[indices], y_pred[indices])
         fpr, tpr, _ = roc_curve(y_true[indices],  y_pred[indices])
         fpr_values.append(fpr)
         tpr_values.append(tpr)
         bootstrapped_scores.append(score)
  
    sorted_scores = np.array(bootstrapped_scores)
    sorted_scores.sort()
    confidence_lower = sorted_scores[int(0.05 * len(sorted_scores))]
    confidence_upper = sorted_scores[int(0.95 * len(sorted_scores))]
    mean = np.mean(np.array(bootstrapped_scores))
    if show_plot:
        roc_auc = auc(fpr, tpr)
        display = RocCurveDisplay(fpr = fpr, tpr = tpr, roc_auc = roc_auc) 
        display.plot()
    return [round(confidence_lower,2), round(mean,2), round(confidence_upper,2)]

def compute_metrics(true_positive, false_positive, true_negative, false_negative):

    print("Accuracy {}".format(round((true_positive + true_negative)/(true_positive + false_negative + false_positive + true_negative),2)))
    print("Sensitivity {}".format(round(true_positive/(true_positive + false_negative),2)))
    print("Specificity {}".format(round(true_negative/(true_negative + false_positive),2)))
    
    print("PPV {}".format(round(true_positive/(true_positive + false_positive),2)))
    print("PNV {}".format(round(true_negative/(true_negative + false_negative),2)))
    
    
class ShapInput(object):
    def __init__(self, expectation, shap_values, features, feat_names):
        self.base_values = expectation
        self.values = shap_values
        self.data = features
        self.feature_names = feat_names
 

In [None]:
columns_to_use = ["cs_raca", "cs_sexo", "cs_escol_n", "hiv", "agravdroga", "agravtabac", "agravalcoo", "agravdiabe", 
                  "agravdoenc","agravoutra", "ant_retro", "tratamento", "raiox_tora", "bacilosc_e", "histopatol", 
                  "cultura_es", "test_sensi", "benef_gov", "idade", "outcome"]
data = pd.read_csv("../data/2023_10_09_Ml_python_data.csv", usecols = columns_to_use)
data.head()

In [None]:
#compute missing proportion
cols = []
value = []
cols_to_remove = []
for col in columns_to_use:
    miss_prop = data[col].isna().sum()/data.shape[0]
    cols.append(col)
    value.append(miss_prop)
    print("{} missing proportion {}".format(col, miss_prop))
    if miss_prop > 0.35:
        cols_to_remove.append(col) 
    

cols_to_remove

In [None]:
#remove columns with more than 35% missing
data.drop(columns = cols_to_remove, inplace = True)

In [None]:
cat_cols = [col for col in columns_to_use if col not in cols_to_remove and col not in ["idade", "time_in_SINAN"]]
num_cols = ["idade"]

In [None]:
#df = df.drop(columns = ["multiclass_task"])
cure = data[data["outcome"] == 0]
unsc = data[data["outcome"] == 1]
cure = cure.sample(n = len(unsc), random_state = 42)
data = pd.concat([cure, unsc])

In [None]:
#create age cat
bins = pd.IntervalIndex.from_tuples([(0, 18), (18, 35), (35, 50),(50,65),(80,120)])
data["idade"] = pd.cut(data["idade"], bins)
data = pd.get_dummies(data, columns = ["idade"], dtype = float)
data.head()

In [None]:
data.rename(columns = {"idade_(0, 18]":"Teenage", "idade_(18, 35]":"Adult", "idade_(35, 50]":"Mild_age",
                       "idade_(50, 65]":"Older", "idade_(80, 120]":"Elder"}, inplace = True)

In [None]:
cat_cols = [col for col in data.columns if col not in ["outcome"]]
#features = data[features]
features = data[cat_cols]
target = data["outcome"]

In [None]:
#split data into train and test
X_train, X_test, y_train, y_test = train_test_split(features, target, test_size = 0.3, random_state = 42)

In [None]:
print("Train set {}".format(X_train.shape[0]))
print("Test set {}".format(X_test.shape[0]))     

In [None]:
#perform imputation
cat_imputer = SimpleImputer(strategy = 'most_frequent', missing_values = np.nan)
for col in cat_cols:
    cat_imputer.fit(X_train[[col]])
    X_train[col] = cat_imputer.transform(X_train[[col]])
    
    
# #imput numerical col
# num_imputer = SimpleImputer(strategy = 'median', missing_values = np.nan)
# for col in num_cols:
#     num_imputer.fit(X_train[[col]])
#     X_train[col] = num_imputer.transform(X_train[[col]])
    
    
cat_imputer = SimpleImputer(strategy = 'most_frequent', missing_values = np.nan)
for col in cat_cols:
    cat_imputer.fit(X_test[[col]])
    X_test[col] = cat_imputer.transform(X_test[[col]])
    
    
# #imput numerical col
# num_imputer = SimpleImputer(strategy = 'median', missing_values = np.nan)
# for col in num_cols:
#     num_imputer.fit(X_test[[col]])
#     X_test[col] = num_imputer.transform(X_test[[col]])

In [None]:
X_train.head()

In [None]:
repeated_cv = RepeatedKFold(n_splits = 10, n_repeats = 10, random_state = 42)

In [None]:
#define calssifier
rf_classifier = RandomForestClassifier(
    random_state = 42)

lightgbm_classifier = LGBMClassifier(
    random_state = 42)

log_reg = LogisticRegression(
    max_iter = 5000, 
    random_state = 42)

In [None]:
estimator = RandomForestClassifier(random_state=42)
rfecv = RFECV(estimator= estimator, cv = StratifiedKFold(n_splits = 10, random_state = 42, shuffle=True), scoring = "roc_auc", n_jobs = 20)
rfecv.fit(X_train, y_train)

In [None]:
plt.figure(figsize=(10, 10))
plt.plot(range(1, len(rfecv.cv_results_['mean_test_score'])+1), rfecv.cv_results_['mean_test_score'])
plt.grid()
plt.xticks(range(1, X_train.shape[1]+1))
plt.xlabel("Number of Selected Features")
plt.ylabel("CV Score")
plt.title("Recursive Feature Elimination (RFE)")
plt.savefig("../results/Feature_Selection.pdf")
print("The optimal number of features: {}".format(rfecv.n_features_))

In [None]:
X_train = X_train.iloc[:, rfecv.support_]

In [None]:
X_train.head()

In [None]:
X_test = X_test.iloc[:, rfecv.support_]

In [None]:
X_test.head()

In [None]:
n_estimators = [100,200,500,1000]
max_depth = [4,6,8,12]
learning_rate = [0.3,0.2,0.1,0.01,0.001]
C = [100, 10, 1.0, 0.1, 0.01]

In [None]:
for classifier in ["lr", "rf", "light"]:
    print("Run: {}".format(classifier))
    if classifier == "rf":
        
        #train part
        model_to_fit_rf = GridSearchCV(rf_classifier, 
                                    param_grid = {"n_estimators":[est for est in n_estimators if est <=500] , "max_depth": max_depth}, 
                                    cv = repeated_cv, 
                                    n_jobs = 30, 
                                    scoring = "accuracy")
        
        model_to_fit_rf.fit(X_train, y_train)
        
        #predicting part
        pred_rf = model_to_fit_rf.predict(X_test)
        predicted_proba_rf =  model_to_fit_rf.predict_proba(X_test)
        best_params_rf =  model_to_fit_rf.best_params_
        

        
    elif classifier == "light":
        
        #train part
        model_to_fit_lgb = GridSearchCV(lightgbm_classifier, 
                                    param_grid = {"n_estimators": n_estimators, "max_depth":max_depth, "learning_rate": learning_rate}, 
                                    cv = repeated_cv, 
                                    n_jobs = 28, 
                                    scoring = "accuracy")
        
        model_to_fit_lgb.fit(X_train,y_train)
        
        #predicting part
        pred_lgb = model_to_fit_lgb.predict(X_test)
        predicted_proba_lgb = model_to_fit_lgb.predict_proba(X_test)
        best_params_lgb = model_to_fit_lgb.best_params_
        
    elif classifier == "lr":
        model_to_fit_lr = GridSearchCV(log_reg, 
                                    param_grid = {"C":C}, 
                                    cv = repeated_cv, 
                                    n_jobs = 28, 
                                    scoring = "accuracy")
        
        model_to_fit_lr.fit(X_train, y_train)
        
        pred_lr = model_to_fit_lr.predict(X_test)
        predicted_proba_lr = model_to_fit_lr.predict_proba(X_test)
        best_params_lr = model_to_fit_lr.best_params_

In [None]:
print("Logistic Regression {}".format(best_params_lr))
print("Random Forest {}".format(best_params_rf))
print("LightGradientBoosting {}".format(best_params_lgb))

In [None]:
tn, fp, fn, tp = confusion_matrix(y_true = y_test, y_pred = pred_lr).ravel()
compute_metrics(true_positive = tp, false_positive = fp, true_negative = tn, false_negative = fn)
print(compute_roc_ci(y_true = y_test, probability = predicted_proba_lr, n_boot = 1000, show_plot = True))

In [None]:
tn, fp, fn, tp = confusion_matrix(y_true = y_test, y_pred = pred_rf).ravel()
compute_metrics(true_positive = tp, false_positive = fp, true_negative = tn, false_negative = fn)
print(compute_roc_ci(y_true = y_test, probability = predicted_proba_rf, n_boot = 1000, show_plot = True))

In [None]:
tn, fp, fn, tp = confusion_matrix(y_true = y_test, y_pred = pred_lgb).ravel()
compute_metrics(true_positive = tp, false_positive = fp, true_negative = tn, false_negative = fn)
print(compute_roc_ci(y_true = y_test, probability = predicted_proba_lgb, n_boot = 1000, show_plot = True))

In [None]:
#calibration plot
prob_y, prob_x = calibration_curve(y_true = y_test, y_prob = predicted_proba_lgb[:,1], n_bins = 10)
fig, ax = plt.subplots()
fig.set_figheight(10)
fig.set_figwidth(10)
plt.plot(prob_x, prob_y, marker = 'o', linewidth = 2, label = "rf")
lines = mlines.Line2D([0,1],[0,1], color = "black", linewidth = 2)
transform = ax.transAxes
lines.set_transform(transform)
ax.add_line(lines)
ax.set_xlabel("Predicted probability", fontsize = 32)
ax.set_ylabel("Observerd probability", fontsize = 32)
plt.savefig("../results/Calibration_plot_LGB.pdf")

In [None]:
#calibration plot
prob_y, prob_x = calibration_curve(y_true = y_test, y_prob = predicted_proba_rf[:,1], n_bins = 10)
fig, ax = plt.subplots()
fig.set_figheight(10)
fig.set_figwidth(10)
plt.plot(prob_x, prob_y, marker = 'o', linewidth = 2, label = "rf")
lines = mlines.Line2D([0,1],[0,1], color = "black", linewidth = 2)
transform = ax.transAxes
lines.set_transform(transform)
ax.add_line(lines)
ax.set_xlabel("Predicted probability", fontsize = 32)
ax.set_ylabel("Observerd probability", fontsize = 32)
plt.savefig("../results/Calibration_plot_RF.pdf")

In [None]:
#calibration plot
prob_y, prob_x = calibration_curve(y_true = y_test, y_prob = predicted_proba_lr[:,1], n_bins = 10)
fig, ax = plt.subplots()
fig.set_figheight(10)
fig.set_figwidth(10)
plt.plot(prob_x, prob_y, marker = 'o', linewidth = 2, label = "rf")
lines = mlines.Line2D([0,1],[0,1], color = "black", linewidth = 2)
transform = ax.transAxes
lines.set_transform(transform)
ax.add_line(lines)
ax.set_xlabel("Predicted probability", fontsize = 32)
ax.set_ylabel("Observerd probability", fontsize = 32)
plt.savefig("../results/Calibration_plot_LR.pdf")

In [None]:
binary_explainer = shap.TreeExplainer(model_to_fit_lgb.best_estimator_.booster_, X_test, model_output = "probability")
binary_shap_values = binary_explainer.shap_values(X_test)

In [None]:
plt.figure(figsize = (20,12))
plt.subplot(1,2,1)
shap.summary_plot(binary_shap_values, X_test, plot_type = "bar", plot_size = None, show=False)
ax = plt.gca()
ax.tick_params(axis = 'both', labelsize = 32)
plt.xlabel('Mean(|SHAP value|)', fontsize = 32)
plt.subplot(1,2,2)
shap.summary_plot(binary_shap_values, X_test, plot_size=None, show=False)
ax = plt.gca()
ax.tick_params(axis = 'both', labelsize = 32)
plt.xlabel('SHAP value', fontsize = 32)
ax.set_yticklabels([])
plt.tight_layout()
plt.savefig('../results/Fig_2_Binary_result.pdf')

In [None]:
print("Done")