In [None]:
%reload_kedro

In [None]:
import numpy as np
import bisect
from sklearn.preprocessing import LabelEncoder, MultiLabelBinarizer
from sklearn.utils.validation import check_is_fitted
from pandas import DataFrame
from logging import getLogger
from sklearn.model_selection import train_test_split
from numpy import bincount
import matplotlib.pyplot as plt

class InductiveConformalPredictor():
    """
    Standard Conformal Predictor with uncertainty non-conformity score.
    Args:
        predictor: classifier used in upstream task.

    FROM: https://medium.com/data-from-the-trenches/measuring-models-uncertainty-with-conformal-prediction-f6aa8debb50e
    """

    def __init__(self, predictor):
        self.predictor = predictor
        check_is_fitted(self.predictor, attributes=["classes_"])

        self._le = LabelEncoder()
        self.classes = self._le.fit_transform(predictor.classes_)

    def fit(self, X, y):
        self.calibration_score = self._uncertainty_conformity_score(X)
        self.calibration_class = self._le.transform(y)
        return self

    def _uncertainty_conformity_score(self, data):
        uncertainty_score = 1 - self.predictor.predict_proba(data)
        return uncertainty_score

    def predict_proba(self, X, mondrian=True):
        check_is_fitted(self, attributes=["calibration_score"])

        conformity_score = self._uncertainty_conformity_score(X)
        conformal_pred = np.zeros(conformity_score.shape)

        for c in self.classes:
            if mondrian:
                calibration_filt = self.calibration_score[self.calibration_class == c]
                calib = calibration_filt[:, c]
            else:
                calib = self.calibration_score[range(len(self.calibration_class)), 
                                                          self.calibration_class]

            sorted_calib = np.sort(calib)
            conformal_pred[:, c] = [float(bisect.bisect(sorted_calib, x))/len(calib)
                                    for x in conformity_score[:, c]]

        return conformal_pred

    def predict(self, X, mondrian=True, alpha=0.05):
        _conformal_proba = self.predict_proba(X=X, mondrian=mondrian)
        conformal_pred = (_conformal_proba > alpha).astype(int)

        mlb = MultiLabelBinarizer()
        mlb.fit([self._le.classes_])
        pred = mlb.inverse_transform(conformal_pred)

        return pred

ALPHA = 0.25


def return_conformity_scores(
    data, params, model
) -> DataFrame:
    cfm = InductiveConformalPredictor(predictor=model)

    X, Y = data[params.get('features')], data[params.get('target')]

    X_train, X_test, y_train, y_test = train_test_split(
        X, Y, test_size=0.20, random_state=42)

    cfm.fit(X_train, y_train)

    y_test_conf = cfm.predict(X, alpha=ALPHA)

    data = data.copy()

    data['y_test_conf'] = y_test_conf 
    
    return data

# Random Forest Classifier

In [None]:
data_test = catalog.load('scored_test')
model = catalog.load('model')

d2 = return_conformity_scores(data_test, context.params, model)

cats = d2.y_test_conf.unique()

print("Random Forest\n")
for cat in cats:
    d = d2[d2.y_test_conf == cat].copy()
    print("Mean: %4.2f -- Share: %3.1f%% -- Share Target: %3.1f%%" % (
        d.target.mean(), 100*(d.shape[0]/d2.shape[0]), 100*((d.target==1).sum()/(d2.target==1).sum())
    ))
    print("Bincount: %s" % bincount(d.target))
    print()

In [None]:
d2.prob.hist(bins=20)
plt.axvline((d2[d2.y_test_conf == (0,1)]).prob.min(), c='r')
plt.axvline((d2[d2.y_test_conf == (0,1)]).prob.max(), c='r')
plt.title("Random Forest Classifier")
plt.ylabel("Count")
plt.xlabel("model.predict_proba")
plt.tight_layout()
plt.show()

# Catboost Classifier

In [None]:
data_test = catalog.load('catboost.scored_test')
model = catalog.load('catboost.model')

d2 = return_conformity_scores(data_test, context.params, model)

cats = d2.y_test_conf.unique()

print("Catboost\n")
for cat in cats:
    d = d2[d2.y_test_conf == cat].copy()
    print("Mean: %4.2f -- Share: %3.1f%% -- Share Target: %3.1f%%" % (
        d.target.mean(), 100*(d.shape[0]/d2.shape[0]), 100*((d.target==1).sum()/(d2.target==1).sum())
    ))
    print("Bincount: %s" % bincount(d.target))
    print()

In [None]:
d2.prob.hist(bins=20)
plt.axvline((d2[d2.y_test_conf == (0,1)]).prob.min(), c='r')
plt.axvline((d2[d2.y_test_conf == (0,1)]).prob.max(), c='r')
plt.title("Catboost Classifier")
plt.ylabel("Count")
plt.xlabel("model.predict_proba")
plt.tight_layout()
plt.show()