In [None]:

# my helper functions for this project
import importlib
import helper



In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt 

np.random.seed(1234)

# positive class: 1 
# negative class: 0
# high score -> more likely to be positive

# number of examples in each class
nN = 4*2000
nP = 2000
n = nN + nP

meanP=0.6
meanN=0.4
scalePN=0.12

# scores for each class generated from two normal distributions
scoresN=np.random.normal(loc=meanN, scale=scalePN, size=nN)
scoresP=np.random.normal(loc=meanP, scale=scalePN, size=nP)

# plot the scores - one histogram for each class
def plotScoreDistributions(scores, labels):
    plt.hist(scores[labels==0], density=False, bins=30, alpha=0.5, label="Negative class", color="skyblue")
    plt.hist(scores[labels==1], density=False, bins=30, alpha=0.5, label="Positive class", color="green")
    plt.legend(loc='upper left')
    plt.xlabel('Score')
    plt.ylabel('Number of examples')


# create labels for the two classes
labelsN=np.zeros(nN)
labelsP=np.ones(nP)

# create data frame of scores and labels
scores= np.concatenate((scoresN, scoresP))
labels= np.concatenate((labelsN, labelsP))

d = {'score': scores, 'label':labels}

df = pd.DataFrame(data=d)

df = df.set_index("score")
df = df.sort_values('score')
df.reset_index(inplace=True)

# checking
df.size
#print(df.head(n=10))
#print(df.tail(n=10))
print(min(df.score))
print(max(df.score))

print('class distribution:', nP/(nN+nP))

plotScoreDistributions(df.score, df.label)

# fix scores outside 0 and 1
print('Min:', min(df.score), " Max: ", max(df.score))

df.loc[df['score'] < 0, 'score'] = 0
df.loc[df['score'] > 1, 'score'] = 1
print('Min:', min(df.score), " Max: ", max(df.score))


In [None]:
# calculate AUC

from sklearn import metrics
#xx = df.dropna(subset="label")
#auc = metrics.roc_auc_score(xx['label'], xx['score'])

auc = metrics.roc_auc_score(df['label'], df['score'])
print("AUC:", auc)

In [None]:
# calibrate scores
from scipy.stats import norm

def myCalibrate(score):
    piP = nP/(nN+nP)

    scoresCal = piP*norm.pdf(score, meanP, scalePN)/((1-piP)*norm.pdf(score, meanN, scalePN)+piP*norm.pdf(score, meanP, scalePN))
    plt.scatter(score, scoresCal)
    plt.xlabel('Original score')
    plt.ylabel('Calibrated score')
    plt.axline((0, 0), slope=1, color='red')


    return(scoresCal)

df['scoreCal'] = myCalibrate(df.score)

df


In [None]:
# plot the calibrated score distributions

plotScoreDistributions(df.scoreCal, df.label)


In [None]:
# calculate TPR and FPR
importlib.reload(helper)
df = helper.calculateROCPoints(df)

print(df.head(n=10))
print(df.tail(n=10))

In [None]:


# plot roc curve
ax = df.plot(kind = 'line', x = 'fpr', y ='tpr', legend=False, color='skyblue')
ax.set_ylabel("TPR")
ax.set_xlabel("FPR");




In [None]:
###
### plotting the cost curve

import matplotlib.pyplot as plt 

# Calculate loss for different costs
piN = nN/(nN+nP)
piP = 1 - piN

b = 2
c_0 = 1
c = c_0 / 2

fig, ax = plt.subplots()
    
# plot line in cost space for each F0 F1 pair in ROC space
for i in range(1,df.shape[0]):

    gradient = 2*(piN*df.loc[i,'fpr'] - piP*(1-df.loc[i,'tpr']))
    intercept = 2*piP * (1-df.loc[i,'tpr'])
    ax.axline((0, intercept), slope=gradient, color='C0')
    ax.set_xlim(0, 1)
    ax.set_ylim(0, 1)




In [None]:
# Cost curve without plotting all the lines

importlib.reload(helper)
dfLossCost = helper.lowerEnvelope(df)

ax = dfLossCost.plot(kind = 'line', x = 'cost', y ='loss', legend=False, color='skyblue', ylim=[0,1])
ax.set_ylabel("Loss")
ax.set_xlabel("Cost")



In [None]:
# calculate brier loss for the original score and the calibrated scores

df['brier_loss'] = 2*(1-df['score'])*piP*(1-df['tpr']) + 2*df['score']*piN*df['fpr']
df['brier_loss_cal'] = 2*(1-df['scoreCal'])*piP*(1-df['tpr']) + 2*df['scoreCal']*piN*df['fpr']



In [None]:
# plot cost curve and brier curve (of uncalibrated scores)
ax = dfLossCost.plot(kind = 'line', x = 'cost', y ='loss', color='skyblue', ylim=[0,1], label='Model cost curve')
df.plot(kind = 'line', x = 'score', y ='brier_loss', ax=ax, color='C7', linestyle='--', label='Model Brier curve')
ax.set_ylabel("Loss")
ax.set_xlabel("Cost")
df


In [None]:

# same plot but checking that the calibrated scores version is the same as the cost curve

ax = dfLossCost.plot(kind = 'line', x = 'cost', y ='loss', legend=False, color='skyblue', ylim=[0,1])
df.plot(kind = 'line', x = 'score', y ='brier_loss', ax=ax, color='C7', linestyle='-', label='Model Brier curve')
df.plot(kind = 'line', x = 'scoreCal', y ='brier_loss_cal', ax=ax, color='C8', linestyle='--', label='Model Brier curve cal')
ax.set_ylabel("Loss")
ax.set_xlabel("Cost")

ax.set_xlim([0,1])


In [None]:
# calculate net benefit for DCA

#df['netben'] = df.tpr*nP/n - (df.fpr*nN/n)*(df.score/(1-df.score))
df['netben'] = df['tpr']*nP/n - (df['fpr']*nN/n)*(df['score']/(1-df['score']))
df['netben_treatnone'] = 0/n - (0/n)*(df['score']/(1-df['score']))
df['netben_treatall'] = nP/n - (nN/n)*(df['score']/(1-df['score']))

df

In [None]:
# plot DCA curve
fig, ax = plt.subplots()
ax.set_xlim(0, 1)
ax.set_ylim(-1, 1)
df.plot(kind = 'line', x = 'score', y ='netben', ax=ax, color='C2', linestyle='-', label='Model, standard DCA')
df.plot(kind = 'line', x = 'score', y ='netben_treatall', ax=ax, color='C2', linestyle='--', label='Treat all standard DCA')
ax.axline((0, 0), slope=0, color='C3', markersize=1, linestyle='-', linewidth=1, label="Treat none")

ax.set_xlabel('Score')
ax.set_ylabel('Net benefit')

ax.legend();


In [None]:

    # df.loc[i,'NB_loss'] = 1*pi_0*(1-df.loc[i,'f0']) + ((1-df.loc[i,'score'])/(df.loc[i,'score']))*pi_1*df.loc[i,'f1']
    
    # df.loc[i,'max_brier_loss'] = 2*df.loc[i,'score']*pi_0*(1-0) + 2*(1-df.loc[i,'score'])*pi_1*1
    # df.loc[i,'max_NB_loss'] = 1*pi_0*(1-0) + ((1-df.loc[i,'score'])/(df.loc[i,'score']))*pi_1*1

    # df.loc[i,'NB_loss_treatall'] = 1*pi_0*(1-1) + ((1-df.loc[i,'score'])/(df.loc[i,'score']))*pi_1*1
    # df.loc[i,'NB_loss_treatnone'] = 1*pi_0*(1-0) + ((1-df.loc[i,'score'])/(df.loc[i,'score']))*pi_1*0

df['netben_cal'] = df['tpr']*nP/n - (df['fpr']*nN/n)*(df['scoreCal']/(1-df['scoreCal']))

importlib.reload(helper)
lowerEnv = helper.lowerEnvelopeNB(df)
plt.plot(lowerEnv['cost'], lowerEnv['nb'], color='blue', linestyle='-', linewidth=1, label='Upper envelope decision curve')

plt.plot(df['score'], df['netben'], color='green', linestyle='-', label='Decision curve')
plt.plot(df['score'], df['netben_treatall'], color='green', linestyle='--', label='Treat all')
#plt.plot(df['scoreCal'], df['netben_cal'], color='yellow', linestyle='--', label='Model, calibrated probabilities')

plt.axline((0, 0), (1, 0), color='red', markersize=1, linestyle='-', linewidth=1, label="Treat none")

plt.xlabel('Decision threshold')
plt.ylabel('Net benefit')

plt.xlim(0,1)
plt.ylim(-1,1)
#lowerEnv

plt.legend()


In [None]:
###
### plot cost curves including the treat all and treat none lines


df['brier_loss_treatall'] = 2*(1-df['score'])*piP*(1-1) + 2*df['score']*piN*1
df['brier_loss_treatnone'] = 2*(1-df['score'])*piP*(1-0) + 2*df['score']*piN*0

##
## cost curve with cost proportion as the operating condition

# plot cost curve and brier curve (of uncalibrated scores)
ax = dfLossCost.plot(kind = 'line', x = 'cost', y ='loss', color='blue', ylim=[0,1], label='Lower envelope cost curve')
df.plot(kind = 'line', x = 'score', y ='brier_loss', ax=ax, color='green', linestyle='-', label='Brier curve')
df.plot(kind = 'line', x = 'score', y ='brier_loss_treatall', ax=ax, color='green', linestyle='--', label='Predict all as positive')
df.plot(kind = 'line', x = 'score', y ='brier_loss_treatnone', ax=ax, color='red', linestyle='-', label='Predict all as negative', linewidth=1)

ax.set_ylabel("Loss")
ax.set_xlabel("Cost proportion")
ax.set_xlim([0,1])


##
## cost curve with skew as the operating condition


df['brier_loss_skew'] = (1-df['score'])*(1-df['tpr']) + df['score']*df['fpr']
df['brier_loss_treatall_skew'] = (1-df['score'])*(1-1) + df['score']*1
df['brier_loss_treatnone_skew'] = (1-df['score'])*(1-0) + df['score']*0
#df['skew'] = 2*(1-df['score'])*piP*(1-1) + 2*df['score']*piN*1

dfLossSkew = helper.lowerEnvelopeSkew(df)


# plot cost curve and brier curve (of uncalibrated scores)
ax = dfLossSkew.plot(kind = 'line', x = 'skew', y ='loss', color='blue', ylim=[0,1], label='Model cost curve')
df.plot(kind = 'line', x = 'score', y ='brier_loss_skew', ax=ax, color='green', linestyle='-', label='Model Brier curve')
df.plot(kind = 'line', x = 'score', y ='brier_loss_treatall_skew', ax=ax, color='green', linestyle='--', label='Predict all as positive')
df.plot(kind = 'line', x = 'score', y ='brier_loss_treatnone_skew', ax=ax, color='red', linestyle='-', label='Predict all as negative', linewidth=1)

ax.set_ylabel("Loss")
ax.set_xlabel("Skew")

ax.set_xlim([0,1])
df



In [None]:
# Decision curve with loss isometrics

df['netben_cal'] = df['tpr']*nP/n - (df['fpr']*nN/n)*(df['scoreCal']/(1-df['scoreCal']))

importlib.reload(helper)
lowerEnv = helper.lowerEnvelopeNB(df)
plt.plot(lowerEnv['cost'], lowerEnv['nb'], color='blue', linestyle='-', linewidth=3, label='best possible for given ROC curve')

plt.plot(df['score'], df['netben'], color='green', linestyle='-', label='Model, standard DCA')
plt.plot(df['score'], df['netben_treatall'], color='green', linestyle='--', label='Treat all standard DCA')

plt.axline((0, 0), (1, 0), color='red', markersize=1, linestyle='-', linewidth=1, label="Treat none")


for loss in np.arange(0,1, 0.05):
        
        nb_costs = []

        for c in np.arange(0,1, 0.01):
            
            # nb from loss with standard costs for each
            nb = piP - loss/(2*(1-c))
            
            # nb from loss with Brier costs for each
            #nb = (2*piP*(1-c) - loss)/2

            nb_cost = {'cost':c, 'nb':nb}
            nb_costs.append(nb_cost)


        dfNBCost = pd.DataFrame(nb_costs)
        plt.plot(dfNBCost['cost'], dfNBCost['nb'], color='yellow', linestyle=':')

        textXPos = 0.7+(1-loss)/5
        textYPos = (piP - loss/(2*(1-textXPos)))
        if (textYPos >=-1):
            plt.text(textXPos,textYPos , "{:.2f}". format(loss) , fontsize=8, backgroundcolor='white', alpha=0.5, bbox=dict(boxstyle='square', fc='white', ec='none'))


plt.xlabel('Threshold score')
plt.ylabel('Net benefit');

plt.xlim(0,1)
plt.ylim(-1,1)

plt.legend()


In [None]:
###
### plot cost curves including the treat all and treat none lines


df['DCA_loss_treatall'] = (df['score']/(1-df['score']))*piN
df['DCA_loss_treatnone'] = piP
df['DCA_loss'] = piP*(1-df['tpr']) + (df['score']/(1-df['score']))*piN*df['fpr']

importlib.reload(helper)
dfDCALossCost = helper.lowerEnvelopeDCALoss(df)


##
## cost curve with cost proportion as the operating condition

# plot cost curve and brier curve (of uncalibrated scores)
ax = dfDCALossCost.plot(kind = 'line', x = 'cost', y ='loss', color='blue', ylim=[0,1], label='Lower envelope cost curve')
df.plot(kind = 'line', x = 'score', y ='DCA_loss', ax=ax, color='green', linestyle='-', label='Brier curve with DCA loss')
df.plot(kind = 'line', x = 'score', y ='DCA_loss_treatall', ax=ax, color='green', linestyle='--', label='Predict all as positive')
df.plot(kind = 'line', x = 'score', y ='DCA_loss_treatnone', ax=ax, color='red', linestyle='-', label='Predict all as negative', linewidth=1)

ax.set_ylabel("Loss")
ax.set_xlabel("Cost proportion")
ax.set_xlim([0,1])


In [None]:

## reverse DCA, in terms of interventions avoided

df['netben_interventions_avoided'] = piN*(1-df['fpr']) - (piP*(1-df['tpr'])*(1-df['score'])/df['score'])
df['netben_interventions_avoided_treat_none'] = piN - (piP*(1-df['score'])/df['score'])
df['netben_interventions_avoided_treat_all'] = 0


plt.plot(df['score'], df['netben_interventions_avoided'], color='green', linestyle='-', label='Decision curve')
plt.plot(df['score'], df['netben_interventions_avoided_treat_none'], color='green', linestyle='--', label='Treat none')
plt.plot(df['score'], df['netben_interventions_avoided_treat_all'], color='red', linestyle='-', label='Treat all')


plt.xlim(0,1)
plt.ylim(-1, 1)

plt.xlabel('Decision threshold')
plt.ylabel('Net benefit for negative class');

plt.legend()
