In [2]:
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, classification_report
from sklearn.preprocessing import OneHotEncoder
import numpy as np
import matplotlib.pyplot as plt
import bisect
import numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import MultiLabelBinarizer

In [3]:
class ConformalPredictor():
    def __init__(self, n_trees=100, s=2, gamma=1, labda=1, tree_max_depth=None, combination=1, data_name=None, random_state=None):
        self.n_trees = n_trees
        self.s = s
        self.labda = labda
        self.gamma = gamma
        self.combination = combination
        self.data_name = data_name
        self.w = np.ones(n_trees) / n_trees
        self.model = RandomForestClassifier(n_estimators=n_trees, max_depth=tree_max_depth, random_state=random_state)

    def fit(self, X_train, y_train, X_calib, y_calib):
        self.model.fit(X_train, y_train)

        # Store class information
        self.classes = self.model.classes_
        self.class_to_index = {c: i for i, c in enumerate(self.classes)}

        # calibration nonconformity scores
        # for each sample:
        # nonconformity_score = 1 - p(correct_class|x_calib)
        prob_calib = self.model.predict_proba(X_calib)
        self.calibration_scores_by_class = {c: [] for c in self.classes}

        for i, true_class in enumerate(y_calib):
            class_idx = self.class_to_index[true_class]
            # nonconformity score for this instance is 1 - probability of the true class
            nonconformity_score = 1 - prob_calib[i, class_idx]
            self.calibration_scores_by_class[true_class].append(nonconformity_score)

        for c in self.classes:
            self.calibration_scores_by_class[c] = np.sort(self.calibration_scores_by_class[c])

        return self

    def _nonconformity_score_for_class(self, X, class_idx):
        prob = self.model.predict_proba(X)
        return 1 - prob[:, class_idx]

    def predict_proba(self, X):
        # compute the conformal p-values for each class
        # for a given test point, p-value for class c_j is:

        n_test = X.shape[0]
        p_values = np.zeros((n_test, len(self.classes)))

        for j, c in enumerate(self.classes):
            test_scores = self._nonconformity_score_for_class(X, j)
            sorted_scores = self.calibration_scores_by_class[c]
            N_j = len(sorted_scores)

            # for each test sample, count num calibration scores are >= test_score
            # p-value = (count + 1)/(N_j + 1)
            for i, score in enumerate(test_scores):
                idx = bisect.bisect_left(sorted_scores, score)
                count = N_j - idx
                p_values[i, j] = (count + 1) / (N_j + 1) if N_j > 0 else 1.0

        return p_values

    def predict(self, X, alpha=0.05):
        # get conformal set of classes whose p-value > alpha
        p_values = self.predict_proba(X)
        # thresh at alpha to get sets of classes
        conformal_sets = (p_values > alpha).astype(int)
        mlb = MultiLabelBinarizer(classes=self.classes)
        mlb.fit([self.classes])
        pred = mlb.inverse_transform(conformal_sets)

        return pred


In [11]:
def u65_score(y_test, y_pred):
        imprecise_predictions = y_pred
        indeterminate_instance = (imprecise_predictions == -1)
        determinate_instance = (imprecise_predictions != -1)
        
        # calculate single-set length
        single_set_length = len(y_test) - sum(indeterminate_instance)
        
        # calculate determinacy
        determinacy = single_set_length/len(y_test)
        determinacy = round(determinacy*100, 2)
        
        # calculate single-set accuracy
        single_set_accuracy = sum(y_test[determinate_instance]==imprecise_predictions[determinate_instance])/single_set_length
        single_set_accuracy = round(single_set_accuracy*100, 2)
        
        # claculate u65
        u65_score = round(65 + (single_set_accuracy - 65)*determinacy/100, 2)
        return u65_score, single_set_accuracy


## Example use case with german credit data

In [26]:
url = "https://archive.ics.uci.edu/ml/machine-learning-databases/statlog/german/german.data"
columns = [
    "Status_of_existing_checking_account", "Duration_in_month", "Credit_history",
    "Purpose", "Credit_amount", "Savings_account_bonds", "Employment_since",
    "Installment_rate", "Personal_status_and_sex", "Other_debtors", "Present_residence_since",
    "Property", "Age_in_years", "Other_installment_plans", "Housing", "Number_of_existing_credits",
    "Job", "Number_of_people_liable", "Telephone", "Foreign_worker", "Target"
]

data = pd.read_csv(url, delimiter=' ', names=columns, header=None)
X = data.drop(columns=["Target"])
y = data["Target"]
y = y - 1
categorical_features = X.select_dtypes(include=["object"]).columns

encoder = OneHotEncoder(sparse_output=False, drop='first')
X_encoded = encoder.fit_transform(X[categorical_features])
X_encoded_df = pd.DataFrame(X_encoded, columns=encoder.get_feature_names_out(categorical_features))

X = X.drop(columns=categorical_features)
X = pd.concat([X.reset_index(drop=True), X_encoded_df.reset_index(drop=True)], axis=1)

X_train_calib, X_test, y_train_calib, y_test = train_test_split(X, y, test_size=0.2, random_state=42, stratify=y)

X_train, X_calib, y_train, y_calib = train_test_split(X_train_calib, y_train_calib, test_size=0.2, random_state=42, stratify=y_train_calib)
print(len(X_calib))
print(len(X_train))
print(len(X_test))

160
640
200


In [27]:
cpc = ConformalPredictor()
cpc.fit(X_train, y_train, X_calib, y_calib)
cpc_preds = cpc.predict(X_test, alpha=0.1)

rfc = RandomForestClassifier(n_estimators=100, random_state=42)
rfc.fit(X_train, y_train)
rfc_prd = rfc.predict(X_test)

In [30]:
cpc_pred_transformed = [
    -1 if p == (0, 1) else p[0] 
    for p in cpc_preds
]
print(y_test)
print(cpc_pred_transformed)
u65, single_set_acc = u65_score(y_test, rfc_prd)
print(f"Single set acc for Random Forest: {single_set_acc}, u65: {u65}")
u65, single_set_acc = u65_score(y_test, np.array(cpc_pred_transformed))
print(f"Single set acc for conformal pred: {single_set_acc}, u65: {u65}")



30     0
128    0
289    1
216    0
966    1
      ..
522    1
977    0
52     0
542    1
423    0
Name: Target, Length: 200, dtype: int64
[-1, -1, 1, -1, -1, -1, -1, -1, 1, 0, 0, -1, 0, -1, -1, 0, -1, 0, 0, 0, -1, 0, -1, 1, -1, -1, 0, -1, 0, 0, -1, -1, -1, 1, -1, 1, 1, 1, 0, 1, -1, -1, -1, -1, -1, 0, -1, 1, -1, 0, 0, 0, -1, 1, 1, -1, 0, -1, 1, 1, -1, 1, -1, 1, 0, -1, 0, 0, 1, 1, -1, -1, 0, -1, -1, 0, -1, 0, 1, 0, 1, -1, -1, 0, 0, -1, -1, 0, 1, -1, 1, 1, -1, -1, -1, 0, 1, -1, -1, 1, 0, 0, 0, -1, 0, -1, -1, -1, -1, -1, -1, -1, 1, 0, 0, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, 0, 0, -1, 1, -1, -1, -1, 1, 1, 1, 1, 0, -1, -1, -1, -1, -1, 0, -1, -1, -1, 0, -1, 0, 0, 0, 0, -1, 0, -1, -1, 1, -1, -1, -1, -1, 0, -1, 0, -1, 0, 1, -1, -1, -1, -1, 0, -1, 0, 0, 1, 0, 0, 1, 1, -1, -1, -1, 1, 1, -1, 1, 0, 0, 0, -1, 1, 0, -1, 0, 1, 0, 0, -1, 0]
Single set acc for Random Forest: 76.0, u65: 76.0
Single set acc for conformal pred: 80.0, u65: 72.5


## Example use case on compas data

In [21]:
df = pd.read_csv('./data/compas/compas_data_combined_matches.csv')
columns_to_drop = ['FirstName', 'LastName', 'DateOfBirth', 'id', 'v_decile_score', 'DecileScore_Risk of Failure to Appear','race', 'DecileScore_Risk of Recidivism', 'DecileScore_Risk of Violence', 'RawScore_Risk of Failure to Appear', 'RawScore_Risk of Recidivism', 'RawScore_Risk of Violence', '_merge', 'sex', 'c_charge_desc']
rf_dataset = df.drop(columns=columns_to_drop)
## Remove Nans
na_counts = rf_dataset.isna().sum()
na_columns = na_counts[na_counts > 0] 
nans = na_columns.to_dict()
columns_to_remove = []
for key in nans.keys():
    columns_to_remove.append(key)
rf_dataset = rf_dataset.drop(columns=columns_to_remove)
labels = rf_dataset.two_year_recid
rf_dataset = rf_dataset.drop(columns=['two_year_recid', 'is_recid', 'score_text'])
print(len(rf_dataset))
print(len(labels))
X_train_calib, X_test, y_train_calib, y_test = train_test_split(rf_dataset, labels, test_size=0.2, random_state=42)

X_train, X_calib, y_train, y_calib = train_test_split(X_train_calib, y_train_calib, test_size=0.2, random_state=42)
print(len(X_calib))
print(len(X_train))
print(len(X_test))


7185
7185
1150
4598
1437


In [23]:
cpc = ConformalPredictor()
cpc.fit(X_train, y_train, X_calib, y_calib)
cpc_preds = cpc.predict(X_test, alpha=0.1)

rfc = RandomForestClassifier(n_estimators=100, random_state=42)
rfc.fit(X_train, y_train)
rfc_prd = rfc.predict(X_test)

In [None]:
cpc_pred_transformed = [
    -1 if p == (0, 1) else p[0] 
    for p in cpc_preds
]
u65, single_set_acc = u65_score(y_test, rfc_prd)
print(f"Single set acc for Random Forest: {single_set_acc}, u65: {u65}")
u65, single_set_acc = u65_score(y_test, np.array(cpc_pred_transformed))
print(f"Single set acc for conformal pred: {single_set_acc}, u65: {u65}")



1315    1.0
1893    1.0
6937    0.0
7021    0.0
4908    1.0
       ... 
4874    1.0
4664    0.0
940     0.0
3817    0.0
5251    0.0
Name: two_year_recid, Length: 1437, dtype: float64
Single set acc for Random Forest: 62.28, u65: 62.28
Single set acc for conformal pred: 70.78, u65: 66.95
