# Frontier Analysis Workflow (Capacity Risk Focus)

This notebook is tailored for frontier analysis: estimating the upper demand boundary for capacity planning.

Analysis goals:
- Interpret `frontier_pred` as the required-capacity guardrail
- Validate whether `coverage` (the share of rows with actual <= frontier) is aligned with the target alpha
- Quantify shortfall risk with `u_hat` when actual exceeds the frontier
- Compare operational what-if scenarios with `simulate`


## 1) Imports / Paths


In [None]:
from __future__ import annotations

from dataclasses import asdict
from pathlib import Path

import matplotlib.pyplot as plt
import lightgbm as lgb
import numpy as np
import pandas as pd
from IPython.display import display

from veldra.api import Artifact, evaluate, export, fit, predict, simulate


In [None]:
# Resolve the project root: the nearest directory, starting at the current working
# directory and walking upwards, that contains a pyproject.toml.
_cwd = Path.cwd()
ROOT = next(
    (candidate for candidate in (_cwd, *_cwd.parents) if (candidate / "pyproject.toml").exists()),
    # No marker found anywhere: stay in the working directory instead of ending up at
    # the filesystem root, where the mkdir below would either fail with a permission
    # error or create an "examples" tree directly under "/".
    _cwd,
)

# Every file written by this notebook goes under OUT_DIR (created on first run).
OUT_DIR = ROOT / "examples" / "out" / "notebook_frontier"
OUT_DIR.mkdir(parents=True, exist_ok=True)
TARGET_COL = "peak_load"  # target column name used by the data generators and the fit config
ALPHA = 0.90  # frontier alpha passed to the model config and recorded in prediction frames

print(f"project_root={ROOT}")
print(f"out_dir={OUT_DIR}")


## 2) Synthetic Data Generation (Frontier-Oriented)

- `peak_load` represents daily peak resource demand
- Right-tail shock noise is introduced to emulate realistic overload risk


In [None]:
def generate_frontier_base_data(n_samples: int = 2400, random_state: int = 42) -> pd.DataFrame:
    """Generate the synthetic baseline dataset for the frontier workflow.

    Six driver columns are sampled independently, combined into a seasonally scaled
    linear load, and perturbed with Gaussian noise (whose scale grows with
    ``active_users`` and ``incidents_7d``) plus a positive log-normal shock. The
    resulting target is floored at 1.0.

    Args:
        n_samples: Number of rows to generate.
        random_state: Seed for ``np.random.default_rng``; the same seed reproduces
            the same frame.

    Returns:
        DataFrame with the six driver columns followed by the ``TARGET_COL`` column.
    """
    rng = np.random.default_rng(random_state)

    # The order of the draws fixes the generated values for a given seed; keep it stable.
    columns = {
        "active_users": rng.integers(800, 8000, size=n_samples),
        "new_signups": rng.integers(10, 700, size=n_samples),
        "incidents_7d": rng.poisson(1.8, size=n_samples),
        "release_events_7d": rng.poisson(0.9, size=n_samples),
        "avg_latency_ms": rng.normal(180.0, 45.0, size=n_samples).clip(70, 450),
        "seasonality_idx": rng.uniform(0.85, 1.20, size=n_samples),
    }
    users = columns["active_users"]
    incidents = columns["incidents_7d"]

    weighted_sum = (
        0.030 * users
        + 0.12 * columns["new_signups"]
        + 6.0 * incidents
        + 3.0 * columns["release_events_7d"]
        + 0.08 * columns["avg_latency_ms"]
    )
    expected_load = weighted_sum * columns["seasonality_idx"]

    # Heteroscedastic upper-tail noise (frontier use case)
    noise_scale = 8.0 + 0.004 * users + 2.2 * incidents
    tail_shock = rng.lognormal(mean=2.0, sigma=0.45, size=n_samples)
    gaussian_noise = rng.normal(0.0, noise_scale, size=n_samples)
    columns[TARGET_COL] = np.clip(expected_load + gaussian_noise + 0.5 * tail_shock, 1.0, None)

    return pd.DataFrame(columns)


def generate_frontier_drifted_data(n_samples: int = 1200, random_state: int = 99) -> pd.DataFrame:
    """Generate a drifted variant of the baseline dataset.

    Starts from ``generate_frontier_base_data`` with the same arguments, raises
    ``incidents_7d`` and ``avg_latency_ms``, then recomputes the target with slightly
    larger coefficients, a wider noise scale and a heavier log-normal shock. The
    target is floored at 1.0.

    Args:
        n_samples: Number of rows to generate.
        random_state: Seed forwarded to the baseline generator; the drift itself is
            drawn from a second generator seeded with ``random_state + 31``.

    Returns:
        The baseline frame with drifted drivers and a regenerated ``TARGET_COL``.
    """
    drifted = generate_frontier_base_data(n_samples=n_samples, random_state=random_state)
    rng = np.random.default_rng(random_state + 31)

    # Drift: latency and incidents worsen.
    extra_incidents = rng.poisson(1.1, size=n_samples)
    latency_shift = rng.normal(22, 12, size=n_samples)
    drifted["incidents_7d"] = drifted["incidents_7d"] + extra_incidents
    drifted["avg_latency_ms"] = (drifted["avg_latency_ms"] + latency_shift).clip(70, 600)

    def column(name: str) -> np.ndarray:
        # Float view of one (already drifted) driver column.
        return drifted[name].to_numpy(dtype=float)

    users = column("active_users")
    incidents = column("incidents_7d")

    weighted_sum = (
        0.031 * users
        + 0.12 * column("new_signups")
        + 6.8 * incidents
        + 3.2 * column("release_events_7d")
        + 0.09 * column("avg_latency_ms")
    )
    expected_load = weighted_sum * column("seasonality_idx")

    # Draw order (shock first, then noise) fixes the values for a given seed.
    noise_scale = 10.0 + 0.0045 * users + 2.5 * incidents
    tail_shock = rng.lognormal(mean=2.1, sigma=0.50, size=n_samples)
    gaussian_noise = rng.normal(0.0, noise_scale, size=n_samples)
    drifted[TARGET_COL] = np.clip(expected_load + gaussian_noise + 0.65 * tail_shock, 1.0, None)
    return drifted


In [None]:
# Training data comes from the baseline generator; the test set comes from the drifted one.
train_df = generate_frontier_base_data(n_samples=2400, random_state=42)
test_df = generate_frontier_drifted_data(n_samples=1200, random_state=77)

# Persist both frames as parquet; the fit config reads the training file from disk.
TRAIN_PATH = OUT_DIR / "frontier_train.parquet"
TEST_PATH = OUT_DIR / "frontier_test.parquet"
for frame, parquet_path in ((train_df, TRAIN_PATH), (test_df, TEST_PATH)):
    frame.to_parquet(parquet_path, index=False)

display(train_df.head(), test_df.head())
print(f"train_path={TRAIN_PATH}", f"test_path={TEST_PATH}", sep="\n")


## 3) Train Frontier Model (`alpha=0.90`) and Evaluate


In [None]:
# Run configuration handed to fit(): a frontier task at the notebook-wide ALPHA,
# trained from the parquet file written above, with 5-fold splitting and fixed seeds.
config = dict(
    config_version=1,
    task={"type": "frontier"},
    frontier={"alpha": ALPHA},
    data={"path": str(TRAIN_PATH), "target": TARGET_COL},
    split={"type": "kfold", "n_splits": 5, "seed": 42},
    train={"seed": 42},
    export={"artifact_dir": str(OUT_DIR / "artifacts")},
)

run = fit(config)
# Reload the artifact from the path reported by the run so later cells use the saved model.
artifact = Artifact.load(run.artifact_path)

# Evaluate on the in-distribution training frame and on the drifted test frame.
ev_train, ev_test = evaluate(artifact, train_df), evaluate(artifact, test_df)

metrics_table = pd.DataFrame([ev_train.metrics, ev_test.metrics], index=["train", "test"])
display(metrics_table)
print(f"artifact_path={run.artifact_path}")


In [None]:
def build_frontier_pred_df(df: pd.DataFrame, pred_df: pd.DataFrame, split_name: str, alpha: float) -> pd.DataFrame:
    """Combine actuals and frontier predictions into one per-row diagnostic frame.

    Rows are matched by position (both inputs are converted to arrays), not by index.

    Args:
        df: Frame containing the ``TARGET_COL`` column with observed values.
        pred_df: Frame containing a ``frontier_pred`` column, same length as ``df``.
        split_name: Label stored in the ``split`` column of every row.
        alpha: Value stored in the ``alpha`` column of every row.

    Returns:
        Frame with columns ``split``, ``actual``, ``frontier_pred``, ``margin``
        (``frontier_pred - actual``), ``u_hat`` (``max(0, -margin)``), ``covered``
        (1 when ``actual <= frontier_pred``, else 0) and ``alpha``.
    """
    observed = df[TARGET_COL].to_numpy(dtype=float)
    frontier = pred_df["frontier_pred"].to_numpy(dtype=float)
    paired = pd.DataFrame({"split": split_name, "actual": observed, "frontier_pred": frontier})

    return paired.assign(
        margin=lambda d: d["frontier_pred"] - d["actual"],
        # capacity shortfall when actual exceeds frontier
        u_hat=lambda d: np.maximum(0.0, -d["margin"]),
        covered=lambda d: (d["actual"] <= d["frontier_pred"]).astype(int),
        alpha=alpha,
    )

# Score features only: the target column is removed before calling predict().
pred_train = predict(artifact, train_df.drop(columns=TARGET_COL)).data
pred_test = predict(artifact, test_df.drop(columns=TARGET_COL)).data

pred_train_df = build_frontier_pred_df(train_df, pred_train, split_name="train", alpha=ALPHA)
pred_test_df = build_frontier_pred_df(test_df, pred_test, split_name="test", alpha=ALPHA)
pred_all = pd.concat([pred_train_df, pred_test_df], ignore_index=True)

# Per-split means: mean of `covered` is the empirical coverage to compare against ALPHA.
split_means = pred_all.groupby("split")[["covered", "u_hat", "margin"]].mean()
display(pred_all.head(), split_means)


## 4) Frontier-Specific Diagnostics

Focus metrics:
- `coverage`: fraction of points at or below the frontier line (`actual <= frontier_pred`)
- `margin`: `frontier_pred - actual`
- `u_hat`: shortfall amount when demand exceeds frontier


In [None]:
def plot_frontier_actual_vs_pred(pred_df: pd.DataFrame) -> None:
    """Scatter ``actual`` against ``frontier_pred``, one panel per ``split`` value.

    Each panel also shows the dashed identity line; points above it are rows where
    the frontier exceeds the observed value.

    The number of panels follows the number of distinct ``split`` values, so a frame
    with one split does not leave a blank axis and a frame with more than two splits
    does not silently lose the extra ones. With two splits the figure matches the
    previous fixed 1x2 layout (12x5 inches).

    Args:
        pred_df: Frame with ``split``, ``actual`` and ``frontier_pred`` columns.

    Returns:
        None. The figure is rendered with ``plt.show()``.
    """
    groups = list(pred_df.groupby("split"))
    # Keep at least one axis so an empty frame still renders an (empty) figure.
    n_panels = max(len(groups), 1)
    # squeeze=False always yields a 2-D axes array, whatever the panel count.
    fig, axes = plt.subplots(
        1, n_panels, figsize=(6 * n_panels, 5), sharex=False, sharey=False, squeeze=False
    )
    for ax, (split_name, frame) in zip(axes.ravel(), groups):
        ax.scatter(frame["actual"], frame["frontier_pred"], s=10, alpha=0.35)
        # Identity line spanning the combined range of both plotted columns.
        lo = min(frame["actual"].min(), frame["frontier_pred"].min())
        hi = max(frame["actual"].max(), frame["frontier_pred"].max())
        ax.plot([lo, hi], [lo, hi], "k--", linewidth=1)
        ax.set_title(f"{split_name}: actual vs frontier_pred")
        ax.set_xlabel("actual")
        ax.set_ylabel("frontier_pred")
        ax.grid(alpha=0.25)
    plt.tight_layout()
    plt.show()


def plot_frontier_residuals(pred_df: pd.DataFrame) -> None:
    """Plot overlaid per-split histograms of ``margin`` (left) and ``u_hat`` (right).

    The margin panel carries a dashed vertical line at zero, the point where the
    frontier equals the observed value.

    Args:
        pred_df: Frame with ``split``, ``margin`` and ``u_hat`` columns.

    Returns:
        None. The figure is rendered with ``plt.show()``.
    """
    fig, (margin_ax, shortfall_ax) = plt.subplots(1, 2, figsize=(12, 4))
    panels = (
        (margin_ax, "margin", "Margin distribution (frontier_pred - actual)"),
        (shortfall_ax, "u_hat", "Shortfall distribution (u_hat)"),
    )

    for ax, column, title in panels:
        for split_name, frame in pred_df.groupby("split"):
            ax.hist(frame[column], bins=35, alpha=0.5, label=split_name)
        if column == "margin":
            # Zero margin separates covered rows (right) from shortfalls (left).
            ax.axvline(0.0, color="k", linestyle="--", linewidth=1)
        ax.set_title(title)
        ax.set_xlabel(column)
        ax.legend()
        ax.grid(alpha=0.25)

    plt.tight_layout()
    plt.show()


# Render both diagnostic figures for the combined train/test prediction frame.
plot_frontier_actual_vs_pred(pred_df=pred_all)
plot_frontier_residuals(pred_df=pred_all)


In [None]:
# Coverage and shortfall by risk segment (operations-focused)
segment_df = test_df[["incidents_7d", "avg_latency_ms"]].assign(
    # Fixed incident-count bands: (-1, 1] low, (1, 3] mid, (3, 6] high, (6, 100] extreme.
    incidents_band=lambda d: pd.cut(
        d["incidents_7d"], bins=[-1, 1, 3, 6, 100], labels=["low", "mid", "high", "extreme"]
    ),
    # Latency quartiles computed on the test set itself.
    latency_band=lambda d: pd.qcut(d["avg_latency_ms"], q=4, labels=["Q1", "Q2", "Q3", "Q4"]),
)

# Index-aligned join: attaches the two bands to the per-row test predictions.
frontier_ops = pred_test_df.join(segment_df[["incidents_band", "latency_band"]])

risk_metrics = ["covered", "u_hat"]
coverage_by_incident = (
    frontier_ops.groupby("incidents_band", observed=False)[risk_metrics].mean().sort_index()
)
coverage_by_latency = (
    frontier_ops.groupby("latency_band", observed=False)[risk_metrics].mean().sort_index()
)

display(coverage_by_incident, coverage_by_latency)


In [None]:
def compute_lgb_importance(artifact_obj: Artifact) -> pd.DataFrame:
    """Return LightGBM feature importances for the model stored in an artifact.

    Args:
        artifact_obj: Artifact whose ``model_text`` holds a serialized LightGBM model.

    Returns:
        Frame with ``feature``, ``gain`` and ``split`` columns, sorted by ``gain``
        in descending order with a fresh 0..n-1 index.

    Raises:
        ValueError: If ``artifact_obj.model_text`` is empty or missing.
    """
    model_text = artifact_obj.model_text
    if not model_text:
        raise ValueError("Artifact model_text is missing.")

    booster = lgb.Booster(model_str=model_text)
    importances = {
        kind: booster.feature_importance(importance_type=kind) for kind in ("gain", "split")
    }
    table = pd.DataFrame({"feature": booster.feature_name(), **importances})
    return table.sort_values("gain", ascending=False).reset_index(drop=True)

importance_df = compute_lgb_importance(artifact)
display(importance_df.head(15))

# Reverse the top 12 so the feature with the highest gain is drawn as the top bar.
top_by_gain = importance_df.head(12).iloc[::-1]
fig, ax = plt.subplots(figsize=(8, 5))
ax.barh(top_by_gain["feature"], top_by_gain["gain"])
ax.set_title("Feature Importance (gain)")
ax.set_xlabel("gain")
fig.tight_layout()
plt.show()


In [None]:
# LightGBM pred_contrib provides SHAP-like additive contributions.
def compute_shap_summary(artifact_obj: Artifact, x_sample: pd.DataFrame) -> pd.DataFrame:
    """Summarise per-feature contributions as mean absolute values over a sample.

    Args:
        artifact_obj: Artifact whose ``model_text`` holds a serialized LightGBM model.
        x_sample: Feature rows passed to ``Booster.predict(..., pred_contrib=True)``.
            Columns are assumed to be in the model's feature order (not checked here).

    Returns:
        Frame with ``feature`` and ``mean_abs_shap_like`` columns, sorted from the
        largest to the smallest mean absolute contribution. The trailing bias
        column of the contribution matrix is excluded.

    Raises:
        ValueError: If ``artifact_obj.model_text`` is empty or missing.
    """
    model_text = artifact_obj.model_text
    if not model_text:
        raise ValueError("Artifact model_text is missing.")

    booster = lgb.Booster(model_str=model_text)
    # One contribution column per feature plus a final bias column.
    contributions = pd.DataFrame(
        booster.predict(x_sample, pred_contrib=True),
        columns=[*booster.feature_name(), "bias"],
    )
    ranked = contributions.drop(columns=["bias"]).abs().mean().sort_values(ascending=False)
    return ranked.rename_axis("feature").reset_index(name="mean_abs_shap_like")

# Contributions are computed on the drifted test features (target column removed).
x_test = test_df.drop(columns=TARGET_COL)
shap_summary_df = compute_shap_summary(artifact, x_test)
display(shap_summary_df.head(15))

# Reverse the top 12 so the largest mean |contribution| is drawn as the top bar.
top_contrib = shap_summary_df.head(12).iloc[::-1]
fig, ax = plt.subplots(figsize=(8, 5))
ax.barh(top_contrib["feature"], top_contrib["mean_abs_shap_like"])
ax.set_title("SHAP-like summary (LightGBM pred_contrib)")
ax.set_xlabel("mean(|contribution|)")
fig.tight_layout()
plt.show()


## 5) What-If Simulation for Capacity Planning

Scenario examples:
- `incident_spike`: more incidents and latency degradation
- `mitigation_playbook`: operational improvements to reduce risk


In [None]:
# Each scenario is a named list of column-level actions ("add" or "mul") passed to simulate().
# NOTE(review): "mitigation_playbook" subtracts 1 from incidents_7d and scales
# release_events_7d by 0.90, which can yield negative or fractional counts for some
# rows; confirm the model/simulate() is expected to handle such inputs.
scenarios = [
    dict(
        name="incident_spike",
        actions=[
            dict(op="add", column="incidents_7d", value=2),
            dict(op="add", column="avg_latency_ms", value=25),
        ],
    ),
    dict(
        name="mitigation_playbook",
        actions=[
            dict(op="add", column="incidents_7d", value=-1),
            dict(op="mul", column="avg_latency_ms", value=0.92),
            dict(op="mul", column="release_events_7d", value=0.90),
        ],
    ),
]

# A copy is passed so the simulation cannot modify test_df.
sim = simulate(artifact, test_df.copy(), scenarios)
sim_df = sim.data
display(sim_df.head())

by_scenario = sim_df.groupby("scenario")
summary = by_scenario[["delta_pred"]].agg(["mean", "median", "min", "max"])
display(summary)

# delta_u_hat is only summarised when the simulation output provides it.
if "delta_u_hat" in sim_df.columns:
    risk_summary = (
        by_scenario[["delta_u_hat"]]
        .mean()
        .rename(columns={"delta_u_hat": "mean_delta_u_hat"})
    )
    display(risk_summary)


In [None]:
# Export a portable package in the "python" format, then print the export result's
# dataclass fields as a plain dict.
exp = export(artifact, format="python")
exp_fields = asdict(exp)
print(exp_fields)


## 6) Frontier-Specific Conclusion Template

- If coverage is materially below alpha, the frontier may be too optimistic (i.e., set too low for the observed demand).
- Use `u_hat` mean and high quantiles to estimate required buffer capacity.
- Rank intervention scenarios by `delta_pred` and `delta_u_hat` to prioritize actions.
