In [1]:
# Your existing imports
import numpy as np
import pandas as pd
import re
import matplotlib.pyplot as plt
from IPython.display import display
from sklearn.metrics import precision_recall_curve, roc_curve, auc
from sklearn.metrics import f1_score, precision_score, recall_score
from sklearn.metrics import roc_auc_score  # Add this import

# =============================================================================
# STEP 1: LOAD YOUR DATA
# =============================================================================

# Load your dataset
df = pd.read_csv('dataset/ed/finals/16_finalwithsigmoid.csv')

print("Dataset loaded successfully!")
print(f"Dataset shape: {df.shape}")
print(f"Columns: {list(df.columns)}")
print()

# =============================================================================
# STEP 2: DATA CLEANING
# =============================================================================

print("\n=== DATA PREPARATION FOR THRESHOLD ANALYSIS ===")
print("Filtering to rows with valid predictions (non-missing mortality_prob and label)...")
print(f"Before filtering: {len(df):,} rows")

# Keep only rows with predictions & labels
df_clean = df.dropna(subset=['mortality_prob', 'died_within_30_days']).copy()

# Coerce types & clip probabilities to valid range [0,1]
df_clean['mortality_prob'] = pd.to_numeric(df_clean['mortality_prob'], errors='coerce').clip(0, 1)
df_clean['died_within_30_days'] = pd.to_numeric(df_clean['died_within_30_days'], errors='coerce').astype(int)

# Drop any rows that may have turned NaN after coercion
df_clean = df_clean.dropna(subset=['mortality_prob', 'died_within_30_days'])

# Also fix dtype for symptom column if it exists
if 'symptom' in df_clean.columns:
    df_clean['symptom'] = pd.to_numeric(df_clean['symptom'], errors='coerce').astype('Int64')

print(f"After filtering: {len(df_clean):,} rows with predictions")


Dataset loaded successfully!
Dataset shape: (591892, 33)
Columns: ['subject_id', 'stay_id', 'intime', 'outtime', 'gender', 'race', 'chiefcomplaint', 'anchor_age', 'anchor_year', 'anchor_year_group', 'dod', 'dead_in_days', 'died_within_30_days', 'race_standard', 'age_group', 'unique_visit_id', 'terms', 'terms_new', 'indiv_symptom', 'counter', 'unique_ids_exploded', 'expanded_symptoms', 'expanded_symptoms_new', 'counter_new', 'unique_ids_exploded_new', 'snomed', 'is_male', 'ed_age', 'symptom', 'mu', 'sigma', 'mortality_prob', 'mortality_percent']


=== DATA PREPARATION FOR THRESHOLD ANALYSIS ===
Filtering to rows with valid predictions (non-missing mortality_prob and label)...
Before filtering: 591,892 rows
After filtering: 232,895 rows with predictions


In [None]:
# ============================
# CALIBRATION CHECKER (MEASUREMENT ONLY, WITH "WHY IT'S GOOD" NOTES)
# ============================
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from typing import Dict, Tuple, Optional
from sklearn.linear_model import LogisticRegression

# ---------- Utility ----------
def _safe_logit(p: np.ndarray, eps: float = 1e-6) -> np.ndarray:
    """
    Convert probabilities p in (0,1) to logits log(p/(1-p)).
    Clipping avoids infinities when p is 0 or 1.
    Why it’s good: using logits makes linear calibration modeling stable and numerically well-behaved.
    """
    p = np.clip(np.asarray(p, dtype=float), eps, 1 - eps)
    return np.log(p / (1 - p))

# ---------- Core calibration metrics ----------
def calibration_slope_intercept(y: np.ndarray, p: np.ndarray) -> Tuple[float, float]:
    """
    Estimate calibration-in-the-large (intercept) and calibration slope (slope) by fitting:
      logit(E[Y]) = a + b * logit(p).
    Ideal: intercept a ≈ 0, slope b ≈ 1. Returns (a, b); (nan, nan) if y has one class.
    Why it’s good: data-efficient summary used widely in clinical prediction; quickly tells if risks are globally shifted (a) or over/under-confident (b).
    """
    y = np.asarray(y, dtype=int)
    if len(np.unique(y)) < 2:
        return float("nan"), float("nan")
    z = _safe_logit(p).reshape(-1, 1)
    lr = LogisticRegression(solver="lbfgs", max_iter=1000)
    lr.fit(z, y)
    a = float(lr.intercept_[0])
    b = float(lr.coef_[0][0])
    return a, b

def ece_mce(y: np.ndarray, p: np.ndarray, n_bins: int = 15) -> Tuple[float, float]:
    """
    Expected Calibration Error (ECE) and Maximum Calibration Error (MCE) via uniform-probability bins.
    - ECE: weighted average |observed - predicted| per bin.
    - MCE: largest absolute gap across bins. Smaller is better.
    Why it’s good: ECE/MCE give intuitive, threshold-free calibration gaps that are easy to explain and visualize.
    """
    y = np.asarray(y, dtype=int)
    p = np.clip(np.asarray(p, dtype=float), 0, 1)
    bins = np.linspace(0.0, 1.0, n_bins + 1)
    idx = np.digitize(p, bins) - 1
    ece = 0.0
    mce = 0.0
    N = len(y)
    for b in range(n_bins):
        m = idx == b
        if not np.any(m):
            continue
        acc = y[m].mean()      # observed rate in the bin
        conf = p[m].mean()     # average predicted in the bin
        gap = abs(acc - conf)
        ece += gap * (m.sum() / N)
        mce = max(mce, gap)
    return float(ece), float(mce)

def brier(y: np.ndarray, p: np.ndarray) -> float:
    """
    Brier score = mean squared error between probability predictions and outcomes.
    Range [0,1], smaller is better.
    Why it’s good: a strictly proper scoring rule that rewards well-calibrated, sharp probabilities in one number.
    """
    y = np.asarray(y, dtype=float)
    p = np.clip(np.asarray(p, dtype=float), 0, 1)
    return float(np.mean((p - y) ** 2))

def log_loss_nll(y: np.ndarray, p: np.ndarray, eps: float = 1e-12) -> float:
    """
    Negative log-likelihood (log loss) of probabilistic predictions.
    Penalizes confident wrong predictions more. Smaller is better.
    Why it’s good: a gold-standard proper scoring rule aligned with maximum likelihood; sensitive to overconfidence.
    """
    y = np.asarray(y, dtype=float)
    p = np.clip(np.asarray(p, dtype=float), eps, 1 - eps)
    return float(-np.mean(y * np.log(p) + (1 - y) * np.log(1 - p)))

def mean_pred_vs_obs(y: np.ndarray, p: np.ndarray) -> Tuple[float, float, float]:
    """
    Compare average predicted risk to observed prevalence:
    - mean_pred      : E[p]  (expected event rate across patients)
    - observed_prev  : E[y]  (observed event rate / prevalence)
    - E_over_O ratio : mean_pred / observed_prev  (≈1 is ideal)
      >1 indicates global overestimation; <1 indicates underestimation.
    Why it’s good: a simple, clinician-friendly “sanity check” for calibration-in-the-large before deeper analysis.
    """
    p_mean = float(np.mean(p))
    obs = float(np.mean(y))
    e_over_o = (p_mean / obs) if obs > 0 else float("inf")
    return p_mean, obs, e_over_o

def hosmer_lemeshow(y: np.ndarray, p: np.ndarray, g: int = 10) -> Dict[str, float]:
    """
    Hosmer–Lemeshow goodness-of-fit test (grouped by predicted-risk deciles).
    Returns {'HL_X2': chi-square, 'df': degrees_of_freedom}. Interpret cautiously (sample-size sensitive).
    Why it’s good: a classic omnibus check that can flag severe miscalibration patterns; use alongside ECE and plots.
    """
    order = np.argsort(p)
    y_sorted = y[order]
    p_sorted = p[order]
    indices = np.array_split(np.arange(len(y)), g)
    chi2 = 0.0
    dof = g - 2
    for idx in indices:
        og = y_sorted[idx].sum()     # observed events in group
        eg = p_sorted[idx].sum()     # expected events in group
        ng = len(idx)
        var = eg * (1 - (eg / max(ng, 1)))  # simple variance approx on counts
        if var > 0:
            chi2 += (og - eg) ** 2 / var
    return {"HL_X2": float(chi2), "df": int(dof)}

# ---------- Plot (optional visualization) ----------
def plot_reliability(y: np.ndarray, p: np.ndarray, n_bins: int = 15, title: str = "Reliability diagram") -> None:
    """
    Reliability diagram: observed event rate vs mean predicted probability per bin.
    Perfect calibration lies on the diagonal (y = x).
    Why it’s good: fast visual diagnosis of under/overestimation and nonlinear calibration issues.
    """
    y = np.asarray(y, dtype=int)
    p = np.asarray(p, dtype=float)
    bins = np.linspace(0, 1, n_bins + 1)
    idx = np.digitize(p, bins) - 1
    xs, ys = [], []
    for b in range(n_bins):
        m = idx == b
        if not np.any(m):
            continue
        xs.append(p[m].mean())
        ys.append(y[m].mean())
    xs, ys = np.array(xs), np.array(ys)
    plt.figure()
    plt.plot([0, 1], [0, 1], linestyle="--", label="Perfect")
    if len(xs) > 0:
        plt.plot(xs, ys, marker="o", label="Observed")
    plt.xlabel("Mean predicted probability (per bin)")
    plt.ylabel("Observed event rate (per bin)")
    plt.title(title)
    plt.legend()
    plt.show()

# ---------- DataFrame-based wrappers ----------
def calibration_metrics_block(
    df: pd.DataFrame,
    prob_col: str = "mortality_prob",
    y_col: str = "died_within_30_days",
    n_bins: int = 15
) -> Dict[str, float]:
    """
    Compute a standard set of calibration metrics for a DataFrame block:
      mean_pred, observed_prev, E_over_O, intercept/slope, ECE/MCE, Brier, NLL, HL (if big enough).
    Returns a dict of named metrics.
    Why it’s good: one call gives you a complete calibration snapshot that’s easy to table and compare.
    """
    p = np.clip(df[prob_col].astype(float).values, 0, 1)
    y = df[y_col].astype(int).values
    a, b = calibration_slope_intercept(y, p)
    ece, mce = ece_mce(y, p, n_bins=n_bins)
    br = brier(y, p)
    nll = log_loss_nll(y, p)
    p_mean, obs, e_over_o = mean_pred_vs_obs(y, p)
    hl = hosmer_lemeshow(y, p, g=min(10, max(5, int(np.sqrt(len(df)))))) if len(df) >= 1000 else {"HL_X2": np.nan, "df": np.nan}
    return {
        "n": int(len(df)),
        "events": int(y.sum()),
        "mean_pred": p_mean,
        "observed_prev": obs,
        "E_over_O": e_over_o,
        "cal_intercept": a,   # target ~ 0
        "cal_slope": b,       # target ~ 1
        "ECE": ece,
        "MCE": mce,
        "Brier": br,
        "NLL": nll,
        "HL_X2": hl["HL_X2"],
        "HL_df": hl["df"]
    }

def calibration_summary(
    df: pd.DataFrame,
    prob_col: str = "mortality_prob",
    y_col: str = "died_within_30_days",
    group_col: Optional[str] = None,
    n_bins: int = 15
) -> pd.DataFrame:
    """
    Compute calibration metrics overall or grouped by `group_col`.
    Returns a DataFrame indexed by group (or 'OVERALL').
    Why it’s good: makes apples-to-apples calibration comparisons across gender/race/intersections trivial.
    """
    if group_col is None:
        m = calibration_metrics_block(df, prob_col, y_col, n_bins)
        return pd.DataFrame([m], index=["OVERALL"])
    out = []
    for g, sub in df.groupby(group_col):
        m = calibration_metrics_block(sub, prob_col, y_col, n_bins)
        m["group"] = g
        out.append(m)
    return pd.DataFrame(out).set_index("group").sort_index()

# ---------- Recommendation logic ----------
def recommend_calibration(
    overall: Dict[str, float],
    by_gender: Optional[pd.DataFrame] = None,
    by_race: Optional[pd.DataFrame] = None,
    min_events_per_group: int = 200
) -> str:
    """
    Rule-based recommendation for calibration correction, based on intercept/slope/ECE and group consistency.
    Returns a short human-readable recommendation string.
    Why it’s good: gives you an actionable next step (e.g., logistic vs isotonic vs group-wise) without guesswork.
    """
    msgs = []
    a = overall.get("cal_intercept", np.nan)
    b = overall.get("cal_slope", np.nan)
    ece = overall.get("ECE", np.nan)

    intercept_bad = (not np.isnan(a)) and (abs(a) > 0.1)
    slope_bad     = (not np.isnan(b)) and (abs(b - 1.0) > 0.1)
    ece_bad       = (not np.isnan(ece)) and (ece > 0.03)

    if not intercept_bad and not slope_bad and not ece_bad:
        msgs.append("Overall calibration looks acceptable (intercept≈0, slope≈1, ECE small). No correction strictly needed.")
    else:
        if intercept_bad and not slope_bad:
            msgs.append("Calibration-in-the-large issue (intercept far from 0). → Intercept-only update or logistic recalibration.")
        if slope_bad:
            if b < 1:
                msgs.append("Slope < 1 (predictions too extreme). → Logistic recalibration or temperature scaling.")
            else:
                msgs.append("Slope > 1 (predictions not extreme enough). → Logistic recalibration.")
        if ece_bad:
            msgs.append("ECE is high (likely nonlinear miscalibration). → If enough data, isotonic; else beta calibration.")

    def _group_flag(df, label):
        if df is None or len(df) == 0:
            return
        # Only consider groups with enough events (avoid noisy judgments)
        df2 = df[df["events"] >= min_events_per_group]
        if df2.empty:
            msgs.append(f"{label}: many groups have few events; prefer simpler calibrators (logistic) or pool groups.")
            return
        ia = df2["cal_intercept"].astype(float)
        ib = df2["cal_slope"].astype(float)
        ie = df2["ECE"].astype(float)
        if (ia.max() - ia.min()) > 0.2 or (ib.max() - ib.min()) > 0.3 or (ie.max() - ie.min()) > 0.03:
            msgs.append(f"{label}: notable between-group calibration differences. → Consider group-wise logistic recalibration (or multicalibration).")
        else:
            msgs.append(f"{label}: group calibration appears fairly consistent.")

    _group_flag(by_gender, "Gender")
    _group_flag(by_race, "Race")

    # Primary recommendation summary
    if intercept_bad and not slope_bad and not ece_bad:
        primary = "Intercept-only update (recalibrate baseline risk)."
    elif slope_bad and not ece_bad:
        primary = "Logistic recalibration (Platt-style) or temperature scaling."
    elif ece_bad:
        primary = "Isotonic regression (if data-rich) or Beta calibration (if moderate data)."
    else:
        primary = "No correction needed; monitor with temporal validation."

    msgs.append(f"Primary recommendation: {primary}")
    return "\n".join(msgs)

# ---------- One-call runner ----------
def run_calibration_check(
    df_clean: pd.DataFrame,
    prob_col: str = "mortality_prob",
    y_col: str = "died_within_30_days",
    n_bins: int = 15,
    do_plots: bool = True
) -> Dict[str, pd.DataFrame]:
    """
    Run calibration measurement overall, by gender, by race, and by gender×race.
    Prints tables + optional reliability plots, and returns a dict of DataFrames.
    Why it’s good: a turnkey “calibration report” you can drop into your pipeline and cite in your thesis.
    """
    # Build intersectional column if available
    if "gender" in df_clean.columns and "race_standard" in df_clean.columns:
        df_clean = df_clean.copy()
        df_clean["gender_race"] = df_clean["gender"].astype(str) + "|" + df_clean["race_standard"].astype(str)

    # Compute summaries
    overall = calibration_summary(df_clean, prob_col, y_col, group_col=None, n_bins=n_bins)
    by_gender = calibration_summary(df_clean, prob_col, y_col, group_col="gender", n_bins=n_bins) if "gender" in df_clean.columns else pd.DataFrame()
    by_race = calibration_summary(df_clean, prob_col, y_col, group_col="race_standard", n_bins=n_bins) if "race_standard" in df_clean.columns else pd.DataFrame()
    by_ix = calibration_summary(df_clean, prob_col, y_col, group_col="gender_race", n_bins=n_bins) if "gender_race" in df_clean.columns else pd.DataFrame()

    # Optional reliability plots
    if do_plots:
        y = df_clean[y_col].astype(int).values
        p = np.clip(df_clean[prob_col].astype(float).values, 0, 1)
        plot_reliability(y, p, n_bins=n_bins, title="Reliability: OVERALL")
        if not by_gender.empty:
            for g, sub in df_clean.groupby("gender"):
                plot_reliability(sub[y_col].astype(int).values, sub[prob_col].astype(float).values,
                                 n_bins=n_bins, title=f"Reliability: gender={g}")
        if not by_race.empty:
            for r, sub in df_clean.groupby("race_standard"):
                plot_reliability(sub[y_col].astype(int).values, sub[prob_col].astype(float).values,
                                 n_bins=n_bins, title=f"Reliability: race={r}")

    # Recommendation text
    rec_txt = recommend_calibration(overall.iloc[0].to_dict(),
                                    by_gender if not by_gender.empty else None,
                                    by_race if not by_race.empty else None)

    # Print summaries
    print("\n=== CALIBRATION: OVERALL ===")
    print(overall.round(4).to_string())
    if not by_gender.empty:
        print("\n=== CALIBRATION: BY GENDER ===")
        print(by_gender.round(4).to_string())
    if not by_race.empty:
        print("\n=== CALIBRATION: BY RACE ===")
        print(by_race.round(4).to_string())
    if not by_ix.empty:
        print("\n=== CALIBRATION: BY GENDER×RACE (intersection) ===")
        print(by_ix.round(4).to_string())

    print("\n=== RECOMMENDATION ===")
    print(rec_txt)

    return {
        "overall": overall,
        "by_gender": by_gender,
        "by_race": by_race,
        "by_gender_race": by_ix,
        "recommendation": rec_txt
    }


In [None]:
# =============================================================================
# STEP 3: FAIRNESS/BIAS AUDIT HELPERS 
# =============================================================================
from typing import Dict, Iterable, Any, Tuple
from sklearn.metrics import roc_curve, roc_auc_score, confusion_matrix, brier_score_loss

def ensure_binary(y: pd.Series) -> np.ndarray:
    yv = pd.to_numeric(y, errors="coerce").astype("Int64").fillna(0).astype(int).values
    bad = set(np.unique(yv)) - {0, 1}
    if bad:
        raise ValueError(f"died_within_30_days must be 0/1. Found {bad}.")
    return yv




def ece_score(y_true: np.ndarray, y_prob: np.ndarray, n_bins: int = 15) -> float:
    y_prob = np.clip(y_prob.astype(float), 0, 1)
    bins = np.linspace(0, 1, n_bins + 1)
    idx = np.digitize(y_prob, bins) - 1
    ece = 0.0
    n = len(y_true)
    for b in range(n_bins):
        m = idx == b
        if not np.any(m):
            continue
        acc = y_true[m].mean()
        conf = y_prob[m].mean()
        ece += np.abs(acc - conf) * (m.sum() / n)
    return float(ece)


def calib_points(y_true: np.ndarray, y_prob: np.ndarray, n_bins: int = 15) -> Tuple[np.ndarray, np.ndarray]:
    y_prob = np.clip(y_prob.astype(float), 0, 1)
    bins = np.linspace(0, 1, n_bins + 1)
    idx = np.digitize(y_prob, bins) - 1
    xs, ys = [], []
    for b in range(n_bins):
        m = idx == b
        if not np.any(m): 
            continue
        xs.append(y_prob[m].mean())
        ys.append(y_true[m].mean())
    return np.array(xs), np.array(ys)



def rates_at_threshold(y_true: np.ndarray, y_prob: np.ndarray, thr: float) -> Dict[str, float]:
    y_pred = (y_prob >= thr).astype(int)
    tn, fp, fn, tp = confusion_matrix(y_true, y_pred, labels=[0,1]).ravel()
    tpr = tp / (tp + fn) if (tp + fn) else 0.0
    fpr = fp / (fp + tn) if (fp + tn) else 0.0
    tnr = 1 - fpr
    fnr = 1 - tpr
    ppv = tp / (tp + fp) if (tp + fp) else 0.0
    npv = tn / (tn + fn) if (tn + fn) else 0.0
    prev = (tp + fn) / (tp + fp + tn + fn) if (tp + fp + tn + fn) else 0.0
    return {"TPR": tpr, "FPR": fpr, "TNR": tnr, "FNR": fnr, "PPV": ppv, "NPV": npv, "Prevalence": prev,
            "TP": tp, "FP": fp, "TN": tn, "FN": fn}


# ---- “maximize X” rules --------------------------------------------------
def youdens_j_threshold(y_true: np.ndarray, y_prob: np.ndarray) -> float:
    fpr, tpr, thr = roc_curve(y_true, y_prob)
    j = tpr - fpr
    return float(thr[np.argmax(j)])



def auc_by_group(df: pd.DataFrame, group_col: str) -> pd.DataFrame:
    rows = []
    for g, sub in df.groupby(group_col):
        y = ensure_binary(sub["died_within_30_days"])
        p = sub["mortality_prob"].astype(float).values
        auc_val = np.nan if len(np.unique(y)) < 2 else roc_auc_score(y, p)
        rows.append({"group": g, "AUC": auc_val, "n": len(sub)})
    return pd.DataFrame(rows).set_index("group")



def brier_by_group(df: pd.DataFrame, group_col: str) -> pd.DataFrame:
    rows = []
    for g, sub in df.groupby(group_col):
        y = ensure_binary(sub["died_within_30_days"]).astype(float)
        p = sub["mortality_prob"].astype(float).values
        rows.append({"group": g, "Brier": brier_score_loss(y, p), "n": len(sub)})
    return pd.DataFrame(rows).set_index("group")


def ece_by_group(df: pd.DataFrame, group_col: str, n_bins: int = 15) -> pd.DataFrame:
    rows = []
    for g, sub in df.groupby(group_col):
        y = ensure_binary(sub["died_within_30_days"]).astype(int)
        p = sub["mortality_prob"].astype(float).values
        rows.append({"group": g, "ECE": ece_score(y, p, n_bins=n_bins), "n": len(sub)})
    return pd.DataFrame(rows).set_index("group")



def base_rate_by_group(df: pd.DataFrame, group_col: str) -> pd.DataFrame:
    rows = []
    for g, sub in df.groupby(group_col):
        rows.append({"group": g, "Prevalence": float(sub["died_within_30_days"].mean()), "n": len(sub)})
    return pd.DataFrame(rows).set_index("group")







                     

In [None]:
# other threshold functions 
    '''
    (quick guide)
    - Clinical sensitivity target (avoid missed deaths): threshold_target_tpr (e.g., TPR ≥ 0.90).
    - Resource cap (e.g., only top 10% get extra tests): threshold_top_k.
    - Balanced errors: threshold_youden_j or threshold_max_f1.
    - Cost asymmetry (FN >> FP): bayes_threshold_from_costs (if well-calibrated) or threshold_min_expected_cost (agnostic).
    - Fairness—global policy: threshold_min_eo_gap or threshold_min_eqopp_gap.
    - Fairness—group-specific policy: group_thresholds_for_target_tpr / …_fpr.
    '''

# =============================================================================
# STEP 3B: THRESHOLD SELECTION HELPERS (copy/paste this block)_- Thresholdsssssssssssssssssssssssssssssssssssss
# =============================================================================
import numpy as np
import pandas as pd
from typing import Dict, Tuple, Iterable, Any
from sklearn.metrics import confusion_matrix, roc_curve, roc_auc_score

# ---- Basic utilities --------------------------------------------------------
def _ensure_binary(y: pd.Series) -> np.ndarray:
    yv = pd.to_numeric(y, errors="coerce").fillna(0).astype(int).values
    if not set(np.unique(yv)).issubset({0,1}):
        raise ValueError("died_within_30_days must be 0/1.")
    return yv

def _confusion(y_true: np.ndarray, y_prob: np.ndarray, thr: float) -> Tuple[int,int,int,int]:
    y_pred = (y_prob >= thr).astype(int)
    tn, fp, fn, tp = confusion_matrix(y_true, y_pred, labels=[0,1]).ravel()
    return tp, fp, tn, fn

def _metrics(tp:int, fp:int, tn:int, fn:int) -> Dict[str, float]:
    tpr = tp / (tp + fn) if (tp + fn) else 0.0
    fpr = fp / (fp + tn) if (fp + tn) else 0.0
    tnr = 1 - fpr
    fnr = 1 - tpr
    ppv = tp / (tp + fp) if (tp + fp) else 0.0
    npv = tn / (tn + fn) if (tn + fn) else 0.0
    prec, rec = ppv, tpr
    f1 = 2*prec*rec/(prec+rec) if (prec+rec) else 0.0
    return {"TPR": tpr, "FPR": fpr, "TNR": tnr, "FNR": fnr, "PPV": ppv, "NPV": npv, "F1": f1}

def _candidate_thresholds(y_prob: np.ndarray, n: int = 512) -> np.ndarray:
    lo, hi = float(np.min(y_prob)), float(np.max(y_prob))
    uniq = np.unique(y_prob)
    if len(uniq) <= n:
        # extend ends slightly to include all decision flips
        return np.concatenate(([lo-1e-12], uniq, [hi+1e-12]))
    return np.linspace(lo, hi, n)

def scan_thresholds(y_true: np.ndarray, y_prob: np.ndarray, candidates: np.ndarray = None) -> pd.DataFrame:
    if candidates is None:
        candidates = _candidate_thresholds(y_prob)
    rows = []
    for thr in candidates:
        tp, fp, tn, fn = _confusion(y_true, y_prob, thr)
        m = _metrics(tp, fp, tn, fn)
        m.update({"threshold": float(thr), "TP": tp, "FP": fp, "TN": tn, "FN": fn})
        rows.append(m)
    return pd.DataFrame(rows).sort_values("threshold").reset_index(drop=True)

# ---- “maximize X” rules -----------------------------------------------------
def threshold_youden_j(y_true: np.ndarray, y_prob: np.ndarray) -> float:
    fpr, tpr, thr = roc_curve(y_true, y_prob)
    j = tpr - fpr
    return float(thr[np.argmax(j)])

def threshold_max_f1(y_true: np.ndarray, y_prob: np.ndarray) -> float:
    tbl = scan_thresholds(y_true, y_prob)
    i = int(tbl["F1"].values.argmax())
    return float(tbl.loc[i, "threshold"])

def threshold_max_fbeta(y_true: np.ndarray, y_prob: np.ndarray, beta: float = 2.0) -> float:
    tbl = scan_thresholds(y_true, y_prob)
    prec, rec = tbl["PPV"].values, tbl["TPR"].values
    fbeta = (1+beta**2) * (prec*rec) / (beta**2*prec + rec + 1e-12)
    return float(tbl.loc[int(fbeta.argmax()), "threshold"])

# ---- Resource / policy constraints -----------------------------------------
def threshold_top_k(y_prob: np.ndarray, k: float) -> float:
    """
    Select top k fraction as positive. k=0.10 => label top 10% highest risk as positive.
    Returns the score cutoff (inclusive).
    """
    assert 0 < k < 1, "k should be in (0,1)"
    return float(np.quantile(y_prob, 1 - k))

def threshold_target_tpr(y_true: np.ndarray, y_prob: np.ndarray, target_tpr: float) -> float:
    """
    Smallest threshold that achieves TPR >= target_tpr (maximizes specificity subject to sensitivity).
    """
    fpr, tpr, thr = roc_curve(y_true, y_prob)
    ok = np.where(tpr >= target_tpr)[0]
    if len(ok) == 0:
        return float(thr[np.argmax(tpr)])  # highest achievable TPR
    return float(thr[ok[0]])

def threshold_target_fpr(y_true: np.ndarray, y_prob: np.ndarray, target_fpr: float) -> float:
    """
    Largest threshold with FPR <= target_fpr (maximizes TPR subject to specificity).
    """
    fpr, tpr, thr = roc_curve(y_true, y_prob)
    ok = np.where(fpr <= target_fpr)[0]
    if len(ok) == 0:
        return float(thr[np.argmin(fpr)])  # lowest achievable FPR
    # among ok, pick the one with highest TPR
    best = ok[np.argmax(tpr[ok])]
    return float(thr[best])

def threshold_target_ppv(y_true: np.ndarray, y_prob: np.ndarray, target_ppv: float) -> float:
    """
    Smallest threshold that yields PPV >= target_ppv.
    Note: PPV is not monotone in threshold, so we scan.
    """
    tbl = scan_thresholds(y_true, y_prob)
    ok = tbl.index[tbl["PPV"] >= target_ppv].tolist()
    return float(tbl.loc[ok[0], "threshold"]) if ok else float(tbl.loc[tbl["PPV"].idxmax(), "threshold"])

# ---- Cost/utility based -----------------------------------------------------
#      Bayes (cost-based) (calibrated)
def bayes_threshold_from_costs(prevalence: float, cost_fn: float = 5.0, cost_fp: float = 1.0) -> float:
    """
    Bayes decision rule assuming calibrated probabilities:
    predict 1 if p >= τ*, where τ* = C_FP*(1-π) / [C_FN*π + C_FP*(1-π)]
    """
    pi = float(prevalence)
    return float((cost_fp*(1 - pi)) / (cost_fn*pi + cost_fp*(1 - pi)))

def threshold_min_expected_cost(y_true: np.ndarray, y_prob: np.ndarray, cost_fn: float = 5.0, cost_fp: float = 1.0) -> float:
    """
    Choose threshold that minimizes Expected Cost = cost_fn*FN + cost_fp*FP
    (does NOT assume perfect calibration; uses empirical counts).
    """
    tbl = scan_thresholds(y_true, y_prob)
    exp_cost = cost_fn*tbl["FN"].values + cost_fp*tbl["FP"].values
    return float(tbl.loc[int(exp_cost.argmin()), "threshold"])

# ---- Fairness-aware search (global thresholds) ------------------------------
def _rates_by_group(y_true: np.ndarray, y_prob: np.ndarray, groups: Iterable[Any], thr: float) -> pd.DataFrame:
    df = pd.DataFrame({"y": y_true, "p": y_prob, "g": np.array(list(groups))})
    rows = []
    for g, sub in df.groupby("g"):
        tp, fp, tn, fn = _confusion(sub["y"].values, sub["p"].values, thr)
        m = _metrics(tp, fp, tn, fn); m["group"] = g
        rows.append(m)
    return pd.DataFrame(rows).set_index("group")

def threshold_min_eo_gap(y_true: np.ndarray, y_prob: np.ndarray, groups: Iterable[Any], n_candidates: int = 200) -> Tuple[float, pd.DataFrame]:
    """
    Minimize Equalized Odds gaps: argmin_thr [ (max TPR - min TPR) + (max FPR - min FPR) ].
    """
    lo, hi = float(np.min(y_prob)), float(np.max(y_prob))
    candidates = np.linspace(lo, hi, n_candidates)
    best = (None, None, np.inf)
    for thr in candidates:
        per = _rates_by_group(y_true, y_prob, groups, thr)
        gap = (per["TPR"].max() - per["TPR"].min()) + (per["FPR"].max() - per["FPR"].min())
        if float(gap) < best[2]:
            best = (float(thr), per, float(gap))
    return best[0], best[1]

def threshold_min_eqopp_gap(y_true: np.ndarray, y_prob: np.ndarray, groups: Iterable[Any], n_candidates: int = 200) -> Tuple[float, pd.DataFrame]:
    """
    Minimize Equality of Opportunity gap: argmin_thr [ max TPR - min TPR ].
    """
    lo, hi = float(np.min(y_prob)), float(np.max(y_prob))
    candidates = np.linspace(lo, hi, n_candidates)
    best = (None, None, np.inf)
    for thr in candidates:
        per = _rates_by_group(y_true, y_prob, groups, thr)
        gap = per["TPR"].max() - per["TPR"].min()
        if float(gap) < best[2]:
            best = (float(thr), per, float(gap))
    return best[0], best[1]

# ---- Group-specific thresholds (policy choice) ------------------------------
def group_thresholds_for_target_tpr(y_true: np.ndarray, y_prob: np.ndarray, groups: Iterable[Any], target_tpr: float) -> Dict[Any, float]:
    """
    For each group, pick the smallest threshold that achieves TPR >= target_tpr (Eq. Opportunity via group-specific cutoffs).
    """
    df = pd.DataFrame({"y": y_true, "p": y_prob, "g": np.array(list(groups))})
    out = {}
    for g, sub in df.groupby("g"):
        out[g] = threshold_target_tpr(sub["y"].values, sub["p"].values, target_tpr)
    return out

def group_thresholds_for_target_fpr(y_true: np.ndarray, y_prob: np.ndarray, groups: Iterable[Any], target_fpr: float) -> Dict[Any, float]:
    """
    For each group, pick the largest threshold with FPR <= target_fpr (specificity constraint).
    """
    df = pd.DataFrame({"y": y_true, "p": y_prob, "g": np.array(list(groups))})
    out = {}
    for g, sub in df.groupby("g"):
        out[g] = threshold_target_fpr(sub["y"].values, sub["p"].values, target_fpr)
    return out

# =============================================================================
# EXAMPLE USAGE (uncomment and run after Step 2)
# =============================================================================
# y = _ensure_binary(df_clean["died_within_30_days"])
# p = df_clean["mortality_prob"].astype(float).values
# g_gender = df_clean["gender"].astype(str).values
#
# thr_j     = threshold_youden_j(y, p)
# thr_f1    = threshold_max_f1(y, p)
# thr_fb    = threshold_max_fbeta(y, p, beta=2.0)
# thr_tpr90 = threshold_target_tpr(y, p, target_tpr=0.90)
# thr_fpr05 = threshold_target_fpr(y, p, target_fpr=0.05)
# thr_top10 = threshold_top_k(p, k=0.10)
# thr_ppv50 = threshold_target_ppv(y, p, target_ppv=0.50)
#
# # Cost-based (assuming misclassifying a death (FN) is 5x worse than a FP):
# pi = float(y.mean())
# thr_bayes = bayes_threshold_from_costs(prevalence=pi, cost_fn=5.0, cost_fp=1.0)
# thr_min_cost = threshold_min_expected_cost(y, p, cost_fn=5.0, cost_fp=1.0)
#
# # Fairness-aware (global threshold):
# thr_eo, per_eo = threshold_min_eo_gap(y, p, g_gender)
# thr_eqopp, per_eqopp = threshold_min_eqopp_gap(y, p, g_gender)
#
# # Group-specific thresholds (policy choice):
# thr_by_gender_eqopp = group_thresholds_for_target_tpr(y, p, g_gender, target_tpr=0.90)
# thr_by_gender_spec  = group_thresholds_for_target_fpr(y, p, g_gender, target_fpr=0.05)
#
# print("YoudenJ:", thr_j)
# print("Max F1:", thr_f1)
# print("Max Fβ (β=2):", thr_fb)
# print("Target TPR 0.90:", thr_tpr90)
# print("Target FPR 0.05:", thr_fpr05)
# print("Top-10%:", thr_top10)
# print("Target PPV 0.50:", thr_ppv50)
# print("Bayes (cost_fn=5,cost_fp=1, π=%.3f):" % pi, thr_bayes)
# print("Min expected cost:", thr_min_cost)
# print("Min EO gap (global):", thr_eo)
# print("Min EqOpp gap (global):", thr_eqopp)
# print("Per-group EqOpp thresholds:", thr_by_gender_eqopp)
# print("Per-group specificity thresholds:", thr_by_gender_spec)




In [None]:
# calibrations category II >>  Modifying predictions to improve fairness/ calibration_optional 
# If you want to intervention but not necessary for bias detection

# Logistic recalibration on a validation set
import numpy as np, pandas as pd
from sklearn.linear_model import LogisticRegression

def fit_logistic_recal(val_df, prob_col="mortality_prob", y_col="died_within_30_days"):
    p = np.clip(val_df[prob_col].astype(float).values, 1e-6, 1-1e-6)
    z = np.log(p/(1-p))  # logit
    X = z.reshape(-1,1)
    y = val_df[y_col].astype(int).values
    lr = LogisticRegression(solver="lbfgs")
    lr.fit(X, y)
    return lr  # use predict_proba on new logits

def apply_logistic_recal(lr, test_df, prob_col="mortality_prob"):
    p = np.clip(test_df[prob_col].astype(float).values, 1e-6, 1-1e-6)
    z = np.log(p/(1-p)).reshape(-1,1)
    return lr.predict_proba(z)[:,1]

# Isotonic regression (needs enough data)
from sklearn.isotonic import IsotonicRegression

def fit_isotonic(val_df, prob_col="mortality_prob", y_col="died_within_30_days"):
    iso = IsotonicRegression(out_of_bounds="clip")
    p = val_df[prob_col].astype(float).values
    y = val_df[y_col].astype(int).values
    iso.fit(p, y)
    return iso

def apply_isotonic(iso, test_df, prob_col="mortality_prob"):
    return np.clip(iso.predict(test_df[prob_col].astype(float).values), 0, 1)

# Beta calibration (parametric, flexible) - simple logistic on log(p) & log(1-p)
def fit_beta_cal(val_df, prob_col="mortality_prob", y_col="died_within_30_days"):
    p = np.clip(val_df[prob_col].astype(float).values, 1e-6, 1-1e-6)
    X = np.column_stack([np.log(p), np.log(1-p)])
    y = val_df[y_col].astype(int).values
    lr = LogisticRegression(solver="lbfgs")
    lr.fit(X, y)
    return lr

def apply_beta_cal(lr, test_df, prob_col="mortality_prob"):
    p = np.clip(test_df[prob_col].astype(float).values, 1e-6, 1-1e-6)
    X = np.column_stack([np.log(p), np.log(1-p)])
    return lr.predict_proba(X)[:,1]

# Group-wise logistic recalibration (allow different intercept/slope by group)
def fit_group_logistic_recal(val_df, group_col, prob_col="mortality_prob", y_col="died_within_30_days"):
    df = val_df.copy()
    p = np.clip(df[prob_col].astype(float).values, 1e-6, 1-1e-6)
    df["logit_p"] = np.log(p/(1-p))
    # one-hot group + interaction with logit_p
    X = pd.get_dummies(df[group_col], drop_first=True)
    X = pd.concat([df[["logit_p"]], X, X.mul(df["logit_p"], axis=0)], axis=1)
    y = df[y_col].astype(int).values
    lr = LogisticRegression(max_iter=1000)
    lr.fit(X.values, y)
    return lr, X.columns

def apply_group_logistic_recal(lr, cols, test_df, group_col, prob_col="mortality_prob"):
    df = test_df.copy()
    p = np.clip(df[prob_col].astype(float).values, 1e-6, 1-1e-6)
    df["logit_p"] = np.log(p/(1-p))
    X = pd.get_dummies(df[group_col], drop_first=True)
    X = pd.concat([df[["logit_p"]], X, X.mul(df["logit_p"], axis=0)], axis=1)
    X = X.reindex(columns=cols, fill_value=0.0)
    return lr.predict_proba(X.values)[:,1]
i




In [None]:
# main for running 




