In [6]:
import numpy as np
from sklearn.metrics import confusion_matrix
import matplotlib.pyplot as plt
import scipy.interpolate
import scipy.optimize
import joblib

class sigbg_model:
    def __init__(self, model, train, test):
        self.model = model
        self.train = train
        self.test = test
        self.train_pred = train[:,:-1]
        self.train_label = train[:,-1]
        self.test_pred = test[:,:-1]
        self.test_label = test[:,-1]
    
    def fit(self):
        self.model.fit(self.train_pred, self.train_label)
    
    def predict(self, test_pred=None):
        test_pred = test_pred if np.any(test_pred) else self.test_pred
        return self.model.predict(test_pred)
        
    def accuracy(self, display=True):
        predictions = self.predict()
        accuracy = 100 * round(
            (len(self.test) - np.sum(np.abs(predictions - self.test_label))) / len(self.test), 4)
        if display:
            print('Accuracy: {}%'.format(self.acc))
        return accuracy
        
    def confusion_matrix(self, predictions=None, labels=None):
        predictions = predictions if np.any(predictions) else self.predict(self.test_pred)
        labels = labels if np.any(labels) else self.test_label
        return 100 * confusion_matrix(labels, predictions) / len(labels)

#     def ppv(self):
#         """positive predictive value: correctly identified signal / all identified signal"""
#         conf_matrix = self.confusion_matrix()
#         return round(conf_matrix[0,0]/np.sum(conf_matrix[:,0]),3)
    
    def tpr(self, predictions=None, test_label=None):
        """true positive rate: correctly identified signal / all signal"""
        predictions = predictions if np.any(predictions) else self.predict(self.test_pred)
        test_label = test_label if np.any(test_label) else self.test_label
        conf_matrix = self.confusion_matrix(predictions, test_label)
        return conf_matrix[1,1]/np.sum(conf_matrix[1])
    
    def fpr(self, predictions=None, test_label=None):
        """false positive rate: misidentified signal / all background"""
        predictions = predictions if np.any(predictions) else self.predict(self.test_pred)
        test_label = test_label if np.any(test_label) else self.test_label
        conf_matrix = self.confusion_matrix(predictions, test_label)
        return conf_matrix[0,1]/np.sum(conf_matrix[0])
    
    def tpr_cm(self, conf_matrix):
        return conf_matrix[1,1]/np.sum(conf_matrix[1])
        
    def fpr_cm(self, conf_matrix):
        return conf_matrix[0,1]/np.sum(conf_matrix[0])
    
    def bare_metric(self):
        return self.tpr() / np.sqrt(self.fpr())
    
    def ams(self, signal, background):
        return signal * self.tpr() / np.sqrt(background * self.fpr())
    
    def metric(self, signal, background):
        return signal * self.tpr() / np.sqrt(signal * self.tpr() + background * self.fpr())
    
    def newvar2thresh(self, newvar):
        return 1 - np.power(10,newvar)
    
    def thresh2newvar(self, thresh):
        return np.log10(1 - thresh)
    
    def best_threshold(self, signal, background):
        """
        newvar = log_10(1 - threshold)
        threshold = 1 - 10**newvar
        0 < threshold < 1
        ??? < new_var < 0 
        """
        
        newvars = np.concatenate((np.linspace(-8, -2, 10, endpoint=False), np.linspace(-2, 0, 51, endpoint=False)))
        probs = self.model.predict_proba(self.test_pred)[:,1]
        predicts = np.array(
            [np.where(probs > self.newvar2thresh(newvar), np.ones_like(probs), np.zeros_like(probs)) for newvar in newvars])
        conf_matrices = [self.confusion_matrix(predictions=predictions) for predictions in predicts]
        tprs = [self.tpr_cm(conf_matrix) for conf_matrix in conf_matrices]
        fprs = [self.fpr_cm(conf_matrix) for conf_matrix in conf_matrices]
        significances = [
            -signal * tpr / np.sqrt(signal * tpr + background * fpr + 1e-10) for tpr, fpr in zip(tprs, fprs)] # negative as we minimize
        f = scipy.interpolate.interp1d(newvars, significances, kind='cubic')
        res = scipy.optimize.minimize(f, [-2], bounds=[(-8 + 1e-2, -1e-1)])
        
        best_threshold = self.newvar2thresh(res.x[0])
        best_predict = np.where(probs > best_threshold, np.ones_like(probs), np.zeros_like(probs))
        conf_matrix = self.confusion_matrix(predictions=best_predict)
        tpr = self.tpr_cm(conf_matrix)
        fpr = self.fpr_cm(conf_matrix)
        best_sig = signal * tpr / np.sqrt(signal * tpr + background * fpr + 1e-10)
        
        return [best_threshold, best_sig, tpr, fpr, tprs, fprs]
    
    def req_sig_cs(self, lumi, bg_cs, tpr, fpr, sig=5):
        conv = 10**15 / 10**12
        coef = [-tpr**2 * lumi**2 * conv**2, sig**2 * tpr * lumi * conv, sig**2 * fpr * bg_cs * lumi * conv]
        return np.amax(np.roots(coef))
        
    def save_model(self, filename):
        joblib.dump(self.model, filename + '.joblib')

In [7]:
def refresh_model(model):
    return sigbg_model(model.model, model.train, model.test)