In [None]:
import numpy as np
import pandas as pd
from pathlib import Path

from sklearn.model_selection import train_test_split, StratifiedKFold, cross_val_predict
from sklearn.metrics import (
    precision_score,
    recall_score,
    f1_score,
    roc_auc_score,
    average_precision_score,
    confusion_matrix,
    ConfusionMatrixDisplay,
    RocCurveDisplay,
    PrecisionRecallDisplay,
    precision_recall_curve,
    classification_report
)
from sklearn.ensemble import GradientBoostingClassifier

import matplotlib.pyplot as plt
import seaborn as sns

# Notebook-wide display configuration:
#  - pandas renders floats with thousands separators and three decimals;
#  - seaborn applies its "whitegrid" style to subsequent figures.
pd.set_option("display.float_format", "{:,.3f}".format)
sns.set_style(style="whitegrid")

In [None]:
# Load cleaned training data
train_path = Path("../data/Train_Cleaned.csv")
df = pd.read_csv(train_path)

# Encode target: "Yes"/"No" become 1/0; any other value is passed through untouched so
# that an already-numeric column is still accepted by pd.to_numeric below.
# A per-element lookup is used instead of Series.replace because replacing strings with
# ints on an object column relies on replace's implicit downcasting, which pandas 2.2
# deprecates (FutureWarning).
label_map = {"Yes": 1, "No": 0}
raw_target = df["PotentialFraud"]
encoded_target = pd.to_numeric(
    raw_target.map(lambda label: label_map.get(label, label)), errors="coerce"
)

# The target must be strictly binary. NaN (unmapped / missing) fails isin() as well, so a
# single mask catches both unparseable labels and numeric values other than 0 and 1.
invalid_labels = ~encoded_target.isin([0, 1])
if invalid_labels.any():
    offending = raw_target[invalid_labels].unique()[:10].tolist()
    raise ValueError(
        "Unexpected labels in PotentialFraud; extend mapping to proceed. "
        f"Offending values (up to 10): {offending}"
    )
y = encoded_target.astype(int)

# Features exclude label and provider identifier (if present)
feature_cols = [c for c in df.columns if c not in ("PotentialFraud", "Provider")]
X = df[feature_cols]

# Stratified 60/20/20 split: first hold out 20% as test, then take 25% of the remaining
# 80% (= 20% of the total) as validation.
X_temp, X_test, y_temp, y_test = train_test_split(
    X, y, test_size=0.2, stratify=y, random_state=42
)
X_train, X_val, y_train, y_val = train_test_split(
    X_temp, y_temp, test_size=0.25, stratify=y_temp, random_state=42
)

print(f"Train: {X_train.shape}, Val: {X_val.shape}, Test: {X_test.shape}")
print("Label distribution (positive rate):")
for name, series in {"train": y_train, "val": y_val, "test": y_test}.items():
    print(f"  {name:<5}: {series.mean():.3f} ({series.sum()} / {len(series)})")

In [None]:
# Metric helpers

def summarize_metrics(y_true, preds, probs):
    """Compute the notebook's five standard binary-classification metrics.

    Args:
        y_true: Ground-truth labels.
        preds: Hard (thresholded) predictions; used for precision, recall and F1.
        probs: Positive-class scores; used for ROC-AUC and PR-AUC.

    Returns:
        dict with keys ``precision``, ``recall``, ``f1``, ``roc_auc`` and ``pr_auc``,
        in that order. Precision, recall and F1 are computed with ``zero_division=0``.
    """
    hard_label_scorers = {
        "precision": precision_score,
        "recall": recall_score,
        "f1": f1_score,
    }
    summary = {
        metric: scorer(y_true, preds, zero_division=0)
        for metric, scorer in hard_label_scorers.items()
    }
    # Score-based metrics do not depend on the decision threshold.
    summary["roc_auc"] = roc_auc_score(y_true, probs)
    summary["pr_auc"] = average_precision_score(y_true, probs)
    return summary


def metric_row(split, y_true, preds, probs):
    """Build one row of the metric summary table.

    Args:
        split: Label identifying the data split (stored under the ``split`` key).
        y_true: Ground-truth labels.
        preds: Hard predictions.
        probs: Positive-class scores.

    Returns:
        dict starting with ``split`` followed by every entry of
        ``summarize_metrics(y_true, preds, probs)``.
    """
    row = {"split": split}
    row.update(summarize_metrics(y_true, preds, probs))
    return row


def sweep_thresholds(y_true, probs, thresholds, fp_cost=1_000, fn_cost=10_000):
    """Evaluate metrics and misclassification cost at each decision threshold.

    Args:
        y_true: Ground-truth binary labels (0/1).
        probs: Positive-class scores; any array-like (list, ndarray, Series).
        thresholds: Iterable of cut-offs; a row is predicted positive when
            ``probs >= threshold``.
        fp_cost: Cost charged per false positive.
        fn_cost: Cost charged per false negative.

    Returns:
        pandas.DataFrame with one row per threshold and the columns ``threshold``
        (rounded to 4 decimals), ``precision``, ``recall``, ``f1``, ``roc_auc``,
        ``pr_auc``, ``fp``, ``fn`` and ``expected_cost``
        (``fp * fp_cost + fn * fn_cost``). An empty ``thresholds`` yields an empty
        frame that still carries these columns.
    """
    columns = [
        "threshold", "precision", "recall", "f1",
        "roc_auc", "pr_auc", "fp", "fn", "expected_cost",
    ]
    thresholds = list(thresholds)
    if not thresholds:
        # Keep the schema stable so callers can sort/select columns on an empty sweep.
        return pd.DataFrame(columns=columns)

    # Accept plain lists as well as arrays/Series for the element-wise comparison below.
    probs = np.asarray(probs)

    # ROC-AUC and PR-AUC are functions of the scores only, so they are identical for
    # every threshold: compute them once instead of once per iteration.
    ranking_metrics = {
        "roc_auc": roc_auc_score(y_true, probs),
        "pr_auc": average_precision_score(y_true, probs),
    }

    rows = []
    for t in thresholds:
        preds = (probs >= t).astype(int)
        # labels=[0, 1] forces a 2x2 matrix even when one class is never predicted.
        tn, fp, fn, tp = confusion_matrix(y_true, preds, labels=[0, 1]).ravel()
        rows.append({
            "threshold": round(float(t), 4),
            "precision": precision_score(y_true, preds, zero_division=0),
            "recall": recall_score(y_true, preds, zero_division=0),
            "f1": f1_score(y_true, preds, zero_division=0),
            **ranking_metrics,
            "fp": fp,
            "fn": fn,
            "expected_cost": fp * fp_cost + fn * fn_cost,
        })
    return pd.DataFrame(rows, columns=columns)

In [None]:
# Out-of-fold evaluation: 5-fold stratified CV over the development rows (train + val).
# The held-out test split is not touched here.
dev_X = pd.concat([X_train, X_val])
dev_y = pd.concat([y_train, y_val])

gb_model = GradientBoostingClassifier(random_state=42)
skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)

# Column 1 of predict_proba is kept as the positive-class score for each development row.
cv_probs = cross_val_predict(
    gb_model, dev_X, dev_y, cv=skf, method="predict_proba", n_jobs=-1
)[:, 1]

# Hard labels at the default 0.5 cut-off (no threshold tuning at this stage).
cv_preds = (cv_probs >= 0.5).astype(int)
cv_metrics = metric_row("cv_5fold", dev_y, cv_preds, cv_probs)

pd.DataFrame([cv_metrics]).set_index("split")

In [None]:
# Stage 1: train on the training split only and score the validation split.
val_model = GradientBoostingClassifier(random_state=42).fit(X_train, y_train)
val_probs = val_model.predict_proba(X_val)[:, 1]

# Stage 2: pick the threshold with the highest F1 along the validation PR curve.
# The 1e-9 term guards against 0/0 where precision and recall are both zero.
prec, rec, thresholds = precision_recall_curve(y_val, val_probs)
f1_scores = (2 * prec * rec) / (prec + rec + 1e-9)
# The final precision/recall pair has no matching threshold, so it is excluded.
best_idx = int(f1_scores[:-1].argmax())
optimal_threshold = float(thresholds[best_idx])

# NOTE: these validation metrics use a threshold selected on the same validation rows.
val_preds = (val_probs >= optimal_threshold).astype(int)
val_metrics = metric_row("validation", y_val, val_preds, val_probs)
print(f"Optimal threshold from validation PR curve: {optimal_threshold:.3f}")

# Stage 3: refit on train+val and evaluate on the held-out test split.
# NOTE: the threshold was selected for val_model (train only) but is applied here to the
# scores of gb_model refit on train+val.
gb_model.fit(dev_X, dev_y)
test_probs = gb_model.predict_proba(X_test)[:, 1]
test_preds = (test_probs >= optimal_threshold).astype(int)
test_metrics = metric_row("test", y_test, test_preds, test_probs)

# One row per evaluation (CV, validation, test), indexed by split name.
metric_table = pd.DataFrame(
    [cv_metrics, val_metrics, test_metrics]
).set_index("split")
print("Precision/Recall/F1/ROC-AUC/PR-AUC summary")
display(metric_table)

print("Classification report (test):")
print(classification_report(y_test, test_preds, zero_division=0))

In [None]:
# Side-by-side confusion matrices for the validation and test predictions.
fig, axes = plt.subplots(1, 2, figsize=(10, 4))
panels = [
    ("Validation confusion matrix", y_val, val_preds),
    ("Test confusion matrix", y_test, test_preds),
]
for ax, (title, truth, predicted) in zip(axes, panels):
    ConfusionMatrixDisplay.from_predictions(truth, predicted, cmap="Blues", ax=ax)
    ax.set_title(title)
plt.tight_layout()
plt.show()

In [None]:
# ROC and precision-recall curves for the test-set scores, side by side.
fig, axes = plt.subplots(1, 2, figsize=(12, 4))
RocCurveDisplay.from_predictions(y_test, test_probs, name="GB", ax=axes[0])
axes[0].set_title("ROC curve (test)")
PrecisionRecallDisplay.from_predictions(y_test, test_probs, name="GB", ax=axes[1])
# \u2013 is an en dash; written as an escape so the title cannot be re-garbled by a
# file-encoding round trip (it previously rendered as mojibake).
axes[1].set_title("Precision\u2013Recall curve (test)")
plt.tight_layout()
plt.show()

In [None]:
FP_COST = 1_000   # adjust to actual investigation cost
FN_COST = 10_000  # adjust to expected loss from a missed fraud case

# Evaluate 19 evenly spaced thresholds from 0.05 to 0.95 on the validation scores.
threshold_grid = np.linspace(0.05, 0.95, 19)
cost_table = sweep_thresholds(y_val, val_probs, threshold_grid, fp_cost=FP_COST, fn_cost=FN_COST)

# idxmin returns the first row that attains the minimum, so when several thresholds tie
# on expected cost the lowest one in the ascending grid is chosen deterministically.
# (sort_values with its default, non-stable sort gives no such guarantee.)
best_cost_row = cost_table.loc[cost_table["expected_cost"].idxmin()]
print("Best cost-weighted threshold:")
display(best_cost_row.to_frame().T)

plt.figure(figsize=(10, 4))
sns.lineplot(data=cost_table, x="threshold", y="expected_cost", marker="o")
plt.axvline(best_cost_row["threshold"], color="red", linestyle="--", label="Min cost threshold")
plt.title("Expected cost by threshold (validation)")
plt.legend()
plt.show()

In [None]:
# Per-row view of the test split: the features plus true label, decision and score.
analysis_df = X_test.copy()
analysis_df["y_true"] = y_test.to_numpy()
analysis_df["pred"] = test_preds
analysis_df["proba"] = test_probs

# Tag every row with its outcome; anything that is neither error kind is "Correct".
analysis_df["error_type"] = np.select(
    condlist=[
        analysis_df["pred"].eq(1) & analysis_df["y_true"].eq(0),
        analysis_df["pred"].eq(0) & analysis_df["y_true"].eq(1),
    ],
    choicelist=["False Positive", "False Negative"],
    default="Correct",
)

# Reference statistics from the training split, used to express feature values as
# z-scores. Zero standard deviations become NaN so they never act as a divisor of 0.
ref_std = X_train.std().replace(0, np.nan)
ref_mean = X_train.mean()


def top_drivers(row, top_n=5, mean=None, std=None):
    """Describe the features of ``row`` that deviate most from a reference distribution.

    Each feature is converted to an absolute z-score, ``|row - mean| / std``, and the
    ``top_n`` largest are reported. This is a per-feature deviation heuristic computed
    from the reference statistics only; it does not consult the fitted model.

    Args:
        row: pandas Series of feature values indexed by feature name.
        top_n: Maximum number of features to report.
        mean: Reference means indexed by feature name. Defaults to the notebook-level
            ``ref_mean``.
        std: Reference standard deviations indexed by feature name. Defaults to the
            notebook-level ``ref_std``.

    Returns:
        str of the form ``"feat_a (z=5.0); feat_b (z=1.2)"``, ordered by decreasing
        z-score. Features whose z-score is undefined (zero reference std, or a missing
        value) are omitted; the result is an empty string if none remain.
    """
    mean = ref_mean if mean is None else mean
    std = ref_std if std is None else std
    # A zero std would yield an infinite z-score; treat it as undefined instead.
    std = std.replace(0, np.nan)
    z = (
        ((row - mean) / std)
        .abs()
        .dropna()  # keep "z=nan" entries out of the report
        .sort_values(ascending=False)
        .head(top_n)
    )
    return "; ".join(f"{idx} (z={val:.1f})" for idx, val in z.items())

# Separate the misclassified test rows by error direction (copies, so adding a column
# below does not touch analysis_df).
fp_cases = analysis_df[analysis_df["error_type"].eq("False Positive")].copy()
fn_cases = analysis_df[analysis_df["error_type"].eq("False Negative")].copy()

# Attach the most deviant features (per top_drivers) to each misclassified row.
for misclassified in (fp_cases, fn_cases):
    misclassified["drivers"] = misclassified[feature_cols].apply(top_drivers, axis=1)

report_cols = ["proba", "drivers"]

print("False positives (top 3 by fraud probability):")
display(fp_cases[report_cols].sort_values("proba", ascending=False).head(3))

print("False negatives (top 3 by lowest fraud probability among actual frauds):")
display(fn_cases[report_cols].sort_values("proba", ascending=True).head(3))

print("Error mix on held-out test split:")
display(analysis_df["error_type"].value_counts())