# Calculate calibration curve points using scikit-learn

### Calculate calibration curve datapoints for subsequent plotting in R

Shubhayu Bhattacharyay
<br>
Ari Ercole

## I. Initialization

### Import necessary packages

In [1]:
# Fundamental methods
import os
import re
import sys
import time
import glob
import random
import warnings
import itertools
import numpy as np
import pandas as pd
import pickle as cp
import seaborn as sns
from scipy import stats
from pathlib import Path
import matplotlib.pyplot as plt
warnings.filterwarnings(action="ignore")

# SciKit-Learn methods
from sklearn.calibration import calibration_curve
from sklearn.metrics import mean_squared_error

# Load custom functions
%run -i 'functions/anti_join.py'

## II. Calculate probability calibration curve points

### Load compiled model results

In [2]:
# Read the compiled prediction file of each model type (MNLR, POLR, DeepMN, DeepOR),
# in that order, into its own dataframe
(mnlr_results_compiled,
 polr_results_compiled,
 deepMN_results_compiled,
 deepOR_results_compiled) = (
    pd.read_csv(predictions_path)
    for predictions_path in (
        '../repeated_cv/compiled_predictions/mnlr.csv',
        '../repeated_cv/compiled_predictions/polr.csv',
        '../repeated_cv/compiled_predictions/deepMN.csv',
        '../repeated_cv/compiled_predictions/deepOR.csv',
    )
)

In [3]:
# Build the deep-learning tuning grid: every (layers, smote) pair crossed with
# every per-layer combination of hidden-unit counts
layer_options = range(2,5)
units_options = [128,256,512]
smote_options = [0,1]

# One row per (layers, smote) pair, with smote varying slowest. The row order matters:
# it determines the tune_idx values assigned below.
layer_smote_combos = np.array(np.meshgrid(layer_options, smote_options)).reshape(2, len(layer_options)*len(smote_options)).T

# Collect one frame per (layers, smote) pair and concatenate once at the end.
# (DataFrame.append was removed in pandas 2.0 and grows the frame quadratically.)
tuning_grid_parts = []
for curr_layers, curr_smote in layer_smote_combos:
    tuning_grid_parts.append(pd.DataFrame({"layers": curr_layers,
                                           "smote": curr_smote,
                                           "units": list(itertools.product(units_options, repeat=int(curr_layers)))}))
tuning_grid = pd.concat(tuning_grid_parts, ignore_index=True)

# Tuning indices are 1-based and follow the row order of the grid
tuning_grid["tune_idx"] = np.arange(tuning_grid.shape[0]) + 1
tuning_grid.to_csv('../repeated_cv/dl_tuning_grid.csv',index=False)

# Establish dataframe of all tuning index-GOSE-class combinations for AUC
# (class identifiers are stored as strings here)
viable_indices = pd.DataFrame(list(itertools.product(tuning_grid.tune_idx.astype('int').unique(), ['1','3','4','5','6','7','8'])),
                              columns=['tune_idx','class'])

# Remove banned tuning indices from both DeepMN and DeepOR.
# NOTE(review): `viable_indices['class']` holds strings, whereas `pd.read_csv` parses
# digit-only columns as integers. The banned `class` column is therefore cast to `str`
# so that the anti-join below compares like with like; without the cast a string/integer
# mismatch would silently leave every banned combination in place. Confirm against the
# contents of the banned-index files.
deepMN_banned_tuning_indices = pd.read_csv('../repeated_cv/deepMN_banned_tuning_indices.csv')
deepMN_banned_tuning_indices.tune_idx = deepMN_banned_tuning_indices.tune_idx.astype('int')
deepMN_banned_tuning_class_combos = deepMN_banned_tuning_indices[['tune_idx','class']].astype({'class': str})

deepOR_banned_tuning_indices = pd.read_csv('../repeated_cv/deepOR_banned_tuning_indices.csv')
deepOR_banned_tuning_indices.tune_idx = deepOR_banned_tuning_indices.tune_idx.astype('int')
deepOR_banned_tuning_class_combos = deepOR_banned_tuning_indices[['tune_idx','class']].astype({'class': str})

# Identify viable tuning index-class combinations for each model type.
# Anti-join idiom: the banned combinations are concatenated twice so that each of them is
# guaranteed to be duplicated, and `keep=False` then drops every duplicated row, i.e. all
# banned rows together with the viable rows that match them.
deepMN_viable_indices = pd.concat([viable_indices, deepMN_banned_tuning_class_combos, deepMN_banned_tuning_class_combos]).drop_duplicates(keep=False)
deepOR_viable_indices = pd.concat([viable_indices, deepOR_banned_tuning_class_combos, deepOR_banned_tuning_class_combos]).drop_duplicates(keep=False)

deepMN_viable_indices = deepMN_viable_indices.rename(columns={"class": "true_labels"})
deepOR_viable_indices = deepOR_viable_indices.rename(columns={"class": "true_labels"})

# Keep only predictions whose tuning index is still viable for at least one class
deepMN_results_compiled = deepMN_results_compiled[deepMN_results_compiled.tune_idx.astype('int').isin(deepMN_viable_indices.tune_idx)]
deepOR_results_compiled = deepOR_results_compiled[deepOR_results_compiled.tune_idx.astype('int').isin(deepOR_viable_indices.tune_idx)]

# Identify columns that hold probability scores
prob_cols = [col for col in deepMN_results_compiled if col.startswith('prob_GOSE_')]

In [4]:
# Read the bootstrap assignment table; its `bs_idx` and `entity_id` columns are
# used further down to split predictions into in-sample and out-of-sample sets
bootstrap_df = pd.read_csv(filepath_or_buffer='../repeated_cv/bootstrap_IDs.csv')

### Loop through result files and calculate probability calibration points

In [8]:
# For every bootstrap: pick, per model and class, the configuration whose in-sample
# calibration curve has the lowest MSE, then compute the calibration curve points of that
# configuration on the out-of-sample predictions.

def _class_prob_column(curr_class):
    """Return the name of the `prob_GOSE_*` column in `prob_cols` that ends with `curr_class`."""
    class_suffix = str(curr_class)
    return [col for col in prob_cols if col.startswith('prob_GOSE_') and col.endswith(class_suffix)][0]


def _class_calibration_curve(predictions, label_col, curr_class):
    """Compute one-vs-rest calibration curve points for a single class.

    Parameters
    ----------
    predictions : pd.DataFrame
        Prediction rows containing `label_col` and the probability column of `curr_class`.
    label_col : str
        Name of the column holding the true labels.
    curr_class : int or str
        Class of interest. It is cast to `int` before being compared with the labels, because
        class identifiers may arrive as strings (the viable-index tables store them that way)
        while the in-sample and out-of-sample comparisons must behave identically.

    Returns
    -------
    tuple of np.ndarray
        `(prob_true, prob_pred)` as returned by `calibration_curve` with 10 quantile bins.
    """
    is_curr_class = (predictions[label_col] == int(curr_class)).astype('int').values
    # Probabilities are clipped to [0, 1] before binning
    clipped_probs = np.clip(predictions[_class_prob_column(curr_class)].values, 0, 1)
    return calibration_curve(is_curr_class, clipped_probs, n_bins=10, strategy='quantile')


def _select_optimal_setting(in_sample, label_col, setting_col, candidate_settings, curr_class, default_setting):
    """Return the candidate setting with the lowest in-sample calibration-curve MSE.

    A single candidate is returned without evaluation. `default_setting` is returned only if
    no candidate yields an MSE below `sys.float_info.max`.
    """
    if len(candidate_settings) == 1:
        return candidate_settings[0]
    lowest_mse = sys.float_info.max
    opt_setting = default_setting
    for curr_setting in candidate_settings:
        curr_prob_true, curr_prob_pred = _class_calibration_curve(in_sample[in_sample[setting_col] == curr_setting], label_col, curr_class)
        curr_mse = mean_squared_error(curr_prob_true, curr_prob_pred)
        if curr_mse < lowest_mse:
            lowest_mse = curr_mse
            opt_setting = curr_setting
    return opt_setting


def _model_calibration_points(model_name, in_sample, out_sample, label_col, setting_col, class_candidates, default_setting, curr_bs_idx):
    """Return one calibration-curve dataframe per class for a single model and bootstrap.

    Parameters
    ----------
    model_name : str
        Value written to the `Model` column.
    in_sample, out_sample : pd.DataFrame
        Predictions used for configuration selection and for the reported curve, respectively.
    label_col, setting_col : str
        Names of the true-label column and of the configuration column (`tune_idx` or `SMOTE`).
    class_candidates : iterable of (class, candidate settings)
        Classes to process, each with the configurations eligible for it.
    default_setting : int
        Fallback passed on to `_select_optimal_setting`.
    curr_bs_idx : int
        Value written to the `bs_idx` column.
    """
    model_parts = []
    for curr_class, candidate_settings in class_candidates:
        opt_setting = _select_optimal_setting(in_sample, label_col, setting_col, candidate_settings, curr_class, default_setting)
        curr_prob_true, curr_prob_pred = _class_calibration_curve(out_sample[out_sample[setting_col] == opt_setting], label_col, curr_class)
        model_parts.append(pd.DataFrame({'prob_true':curr_prob_true, 'prob_pred':curr_prob_pred,'Model':model_name,'class':int(curr_class),'bs_idx':curr_bs_idx}))
    return model_parts


# Viable tuning indices per class do not depend on the bootstrap, so compute them once
deepMN_class_candidates = [(curr_class, deepMN_viable_indices.tune_idx[deepMN_viable_indices.true_labels == curr_class].values)
                           for curr_class in deepMN_viable_indices.true_labels.unique()]
deepOR_class_candidates = [(curr_class, deepOR_viable_indices.tune_idx[deepOR_viable_indices.true_labels == curr_class].values)
                           for curr_class in deepOR_viable_indices.true_labels.unique()]

deep_models = [('DeepMN', deepMN_results_compiled, deepMN_class_candidates),
               ('DeepOR', deepOR_results_compiled, deepOR_class_candidates)]
logistic_models = [('MNLR', mnlr_results_compiled),
                   ('POLR', polr_results_compiled)]

# Loop through bootstrap IDs, collecting per-class frames in a list and concatenating once at
# the end (DataFrame.append was removed in pandas 2.0 and is quadratic when used in a loop)
cal_curve_parts = []
bs_message = display('',display_id=True)
all_bs_indices = bootstrap_df.bs_idx.unique()

for curr_bs_idx in all_bs_indices:
    bs_message.update('Bootstrap no. ' + str(curr_bs_idx) + ' out of ' + str(len(all_bs_indices)) + ' initiated.')
    # Get current entity_id's for the dataframe
    curr_bs_ids = bootstrap_df.entity_id[bootstrap_df.bs_idx == curr_bs_idx]

    # DeepMN and DeepOR: configurations are tuning indices, restricted per class to the viable ones
    for model_name, model_results, class_candidates in deep_models:
        in_bs_mask = model_results.entity_id.isin(curr_bs_ids)
        cal_curve_parts.extend(_model_calibration_points(model_name, model_results[in_bs_mask], model_results[~in_bs_mask],
                                                         'true_labels', 'tune_idx', class_candidates, 0, curr_bs_idx))

    # MNLR and POLR: configurations are the two SMOTE options, evaluated for every class
    # present among the in-sample labels
    for model_name, model_results in logistic_models:
        in_bs_mask = model_results.entity_id.isin(curr_bs_ids)
        in_sample_results = model_results[in_bs_mask]
        class_candidates = [(curr_class, [0,1]) for curr_class in in_sample_results['true.labels'].unique()]
        cal_curve_parts.extend(_model_calibration_points(model_name, in_sample_results, model_results[~in_bs_mask],
                                                         'true.labels', 'SMOTE', class_candidates, 100, curr_bs_idx))

if cal_curve_parts:
    compiled_cal_curves_df = pd.concat(cal_curve_parts, ignore_index=True)
else:
    # No bootstraps or no classes: keep an empty frame with the expected columns
    compiled_cal_curves_df = pd.DataFrame(np.empty((0,5)),columns = ['prob_true','prob_pred','Model','class','bs_idx'])

compiled_cal_curves_df.to_csv('../metrics/compiled_calib_curves.csv',index=False)

'Bootstrap no. 1000 out of 1000 initiated.'