In [1]:
import os
import numpy as np
import pandas as pd
from sklearn import preprocessing
from sklearn.preprocessing import LabelEncoder, OneHotEncoder, KBinsDiscretizer
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, accuracy_score, precision_score, recall_score, f1_score
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
from sklearn.ensemble import RandomForestClassifier
from sklearn.naive_bayes import GaussianNB
from sklearn.preprocessing import StandardScaler

In [15]:
def load_student():
    data_df = pd.read_csv(f"student.csv")

    # Drop the target column
    TARGET_COLUMNS = data_df.columns[-1]
    data = data_df.drop(columns=[TARGET_COLUMNS])

    data, numeric_columns, categorical_columns = preprocess_dataset(data, continuous_features=[])

    data_df_copy = data.copy()

    # ### Scale the dataset
    # min_max_scaler = preprocessing.MinMaxScaler()
    # data_scaled = min_max_scaler.fit_transform(data)
    # data = pd.DataFrame(data_scaled, columns=data.columns)

    FEATURE_COLUMNS = data.columns

    ### Add the target column back
    data[TARGET_COLUMNS] = data_df[TARGET_COLUMNS]

    ### Scale the dataset
    min_max_scaler = preprocessing.MinMaxScaler()
    data_scaled = min_max_scaler.fit_transform(data)
    data = pd.DataFrame(data_scaled, columns=data.columns)

    return data, FEATURE_COLUMNS, TARGET_COLUMNS, numeric_columns, categorical_columns

In [16]:
def preprocess_dataset(df, continuous_features=[]):
    label_encoder = LabelEncoder()
    onehot_encoder = OneHotEncoder()

    numeric_columns = []
    categorical_columns = []

    # Iterate over each column in the DataFrame
    for col in df.columns:
        # Check if the column is categorical
        if df[col].dtype == 'object' or df[col].dtype == 'category' and col not in continuous_features:
            categorical_columns.append(col)
            # If the column has only two unique values, treat it as binary categorical
            if len(df[col].unique()) == 2:
                # Label encode binary categorical features
                df[col] = label_encoder.fit_transform(df[col])
            else:
                # One-hot encode regular categorical features
                encoded_values = onehot_encoder.fit_transform(df[[col]])
                # Create new column names for the one-hot encoded features
                new_cols = [col + '_' + str(i) for i in range(encoded_values.shape[1])]
                # Convert the encoded values to a DataFrame and assign column names
                encoded_df = pd.DataFrame(encoded_values.toarray(), columns=new_cols)
                # Concatenate the encoded DataFrame with the original DataFrame
                df = pd.concat([df, encoded_df], axis=1)
                # Drop the original categorical column from the DataFrame
                df.drop(col, axis=1, inplace=True)
        # If the column is numerical but in string format and not in continuous_features, convert it to numerical type
        elif df[col].dtype == 'object' or df[col].dtype == 'category' and df[
            col].str.isnumeric().all() and col not in continuous_features:
            df[col] = df[col].astype(int)  # Convert to integer type
            categorical_columns.append(col)
        # If the column is a continuous feature, discretize it into bins
        elif col in continuous_features:
            numeric_columns.append(col)
            # Calculate the number of bins
            num_unique_values = len(df[col].unique())
            value_range = df[col].max() - df[col].min()
            num_bins = calculate_num_bins(num_unique_values, value_range)

            # Discretize into bins
            bin_discretizer = KBinsDiscretizer(n_bins=num_bins, encode='ordinal', strategy='uniform')
            bins = bin_discretizer.fit_transform(df[[col]])
            # Replace the original continuous feature with the binned values
            df[col] = bins.astype(int)
        else:
            # Here are numerical columns. If the column has only 2 unique values, dont add it to numeric_columns
            if len(df[col].unique()) > 2:
                numeric_columns.append(col)
    return df, numeric_columns, categorical_columns

In [17]:
data = load_student()
data

(     school  sex       age  address  famsize  Pstatus  Medu  Fedu  traveltime  \
 0       0.0  0.0  0.428571      1.0      0.0      0.0  1.00  1.00    0.333333   
 1       0.0  0.0  0.285714      1.0      0.0      1.0  0.25  0.25    0.000000   
 2       0.0  0.0  0.000000      1.0      1.0      1.0  0.25  0.25    0.000000   
 3       0.0  0.0  0.000000      1.0      0.0      1.0  1.00  0.50    0.000000   
 4       0.0  0.0  0.142857      1.0      0.0      1.0  0.75  0.75    0.000000   
 ..      ...  ...       ...      ...      ...      ...   ...   ...         ...   
 644     1.0  0.0  0.571429      0.0      0.0      1.0  0.50  0.75    0.000000   
 645     1.0  0.0  0.428571      1.0      1.0      1.0  0.75  0.25    0.000000   
 646     1.0  0.0  0.428571      1.0      0.0      1.0  0.25  0.25    0.333333   
 647     1.0  1.0  0.285714      1.0      1.0      1.0  0.75  0.25    0.333333   
 648     1.0  1.0  0.428571      0.0      1.0      1.0  0.75  0.50    0.666667   
 
      studytim

In [18]:
data, FEATURE_COLUMNS, TARGET_COLUMNS, numeric_columns, categorical_columns = load_student()

print(data.columns)

Index(['school', 'sex', 'age', 'address', 'famsize', 'Pstatus', 'Medu', 'Fedu',
       'traveltime', 'studytime', 'failures', 'schoolsup', 'famsup', 'paid',
       'activities', 'nursery', 'higher', 'internet', 'romantic', 'famrel',
       'freetime', 'goout', 'Dalc', 'Walc', 'health', 'absences', 'Mjob_0',
       'Mjob_1', 'Mjob_2', 'Mjob_3', 'Mjob_4', 'Fjob_0', 'Fjob_1', 'Fjob_2',
       'Fjob_3', 'Fjob_4', 'reason_0', 'reason_1', 'reason_2', 'reason_3',
       'guardian_0', 'guardian_1', 'guardian_2', 'target'],
      dtype='object')


In [19]:
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
from sklearn.ensemble import RandomForestClassifier
from sklearn.naive_bayes import GaussianNB
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix

def calculate_metrics_and_split_data(data, features, target, classifier_type='logistic_regression',
                                     metrics=['accuracy'], attribute=None):
    """
    Train a classifier, predict on test data, calculate specified metrics from the confusion matrix,
    and optionally split the test data based on an attribute.

    :param data: DataFrame containing the dataset
    :param features: List of feature column names
    :param target: Name of the target column
    :param classifier_type: Type of classifier ('logistic_regression', 'svm', 'random_forest', 'naive_bayes')
    :param metrics: List of metrics to calculate ('accuracy', 'precision', 'recall', 'f1', 'tp', 'tn', 'fp', 'fn')
    :param attribute: Optional attribute to split the test data on, which should be binary (0 or 1)
    :return: Dictionary of requested metrics and optionally two DataFrames, one for each value of the attribute,
             and the trained model instance
    """
    # Split the data into training and testing sets
    X_train, X_test, y_train, y_test = train_test_split(data[FEATURE_COLUMNS], data[TARGET_COLUMNS], test_size=0.2, random_state=42)

    # Debugging: Print the shape of the train and test sets
    print(f"Training data shape: X_train: {X_train.shape}, y_train: {y_train.shape}")
    print(f"Testing data shape: X_test: {X_test.shape}, y_test: {y_test.shape}")

    # Initialize classifier
    classifiers = {
        'logistic_regression': LogisticRegression(max_iter=500, solver='saga'),
        'svm': SVC(),
        'random_forest': RandomForestClassifier(),
        'naive_bayes': GaussianNB()
    }
    classifier = classifiers[classifier_type]

    # Fit the classifier and predict
    classifier.fit(X_train, y_train)
    y_pred = classifier.predict(X_test)

    # Compute confusion matrix
    cm = confusion_matrix(y_test, y_pred)
    tp = cm[1, 1]  # True Positives
    tn = cm[0, 0]  # True Negatives
    fp = cm[0, 1]  # False Positives
    fn = cm[1, 0]  # False Negatives

    # Debugging: Print the confusion matrix
    print("Confusion Matrix:")
    print(cm)

    # Calculate metrics
    metrics_dict = {
        'accuracy': accuracy_score(y_test, y_pred),
        'precision': precision_score(y_test, y_pred, average='binary'),
        'recall': recall_score(y_test, y_pred, average='binary'),
        'f1': f1_score(y_test, y_pred, average='binary'),
        'tp': tp,
        'tn': tn,
        'fp': fp,
        'fn': fn
    }

    # Print accuracy
    print(f"Accuracy: {metrics_dict['accuracy']}")

    # Validate metrics
    results = {}
    for metric in metrics:
        if metric not in metrics_dict:
            raise ValueError(
                f"Unsupported metric '{metric}'. Choose from 'accuracy', 'precision', 'recall', 'f1', 'tp', 'tn', 'fp', 'fn'.")
        results[metric] = metrics_dict[metric]

#     if attribute is not None:
#         if attribute != target:
#             raise ValueError(f"The attribute for splitting should be the same as the target.")

        # Add target to test features
        X_test_df = X_test.copy()
        X_test_df[target] = y_test.values

#         # Split the test data into two groups
#         group_0 = X_test_df[X_test_df[attribute] == 0]
#         group_1 = X_test_df[X_test_df[attribute] == 1]

#         return results, group_0, group_1, classifier

    # Add the target column back to the test set for completeness
    X_test_df = X_test.copy()
    X_test_df[target] = y_test.values
    X_test_df['pred'] = y_pred

    # Debugging: Print the test data with all columns
    print("Test data with all columns:")
    print(X_test_df.head())

    return results, X_test_df, classifier

In [20]:
def generate_tn_fn_dataframes(test_df, target, attribute):
    group_0 = test_df[test_df[attribute] == 0]
    group_1 = test_df[test_df[attribute] == 1]
    print(f"Group 0 size: {group_0.shape[0]}")
    print(f"Group 1 size: {group_1.shape[0]}")
    y_true_0 = group_0[target]
    y_pred_0 = group_0['pred']
    tn_0, fp_0, fn_0, tp_0 = confusion_matrix(y_true_0, y_pred_0).ravel()
    print(f"Confusion Matrix for Group 0: TN={tn_0}, FP={fp_0}, FN={fn_0}, TP={tp_0}")
    y_true_1 = group_1[target]
    y_pred_1 = group_1['pred']
    tn_1, fp_1, fn_1, tp_1 = confusion_matrix(y_true_1, y_pred_1).ravel()
    print(f"Confusion Matrix for Group 1: TN={tn_1}, FP={fp_1}, FN={fn_1}, TP={tp_1}")
 # Create DataFrames for TN and FN
    tn_df_0 = group_0[(group_0[target] == 0) & (group_0['pred'] == 0)].drop(columns=[target, 'pred'])
    fn_df_0 = group_0[(group_0[target] == 1) & (group_0['pred'] == 0)].drop(columns=[target, 'pred'])

    tn_df_1 = group_1[(group_1[target] == 0) & (group_1['pred'] == 0)].drop(columns=[target, 'pred'])
    fn_df_1 = group_1[(group_1[target] == 1) & (group_1['pred'] == 0)].drop(columns=[target, 'pred'])

    print(f"TN Group 0 size: {tn_df_0.shape[0]}")
    print(f"FN Group 0 size: {fn_df_0.shape[0]}")
    print(f"TN Group 1 size: {tn_df_1.shape[0]}")
    print(f"FN Group 1 size: {fn_df_1.shape[0]}")
    return tn_df_0, fn_df_0, tn_df_1, fn_df_1

In [24]:
data, features, target, numeric_columns, categorical_columns = load_student()

target = 'target'
attribute = 'sex'
classifier_type = 'logistic_regression'

# Calculate metrics and get the test DataFrame with predictions
results, test_df, trained_model = calculate_metrics_and_split_data(data, features, target, classifier_type, attribute=attribute)

# Generate DataFrames for true negatives and false negatives
tn_df_0, fn_df_0, tn_df_1, fn_df_1 = generate_tn_fn_dataframes(test_df, target, attribute)
print("Results:")
for metric, value in results.items():
    print(f"{metric}: {value}")
print("True Negatives Group 0 (first 5 rows):")
print(tn_df_0.head())
print("False Negatives Group 0 (first 5 rows):")
print(fn_df_0.head())
print("True Negatives Group 1 (first 5 rows):")
print(tn_df_1.head())
print("False Negatives Group 1 (first 5 rows):")
print(fn_df_1.head())

Training data shape: X_train: (519, 43), y_train: (519,)
Testing data shape: X_test: (130, 43), y_test: (130,)
Confusion Matrix:
[[30 27]
 [16 57]]
Accuracy: 0.6692307692307692
Test data with all columns:
     school  sex       age  address  famsize  Pstatus  Medu  Fedu  traveltime  \
636     1.0  1.0  0.428571      1.0      0.0      1.0  1.00  1.00    0.000000   
220     0.0  0.0  0.142857      1.0      0.0      0.0  0.75  0.25    0.000000   
594     1.0  0.0  0.428571      1.0      0.0      1.0  1.00  1.00    0.333333   
429     1.0  1.0  0.142857      0.0      1.0      0.0  1.00  1.00    0.000000   
72      0.0  0.0  0.000000      0.0      0.0      1.0  0.25  0.25    0.000000   

     studytime  ...  Fjob_4  reason_0  reason_1  reason_2  reason_3  \
636   0.333333  ...     1.0       0.0       1.0       0.0       0.0   
220   0.333333  ...     0.0       1.0       0.0       0.0       0.0   
594   0.333333  ...     1.0       0.0       0.0       0.0       1.0   
429   0.333333  ...     

In [26]:
import dice_ml

d = dice_ml.Data(dataframe=data,
                 continuous_features=["age", "studytime", "traveltime"],
                 outcome_name='target')

In [27]:
m = dice_ml.Model(model=trained_model, backend="sklearn")

In [28]:
explainer = dice_ml.Dice(d, m, method="random")

In [29]:
input_datapoint = tn_df_0[0:1]
tn_df_0_cf = explainer.generate_counterfactuals(input_datapoint, 
                                  total_CFs=5, 
                                  desired_class="opposite",
)
# Visualize it
tn_df_0_cf.visualize_as_dataframe(show_only_changes=True)

100%|████████████████████████████████████████████████████████████████████████████████████████| 1/1 [00:00<00:00, 10.02it/s]

Query instance (original outcome : 0.0)





Unnamed: 0,school,sex,age,address,famsize,Pstatus,Medu,Fedu,traveltime,studytime,...,Fjob_3,Fjob_4,reason_0,reason_1,reason_2,reason_3,guardian_0,guardian_1,guardian_2,target
0,0.0,0.0,0.0,0.0,0.0,1.0,0.25,0.25,0.0,0.333333,...,0.0,0.0,0.0,0.0,0.0,1.0,0.0,1.0,0.0,0.0



Diverse Counterfactual set (new outcome: 1.0)


Unnamed: 0,school,sex,age,address,famsize,Pstatus,Medu,Fedu,traveltime,studytime,...,Fjob_3,Fjob_4,reason_0,reason_1,reason_2,reason_3,guardian_0,guardian_1,guardian_2,target
0,-,-,-,-,-,-,-,-,-,-,...,-,-,-,-,-,-,-,-,-,1.0
1,-,-,-,-,-,-,-,-,-,-,...,-,-,-,-,-,-,-,-,-,1.0
2,-,-,-,1.0,-,-,-,-,-,-,...,-,-,-,-,-,-,-,-,-,1.0
3,-,-,-,-,-,-,-,-,-,-,...,-,-,-,1.0,-,-,-,-,-,1.0
4,-,-,-,-,-,-,-,-,-,-,...,-,-,-,1.0,-,-,-,-,-,1.0


In [30]:
input_datapoint = fn_df_0[0:1]
fn_df_0_cf = explainer.generate_counterfactuals(input_datapoint, 
                                  total_CFs=5, 
                                  desired_class="opposite",
)
# Visualize it
fn_df_0_cf.visualize_as_dataframe(show_only_changes=True)

100%|████████████████████████████████████████████████████████████████████████████████████████| 1/1 [00:00<00:00, 11.15it/s]

Query instance (original outcome : 0.0)





Unnamed: 0,school,sex,age,address,famsize,Pstatus,Medu,Fedu,traveltime,studytime,...,Fjob_3,Fjob_4,reason_0,reason_1,reason_2,reason_3,guardian_0,guardian_1,guardian_2,target
0,1.0,0.0,0.0,0.0,1.0,1.0,0.5,0.5,0.0,0.666667,...,0.0,0.0,0.0,0.0,1.0,0.0,1.0,0.0,0.0,0.0



Diverse Counterfactual set (new outcome: 1.0)


Unnamed: 0,school,sex,age,address,famsize,Pstatus,Medu,Fedu,traveltime,studytime,...,Fjob_3,Fjob_4,reason_0,reason_1,reason_2,reason_3,guardian_0,guardian_1,guardian_2,target
0,-,-,-,-,-,-,-,-,-,-,...,-,-,-,-,0.0,-,-,-,-,1.0
1,-,-,-,-,-,-,-,-,-,-,...,-,-,-,-,-,-,-,-,-,1.0
2,-,-,-,-,-,-,-,-,-,-,...,-,-,-,-,0.0,-,-,-,-,1.0
3,-,-,-,-,-,-,0.75,-,-,-,...,-,-,-,-,-,-,-,-,-,1.0
4,-,-,-,-,-,-,-,-,-,-,...,-,-,-,-,0.0,-,-,-,-,1.0


In [31]:
def burden_distance(counterfactuals_df, input_datapoint):
    if isinstance(input_datapoint, pd.DataFrame):
        input_datapoint = input_datapoint.iloc[0]
    
    # Calculate the difference for each counterfactual row
    differences = counterfactuals_df.subtract(input_datapoint, axis=1)
    
    # If the difference is not zero (meaning the value has changed), take the absolute value; otherwise, ignore.
    distances = differences.where(differences != 0, 0).abs()
    
    # Sum the distances for each counterfactual row
    burden_distances = distances.sum(axis=1)
    
    mean_burden_distance = burden_distances.mean()
    
    return burden_distances, mean_burden_distance

In [32]:
distances = burden_distance(tn_df_0_cf.cf_examples_list[0].final_cfs_df, input_datapoint)
print(distances)

(0    13.083333
1    13.083333
2    13.083333
3    13.083333
4    14.083333
dtype: float64, 13.283333333333335)


In [33]:
distances = burden_distance(fn_df_0_cf.cf_examples_list[0].final_cfs_df, input_datapoint)
print(distances)

(0    2.00
1    2.00
2    3.00
3    2.25
4    2.00
dtype: float64, 2.25)
