# 04 – Operational Risk Thresholding  
## Turning Probabilities into Staff-Usable Decisions (Performance & DS Hybrid)

### Objective
Models are only useful in practice if their outputs translate into **actionable decisions**.

This notebook converts model probabilities into operational rules such as:
- *How many players per week do we flag?*
- *What recall do we achieve at a manageable alert volume?*
- *What threshold should be used for “high risk”?*
- *What is the precision among flagged players?*

### Key ideas (club context)
- Injury prevention workflows are capacity-limited: staff can only intervene on a subset of players.
- We treat the model as a **ranking / triage tool** rather than a perfect classifier.
- We evaluate thresholds and top‑K strategies with practical metrics.


In [None]:
# === Setup ===
import duckdb
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from pathlib import Path

from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import HistGradientBoostingClassifier
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline

from sklearn.metrics import (
    roc_auc_score,
    average_precision_score,
    brier_score_loss,
    confusion_matrix,
    precision_score,
    recall_score,
    f1_score,
)

plt.rcParams.update({
    "figure.figsize": (7, 4),
    "axes.grid": True,
    "grid.alpha": 0.25,
    "axes.spines.top": False,
    "axes.spines.right": False,
})

def resolve_db_path(filename: str = "analytics.duckdb") -> Path:
    cwd = Path.cwd()
    candidates = [cwd / "lakehouse" / filename, cwd.parent / "lakehouse" / filename]
    for p in candidates:
        if p.exists():
            return p
    raise FileNotFoundError("DuckDB not found. Expected lakehouse/analytics.duckdb")

DB_PATH = resolve_db_path()
DB_PATH

In [None]:
# === Load data ===
with duckdb.connect(str(DB_PATH)) as con:
    dfp = con.execute("""
        SELECT *
        FROM player_dataset_predictive
        WHERE acwr IS NOT NULL
    """).df()

dfp.shape

## 1) Train/Test split (chronological)
We simulate forward prediction: train on earlier matches, evaluate on later matches.


In [None]:
features = ["minutes_last_7d", "minutes_last_14d", "minutes_last_28d", "acwr"]

cols = features + ["high_risk_next"]
if "match_date" in dfp.columns:
    cols.append("match_date")
if "match_id" in dfp.columns:
    cols.append("match_id")
if "player_id" in dfp.columns:
    cols.append("player_id")

d = dfp[cols].dropna().copy()

if "match_date" in d.columns:
    d["match_date"] = pd.to_datetime(d["match_date"], errors="coerce")
    d = d.dropna(subset=["match_date"]).sort_values("match_date")

cut = int(len(d) * 0.8)
train = d.iloc[:cut].copy()
test = d.iloc[cut:].copy()

X_train = train[features]
y_train = train["high_risk_next"].astype(int)
X_test = test[features]
y_test = test["high_risk_next"].astype(int)

(len(train), len(test), y_train.mean(), y_test.mean())

## 2) Fit models (Logit baseline + HGB)
You can use either model operationally. HGB often captures non-linear effects.


In [None]:
logit = Pipeline(steps=[
    ("scaler", StandardScaler()),
    ("model", LogisticRegression(max_iter=2000)),
])

hgb = HistGradientBoostingClassifier(
    max_depth=3,
    learning_rate=0.1,
    max_iter=300,
    random_state=42,
)

logit.fit(X_train, y_train)
hgb.fit(X_train, y_train)

# Probabilities on BOTH train and test:
p_logit_train = logit.predict_proba(X_train)[:, 1]
p_hgb_train   = hgb.predict_proba(X_train)[:, 1]
p_logit_test  = logit.predict_proba(X_test)[:, 1]
p_hgb_test    = hgb.predict_proba(X_test)[:, 1]

def basic_metrics(y_true, p):
    return {
        "ROC_AUC": roc_auc_score(y_true, p),
        "PR_AUC": average_precision_score(y_true, p),
        "Brier": brier_score_loss(y_true, p),
        "Prevalence": float(np.mean(y_true)),
    }

pd.DataFrame([
    {"Split": "TEST", "Model": "LogisticRegression", **basic_metrics(y_test, p_logit_test)},
    {"Split": "TEST", "Model": "HistGradientBoosting", **basic_metrics(y_test, p_hgb_test)},
]).sort_values(["Split", "ROC_AUC"], ascending=[True, False])


## 3) Threshold sweep (precision / recall / alert volume)
We evaluate a grid of thresholds and compute:
- precision, recall, F1
- number flagged (alerts)
- alert rate (% of players flagged)

This is more actionable than ROC alone.


In [None]:
def threshold_table(y_true, p, thresholds=None):
    """Metrics as a function of threshold.
    NOTE: Use this on TRAIN for policy design, then APPLY the chosen threshold on TEST for evaluation.
    """
    if thresholds is None:
        thresholds = np.linspace(0.05, 0.95, 19)
    rows = []
    n = len(y_true)
    for t in thresholds:
        pred = (p >= t).astype(int)
        rows.append({
            "threshold": float(t),
            "alerts": int(pred.sum()),
            "alert_rate": float(pred.mean()),
            "precision": precision_score(y_true, pred, zero_division=0),
            "recall": recall_score(y_true, pred, zero_division=0),
            "f1": f1_score(y_true, pred, zero_division=0),
        })
    return pd.DataFrame(rows)

# POLICY DESIGN (TRAIN): sweep thresholds to understand trade-offs
tab_logit_train = threshold_table(y_train, p_logit_train)
tab_hgb_train   = threshold_table(y_train, p_hgb_train)

tab_hgb_train.head()


In [None]:
# Plot precision-recall vs threshold (TRAIN - policy design)
plt.plot(tab_logit_train["threshold"], tab_logit_train["precision"], marker="o", label="precision")
plt.plot(tab_logit_train["threshold"], tab_logit_train["recall"], marker="o", label="recall")
plt.title("Logistic (TRAIN): Precision/Recall vs Threshold")
plt.xlabel("Threshold")
plt.ylabel("Score")
plt.legend()
plt.tight_layout()
plt.show()

plt.plot(tab_hgb_train["threshold"], tab_hgb_train["precision"], marker="o", label="precision")
plt.plot(tab_hgb_train["threshold"], tab_hgb_train["recall"], marker="o", label="recall")
plt.title("HGB (TRAIN): Precision/Recall vs Threshold")
plt.xlabel("Threshold")
plt.ylabel("Score")
plt.legend()
plt.tight_layout()
plt.show()


In [None]:
# Plot alert volume vs threshold (TRAIN - policy design)
plt.plot(tab_logit_train["threshold"], tab_logit_train["alerts"], marker="o", label="Logit alerts (train)")
plt.plot(tab_hgb_train["threshold"], tab_hgb_train["alerts"], marker="o", label="HGB alerts (train)")
plt.title("Alert Volume vs Threshold (TRAIN)")
plt.xlabel("Threshold")
plt.ylabel("# Flagged players")
plt.legend()
plt.tight_layout()
plt.show()


## 4) Operational policies

### Policy A — Fixed threshold
Choose a threshold based on acceptable alert volume and desired recall.

### Policy B — Top-K per matchday
Many clubs prefer ranking: flag the top *K* players per matchday/week.

Below we implement both.


In [None]:
# Helper: confusion matrix at a chosen threshold
def cm_at_threshold(y_true, p, t):
    pred = (p >= t).astype(int)
    cm = confusion_matrix(y_true, pred)
    prec = precision_score(y_true, pred, zero_division=0)
    rec = recall_score(y_true, pred, zero_division=0)
    return cm, prec, rec

def threshold_for_target_alert_rate(p_train: np.ndarray, target_rate: float = 0.10) -> float:
    """Choose threshold on TRAIN so that ~target_rate are flagged (capacity constraint).
    Uses the (1-target_rate) quantile of TRAIN probabilities.
    """
    target_rate = float(target_rate)
    if not (0 < target_rate < 1):
        raise ValueError("target_rate must be in (0, 1)")
    # Flagging top target_rate probabilities means threshold at the (1-target_rate) quantile.
    return float(np.quantile(p_train, 1 - target_rate))

def metrics_at_threshold(y_true, p, t):
    pred = (p >= t).astype(int)
    return {
        "threshold": float(t),
        "alerts": int(pred.sum()),
        "alert_rate": float(pred.mean()),
        "precision": float(precision_score(y_true, pred, zero_division=0)),
        "recall": float(recall_score(y_true, pred, zero_division=0)),
        "f1": float(f1_score(y_true, pred, zero_division=0)),
    }

TARGET_ALERT_RATE = 0.10

# Pick thresholds on TRAIN (no test optimisation)
t_logit = threshold_for_target_alert_rate(p_logit_train, TARGET_ALERT_RATE)
t_hgb   = threshold_for_target_alert_rate(p_hgb_train,   TARGET_ALERT_RATE)

row_logit_train = metrics_at_threshold(y_train, p_logit_train, t_logit)
row_hgb_train   = metrics_at_threshold(y_train, p_hgb_train,   t_hgb)

row_logit_test = metrics_at_threshold(y_test, p_logit_test, t_logit)
row_hgb_test   = metrics_at_threshold(y_test, p_hgb_test,   t_hgb)

t_logit, row_logit_train, row_logit_test, t_hgb, row_hgb_train, row_hgb_test


In [None]:
# Confusion matrices at the TRAIN-chosen thresholds (evaluated on TEST)
cmL, precL, recL = cm_at_threshold(y_test, p_logit_test, t_logit)
cmH, precH, recH = cm_at_threshold(y_test, p_hgb_test, t_hgb)

cmL, precL, recL, cmH, precH, recH


### Top‑K ranking policy (matchday-level)

If `match_id` exists, we can flag the top K players **per match** (or per matchday-like unit).
This aligns well with staff workflows (e.g., “flag 3–5 players per match”).


In [None]:
def topk_policy(df_test: pd.DataFrame, p: np.ndarray, k: int = 5, group_col: str = "match_id"):
    out = df_test.copy()
    out = out.reset_index(drop=True)
    out["p"] = p
    if group_col not in out.columns:
        raise ValueError(f"{group_col} not found. Available columns: {list(out.columns)[:25]} ...")
    out["rank_in_group"] = out.groupby(group_col)["p"].rank(ascending=False, method="first")
    out["flag_topk"] = (out["rank_in_group"] <= k).astype(int)
    return out

# Only run if match_id exists
if "match_id" in test.columns:
    topk5_hgb = topk_policy(test, p_hgb, k=5, group_col="match_id")
    y_true = topk5_hgb["high_risk_next"].astype(int).values
    y_pred = topk5_hgb["flag_topk"].values
    
    prec = precision_score(y_true, y_pred, zero_division=0)
    rec = recall_score(y_true, y_pred, zero_division=0)
    alerts = int(y_pred.sum())
    groups = topk5_hgb["match_id"].nunique()
    
    {"k": 5, "groups": groups, "alerts_total": alerts, "alerts_per_group": alerts / groups, "precision": prec, "recall": rec}
else:
    "match_id not available: Top-K policy skipped (threshold policies still valid)." 

## 5) Capacity planning: expected alerts per squad-week

In practice, staff capacity matters. We estimate alerts under assumptions:
- squad size (e.g., 25 players)
- decision cycle frequency (e.g., weekly or per match)

We approximate alerts as `alert_rate * squad_size`.


In [None]:
def expected_alerts_per_squad(alert_rate: float, squad_size: int = 25):
    return alert_rate * squad_size

policy = pd.DataFrame([
    {
        "Model": "Logit",
        "Policy": f"Threshold via TRAIN quantile (target={TARGET_ALERT_RATE:.0%} alerts)",
        "threshold": t_logit,
        "train_alert_rate": row_logit_train["alert_rate"],
        "test_alert_rate": row_logit_test["alert_rate"],
        "test_precision": row_logit_test["precision"],
        "test_recall": row_logit_test["recall"],
        "test_f1": row_logit_test["f1"],
        "expected_alerts_per_25": expected_alerts_per_squad(row_logit_test["alert_rate"], 25),
    },
    {
        "Model": "HGB",
        "Policy": f"Threshold via TRAIN quantile (target={TARGET_ALERT_RATE:.0%} alerts)",
        "threshold": t_hgb,
        "train_alert_rate": row_hgb_train["alert_rate"],
        "test_alert_rate": row_hgb_test["alert_rate"],
        "test_precision": row_hgb_test["precision"],
        "test_recall": row_hgb_test["recall"],
        "test_f1": row_hgb_test["f1"],
        "expected_alerts_per_25": expected_alerts_per_squad(row_hgb_test["alert_rate"], 25),
    },
]).sort_values("test_recall", ascending=False)

policy


## 6) Recommended reporting (for README / staff slide)

When presenting to a performance team, focus on:
- **Alert volume** (“how many players flagged per cycle?”)
- **Recall** (“how many risk cases we catch?”)
- **Precision** (“how many flagged are truly high risk?”)
- **Calibration** (“is 0.7 actually ~70%?”)

This turns modelling into operational decision support.


In [None]:
# Copy/paste friendly summary (no test-based threshold optimisation)
final_summary = pd.DataFrame([
    {
        "Model": "LogisticRegression",
        "ROC_AUC_TEST": roc_auc_score(y_test, p_logit_test),
        "PR_AUC_TEST": average_precision_score(y_test, p_logit_test),
        "Brier_TEST": brier_score_loss(y_test, p_logit_test),
        f"Threshold_train(q@{TARGET_ALERT_RATE:.0%})": t_logit,
        "Test alert_rate": row_logit_test["alert_rate"],
        "Test Precision@thr": row_logit_test["precision"],
        "Test Recall@thr": row_logit_test["recall"],
        "Alerts per 25": expected_alerts_per_squad(row_logit_test["alert_rate"], 25),
    },
    {
        "Model": "HistGradientBoosting",
        "ROC_AUC_TEST": roc_auc_score(y_test, p_hgb_test),
        "PR_AUC_TEST": average_precision_score(y_test, p_hgb_test),
        "Brier_TEST": brier_score_loss(y_test, p_hgb_test),
        f"Threshold_train(q@{TARGET_ALERT_RATE:.0%})": t_hgb,
        "Test alert_rate": row_hgb_test["alert_rate"],
        "Test Precision@thr": row_hgb_test["precision"],
        "Test Recall@thr": row_hgb_test["recall"],
        "Alerts per 25": expected_alerts_per_squad(row_hgb_test["alert_rate"], 25),
    },
]).sort_values("ROC_AUC_TEST", ascending=False)

final_summary


In [None]:
# Optional: alert-rate stability over time (TEST)
if "match_date" in test.columns:
    tmp = test.copy()
    tmp["p_hgb"] = p_hgb_test
    tmp["alert_hgb"] = (tmp["p_hgb"] >= t_hgb).astype(int)
    tmp["week"] = tmp["match_date"].dt.to_period("W").astype(str)
    weekly = tmp.groupby("week").agg(
        n=("alert_hgb","size"),
        alerts=("alert_hgb","sum"),
        alert_rate=("alert_hgb","mean"),
        prevalence=("high_risk_next","mean"),
        avg_p=("p_hgb","mean"),
    ).reset_index()
    display(weekly.tail(10))
    plt.plot(weekly["week"], weekly["alert_rate"], marker="o")
    plt.title("HGB: weekly alert rate on TEST (threshold chosen on TRAIN)")
    plt.xticks(rotation=45, ha="right")
    plt.xlabel("Week")
    plt.ylabel("Alert rate")
    plt.tight_layout()
    plt.show()
else:
    print("No match_date column found; skipping weekly stability plot.")
