# Early Warning System (EWS) 

This notebook builds a small, *bank-style* Early Warning System (EWS) using **customer-month** behavioural data.
It focuses on:
- **Rolling window features** (trend, volatility, worst behaviour)
- **Rule-based alerts** (operationally interpretable)
- **Optional PD-style (probability of default) model** comparison (logistic regression)
- **Monitoring outputs** (alerts over time, segment drilldowns)



In [None]:
# Core imports
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from sklearn.linear_model import LogisticRegression
from sklearn.metrics import (
    roc_auc_score, average_precision_score,
    precision_recall_curve, roc_curve,
    classification_report, confusion_matrix
)

np.random.seed(42)
pd.set_option("display.max_columns", 200)


## 1) Generate synthetic customer-month behavioural data

Each row is one **customer-month** (the simulation gives each customer a single account). We'll simulate typical credit card / revolving credit signals:
- Utilisation
- Payment ratio (payment / statement)
- Missed payment flag
- DPD (days past due) bucket
- Balance and credit limit
- Segment attributes (product, channel, risk band) for monitoring drilldowns

In [None]:
def simulate_credit_panel(
    n_customers=5000,
    n_months=24,
    start="2023-01-31"
) -> pd.DataFrame:
    """Simulate a customer-month panel with behaviour signals and an outcome.

    Outcome: default_next_3m (proxy for serious delinquency/default risk).
    """
    start_date = pd.to_datetime(start)
    # Month-end dates. pandas >= 2.2 deprecates the "M" alias in favour of "ME".
    dates = pd.date_range(start_date, periods=n_months, freq="M")

    # Customer-level static attributes
    customer_id = np.arange(n_customers)

    product = np.random.choice(["Classic", "Rewards", "Premium"], size=n_customers, p=[0.55, 0.35, 0.10])
    channel = np.random.choice(["Branch", "Online", "Partner"], size=n_customers, p=[0.30, 0.55, 0.15])
    risk_band = np.random.choice(["A", "B", "C", "D"], size=n_customers, p=[0.35, 0.35, 0.20, 0.10])

    # Credit limits vary by risk band (A = lowest risk => highest limits)
    band_limit_mu = {"A": 7000, "B": 5000, "C": 3500, "D": 2500}
    limit = np.array([max(800, np.random.normal(band_limit_mu[b], 900)) for b in risk_band])
    limit = np.round(limit, 0)

    rows = []

    # Latent "financial stress" baseline per customer (higher => worse)
    band_stress = {"A": 0.10, "B": 0.20, "C": 0.32, "D": 0.45}
    base_stress = np.array([np.random.beta(2, 10) + band_stress[b] for b in risk_band])
    base_stress = np.clip(base_stress, 0, 1)

    # Simulate month-by-month behaviour with persistence
    util_prev = np.random.beta(2, 5, size=n_customers)
    pay_prev = np.clip(np.random.normal(0.90, 0.12, size=n_customers), 0, 1)

    dpd_prev = np.zeros(n_customers, dtype=int)  # 0, 30, 60, 90+
    bal_prev = util_prev * limit

    for t, d in enumerate(dates):
        # Macro cycle / seasonality bump (small, but adds realism)
        macro = 0.05 * np.sin(2 * np.pi * (t / 12))

        # Utilization tends to increase with stress and previous utilization
        util = (
            0.65 * util_prev
            + 0.25 * base_stress
            + 0.10 * np.random.beta(2, 5, size=n_customers)
            + macro
        )
        util = np.clip(util, 0, 1.4)  # allow >100% occasionally (overlimit)

        # Payment ratio decreases with stress and higher utilization
        pay = (
            0.60 * pay_prev
            + 0.35 * (1 - base_stress)
            - 0.20 * np.maximum(util - 0.8, 0)
            + 0.05 * np.random.normal(0, 1, size=n_customers)
        )
        pay = np.clip(pay, 0, 1.2)

        # Missed payment probability increases if pay ratio is low or stress high
        p_miss = 0.03 + 0.25 * (1 - np.clip(pay, 0, 1)) + 0.20 * base_stress + 0.10 * np.maximum(util - 0.9, 0)
        p_miss = np.clip(p_miss, 0, 0.9)
        missed_payment = (np.random.rand(n_customers) < p_miss).astype(int)

        # DPD bucket transitions: missed payment pushes to higher bucket, good pay can cure
        dpd = dpd_prev.copy()
        # If missed, migrate upward; if already delinquent, can worsen
        migrate_up = missed_payment == 1
        dpd[migrate_up & (dpd_prev == 0)] = 30
        dpd[migrate_up & (dpd_prev == 30)] = 60
        dpd[migrate_up & (dpd_prev == 60)] = 90
        dpd[migrate_up & (dpd_prev == 90)] = 90

        # Cure: if not missed and paying reasonably, may improve
        cure = (missed_payment == 0) & (pay > 0.6)
        dpd[cure & (dpd_prev == 30)] = 0
        dpd[cure & (dpd_prev == 60)] = 30
        dpd[cure & (dpd_prev == 90)] = 60

        # Balance follows utilization and can be noisy
        balance = util * limit + np.random.normal(0, 120, size=n_customers)
        balance = np.clip(balance, 0, None)

        # Outcome risk: probability of serious delinquency within next 3m
        # Increase with dpd, persistent high utilization, low payment, missed payments, stress, and macro
        score = (
            -3.8
            + 1.8 * (dpd >= 30).astype(int)
            + 2.4 * (dpd >= 60).astype(int)
            + 3.0 * (dpd >= 90).astype(int)
            + 2.0 * np.maximum(util - 0.85, 0)
            + 2.2 * np.maximum(0.7 - np.clip(pay, 0, 1), 0)
            + 1.2 * missed_payment
            + 1.3 * base_stress
            + 0.6 * macro
        )
        p_default = 1 / (1 + np.exp(-score))
        default_next_3m = (np.random.rand(n_customers) < p_default).astype(int)

        # Append rows
        rows.append(pd.DataFrame({
            "customer_id": customer_id,
            "date": d,
            "product": product,
            "channel": channel,
            "risk_band": risk_band,
            "credit_limit": limit,
            "balance": balance,
            "utilization": balance / limit,
            "payment_ratio": pay,
            "missed_payment": missed_payment,
            "dpd_bucket": dpd,
            "default_next_3m": default_next_3m,
        }))

        # Update state
        util_prev = np.clip(util, 0, 1.4)
        pay_prev = np.clip(pay, 0, 1.2)
        dpd_prev = dpd
        bal_prev = balance

    df = pd.concat(rows, ignore_index=True)
    df = df.sort_values(["customer_id", "date"]).reset_index(drop=True)
    return df

df = simulate_credit_panel(n_customers=6000, n_months=24, start="2023-01-31")
df.head()
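
The two pieces of logic that drive the simulator, the DPD bucket transition and the logistic outcome score, are easier to reason about for a single account. The sketch below restates them in scalar form, using the same coefficients and the `0.6` cure threshold from the cell above. The function names `next_dpd` and `default_probability` are hypothetical helpers, not part of the notebook.

```python
import math

DPD_BUCKETS = [0, 30, 60, 90]  # 90 stands for 90+


def next_dpd(prev_dpd: int, missed: bool, pay_ratio: float) -> int:
    """One-month DPD transition for a single account (hypothetical helper)."""
    i = DPD_BUCKETS.index(prev_dpd)
    if missed:
        # A missed payment rolls the account one bucket forward, capped at 90+
        return DPD_BUCKETS[min(i + 1, len(DPD_BUCKETS) - 1)]
    if pay_ratio > 0.6:
        # A reasonable payment cures one bucket per month, floored at 0
        return DPD_BUCKETS[max(i - 1, 0)]
    return prev_dpd  # no miss, but not enough paid to cure: stay in place


def default_probability(dpd, util, pay_ratio, missed, stress, macro=0.0):
    """Scalar version of the simulator's outcome score passed through a sigmoid."""
    pay = min(max(pay_ratio, 0.0), 1.0)
    score = (
        -3.8
        + 1.8 * (dpd >= 30) + 2.4 * (dpd >= 60) + 3.0 * (dpd >= 90)
        + 2.0 * max(util - 0.85, 0.0)
        + 2.2 * max(0.7 - pay, 0.0)
        + 1.2 * int(missed)
        + 1.3 * stress
        + 0.6 * macro
    )
    return 1.0 / (1.0 + math.exp(-score))


# A clean account versus the same account at DPD 30 and DPD 90+
p_clean = default_probability(dpd=0, util=0.3, pay_ratio=1.0, missed=False, stress=0.0)
p_dpd30 = default_probability(dpd=30, util=0.3, pay_ratio=1.0, missed=False, stress=0.0)
p_dpd90 = default_probability(dpd=90, util=0.3, pay_ratio=1.0, missed=False, stress=0.0)
print(round(p_clean, 3), round(p_dpd30, 3), round(p_dpd90, 3))  # ≈ 0.022 0.119 0.968
```

Because the DPD indicators are cumulative (`>= 30`, `>= 60`, `>= 90`), a 90+ account receives all three increments, which is why its simulated default probability is so close to 1.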


### Quick sanity checks

In [None]:
print(df.shape)
print(df["date"].min(), "→", df["date"].max())
df[["utilization", "payment_ratio", "missed_payment", "dpd_bucket", "default_next_3m"]].describe().T
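
Beyond shape and summary statistics, a few structural checks catch most panel problems before feature engineering. This is a minimal sketch run on a tiny hand-built frame. `check_panel` is a hypothetical helper, and it assumes the column names used in this notebook.

```python
import pandas as pd


def check_panel(panel: pd.DataFrame) -> dict:
    """Return basic structural checks on a customer-month panel as booleans."""
    months_per_customer = panel.groupby("customer_id")["date"].nunique()
    return {
        # Exactly one row per customer and month
        "unique_customer_month": bool(not panel.duplicated(["customer_id", "date"]).any()),
        # Every customer is observed for the same number of months
        "balanced_panel": bool(months_per_customer.nunique() == 1),
        "valid_dpd_buckets": bool(panel["dpd_bucket"].isin([0, 30, 60, 90]).all()),
        "binary_outcome": bool(panel["default_next_3m"].isin([0, 1]).all()),
        "non_negative_balance": bool((panel["balance"] >= 0).all()),
    }


# Toy panel: 2 customers x 2 months (values are made up for illustration)
toy_panel = pd.DataFrame({
    "customer_id": [0, 0, 1, 1],
    "date": pd.to_datetime(["2023-01-31", "2023-02-28"] * 2),
    "balance": [1200.0, 1350.0, 0.0, 80.0],
    "dpd_bucket": [0, 30, 0, 0],
    "default_next_3m": [0, 1, 0, 0],
})
checks = check_panel(toy_panel)
print(checks)

# A duplicated row should fail the uniqueness check
dup_checks = check_panel(pd.concat([toy_panel, toy_panel.iloc[[0]]], ignore_index=True))
```

On the simulated data the same call would be `check_panel(df)`.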


## 2) Rolling feature engineering (customer level)

We build features that are typical in credit risk:
- Rolling mean (recent behaviour)
- Rolling max/min (worst behaviour)
- Rolling sums (counts)
- Rolling volatility (std)
- Deterioration trend (3-month average minus 12-month average)

> In production, these features are usually computed from monthly snapshots, bureau refreshes, or transaction aggregates.

In [None]:
# Ensure proper ordering
df = df.sort_values(["customer_id", "date"]).copy()

g = df.groupby("customer_id", sort=False)

# Rolling windows (in months)
W_SHORT = 3
W_LONG = 6
W_XLONG = 12

# Rolling means
df["util_3m_avg"] = g["utilization"].rolling(W_SHORT).mean().reset_index(level=0, drop=True)
df["util_6m_avg"] = g["utilization"].rolling(W_LONG).mean().reset_index(level=0, drop=True)
df["pay_3m_avg"]  = g["payment_ratio"].rolling(W_SHORT).mean().reset_index(level=0, drop=True)
df["pay_6m_avg"]  = g["payment_ratio"].rolling(W_LONG).mean().reset_index(level=0, drop=True)

# Rolling worst behaviour
df["util_6m_max"] = g["utilization"].rolling(W_LONG).max().reset_index(level=0, drop=True)
df["pay_6m_min"]  = g["payment_ratio"].rolling(W_LONG).min().reset_index(level=0, drop=True)

# Rolling counts
df["miss_3m_sum"] = g["missed_payment"].rolling(W_SHORT).sum().reset_index(level=0, drop=True)
df["miss_6m_sum"] = g["missed_payment"].rolling(W_LONG).sum().reset_index(level=0, drop=True)

# Rolling volatility
df["pay_6m_std"]  = g["payment_ratio"].rolling(W_LONG).std().reset_index(level=0, drop=True)

# Trend / deterioration features
df["util_trend_3m_vs_12m"] = df["util_3m_avg"] - g["utilization"].rolling(W_XLONG).mean().reset_index(level=0, drop=True)
df["pay_trend_3m_vs_12m"]  = df["pay_3m_avg"]  - g["payment_ratio"].rolling(W_XLONG).mean().reset_index(level=0, drop=True)

# DPD-related rolling (any delinquency in window)
# raw=True passes plain NumPy arrays to the lambda, which is much faster than one Series per window
df["any_dpd30_6m"] = g["dpd_bucket"].rolling(W_LONG).apply(lambda x: float(np.any(x >= 30)), raw=True).reset_index(level=0, drop=True)
df["any_dpd60_12m"] = g["dpd_bucket"].rolling(W_XLONG).apply(lambda x: float(np.any(x >= 60)), raw=True).reset_index(level=0, drop=True)

# Feature list; rows without a complete 12-month window are dropped below
feat_cols = [
    "util_3m_avg","util_6m_avg","pay_3m_avg","pay_6m_avg",
    "util_6m_max","pay_6m_min","miss_3m_sum","miss_6m_sum",
    "pay_6m_std","util_trend_3m_vs_12m","pay_trend_3m_vs_12m",
    "any_dpd30_6m","any_dpd60_12m"
]

model_df = df.dropna(subset=feat_cols + ["default_next_3m"]).copy()
model_df.shape
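
The `groupby(...).rolling(...).reset_index(level=0, drop=True)` pattern used above can be checked on a toy frame. The sketch below shows two things: windows never cross customer boundaries, and the first `window - 1` rows per customer are `NaN`, which is why the early months are dropped from `model_df`. The `min_periods=1` variant is shown only as an option, not something the notebook uses.

```python
import numpy as np
import pandas as pd

toy = pd.DataFrame({
    "customer_id": [1, 1, 1, 1, 2, 2, 2, 2],
    "utilization": [0.1, 0.2, 0.3, 0.4, 0.9, 0.8, 0.7, 0.6],
})
g_toy = toy.groupby("customer_id", sort=False)

# Grouped rolling returns a (customer_id, row) MultiIndex; dropping level 0
# restores the original row index so the assignment aligns row by row.
toy["util_3m_avg"] = (
    g_toy["utilization"].rolling(3).mean().reset_index(level=0, drop=True)
)

# Optional: allow partial windows instead of NaN for the first months
toy["util_3m_avg_partial"] = (
    g_toy["utilization"].rolling(3, min_periods=1).mean().reset_index(level=0, drop=True)
)
print(toy)
```

Customer 2's first two rows are `NaN` in `util_3m_avg` even though customer 1's rows sit directly above them, which confirms the window restarts for each customer.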


## 3) Rule-based EWS alerts (interpretable)

Banks often start with a **transparent ruleset**, then compare to a model.

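We'll create a simple ruleset that flags an account if any of the following holds:
- Peak utilization in the last 6 months at or above the threshold, combined with a low 3-month average payment ratio
- Two or more missed payments in the last 6 months
- Any delinquency (DPD 30+) in the last 6 months
- 3-month average utilization rising well above the 12-month average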

You can tune thresholds to hit an operational target (e.g., alerts <= 3% of active accounts per month).

In [None]:
# Define rule thresholds (tune these)
TH_UTIL_MAX = 0.95
TH_PAY_AVG  = 0.65
TH_MISS_6M  = 2
TH_UTIL_TREND = 0.12  # short-term utilization rising vs long history

# Alert rules
model_df["alert_rule"] = (
    (model_df["util_6m_max"] >= TH_UTIL_MAX) & (model_df["pay_3m_avg"] <= TH_PAY_AVG)
) | (
    (model_df["miss_6m_sum"] >= TH_MISS_6M)
) | (
    (model_df["any_dpd30_6m"] == 1)
) | (
    (model_df["util_trend_3m_vs_12m"] >= TH_UTIL_TREND)
)

model_df["alert_rule"] = model_df["alert_rule"].astype(int)

# Basic performance: alerts vs outcome
y_true = model_df["default_next_3m"].astype(int)
y_alert = model_df["alert_rule"].astype(int)

cm = confusion_matrix(y_true, y_alert)
cm


In [None]:
tn, fp, fn, tp = cm.ravel()
precision = tp / (tp + fp) if (tp + fp) else np.nan
recall = tp / (tp + fn) if (tp + fn) else np.nan
alert_rate = y_alert.mean()

print(f"Alert rate: {alert_rate:.3%}")
print(f"Precision (default | alert): {precision:.3%}")
print(f"Recall (capture of defaults): {recall:.3%}")
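
One way to tune a single threshold toward an operational alert budget is to read it off the empirical quantile of the feature. The sketch below does this for a synthetic stand-in for `util_trend_3m_vs_12m`. `threshold_for_alert_rate` is a hypothetical helper, and the 3% target is the example figure mentioned above.

```python
import numpy as np


def threshold_for_alert_rate(values, target_rate: float) -> float:
    """Threshold such that roughly `target_rate` of values lie strictly above it."""
    return float(np.quantile(values, 1.0 - target_rate))


rng = np.random.default_rng(0)
# Synthetic stand-in for a trend feature; not the notebook's actual data
util_trend = rng.normal(0.0, 0.08, size=10_000)

thr = threshold_for_alert_rate(util_trend, target_rate=0.03)
flag = util_trend > thr
achieved = flag.mean()
print(f"threshold={thr:.4f}, achieved alert rate={achieved:.3%}")
```

This only controls one rule in isolation. The ruleset above combines several conditions with OR, so the joint alert rate has to be re-measured after each threshold change.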


### Alerts over time (monitoring view)

In [None]:
monthly = (
    model_df.groupby("date")
    .agg(
        accounts=("customer_id","nunique"),
        alert_rate=("alert_rule","mean"),
        default_rate=("default_next_3m","mean")
    )
)

monthly.head()


In [None]:
plt.figure(figsize=(10,4))
plt.plot(monthly.index, monthly["alert_rate"], label="Alert rate")
plt.plot(monthly.index, monthly["default_rate"], label="Default (next 3m) rate")
plt.title("EWS Monitoring: Alert Rate vs Default Rate Over Time")
plt.xlabel("Month")
plt.ylabel("Rate")
plt.legend()
plt.tight_layout()
plt.show()
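
The plot can be paired with a simple tabular check that lists the months in which the alert rate exceeds an operational cap. This is a sketch on made-up monthly rates. `months_over_cap` is a hypothetical helper and the 3% cap is only the example figure used earlier.

```python
import pandas as pd


def months_over_cap(monthly_rates: pd.DataFrame, cap: float) -> pd.DataFrame:
    """Return the months whose alert rate exceeds `cap`, with month-on-month change."""
    out = monthly_rates.copy()
    out["alert_rate_mom_change"] = out["alert_rate"].diff()
    out["over_cap"] = out["alert_rate"] > cap
    return out[out["over_cap"]]


# Made-up rates for illustration, indexed by month end
monthly_toy = pd.DataFrame(
    {"alert_rate": [0.021, 0.028, 0.034, 0.025]},
    index=pd.to_datetime(["2024-01-31", "2024-02-29", "2024-03-31", "2024-04-30"]),
)
breaches = months_over_cap(monthly_toy, cap=0.03)
print(breaches)
```

With the notebook's own table the call would be `months_over_cap(monthly, cap=0.03)`.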


### Segment drilldown (who is getting flagged?)

In [None]:
seg = (
    model_df.groupby(["risk_band","product"])
    .agg(
        n_accounts=("customer_id","nunique"),
        alert_rate=("alert_rule","mean"),
        default_rate=("default_next_3m","mean")
    )
    .reset_index()
    .sort_values(["risk_band","product"])
)

seg.head(12)
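
Two extra columns often make a drilldown easier to read: the segment's default rate relative to the portfolio (lift) and its share of all alerts. The sketch below computes both on a toy frame. `segment_view` is a hypothetical helper. Note that in the table above `n_accounts` counts unique customers, while the rates are averaged over customer-month rows.

```python
import pandas as pd


def segment_view(frame: pd.DataFrame, by) -> pd.DataFrame:
    """Alert rate, default rate, default lift and alert share per segment."""
    overall_default = frame["default_next_3m"].mean()
    grouped = frame.groupby(by)
    out = grouped.agg(
        n_rows=("default_next_3m", "size"),
        alert_rate=("alert_rule", "mean"),
        default_rate=("default_next_3m", "mean"),
    )
    # Lift > 1 means the segment defaults more often than the portfolio average
    out["default_lift"] = out["default_rate"] / overall_default
    # Fraction of all alerts that come from this segment
    out["alert_share"] = grouped["alert_rule"].sum() / frame["alert_rule"].sum()
    return out.reset_index()


# Toy customer-month rows (made up for illustration)
toy_seg = pd.DataFrame({
    "risk_band": ["A"] * 4 + ["D"] * 4,
    "alert_rule": [0, 0, 0, 1, 1, 1, 0, 1],
    "default_next_3m": [0, 0, 0, 0, 1, 0, 0, 1],
})
view = segment_view(toy_seg, by="risk_band")
print(view)
```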


## 4) Optional: Model-based risk score (Logistic Regression)

Here we build a simple PD-style model using the rolling features, with a **time-based split** (train on early months, test on later months).

This out-of-time split is closer to how credit models are validated in practice than a random split, which would mix the same customer's overlapping rolling windows across train and test.

In [None]:
# Time-based split
cutoff = model_df["date"].quantile(0.70)  # ~70% earliest months train
train = model_df[model_df["date"] <= cutoff].copy()
test  = model_df[model_df["date"] > cutoff].copy()

X_train = train[feat_cols]
y_train = train["default_next_3m"].astype(int)

X_test = test[feat_cols]
y_test = test["default_next_3m"].astype(int)

# Simple logistic regression (baseline PD model)
lr = LogisticRegression(max_iter=2000)
lr.fit(X_train, y_train)

test["pd_score"] = lr.predict_proba(X_test)[:, 1]

auc = roc_auc_score(y_test, test["pd_score"])
ap  = average_precision_score(y_test, test["pd_score"])

print("Cutoff date:", pd.to_datetime(cutoff).date())
print(f"Test ROC AUC: {auc:.3f}")
print(f"Test Avg Precision (PR AUC): {ap:.3f}")
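
An alternative to taking a quantile of the date column is to split on the sorted list of distinct months, which makes the cutoff an actual month in the data. The sketch below is one way to do that. `split_by_month` is a hypothetical helper, and the toy panel uses month-start dates only to keep the example short.

```python
import pandas as pd


def split_by_month(frame: pd.DataFrame, date_col: str = "date", train_frac: float = 0.7):
    """Out-of-time split: earliest months go to train, the rest to test."""
    months = pd.Index(frame[date_col].unique()).sort_values()
    n_train = max(1, round(len(months) * train_frac))
    cutoff = months[n_train - 1]  # last month included in train
    train_part = frame[frame[date_col] <= cutoff]
    test_part = frame[frame[date_col] > cutoff]
    return train_part, test_part, cutoff


# Toy panel: 2 customers x 10 months
toy_dates = pd.date_range("2024-01-01", periods=10, freq="MS")
toy_split = pd.DataFrame({
    "customer_id": [c for c in (0, 1) for _ in toy_dates],
    "date": list(toy_dates) * 2,
})
train_toy, test_toy, cutoff_toy = split_by_month(toy_split, train_frac=0.7)
print(cutoff_toy.date(), len(train_toy), len(test_toy))
```

Every customer appears in both parts, but no month does, which is the property an out-of-time validation needs.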


### Compare operationally: pick a score threshold that matches the rule alert rate

In [None]:
target_alert_rate = model_df["alert_rule"].mean()

# Threshold = test-score quantile so that % flagged ≈ rule-based alert rate.
# Note: target_alert_rate is measured over all modelled months, not only the test window.
thr = test["pd_score"].quantile(1 - target_alert_rate)
test["alert_model"] = (test["pd_score"] >= thr).astype(int)

cm2 = confusion_matrix(y_test, test["alert_model"])
tn, fp, fn, tp = cm2.ravel()
precision_m = tp / (tp + fp) if (tp + fp) else np.nan
recall_m = tp / (tp + fn) if (tp + fn) else np.nan

print(f"Rule-based alert rate (overall): {target_alert_rate:.3%}")
print(f"Model threshold (test): {thr:.4f}")
print(f"Model alert rate (test): {test['alert_model'].mean():.3%}")
print(f"Model precision: {precision_m:.3%}")
print(f"Model recall: {recall_m:.3%}")
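
For a like-for-like comparison, the rule flag and the model flag should be scored on the same rows. The sketch below wraps the precision, recall and alert-rate arithmetic in one function and applies it to two flags on a toy outcome vector. `alert_metrics` is a hypothetical helper and the arrays are made up.

```python
import numpy as np


def alert_metrics(y_true, y_flag) -> dict:
    """Alert rate, precision and recall for a binary alert flag."""
    y_true = np.asarray(y_true).astype(bool)
    y_flag = np.asarray(y_flag).astype(bool)
    tp = int(np.sum(y_true & y_flag))
    fp = int(np.sum(~y_true & y_flag))
    fn = int(np.sum(y_true & ~y_flag))
    return {
        "alert_rate": float(y_flag.mean()),
        "precision": tp / (tp + fp) if (tp + fp) else float("nan"),
        "recall": tp / (tp + fn) if (tp + fn) else float("nan"),
    }


# Same 8 accounts, same alert budget (3 alerts each), different picks
y_toy = [1, 0, 1, 0, 0, 1, 0, 0]
rule_flag = [1, 1, 0, 0, 0, 1, 0, 0]
model_flag = [1, 0, 1, 0, 0, 1, 0, 0]

rule_m = alert_metrics(y_toy, rule_flag)
model_m = alert_metrics(y_toy, model_flag)
print("rule :", rule_m)
print("model:", model_m)
```

In the notebook this would be `alert_metrics(y_test, test["alert_rule"])` next to `alert_metrics(y_test, test["alert_model"])`, so both strategies are judged on the test months only.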


### Curves (ROC and Precision-Recall)

In [None]:
fpr, tpr, _ = roc_curve(y_test, test["pd_score"])
prec, rec, _ = precision_recall_curve(y_test, test["pd_score"])

plt.figure(figsize=(6,4))
plt.plot(fpr, tpr)
plt.plot([0, 1], [0, 1], linestyle="--")  # chance line
plt.title("ROC Curve (Test)")
plt.xlabel("False Positive Rate")
plt.ylabel("True Positive Rate")
plt.tight_layout()
plt.show()

plt.figure(figsize=(6,4))
plt.plot(rec, prec)
plt.title("Precision-Recall Curve (Test)")
plt.xlabel("Recall")
plt.ylabel("Precision")
plt.tight_layout()
plt.show()


### Feature importance (model coefficients)

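This is a lightweight way to explain what drives risk in the model (useful for interviews and governance-style storytelling). The features are not standardized here, so coefficient magnitudes reflect each feature's scale and are not directly comparable; read the signs first.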

In [None]:
coef = pd.DataFrame({
    "feature": feat_cols,
    "coef": lr.coef_[0]
}).sort_values("coef", ascending=False)

coef
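
Raw coefficients mix the effect of a feature with its scale. One common adjustment is to multiply each fitted coefficient by the feature's standard deviation, which gives the change in log-odds for a one-standard-deviation move. The sketch below shows the arithmetic on made-up numbers. `per_sd_coefficients` is a hypothetical helper, and the coefficients are illustrative, not output from this notebook.

```python
import pandas as pd


def per_sd_coefficients(raw_coef, X: pd.DataFrame) -> pd.DataFrame:
    """Rescale coefficients to 'log-odds change per one-SD move' of each feature."""
    out = pd.DataFrame({
        "feature": list(X.columns),
        "coef_raw": list(raw_coef),
        "feature_sd": X.std(ddof=0).to_numpy(),
    })
    out["coef_per_sd"] = out["coef_raw"] * out["feature_sd"]
    # Rank by absolute per-SD effect
    return out.sort_values("coef_per_sd", key=abs, ascending=False).reset_index(drop=True)


# Made-up features on very different scales, with made-up coefficients
X_toy = pd.DataFrame({
    "miss_6m_sum": [0, 1, 2, 3],
    "pay_6m_std": [0.00, 0.02, 0.04, 0.06],
})
ranked = per_sd_coefficients([0.8, 5.0], X_toy)
print(ranked)
```

Here `pay_6m_std` has the larger raw coefficient, but `miss_6m_sum` has the larger per-SD effect. With the fitted model the call would be `per_sd_coefficients(lr.coef_[0], X_train)`.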


## 5) EWS output table (what an analyst would hand to Risk Ops)

This is a monthly ranked alert list with:
- Rule alert
- Model PD score
- Key drivers

In a real setting you'd join identifiers, collections strategy, and contactability flags.

In [None]:
# Pick the latest month in test period for a 'current' alert list
latest_month = test["date"].max()
alerts_latest = test[test["date"] == latest_month].copy()

# Rank by model score; include both alert types
alerts_latest = alerts_latest.sort_values("pd_score", ascending=False)

cols_out = [
    "customer_id","date","risk_band","product","channel",
    "alert_rule","alert_model","pd_score",
    "utilization","util_3m_avg","util_6m_max","util_trend_3m_vs_12m",
    "payment_ratio","pay_3m_avg","pay_6m_min","pay_6m_std",
    "missed_payment","miss_6m_sum","dpd_bucket","any_dpd30_6m",
    "default_next_3m"
]

alerts_latest[cols_out].head(20)
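
The table above shows the driver values, but an operations team usually also wants to know which rule conditions fired. A minimal sketch of such reason codes follows, reusing the thresholds from section 3. `rule_reasons` and the reason labels are hypothetical names chosen for illustration.

```python
import pandas as pd

# Same thresholds as the ruleset in section 3
TH_UTIL_MAX, TH_PAY_AVG, TH_MISS_6M, TH_UTIL_TREND = 0.95, 0.65, 2, 0.12


def rule_reasons(frame: pd.DataFrame) -> pd.Series:
    """Comma-separated list of rule conditions that fired for each row."""
    conditions = pd.DataFrame({
        "high_util_low_pay": (frame["util_6m_max"] >= TH_UTIL_MAX)
        & (frame["pay_3m_avg"] <= TH_PAY_AVG),
        "missed_payments_6m": frame["miss_6m_sum"] >= TH_MISS_6M,
        "dpd30_in_6m": frame["any_dpd30_6m"] == 1,
        "util_rising": frame["util_trend_3m_vs_12m"] >= TH_UTIL_TREND,
    })
    # Keep the names of the conditions that are True in each row
    return conditions.apply(lambda row: ", ".join(row.index[row.to_numpy()]), axis=1)


# Three made-up accounts
toy_alerts = pd.DataFrame({
    "util_6m_max": [0.98, 0.50, 0.70],
    "pay_3m_avg": [0.60, 0.95, 0.90],
    "miss_6m_sum": [2, 0, 0],
    "any_dpd30_6m": [1.0, 0.0, 0.0],
    "util_trend_3m_vs_12m": [0.02, 0.01, 0.15],
})
toy_alerts["reasons"] = rule_reasons(toy_alerts)
print(toy_alerts["reasons"].tolist())
```

Applied to the notebook's output, `alerts_latest["reasons"] = rule_reasons(alerts_latest)` would add the column before selecting `cols_out`.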


## 6) Next improvements (if you want to extend)

Ideas to make this *closer to a real bank or fintech setup*:

- **Calibration:** Calibrate PD scores (Platt / isotonic) and show a calibration plot.
- **Reject inference:** Simulate accept/reject and adjust labels.
- **Champion/Challenger:** Compare rules vs model vs hybrid strategy.
- **Cost-sensitive optimization:** Optimize thresholds based on expected loss (ECL proxy).
- **Explainability:** Add SHAP for tree models (if you choose LightGBM/XGBoost).
- **Production-friendly code:** Move simulation + feature engineering into `src/` modules.
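
As a starting point for the calibration idea, a binned table of mean predicted score against observed default rate needs only pandas. The sketch below uses synthetic scores that are well calibrated by construction, so the two columns should roughly agree. `calibration_table` is a hypothetical helper, not a scikit-learn function.

```python
import numpy as np
import pandas as pd


def calibration_table(y_true, scores, n_bins: int = 10) -> pd.DataFrame:
    """Mean predicted score vs observed outcome rate per score quantile bin."""
    frame = pd.DataFrame({"y": np.asarray(y_true), "score": np.asarray(scores)})
    frame["bin"] = pd.qcut(frame["score"], q=n_bins, labels=False, duplicates="drop")
    return (
        frame.groupby("bin")
        .agg(n=("y", "size"), mean_score=("score", "mean"), observed_rate=("y", "mean"))
        .reset_index()
    )


rng = np.random.default_rng(42)
# Synthetic scores; outcomes are drawn with probability equal to the score
scores_toy = rng.uniform(0.0, 0.3, size=20_000)
y_cal = (rng.random(20_000) < scores_toy).astype(int)

cal = calibration_table(y_cal, scores_toy)
print(cal)
```

For the notebook's model the call would be `calibration_table(y_test, test["pd_score"])`. Large gaps between `mean_score` and `observed_rate` would suggest the scores need recalibrating before being read as PDs.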

---

### Minimal `requirements.txt`
```
numpy
pandas
matplotlib
scikit-learn
```
