## Setup

In [None]:
import pandas as pd
import numpy as np
from joblib import load

from utils import (
    compute_nc_scores,
    find_threshold,
    predict_conformal_sets,
    evaluate_sets,
    summarize_by_indicator,
    summarize_for_predicate
)

## Data and Model Loading

In [None]:
# Calibration split. The *_f frame carries the protected attributes, the *_s
# frame omits them; glm1 is applied to *_f and glm2 to *_s further down.
X_calib_f = pd.read_csv("./output/X_calib_f.csv") # 2015, w. protected attributes
X_calib_s = pd.read_csv("./output/X_calib_s.csv") # 2015, w/o protected attributes
# Labels are read as a one-column CSV; .iloc[:,0] turns the frame into a Series.
y_calib = pd.read_csv("./output/y_calib.csv").iloc[:,0]

# Test split, same layout as the calibration split.
# NOTE(review): later cells assume X_test_f, X_test_s and y_test share the same
# row order and index — confirm against the script that wrote these files.
X_test_f = pd.read_csv("./output/X_test_f.csv")
X_test_s = pd.read_csv("./output/X_test_s.csv")
y_test = pd.read_csv("./output/y_test.csv").iloc[:,0]

In [None]:
# NOTE(review): disabled load; no cell in this notebook reads preds_test.
#preds_test = pd.read_csv("./output/preds_test.csv")

# Persisted classifiers: glm1 is applied to the *_f frames (with protected
# attributes), glm2 to the *_s frames (without them).
# NOTE(review): joblib.load unpickles the file, which can execute arbitrary
# code — only load model files from a trusted source.
glm1 = load("./models/glm1.joblib")
glm2 = load("./models/glm2.joblib")


## Conformal

In [None]:
# Miscoverage level, passed to find_threshold for both models below.
# Presumably this targets a marginal coverage of 1 - alpha = 0.9 — confirm
# against utils.find_threshold.
alpha = 0.1

### Conformal - Logit Regression (w. protected attributes)

In [None]:
# Calibration step for glm1 (features with protected attributes).
# Class probabilities on the calibration split:
probs_calib1 = glm1.predict_proba(X_calib_f)

# Nonconformity scores from the probabilities and the true calibration labels
# (the exact score definition lives in utils.compute_nc_scores).
nc_scores1 = compute_nc_scores(probs_calib1, y_calib)

# Threshold derived from the calibration scores at miscoverage level alpha.
q_hat1 = find_threshold(nc_scores1, alpha) # q_hat is data-driven threshold for classification
print(f"q_hat1: {q_hat1:.4f}")

In [None]:
# Conformal prediction sets for the test split, using glm1 and the threshold
# calibrated for glm1 (q_hat1). The previous code referenced an undefined name
# `q_hat`, which raises NameError on a fresh kernel.
pred_sets1 = predict_conformal_sets(glm1, X_test_f, q_hat1)

In [None]:
# Evaluate the glm1 prediction sets against the true test labels.
# evaluate_sets returns a mapping; only its 'coverage' and 'avg_size' entries
# are reported here.
evaluation1 = evaluate_sets(pred_sets1, y_test)
print(f"Coverage1: {evaluation1['coverage']:.2f}")
print(f"Avg. set size 1: {evaluation1['avg_size']:.2f}")

### Conformal - Logit Regression (w/o protected attributes)

In [None]:
# Calibration step for glm2 (features without protected attributes).
# Class probabilities on the calibration split:
probs_calib2 = glm2.predict_proba(X_calib_s)

# Nonconformity scores from the probabilities and the true calibration labels
# (the exact score definition lives in utils.compute_nc_scores).
nc_scores2 = compute_nc_scores(probs_calib2, y_calib)

# Threshold derived from the calibration scores at miscoverage level alpha.
q_hat2 = find_threshold(nc_scores2, alpha) # q_hat is data-driven threshold for classification
print(f"q_hat2: {q_hat2:.4f}")

In [None]:
# Conformal prediction sets for the test split, using glm2 and its own
# calibrated threshold q_hat2. All subgroup analyses below build on pred_sets2.
pred_sets2 = predict_conformal_sets(glm2, X_test_s, q_hat2)

In [None]:
# Evaluate the glm2 prediction sets against the true test labels.
# evaluate_sets returns a mapping; only its 'coverage' and 'avg_size' entries
# are reported here.
evaluation2 = evaluate_sets(pred_sets2, y_test)
print(f"Coverage2: {evaluation2['coverage']:.2f}")
print(f"Avg. set size 2: {evaluation2['avg_size']:.2f}")

## Analyzing Conformal Prediction (CP) per Group

In [None]:
# Logistic regression with protected attributes

# One row per test observation, sharing the index of X_test_f (and y_test).
cp_groups = pd.DataFrame(index=X_test_f.index)
# Prediction sets normalised to Python sets of ints.
cp_groups['pred_set'] = pd.Series(pred_sets1, index=X_test_f.index).apply(lambda s: {int(x) for x in s})
cp_groups['true_label'] = y_test.reindex(X_test_f.index)
cp_groups['frau1'] = X_test_f['frau1']

# nongerman: 1.0 where maxdeutsch1 == 0, 0.0 otherwise, NaN where the
# nationality is flagged as missing. The column is built as float in a single
# step: writing NaN into an integer column afterwards relies on a silent
# int -> float upcast that newer pandas versions deprecate.
nationality_missing = X_test_f['maxdeutsch.Missing.'] == 1
cp_groups['nongerman'] = np.where(
    nationality_missing,
    np.nan,
    np.where(X_test_f['maxdeutsch1'] == 0, 1.0, 0.0)
)

# Intersectional indicators. Rows with unknown nationality get 0 here, but
# they are removed by the dropna() below because their 'nongerman' is NaN.
cp_groups['nongerman_male'] = np.where(
    (cp_groups['nongerman'] == 1) & (cp_groups['frau1'] == 0),
    1,
    0
)
cp_groups['nongerman_female'] = np.where(
    (cp_groups['nongerman'] == 1) & (cp_groups['frau1'] == 1),
    1,
    0
)

# Keep only rows where every column (label and group indicators) is known.
cp_groups = cp_groups.dropna()

In [None]:
# Logistic regression w/o protected attributes

# One row per test observation, sharing the index of X_test_s.
cp_groups2 = pd.DataFrame(index=X_test_s.index)

# Prediction sets normalised to Python sets of ints
# (pred_sets2 was computed from X_test_s, so it is in the same row order).
cp_groups2['pred_set'] = pd.Series(pred_sets2, index=X_test_s.index).apply(lambda s: {int(x) for x in s})

# True labels from y_test
cp_groups2['true_label'] = y_test.reindex(X_test_s.index)

# The protected features are not part of X_test_s, so they are looked up in
# X_test_f by index label.
cp_groups2['frau1'] = X_test_f.loc[X_test_s.index, 'frau1']

# nongerman: 1.0 where maxdeutsch1 == 0, 0.0 otherwise, NaN where the
# nationality is flagged as missing. The column is built as float in a single
# step: writing NaN into an integer column afterwards relies on a silent
# int -> float upcast that newer pandas versions deprecate.
nationality_missing2 = X_test_f.loc[X_test_s.index, 'maxdeutsch.Missing.'] == 1
cp_groups2['nongerman'] = np.where(
    nationality_missing2,
    np.nan,
    np.where(X_test_f.loc[X_test_s.index, 'maxdeutsch1'] == 0, 1.0, 0.0)
)

# Split by gender. Rows with unknown nationality get 0 here, but they are
# removed by the dropna() below because their 'nongerman' is NaN.
cp_groups2['nongerman_male'] = np.where(
    (cp_groups2['nongerman'] == 1) & (cp_groups2['frau1'] == 0),
    1,
    0
)
cp_groups2['nongerman_female'] = np.where(
    (cp_groups2['nongerman'] == 1) & (cp_groups2['frau1'] == 1),
    1,
    0
)

# Drop rows with missing data in any of the relevant columns
cp_groups2 = cp_groups2.dropna()


### Conditional Coverage

In [None]:
# Conditional coverage and set size (model w/o protected attributes)

# Indicator columns of cp_groups2; a value of 1 marks membership in the subgroup.
groups = ['frau1', 'nongerman', 'nongerman_male', 'nongerman_female']

# Prediction sets keyed by the y_test index so both can share one boolean mask.
pred_sets_series = pd.Series(pred_sets2, index=y_test.index)

# One record per subgroup, turned into a table after the loop.
results = []

for group in groups:
    # Membership flag on the full y_test index; test rows that were dropped
    # from cp_groups2 count as non-members.
    mask_aligned = (cp_groups2[group] == 1).reindex(y_test.index, fill_value=False)

    group_y = y_test[mask_aligned]
    group_pred_sets = pred_sets_series[mask_aligned]

    # 1 when the true label lies inside the prediction set, else 0.
    hits = [int(label in pset) for label, pset in zip(group_y, group_pred_sets)]
    set_sizes = [len(pset) for pset in group_pred_sets]

    results.append({
        'Group': group,
        'Coverage': np.mean(hits),          # share of covered observations
        'Avg Set Size': np.mean(set_sizes),
        'Num Samples': mask_aligned.sum()   # subgroup size within the test set
    })

# Tabular view, one row per subgroup.
coverage_results = pd.DataFrame(results).set_index('Group')
print(coverage_results)

### True Class Distribution

In [None]:
# True class label distribution over all rows kept in cp_groups2
true_labels = cp_groups2['true_label']

print("Overall true_label distribution:")
print(true_labels.value_counts().sort_index())
print("As proportions:")
print(true_labels.value_counts(normalize=True).sort_index())
# The mean equals the share of positives as long as labels are coded 0/1.
positive_rate = true_labels.mean()
print(f"P(true_label=1): {positive_rate:.4f}")
print()

In [None]:
# Share of positive true labels within each gender group (frau1 = 0 and frau1 = 1)
print("Distribution conditional on frau1:")
gender_flag = cp_groups2['frau1']
for frau_val in (0, 1):
    subset = cp_groups2.loc[gender_flag == frau_val]  # rows of the current gender group
    prop_positive = subset['true_label'].mean()       # share with true_label = 1 in that group
    print(f"P(true_label=1 | frau1={frau_val}): {prop_positive:.4f} (n={len(subset)})")
print()

# Group sizes: frau1 == 0 is reported as male, frau1 == 1 as female.
n_male, n_female = ((gender_flag == value).sum() for value in (0, 1))
print(f"Total observations: {len(cp_groups2)} (female: n={n_female}, male: n={n_male})")
print()

In [None]:
# Distribution conditional on nongerman.
# Reads cp_groups2 like the neighbouring cells of this section (the original
# read cp_groups, which made this cell depend on the model-1 pipeline even
# though only labels and group indicators are used).
print("Distribution conditional on nongerman:")
for ng_val in [0, 1]:
    subset = cp_groups2[cp_groups2['nongerman'] == ng_val]
    prop_positive = subset['true_label'].mean()  # share with true_label = 1 in this group
    print(f"P(true_label=1 | nongerman={ng_val}): {prop_positive:.4f} (n={len(subset)})")
print()

# Add total counts
n_german = (cp_groups2['nongerman'] == 0).sum()
n_nongerman = (cp_groups2['nongerman'] == 1).sum()
print(f"Total observations: {len(cp_groups2)} (german: n={n_german}, nongerman: n={n_nongerman})")
print()

In [None]:
# Distribution conditional on nongerman_male and nongerman_female.
# Reads cp_groups2 like the neighbouring cells of this section (the original
# read cp_groups). The former `if ... in columns` guards are gone: the totals
# below access both columns unconditionally, so the guards protected nothing.
print("Distribution conditional on nongerman subgroups:")
for indicator in ('nongerman_male', 'nongerman_female'):
    for flag in [0, 1]:
        subset = cp_groups2[cp_groups2[indicator] == flag]
        prop_positive = subset['true_label'].mean()  # share with true_label = 1 in this group
        print(f"P(true_label=1 | {indicator}={flag}): {prop_positive:.4f} (n={len(subset)})")
print()

# Add total counts. Note: n_german_male counts every row with
# nongerman_male == 0 (reported as "other"), not only German men.
n_german_male = (cp_groups2['nongerman_male'] == 0).sum()
n_nongerman_male = (cp_groups2['nongerman_male'] == 1).sum()
print(f"Total observations: {len(cp_groups2)} (other: n={n_german_male}, nongerman male: n={n_nongerman_male})")
print()

# Add total counts. Note: n_german_female counts every row with
# nongerman_female == 0 (reported as "other"), not only German women.
n_german_female = (cp_groups2['nongerman_female'] == 0).sum()
n_nongerman_female = (cp_groups2['nongerman_female'] == 1).sum()
print(f"Total observations: {len(cp_groups2)} (other: n={n_german_female}, nongerman female: n={n_nongerman_female})")
print()

### Prediction Sets Distribution

#### Summarize for Predicate

In [None]:
# Summary for rows of cp_groups2 whose prediction set is exactly {0}.
# What is reported is defined in utils.summarize_for_predicate (not shown here).
summarize_for_predicate(
    cp_groups2,
    predicate=lambda s: set(s) == {0},
    description="== {0}"
)

In [None]:
# Summary for rows of cp_groups2 whose prediction set is exactly {1}.
# What is reported is defined in utils.summarize_for_predicate (not shown here).
summarize_for_predicate(
    cp_groups2,
    predicate=lambda s: set(s) == {1},
    description="== {1}"
)

In [None]:
# Summary for rows of cp_groups2 whose prediction set contains both labels,
# i.e. the ambiguous set {0, 1}.
# What is reported is defined in utils.summarize_for_predicate (not shown here).
summarize_for_predicate(
    cp_groups2,
    predicate=lambda s: set(s) == {0,1},
    description="== {0,1}"
)

#### Summarize by Indicator

In [None]:
# Baselines CP: how often each prediction set occurs over all rows of cp_groups2.

# The 'pred_set' column holds Python sets, which are unhashable, so calling
# value_counts() on it directly raises TypeError. Each set is therefore turned
# into a canonical text label first, e.g. "{0}", "{0, 1}" or "{}" for an empty set.
pred_set_labels = cp_groups2['pred_set'].map(
    lambda s: "{" + ", ".join(str(v) for v in sorted(s)) + "}"
)

print("Value counts:")
print(pred_set_labels.value_counts())
print("\nProportions:")
print(pred_set_labels.value_counts(normalize=True))

In [None]:
# Summarize for frau1 == 1 (vs 0).
# summarize_by_indicator returns a pair, unpacked here as a counts table and a
# percentages table; their exact layout is defined in utils (not shown here).
counts_female, pct_female = summarize_by_indicator(
    cp_groups2,
    indicator_col='frau1',
    positive_label='female',
    negative_label='male'
)

print("\nCounts by gender:\n")
print(counts_female)
print("\nPercentages by gender:\n")
print(pct_female)

In [None]:
# Summarize for nongerman == 1 (vs 0).
# summarize_by_indicator returns a pair, unpacked here as a counts table and a
# percentages table; their exact layout is defined in utils (not shown here).
counts_ng, pct_ng = summarize_by_indicator(
    cp_groups2,
    indicator_col='nongerman',
    positive_label='non‐German',
    negative_label='German'
)

print("Counts by nationality (German vs non‐German):\n")
print(counts_ng)
print("\nPercentages by nationality:\n")
print(pct_ng)

In [None]:
# Summarize for nongerman_male == 1 (vs 0).
# The negative group ("Others") is everyone with nongerman_male == 0, i.e. all
# women plus German men.
# summarize_by_indicator returns a pair, unpacked here as a counts table and a
# percentages table; their exact layout is defined in utils (not shown here).
counts_ng_male, pct_ng_male = summarize_by_indicator(
    cp_groups2,
    indicator_col='nongerman_male',
    positive_label='non‐German Male',
    negative_label='Others'
)

print("\nCounts for non‐German Male vs Others:\n")
print(counts_ng_male)
print("\nPercentages for non‐German Male vs Others:\n")
print(pct_ng_male)

In [None]:
# Summarize for nongerman_female == 1 (vs 0).
# The negative group ("Others") is everyone with nongerman_female == 0, i.e.
# all men plus German women.
# summarize_by_indicator returns a pair, unpacked here as a counts table and a
# percentages table; their exact layout is defined in utils (not shown here).
counts_ng_female, pct_ng_female = summarize_by_indicator(
    cp_groups2,
    indicator_col='nongerman_female',
    positive_label='non‐German Female',
    negative_label='Others'
)

print("\nCounts for non‐German Female vs Others:\n")
print(counts_ng_female)
print("\nPercentages for non‐German Female vs Others:\n")
print(pct_ng_female)

## Confusion Matrix

In [None]:
from sklearn.metrics import confusion_matrix

# Confusion matrix of the glm2 conformal predictor, restricted to decisive
# prediction sets.

# 1. Keep only prediction sets with exactly one label. This drops the ambiguous
#    set {0, 1} and also empty sets; the previous `!= {0, 1}` test let empty
#    sets through, and they were then counted as a prediction of 1.
confident_indices = [idx for idx, pset in enumerate(pred_sets2) if len(pset) == 1]

if len(confident_indices) == 0:
    # Nothing to evaluate: no matrix and no metrics.
    print("No confident predictions (all predictions were ambiguous). Confusion matrix cannot be computed.")
else:
    # 2. The predicted label is the single element of each remaining set
    #    (works for sets, lists and arrays alike).
    y_pred_filtered = [int(next(iter(pred_sets2[idx]))) for idx in confident_indices]

    # 3. Matching true labels; pred_sets2 is positional, hence .iloc.
    y_true_filtered = [y_test.iloc[idx] for idx in confident_indices]

    # 4. Confusion matrix. labels=[0, 1] forces a 2x2 result even if only one
    #    class occurs among the confident cases, so the unpacking below is safe.
    cm = confusion_matrix(y_true_filtered, y_pred_filtered, labels=[0, 1])
    print("Confusion matrix (excluding ambiguous cases):")
    print(cm)

    # Row-major order of the 2x2 matrix: TN, FP, FN, TP.
    # The metrics live inside this branch because `cm` only exists here.
    TN, FP, FN, TP = cm.ravel()

    # Compute metrics (NaN where a denominator would be zero)
    accuracy = (TP + TN) / (TP + TN + FP + FN)
    precision = TP / (TP + FP) if (TP + FP) > 0 else float('nan')
    recall = TP / (TP + FN) if (TP + FN) > 0 else float('nan')
    f1 = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else float('nan')

    # Print results
    print(f"Accuracy:  {accuracy:.3f}")
    print(f"Precision: {precision:.3f}")
    print(f"Recall:    {recall:.3f}")
    print(f"F1 Score:  {f1:.3f}")

In [None]:
# Restrict prediction sets and labels to the rows that survived dropna() in cp_groups2.
valid_idx = cp_groups2.index
# pred_sets2 is positional (one entry per row of X_test_s), whereas valid_idx
# holds index *labels*. Translate labels to row positions so the lookup stays
# correct even if the index is not the default 0..n-1 range.
# NOTE(review): y_test is assumed to be in the same row order as X_test_s.
valid_pos = X_test_s.index.get_indexer(valid_idx)
# Align arrays
pred_sets_filtered = [pred_sets2[i] for i in valid_pos]
y_test_filtered = np.array(y_test)[valid_pos]

In [None]:
from sklearn.metrics import precision_recall_fscore_support

def compute_confusion_metrics(pred_sets, y_true, subgroup_mask):
    """Confusion-matrix metrics for the decisive predictions of one subgroup.

    Only observations that belong to the subgroup AND whose prediction set
    contains exactly one label are evaluated; that single label is treated as
    the predicted class.

    Parameters
    ----------
    pred_sets : sequence of sized iterables
        One prediction set per observation.
    y_true : array-like
        True binary labels (0/1), aligned positionally with ``pred_sets``.
    subgroup_mask : array-like of bool
        Positional membership flags, same length as ``pred_sets``. A pandas
        Series is accepted; only its values are used.

    Returns
    -------
    dict or None
        ``None`` when the subgroup has no decisive prediction. Otherwise a dict
        with ``TP``, ``TN``, ``FP``, ``FN``, ``Precision``, ``Recall``, ``F1``
        and ``Coverage (non-ambiguous)``, the share of the subgroup's
        observations whose prediction set is a single label.
    """
    # Plain boolean arrays: avoids label-based indexing on a pandas mask and
    # keeps `&` valid for empty input (an empty untyped array would be float).
    subgroup_mask = np.asarray(subgroup_mask, dtype=bool)
    is_singleton = np.array([len(s) == 1 for s in pred_sets], dtype=bool)

    mask = is_singleton & subgroup_mask
    if not mask.any():
        return None  # no data to evaluate

    y_true_filtered = np.asarray(y_true)[mask]
    # The single element of each selected set is the predicted label.
    y_pred_filtered = [next(iter(s)) for s, keep in zip(pred_sets, mask) if keep]

    # labels=[0, 1] guarantees a 2x2 matrix even if one class is absent.
    cm = confusion_matrix(y_true_filtered, y_pred_filtered, labels=[0, 1])
    tn, fp, fn, tp = cm.ravel()

    # Metrics for the positive class; undefined ratios are reported as 0.
    precision, recall, f1, _ = precision_recall_fscore_support(
        y_true_filtered, y_pred_filtered, average="binary", zero_division=0
    )

    return {
        "TP": tp,
        "TN": tn,
        "FP": fp,
        "FN": fn,
        "Precision": precision,
        "Recall": recall,
        "F1": f1,
        # Relative to the subgroup size, not to all observations; otherwise the
        # value mostly reflects how large the subgroup is.
        "Coverage (non-ambiguous)": mask.sum() / subgroup_mask.sum()
    }

# Positional boolean membership masks, one per subgroup indicator of cp_groups2.
subgroups = {
    column: cp_groups2[column].eq(1).to_numpy()
    for column in ("frau1", "nongerman", "nongerman_male", "nongerman_female")
}

# Metrics per subgroup; subgroups for which nothing could be computed are skipped.
results = {}
for name, mask in subgroups.items():
    metrics = compute_confusion_metrics(pred_sets_filtered, y_test_filtered, mask)
    if not metrics:
        continue
    results[name] = metrics

# One row per subgroup, one column per metric.
df_results = pd.DataFrame(results).T.rename_axis("Subgroup")
display(df_results)
