In [1]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder, StandardScaler
from sklearn.neighbors import KNeighborsClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import (
    accuracy_score,
    balanced_accuracy_score,
    classification_report,
    precision_score,
    confusion_matrix
)
from collections import defaultdict

In [2]:
# Load the Drug Consumption dataset
url = "https://archive.ics.uci.edu/ml/machine-learning-databases/00373/drug_consumption.data"
columns = [
    "ID", "Age", "Gender", "Education", "Country", "Ethnicity", "Neuroticism", "Extraversion",
    "Openness", "Agreeableness", "Conscientiousness", "Impulsiveness", "Sensation-seeking",
    "Alcohol", "Amphetamines", "Amyl_nitrite", "Benzodiazepines", "Caffeine", "Cannabis",
    "Chocolate", "Cocaine", "Crack", "Ecstasy", "Heroin", "Ketamine", "Legal_highs", "LSD",
    "Methadone", "Mushrooms", "Nicotine", "Semer", "Volatile_substance"
]
data = pd.read_csv(url, header=None, names=columns)

In [3]:
# Define target (Y) and bias group (B)
target = "Cannabis"
bias_group = "Ethnicity"

# Binarize the target and bias group
data[target] = data[target].apply(lambda x: 1 if x in ["CL3", "CL4", "CL5", "CL6"] else 0)
data[bias_group] = data[bias_group].apply(lambda x: 1 if x > -0.5 else 0)

In [4]:
# Preprocess categorical features
categorical_columns = data.select_dtypes(include=["object"]).columns
label_encoders = {}
for col in categorical_columns:
    encoder = LabelEncoder()
    data[col] = encoder.fit_transform(data[col])
    label_encoders[col] = encoder

# Ensure all columns are numeric
for col in data.columns:
    if data[col].dtype == "object":
        data[col] = pd.to_numeric(data[col], errors="coerce")

In [5]:
# Split dataset into train and test sets
train_data, test_data = train_test_split(data, test_size=0.3, random_state=42)

# Define features and target for training and testing
X_train = train_data.drop(columns=[target, bias_group])
y_train = train_data[target]
X_test = test_data.drop(columns=[target, bias_group])
y_test = test_data[target]

# Convert features to float to avoid type issues
X_train = X_train.astype(float)
X_test = X_test.astype(float)

In [6]:
# Standardize the data for KNN and Logistic Regression
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

In [7]:
def train_and_evaluate_model(model, X_train, y_train, X_test, y_test, group, model_name):
    model.fit(X_train, y_train)
    y_pred = model.predict(X_test)

    accuracy = accuracy_score(y_test, y_pred)
    balanced_acc = balanced_accuracy_score(y_test, y_pred)
    print(f"\n{model_name} - Regular Data")
    print(f"Accuracy: {accuracy:.2f}")
    print(f"Balanced Accuracy: {balanced_acc:.2f}")
    print("\nClassification Report:\n", classification_report(y_test, y_pred))

    # Fairness metrics
    equalized_odds = calculate_equalized_odds(y_test, y_pred, group)
    predictive_parity = calculate_predictive_value_parity(y_test, y_pred, group)

    print("\nEqualized Odds (TPR, FPR) by Group:")
    for g, metrics in equalized_odds.items():
        print(f"Group {g}: TPR = {metrics['TPR']:.2f}, FPR = {metrics['FPR']:.2f}")

    print("\nPredictive Parity (PPV) by Group:")
    for g, ppv in predictive_parity.items():
        print(f"Group {g}: PPV = {ppv:.2f}")

    return y_pred

In [8]:
def calculate_equalized_odds(y_true, y_pred, group):
    tpr_fpr = defaultdict(dict)
    unique_groups = np.unique(group)

    for g in unique_groups:
        mask = group == g
        tn, fp, fn, tp = confusion_matrix(y_true[mask], y_pred[mask]).ravel()
        tpr = tp / (tp + fn) if (tp + fn) > 0 else 0  # Sensitivity
        fpr = fp / (fp + tn) if (fp + tn) > 0 else 0  # Fall-out
        tpr_fpr[g] = {"TPR": tpr, "FPR": fpr}

    return tpr_fpr

In [9]:
def calculate_predictive_value_parity(y_true, y_pred, group):
    ppv = {}
    unique_groups = np.unique(group)

    for g in unique_groups:
        mask = group == g
        precision = precision_score(y_true[mask], y_pred[mask], zero_division=0)
        ppv[g] = precision

    return ppv

In [10]:
knn_regular = KNeighborsClassifier(n_neighbors=5)
y_pred_knn_regular = train_and_evaluate_model(
    knn_regular, X_train_scaled, y_train, X_test_scaled, y_test, test_data[bias_group], "KNN Model"
)


KNN Model - Regular Data
Accuracy: 0.83
Balanced Accuracy: 0.83

Classification Report:
               precision    recall  f1-score   support

           0       0.78      0.86      0.82       249
           1       0.88      0.80      0.84       317

    accuracy                           0.83       566
   macro avg       0.83      0.83      0.83       566
weighted avg       0.83      0.83      0.83       566


Equalized Odds (TPR, FPR) by Group:
Group 0: TPR = 0.83, FPR = 0.08
Group 1: TPR = 0.80, FPR = 0.14

Predictive Parity (PPV) by Group:
Group 0: PPV = 0.83
Group 1: PPV = 0.88


In [11]:
rf_regular = RandomForestClassifier(random_state=42)
y_pred_rf_regular = train_and_evaluate_model(
    rf_regular, X_train, y_train, X_test, y_test, test_data[bias_group], "Random Forest Model"
)


Random Forest Model - Regular Data
Accuracy: 0.88
Balanced Accuracy: 0.88

Classification Report:
               precision    recall  f1-score   support

           0       0.87      0.86      0.86       249
           1       0.89      0.90      0.89       317

    accuracy                           0.88       566
   macro avg       0.88      0.88      0.88       566
weighted avg       0.88      0.88      0.88       566


Equalized Odds (TPR, FPR) by Group:
Group 0: TPR = 1.00, FPR = 0.08
Group 1: TPR = 0.89, FPR = 0.15

Predictive Parity (PPV) by Group:
Group 0: PPV = 0.86
Group 1: PPV = 0.89


In [12]:
logistic_regular = LogisticRegression(random_state=42, max_iter=1000)
y_pred_logistic_regular = train_and_evaluate_model(
    logistic_regular, X_train_scaled, y_train, X_test_scaled, y_test, test_data[bias_group], "Logistic Regression Model"
)


Logistic Regression Model - Regular Data
Accuracy: 0.85
Balanced Accuracy: 0.85

Classification Report:
               precision    recall  f1-score   support

           0       0.80      0.86      0.83       249
           1       0.89      0.83      0.86       317

    accuracy                           0.85       566
   macro avg       0.84      0.85      0.85       566
weighted avg       0.85      0.85      0.85       566


Equalized Odds (TPR, FPR) by Group:
Group 0: TPR = 1.00, FPR = 0.08
Group 1: TPR = 0.83, FPR = 0.14

Predictive Parity (PPV) by Group:
Group 0: PPV = 0.86
Group 1: PPV = 0.89


In [13]:
classes = train_data[target].unique()
bias_groups = train_data[bias_group].unique()
subsampled_distributions = {}

for c in classes:
    class_c_data = train_data[train_data[target] == c]
    bias_distribution = class_c_data[bias_group].value_counts(normalize=True)
    subsampled_data = []
    for other_c in classes:
        if other_c == c:
            subsampled_data.append(class_c_data)
        else:
            other_class_data = train_data[train_data[target] == other_c]
            sampled = []
            for bg in bias_groups:
                target_count = int(bias_distribution[bg] * len(other_class_data))
                sampled.append(other_class_data[other_class_data[bias_group] == bg].sample(n=target_count, replace=True))
            subsampled_data.append(pd.concat(sampled))
    subsampled_distributions[c] = pd.concat(subsampled_data)

In [14]:
def train_bias_mitigated_model(model_class, model_args, subsampled_distributions):
    bias_clfs = {}
    for c, subset in subsampled_distributions.items():
        X_train_bm = subset.drop(columns=[target, bias_group])
        y_train_bm = subset[target]
        X_train_bm_scaled = scaler.fit_transform(X_train_bm)

        if len(y_train_bm.unique()) == 1:
            print(f"Warning: Only one class present in training data for class {c}. Using constant probabilities.")
            bias_clfs[c] = None
        else:
            model = model_class(**model_args)
            model.fit(X_train_bm_scaled, y_train_bm)
            bias_clfs[c] = model
    return bias_clfs

In [15]:
bias_knn_clfs = train_bias_mitigated_model(KNeighborsClassifier, {"n_neighbors": 5}, subsampled_distributions)
bias_rf_clfs = train_bias_mitigated_model(RandomForestClassifier, {"random_state": 42}, subsampled_distributions)
bias_logistic_clfs = train_bias_mitigated_model(LogisticRegression, {"random_state": 42, "max_iter": 1000}, subsampled_distributions)

In [16]:
def evaluate_bias_mitigated_model(clfs, X_test, y_test, group, model_name):
    bias_predictions = []
    for c, clf in clfs.items():
        if clf is None:
            constant_prob = 1 if train_data[train_data[target] == c][target].iloc[0] == 1 else 0
            bias_predictions.append(np.full(len(X_test), constant_prob))
        else:
            bias_predictions.append(clf.predict_proba(X_test)[:, 1])
    final_bias_predictions = np.argmax(bias_predictions, axis=0)
    accuracy = accuracy_score(y_test, final_bias_predictions)
    balanced_acc = balanced_accuracy_score(y_test, final_bias_predictions)
    print(f"\n{model_name} - Bias-Mitigated Data")
    print(f"Accuracy: {accuracy:.2f}")
    print(f"Balanced Accuracy: {balanced_acc:.2f}")
    print("\nClassification Report:\n", classification_report(y_test, final_bias_predictions))
    equalized_odds = calculate_equalized_odds(y_test, final_bias_predictions, group)
    predictive_parity = calculate_predictive_value_parity(y_test, final_bias_predictions, group)
    print("\nEqualized Odds (TPR, FPR) by Group:")
    for g, metrics in equalized_odds.items():
        print(f"Group {g}: TPR = {metrics['TPR']:.2f}, FPR = {metrics['FPR']:.2f}")
    print("\nPredictive Parity (PPV) by Group:")
    for g, ppv in predictive_parity.items():
        print(f"Group {g}: PPV = {ppv:.2f}")

In [None]:
evaluate_bias_mitigated_model(bias_knn_clfs, X_test_scaled, y_test, test_data[bias_group], "KNN Model")

In [None]:
evaluate_bias_mitigated_model(bias_rf_clfs, X_test, y_test, test_data[bias_group], "Random Forest Model")

In [None]:
evaluate_bias_mitigated_model(bias_logistic_clfs, X_test_scaled, y_test, test_data[bias_group], "Logistic Regression Model")