In [None]:

# --- Setup (hidden) ---
from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime, date, timedelta
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from IPython.display import Markdown, display

# Toggle for small teaching callouts under key visuals
TEACH_MODE = True

# Deterministic synthetic data (teaching project)
rng = np.random.default_rng(42)
# Wall-clock stamp of this run; unlike the seeded data it differs on every execution.
GENERATED_AT = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

# Decision thresholds (single source)
SHIFT_THRESHOLD = 0.15  # |pct delta| vs baseline at or above this sets the volume-shift flag
PARTIAL_LOW_MAX = 0.10  # partial-week ratio strictly below this is labelled "LOW"
PARTIAL_MED_MAX = 0.20  # ratio strictly below this (but not "LOW") is "MED"; anything else is "HIGH"
TOP3_SHARE_THRESHOLD = 0.60  # quoted in the flip condition about top-3 contributor share
MIN_DOW_OUTSIDE_BAND = 2  # quoted in the flip condition about day-of-week divergence
SENS_WINDOWS = (6, 8)  # trailing history windows (in weeks) evaluated by the sensitivity panel
TRIM_FRACTION = 0.10  # fraction of observations dropped from each tail by the "trimmed_mean" estimator


def partial_week_risk_label(ratio: float) -> str:
    """Bucket a partial-week ratio into ``"LOW"``, ``"MED"`` or ``"HIGH"``.

    Args:
        ratio: Partial-week volume divided by anchor-week volume.

    Returns:
        ``"LOW"`` when ``ratio < PARTIAL_LOW_MAX``, ``"MED"`` when
        ``ratio < PARTIAL_MED_MAX``, otherwise ``"HIGH"``.
    """
    # Bounds are checked in ascending order; the first one the ratio is strictly below wins.
    for upper_bound, label in ((PARTIAL_LOW_MAX, "LOW"), (PARTIAL_MED_MAX, "MED")):
        if ratio < upper_bound:
            return label
    return "HIGH"


# Synthetic calendar and dataset generation (no hardcoded headline totals)
# Fixed Monday so the synthetic calendar is identical on every run.
synthetic_anchor_monday = date(2026, 1, 5)
this_monday = synthetic_anchor_monday
# Week starts 14 down to 2 weeks before the anchor Monday, ordered oldest to newest.
complete_week_starts = [this_monday - timedelta(weeks=w) for w in range(14, 1, -1)]  # 13 complete weeks
# The week immediately before the anchor Monday.
seed_current_week_start = this_monday - timedelta(weeks=1)
# Seven days later, i.e. the anchor Monday itself.
seed_partial_week_start = seed_current_week_start + timedelta(days=7)

# Segment names plus two share vectors aligned to them by position; each vector sums to 1.0.
segments = ["Payer A", "Payer B", "Payer C", "Payer D", "Payer E", "Payer F", "Payer G"]
baseline_shares = np.array([0.30, 0.20, 0.15, 0.10, 0.08, 0.09, 0.08])
current_shares = np.array([0.375, 0.19, 0.14, 0.09, 0.075, 0.07, 0.06])


def make_week(week_start: date, total: int, shares: np.ndarray, complete: bool = True) -> pd.DataFrame:
    """Build the synthetic day-by-segment rows for one week.

    Args:
        week_start: First day of the week; day offsets are added to it.
        total: Weekly volume to distribute across days.
        shares: Per-segment shares, aligned by position with the module-level ``segments``.
        complete: ``True`` generates 7 days; ``False`` generates only the first 4 days,
            with their weights renormalized so the 4 days still add up to ``total``.

    Returns:
        DataFrame with columns ``week_start``, ``date``, ``dow``, ``segment``, ``volume``.
    """
    dow_weights = np.array([0.13, 0.15, 0.16, 0.15, 0.14, 0.14, 0.13])
    if complete:
        day_offsets = np.arange(7)
    else:
        day_offsets = np.arange(4)
        dow_weights = dow_weights[:4] / dow_weights[:4].sum()

    per_day = np.round(total * dow_weights).astype(int)
    # Push the rounding remainder onto the last day so the days sum exactly to `total`.
    per_day[-1] += total - per_day.sum()

    records = []
    for day_total, raw_offset in zip(per_day, day_offsets):
        offset = int(raw_offset)
        day_date = week_start + timedelta(days=offset)
        per_segment = np.round(day_total * shares).astype(int)
        # Same remainder trick per day: the last segment absorbs the rounding error.
        per_segment[-1] += day_total - per_segment.sum()
        records.extend(
            {"week_start": week_start, "date": day_date, "dow": offset, "segment": name, "volume": int(count)}
            for name, count in zip(segments, per_segment)
        )
    return pd.DataFrame(records)


# Build the synthetic dataset week by week. The order of rng draws below determines
# the generated values, so statements in this cell must not be reordered.
frames = []
history_totals = []
for ws in complete_week_starts:
    # Weekly total drawn from a normal distribution, truncated to int and floored at 6000.
    total = max(6000, int(rng.normal(7800, 220)))
    history_totals.append(total)
    frames.append(make_week(ws, total, baseline_shares, complete=True))

# Inject a deterministic shift in the latest complete week relative to rolling baseline
# (1.65x the median of the last 8 historical totals, using the shifted share vector).
latest_complete_baseline = int(np.median(history_totals[-8:]))
current_total_seed = int(round(latest_complete_baseline * 1.65))
frames.append(make_week(seed_current_week_start, current_total_seed, current_shares, complete=True))

# Partial week seeded as a fraction of current week to test guardrails
# (18% of the shifted week's total, spread over the first 4 days only).
partial_total_seed = int(round(current_total_seed * 0.18))
frames.append(make_week(seed_partial_week_start, partial_total_seed, current_shares, complete=False))

df = pd.concat(frames, ignore_index=True)

# Add a small invalid stream (watchlist only)
# Each row is flagged with probability 0.012; a flagged row gets an invalid volume that is a
# random integer in [1, 9], capped at the row's own volume. Unflagged rows get 0.
df["invalid"] = rng.binomial(1, 0.012, size=len(df))
df["invalid_volume"] = np.where(df["invalid"] == 1, np.minimum(df["volume"], rng.integers(1, 10, size=len(df))), 0)


# Anchor and through-date must come from data
DATA_THROUGH = pd.to_datetime(df["date"]).max().date().isoformat()
raw_current_week_start = pd.to_datetime(df["week_start"]).max().date()
# Number of distinct day-of-week values observed per week; 7 means the week is complete.
week_coverage = (
    df.groupby("week_start", as_index=False)["dow"]
    .nunique()
    .rename(columns={"dow": "dow_count"})
)
week_coverage["week_start"] = pd.to_datetime(week_coverage["week_start"]).dt.date
# week start (date) -> count of distinct days present in that week
coverage_map = dict(zip(week_coverage["week_start"], week_coverage["dow_count"]))

# Primary rule: current_week_start is the latest week in data.
# If that week is incomplete, treat it as partial and anchor on latest complete week.
if coverage_map.get(raw_current_week_start, 0) == 7:
    current_week_start = raw_current_week_start
    # NOTE(review): raw_current_week_start is the maximum week_start in df, so no later week
    # can be a key of coverage_map; this branch appears to always end with
    # partial_week_start = None. Confirm whether the lookups below are intentional.
    candidate_partial = current_week_start + timedelta(days=7)
    if candidate_partial in coverage_map:
        partial_week_start = candidate_partial
    else:
        missing_weeks = sorted([wk for wk, d in coverage_map.items() if d < 7 and wk > current_week_start])
        partial_week_start = missing_weeks[0] if missing_weeks else None
else:
    # Latest week is incomplete: it becomes the partial week, and the anchor falls back
    # to the most recent week before it that has all 7 days.
    partial_week_start = raw_current_week_start
    complete_prior = sorted([wk for wk, d in coverage_map.items() if d == 7 and wk < raw_current_week_start])
    if not complete_prior:
        raise RuntimeError("No complete week available before partial week in dataset.")
    current_week_start = complete_prior[-1]

# Baseline window = last 8 complete weeks before current anchor week
complete_weeks_sorted = sorted([wk for wk, d in coverage_map.items() if d == 7 and wk <= current_week_start])
if current_week_start not in complete_weeks_sorted:
    raise RuntimeError("Current anchor week is not complete in this dataset.")

# Everything before the anchor's position in the sorted list is a prior complete week.
current_idx = complete_weeks_sorted.index(current_week_start)
prior_complete_weeks = complete_weeks_sorted[:current_idx]
# Fewer than 8 prior weeks is tolerated; only an empty baseline is rejected below.
baseline_weeks = prior_complete_weeks[-8:]

if len(baseline_weeks) == 0:
    raise RuntimeError("No prior complete weeks available for baseline.")


def callout(text: str):
    """Render ``text`` as a Markdown blockquote labelled "Interpretation".

    Does nothing when the module-level ``TEACH_MODE`` flag is falsy.
    """
    if not TEACH_MODE:
        return
    note = Markdown(f"> **Interpretation:** {text}")
    display(note)


def baseline_stat(values: pd.Series, estimator: str) -> float:
    """Summarize a series of weekly totals with the requested estimator.

    Args:
        values: Weekly totals; cast to float before use.
        estimator: ``"median"`` or ``"trimmed_mean"``.

    Returns:
        The median, or the mean after dropping ``int(len * TRIM_FRACTION)`` values from
        each tail. When trimming would leave no values, the plain mean is returned.

    Raises:
        ValueError: If ``estimator`` is neither supported name.
    """
    ordered = values.astype(float).sort_values()
    if estimator not in ("median", "trimmed_mean"):
        raise ValueError(f"Unknown estimator: {estimator}")
    if estimator == "median":
        return float(ordered.median())

    count = len(ordered)
    trim = int(count * TRIM_FRACTION)
    # Fall back to the untrimmed series when trimming both tails would remove everything.
    kept = ordered if (count - 2 * trim) < 1 else ordered.iloc[trim: count - trim]
    return float(kept.mean())


@dataclass(frozen=True)
class State:
    """Immutable bundle of the metrics and decision text produced by ``compute_state()``.

    The ``build_*`` renderers and the notebook cells read their numbers from this object.
    """

    decision: str  # headline decision sentence
    comparator_validity: str  # "CONFIRMED" or "NOT_CONFIRMED"
    actionability_status: str  # "LIMITED_CONTEXT" or "STABLE"
    partial_week_ratio: float  # partial-week volume / anchor-week volume (0.0 when the anchor total is 0)
    partial_week_risk: str  # "LOW" / "MED" / "HIGH" from partial_week_risk_label
    partial_week_numerator: int  # partial-week total volume (0 when there is no partial week)
    partial_week_denominator: int  # anchor-week total volume
    baseline_week_total: int  # rounded median weekly volume over the baseline weeks
    current_week_total: int  # anchor-week total volume
    delta_week: int  # current_week_total - baseline_week_total
    delta_per_bus_day: float  # delta_week divided by 5
    volume_shift_flag: bool  # True when |delta / baseline| >= SHIFT_THRESHOLD
    history_tier: str  # "<n>w (LIMITED_CONTEXT | DIRECTIONAL | ROBUST)" by count of history weeks
    top3_share: float  # share of total |segment delta| carried by the three largest |deltas|
    dow_outside_band_count: int  # days whose anchor-week volume is below Q1 or above Q3 of the baseline weeks
    sensitivity_df: pd.DataFrame  # one row per (window, estimator) combination
    sensitivity_disagreement: bool  # True when the sensitivity rows do not all share one shift flag
    flip_conditions: list[str]  # rendered HOLD -> EXPAND conditions including current values
    disprove_line: str  # fixed sentence describing what would disprove the HOLD


def compute_state() -> State:
    """Compute every headline metric and the decision text for the anchor week.

    Takes no arguments. Reads the notebook-level ``df``, ``current_week_start``,
    ``partial_week_start``, ``baseline_weeks`` and ``segments``, plus the threshold
    constants (``SHIFT_THRESHOLD``, ``SENS_WINDOWS``, ``MIN_DOW_OUTSIDE_BAND``,
    ``TOP3_SHARE_THRESHOLD``).

    Returns:
        State: frozen snapshot holding the baseline/current totals, the deltas, the
        partial-week risk, the day-of-week and segment diagnostics, the sensitivity
        panel, and the decision/flip-condition text.
    """
    # Normalize week keys once; every filter below reuses these masks instead of
    # re-parsing each row's week_start again for each filter.
    week_keys = pd.to_datetime(df["week_start"]).dt.date
    in_current_week = week_keys == current_week_start
    in_baseline_weeks = week_keys.isin(baseline_weeks)

    # Weekly totals for every week up to and including the anchor week.
    weekly_complete = (
        df[week_keys <= current_week_start]
        .groupby("week_start", as_index=True)["volume"]
        .sum()
        .sort_index()
    )
    weekly_complete.index = pd.to_datetime(weekly_complete.index).date

    history = weekly_complete.loc[[wk for wk in weekly_complete.index if wk < current_week_start]]
    current_total = int(weekly_complete.loc[current_week_start])
    baseline_total = int(round(history.loc[baseline_weeks].median()))
    delta_week = int(current_total - baseline_total)
    # The weekly delta is expressed per business day assuming a 5-day business week.
    delta_per_day = delta_week / 5.0
    pct_delta = (delta_week / baseline_total) if baseline_total else 0.0
    volume_shift_flag = abs(pct_delta) >= SHIFT_THRESHOLD

    comparator_validity = "CONFIRMED" if len(history) >= 1 else "NOT_CONFIRMED"

    history_count = int(len(history))
    if history_count < 13:
        history_tier = f"{history_count}w (LIMITED_CONTEXT)"
    elif history_count < 26:
        history_tier = f"{history_count}w (DIRECTIONAL)"
    else:
        history_tier = f"{history_count}w (ROBUST)"

    if partial_week_start is not None:
        partial_total = int(df.loc[week_keys == partial_week_start, "volume"].sum())
    else:
        partial_total = 0

    partial_ratio = (partial_total / current_total) if current_total else 0.0
    partial_risk = partial_week_risk_label(partial_ratio)

    # Day-of-week band: per-day Q1/Q3 across the baseline weeks vs the anchor week's days.
    hist_dow = (
        df[in_baseline_weeks]
        .groupby(["week_start", "dow"], as_index=False)["volume"]
        .sum()
        .pivot(index="week_start", columns="dow", values="volume")
        .reindex(columns=range(7), fill_value=0)
    )
    q1 = hist_dow.quantile(0.25)
    q3 = hist_dow.quantile(0.75)
    current_dow = (
        df[in_current_week]
        .groupby("dow", as_index=True)["volume"]
        .sum()
        .reindex(range(7), fill_value=0)
    )
    dow_outside = int(((current_dow < q1) | (current_dow > q3)).sum())

    # Segment view: median per segment across the baseline weeks vs the anchor week.
    seg_week = (
        df[in_baseline_weeks | in_current_week]
        .groupby(["week_start", "segment"], as_index=False)["volume"]
        .sum()
    )
    seg_week["week_start"] = pd.to_datetime(seg_week["week_start"]).dt.date

    baseline_seg = (
        seg_week[seg_week["week_start"].isin(baseline_weeks)]
        .groupby("segment", as_index=True)["volume"]
        .median()
        .round()
        .astype(int)
        .reindex(segments, fill_value=0)
    )
    current_seg = (
        seg_week[seg_week["week_start"] == current_week_start]
        .set_index("segment")["volume"]
        .reindex(segments, fill_value=0)
        .astype(int)
    )

    seg_delta_abs = (current_seg - baseline_seg).abs().sort_values(ascending=False)
    total_abs_delta = float(seg_delta_abs.sum())
    top3_share = float(seg_delta_abs.head(3).sum() / total_abs_delta) if total_abs_delta else 0.0

    # Sensitivity panel: recompute the shift flag for each trailing window and estimator.
    rows = []
    for w in SENS_WINDOWS:
        window_values = history.tail(w)
        if len(window_values) == 0:
            continue
        for estimator in ("median", "trimmed_mean"):
            b = baseline_stat(window_values, estimator)
            d = current_total - b
            pct = (d / b) if b else 0.0
            shift = abs(pct) >= SHIFT_THRESHOLD
            implied = "EXPAND_CANDIDATE" if shift else "HOLD"
            rows.append(
                {
                    "window_weeks": w,
                    "estimator": estimator,
                    "baseline_week": int(round(b)),
                    "delta_pct": float(pct),
                    "shift_flag": bool(shift),
                    "implied_decision": implied,
                }
            )
    sensitivity_df = pd.DataFrame(rows)
    sensitivity_disagreement = sensitivity_df["shift_flag"].nunique() > 1 if not sensitivity_df.empty else False

    # Any single trigger downgrades actionability to LIMITED_CONTEXT.
    limited_triggers = []
    if comparator_validity != "CONFIRMED":
        limited_triggers.append("comparator not confirmed")
    if partial_risk in {"MED", "HIGH"}:
        limited_triggers.append("partial-week risk")
    if volume_shift_flag:
        limited_triggers.append("volume shift trigger")
    if history_count < 13:
        limited_triggers.append("short history tier")
    if sensitivity_disagreement:
        limited_triggers.append("sensitivity disagreement")

    actionability = "LIMITED_CONTEXT" if limited_triggers else "STABLE"
    decision = "HOLD and monitor; reversible actions only."
    if actionability == "STABLE":
        decision = "EXPAND candidate after next confirmed week."
    # Sensitivity disagreement overrides whatever was decided above.
    if sensitivity_disagreement:
        decision = "HOLD and monitor; sensitivity disagreement locks the decision."

    # Derive the quoted numbers from the constants so the text cannot drift from the rules.
    shift_pct_label = f"{SHIFT_THRESHOLD*100:.0f}%"
    windows_label = "/".join(f"{w}w" for w in SENS_WINDOWS)
    flip_conditions = [
        f"Next complete+mature week remains >= +{shift_pct_label} vs baseline (now: {pct_delta*100:.1f}%).",
        f"Sensitivity panel agrees across {windows_label} and median/trimmed mean (now: {'NO' if sensitivity_disagreement else 'YES'}).",
        f"DOW divergence persists on >= {MIN_DOW_OUTSIDE_BAND} weekdays outside baseline band (now: {dow_outside}).",
        f"Top-3 contributors explain >= {TOP3_SHARE_THRESHOLD*100:.0f}% of delta (now: {top3_share*100:.1f}%).",
    ]
    disprove_line = (
        "What would disprove this HOLD: all flip conditions are TRUE on the next complete+mature week under the same filters."
    )

    return State(
        decision=decision,
        comparator_validity=comparator_validity,
        actionability_status=actionability,
        partial_week_ratio=partial_ratio,
        partial_week_risk=partial_risk,
        partial_week_numerator=partial_total,
        partial_week_denominator=current_total,
        baseline_week_total=baseline_total,
        current_week_total=current_total,
        delta_week=delta_week,
        delta_per_bus_day=delta_per_day,
        volume_shift_flag=volume_shift_flag,
        history_tier=history_tier,
        top3_share=top3_share,
        dow_outside_band_count=dow_outside,
        sensitivity_df=sensitivity_df,
        sensitivity_disagreement=sensitivity_disagreement,
        flip_conditions=flip_conditions,
        disprove_line=disprove_line,
    )


def build_snapshot_md(state: State) -> str:
    """Render the executive snapshot as Markdown.

    Args:
        state: Computed metrics; only its attributes are read.

    Returns:
        Newline-joined Markdown: decision, impact vs baseline, validity/actionability,
        partial-week risk, and a fixed list of reversible next actions.
    """
    parts = [f"**Decision:** {state.decision}", ""]

    impact_first_line = (
        f"**Impact (vs 8w baseline):** Baseline **{state.baseline_week_total:,.0f}/wk** -> "
        f"Current **{state.current_week_total:,.0f}/wk**  \n"
    )
    impact_second_line = (
        f"Delta **{state.delta_week:+,.0f}/wk** (about **{state.delta_per_bus_day:+,.0f}/business day**)"
    )
    parts.append(impact_first_line + impact_second_line)
    parts.append("")

    parts.append(f"**Comparator validity:** {state.comparator_validity}")
    parts.append(f"**Actionability status:** {state.actionability_status}")

    ratio_text = (
        f"({state.partial_week_numerator:,.0f}/{state.partial_week_denominator:,.0f} = {state.partial_week_ratio*100:.1f}%)"
    )
    parts.append(f"**Partial-week risk:** {state.partial_week_risk} " + ratio_text)
    parts.append("")

    # Static action list; it does not depend on the computed state.
    parts.extend(
        (
            "**Next actions (reversible):**",
            "- Validate day-of-week divergence against baseline band.",
            "- Segment the delta and triage the top contributors.",
            "- Re-check after the next complete+mature week closes before irreversible actions.",
        )
    )
    return "\n".join(parts)


def build_receipt_md(state: State) -> str:
    """Render the one-line validity receipt.

    Combines the module-level ``DATA_THROUGH`` and ``current_week_start`` with the
    comparator, actionability, partial-week and history-tier fields of ``state``,
    separated by ``|``.
    """
    partial_detail = (
        f"({state.partial_week_numerator:,.0f}/{state.partial_week_denominator:,.0f} = {state.partial_week_ratio*100:.1f}%) "
    )
    segments_of_line = [
        f"Data through: {DATA_THROUGH} | Anchor week: {current_week_start} ",
        f"| Comparator: {state.comparator_validity} | Actionability: {state.actionability_status} ",
        f"| Partial-week risk: {state.partial_week_risk} ",
        partial_detail,
        f"| History tier: {state.history_tier}",
    ]
    return "".join(segments_of_line)


def build_decision_md(state: State) -> str:
    """Render the decision and trigger rules as a Markdown bullet list.

    Args:
        state: Computed metrics; only its attributes are read.

    Returns:
        Four newline-joined bullets: decision, volume-shift trigger (with the observed
        percent delta), comparator validity gate, and actionability rule.
    """
    baseline = state.baseline_week_total
    # A zero baseline reports 0.0% instead of raising ZeroDivisionError, matching the
    # fallback compute_state uses for the same ratio.
    pct_delta = ((state.current_week_total - baseline) / baseline) if baseline else 0.0
    lines = [
        f"- **Decision:** {state.decision}",
        # The "15%" in this text mirrors SHIFT_THRESHOLD (0.15); keep the two in sync.
        f"- **Volume shift trigger:** absolute percent delta >= 15% vs rolling 8-week median baseline (observed: {pct_delta*100:.1f}%).",
        f"- **Comparator validity gate:** complete+mature comparator is {state.comparator_validity}.",
        f"- **Actionability rule:** {state.actionability_status}. Use reversible actions only unless all flip conditions pass.",
    ]
    return "\n".join(lines)


def assert_message_consistency(state: State, snapshot_md: str, receipt_md: str, decision_md: str) -> None:
    """Assert that the rendered sections still quote the values held in ``state``.

    Checks, in order: the decision text in the snapshot and the decision section, the
    actionability status in the receipt, and the partial-week risk prefix in both the
    snapshot and the receipt.

    Raises:
        AssertionError: On the first section that does not contain the expected text.
    """

    def _must_contain(rendered: str, fragment: str, problem: str) -> None:
        assert fragment in rendered, problem

    _must_contain(snapshot_md, state.decision, "Snapshot drift: decision text mismatch")
    _must_contain(decision_md, state.decision, "Decision slide drift: decision text mismatch")
    _must_contain(
        receipt_md,
        f"Actionability: {state.actionability_status}",
        "Receipt drift: actionability mismatch",
    )

    # Only the "RISK (numerator/denominator" prefix is compared, not the percentage.
    partial_prefix = (
        f"{state.partial_week_risk} ({state.partial_week_numerator:,.0f}/{state.partial_week_denominator:,.0f}"
    )
    _must_contain(snapshot_md, partial_prefix, "Snapshot drift: partial-week ratio mismatch")
    _must_contain(receipt_md, partial_prefix, "Receipt drift: partial-week ratio mismatch")


def build_export_receipt(state: State) -> str:
    """Render the single-line export receipt.

    Combines the module-level run stamp, data-through date and anchor week with the
    baseline, current and delta totals from ``state``.
    """
    run_part = f"run {GENERATED_AT} | data_through {DATA_THROUGH} | "
    totals_part = (
        f"anchor_week {current_week_start} | baseline {state.baseline_week_total:,.0f} | "
        f"current {state.current_week_total:,.0f} | delta {state.delta_week:+,.0f}"
    )
    return "**Export receipt:** " + run_part + totals_part


# Compute the metrics once; later cells read from this single State instance.
state = compute_state()
export_receipt_md = build_export_receipt(state)
# Placeholders filled in by the snapshot, receipt and decision cells further down;
# the final QC cell checks the filled-in values against `state`.
snapshot_md = ""
receipt_md = ""
decision_md = ""


# Queue Volume Shift Brief (v1.6)

*Operator decision brief for weekly queue volume shifts.*

In [None]:

# Header stamp: latest date present in the data and the wall-clock time of this run.
display(Markdown(f"**Data through:** {DATA_THROUGH}  \n**Generated:** {GENERATED_AT}"))


## Executive Snapshot

In [None]:

# Render the snapshot and keep the Markdown so the final QC cell can check it against `state`.
snapshot_md = build_snapshot_md(state)
display(Markdown(snapshot_md))
# NOTE(review): this callout text is fixed and says HOLD regardless of state.decision.
callout("Decision remains HOLD unless persistence is confirmed under the same comparator-valid conditions.")


## Executive Translation (plain English)

In [None]:

# Static plain-English definitions; nothing here is computed from `state`.
lines = [
"- **Baseline** = the typical weekly volume (rolling 8-week median).",
"- **Current** = the latest complete + mature week we trust for decisions.",
"- **Partial-week activity** = the current in-progress week (can be misleading).",
"- **Comparator validity** = we have a complete week to compare against baseline (apples-to-apples).",
"- **Actionability status (LIMITED_CONTEXT)** = we treat the signal as real enough to investigate, not real enough for irreversible changes.",
"- **Load multiple** = current ÷ baseline (how many times larger than normal).",
]
display(Markdown("\n".join(lines)))


## Micro glossary (6 terms)

In [None]:

# Static glossary text; nothing here is computed from `state`.
# NOTE(review): the "Actionability status" entry lists INVESTIGATE, but compute_state()
# only ever assigns "STABLE" or "LIMITED_CONTEXT" - confirm which is intended.
items = [
"- **Complete week:** 7/7 days present for the anchor week.",
"- **Mature gate:** in real ops, use settled/adjudicated window (e.g., 45+ days) before trusting volume.",
"- **Rolling 8-week median:** robust baseline resistant to one-off spikes.",
"- **Partial-week ratio:** partial-week volume ÷ anchor week volume.",
"- **Actionability status:** STABLE / INVESTIGATE / LIMITED_CONTEXT (drives reversible vs irreversible actions).",
"- **Other (mix shift):** remainder outside Top-N segments, aggregated so totals reconcile.",
]
display(Markdown("\n".join(items)))


## Data contract (required fields)

In [None]:
# Static description of the columns this notebook reads from `df`; one dict per field.
rows = [
    {"Field":"week_start", "Alt name":"—", "Required?":"Yes", "Type":"date", "Used for":"weekly rollups + baselines"},
    {"Field":"date", "Alt name":"—", "Required?":"Yes", "Type":"date", "Used for":"day-of-week ramp"},
    {"Field":"dow", "Alt name":"day_of_week", "Required?":"Yes", "Type":"int (0-6)", "Used for":"DOW comparisons + bands"},
    {"Field":"segment", "Alt name":"payer / lob / driver", "Required?":"Yes", "Type":"string", "Used for":"segment deltas + mix shift"},
    {"Field":"volume", "Alt name":"count", "Required?":"Yes", "Type":"int", "Used for":"all volume KPIs"},
    {"Field":"invalid_volume", "Alt name":"—", "Required?":"No", "Type":"int", "Used for":"data quality watchlist (materiality gating)"},
    {"Field":"invalid", "Alt name":"invalid_flag", "Required?":"No", "Type":"0/1", "Used for":"invalid reason drilldown (optional)"},
]
dfc = pd.DataFrame(rows)
display(dfc)


## Impact translation (baseline vs current)

In [None]:

# Bars: baseline vs current
fig, ax = plt.subplots(figsize=(8.5,4.5))
ax.bar(["Baseline (8w median)","Current (complete+mature)"], [state.baseline_week_total, state.current_week_total])
ax.set_ylabel("Weekly volume")
ax.set_title("Weekly volume: baseline vs current")
# Value label on top of each bar.
for i,v in enumerate([state.baseline_week_total, state.current_week_total]):
    ax.text(i, v, f"{v:,.0f}", ha="center", va="bottom")
# Delta annotation centred between the two bars, at 85% of the taller bar's height.
ax.text(0.5, max(state.baseline_week_total, state.current_week_total)*0.85,
        f"Delta {state.delta_week:+,.0f}/wk  ({state.delta_per_bus_day:+,.0f}/business day)",
        ha="center")
plt.show()
callout("Use this slide to size the magnitude. Percent change is secondary; the actionable number is delta per day.")

# Load multiple + ops-load sizing (realistic bands)
# A zero baseline yields NaN instead of raising ZeroDivisionError.
load_multiple = (
    state.current_week_total / state.baseline_week_total if state.baseline_week_total else float("nan")
)
assumed = [500, 1000, 2500, 5000]
tbl=[]
for tp in assumed:
    # Net daily backlog change if the team clears `tp` items per day against the daily delta.
    net = state.delta_per_bus_day - tp
    tbl.append([tp, f"{net:+,.0f}"])
out = pd.DataFrame(tbl, columns=["Assumed throughput/day","Net backlog change/day"])
# The two symbols below were mis-encoded as "?" in the original string literal.
display(Markdown(f"Load multiple ≈ **{load_multiple:.2f}x** (current ÷ baseline)."))
display(Markdown("(Use delta/day for sizing; this is not a staffing recommendation.)"))
display(out)


## Validity Receipt

In [None]:

# Render the receipt and keep the Markdown so the final QC cell can check it against `state`.
receipt_md = build_receipt_md(state)
display(Markdown(receipt_md))


## Week completeness ramp

In [None]:

# Baseline DOW band (median + IQR) across baseline weeks
baseline_weeks_df = df[df["week_start"].apply(lambda d: pd.to_datetime(d).date() in baseline_weeks)]
# One row per (week, day-of-week) with that day's total volume.
dow_base = baseline_weeks_df.groupby(["week_start", "dow"])["volume"].sum().reset_index()

# Per-day median and quartiles taken across the baseline weeks.
med = dow_base.groupby("dow")["volume"].median()
q1 = dow_base.groupby("dow")["volume"].quantile(0.25)
q3 = dow_base.groupby("dow")["volume"].quantile(0.75)

# Anchor-week volume per day; days with no rows are filled with 0.
cur = (
    df[df["week_start"].apply(lambda d: pd.to_datetime(d).date() == current_week_start)]
    .groupby("dow")["volume"]
    .sum()
    .reindex(range(7), fill_value=0)
)

# Running totals across the week; the band edges are cumulative sums of the per-day quartiles.
cum_med = med.cumsum()
cum_q1 = q1.cumsum()
cum_q3 = q3.cumsum()
cum_cur = cur.cumsum()

fig, ax = plt.subplots(figsize=(9, 4.5))
ax.plot(cum_med.index, cum_med.values, marker="o", label="Baseline median (cum)")
ax.fill_between(cum_med.index, cum_q1.values, cum_q3.values, alpha=0.2, label="Baseline IQR band")
ax.plot(cum_cur.index, cum_cur.values, marker="o", label="Current (cum)")
ax.set_xticks(range(7))
ax.set_xticklabels(["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"])
ax.set_ylabel("Cumulative volume")
ax.set_title("Cumulative volume by day-of-week vs baseline band")
plt.legend()
plt.show()

# Per-day (non-cumulative) gap between the anchor week and the baseline median, largest first.
diff = (cur - med).sort_values(ascending=False)
top = diff.head(2)
# NOTE(review): the two largest gaps are reported as "positive" without checking their sign.
callout(
    f"Largest positive divergence days: {', '.join([['Mon','Tue','Wed','Thu','Fri','Sat','Sun'][i] for i in top.index])}. "
    "Confirm if pattern persists in next complete week."
)


## Segment drivers + mix shift

In [None]:

# Segment totals for baseline vs current (fully data-derived)
seg_week = (
    df[df["week_start"].apply(lambda d: pd.to_datetime(d).date() in (baseline_weeks + [current_week_start]))]
    .groupby(["week_start", "segment"], as_index=False)["volume"]
    .sum()
)
seg_week["week_start"] = pd.to_datetime(seg_week["week_start"]).dt.date

# Baseline = per-segment median across the baseline weeks; current = anchor-week totals.
baseline_seg = (
    seg_week[seg_week["week_start"].isin(baseline_weeks)]
    .groupby("segment", as_index=True)["volume"]
    .median()
    .round()
    .astype(int)
)
current_seg = (
    seg_week[seg_week["week_start"] == current_week_start]
    .set_index("segment")["volume"]
    .astype(int)
)

# Align both series on the union of segments so a segment missing on one side counts as 0.
all_segments = sorted(set(baseline_seg.index).union(set(current_seg.index)))
baseline_seg = baseline_seg.reindex(all_segments, fill_value=0)
current_seg = current_seg.reindex(all_segments, fill_value=0)

# Rank by |delta| (signs are kept) so the chart agrees with the caption below and with the
# top-3 share computed in compute_state(); a signed sort would push large drops into "Other".
delta = current_seg - baseline_seg
delta = delta.loc[delta.abs().sort_values(ascending=False).index]
topN = min(3, len(delta))
top = delta.head(topN)
other = delta.iloc[topN:]
other_total = int(other.sum()) if len(other) else 0

contrib = pd.DataFrame(
    {
        "Segment": list(top.index) + (["Other"] if len(other) else []),
        "Delta (current - baseline)": list(top.values) + ([other_total] if len(other) else []),
    }
)
fig, ax = plt.subplots(figsize=(9, 4.5))
ax.bar(contrib["Segment"], contrib["Delta (current - baseline)"])
ax.set_ylabel("Delta volume")
ax.set_title("Top contributors to delta (Top-3 + Other)")
for i, v in enumerate(contrib["Delta (current - baseline)"]):
    # Place the label outside the bar on whichever side the bar extends.
    ax.text(i, v, f"{v:,.0f}", ha="center", va="bottom" if v >= 0 else "top")
plt.show()

# Share of weekly volume per segment; max(1, ...) avoids dividing by a zero total.
mix = pd.DataFrame(
    {
        "Segment": all_segments,
        "Baseline share": (baseline_seg / max(1, baseline_seg.sum())).values,
        "Current share": (current_seg / max(1, current_seg.sum())).values,
    }
)
mix["Delta share (pp)"] = (mix["Current share"] - mix["Baseline share"]) * 100
mix_sort = mix.sort_values("Current share", ascending=False)

fig, ax = plt.subplots(figsize=(9, 4.5))
ax.bar(mix_sort["Segment"], mix_sort["Baseline share"] * 100, label="Baseline")
ax.bar(mix_sort["Segment"], mix_sort["Current share"] * 100, alpha=0.7, label="Current")
ax.set_ylabel("Share of weekly volume (%)")
ax.set_title("Mix shift: baseline vs current share")
plt.xticks(rotation=30, ha="right")
plt.legend()
plt.show()

other_segments = list(other.index)
top_inside_other = other.abs().sort_values(ascending=False).head(2).index.tolist() if len(other) else []
if len(other):
    # Report the Top-N size and the Other count separately; the earlier "(N=...)" showed the
    # Other count right after "Top-N", which read as if it were the Top-N size.
    display(
        Markdown(
            f"Other = all remaining segments outside the Top-{topN} ranked by |delta| ({len(other_segments)} segments). "
            f"Top contributors inside Other: {', '.join(top_inside_other)}."
        )
    )

display(Markdown(f"Drivers shown: {topN} (of {len(all_segments)})."))

callout("Use this slide to answer: where the delta comes from and whether composition changed.")


## Decision + trigger rules

In [None]:

# Render the decision rules and keep the Markdown so the final QC cell can check it against `state`.
decision_md = build_decision_md(state)
display(Markdown(decision_md))


## Decision Standard (HOLD -> EXPAND)

In [None]:

criteria_lines = ["- **Flip HOLD -> EXPAND only if ALL conditions are TRUE on the next complete+mature week:**"]
for idx, cond in enumerate(state.flip_conditions, start=1):
    criteria_lines.append(f"  {idx}) {cond}")
criteria_lines.append("")
criteria_lines.append("- **If any condition fails:** HOLD remains locked.")
# disprove_line already opens with its own "What would disprove this HOLD:" lead-in, so that
# lead-in is bolded here instead of prefixing a second, duplicate question.
disprove_label, _, disprove_detail = state.disprove_line.partition(":")
if disprove_detail:
    criteria_lines.append(f"- **{disprove_label}:** {disprove_detail.strip()}")
else:
    criteria_lines.append(f"- {state.disprove_line}")

if state.sensitivity_disagreement:
    criteria_lines.append("- **Sensitivity verdict:** DISAGREEMENT detected; HOLD is locked until methods agree.")
else:
    criteria_lines.append("- **Sensitivity verdict:** Agreement detected across methods.")

sensitivity_panel = state.sensitivity_df.copy()
# compute_state() allows an empty panel (a DataFrame with no columns); skip the scaling
# in that case instead of raising KeyError on the missing column.
if not sensitivity_panel.empty:
    sensitivity_panel["delta_pct"] = (sensitivity_panel["delta_pct"] * 100).round(1)
sensitivity_panel = sensitivity_panel.rename(
    columns={
        "window_weeks": "Window",
        "estimator": "Estimator",
        "baseline_week": "Baseline/wk",
        "delta_pct": "Delta %",
        "shift_flag": "Shift >=15%",
        "implied_decision": "Implied decision",
    }
)

display(Markdown("\n".join(criteria_lines)))
display(sensitivity_panel)


## Appendix: Data Quality Watchlist

In [None]:

# KPI card style outputs (materiality gated)
# Normalize week_start to plain dates before comparing, as the other filters in this
# notebook do, so the match does not depend on how the column happens to be typed.
week = df[pd.to_datetime(df["week_start"]).dt.date == current_week_start]
invalid_total = int(week["invalid_volume"].sum())
week_total = int(week["volume"].sum())
# max(1, ...) keeps the rate defined when the anchor week has no volume.
rate = invalid_total / max(1, week_total)

display(Markdown(f"**Invalid volume (anchor week):** {invalid_total:,.0f}  \n**Invalid rate:** {rate*100:.2f}%"))
if rate >= 0.05:
    # The leading warning symbol was mis-encoded as "??" in the original string literal.
    display(Markdown("⚠️ **ALERT:** invalid rate breaches 5% threshold - investigate before trusting deltas."))
else:
    display(Markdown("No alert: invalids are below materiality threshold; monitor only."))

# Per-segment breakdown, top 5 by invalid volume.
tri = week.groupby("segment")[["invalid_volume","volume"]].sum().reset_index()
# Zero-volume segments get NaN (undefined rate) rather than a division by zero.
tri["invalid_rate_%"] = (tri["invalid_volume"] / tri["volume"].where(tri["volume"] > 0) * 100).round(2)
tri = tri.sort_values(["invalid_volume"], ascending=False).head(5)
display(tri)

display(Markdown(export_receipt_md))


In [None]:

# Hidden QC and publish guardrail: execute asserts, never render in exported deck
# This relies on the snapshot, receipt and decision cells having run first; before they do,
# the three *_md variables are still the empty-string placeholders and the checks fail.
assert_message_consistency(state, snapshot_md, receipt_md, decision_md)
# Any sensitivity disagreement must have produced a decision that contains "HOLD".
if state.sensitivity_disagreement:
    assert "HOLD" in state.decision, "Sensitivity disagreement must force HOLD."
assert "Export receipt:" in export_receipt_md, "Export receipt missing"
