# Regime-Aware Multifactor + ML/RL Alpha Engine with Backward & Forward Testing

## Project Description

This project is a **modular trading research system** designed to generate **pure alpha** (market-neutral returns independent of market beta) by combining **proven multifactor investing principles** with **modern machine learning and reinforcement learning techniques**, and testing them with **rigorous statistical validation**.

The strategy’s profit engine comes from exploiting cross-sectional mispricings in a broad large-cap U.S. universe (**S&P 500 training set with dynamic top-N selection by confidence**) by identifying which stocks are likely to outperform or underperform others over the next 5–10 days. This is achieved through:

- **Multifactor Alpha Layer:**  
  - **Value** (cheap stocks with potential to mean-revert up)  
  - **Momentum** (stocks in persistent trends)  
  - **Quality** (financially strong, operationally robust companies)  
  - Per-regime factor blending with shrinkage to avoid overfitting.

- **Machine Learning Overlays:**  
  - **LSTM** (sequence model) to capture time-series patterns in returns, volatility, and technicals.  
  - **LightGBM/XGBoost/MLP** (tabular models) to detect nonlinear interactions in cross-sectional features.  
  - **Stacking meta-learner** to optimally blend factor and ML outputs.  
  - **Uncertainty quantification** via MC-dropout and quantile models to control position sizing.

- **Regime Detection:**  
  - Hidden Markov Model (HMM) to classify markets as **Risk-On**, **Risk-Off**, or **Transition**, adjusting model weights and risk accordingly.

- **Portfolio Construction & Risk Management:**  
  - **Black–Litterman optimization** to integrate model views with market-implied returns.  
  - **Risk parity** to balance sector/factor exposures.  
  - **Dynamic hedging** against SPY/sector ETFs to maintain market neutrality.

- **Reinforcement Learning (PPO):**  
  - Learns a sizing and hedging policy that adapts risk-taking to forecast strength, uncertainty, and current market regime, maximizing return per unit of tail risk (CVaR-aware reward).

## Testing & Validation

The project integrates **both backward and forward testing** to ensure robustness:

- **Backward Testing (Historical):**  
  - Walk-forward analysis with purged cross-validation to avoid look-ahead bias.  
  - Statistical significance tests (Diebold–Mariano, SPA/White Reality Check) to confirm non-randomness.  
  - Monte Carlo block bootstrap to estimate confidence intervals and failure probabilities.  
  - VaR/CVaR analysis and stress testing against historical crisis scenarios.

- **Forward Testing (Shadow, No Trades):**  
  - Daily simulation using only forward data, logging PnL and risk metrics without sending orders.  
  - Weekly retraining and monthly auto-generated tear sheets to track live performance against backtest expectations.  
  - Recommended forward-testing period: 4–12 weeks before considering paper/live execution.

## Goal

The system’s goal is to produce **consistent, statistically validated alpha** with low correlation to the market and controlled drawdowns, using a combination of **factor investing, machine learning, and reinforcement learning**. This approach maximizes the probability of sustainable profitability before any real capital is risked.



# Objectives & Success Criteria
- Primary objective: Generate statistically significant pure alpha (market-neutral) with controlled drawdowns after transaction costs.

- Secondary objective: Build a repeatable process capable of ongoing, unattended forward testing that outputs monthly tear sheets.

- Pass/Fail gates (OO-S):
  - Annualized Sharpe ≥ 1.0 (cost-adjusted) across walk-forward windows.
  - SPA/White Reality Check non-rejection vs family of alternatives at 5–10% level.
  - Max DD ≤ 15–20% (tunable) in backtests.
  - Forward test (4–8+ weeks): positive return, rolling Sharpe > 0.8, tail losses consistent with backtest VaR/CVaR.



# 1. Data & Universe

In [1]:
import torch

device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Using device: {device}")

if device == "cuda":
    print(torch.cuda.get_device_name(0))


Using device: cpu


In [2]:
%pip -q install yfinance pandas numpy PyYAML pyarrow statsmodels tenacity

In [3]:
# ============================================================
# 1.1 UNIVERSE (UPDATED)
# S&P 500 training set with dynamic top-N selection by confidence (later in pipeline).
# Hedging instruments: SPY + sector ETFs.
# Source: Yahoo Finance (daily bars). Lookback from 2006-01-01 to today.
# Saves: universe.csv and raw_prices.parquet (OHLCV + Adj Close for all tickers incl. hedges + ^VIX)
# ============================================================

import pandas as pd
import numpy as np
import yfinance as yf
from datetime import datetime

START_DATE = "2006-01-01"
END_DATE = datetime.today().strftime("%Y-%m-%d")

def to_fmp_symbol(sym: str) -> str:
    # map Yahoo/WSJ style class tickers to FMP
    return sym.replace("-", ".") if "-" in sym else sym

def is_index_like(sym: str) -> bool:
    # skip ^VIX and other index-style series for FMP backfill
    return sym.startswith("^")

# --- Get S&P 500 constituents from Wikipedia (survivorship bias acknowledged) ---
sp500_url = "https://en.wikipedia.org/wiki/List_of_S%26P_500_companies"
tables = pd.read_html(sp500_url)
sp500 = tables[0]  # first table
tickers_raw = sp500["Symbol"].tolist()
# Some tickers on Wikipedia have periods; yfinance uses dashes for certain cases
tickers = [t.replace(".", "-") for t in tickers_raw]

# --- Hedging instruments (market & sector ETFs) ---
hedges = ["SPY", "XLY", "XLF", "XLV", "XLK", "XLI", "XLE", "XLP", "XLB", "XLU", "XLRE"]
context_symbols = ["^VIX"]  # market context series

universe = sorted(set(tickers))
universe_all = sorted(set(universe + hedges + context_symbols))

# --- Save universe to CSV ---
pd.DataFrame({"ticker": universe}).to_csv(f"universe_{END_DATE}.csv", index=False)
pd.DataFrame({"ticker": universe}).to_csv("universe.csv", index=False)  # pointer


# --- Download daily OHLCV for all symbols ---
# yfinance handles adjusted prices; we’ll keep both Close & Adj Close.
data = yf.download(
    universe_all,
    start=START_DATE,
    end=END_DATE,
    auto_adjust=False,
    group_by="ticker",
    progress=False,
    threads=True,
)

if data is None or getattr(data, "empty", False):
    raise RuntimeError("yfinance returned no data — try rerunning or chunking the request.")

def top_level_symbols(df):
    # Handles both MultiIndex (normal multi-ticker) and flat columns (edge cases)
    if isinstance(df.columns, pd.MultiIndex):
        return set(df.columns.get_level_values(0))
    # flat columns -> we can only have one symbol; yfinance puts OHLCV names as columns
    return set()  # treat as empty to trigger backfill logic safely

# added: tells us if yfinance skipped any tickers
available = top_level_symbols(data)
missing = [sym for sym in universe_all if sym not in available]
if missing:
    pd.Series(missing, name="missing_symbols").to_csv("missing_symbols.csv", index=False)
    print(f"WARNING: {len(missing)} symbols missing from download. Saved to missing_symbols.csv")

# Normalize to tidy format: MultiIndex -> long DataFrame
frames = []
if isinstance(data.columns, pd.MultiIndex):
    for sym in universe_all:
        if sym not in available:
            continue
        df = data[sym].copy()
        df.columns = [c.lower().replace(" ", "_") for c in df.columns]
        df["ticker"] = sym
        frames.append(df.reset_index().rename(columns={"Date": "date"}))
else:
    # Edge: flat columns — shouldn't happen with many symbols, but keep it safe
    df = data.copy()
    df.columns = [c.lower().replace(" ", "_") for c in df.columns]
    df["ticker"] = universe_all[0]
    frames.append(df.reset_index().rename(columns={"Date": "date"}))

prices = pd.concat(frames, ignore_index=True).sort_values(["ticker", "date"])
prices["date"] = pd.to_datetime(prices["date"])

# Basic sanity: drop rows with all NaNs for OHLCV
keep_cols = ["open", "high", "low", "close", "adj_close", "volume"]
prices = prices.dropna(subset=keep_cols, how="all")

# Save raw prices
prices.to_parquet("raw_prices.parquet", index=False)

print(f"Universe size (S&P 500): {len(universe)} tickers")
print(f"Total symbols incl. hedges/context: {len(universe_all)}")
print("Saved: universe.csv, raw_prices.parquet")


Universe size (S&P 500): 502 tickers
Total symbols incl. hedges/context: 514
Saved: universe.csv, raw_prices.parquet


In [4]:
# ---- Optional: Backfill any missing tickers with FMP (skip ^VIX etc.) ----
import os, requests, time
from getpass import getpass

if os.path.exists("missing_symbols.csv"):
    missing = pd.read_csv("missing_symbols.csv")["missing_symbols"].tolist()
else:
    uni = pd.read_csv("universe.csv")["ticker"].tolist()
    hedges = ["SPY","XLY","XLF","XLV","XLK","XLI","XLE","XLP","XLB","XLU","XLRE"]
    context = ["^VIX"]
    universe_all = sorted(set(uni + hedges + context))
    base_prices = pd.read_parquet("raw_prices.parquet")
    present = set(base_prices["ticker"].unique())
    missing = [s for s in universe_all if s not in present]

missing = [s for s in missing if not is_index_like(s)]
if not missing:
    print("No missing symbols to backfill.")
else:
    print(f"Backfilling {len(missing)} symbols from FMP (skipping indexes):", missing[:8], "...")
    FMP_API_KEY = os.environ.get("FMP_API_KEY", "").strip() or getpass("Enter FMP API key for price backfill: ").strip()
    if not FMP_API_KEY:
        raise RuntimeError("FMP_API_KEY required for backfill.")

    base_url = "https://financialmodelingprep.com/api/v3/historical-price-full"
    def fetch_fmp_prices(sym):
        fmp_sym = to_fmp_symbol(sym)
        url = f"{base_url}/{fmp_sym}?from={START_DATE}&to={END_DATE}&serietype=line&apikey={FMP_API_KEY}"
        r = requests.get(url, timeout=30)
        r.raise_for_status()
        js = r.json()
        hist = js.get("historical", [])
        if not hist:
            return None
        df = pd.DataFrame(hist)
        df["date"] = pd.to_datetime(df["date"])
        # Map columns; fall back if adjClose missing
        df = df.rename(columns={"adjClose":"adj_close"})
        if "adj_close" not in df.columns:
            df["adj_close"] = df["close"]
        cols = ["date","open","high","low","close","adj_close","volume"]
        for c in cols:
            if c not in df.columns: df[c] = np.nan
        df = df[cols]
        df["ticker"] = sym
        return df.sort_values("date")

    filled = []
    for i, sym in enumerate(missing, 1):
        try:
            df = fetch_fmp_prices(sym)
            if df is not None and len(df):
                filled.append(df)
        except Exception:
            pass
        if i % 10 == 0:
            time.sleep(0.5)  # be polite

    if filled:
        add = pd.concat(filled, ignore_index=True)
        base_prices = pd.read_parquet("raw_prices.parquet")
        prices_fixed = pd.concat([base_prices, add], ignore_index=True).sort_values(["ticker","date"])
        prices_fixed.to_parquet("raw_prices.parquet", index=False)
        print(f"Backfilled {add['ticker'].nunique()} symbols and re-saved raw_prices.parquet")
    else:
        print("FMP backfill returned no data; proceeding without these tickers.")

No missing symbols to backfill.


In [5]:
# ============================================================
# 1.2 FEATURES (FMP Premium, no hard-coded key)
# ------------------------------------------------------------
# Builds:
#   • Price/technical features (returns/vol/ATR/momentum/trend)
#   • Market context (SPY vol, ^VIX, breadth)
#   • Fundamentals via FMP (quarterly BS/IS/CF), cached per ticker,
#     forward-filled to daily, and ratio metrics (Value + Quality)
# Post-merge:
#   • Leakage control (shift all predictive features by 1 day)
#   • Winsorize & cross-sectional z-score (by date)
#   • Fundamentals imputation + missing masks
# Saves:
#   • features.parquet
#   • funda_quarterly.parquet, funda_daily.parquet
#   • cache/funda_q_<TICKER>.parquet (per-ticker cache)
# Notes:
#   - API key is taken from env var FMP_API_KEY or prompted securely.
#   - GPU not used here (CPU/I/O heavy); that’s normal.
# ============================================================

# %pip -q install yfinance pyarrow tenacity

import os, time, random, gc
import pandas as pd
import numpy as np
import yfinance as yf
from getpass import getpass
from concurrent.futures import ThreadPoolExecutor, as_completed
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type

# ---------- Config / Toggles ----------
COMPUTE_SLOPE = True      # slope_20 via vectorized method
SLOPE_WINDOW = 20
RV_WIN = 20
ATR_WIN = 14

# Fundamentals provider config (FMP Premium)
PROVIDER = "fmp"          # fixed to FMP for reliability
FMP_API_KEY = os.environ.get("FMP_API_KEY", "").strip()
if not FMP_API_KEY:
    # Prompt securely; not echoed, not written to disk
    FMP_API_KEY = getpass("Enter your FMP API key (kept in-memory for this session): ").strip()
if not FMP_API_KEY:
    raise RuntimeError("FMP_API_KEY is required. Set env var FMP_API_KEY or enter it when prompted.")

def to_fmp_symbol(sym: str) -> str:
    """
    Convert Yahoo-style tickers to FMP-style.
    Yahoo uses '-' for class/shared tickers (e.g., BRK-B),
    while FMP uses '.' (e.g., BRK.B). Everything else stays the same.
    """
    # common class/delimiter cases
    # e.g., BRK-B, BF-B, FOXA (no change), META (no change)
    if "-" in sym:
        return sym.replace("-", ".")
    return sym

# Chunking: Premium can fetch all at once. If you ever need throttling, set CHUNK_TICKERS to an int.
CHUNK_TICKERS = 20        # TOCHANGE: None = process entire universe in one go > change this later to None to check all stocks instead of just 100
START_AT = 0              # offset if chunking
SKIP_IF_CACHED = True     # skip ticker if cache exists

MAX_WORKERS = 4           # Premium can handle more concurrency; tune 4–12 as you like
RETRY_ATTEMPTS = 5
BATCH_SLEEP = (0.2, 0.6)  # polite jitter between HTTP calls
CACHE_DIR = "cache"
os.makedirs(CACHE_DIR, exist_ok=True)

# ---------- Load raw prices & universe (from 1.1) ----------
prices = pd.read_parquet("raw_prices.parquet")
universe_full = list(pd.read_csv("universe.csv")["ticker"])
hedges = {"SPY", "XLY", "XLF", "XLV", "XLK", "XLI", "XLE", "XLP", "XLB", "XLU", "XLRE"}
context_symbols = {"^VIX"}

# ============================================================
# A) PRICE / TECHNICAL FEATURES
# ============================================================

def compute_atr(df, window=ATR_WIN):
    high, low, close = df["high"], df["low"], df["close"]
    prev_close = close.shift(1)
    tr = pd.concat([(high - low),
                    (high - prev_close).abs(),
                    (low - prev_close).abs()], axis=1).max(axis=1)
    return tr.rolling(window).mean()

def vectorized_rolling_slope(y: pd.Series, window=SLOPE_WINDOW) -> pd.Series:
    N = window
    if N <= 1:
        return pd.Series(np.nan, index=y.index, dtype=float)
    x = np.arange(N, dtype=float)
    Sx = x.sum()
    Sxx = (x**2).sum()
    yv = y.to_numpy(dtype=float)
    yv = np.where(np.isfinite(yv), yv, 0.0)
    k = np.ones(N, dtype=float)
    Sy  = np.convolve(yv, k[::-1], mode="full")[N-1:len(yv)+N-1]
    Sxy = np.convolve(yv, x[::-1], mode="full")[N-1:len(yv)+N-1]
    denom = N * Sxx - Sx * Sx + 1e-12
    slope = (N * Sxy - Sx * Sy) / denom
    out = pd.Series(np.nan, index=y.index, dtype=float)
    out.iloc[N-1:] = slope[N-1:]
    return out

def mom_over_n(adj_close, n):
    return np.log(adj_close / adj_close.shift(n))

feat_frames = []
tickers = sorted(prices["ticker"].unique())
total = len(tickers)

for i, (sym, df_sym) in enumerate(prices.groupby("ticker"), start=1):
    if sym in context_symbols:
        continue
    if i % 25 == 0:
        print(f"[Features] {i}/{total} processed… ({sym})")

    df = df_sym.sort_values("date").copy()
    df["ret_1d"] = np.log(df["adj_close"] / df["adj_close"].shift(1))
    for l in range(1, 61):
        df[f"ret_lag_{l}"] = df["ret_1d"].shift(l)

    df["rv_20"] = df["ret_1d"].rolling(RV_WIN).std() * np.sqrt(252)
    df["atr_14"] = compute_atr(df, ATR_WIN)

    df["mom_20"]  = mom_over_n(df["adj_close"], 20)
    df["mom_6m"]  = mom_over_n(df["adj_close"], 126)
    df["mom_12m"] = mom_over_n(df["adj_close"], 252)
    df["mom_12_1"] = np.log(df["adj_close"].shift(21) / df["adj_close"].shift(252))
    df["mom_6_1"]  = np.log(df["adj_close"].shift(21) / df["adj_close"].shift(126))

    df["sma_20"] = df["adj_close"].rolling(20).mean()
    df["sma_50"] = df["adj_close"].rolling(50).mean()
    df["sma_20_gt_50"] = (df["sma_20"] > df["sma_50"]).astype("float32")
    df["slope_20"] = vectorized_rolling_slope(df["adj_close"], window=SLOPE_WINDOW) if COMPUTE_SLOPE else np.nan

    df["mom_20_vs_vol"] = df["mom_20"] / (df["ret_1d"].rolling(20).std() + 1e-8)

    feat_frames.append(df)

features = pd.concat(feat_frames, ignore_index=True)

# ============================================================
# B) MARKET CONTEXT (SPY vol, VIX, breadth)
# ============================================================

vix = prices[prices["ticker"] == "^VIX"][["date", "adj_close"]].rename(columns={"adj_close": "vix_close"})
spy = prices[prices["ticker"] == "SPY"].copy()
spy["spy_ret"] = np.log(spy["adj_close"] / spy["adj_close"].shift(1))
spy["spy_rv_20"] = spy["spy_ret"].rolling(20).std() * np.sqrt(252)
ctx = spy[["date", "spy_rv_20"]].merge(vix, on="date", how="left")

rets = features.pivot(index="date", columns="ticker", values="ret_1d")
advancers = (rets > 0).sum(axis=1)
# Fixed denominator for stability = full S&P 500 count from universe.csv
breadth = (advancers / len(universe_full)).rename("breadth")
ctx = ctx.merge(breadth.reset_index(), on="date", how="left")

features = features.merge(ctx, on="date", how="left")

# ============================================================
# C) FUNDAMENTALS (FMP Premium primary; cached per ticker)
# ============================================================

px_daily_all = prices[prices["ticker"].isin(universe_full)][["date", "ticker", "adj_close"]].copy()
px_daily_all["date"] = pd.to_datetime(px_daily_all["date"])
dates_all = px_daily_all[["date"]].drop_duplicates().sort_values("date")

def _tidy_quarterly_df(df: pd.DataFrame) -> pd.DataFrame:
    df = df.copy()
    if "date" in df.columns:
        df["date"] = pd.to_datetime(df["date"])
    elif "fillingDate" in df.columns:
        df["date"] = pd.to_datetime(df["fillingDate"])
    return df

def _coalesce_cols(df: pd.DataFrame, cols: list[str], default=np.nan) -> pd.Series:
    avail = [c for c in cols if c in df.columns]
    if not avail:
        return pd.Series(default, index=df.index)
    tmp = df[avail].apply(pd.to_numeric, errors="coerce")
    # first non-null across the candidate columns
    s = tmp.bfill(axis=1).iloc[:, 0]
    return s

def _fetch_quarterly_funda_fmp(ticker: str) -> pd.DataFrame:
    import requests
    base = "https://financialmodelingprep.com/api/v3"
    fmp_ticker = to_fmp_symbol(ticker)   # BRK-B -> BRK.B

    def jget(url):
        r = requests.get(url, timeout=30)
        r.raise_for_status()
        return r.json()

    # pull a long history (Premium supports it)
    bs = _tidy_quarterly_df(pd.DataFrame(jget(f"{base}/balance-sheet-statement/{fmp_ticker}?period=quarter&limit=120&apikey={FMP_API_KEY}")))
    is_ = _tidy_quarterly_df(pd.DataFrame(jget(f"{base}/income-statement/{fmp_ticker}?period=quarter&limit=120&apikey={FMP_API_KEY}")))
    cf = _tidy_quarterly_df(pd.DataFrame(jget(f"{base}/cash-flow-statement/{fmp_ticker}?period=quarter&limit=120&apikey={FMP_API_KEY}")))
    if bs.empty and is_.empty and cf.empty:
        raise RuntimeError(f"FMP fundamentals empty for {ticker} (queried as {fmp_ticker})")

    out = bs.merge(is_, on="date", how="outer").merge(cf, on="date", how="outer")

    # Coalesce across schema variants
    out["book_equity"]  = _coalesce_cols(out, ["totalStockholdersEquity","totalShareholderEquity","totalEquity"]).astype(float)
    out["net_income"]   = _coalesce_cols(out, ["netIncome","netIncomeApplicableToCommonShares"]).astype(float)
    out["ocf"]          = _coalesce_cols(out, [
        "netCashProvidedByOperatingActivities",
        "netCashProvidedByUsedInOperatingActivities",
        "netCashProvidedByUsedInOperatingActivitiesContinuingOperations"
    ]).astype(float)
    out["gross_profit"] = _coalesce_cols(out, ["grossProfit"]).astype(float)
    out["total_assets"] = _coalesce_cols(out, ["totalAssets"]).astype(float)

    # total_debt: prefer totalDebt; else short + long
    td = _coalesce_cols(out, ["totalDebt"])
    if td.isna().all():
        short = _coalesce_cols(out, ["shortTermDebt","shortLongTermDebtTotal"])
        long  = _coalesce_cols(out, ["longTermDebt"])
        td = (short.fillna(0) + long.fillna(0)).replace({0: np.nan})
    out["total_debt"] = td.astype(float)

    # dividends / buybacks (raw signs as provided by FMP)
    out["dividends"] = _coalesce_cols(out, ["dividendsPaid","dividendsPaidCashFlow"]).astype(float)
    out["buybacks"]  = _coalesce_cols(out, ["commonStockRepurchased","purchaseOfCommonStock"]).astype(float)

    out["ticker"] = ticker  # keep Yahoo-style symbol for our dataset

    cols = ["date","ticker","book_equity","net_income","ocf","gross_profit",
            "total_assets","total_debt","dividends","buybacks"]
    return out[cols].dropna(subset=["date"])


def fetch_or_load_cached_quarterly(ticker: str) -> pd.DataFrame | None:
    path = os.path.join(CACHE_DIR, f"funda_q_{ticker}.parquet")
    if SKIP_IF_CACHED and os.path.exists(path):
        try:
            return pd.read_parquet(path)
        except Exception:
            pass
    try:
        df = _fetch_quarterly_funda_fmp(ticker)
        if df is None or df.empty:
            return None
        df.to_parquet(path, index=False)
        time.sleep(random.uniform(*BATCH_SLEEP))  # polite pause
        return df
    except Exception:
        return None

# ---- SMOKE TEST (run once, then you can comment it out) ----
try:
    from IPython.display import display
except Exception:
    pass

test_syms = ["AAPL", "MSFT", "BRK-B", "BF-B"]
for s in test_syms:
    try:
        df = _fetch_quarterly_funda_fmp(s)
        # 👇 trim preview to match your price history
        df = df[df["date"] >= pd.to_datetime(START_DATE)]
        print(s, "→", to_fmp_symbol(s), "rows:", len(df))
        try:
            display(df.head(2))
        except Exception:
            print(df.head(2))
    except Exception as e:
        print("ERR", s, e)

# Determine chunk (or all)
if CHUNK_TICKERS:
    end_at = min(len(universe_full), START_AT + CHUNK_TICKERS)
    tickers_chunk = sorted(universe_full[START_AT:end_at])
    print(f"[FMP] Processing chunk {START_AT}:{end_at} (size={len(tickers_chunk)})")
else:
    tickers_chunk = sorted(universe_full)
    print(f"[FMP] Processing entire universe (size={len(tickers_chunk)})")

# Parallel fetch with caching
funda_parts, successes = [], 0
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as ex:
    futs = {ex.submit(fetch_or_load_cached_quarterly, t): t for t in tickers_chunk}
    for j, fut in enumerate(as_completed(futs), start=1):
        df = fut.result()
        if df is not None and len(df):
            funda_parts.append(df)
            successes += 1
        if j % 25 == 0:
            print(f"[Fundamentals/FMP] {j}/{len(tickers_chunk)} processed… (successes: {successes})")

if successes == 0:
    raise RuntimeError("No fundamentals fetched. Check your FMP key or try a smaller CHUNK_TICKERS with fewer workers.")

# Merge chunk with existing quarterly file (so multiple runs accumulate)
q_path = "funda_quarterly.parquet"
funda_q_chunk = pd.concat(funda_parts, ignore_index=True).sort_values(["ticker","date"])

# after you build funda_q_chunk
cutoff = pd.to_datetime(px_daily_all["date"].min())
funda_q_chunk = funda_q_chunk[funda_q_chunk["date"] >= cutoff]

if os.path.exists(q_path):
    old = pd.read_parquet(q_path)
    old["date"] = pd.to_datetime(old["date"])
    old = old[old["date"] >= cutoff]  # <- trim the old file too
    funda_q = (
        pd.concat([old, funda_q_chunk], ignore_index=True)
          .drop_duplicates(["ticker","date"], keep="last")
          .sort_values(["ticker","date"])
    )
else:
    funda_q = funda_q_chunk

funda_q = funda_q.sort_values(["ticker","date"])
funda_q.to_parquet(q_path, index=False)
print(
    f"Saved: {q_path}  "
    f"(tickers with funda total: {funda_q['ticker'].nunique()}, "
    f"rows: {len(funda_q)})"
)

# Quarterly → daily (forward-fill per ticker) across ALL tickers collected so far
ff = []
for sym, grp in funda_q.groupby("ticker"):
    g = dates_all.merge(grp, on="date", how="left")
    g["ticker"] = sym
    g = g.sort_values("date").ffill()
    ff.append(g)
funda_daily = pd.concat(ff, ignore_index=True)

# Ratios (Value & Quality)
fd = funda_daily.merge(px_daily_all, on=["date","ticker"], how="left")
price = fd["adj_close"].replace(0, np.nan)

fd["book_to_price"]     = fd["book_equity"] / price
fd["earnings_yield"]    = fd["net_income"]  / price
fd["cf_yield"]          = fd["ocf"]         / price
fd["shareholder_yield"] = (fd["dividends"].fillna(0) * -1 + fd["buybacks"].fillna(0)) / price

fd["gross_profitability"] = fd["gross_profit"] / fd["total_assets"].replace(0, np.nan)
fd["roe"]                 = fd["net_income"] / fd["book_equity"].replace(0, np.nan)
fd["accruals"]            = (fd["net_income"] - fd["ocf"]) / fd["total_assets"].replace(0, np.nan)
fd["leverage"]            = fd["total_debt"] / fd["total_assets"].replace(0, np.nan)

funda_daily = fd[[
    "date","ticker","book_to_price","earnings_yield","cf_yield","shareholder_yield",
    "gross_profitability","roe","accruals","leverage"
]]
funda_daily.to_parquet("funda_daily.parquet", index=False)
print("Saved: funda_daily.parquet")

# Merge fundamentals into features
features = features.merge(funda_daily, on=["date","ticker"], how="left")

# ============================================================
# D) POST-MERGE HYGIENE
# ============================================================

# 1) Leakage control
non_feature_cols = {"date","ticker","open","high","low","close","adj_close","volume"}
cols_to_shift = [c for c in features.columns if c not in non_feature_cols]
features[cols_to_shift] = features.groupby("ticker")[cols_to_shift].shift(1)

# 2) Winsorize & cross-sectional z-score
def winsorize_cs(s, lo=0.01, hi=0.99):
    ql, qh = s.quantile(lo), s.quantile(hi)
    return s.clip(ql, qh)

# --- choose features for cross-sectional standardization (exclude context & raw SMAs) ---
cs_cols = [
    "rv_20","atr_14","mom_20","mom_6m","mom_12m","mom_12_1","mom_6_1",
    "sma_20_gt_50","slope_20","mom_20_vs_vol",
    # fundamentals
    "book_to_price","earnings_yield","cf_yield","shareholder_yield",
    "gross_profitability","roe","accruals","leverage"
] + [f"ret_lag_{l}" for l in range(1,61)]

# keep context raw (no CS z-score)
context_keep_raw = ["spy_rv_20","vix_close","breadth"]

present = [c for c in cs_cols if c in features.columns]

print(f"[Standardize] Cross-sectional z-score on {len(present)} features")

def cs_standardize_fast(df, cols, lo=0.01, hi=0.99):
    out = df.copy()
    out[cols] = out[cols].astype("float32")

    d = out["date"]
    for c in cols:
        s = out[c]

        ql = s.groupby(d).transform(lambda x: x.quantile(lo))
        qh = s.groupby(d).transform(lambda x: x.quantile(hi))
        s_clip = s.clip(ql, qh)

        mu = s_clip.groupby(d).transform("mean")
        sd = s_clip.groupby(d).transform("std")

        # if std is 0 or NaN (date-constant or all-NaN), set denom=1 to avoid blowing up / NaNs
        denom = sd.fillna(0.0).replace(0.0, 1.0)

        out[c] = ((s_clip - mu) / (denom + 1e-9)).astype("float32")

    return out

features = cs_standardize_fast(features, present)

# 3) Fundamentals imputation + masks
funda_cols = ["book_to_price","earnings_yield","cf_yield","shareholder_yield",
              "gross_profitability","roe","accruals","leverage"]
for c in funda_cols:
    if c in features.columns:
        features[f"{c}_is_missing"] = features[c].isna().astype(int)
        features[c] = features.groupby("date")[c].transform(lambda s: s.fillna(s.median()))

# Save final
features.to_parquet("features.parquet", index=False)
print("Saved: features.parquet (lagged, winsorized, cross-sectional z-scored)")
print("Artifacts: funda_quarterly.parquet, funda_daily.parquet, cache/funda_q_*.parquet")

gc.collect()

Enter your FMP API key (kept in-memory for this session): ··········
[Features] 25/514 processed… (AMAT)
[Features] 50/514 processed… (BA)
[Features] 75/514 processed… (CARR)
[Features] 100/514 processed… (CNP)
[Features] 125/514 processed… (DAL)
[Features] 150/514 processed… (EA)
[Features] 175/514 processed… (EXE)
[Features] 200/514 processed… (GEHC)
[Features] 225/514 processed… (HON)
[Features] 250/514 processed… (IT)
[Features] 275/514 processed… (LDOS)
[Features] 300/514 processed… (MCO)
[Features] 325/514 processed… (MTCH)
[Features] 350/514 processed… (OKE)
[Features] 375/514 processed… (PNR)
[Features] 400/514 processed… (RTX)
[Features] 425/514 processed… (SYF)
[Features] 450/514 processed… (TT)
[Features] 475/514 processed… (VTR)
[Features] 500/514 processed… (XLI)
AAPL → AAPL rows: 78


Unnamed: 0,date,ticker,book_equity,net_income,ocf,gross_profit,total_assets,total_debt,dividends,buybacks
42,2006-04-01,AAPL,8682000000.0,,-125000000.0,1297000000.0,13911000000.0,0.0,0.0,0.0
43,2006-07-01,AAPL,9330000000.0,,1007000000.0,1325000000.0,15114000000.0,0.0,0.0,-1000000.0


MSFT → MSFT rows: 78


Unnamed: 0,date,ticker,book_equity,net_income,ocf,gross_profit,total_assets,total_debt,dividends,buybacks
42,2006-03-31,MSFT,42038000000.0,,4563000000.0,8872000000.0,66854000000.0,0.0,-925000000.0,-4675000000.0
43,2006-06-30,MSFT,40104000000.0,,3281000000.0,9674000000.0,69597000000.0,0.0,-917000000.0,-3981000000.0


BRK-B → BRK.B rows: 78


Unnamed: 0,date,ticker,book_equity,net_income,ocf,gross_profit,total_assets,total_debt,dividends,buybacks
42,2006-03-31,BRK-B,95349000000.0,,2359000000.0,6162000000.0,230206000000.0,30479000000.0,0.0,0.0
43,2006-06-30,BRK-B,97613000000.0,,1092000000.0,8197000000.0,232331000000.0,30557000000.0,0.0,0.0


ERR BF-B FMP fundamentals empty for BF-B (queried as BF.B)
[FMP] Processing chunk 0:20 (size=20)
Saved: funda_quarterly.parquet  (tickers with funda total: 99, rows: 7373)
Saved: funda_daily.parquet
[Standardize] Cross-sectional z-score on 78 features
Saved: features.parquet (lagged, winsorized, cross-sectional z-scored)
Artifacts: funda_quarterly.parquet, funda_daily.parquet, cache/funda_q_*.parquet


0

In [6]:
# ============================================================
# 1.3 DATA HYGIENE / QC (non-destructive)
# ------------------------------------------------------------
# - Summarize coverage & missingness (post 1.2)
# - Optional pruning: drop early warmup dates & low-coverage dates
# - Write meta.yaml and QC CSVs
# ============================================================

import pandas as pd
import numpy as np
import yaml

FEATURES_PATH = "features.parquet"
UNIVERSE_PATH = "universe.csv"

features = pd.read_parquet(FEATURES_PATH)
universe_df = pd.read_csv(UNIVERSE_PATH)

# ---------- QC: basics ----------
min_date = pd.to_datetime(features["date"]).min()
max_date = pd.to_datetime(features["date"]).max()
n_rows = len(features)
n_tickers = features["ticker"].nunique()

# Columns we standardized in 1.2 (will exist if 1.2 ran)
feature_cols = [c for c in features.columns
                if c not in {"date","ticker","open","high","low","close","adj_close","volume"}]

# Per-column missingness (after 1.2; should be low except earliest windows)
missing_pct = (1.0 - features[feature_cols].notna().mean()).sort_values(ascending=False)
missing_pct.to_csv("qc_missing_by_feature.csv", header=["missing_pct"])

# Coverage by date (# of tickers with at least 1 valid feature on that date)
valid_any = features[feature_cols].notna().sum(axis=1) > 0
coverage_by_date = (features.assign(valid_any=valid_any)
                             .groupby("date")["ticker"]
                             .nunique()
                             .rename("n_tickers"))
coverage_by_date.to_csv("qc_coverage_by_date.csv")

# ---------- Optional: pruning rules (non-destructive by default) ----------
# 1) Warmup: many features need long windows (max ≈ 252 + 21). Keep dates after first 273 trading days.
#    We'll infer a warmup cutoff from SPY availability to be robust.
spy_dates = features.loc[features["ticker"]=="SPY", "date"].sort_values().unique()
if len(spy_dates) > 300:
    warmup_cutoff = pd.to_datetime(spy_dates[min(273, len(spy_dates)-1)])
else:
    warmup_cutoff = min_date  # fallback

# 2) Low coverage: drop dates with very few names (e.g., <300) — tweak if you want.
COVERAGE_MIN = 300
low_cov_dates = coverage_by_date[coverage_by_date < COVERAGE_MIN].index

# We don’t mutate features here; write a recommended mask so training can filter.
date_mask_keep = (~pd.Series(features["date"]).isin(low_cov_dates)) & (features["date"] >= warmup_cutoff)
keep_rate = date_mask_keep.mean()
pd.DataFrame({
    "warmup_cutoff":[warmup_cutoff],
    "coverage_min":[COVERAGE_MIN],
    "keep_rate":[float(keep_rate)]
}).to_csv("qc_recommendations.csv", index=False)

# ---------- Meta ----------
meta = {
    "universe": {
        "description": "S&P 500 (current constituents; survivorship bias acknowledged).",
        "count": int(len(universe_df)),
        "hedges": ["SPY","XLY","XLF","XLV","XLK","XLI","XLE","XLP","XLB","XLU","XLRE"],
        "context_symbols": ["^VIX"],
        "lookback": {"start": str(min_date.date()), "end": str(max_date.date())}
    },
    "pricing": {
        "source": "Yahoo Finance via yfinance",
        "adjusted_prices_used": True,
        "file": "raw_prices.parquet"
    },
    "features": {
        "file": "features.parquet",
        "rows": int(n_rows),
        "tickers": int(n_tickers),
        "leakage_control": "All predictive features shifted by 1 day.",
        "cross_sectional_processing": "Winsorized [1%,99%] & z-scored by date (see 1.2).",
        "imputation": "Fundamentals imputed (cross-sectional median) in 1.2; *_is_missing masks present."
    },
    "qc": {
        "missing_by_feature_csv": "qc_missing_by_feature.csv",
        "coverage_by_date_csv": "qc_coverage_by_date.csv",
        "recommendations_csv": "qc_recommendations.csv",
        "warmup_cutoff": str(warmup_cutoff.date()),
        "coverage_min": COVERAGE_MIN,
        "recommendation": "Filter training rows to dates >= warmup_cutoff and dates with coverage >= coverage_min."
    },
    "deliverables": ["universe.csv", "raw_prices.parquet", "features.parquet",
                     "funda_quarterly.parquet", "funda_daily.parquet", "meta.yaml",
                     "qc_missing_by_feature.csv", "qc_coverage_by_date.csv", "qc_recommendations.csv"]
}

with open("meta.yaml", "w") as f:
    yaml.safe_dump(meta, f, sort_keys=False)

print("Saved: meta.yaml + QC CSVs")

Saved: meta.yaml + QC CSVs


In [7]:
# ============================================================
# 1.4 DATA QC & ASSERTIONS (non-destructive; optional filtered view)
# Produces: qc_summary.json, qc_constant_cols.csv, qc_missing_by_feature.csv (again),
#           qc_skew_kurtosis.csv, qc_outlier_rate.csv, qc_drift.csv,
#           features_filtered.parquet (optional, if you turn on APPLY_FILTERS)
# ============================================================

import json, pandas as pd, numpy as np
from scipy.stats import skew, kurtosis
import warnings
warnings.filterwarnings("ignore", message="Precision loss occurred in moment calculation")
warnings.filterwarnings("ignore", message="Degrees of freedom <= 0 for slice")

FEATURES_PATH = "features.parquet"

APPLY_FILTERS = True          # set False if you only want reports
COVERAGE_MIN = 300            # min tickers per date
Z_OUTLIER = 5.0               # |z| threshold post-standardization
EARLY_YEARS = 5               # windows for drift check
RECENT_YEARS = 5

df = pd.read_parquet(FEATURES_PATH)
df["date"] = pd.to_datetime(df["date"])
feature_cols = [c for c in df.columns if c not in {"date","ticker","open","high","low","close","adj_close","volume"}]

# Basic shape / duplicates
dup_count = df.duplicated(["date","ticker"]).sum()
idx_dupes = int(dup_count)

# Per-ticker monotonic date check
monotonic_bad = []
for t, g in df.groupby("ticker"):
    if not g["date"].sort_values().is_monotonic_increasing:
        monotonic_bad.append(t)

# Constant/empty columns
MIN_N = 200  # only compute moments if we’ve got enough points
sk_stats = []

const_cols, empty_cols = [], []
for c in feature_cols:
    nn = df[c].notna().sum()
    if nn == 0:
        empty_cols.append(c)
        continue
    # treat “constant” as very low variance or single unique value
    if df[c].nunique(dropna=True) == 1 or np.nanstd(df[c].to_numpy(dtype=float)) < 1e-12:
        const_cols.append(c)

pd.Series(const_cols, name="constant_cols").to_csv("qc_constant_cols.csv", index=False)
pd.Series(empty_cols,  name="empty_cols").to_csv("qc_empty_cols.csv", index=False)

# Missingness
missing_pct = (1.0 - df[feature_cols].notna().mean()).sort_values(ascending=False)
missing_pct.to_csv("qc_missing_by_feature.csv", header=["missing_pct"])

# Coverage by date and warmup/low-coverage mask (reuse warmup logic from 1.3)
spy_dates = df.loc[df["ticker"]=="SPY", "date"].sort_values().unique()
warmup_cutoff = pd.to_datetime(spy_dates[min(273, len(spy_dates)-1)]) if len(spy_dates) > 300 else df["date"].min()
coverage = df.groupby("date")["ticker"].nunique()
low_cov_dates = coverage[coverage < COVERAGE_MIN].index
keep_mask = (df["date"] >= warmup_cutoff) & (~df["date"].isin(low_cov_dates))
keep_rate = float(keep_mask.mean())

# Outlier rate (features are z-scored per date already)
outlier_rate = {}
for c in feature_cols:
    s = df[c]
    outlier_rate[c] = float((s.abs() > Z_OUTLIER).mean())
pd.Series(outlier_rate, name="outlier_rate").sort_values(ascending=False).to_csv("qc_outlier_rate.csv")

# Skew/Kurtosis (global, ignoring NaNs)
sk_rows = []
for c in feature_cols:
    x = df[c].to_numpy(dtype=float)
    x = x[np.isfinite(x)]
    if len(x) < MIN_N or np.nanstd(x) < 1e-8:
        # optional: quantile-based skew as fallback
        try:
            q1,q2,q3 = np.nanpercentile(x, [25,50,75])
            bowley = ((q3 + q1) - 2*q2) / ((q3 - q1) + 1e-9)
        except Exception:
            bowley = np.nan
        sk_rows.append([c, np.nan, np.nan, bowley, np.nan, np.nan])
        continue
    sk = float(skew(x, bias=False))
    ku = float(kurtosis(x, fisher=True, bias=False))
    p99 = float(np.nanpercentile(x, 99))
    med = float(np.nanmedian(x))
    dom = abs(p99) / (abs(med) + 1e-9)
    sk_rows.append([c, sk, ku, np.nan, dom, p99])

pd.DataFrame(sk_rows, columns=["feature","skew","kurtosis_fisher","bowley_skew","p99_to_median_abs","p99"])\
  .sort_values("p99_to_median_abs", ascending=False)\
  .to_csv("qc_skew_kurtosis.csv", index=False)

# Drift: early vs recent windows
dstart, dend = df["date"].min(), df["date"].max()
span_years = (dend - dstart).days / 365.25
if span_years < (EARLY_YEARS + RECENT_YEARS):
    # fallback: split the dataset in half
    mid = dstart + (dend - dstart) / 2
    early = df[(df["date"] >= dstart) & (df["date"] <= mid)]
    late  = df[(df["date"] >  mid) & (df["date"] <= dend)]
else:
    early_end    = pd.Timestamp(dstart) + pd.DateOffset(years=EARLY_YEARS)
    recent_start = pd.Timestamp(dend)   - pd.DateOffset(years=RECENT_YEARS)
    early = df[(df["date"] >= dstart) & (df["date"] <= early_end)]
    late  = df[(df["date"] >= recent_start) & (df["date"] <= dend)]

drift_rows = []
for c in feature_cols:
    e = early[c].astype("float64"); l = late[c].astype("float64")
    e = e[np.isfinite(e)]; l = l[np.isfinite(l)]
    if len(e) < MIN_N or len(l) < MIN_N:
        drift_rows.append([c, np.nan, np.nan, np.nan, np.nan, np.nan, np.nan])
        continue
    e_mean, e_std = float(np.nanmean(e)), float(np.nanstd(e))
    l_mean, l_std = float(np.nanmean(l)), float(np.nanstd(l))
    drift_rows.append([c, e_mean, l_mean, l_mean - e_mean, e_std, l_std, (l_std+1e-9)/(e_std+1e-9)])

pd.DataFrame(
    drift_rows,
    columns=["feature","early_mean","late_mean","mean_diff","early_std","late_std","std_ratio_late_over_early"]
).to_csv("qc_drift.csv", index=False)

def bowley_skew(x):
    q1, q2, q3 = np.nanpercentile(x, [25,50,75])
    denom = (q3 - q1) + 1e-9
    return float(((q3 + q1) - 2*q2) / denom)
# you can compute this alongside or instead of moment skew for each feature

# Optionally write filtered view for modeling
if APPLY_FILTERS:
    # Also drop truly empty/constant cols from the filtered file only
    drop_cols = list(set(empty_cols) | set(const_cols))
    cols_keep = [c for c in df.columns if c not in drop_cols]
    df_filt = df.loc[keep_mask, cols_keep].copy()
    df_filt.to_parquet("features_filtered.parquet", index=False)

# Summary JSON (for quick eyeball)
summary = {
    "rows": int(len(df)),
    "tickers": int(df["ticker"].nunique()),
    "dates": int(df["date"].nunique()),
    "date_min": str(df["date"].min().date()),
    "date_max": str(df["date"].max().date()),
    "duplicates_idx": idx_dupes,
    "monotonic_date_issues": len(monotonic_bad),
    "constant_cols": len(const_cols),
    "empty_cols": len(empty_cols),
    "warmup_cutoff": str(warmup_cutoff.date()),
    "coverage_min": COVERAGE_MIN,
    "keep_rate_after_filters": keep_rate,
    "median_missing_pct": float(missing_pct.median()),
    "max_missing_pct": float(missing_pct.max()),
    "mean_outlier_rate_|z|>5": float(pd.Series(outlier_rate).mean()),
    "filtered_file_written": APPLY_FILTERS
}

# 🚦 Hard QC checks — stop if these fail
assert summary["duplicates_idx"] == 0, "Duplicate (date,ticker) rows found."
assert summary["keep_rate_after_filters"] >= 0.85, "Too many rows dropped by filters."
assert summary["constant_cols"] <= 10, "Suspicious number of constant columns."

with open("qc_summary.json","w") as f:
    json.dump(summary, f, indent=2)

print("QC done → qc_summary.json, qc_* CSVs",
      "and features_filtered.parquet" if APPLY_FILTERS else "")

  return np.nanmean(a, axis, out=out, keepdims=keepdims)


QC done → qc_summary.json, qc_* CSVs and features_filtered.parquet


In [8]:
import json, pandas as pd

with open("qc_summary.json") as f:
    s = json.load(f)

print("=== QC SUMMARY ===")
for k in [
    "rows","tickers","dates","date_min","date_max",
    "duplicates_idx","monotonic_date_issues",
    "constant_cols","empty_cols",
    "warmup_cutoff","coverage_min","keep_rate_after_filters",
    "median_missing_pct","max_missing_pct","mean_outlier_rate_|z|>5",
    "filtered_file_written"
]:
    print(f"{k}: {s.get(k)}")

print("\n=== Top 10 most-missing features ===")
print(pd.read_csv("qc_missing_by_feature.csv").head(10))

print("\n=== Top 10 highest outlier rates (|z|>5) ===")
print(pd.read_csv("qc_outlier_rate.csv").head(10))

print("\n=== Constant / Empty columns ===")
try: print(pd.read_csv("qc_constant_cols.csv").head())
except: print("none")
try: print(pd.read_csv("qc_empty_cols.csv").head())
except: print("none")

print("\n=== Drift (largest mean change early→late) ===")
drift = pd.read_csv("qc_drift.csv")
drift["abs_mean_diff"] = drift["mean_diff"].abs()
print(drift.sort_values("abs_mean_diff", ascending=False).head(10))

print("\n=== Filtered file shape ===")
ff = pd.read_parquet("features_filtered.parquet")
print(ff.shape, "rows x cols; dates:", ff['date'].min(), "→", ff['date'].max(), "; tickers:", ff['ticker'].nunique())


=== QC SUMMARY ===
rows: 2331764
tickers: 513
dates: 4931
date_min: 2006-01-03
date_max: 2025-08-08
duplicates_idx: 0
monotonic_date_issues: 0
constant_cols: 3
empty_cols: 3
warmup_cutoff: 2007-02-05
coverage_min: 300
keep_rate_after_filters: 0.9515559893711371
median_missing_pct: 0.005390125244235655
max_missing_pct: 1.0
mean_outlier_rate_|z|>5: 0.03214174650234944
filtered_file_written: True

=== Top 10 most-missing features ===
       Unnamed: 0  missing_pct
0  earnings_yield     1.000000
1             roe     1.000000
2        accruals     1.000000
3        mom_12_1     0.055661
4         mom_12m     0.055661
5         mom_6_1     0.027941
6          mom_6m     0.027941
7      ret_lag_60     0.013640
8      ret_lag_59     0.013420
9      ret_lag_58     0.013200

=== Top 10 highest outlier rates (|z|>5) ===
          Unnamed: 0  outlier_rate
0          vix_close      0.999780
1             sma_20      0.973347
2             sma_50      0.967120
3             atr_14      0.005802
4  

<details>
<summary>📦 Summary — Section 1 (Data & Universe)</summary>

In this section, we **built the full, modeling-ready dataset** by merging historical prices, technical indicators, market context, and fundamentals into a single leakage-controlled feature matrix.  
Key steps included:

- **Data acquisition** — pulled long-term daily OHLCV for the equity universe, hedges, and context symbols, plus quarterly fundamentals from FMP.
- **Feature engineering** — created lagged returns/volatility, momentum metrics, trend filters, ATR, volatility-adjusted momentum, and value/quality factor composites. Fundamentals were forward-filled to daily frequency.
- **Leakage control & scaling** — shifted predictive features by one day, winsorized extreme values, and cross-sectionally z-scored each feature per date.
- **Missing data handling** — conservative imputation for fundamentals and binary masks to record missingness.
- **Quality control** — removed low-coverage dates, early warmup period, constant/empty columns, and duplicate rows; generated QC reports and metadata.

**Outcome:** A clean, consistent, and statistically robust `features_filtered.parquet` file — ready for direct use in **Section 2 (Regime Modeling)** without recomputing or re-fetching any raw data.

</details>


<details>
<summary> Variables to reuse — Section 1 (Data & Universe) </summary>
**Status:** Done. Artifacts are written; QC checks passed; ready to start **Section 2 (Regime Modeling)** using the saved files and globals below.

---

## Canonical Artifacts (reuse, don’t recompute)
- `universe.csv` – S&P 500 tickers (Yahoo-style), excludes hedges/context.
- `raw_prices.parquet` – OHLCV + `adj_close` for equities + hedges + `^VIX` (long format).
- `features.parquet` – lagged, winsorized, cross-sectionally z-scored features (+ *_is_missing masks).
- `features_filtered.parquet` – modeling-ready view (warmup & low-coverage dates removed; empty/constant cols dropped).
- `funda_quarterly.parquet`, `funda_daily.parquet` – fundamentals at quarterly/daily granularity.
- `meta.yaml` – machine-readable metadata (sources, lookback, QC guidance).
- QC reports: `qc_summary.json`, `qc_missing_by_feature.csv`, `qc_coverage_by_date.csv`, `qc_constant_cols.csv`, `qc_empty_cols.csv`, `qc_outlier_rate.csv`, `qc_skew_kurtosis.csv`, `qc_drift.csv`, `qc_recommendations.csv`.

---

## Reusable Globals (organized)
> These exist (or are trivially reloadable) after Section 1. Prefer these over re-deriving.

### Dates / Ranges
- `START_DATE = "2006-01-01"`  
- `END_DATE = datetime.today().strftime("%Y-%m-%d")`

### Universe & Symbols
- `sp500_url` – Wikipedia source for constituents.
- `tickers_raw` → raw symbols from Wikipedia.
- `tickers` → Yahoo-normalized tickers (periods → dashes).
- `hedges` → `["SPY","XLY","XLF","XLV","XLK","XLI","XLE","XLP","XLB","XLU","XLRE"]`
- `context_symbols` → `["^VIX"]`  *(later used as `{"^VIX"}` set in 1.2)*
- `universe` → sorted unique S&P tickers.
- `universe_all` → `universe + hedges + context_symbols`
- `universe_full` → list from `universe.csv` (canonical equities universe for downstream code).

### DataFrames (load-once, reuse)
- `prices` → long OHLCV for `universe_all` (saved as `raw_prices.parquet`).
- `features` → merged technical + context + fundamentals (post-shift, winsorize, z-score) (saved).
- `vix` → `^VIX` close series; `spy` → SPY prices with `spy_ret`, `spy_rv_20`.
- `ctx` → market context by date: `["spy_rv_20","vix_close","breadth"]`.
- `px_daily_all` → `["date","ticker","adj_close"]` for equities universe.
- `dates_all` → unique trading dates.
- `funda_q` → quarterly fundamentals by ticker (saved).
- `funda_daily` → daily forward-filled fundamentals (saved).

### Feature Engineering Toggles / Windows
- `COMPUTE_SLOPE = True`
- `SLOPE_WINDOW = 20`
- `RV_WIN = 20`
- `ATR_WIN = 14`

### Provider / API / Caching
- `PROVIDER = "fmp"`
- `FMP_API_KEY` – from env or prompt (in-memory only).
- `CACHE_DIR = "cache"`
- `CHUNK_TICKERS = 100`, `START_AT = 0`, `SKIP_IF_CACHED = True`
- `MAX_WORKERS = 4`, `RETRY_ATTEMPTS = 5`, `BATCH_SLEEP = (0.2, 0.6)`

### Useful Function Handles
- `to_fmp_symbol(sym)` – Yahoo “-” ↔ FMP “.” class ticker mapping.
- `is_index_like(sym)` – identifies index symbols (e.g., `^VIX`).
- `compute_atr(df, window=ATR_WIN)`
- `vectorized_rolling_slope(y, window=SLOPE_WINDOW)`
- `mom_over_n(adj_close, n)`
- `_tidy_quarterly_df(df)`, `_coalesce_cols(df, cols, default)`
- `_fetch_quarterly_funda_fmp(ticker)` – pulls BS/IS/CF, coalesces variants.
- `fetch_or_load_cached_quarterly(ticker)` – cached loader for fundamentals.
- `cs_standardize_fast(df, cols, lo=0.01, hi=0.99)` – per-date winsorize+z-score.

### Column Sets / Masks (downstream-friendly)
- `non_feature_cols = {"date","ticker","open","high","low","close","adj_close","volume"}`
- `cols_to_shift` – all predictive feature columns actually shifted by 1 bar.
- `cs_cols` – features standardized cross-sectionally (lags, vol, mom, fundamentals, etc.).
- `context_keep_raw = ["spy_rv_20","vix_close","breadth"]`
- *(QC section)*
  - `FEATURES_PATH = "features.parquet"`, `UNIVERSE_PATH = "universe.csv"`
  - `COVERAGE_MIN = 300`
  - `APPLY_FILTERS = True`
  - `Z_OUTLIER = 5.0`, `EARLY_YEARS = 5`, `RECENT_YEARS = 5`
  - `warmup_cutoff` – computed from SPY date series (≈273 trading-day warmup).
  - `keep_mask` – dates ≥ `warmup_cutoff` and with coverage ≥ `COVERAGE_MIN`.
  - *(Note: `features_filtered.parquet` is written using `keep_mask` and pruned columns.)*

---

## What this means for Section 2 (Regimes)
- **Use** `features_filtered.parquet` (or reload `features` and apply `keep_mask`) to build HMM inputs.
- Inputs available out of the box: `spy_rv_20`, `vix_close`, `breadth`, and per-asset returns (`ret_1d`), plus everything in `cs_cols`.
- **No duplicate `(date, ticker)` rows**, **no monotonic issues**; early sparse periods removed by `warmup_cutoff`/`keep_mask`.

---

## Sanity Questions (short answers)
- **“Are we good to go?”** Yes — Section 1 is complete and validated; proceed to regime modeling.
- **“Empty rows?”** Raw OHLCV rows with all NaNs were dropped; the modeling file (`features_filtered.parquet`) is filtered to warmup/coverage and prunes empty/constant columns. Row-level all-NaN feature cases should not remain after these filters.
- **“Add the assertions?”** Already present and passing in QC (`qc_summary.json`). No need to add them again unless you change the pipeline.
</details>

# 2. Regime Modeling

<details> <summary>
Outline (HMM → Regime Labels & Probabilities)</summary>

# 2) Regime Modeling — Updated Outline (HMM → Regime Labels & Probabilities)

## 2.0 Scope & Interfaces
- **Goal:** Assign a daily market regime (Risk-On, Risk-Off, Transition) with posterior probabilities to drive regime-aware weighting, turnover caps, and risk targets in Sections 3–5.
- **Inputs (from Section 1):**
  - `features_filtered.parquet` with **raw** `spy_rv_20`, `vix_close`, `breadth`, and SPY `adj_close` for return computation.
  - Trading calendar (aligned daily business days).
- **Outputs (artifacts):**
  - `regime_labels.parquet`: `date, state_id, p0..pK, regime_label`
  - `regime_labels.csv` (plot-friendly)
  - `regime_plot.png` (timeline with shading), `state_profiles.csv` (state stats)
  - `regime_hmm.pkl` (bundle: scaler + HMM per walk-forward window)
  - `regime_meta.json` (config, state→label map, scaler params, transition matrix, diagnostics)
  - `regime_sensitivity.json` (K/feature/era stability tests)
- **Pass/Fail gates:**
  - Interpretable state profiles (return/vol ordering aligns with labels)
  - Reasonable persistence (median run length > 5–10 days; no chattering)
  - Stable mapping across walk-forward windows (low semantic flip rate)
  - No leakage (all inputs at t known at t)

---

## 2.1 Data Assembly (Market Panel)
- **Series:**
  - SPY **log return** at t (computed from `adj_close`, shifted to avoid leakage if needed).
  - SPY realized volatility (20-day) — from raw `spy_rv_20`.
  - VIX **level** (`vix_close`) and optionally **daily Δ** (t − t-1).
  - Market breadth (% advancers in S&P, known at t).
- **IMPORTANT:** Use **raw** context series from Section 1 (`spy_rv_20`, `vix_close`, `breadth`), **not** cross-sectional z-scored features.
- **Breadth timing:** Confirm that `breadth` reflects t-1 data available at t; if not, shift by 1.
- **Alignment:** Daily business days; merge by `date`; forward-fill only for indicators known at t; drop rows with missing core inputs.
- **Standardization:** Fit `StandardScaler` **per train window** on the raw context features; persist scaler per window (stored in `regime_hmm.pkl`).
- **Sanity checks:**
  - Stationarity proxy (mean/var drift over eras).
  - Outlier handling: no winsorization needed for HMM since we scale raw series per window.
  - Coverage check: ensure no missing dates in test stitching.

---

## 2.2 Model Choice & Configuration
- **Primary:** Gaussian HMM with `covariance_type="full"`; components K ∈ {2,3} (default 3).
- **Alternative (optional):** Student-t HMM, GMM-HMM, Markov-Switching VAR, or Bayesian HMM with sticky priors.
- **Hyperparameters:**
  - `n_components`, `covariance_type`, `n_iter`, `random_state`.
  - Dirichlet priors / sticky transitions to enforce regime persistence.
- **Training protocol:**
  - Train on standardized features in the train window.
  - Multiple random restarts; choose model with highest log-likelihood.
  - If applying **finance recency weighting rule**: optionally weight log-likelihood so recent data has more influence (can be implemented here if desired).

---

## 2.3 State Labeling & Semantics
- **Profile each state:**
  - Mean and vol of SPY returns.
  - Mean VIX level, mean ΔVIX.
  - Mean breadth, tail metrics (5% quantile returns).
- **Label rules:**
  - Highest mean return & lowest vol → **Risk-On**
  - Highest vol & lowest return → **Risk-Off**
  - Remaining state → **Transition**
- **Tie-breakers:** breadth, VIX changes, downside tails.
- **Persist mapping:** Save `state_id → regime_label` per window in `regime_meta.json` so semantics don’t silently drift across walk-forward windows.

---

## 2.4 Smoothing, Persistence & Debounce
- **Posterior smoothing:** Option to use Viterbi most-likely path vs. raw posterior argmax.
- **Debounce parameters:** `MIN_DWELL_DAYS` and `POSTERIOR_THRESH` from `config.yaml`.
- **Gap handling:** Holidays/missing days inherit last known regime; no forward-looking fill.

---

## 2.5 Robustness & Sensitivity
- **K sensitivity:** Run K=2 and K=3; prefer K with clearest separation (return/vol) and healthy dwell-time.
- **Feature sensitivity:** Drop-one/add-one tests (remove VIX, remove breadth, etc.) to check label stability.
- **Era stability:** Compare state profiles and transition matrices pre/post-2015 and during crisis years (e.g., 2020).
- **Bootstrap:** Block bootstrap re-fit; produce confusion matrix for label stability across samples.

---

## 2.6 Diagnostics & QA
- **Plots:**
  - Timeline with regime shading over SPY price & drawdown.
  - Posterior probabilities (stacked area).
  - State return histograms, QQ plots.
  - Transition matrix heatmap, dwell-time distribution.
- **Tables:**
  - State profiles (returns, vol, VIX, breadth, tails).
  - Transition matrix & steady-state distribution.
  - Switch frequency and chattering metrics.
- **Alerts:**
  - Flag if any state has inconsistent semantics (positive mean but top-2 vol, dwell-time < 3 days, mapping flips).

---

## 2.7 Regime-Aware Policy Hooks (Interfaces to Sections 3–5)
- **Weights & turnover caps:** JSON map per regime (e.g., throttle momentum in Risk-Off, upweight quality).
- **Risk targets:** Per-regime vol targets (e.g., 10%/8%/6% for On/Trans/Off).
- **Hedge intensity:** Baseline hedge ratios per regime; pass to RL policy as defaults.
- **Confidence proxy:** Use max posterior or entropy to scale aggressiveness.

---

## 2.8 Walk-Forward Integration
- **Windows:** Match Section 6 (rolling/expanding).
- **Per window:**
  - Fit scaler + HMM on train subset.
  - Apply to test subset only.
  - Save artifacts: `regime_labels_<winid>.parquet`, `regime_hmm.pkl`, `regime_meta.json`.
- **Stitching:** Concatenate per-window outputs into one continuous timeline for backtests.
- **Label stability:** Use saved state→label mapping to avoid regime meaning drift.

---

## 2.9 Forward (Shadow) Mode
- **Daily update:** Apply persisted scaler + HMM to latest t; append to `regime_labels.parquet`.
- **Retrain cadence:** Weekly/bi-weekly.
- **Logging:** Save model hash, posterior, chosen label, features vector.
- **Alerts:** If mapping flips or dwell-time anomaly detected.

---

## 2.10 Configuration & Reproducibility
- **Config keys (`config.yaml`):**
  - Features list for HMM.
  - `n_components`, `MIN_DWELL_DAYS`, `POSTERIOR_THRESH`.
  - Finance recency weighting toggle & decay parameter (if implemented here).
  - Random seed, plot toggles.
- **Serialization:**
  - joblib for model + scaler.
  - JSON for meta (labels, thresholds, diagnostics).
- **Tests:**
  - Deterministic output with fixed seed.
  - No leakage (t-only features).
  - Posterior rows sum to 1; dates strictly increasing.
  - No gaps after stitching.
  - Label semantics test per window.

---

## 2.11 Deliverables Checklist
- `regime_labels.parquet` (+ CSV).
- `regime_hmm.pkl` (model + scaler per window).
- `regime_meta.json` (state→label, scaler params, diagnostics).
- `regime_timeline.png`, `regime_posteriors.png`, `state_profiles.csv`, `transition_matrix.csv`.
- `regime_sensitivity.json` (K/feature/era stability).
- `regime_policy_map.json` (interfaces to Sections 3–5).


---
</details>

In [1]:
# ============================================================
# Section 2.0 — Scope & Interfaces (Regime Modeling bootstrap)
# Builds on Section 1 artifacts; defines config, I/O, sanity checks,
# and prepares the market-level panel stub used by 2.1+ (no HMM yet).
# ============================================================

from __future__ import annotations

import os
import json
import yaml
from dataclasses import dataclass, asdict
from typing import Dict, Any, List
from datetime import datetime

import numpy as np
import pandas as pd

# ─────────────────────────────────────────────────────────────
# 0) Paths & directories (reuse Section 1 outputs)
# ─────────────────────────────────────────────────────────────
FEATURES_PATH_DEFAULT = (
    "features_filtered.parquet"
    if os.path.exists("features_filtered.parquet")
    else "features.parquet"
)
UNIVERSE_PATH = "universe.csv"
ARTIFACT_DIR = "artifacts"
REGIME_DIR = os.path.join(ARTIFACT_DIR, "regimes")
PLOTS_DIR = os.path.join(REGIME_DIR, "plots")

os.makedirs(REGIME_DIR, exist_ok=True)
os.makedirs(PLOTS_DIR, exist_ok=True)

# ─────────────────────────────────────────────────────────────
# 1) Config — defaults + optional override via config.yaml
# Keys are intentionally minimal here; 2.1–2.10 will read them.
# ─────────────────────────────────────────────────────────────
@dataclass
class RegimeConfig:
    # Raw context features for HMM (NOT cross-sectional z-scores)
    hmm_features: List[str]
    include_dvix: bool                 # add ΔVIX feature to panel
    n_components_grid: List[int]       # HMM K sensitivity (e.g., [2,3])
    covariance_type: str               # "full" by default
    random_seed: int
    # Debounce (used later in 2.4)
    min_dwell_days: int
    posterior_thresh: float
    # Optional finance rule: give more weight to recent samples during HMM fit
    recency_weighting: bool
    recency_half_life_days: int
    # I/O
    plots_enabled: bool
    save_csv_alongside_parquet: bool
    features_path: str = FEATURES_PATH_DEFAULT
    universe_path: str = UNIVERSE_PATH
    regime_dir: str = REGIME_DIR
    plots_dir: str = PLOTS_DIR

DEFAULT_CFG = RegimeConfig(
    hmm_features=["spy_rv_20", "vix_close", "breadth"],  # from Section 1 (raw context)
    include_dvix=True,
    n_components_grid=[2, 3],
    covariance_type="full",
    random_seed=42,
    min_dwell_days=3,
    posterior_thresh=0.55,
    recency_weighting=False,           # flip to True if enabling in 2.2
    recency_half_life_days=90,
    plots_enabled=True,
    save_csv_alongside_parquet=True,
)

CONFIG_FILE = "config.yaml"
user_cfg = {}
if os.path.exists(CONFIG_FILE):
    try:
        with open(CONFIG_FILE, "r") as f:
            raw_cfg = yaml.safe_load(f) or {}
            if isinstance(raw_cfg, dict):
                user_cfg = raw_cfg.get("regimes", {}) or {}
    except Exception:
        user_cfg = {}

def merge_cfg(default: RegimeConfig, override: Dict[str, Any]) -> RegimeConfig:
    d = asdict(default)
    for k, v in override.items():
        if k in d and v is not None:
            d[k] = v
    return RegimeConfig(**d)

CFG = merge_cfg(DEFAULT_CFG, user_cfg)

# Persist effective config for traceability
with open(os.path.join(REGIME_DIR, "regime_config_effective.json"), "w") as f:
    json.dump(asdict(CFG), f, indent=2)

# ─────────────────────────────────────────────────────────────
# 2) Load Section 1 artifacts and build the market panel stub
# NOTE: use RAW context features from Section 1 (no CS-z).
# This version auto-detects whether ^VIX exists as a ticker,
# or vix_close/breadth/spy_rv_20 are already on every row.
# ─────────────────────────────────────────────────────────────
assert os.path.exists(CFG.features_path), f"Missing features file: {CFG.features_path}"
fe = pd.read_parquet(CFG.features_path)
fe["date"] = pd.to_datetime(fe["date"], utc=False, errors="coerce")
fe = fe.dropna(subset=["date"]).sort_values(["date", "ticker"])

# Required columns present?
required_cols = {"date", "ticker", "adj_close", "spy_rv_20", "vix_close", "breadth"}
missing = list(required_cols - set(fe.columns))
if missing:
    raise ValueError(f"Required columns missing in features file: {sorted(missing)}")

# SPY must exist for returns
if not (fe["ticker"] == "SPY").any():
    raise ValueError("SPY rows not found in features file; cannot compute spy_ret.")

# Build SPY returns
spy = fe.loc[fe["ticker"] == "SPY", ["date", "adj_close", "spy_rv_20"]].copy()
spy["spy_ret"] = np.log(spy["adj_close"] / spy["adj_close"].shift(1))

# vix_close / breadth / rv_20 may be replicated on every row; prefer unique-by-date view
# If ^VIX rows exist, we can still just take unique-by-date—works for both layouts.
vix_by_date = fe[["date", "vix_close"]].drop_duplicates("date").copy()
breadth_by_date = fe[["date", "breadth"]].drop_duplicates("date").copy()
rv20_by_date = fe[["date", "spy_rv_20"]].drop_duplicates("date").copy()

# Merge market panel (date-level)
mkt = (
    spy[["date", "spy_ret"]]                     # SPY returns
    .merge(rv20_by_date, on="date", how="inner") # realized vol
    .merge(vix_by_date, on="date", how="inner")  # VIX level
    .merge(breadth_by_date, on="date", how="inner")  # breadth
    .sort_values("date")
)

# Optional ΔVIX (level change)
if CFG.include_dvix:
    mkt["dvix"] = mkt["vix_close"].diff()

# Breadth timing guard: uncomment if your breadth is same-day and should be known-at-t
# mkt["breadth"] = mkt["breadth"].shift(1)

# Complete-case rows only (HMM requires no NaNs)
core_cols = ["spy_ret", "spy_rv_20", "vix_close", "breadth"] + (["dvix"] if CFG.include_dvix else [])
mkt = mkt.dropna(subset=core_cols).reset_index(drop=True)

# Save panel (consumed by 2.1/2.2)
panel_path = os.path.join(CFG.regime_dir, "market_panel.parquet")
mkt.to_parquet(panel_path, index=False)
if CFG.save_csv_alongside_parquet:
    mkt.to_csv(os.path.join(CFG.regime_dir, "market_panel.csv"), index=False)

# ─────────────────────────────────────────────────────────────
# 5) Finalize & console summary (with robust date handling)
# ─────────────────────────────────────────────────────────────
def _fmt_date(ts):
    return None if pd.isna(ts) else pd.Timestamp(ts).strftime("%Y-%m-%d")

if mkt.empty:
    # Diagnostics to help you decide if breadth shift is needed, etc.
    core_for_diag = ["spy_ret", "spy_rv_20", "vix_close", "breadth"] + (["dvix"] if CFG.include_dvix else [])
    non_null_counts = {c: int(fe[c].notna().sum()) if c in fe.columns else 0 for c in core_for_diag}
    spy_src = fe.loc[fe["ticker"] == "SPY", ["date", "adj_close", "spy_rv_20"]].assign(
        spy_ret=lambda d: np.log(d["adj_close"] / d["adj_close"].shift(1))
    )
    coverage_diag = {
        "rows_with_spy_ret_and_rv20": int(spy_src.dropna(subset=["spy_ret", "spy_rv_20"]).shape[0]),
        "unique_dates_with_vix": int(vix_by_date.dropna(subset=["vix_close"]).shape[0]),
        "unique_dates_with_breadth": int(breadth_by_date.dropna(subset=["breadth"]).shape[0]),
    }
    raise ValueError(
        "Market panel is empty after merging/dropping NaNs. "
        f"Non-null counts (in features file): {non_null_counts}. "
        f"Coverage by component: {coverage_diag}. "
        "Common fixes: ensure breadth timing (try shifting breadth by 1), "
        "or check for gaps in SPY/VIX/breadth date alignment."
    )

summary = {
    "features_file": CFG.features_path,
    "universe_file": CFG.universe_path,
    "market_panel_rows": int(mkt.shape[0]),
    "market_panel_cols": list(mkt.columns),
    "date_min": _fmt_date(mkt['date'].min()),
    "date_max": _fmt_date(mkt['date'].max()),
    "config_effective": os.path.abspath(os.path.join(CFG.regime_dir, "regime_config_effective.json")),
    "panel_path": os.path.abspath(panel_path),
    "meta_path": os.path.abspath(os.path.join(CFG.regime_dir, 'regime_meta.json')),
}
print(json.dumps(summary, indent=2))

{
  "features_file": "features_filtered.parquet",
  "universe_file": "universe.csv",
  "market_panel_rows": 4657,
  "market_panel_cols": [
    "date",
    "spy_ret",
    "spy_rv_20",
    "vix_close",
    "breadth",
    "dvix"
  ],
  "date_min": "2007-02-06",
  "date_max": "2025-08-08",
  "config_effective": "/content/artifacts/regimes/regime_config_effective.json",
  "panel_path": "/content/artifacts/regimes/market_panel.parquet",
  "meta_path": "/content/artifacts/regimes/regime_meta.json"
}


In [2]:
# ============================================================
# Section 2.1 — Data Assembly (windowed extraction + scaling)
# Uses artifacts from 2.0; prepares X_train/X_test for HMM.
# ============================================================

from __future__ import annotations

import os
import json
from dataclasses import asdict
from typing import Dict, Any, Tuple, List, Optional
from datetime import datetime

import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler
import joblib

# Reuse CFG, paths from 2.0
REGIME_DIR = CFG.regime_dir
PLOTS_DIR = CFG.plots_dir
PANEL_PATH = os.path.join(REGIME_DIR, "market_panel.parquet")
META_PATH = os.path.join(REGIME_DIR, "regime_meta.json")

assert os.path.exists(PANEL_PATH), f"Missing market panel: {PANEL_PATH}"
mkt = pd.read_parquet(PANEL_PATH)
mkt["date"] = pd.to_datetime(mkt["date"])
mkt = mkt.sort_values("date").reset_index(drop=True)

# Choose feature list (raw context features only; dvix optional)
hmm_feat_cols = list(CFG.hmm_features)
if CFG.include_dvix and "dvix" not in hmm_feat_cols:
    hmm_feat_cols.append("dvix")

# Safety: ensure columns exist
missing_cols = [c for c in hmm_feat_cols + ["spy_ret"] if c not in mkt.columns]
if missing_cols:
    raise ValueError(f"Missing required columns in market panel: {missing_cols}")

def make_window_masks(df: pd.DataFrame,
                      train_start: str,
                      train_end: str,
                      test_start: str,
                      test_end: str) -> Tuple[pd.Series, pd.Series]:
    d = df["date"]
    train_mask = (d >= pd.to_datetime(train_start)) & (d <= pd.to_datetime(train_end))
    test_mask  = (d >= pd.to_datetime(test_start))  & (d <= pd.to_datetime(test_end))
    return train_mask, test_mask

def build_hmm_matrices(df: pd.DataFrame,
                       features: List[str],
                       train_start: str,
                       train_end: str,
                       test_start: str,
                       test_end: str,
                       scaler_out_path: Optional[str] = None,
                       breadth_shift_days: int = 0) -> Dict[str, Any]:
    """
    Returns:
      {
        'X_train': np.ndarray,
        'X_test': np.ndarray,
        'dates_train': pd.DatetimeIndex,
        'dates_test': pd.DatetimeIndex,
        'scaler_path': str,
        'scaler_mean': list,
        'scaler_scale': list,
        'qc': dict
      }
    """
    dfw = df.copy()

    # Optional breadth shift (if you decide breadth should be known-at-t from t-1)
    if breadth_shift_days != 0 and "breadth" in features:
        dfw["breadth"] = dfw["breadth"].shift(breadth_shift_days)

    # Drop rows with missing features
    dfw = dfw.dropna(subset=features).reset_index(drop=True)

    # Window masks
    tr_mask, te_mask = make_window_masks(dfw, train_start, train_end, test_start, test_end)

    # Slice
    train_df = dfw.loc[tr_mask, ["date"] + features].dropna()
    test_df  = dfw.loc[te_mask, ["date"] + features].dropna()

    if train_df.empty or test_df.empty:
        raise ValueError(
            f"Empty train/test after slicing: "
            f"train({train_start}→{train_end}) rows={train_df.shape[0]}, "
            f"test({test_start}→{test_end}) rows={test_df.shape[0]}. "
            f"Consider adjusting dates or breadth_shift_days."
        )

    # Standardize on TRAIN ONLY; transform TEST with same scaler
    scaler = StandardScaler()
    X_train = scaler.fit_transform(train_df[features].to_numpy(dtype=float))
    X_test  = scaler.transform(test_df[features].to_numpy(dtype=float))

    # Persist scaler per window
    if scaler_out_path is None:
        win_tag = f"{train_start}_{train_end}__{test_start}_{test_end}".replace("-", "")
        scaler_out_path = os.path.join(REGIME_DIR, f"scaler_{win_tag}.joblib")
    joblib.dump(scaler, scaler_out_path)

    # Quick QC: mean/var drift (train vs test) and feature coverage
    qc = {
        "train_rows": int(train_df.shape[0]),
        "test_rows": int(test_df.shape[0]),
        "features": features,
        "train_means": dict(zip(features, np.mean(X_train, axis=0).round(6).tolist())),
        "train_stds": dict(zip(features, np.std(X_train, axis=0, ddof=0).round(6).tolist())),
        "test_means": dict(zip(features, np.mean(X_test, axis=0).round(6).tolist())),
        "test_stds": dict(zip(features, np.std(X_test, axis=0, ddof=0).round(6).tolist())),
    }

    # Save a tiny per-window QC file
    win_qc_path = scaler_out_path.replace(".joblib", "_qc.json")
    with open(win_qc_path, "w") as f:
        json.dump(qc, f, indent=2)

    return {
        "X_train": X_train,
        "X_test": X_test,
        "dates_train": train_df["date"].to_list(),
        "dates_test": test_df["date"].to_list(),
        "scaler_path": scaler_out_path,
        "scaler_mean": scaler.mean_.round(12).tolist(),
        "scaler_scale": scaler.scale_.round(12).tolist(),
        "qc": qc,
    }

# Example: pick a first walk-forward split anchored to your warmup_cutoff
# You can replace these with your Section 6 generator later.
train_start = "2007-02-06"  # day after warmup_cutoff in your QC
train_end   = "2016-12-30"
test_start  = "2017-01-03"
test_end    = mkt["date"].max().strftime("%Y-%m-%d")

window = build_hmm_matrices(
    df=mkt,
    features=hmm_feat_cols,
    train_start=train_start,
    train_end=train_end,
    test_start=test_start,
    test_end=test_end,
    scaler_out_path=None,
    breadth_shift_days=0,  # set to 1 if you confirm breadth must be known-at-t from t-1
)

# Persist a small window manifest so later steps (2.2+) can load it
manifest = {
    "window": {
        "train_start": train_start,
        "train_end": train_end,
        "test_start": test_start,
        "test_end": test_end,
    },
    "features": hmm_feat_cols,
    "scaler_path": window["scaler_path"],
    "n_train": len(window["dates_train"]),
    "n_test": len(window["dates_test"]),
}
with open(os.path.join(REGIME_DIR, "window_manifest.json"), "w") as f:
    json.dump(manifest, f, indent=2)

print(json.dumps({
    "status": "2.1 ready",
    "features_used": hmm_feat_cols,
    "scaler_saved": window["scaler_path"],
    "train_rows": manifest["n_train"],
    "test_rows": manifest["n_test"],
}, indent=2))

{
  "status": "2.1 ready",
  "features_used": [
    "spy_rv_20",
    "vix_close",
    "breadth",
    "dvix"
  ],
  "scaler_saved": "artifacts/regimes/scaler_20070206_20161230__20170103_20250808.joblib",
  "train_rows": 2495,
  "test_rows": 2162
}


In [3]:
!pip install hmmlearn --quiet

[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/165.9 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[90m╺[0m[90m━━[0m [32m153.6/165.9 kB[0m [31m4.3 MB/s[0m eta [36m0:00:01[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m165.9/165.9 kB[0m [31m3.2 MB/s[0m eta [36m0:00:00[0m
[?25h

In [4]:
# ============================================================
# Section 2.2 — Model Choice & Configuration (Gaussian HMM)
# Primary: GaussianHMM (full covariance), K in {2,3}
# - Multiple restarts; pick best train log-likelihood
# - Sticky transitions (Dirichlet-like persistence) via diagonal bias
# - Finance recency weighting: time-decayed sub-sequences (ENABLED)
# Reuses:
#   - artifacts/regimes/market_panel.parquet (from 2.0)
#   - artifacts/regimes/window_manifest.json (from 2.1)
#   - scaler_*.joblib (from 2.1)
# Outputs:
#   - artifacts/regimes/hmm_best.pkl (joblib bundle: model + meta)
#   - artifacts/regimes/hmm_kgrid.json (scores by K)
# ============================================================

from __future__ import annotations

import os
import json
from typing import Dict, Any, List, Tuple
from datetime import datetime

import numpy as np
import pandas as pd
import joblib

from hmmlearn.hmm import GaussianHMM

# Reuse config and paths from 2.0 / 2.1
REGIME_DIR = CFG.regime_dir
PANEL_PATH = os.path.join(REGIME_DIR, "market_panel.parquet")
MANIFEST_PATH = os.path.join(REGIME_DIR, "window_manifest.json")
META_PATH = os.path.join(REGIME_DIR, "regime_meta.json")

assert os.path.exists(PANEL_PATH), f"Missing market panel: {PANEL_PATH}"
assert os.path.exists(MANIFEST_PATH), f"Missing window manifest: {MANIFEST_PATH}"

with open(MANIFEST_PATH, "r") as f:
    MAN = json.load(f)

mkt = pd.read_parquet(PANEL_PATH).sort_values("date").reset_index(drop=True)
mkt["date"] = pd.to_datetime(mkt["date"])

scaler = joblib.load(MAN["scaler_path"])
features = MAN["features"]
assert all(c in mkt.columns for c in features), f"Panel missing features: {set(features) - set(mkt.columns)}"

# Train/test windows
train_start = pd.to_datetime(MAN["window"]["train_start"])
train_end   = pd.to_datetime(MAN["window"]["train_end"])
test_start  = pd.to_datetime(MAN["window"]["test_start"])
test_end    = pd.to_datetime(MAN["window"]["test_end"])

train_df = mkt[(mkt["date"] >= train_start) & (mkt["date"] <= train_end)][["date"] + features].dropna().reset_index(drop=True)
test_df  = mkt[(mkt["date"] >= test_start)  & (mkt["date"] <= test_end)][["date"] + features].dropna().reset_index(drop=True)

X_train = scaler.transform(train_df[features].to_numpy(dtype=float))
X_test  = scaler.transform(test_df[features].to_numpy(dtype=float))
dates_train = train_df["date"].to_numpy()
dates_test  = test_df["date"].to_numpy()

# ─────────────────────────────────────────────────────────────
# Hyperparameters — Test run now, bump for real run (marked TOCHANGE)
# ─────────────────────────────────────────────────────────────
N_COMPONENTS_GRID = [2]   # TOCHANGE: [3] or [2,3] for real run
N_ITER = 200              # TOCHANGE: 1000 for real run
N_INIT = 2                # TOCHANGE: 10 for real run
RANDOM_STATE = 42
COVARIANCE_TYPE = "full"
TOL = 1e-3                # TOCHANGE: 1e-4 for real run

# Sticky transitions strength (Dirichlet-like, diagonal blend post-fit)
# Larger -> stickier regimes (longer dwell times)
LAMBDA_STICK = 0.15       # TOCHANGE: 0.30–0.50 for real run

# Finance recency weighting — ENABLED
APPLY_RECENCY = True
HALF_LIFE_DAYS = 756      # ~3 years; keeps 2008 meaningful
# TOCHANGE: try 504 (~2y, more recency), 756 (~3y, balanced), 1260 (~5y, less recency)

EPSILON_FLOOR = 0.10      # ensures old episodes never get <10% of peak weight
# TOCHANGE: 0.05–0.15 depending on how protective you want to be

# For recency sampler (still lightweight in test; scale for real run)
SEG_LEN = 60              # TOCHANGE: 90–120 for real run
N_SEGMENTS = 80           # TOCHANGE: 200–400 for real run

# ─────────────────────────────────────────────────────────────
# Utilities
# ─────────────────────────────────────────────────────────────
def _diag_sticky_blend(transmat: np.ndarray, lam: float) -> np.ndarray:
    k = transmat.shape[0]
    T = (1.0 - lam) * transmat + lam * np.eye(k)
    T = T / T.sum(axis=1, keepdims=True)
    return T

def _build_time_decay_weights(dates: np.ndarray, half_life_days: int) -> np.ndarray:
    t = np.array([pd.Timestamp(d).toordinal() for d in dates], dtype=float)
    age = (t.max() - t)  # newer dates -> smaller age
    decay = np.log(2) / max(1, half_life_days)
    w = np.exp(-decay * age)
    return w / (w.sum() + 1e-12)

def _sample_time_weighted_subsequences(
    X: np.ndarray,
    dates: np.ndarray,
    seg_len: int,
    n_segments: int,
    half_life_days: int,
    random_state: int,
) -> Tuple[np.ndarray, List[int]]:
    rng = np.random.RandomState(random_state)
    n = X.shape[0]
    if n < seg_len:
        return X.copy(), [n]
    ends = np.arange(seg_len - 1, n)
    p = _build_time_decay_weights(dates[ends], half_life_days)
    p = np.maximum(p, EPSILON_FLOOR * p.max())
    p = p / p.sum()

    chosen = rng.choice(ends, size=min(n_segments, len(ends)), replace=True, p=p)
    lengths, chunks = [], []
    for e in chosen:
        s = e - (seg_len - 1)
        chunk = X[s:e+1]
        chunks.append(chunk)
        lengths.append(len(chunk))
    X_concat = np.vstack(chunks)
    return X_concat, lengths

# ─────────────────────────────────────────────────────────────
# Train HMMs; pick best by train log-likelihood
# ─────────────────────────────────────────────────────────────
best = {"score": -np.inf, "model": None, "k": None, "seed": None, "train_lengths": None, "fit_mode": None}
results = []

for k in N_COMPONENTS_GRID:
    for r in range(N_INIT):
        seed = RANDOM_STATE + r

        if APPLY_RECENCY:
            X_fit, lengths = _sample_time_weighted_subsequences(
                X_train, dates_train,
                seg_len=SEG_LEN,
                n_segments=N_SEGMENTS,
                half_life_days=HALF_LIFE_DAYS,
                random_state=seed,
            )
            fit_mode = "recency"
        else:
            X_fit, lengths = X_train, [len(X_train)]
            fit_mode = "plain"

        # Init model
        model = GaussianHMM(
            n_components=k,
            covariance_type=COVARIANCE_TYPE,
            n_iter=N_ITER,
            tol=TOL,
            random_state=seed,
            verbose=False,
            # IMPORTANT: do not include 's' or 't' here, otherwise your custom
            # startprob_/transmat_ get overwritten on init.
            init_params="mc",      # means, covars only
            params="stmc",         # learn startprob, transmat, means, covars
        )

        # Sticky-biased initialization (near-diagonal)
        trans0 = np.full((k, k), (1.0 - 0.90) / max(1, k - 1))
        np.fill_diagonal(trans0, 0.90)
        model.transmat_ = trans0

        # Uniform start probabilities
        model.startprob_ = np.full(k, 1.0 / k)

        # Fit
        model.fit(X_fit, lengths=lengths)

        # Post-fit sticky blend (Dirichlet-like)
        model.transmat_ = _diag_sticky_blend(model.transmat_, LAMBDA_STICK)

        score = model.score(X_train)  # comparable scoring on original train sequence

        results.append({
            "k": k,
            "seed": seed,
            "score": float(score),
            "fit_mode": fit_mode,
            "transmat": model.transmat_.tolist(),
        })

        if score > best["score"]:
            best.update({"score": score, "model": model, "k": k, "seed": seed, "train_lengths": lengths, "fit_mode": fit_mode})

# Save grid scores
with open(os.path.join(REGIME_DIR, "hmm_kgrid.json"), "w") as f:
    json.dump({"results": results, "chosen": {"k": best["k"], "seed": best["seed"], "score": float(best["score"]), "fit_mode": best["fit_mode"]}}, f, indent=2)

# Persist best model bundle
bundle = {
    "model": best["model"],
    "k": best["k"],
    "random_state": best["seed"],
    "features": features,
    "scaler_path": MAN["scaler_path"],
    "train_dates": [str(d) for d in dates_train],
    "test_dates": [str(d) for d in dates_test],
    "fit_mode": best["fit_mode"],
    "sticky_lambda": LAMBDA_STICK,
    "n_iter": N_ITER,
    "n_init": N_INIT,
    "tol": TOL,
    "covariance_type": COVARIANCE_TYPE,
    "recency_weighting": True,  # enabled
    "recency_half_life_days": HALF_LIFE_DAYS,
    "recency_seg_len": SEG_LEN,         # TOCHANGE: 90–120
    "recency_n_segments": N_SEGMENTS,   # TOCHANGE: 200–400
    "created_at": datetime.utcnow().isoformat() + "Z",
    "recency_epsilon_floor": EPSILON_FLOOR
}
joblib.dump(bundle, os.path.join(REGIME_DIR, "hmm_best.pkl"))

joblib.dump(bundle, os.path.join(REGIME_DIR, "regime_hmm.pkl"))  # alias for checklist

print(json.dumps({
    "status": "2.2 trained",
    "chosen_k": best["k"],
    "fit_mode": best["fit_mode"],
    "train_score": float(best["score"]),
    "n_iter": N_ITER,
    "n_init": N_INIT,
    "sticky_lambda": LAMBDA_STICK,
    "recency_weighting": True,
    "half_life_days": HALF_LIFE_DAYS,
    "seg_len": SEG_LEN,
    "n_segments": N_SEGMENTS,
}, indent=2))

{
  "status": "2.2 trained",
  "chosen_k": 2,
  "fit_mode": "recency",
  "train_score": -8178.605247677998,
  "n_iter": 200,
  "n_init": 2,
  "sticky_lambda": 0.15,
  "recency_weighting": true,
  "half_life_days": 756,
  "seg_len": 60,
  "n_segments": 80
}


In [5]:
# Quick peek at effective sampling weights (fixed timedelta math)
ends = np.arange(SEG_LEN - 1, len(dates_train))
end_dates = pd.to_datetime(dates_train[ends])

pp = _build_time_decay_weights(end_dates, HALF_LIFE_DAYS)
pp = np.maximum(pp, EPSILON_FLOOR * pp.max())
pp = pp / pp.sum()

# Age in *days* relative to the most recent end_date
max_date = end_dates.max()
ages_days = (max_date - end_dates) / np.timedelta64(1, "D")  # float days

# Weighted mean age (how "old" the typical sampled segment end is)
w_mean_age_days = float(np.sum(ages_days * pp))

# 95% weight age: age threshold below which 95% of weight lies
order = np.argsort(ages_days)                  # youngest → oldest
cumw = np.cumsum(pp[order])
w95_idx = np.searchsorted(cumw, 0.95)
w95_age_days = float(ages_days[order][min(w95_idx, len(ages_days)-1)])

print({
    "weights_min": float(pp.min()),
    "weights_max": float(pp.max()),
    "weighted_mean_age_days": w_mean_age_days,
    "weighted_p95_age_days": w95_age_days,
})

{'weights_min': 0.0001335955016895934, 'weights_max': 0.001335955016895934, 'weighted_mean_age_days': 1017.4276078929489, 'weighted_p95_age_days': 2990.0}


In [6]:
# ============================================================
# Section 2.3 — State Labeling & Semantics
# - Score posteriors for all dates
# - Profile states on TRAIN window only (no peeking)
# - Label states: Risk-On (↑ret, ↓vol), Risk-Off (↓ret, ↑vol), Transition (rest)
# - Persist labels, posteriors, and profiles
# Outputs:
#   artifacts/regimes/regime_labels.parquet (date, state_id, p0..pK, regime_label)
#   artifacts/regimes/state_profiles.csv
#   artifacts/regimes/regime_meta.json (updated label map)
# ============================================================

from __future__ import annotations
import os, json
from typing import Dict, Any
import numpy as np
import pandas as pd
import joblib
from datetime import datetime

REGIME_DIR = CFG.regime_dir
PANEL_PATH = os.path.join(REGIME_DIR, "market_panel.parquet")
MANIFEST_PATH = os.path.join(REGIME_DIR, "window_manifest.json")
BUNDLE_PATH = os.path.join(REGIME_DIR, "hmm_best.pkl")
META_PATH = os.path.join(REGIME_DIR, "regime_meta.json")

# Load artifacts
mkt = pd.read_parquet(PANEL_PATH).sort_values("date").reset_index(drop=True)
mkt["date"] = pd.to_datetime(mkt["date"])
with open(MANIFEST_PATH, "r") as f:
    MAN = json.load(f)
bundle = joblib.load(BUNDLE_PATH)
model = bundle["model"]
features = bundle["features"]
scaler = joblib.load(bundle["scaler_path"])

# Prepare matrices for ALL dates (but label semantics computed on TRAIN ONLY)
X_all = scaler.transform(mkt[features].to_numpy(dtype=float))
dates_all = mkt["date"].to_numpy()

# Score posteriors
post = model.predict_proba(X_all)  # shape: (T, K)
states_argmax = post.argmax(axis=1)
K = post.shape[1]

# Train/test masks
train_start = pd.to_datetime(MAN["window"]["train_start"])
train_end   = pd.to_datetime(MAN["window"]["train_end"])
test_start  = pd.to_datetime(MAN["window"]["test_start"])
test_end    = pd.to_datetime(MAN["window"]["test_end"])
train_mask = (mkt["date"] >= train_start) & (mkt["date"] <= train_end)
test_mask  = (mkt["date"] >= test_start)  & (mkt["date"] <= test_end)

# Helper: posterior-weighted stats on TRAIN window
def weighted_mean(x, w):
    w = np.asarray(w, dtype=float)
    x = np.asarray(x, dtype=float)
    s = w.sum()
    return float((x * w).sum() / s) if s > 0 else np.nan

def weighted_std(x, w):
    mu = weighted_mean(x, w)
    w = np.asarray(w, dtype=float)
    x = np.asarray(x, dtype=float)
    s = w.sum()
    if s <= 1: return np.nan
    var = ((w * (x - mu)**2).sum()) / s
    return float(np.sqrt(max(var, 0.0)))

def weighted_quantile(x, w, q=0.05):
    x = np.asarray(x, dtype=float)
    w = np.asarray(w, dtype=float)
    order = np.argsort(x)
    x_sorted, w_sorted = x[order], w[order]
    cw = np.cumsum(w_sorted)
    if cw[-1] == 0: return np.nan
    return float(x_sorted[np.searchsorted(cw, q * cw[-1])])

# Compute per-state profiles on TRAIN
train_ix = np.where(train_mask.values)[0]
has_dvix = "dvix" in mkt.columns
profiles = []
for s in range(K):
    w = post[train_ix, s]
    if w.sum() == 0:
        mu_ret = mu_vol = mu_vix = mu_brd = q05 = np.nan
        sd_ret = np.nan
    else:
        mu_ret = weighted_mean(mkt.loc[train_mask, "spy_ret"].values, w)
        sd_ret = weighted_std(mkt.loc[train_mask, "spy_ret"].values, w)
        mu_vol = weighted_mean(mkt.loc[train_mask, "spy_rv_20"].values, w)
        mu_vix = weighted_mean(mkt.loc[train_mask, "vix_close"].values, w)
        mu_brd = weighted_mean(mkt.loc[train_mask, "breadth"].values, w)
        mu_dvix = weighted_mean(mkt.loc[train_mask, "dvix"].values, w) if has_dvix else np.nan
        q05    = weighted_quantile(mkt.loc[train_mask, "spy_ret"].values, w, q=0.05)

    profiles.append({
        "state_id": s,
        "ret_mean": mu_ret,
        "ret_std": sd_ret,
        "rv20_mean": mu_vol,
        "vix_mean": mu_vix,
        "dvix_mean": mu_dvix if has_dvix else np.nan,
        "breadth_mean": mu_brd,
        "ret_q05": q05,
    })

prof_df = pd.DataFrame(profiles)

# Labeling rules (train window only, no peeking):
#  - Risk-On: highest mean return, lowest vol (tie-breakers help if ambiguous)
#  - Risk-Off: highest vol, lowest mean return (tie-breakers help if ambiguous)
#  - Transition: whichever state is not assigned above
# Primary ranks
ret_rank = prof_df["ret_mean"].rank(method="dense")                        # higher = better
# For clarity: choose Risk-Off by *highest* rv20 (vol spike)
risk_off_id = int(prof_df["rv20_mean"].idxmax())                           # highest vol
risk_on_id  = int(ret_rank.idxmax())                                       # highest return

# Tie-breaker refinement (only if they collide or look ambiguous)
# Risk-On tie-breakers: breadth↑, VIX↓, tail q05↑
# Risk-Off tie-breakers: vol↑, ret↓, ΔVIX↑, breadth↓
def _best_risk_on_row(df: pd.DataFrame) -> int:
    score = (
        df["breadth_mean"].fillna(-1.0).rank(method="dense", ascending=False) +
        df["vix_mean"].fillna(np.inf).rank(method="dense", ascending=True) +
        df["ret_q05"].fillna(-np.inf).rank(method="dense", ascending=False)
    )
    return int(score.idxmax())

def _best_risk_off_row(df: pd.DataFrame) -> int:
    score = (
        df["rv20_mean"].fillna(-np.inf).rank(method="dense", ascending=False) +
        df["ret_mean"].fillna(np.inf).rank(method="dense", ascending=True) +
        (df["dvix_mean"] if "dvix_mean" in df.columns else pd.Series(0.0, index=df.index)).fillna(0.0).rank(method="dense", ascending=False) +
        df["breadth_mean"].fillna(np.inf).rank(method="dense", ascending=True)
    )
    return int(score.idxmax())

if risk_on_id == risk_off_id:
    risk_on_id  = _best_risk_on_row(prof_df)
    risk_off_id = _best_risk_off_row(prof_df)
    # Safety: if still colliding (extremely rare), force Risk-Off = highest vol, Risk-On = highest return
    if risk_on_id == risk_off_id:
        risk_off_id = int(prof_df["rv20_mean"].idxmax())
        risk_on_id  = int(prof_df["ret_mean"].idxmax())

# Final label map
label_map = {risk_on_id: "Risk-On", risk_off_id: "Risk-Off"}
for s in range(K):
    if s not in label_map:
        label_map[s] = "Transition"

# ---------- Build outputs USING the FINAL label_map ----------
out = mkt[["date"]].copy()
out["state_id"] = post.argmax(axis=1)
for s in range(K):
    out[f"p{s}"] = post[:, s]
out["regime_label"] = out["state_id"].map(label_map)

out_path = os.path.join(REGIME_DIR, "regime_labels.parquet")
out.to_parquet(out_path, index=False)
out.to_csv(os.path.join(REGIME_DIR, "regime_labels.csv"), index=False)

prof_df.to_csv(os.path.join(REGIME_DIR, "state_profiles.csv"), index=False)

# ---------- Update meta WITH the FINAL label_map ----------
if os.path.exists(META_PATH):
    with open(META_PATH, "r") as f:
        meta = json.load(f)
else:
    meta = {}

meta.setdefault("created_at", datetime.utcnow().isoformat() + "Z")
meta.setdefault("config", {})
meta.setdefault("diagnostics", {})
meta["diagnostics"]["state_profiles_train"] = prof_df.to_dict(orient="records")
meta["state_label_map"] = {int(k): v for k, v in label_map.items()}
meta.setdefault("features_used", features)
meta["notes"] = meta.get("notes", []) + [
    "State labeling computed on train window only (no peeking).",
    "Risk-On: highest mean ret & lowest vol; Risk-Off: highest vol & lowest ret; else Transition.",
    "Tie-breakers: breadth↑, VIX↓, tail q05↑ (Risk-On); vol↑, ret↓, ΔVIX↑, breadth↓ (Risk-Off).",
]

with open(META_PATH, "w") as f:
    json.dump(meta, f, indent=2)

print(json.dumps({
    "status": "2.3 labeled",
    "k": K,
    "label_map": label_map,
    "profiles_path": os.path.join(REGIME_DIR, "state_profiles.csv"),
    "labels_path": out_path,
}, indent=2))

# Posteriors sanity
assert np.allclose(post.sum(axis=1), 1.0, atol=1e-6), "Posterior rows must sum to 1."
assert not pd.isna(out["regime_label"]).any(), "All states must map to a regime label."

{
  "status": "2.3 labeled",
  "k": 2,
  "label_map": {
    "0": "Risk-On",
    "1": "Risk-Off"
  },
  "profiles_path": "artifacts/regimes/state_profiles.csv",
  "labels_path": "artifacts/regimes/regime_labels.parquet"
}


In [7]:
# ============================================================
# Section 2.4 — Smoothing, Persistence & Debounce
# - Option: Viterbi most-likely path vs. posterior argmax
# - Debounce: POSTERIOR_THRESH and MIN_DWELL_DAYS from config
# - Gap handling: inherit last known regime (dates already market-days)
# Reuses:
#   - artifacts/regimes/market_panel.parquet (2.0)
#   - artifacts/regimes/window_manifest.json (2.1)
#   - artifacts/regimes/hmm_best.pkl (2.2)
#   - artifacts/regimes/regime_labels.parquet (2.3)
# Outputs:
#   - artifacts/regimes/regime_labels.parquet (updated with *_smoothed cols)
#   - artifacts/regimes/regime_meta.json (updated diagnostics)
#   - console summary of dwell-time stats
# ============================================================

from __future__ import annotations
import os, json
from typing import List, Dict, Any, Tuple
from datetime import datetime
import numpy as np
import pandas as pd
import joblib

REGIME_DIR     = CFG.regime_dir
PANEL_PATH     = os.path.join(REGIME_DIR, "market_panel.parquet")
LABELS_PATH    = os.path.join(REGIME_DIR, "regime_labels.parquet")
META_PATH      = os.path.join(REGIME_DIR, "regime_meta.json")
BUNDLE_PATH    = os.path.join(REGIME_DIR, "hmm_best.pkl")

assert os.path.exists(PANEL_PATH),  f"Missing market panel: {PANEL_PATH}"
assert os.path.exists(LABELS_PATH), f"Missing labels from 2.3: {LABELS_PATH}"
assert os.path.exists(BUNDLE_PATH), f"Missing HMM bundle: {BUNDLE_PATH}"

# --- Config knobs (extend 2.0 config if not present) ---
P_THRESH   = getattr(CFG, "posterior_thresh", 0.55) # TOCHANGE: consider 0.60–0.65 for a stricter switch confirmation.
MIN_DWELL  = getattr(CFG, "min_dwell_days", 3) #  # TOCHANGE: consider 5–10 to further reduce chattering.
SMOOTH_MTH = getattr(CFG, "smoothing_method", "posterior")  # "posterior" | "viterbi" # TOCHANGE: try "viterbi" for the real run and compare dwell-time stats and chattering.

# --- Load artifacts ---
labels = pd.read_parquet(LABELS_PATH).sort_values("date").reset_index(drop=True)
bundle = joblib.load(BUNDLE_PATH)
model  = bundle["model"]
features = bundle["features"]
scaler  = joblib.load(bundle["scaler_path"])

mkt = pd.read_parquet(PANEL_PATH).sort_values("date").reset_index(drop=True)
mkt["date"] = pd.to_datetime(mkt["date"])

# sanity
assert np.array_equal(labels["date"].values, mkt["date"].values), "Date alignment mismatch between labels and market panel."

# --- Choose base path: Viterbi vs posterior argmax ---
# We need posteriors for thresholding either way; for Viterbi we re-score X_all.
X_all = scaler.transform(mkt[features].to_numpy(dtype=float))
post  = model.predict_proba(X_all)  # (T, K)
K     = post.shape[1]

if SMOOTH_MTH.lower() == "viterbi":
    base_states = model.predict(X_all)     # most-likely state path
else:
    base_states = post.argmax(axis=1)      # raw posterior argmax (already in 2.3)

# --- Debounce step 1: posterior threshold gating (no switch if low confidence) ---
maxp = post.max(axis=1)
debounce_states = np.array(base_states, dtype=int)
for i in range(1, len(debounce_states)):
    if debounce_states[i] != debounce_states[i-1]:
        # require sufficient posterior confidence on the *new* state
        if maxp[i] < P_THRESH:
            debounce_states[i] = debounce_states[i-1]

# --- Debounce step 2: enforce minimum dwell time by collapsing short runs ---
def _runs(state_series: np.ndarray) -> List[Tuple[int,int,int]]:
    """Return list of (start_idx, end_idx_inclusive, state) runs."""
    out = []
    s = 0
    cur = state_series[0]
    for i in range(1, len(state_series)):
        if state_series[i] != cur:
            out.append((s, i-1, cur))
            s = i
            cur = state_series[i]
    out.append((s, len(state_series)-1, cur))
    return out

def _collapse_short_runs(states: np.ndarray, min_len: int, post: np.ndarray) -> np.ndarray:
    arr = states.copy()
    changed = True
    # iterate until stable (collapsing can merge adjacent runs)
    while changed:
        changed = False
        runs = _runs(arr)
        for (s, e, st) in runs:
            run_len = e - s + 1
            if run_len < min_len:
                # Candidate neighbors: previous and next, choose higher avg posterior over this segment
                prev_state = runs[runs.index((s, e, st))-1][2] if runs.index((s, e, st)) > 0 else None
                next_state = runs[runs.index((s, e, st))+1][2] if runs.index((s, e, st)) < len(runs)-1 else None

                # If no neighbors (degenerate), skip
                if prev_state is None and next_state is None:
                    continue

                # Compute average posterior for neighbors over the short segment
                best_neighbor = None
                best_score = -np.inf
                for cand in [prev_state, next_state]:
                    if cand is None:
                        continue
                    score = float(post[s:e+1, cand].mean())
                    if score > best_score:
                        best_score = score
                        best_neighbor = cand
                # Relabel the short run to best neighbor
                arr[s:e+1] = best_neighbor
                changed = True
                break  # restart since runs have changed
    return arr

smoothed_states = _collapse_short_runs(debounce_states, MIN_DWELL, post)

# --- Map to labels using the semantics from 2.3 (state_label_map) ---
# read label map from meta
if os.path.exists(META_PATH):
    with open(META_PATH, "r") as f:
        meta = json.load(f)
else:
    meta = {}

state_label_map = meta.get("state_label_map", None)
if state_label_map is None:
    # fallback to identity names if meta missing (shouldn't happen)
    state_label_map = {int(s): f"State{s}" for s in range(K)}

# Update labels DataFrame with smoothed outputs
labels["state_id_smoothed"] = smoothed_states
for s in range(K):
    # keep original p0..pK as-is from 2.3; they reflect the raw model posteriors
    if f"p{s}" not in labels.columns:
        labels[f"p{s}"] = post[:, s]

labels["regime_label_smoothed"] = labels["state_id_smoothed"].map({int(k): v for k, v in state_label_map.items()})

# --- Dwell-time diagnostics ---
def _dwell_stats(states: np.ndarray) -> pd.DataFrame:
    rr = _runs(states)
    return pd.DataFrame({
        "state_id": [st for (_,_,st) in rr],
        "run_len":  [e - s + 1 for (s,e,_) in rr],
    }).groupby("state_id").agg(
        median_run_length=("run_len", "median"),
        mean_run_length  =("run_len", "mean"),
        n_runs           =("run_len", "count"),
        max_run_length   =("run_len", "max"),
    ).reset_index()

dwell_df = _dwell_stats(labels["state_id_smoothed"].to_numpy())

# --- Save updated labels back to disk ---
labels.to_parquet(LABELS_PATH, index=False)
labels.to_csv(LABELS_PATH.replace(".parquet", ".csv"), index=False)

# --- Update regime_meta.json diagnostics & config snapshot ---
meta.setdefault("diagnostics", {})
meta["diagnostics"]["smoothing"] = {
    "method": SMOOTH_MTH,
    "posterior_thresh": P_THRESH,
    "min_dwell_days": MIN_DWELL,
    "dwell_stats": dwell_df.to_dict(orient="records"),
}
meta.setdefault("notes", [])
meta["notes"] += [
    "2.4 smoothing applied with debounce (posterior threshold + min dwell).",
    "If method='viterbi', base path is Viterbi; else posterior argmax.",
]
# de-dup notes
meta["notes"] = list(dict.fromkeys(meta["notes"]))

with open(META_PATH, "w") as f:
    json.dump(meta, f, indent=2)

print(json.dumps({
    "status": "2.4 smoothed",
    "method": SMOOTH_MTH,
    "posterior_thresh": P_THRESH,
    "min_dwell_days": MIN_DWELL,
    "k": K,
    "median_dwell_by_state": {
        int(r["state_id"]): float(r["median_run_length"]) for r in dwell_df.to_dict(orient="records")
    },
    "labels_path": LABELS_PATH
}, indent=2))

{
  "status": "2.4 smoothed",
  "method": "posterior",
  "posterior_thresh": 0.55,
  "min_dwell_days": 3,
  "k": 2,
  "median_dwell_by_state": {
    "0": 33.5,
    "1": 12.0
  },
  "labels_path": "artifacts/regimes/regime_labels.parquet"
}


In [10]:
# ============================================================
# Section 2.5 — Robustness & Sensitivity
# - K sensitivity: K ∈ {2,3}
# - Feature sensitivity: drop-one/add-one variants
# - Era stability: pre/post-2015 and 2020 crisis
# - Bootstrap: block bootstrap label stability
# Reuses:
#   - artifacts/regimes/market_panel.parquet (2.0)
#   - artifacts/regimes/window_manifest.json (2.1)
#   - scaler_*.joblib (2.1)
#   - artifacts/regimes/hmm_best.pkl (2.2 baseline)
#   - artifacts/regimes/regime_labels.parquet (2.3 baseline labels)
# Outputs:
#   - artifacts/regimes/regime_sensitivity.json
# Notes:
#   This is a light test pass; heavier settings are tagged with # TOCHANGE
# ============================================================

from __future__ import annotations
import os, json
from typing import Dict, Any, List, Tuple
from datetime import datetime
import numpy as np
import pandas as pd
import joblib
from hmmlearn.hmm import GaussianHMM

REGIME_DIR  = CFG.regime_dir
PANEL_PATH  = os.path.join(REGIME_DIR, "market_panel.parquet")
MAN_PATH    = os.path.join(REGIME_DIR, "window_manifest.json")
BUNDLE_PATH = os.path.join(REGIME_DIR, "hmm_best.pkl")
LABELS_PATH = os.path.join(REGIME_DIR, "regime_labels.parquet")
OUT_PATH    = os.path.join(REGIME_DIR, "regime_sensitivity.json")

assert os.path.exists(PANEL_PATH) and os.path.exists(MAN_PATH) and os.path.exists(BUNDLE_PATH)
mkt = pd.read_parquet(PANEL_PATH).sort_values("date").reset_index(drop=True)
mkt["date"] = pd.to_datetime(mkt["date"])

with open(MAN_PATH, "r") as f:
    MAN = json.load(f)

bundle   = joblib.load(BUNDLE_PATH)
features_base = bundle["features"]
scaler   = joblib.load(bundle["scaler_path"])
k_base   = int(bundle["k"])
recency  = bool(bundle.get("recency_weighting", True))
hl_days  = int(bundle.get("recency_half_life_days", 756))
seg_len  = int(bundle.get("recency_seg_len", 60))
n_segs   = int(bundle.get("recency_n_segments", 80))
tol      = float(bundle.get("tol", 1e-3))
n_iter   = int(bundle.get("n_iter", 200))         # TOCHANGE: 1000 for real run
n_init   = int(bundle.get("n_init", 2))           # TOCHANGE: 10 for real run
covtype  = bundle.get("covariance_type", "full")
lam_stick= float(bundle.get("sticky_lambda", 0.15))  # TOCHANGE: 0.30–0.50 for real run
rand0    = int(bundle.get("random_state", 42))

train_start = pd.to_datetime(MAN["window"]["train_start"])
train_end   = pd.to_datetime(MAN["window"]["train_end"])
test_start  = pd.to_datetime(MAN["window"]["test_start"])
test_end    = pd.to_datetime(MAN["window"]["test_end"])

mask_train = (mkt["date"] >= train_start) & (mkt["date"] <= train_end)
mask_test  = (mkt["date"] >= test_start)  & (mkt["date"] <= test_end)

# ⬇️ ADD: tiny helper to fit a local scaler on the train (or era) subset
from sklearn.preprocessing import StandardScaler

def _fit_local_scaler(feats: List[str], subset_mask: pd.Series) -> StandardScaler:
    df = mkt.loc[subset_mask, feats].dropna()
    scaler_local = StandardScaler()
    scaler_local.fit(df.to_numpy(dtype=float))
    return scaler_local

def _diag_sticky_blend(T: np.ndarray, lam: float) -> np.ndarray:
    k = T.shape[0]
    out = (1.0 - lam) * T + lam * np.eye(k)
    return out / out.sum(axis=1, keepdims=True)

def _build_time_decay_weights(dates: np.ndarray, half_life_days: int) -> np.ndarray:
    t = np.array([pd.Timestamp(d).toordinal() for d in dates], dtype=float)
    age = (t.max() - t)
    decay = np.log(2) / max(1, half_life_days)
    w = np.exp(-decay * age)
    return w / (w.sum() + 1e-12)

# Canonical recency-sampling params (align with 2.2 bundle keys)
REC_SEG_LEN    = int(bundle.get("recency_seg_len", 60))
REC_N_SEGMENTS = int(bundle.get("recency_n_segments", 80))
REC_HALF_LIFE  = int(bundle.get("recency_half_life_days", 756))
REC_EPS        = float(bundle.get("recency_epsilon_floor", 0.10))  # matches 2.2 key

def _sample_time_weighted_subsequences(
    X: np.ndarray,
    dates: np.ndarray,
    seg_len: int = REC_SEG_LEN,
    n_segments: int = REC_N_SEGMENTS,
    half_life_days: int = REC_HALF_LIFE,
    seed: int = 42,
):
    rng = np.random.RandomState(seed)
    n = X.shape[0]
    if n < seg_len:
        return X.copy(), [n]
    ends = np.arange(seg_len - 1, n)
    p = _build_time_decay_weights(dates[ends], half_life_days)
    p = np.maximum(p, REC_EPS * p.max())
    p = p / p.sum()
    chosen = rng.choice(ends, size=min(n_segments, len(ends)), replace=True, p=p)
    chunks, lengths = [], []
    for e in chosen:
        s = e - (seg_len - 1)
        chunks.append(X[s:e+1])
        lengths.append(seg_len)
    return np.vstack(chunks), lengths

# ⬇️ MODIFY: _fit_hmm_for_features now fits and returns a local scaler,
# and uses it for both training and scoring.
def _fit_hmm_for_features(feats: List[str], k: int, rs: int, subset_mask: pd.Series) -> Dict[str, Any]:
    # Fit local scaler on the subset (train or era) to avoid feature-count mismatch
    scaler_local = _fit_local_scaler(feats, subset_mask)

    df = mkt.loc[subset_mask, ["date"] + feats].dropna().reset_index(drop=True)
    dates = df["date"].to_numpy()
    X = scaler_local.transform(df[feats].to_numpy(dtype=float))

    if recency:
        X_fit, lengths = _sample_time_weighted_subsequences(
            X, dates, seg_len=seg_len, n_segments=n_segs, half_life_days=hl_days, seed=rs
        )
    else:
        X_fit, lengths = X, [len(X)]

    model = GaussianHMM(
        n_components=k,
        covariance_type=covtype,
        n_iter=n_iter,
        tol=tol,
        random_state=rs,
        verbose=False,
        init_params="mc",   # means, covars
        params="stmc",      # learn startprob/transmat as well
    )
    # sticky-ish init
    T0 = np.full((k, k), (1.0 - 0.90) / max(1, k - 1)); np.fill_diagonal(T0, 0.90)
    model.transmat_ = T0
    model.startprob_ = np.full(k, 1.0 / k)

    model.fit(X_fit, lengths=lengths)
    model.transmat_ = _diag_sticky_blend(model.transmat_, lam_stick)

    # score on original (non-sampled) sequence
    score = float(model.score(X))

    return {
        "model": model,
        "score": score,
        "dates": dates,
        "feats": feats,
        "scaler": scaler_local  # ⬅️ return it
    }

# ⬇️ MODIFY: _profile_and_label takes the local scaler we fit above
def _profile_and_label(model, feats: List[str], scaler_local: StandardScaler) -> Dict[str, Any]:
    X_all = scaler_local.transform(mkt[feats].to_numpy(dtype=float))
    post  = model.predict_proba(X_all)
    K = post.shape[1]

    # compute profiles on TRAIN ONLY (no peeking)
    train_ix = np.where(mask_train.values)[0]

    def wmean(x, w):
        w = np.asarray(w); x = np.asarray(x); s = w.sum()
        return float((x*w).sum()/s) if s>0 else np.nan
    def wstd(x, w):
        mu = wmean(x, w); w = np.asarray(w); x = np.asarray(x); s=w.sum()
        if s<=1: return np.nan
        return float(np.sqrt(max(((w*(x-mu)**2).sum()/s), 0.0)))
    def wq05(x, w):
        x=np.asarray(x); w=np.asarray(w); o=np.argsort(x); xs,ws=x[o],w[o]; cw=np.cumsum(ws)
        return float(xs[np.searchsorted(cw, 0.05*cw[-1])]) if cw[-1]>0 else np.nan

    prof = []
    for s in range(K):
        w = post[train_ix, s]
        prof.append({
            "state_id": s,
            "ret_mean": wmean(mkt.loc[mask_train,"spy_ret"].values, w),
            "ret_std":  wstd (mkt.loc[mask_train,"spy_ret"].values, w),
            "rv20_mean": wmean(mkt.loc[mask_train,"spy_rv_20"].values, w),
            "vix_mean":  wmean(mkt.loc[mask_train,"vix_close"].values, w),
            "dvix_mean": wmean(mkt.loc[mask_train,"dvix"].values, w) if "dvix" in mkt.columns else np.nan,
            "breadth_mean": wmean(mkt.loc[mask_train,"breadth"].values, w),
            "ret_q05":  wq05 (mkt.loc[mask_train,"spy_ret"].values, w),
        })
    prof_df = pd.DataFrame(prof)

    risk_off_id = int(prof_df["rv20_mean"].idxmax())
    risk_on_id  = int(prof_df["ret_mean"].idxmax())
    label_map = {risk_on_id: "Risk-On", risk_off_id: "Risk-Off"}
    for s in range(K):
        if s not in label_map:
            label_map[s] = "Transition"

    return {
        "profiles": prof_df.to_dict(orient="records"),
        "label_map": {int(k): v for k,v in label_map.items()},
        "posteriors_shape": list(post.shape),
        "transmat": model.transmat_.tolist(),
    }

def _agreement_vs_baseline(new_states: np.ndarray, baseline_states: np.ndarray) -> float:
    # simple percent agreement
    if len(new_states) != len(baseline_states):
        return np.nan
    return float((new_states == baseline_states).mean())

# --- Load baseline label sequence (we'll compare to *smoothed* if present) ---
base_labels = pd.read_parquet(LABELS_PATH).sort_values("date")
base_labels["date"] = pd.to_datetime(base_labels["date"])  # <- add this

if "state_id_smoothed" in base_labels.columns:
    base_states = base_labels["state_id_smoothed"].to_numpy()
else:
    base_states = base_labels["state_id"].to_numpy()

results: Dict[str, Any] = {"k_sensitivity": [], "feature_sensitivity": [], "era_stability": [], "bootstrap": {}}

def _score_states_on_valid_dates(model, feats, scaler_local):
    full_df = mkt[["date"] + feats].dropna().reset_index(drop=True)
    Xa = scaler_local.transform(full_df[feats].to_numpy(dtype=float))
    states = model.predict_proba(Xa).argmax(axis=1)
    return full_df["date"].to_numpy(), states

def _agreement_on_intersection(dates_new, states_new, base_labels_df) -> float:
    df_new = pd.DataFrame({"date": dates_new, "state_new": states_new})
    df_join = df_new.merge(
        base_labels_df[["date", "state_id_smoothed" if "state_id_smoothed" in base_labels_df.columns else "state_id"]]
        .rename(columns={"state_id_smoothed":"state_base","state_id":"state_base"}),
        on="date", how="inner"
    )
    if len(df_join) == 0:
        return np.nan
    return float((df_join["state_new"].to_numpy() == df_join["state_base"].to_numpy()).mean())

# 1) K sensitivity ------------------------------------------------------------
K_GRID = [2, 3]  # TOCHANGE: can expand to [2,3] in real run if currently narrowed
for k in K_GRID:
    best = {"score": -np.inf, "meta": None}
    for r in range(n_init):
        rs = rand0 + r
        fit = _fit_hmm_for_features(features_base, k, rs, mask_train)
        meta = _profile_and_label(fit["model"], features_base, fit["scaler"])
        dates_scored, states_all = _score_states_on_valid_dates(fit["model"], features_base, fit["scaler"])
        agree = _agreement_on_intersection(dates_scored, states_all, base_labels)

        entry = {
            "k": k, "seed": rs, "score": fit["score"], "agreement_vs_baseline": agree,
            "profiles": meta["profiles"], "label_map": meta["label_map"],
            "transmat": meta["transmat"]
        }
        if fit["score"] > best["score"]:
            best = {"score": fit["score"], "meta": entry}
    results["k_sensitivity"].append(best["meta"])

# 2) Feature sensitivity -------------------------------------------------------
# Define variants relative to baseline features
fsets = []
fsets.append(("baseline", features_base))
if "vix_close" in features_base: fsets.append(("no_vix", [f for f in features_base if f!="vix_close"]))
if "breadth"   in features_base: fsets.append(("no_breadth", [f for f in features_base if f!="breadth"]))
if "dvix"      in features_base: fsets.append(("no_dvix", [f for f in features_base if f!="dvix"]))
# minimal core set
core = [f for f in ["spy_rv_20","vix_close"] if f in mkt.columns]
if core: fsets.append(("core_rv_vix", core))

for name, feats in fsets:
    k = k_base
    best = {"score": -np.inf, "meta": None}
    for r in range(n_init):
        rs = rand0 + 100 + r
        fit = _fit_hmm_for_features(feats, k, rs, mask_train)
        meta = _profile_and_label(fit["model"], feats, fit["scaler"])
        dates_scored, states_all = _score_states_on_valid_dates(fit["model"], feats, fit["scaler"])
        agree = _agreement_on_intersection(dates_scored, states_all, base_labels)
        entry = {
            "feature_set": name, "k": k, "seed": rs, "feats": feats,
            "score": fit["score"], "agreement_vs_baseline": agree,
            "profiles": meta["profiles"], "label_map": meta["label_map"],
            "transmat": meta["transmat"]
        }
        if fit["score"] > best["score"]:
            best = {"score": fit["score"], "meta": entry}
    results["feature_sensitivity"].append(best["meta"])


# 3) Era stability -------------------------------------------------------------
def _fit_on_era(start: str, end: str, k: int, seed: int, feats: List[str], name: str) -> Dict[str, Any]:
    mask = (mkt["date"] >= pd.to_datetime(start)) & (mkt["date"] <= pd.to_datetime(end))
    fit  = _fit_hmm_for_features(feats, k, seed, mask)
    meta = _profile_and_label(fit["model"], feats, fit["scaler"])
    return {
        "era": name, "k": k, "seed": seed, "score": fit["score"],
        "profiles": meta["profiles"], "label_map": meta["label_map"],
        "transmat": meta["transmat"], "start": str(start), "end": str(end)
    }


# ⬇️ OPTIONAL TIDY-UP — Bootstrap: use a local scaler for baseline features too
df_tr = mkt.loc[mask_train, ["date"] + features_base].dropna().reset_index(drop=True)
scaler_base_local = StandardScaler().fit(df_tr[features_base].to_numpy(dtype=float))  # local
X_tr  = scaler_base_local.transform(df_tr[features_base].to_numpy(dtype=float))
dt_tr = df_tr["date"].to_numpy()

eras = [
    ("pre_2015",  "2007-02-06", "2014-12-31"),
    ("post_2015", "2015-01-01", str(train_end.date())),
    ("crisis_2020","2020-02-15","2020-12-31"),
]
for name, s, e in eras:
    rs = rand0 + hash(name) % 1000
    results["era_stability"].append(_fit_on_era(s, e, k_base, rs, features_base, name))


# 4) Bootstrap (block) ---------------------------------------------------------
def _block_bootstrap_indices(n: int, block: int, n_blocks: int, rng: np.random.RandomState):
    starts = rng.randint(0, n, size=n_blocks)
    idx = []
    for st in starts:
        idx.extend([(st + j) % n for j in range(block)])
    return np.array(idx[:n])

BOOT_REPS   = 5    # TOCHANGE: 100–300 for real run
BLOCK_DAYS  = 20   # TOCHANGE: 20–60 depending on serial corr
rng = np.random.RandomState(rand0+999)

# Build train matrix for bootstrap using local scaler
df_tr = mkt.loc[mask_train, ["date"] + features_base].dropna().reset_index(drop=True)
scaler_base_local = StandardScaler().fit(df_tr[features_base].to_numpy(dtype=float))
X_tr  = scaler_base_local.transform(df_tr[features_base].to_numpy(dtype=float))
dt_tr = df_tr["date"].to_numpy()

boot_summ = {"k": k_base, "reps": BOOT_REPS, "block_days": BLOCK_DAYS, "agreement_vs_baseline": []}
for b in range(BOOT_REPS):
    idx = _block_bootstrap_indices(len(dt_tr), BLOCK_DAYS, max(1, len(dt_tr)//BLOCK_DAYS), rng)
    Xb  = X_tr[idx]

    model = GaussianHMM(
        n_components=k_base, covariance_type=covtype, n_iter=n_iter, tol=tol,
        random_state=rand0 + 500 + b, verbose=False, init_params="mc", params="stmc"
    )
    T0 = np.full((k_base, k_base), (1.0 - 0.90) / max(1, k_base - 1)); np.fill_diagonal(T0, 0.90)
    model.transmat_ = T0; model.startprob_ = np.full(k_base, 1.0 / k_base)
    model.fit(Xb, lengths=[len(Xb)])
    model.transmat_ = _diag_sticky_blend(model.transmat_, lam_stick)

    # Score on valid dates & align to baseline
    dates_scored, states_all = _score_states_on_valid_dates(model, features_base, scaler_base_local)
    agree = _agreement_on_intersection(dates_scored, states_all, base_labels)
    boot_summ["agreement_vs_baseline"].append(agree)

boot_summ["agreement_mean"] = float(np.mean(boot_summ["agreement_vs_baseline"]))
boot_summ["agreement_std"]  = float(np.std (boot_summ["agreement_vs_baseline"]))
results["bootstrap"] = boot_summ

# Save results
with open(OUT_PATH, "w") as f:
    json.dump({
        "created_at": datetime.utcnow().isoformat() + "Z",
        "inputs": {
            "features_base": features_base,
            "k_base": k_base,
            "recency_weighting": recency,
            "half_life_days": hl_days,
            "lam_stick": lam_stick,
            "n_iter": n_iter, "n_init": n_init, "tol": tol, "covariance_type": covtype
        },
        "results": results
    }, f, indent=2)

print(json.dumps({
    "status": "2.5 done",
    "out": OUT_PATH,
    "k_choices": [2,3],
    "feature_sets_tested": [fs[0] for fs in fsets],
    "bootstrap": {"reps": BOOT_REPS, "agreement_mean": boot_summ["agreement_mean"], "agreement_std": boot_summ["agreement_std"]},
}, indent=2))


{
  "status": "2.5 done",
  "out": "artifacts/regimes/regime_sensitivity.json",
  "k_choices": [
    2,
    3
  ],
  "feature_sets_tested": [
    "baseline",
    "no_vix",
    "no_breadth",
    "no_dvix",
    "core_rv_vix"
  ],
  "bootstrap": {
    "reps": 5,
    "agreement_mean": 0.5312862357741035,
    "agreement_std": 0.247818080468718
  }
}


In [12]:
# ============================================================
# Section 2.6 — Diagnostics & QA
# Plots:
#   • Timeline with regime shading over SPY price (rebased) & drawdown
#   • Posterior probabilities (stacked area)
#   • State return histograms, QQ plots
#   • Transition matrix heatmap, dwell-time distribution
# Tables:
#   • State profiles (load from 2.3), transition matrix & steady-state
#   • Switch frequency & chattering metrics
# Alerts:
#   • Inconsistent semantics (e.g., positive mean but highest vol)
#   • Very short dwell (median < 3d)
#   • Mapping flips / excessive chattering
# Reuses (do not recompute):
#   - artifacts/regimes/market_panel.parquet (2.0)
#   - artifacts/regimes/window_manifest.json (2.1)
#   - artifacts/regimes/hmm_best.pkl (2.2)
#   - artifacts/regimes/regime_labels.parquet (2.3/2.4)
#   - artifacts/regimes/state_profiles.csv (2.3)
#   - artifacts/regimes/regime_meta.json (2.3/2.4)
# Outputs:
#   - artifacts/regimes/diagnostics/*.png
#   - artifacts/regimes/diagnostics/*.csv / *.json
#   - console summary + alerts
# Notes:
#   - #TOCHANGE marks spots to bump for real run (heavier plots/points)
# ============================================================

from __future__ import annotations
import os, json
from typing import Dict, Any, List, Tuple
import numpy as np
import pandas as pd
import joblib
import matplotlib.pyplot as plt

REGIME_DIR  = CFG.regime_dir
DIAG_DIR    = os.path.join(REGIME_DIR, "diagnostics")
os.makedirs(DIAG_DIR, exist_ok=True)

PANEL_PATH  = os.path.join(REGIME_DIR, "market_panel.parquet")
MAN_PATH    = os.path.join(REGIME_DIR, "window_manifest.json")
BUNDLE_PATH = os.path.join(REGIME_DIR, "hmm_best.pkl")
LABELS_PATH = os.path.join(REGIME_DIR, "regime_labels.parquet")
META_PATH   = os.path.join(REGIME_DIR, "regime_meta.json")
PROF_PATH   = os.path.join(REGIME_DIR, "state_profiles.csv")

# --- Load artifacts
assert os.path.exists(PANEL_PATH) and os.path.exists(MAN_PATH) and os.path.exists(BUNDLE_PATH) and os.path.exists(LABELS_PATH)

mkt = pd.read_parquet(PANEL_PATH).sort_values("date").reset_index(drop=True)
mkt["date"] = pd.to_datetime(mkt["date"])
labels = pd.read_parquet(LABELS_PATH).sort_values("date").reset_index(drop=True)
labels["date"] = pd.to_datetime(labels["date"])

with open(MAN_PATH, "r") as f:
    MAN = json.load(f)

bundle = joblib.load(BUNDLE_PATH)
features = bundle["features"]
k = int(bundle["k"])

meta = {}
if os.path.exists(META_PATH):
    with open(META_PATH, "r") as f:
        meta = json.load(f)

# --- Derived / convenience
date_equal = np.array_equal(mkt["date"].values, labels["date"].values)
if not date_equal:
    # Align by inner-join on date (some rows may be dropped if any side had NA)
    labels = labels.merge(mkt[["date"]], on="date", how="inner").sort_values("date").reset_index(drop=True)
    mkt    = mkt.merge(labels[["date"]], on="date", how="inner").sort_values("date").reset_index(drop=True)

# Prefer smoothed series if present
state_col  = "state_id_smoothed" if "state_id_smoothed" in labels.columns else "state_id"
label_col  = "regime_label_smoothed" if "regime_label_smoothed" in labels.columns else "regime_label"

# K from posterior columns p0..pK-1 (robust to re-fits)
p_cols = [c for c in labels.columns if c.startswith("p")]
K = len(p_cols) if len(p_cols) > 0 else k

# --- Helpers
def _runs(series: np.ndarray) -> List[Tuple[int,int,int]]:
    """Return (start_idx, end_idx, value) runs for integer state series."""
    out = []
    s = 0; cur = series[0]
    for i in range(1, len(series)):
        if series[i] != cur:
            out.append((s, i-1, int(cur)))
            s = i; cur = series[i]
    out.append((s, len(series)-1, int(cur)))
    return out

def _transition_matrix(states: np.ndarray, K: int) -> np.ndarray:
    T = np.zeros((K, K), dtype=float)
    for i in range(len(states)-1):
        T[states[i], states[i+1]] += 1.0
    row_sums = T.sum(axis=1, keepdims=True)
    row_sums[row_sums==0] = 1.0
    return T / row_sums

def _steady_state(T: np.ndarray) -> np.ndarray:
    # Empirical steady-state as left eigenvector (or fallback to state freq)
    try:
        vals, vecs = np.linalg.eig(T.T)
        i = np.argmin(np.abs(vals - 1.0))
        v = np.real(vecs[:, i]); v = np.maximum(v, 0)
        if v.sum() == 0: raise ValueError
        return v / v.sum()
    except Exception:
        return np.ones(T.shape[0]) / T.shape[0]

def _rebase_price_from_returns(rets: np.ndarray, start=100.0) -> np.ndarray:
    # Assumes rets are simple daily returns (e.g., spy_ret). If logrets, replace with exp(cumsum).
    out = np.empty_like(rets, dtype=float); out[0] = start * (1.0 + np.nan_to_num(rets[0], nan=0.0))
    for i in range(1, len(rets)):
        out[i] = out[i-1] * (1.0 + np.nan_to_num(rets[i], nan=0.0))
    return out

def _drawdown(price: np.ndarray) -> np.ndarray:
    cummax = np.maximum.accumulate(price)
    dd = price / np.where(cummax==0, 1.0, cummax) - 1.0
    return dd

# --- Compute core diagnostics
states = labels[state_col].to_numpy(dtype=int)
Tmat   = _transition_matrix(states, K)
ss_emp = _steady_state(Tmat)
runs   = _runs(states)
dwell  = pd.DataFrame({
    "state_id": [st for (s,e,st) in runs],
    "run_len":  [e-s+1 for (s,e,st) in runs],
})

# --- Switch/chattering metrics
switches = (states[1:] != states[:-1]).sum()
switch_rate = switches / max(1, len(states)-1)
one_day_runs = (dwell["run_len"] == 1).mean()  # fraction single-day
lt3_runs = (dwell["run_len"] < 3).mean()

# --- Load state profiles (2.3) if present, else compute from TRAIN weights
profiles_df = None
if os.path.exists(PROF_PATH):
    profiles_df = pd.read_csv(PROF_PATH)
else:
    # Fallback: rough unweighted per-state profiles on all data (not ideal, but avoids recompute)
    tmp = []
    for s in range(K):
        mask = (states == s)
        tmp.append({
            "state_id": s,
            "ret_mean": float(np.nanmean(mkt.loc[mask,"spy_ret"])),
            "ret_std":  float(np.nanstd (mkt.loc[mask,"spy_ret"])),
            "rv20_mean": float(np.nanmean(mkt.loc[mask,"spy_rv_20"])),
            "vix_mean":  float(np.nanmean(mkt.loc[mask,"vix_close"])),
            "dvix_mean": float(np.nanmean(mkt.loc[mask,"dvix"])) if "dvix" in mkt.columns else np.nan,
            "breadth_mean": float(np.nanmean(mkt.loc[mask,"breadth"])),
            "ret_q05":  float(np.nanquantile(mkt.loc[mask,"spy_ret"], 0.05)),
        })
    profiles_df = pd.DataFrame(tmp)
profiles_df.to_csv(os.path.join(DIAG_DIR, "state_profiles_table.csv"), index=False)

# --- Transition matrix & steady-state tables
pd.DataFrame(Tmat, columns=[f"to_{i}" for i in range(K)], index=[f"from_{i}" for i in range(K)]) \
  .to_csv(os.path.join(DIAG_DIR, "transition_matrix.csv"))
pd.DataFrame({"state_id": list(range(K)), "steady_state_prob": ss_emp}) \
  .to_csv(os.path.join(DIAG_DIR, "steady_state.csv"), index=False)

# --- Switch frequency table (yearly)
lab = labels[["date", state_col]].copy()
lab["year"] = lab["date"].dt.year
lab["sw"] = (lab[state_col].shift(-1) != lab[state_col]).astype(int)
switch_by_year = lab.groupby("year")["sw"].sum().reset_index().rename(columns={"sw":"n_switches"})
switch_by_year.to_csv(os.path.join(DIAG_DIR, "switches_by_year.csv"), index=False)

# ============================================================
# PLOTS
# ============================================================

# 1) Price timeline with regime shading & drawdown
spy_price = _rebase_price_from_returns(mkt["spy_ret"].to_numpy(dtype=float), start=100.0)
spy_dd    = _drawdown(spy_price)

fig, ax = plt.subplots(figsize=(12, 5))
ax.plot(mkt["date"], spy_price, lw=1.25)
# Shade by regime
for (s,e,st) in runs:
    ax.axvspan(mkt["date"].iloc[s], mkt["date"].iloc[e], alpha=0.15, label=f"State {st}" if s==runs[0][0] else None)
ax.set_title("SPY (rebased) with Regime Shading")
ax.set_xlabel("Date"); ax.set_ylabel("Rebased Price")
fig.tight_layout()
fig.savefig(os.path.join(DIAG_DIR, "timeline_regime_shading.png"), dpi=150)
plt.close(fig)

fig, ax = plt.subplots(figsize=(12, 3))
ax.plot(mkt["date"], spy_dd, lw=1.0)
ax.set_title("SPY Drawdown")
ax.set_xlabel("Date"); ax.set_ylabel("Drawdown")
fig.tight_layout()
fig.savefig(os.path.join(DIAG_DIR, "timeline_drawdown.png"), dpi=150)
plt.close(fig)

# 2) Posterior probabilities (stacked area)
if len(p_cols) == K:
    fig, ax = plt.subplots(figsize=(12, 4))
    ax.stackplot(labels["date"], *(labels[c].to_numpy() for c in p_cols))
    ax.set_ylim(0,1); ax.set_title("Posterior Probabilities (Stacked)")
    ax.set_xlabel("Date"); ax.set_ylabel("Probability")
    fig.tight_layout()
    fig.savefig(os.path.join(DIAG_DIR, "posteriors_stacked.png"), dpi=150)
    plt.close(fig)


# (erfinv helper without SciPy)
def erfinv(y):
    # Approximation (Winitzki) good enough for QQ visual; replace with SciPy in real run
    a = 0.147
    sign = np.sign(y)
    x = np.clip(y, -0.999999, 0.999999)
    ln = np.log(1 - x**2)
    first = 2/(np.pi*a) + ln/2
    return sign * np.sqrt( np.sqrt(first**2 - ln/a) - first )

# 3) State return histograms + QQ plots
# TOCHANGE: bump N_QQ_POINTS to 1000 for real run
N_QQ_POINTS = 200
qs = np.linspace(0.01, 0.99, N_QQ_POINTS)
for s in range(K):
    mask = (states == s)
    r = mkt.loc[mask, "spy_ret"].dropna().to_numpy()
    if len(r) == 0:
        continue

    # Histogram
    fig, ax = plt.subplots(figsize=(5,3))
    ax.hist(r, bins=40, alpha=0.8)  # #TOCHANGE: 80 bins for real run
    ax.set_title(f"State {s} return histogram")
    fig.tight_layout()
    fig.savefig(os.path.join(DIAG_DIR, f"state_{s}_ret_hist.png"), dpi=150)
    plt.close(fig)

    # QQ vs normal
    mu, sd = float(np.mean(r)), float(np.std(r, ddof=0))
    if sd <= 0:
        sd = 1e-9
    emp_q = np.quantile(r, qs)
    nor_q = mu + sd * np.sqrt(2) * erfinv(2*qs - 1)  # inverse CDF via erfinv
    fig, ax = plt.subplots(figsize=(5,3))
    ax.scatter(nor_q, emp_q, s=6, alpha=0.7)
    lims = [min(nor_q.min(), emp_q.min()), max(nor_q.max(), emp_q.max())]
    ax.plot(lims, lims, lw=1.0)
    ax.set_title(f"State {s} QQ vs Normal")
    ax.set_xlabel("Theoretical quantiles"); ax.set_ylabel("Empirical quantiles")
    fig.tight_layout()
    fig.savefig(os.path.join(DIAG_DIR, f"state_{s}_qq.png"), dpi=150)
    plt.close(fig)


# 4) Transition heatmap
fig, ax = plt.subplots(figsize=(5,4))
im = ax.imshow(Tmat, aspect="auto", vmin=0, vmax=np.max(Tmat))
ax.set_title("Transition Matrix")
ax.set_xlabel("to"); ax.set_ylabel("from")
ax.set_xticks(range(K)); ax.set_yticks(range(K))
fig.colorbar(im, ax=ax, fraction=0.046, pad=0.04)
fig.tight_layout()
fig.savefig(os.path.join(DIAG_DIR, "transition_matrix_heatmap.png"), dpi=150)
plt.close(fig)

# 5) Dwell-time distribution per state
fig, ax = plt.subplots(figsize=(6,4))
for s in range(K):
    ax.hist(dwell.loc[dwell["state_id"]==s, "run_len"], bins=range(1,51), alpha=0.6, label=f"State {s}")
ax.legend()
ax.set_title("Dwell-time distribution (days)")
ax.set_xlabel("Run length (days)"); ax.set_ylabel("Count")
fig.tight_layout()
fig.savefig(os.path.join(DIAG_DIR, "dwell_time_distribution.png"), dpi=150)
plt.close(fig)

# ============================================================
# ALERTS
# ============================================================
alerts = []

# Semantics: positive mean but highest vol -> suspicious "Risk-On"
if profiles_df is not None and len(profiles_df) >= K:
    vol_rank = profiles_df["rv20_mean"].rank(ascending=True)  # 1 = lowest vol
    hi_vol_state = int(profiles_df.loc[profiles_df["rv20_mean"].idxmax(), "state_id"])
    pos_mean_states = profiles_df.loc[profiles_df["ret_mean"] > 0, "state_id"].astype(int).tolist()
    if hi_vol_state in pos_mean_states and K >= 2:
        alerts.append(f"State {hi_vol_state}: positive mean return but highest realized vol (check semantics).")

# Dwell-time < 3 days median
dwell_median = dwell.groupby("state_id")["run_len"].median()
for s, med in dwell_median.items():
    if med < 3:
        alerts.append(f"State {s}: median dwell {med}d < 3 (too chatty).")

# Chattering: high switch rate or many single-day runs
if switch_rate > 0.15:  # #TOCHANGE: tighten to 0.10 for real run
    alerts.append(f"High switch rate: {switch_rate:.2%}")
if one_day_runs > 0.10:  # #TOCHANGE: tighten to 0.05 for real run
    alerts.append(f"Single-day run fraction elevated: {one_day_runs:.2%}")

# Mapping flips heuristic: compare label continuity around major drawdowns
# (simple heuristic: if label changes >3 times within any 20-day window)
# #TOCHANGE: widen window to 60 days for real run
WINDOW = 20
roll_switch = pd.Series((states[1:] != states[:-1]).astype(int)).rolling(WINDOW).sum().fillna(0)
if (roll_switch > 3).any():
    alerts.append("Frequent label flips in short windows (potential mapping instability).")

# Save alerts
with open(os.path.join(DIAG_DIR, "alerts.json"), "w") as f:
    json.dump({"alerts": alerts}, f, indent=2)

# Save a compact summary CSV
pd.DataFrame({
    "metric": ["K", "switches", "switch_rate", "one_day_runs_frac", "lt3_runs_frac"],
    "value": [K, switches, switch_rate, one_day_runs, lt3_runs]
}).to_csv(os.path.join(DIAG_DIR, "summary_metrics.csv"), index=False)

print(json.dumps({
    "status": "2.6 diagnostics complete",
    "plots_dir": DIAG_DIR,
    "alerts_count": len(alerts),
    "notes": [
        "State profiles loaded from 2.3 if available; else quick fallback was used.",
        "Semantics checks are heuristics; confirm with 2.3 profiles and 2.5 sensitivity."
    ]
}, indent=2))

{
  "status": "2.6 diagnostics complete",
  "plots_dir": "artifacts/regimes/diagnostics",
  "alerts_count": 1,
  "notes": [
    "State profiles loaded from 2.3 if available; else quick fallback was used.",
    "Semantics checks are heuristics; confirm with 2.3 profiles and 2.5 sensitivity."
  ]
}


In [None]:
# ============================================================
# Section 2.7 — Regime-Aware Policy Hooks (Interfaces to Sec 3–5)
# Reuses:
#   - artifacts/regimes/regime_labels.parquet (2.3/2.4)
#   - artifacts/regimes/regime_meta.json (2.3/2.4)
#   - artifacts/regimes/hmm_best.pkl (2.2)  [fallback if p-cols missing]
#   - artifacts/regimes/window_manifest.json (2.1)
#   - artifacts/regimes/market_panel.parquet (2.0)  [fallback scoring]
# Outputs:
#   - artifacts/regimes/regime_policy_map.json
# Notes:
#   - This file is the single interface consumed by Sections 3–5.
#   - #TOCHANGE marks values to tune for the real run.
# ============================================================

from __future__ import annotations
import os, json, hashlib
from typing import Dict, Any
import numpy as np
import pandas as pd
import joblib

REGIME_DIR  = CFG.regime_dir
PANEL_PATH  = os.path.join(REGIME_DIR, "market_panel.parquet")
LABELS_PATH = os.path.join(REGIME_DIR, "regime_labels.parquet")
META_PATH   = os.path.join(REGIME_DIR, "regime_meta.json")
MAN_PATH    = os.path.join(REGIME_DIR, "window_manifest.json")
BUNDLE_PATH = os.path.join(REGIME_DIR, "hmm_best.pkl")
OUT_PATH    = os.path.join(REGIME_DIR, "regime_policy_map.json")

# --- Load essentials
labels = pd.read_parquet(LABELS_PATH).sort_values("date").reset_index(drop=True)
labels["date"] = pd.to_datetime(labels["date"])
with open(MAN_PATH, "r") as f: MAN = json.load(f)
bundle = joblib.load(BUNDLE_PATH)

# state→label semantics
state_label_map = None
if os.path.exists(META_PATH):
    with open(META_PATH, "r") as f:
        meta = json.load(f)
    state_label_map = meta.get("state_label_map", None)
else:
    meta = {}

# infer K and get posteriors
p_cols = [c for c in labels.columns if c.startswith("p")]
K = len(p_cols) if p_cols else int(bundle["k"])

# fallback: if no p-cols, score from model on all dates
if not p_cols:
    feats = bundle["features"]
    scaler = joblib.load(bundle["scaler_path"])
    mkt = pd.read_parquet(PANEL_PATH).sort_values("date").reset_index(drop=True)
    X_all = scaler.transform(mkt[feats].to_numpy(dtype=float))
    post = bundle["model"].predict_proba(X_all)
    for s in range(post.shape[1]):
        labels[f"p{s}"] = post[:, s]
    p_cols = [f"p{s}" for s in range(K)]

# choose smoothed ids/labels if available
state_col = "state_id_smoothed" if "state_id_smoothed" in labels.columns else "state_id"
label_col = "regime_label_smoothed" if "regime_label_smoothed" in labels.columns else "regime_label"

# if meta has mapping but label_col missing, map on the fly
if label_col not in labels.columns and state_label_map is not None:
    labels[label_col] = labels[state_col].map({int(k): v for k, v in state_label_map.items()})

# --- Confidence proxies
def entropy(p: np.ndarray) -> float:
    p = np.clip(p, 1e-12, 1.0)
    return float(-(p*np.log(p)).sum() / np.log(len(p)))  # normalized to [0,1]

def aggressiveness_from_confidence(p: np.ndarray) -> Dict[str, float]:
    # Proxy 1: max posterior
    c_max = float(p.max())
    # Proxy 2: 1 - normalized entropy (higher -> more certain)
    c_ent = 1.0 - entropy(p)
    # Combine (simple mean)  #TOCHANGE: use weighted combo or monotone spline
    c = 0.5 * (c_max + c_ent)
    # Map to aggressiveness scalar g ∈ [g_min, g_max]
    g_min, g_max = 0.35, 1.00     #TOCHANGE: (0.25,1.00) if you want deeper throttling
    g = g_min + (g_max - g_min) * c
    return {"c_max": c_max, "c_entropy": c_ent, "c": c, "g": g}

# --- Latest regime & confidence (optionally smooth over last N days)
#TOCHANGE: set N_SMOOTH=5–10 for prod; 1 for fast test
N_SMOOTH = 3
tail = labels.tail(N_SMOOTH)
p_tail = tail[p_cols].to_numpy(dtype=float)
p_mean = p_tail.mean(axis=0)
latest_row = labels.iloc[-1]
latest_label = str(latest_row[label_col]) if label_col in labels.columns else f"State{int(latest_row[state_col])}"
conf = aggressiveness_from_confidence(p_mean)

# --- Per-regime policy defaults (edit for your stack)
# Use intuitive names; downstream can match by these labels
# Turnover caps are relative (e.g., fraction of portfolio eligible to trade)
policy_by_regime: Dict[str, Dict[str, Any]] = {
    "Risk-On": {
        "weights_multipliers": {          #TOCHANGE: tailor to your factors
            "momentum": 1.20,
            "quality":  1.00,
            "value":    1.00,
            "low_vol":  0.85,
        },
        "turnover_cap": 0.20,             #TOCHANGE: 0.25
        "risk_target_vol_annual": 0.10,   # 10%
        "hedge_intensity": 0.0,           # baseline hedge ratio
    },
    "Transition": {
        "weights_multipliers": {
            "momentum": 0.95,
            "quality":  1.05,
            "value":    1.05,
            "low_vol":  1.05,
        },
        "turnover_cap": 0.15,
        "risk_target_vol_annual": 0.08,   # 8%
        "hedge_intensity": 0.15,
    },
    "Risk-Off": {
        "weights_multipliers": {
            "momentum": 0.70,             # throttle momo
            "quality":  1.15,             # upweight quality/defensive
            "value":    1.05,
            "low_vol":  1.25,
        },
        "turnover_cap": 0.10,
        "risk_target_vol_annual": 0.06,   # 6%
        "hedge_intensity": 0.35,
    },
}

# --- If our label universe differs (e.g., only 2 states), coerce keys
present_labels = set(labels[label_col].dropna().astype(str).unique()) if label_col in labels.columns else set()
for lbl in list(policy_by_regime.keys()):
    if lbl not in present_labels and present_labels:
        # map missing labels to a reasonable fallback  #TOCHANGE: make explicit mapping per run
        del policy_by_regime[lbl]
# If states are only numeric (no semantic labels), synthesize keys
if not policy_by_regime and state_label_map is None:
    unique_states = sorted(labels[state_col].unique())
    for s in unique_states:
        policy_by_regime[f"State{s}"] = {
            "weights_multipliers": {"momentum":1.0,"quality":1.0,"value":1.0,"low_vol":1.0},
            "turnover_cap": 0.15, "risk_target_vol_annual": 0.08, "hedge_intensity": 0.15,
        }

# --- Global scaling by confidence g (downstream can apply this linearly)
# We expose both the raw confidence and recommend common scalings.
scaling = {
    "aggressiveness_scalar_g": conf["g"],
    "confidence": conf,                           # contains c_max, c_entropy, c (combined)
    "recommendations": {
        # Downstream usage suggestions
        "scale_position_sizes_by_g": True,
        "scale_turnover_cap_by_g": True,
        "scale_hedge_intensity_by_(1-g)": True
    }
}

# --- Package the full map
out = {
    "created_at": pd.Timestamp.utcnow().isoformat() + "Z",
    "latest_date": str(latest_row["date"].date()),
    "k": int(K),
    "latest_regime_label": latest_label,
    "latest_state_id": int(latest_row[state_col]),
    "latest_posteriors": {f"p{s}": float(latest_row.get(f"p{s}", np.nan)) for s in range(K)},
    "confidence": scaling,
    "policy_by_regime": policy_by_regime,
    "inputs": {
        "labels_path": LABELS_PATH,
        "meta_path": META_PATH,
        "bundle_path": BUNDLE_PATH,
        "scaler_path": MAN["scaler_path"],
        "features": bundle["features"],
        "window": MAN.get("window", {}),
        "smoothing_window_days": N_SMOOTH,  #TOCHANGE
    },
}

# include sensitivity & diagnostics pointers if present
sens_path = os.path.join(REGIME_DIR, "regime_sensitivity.json")
diag_dir  = os.path.join(REGIME_DIR, "diagnostics")
if os.path.exists(sens_path):
    out["inputs"]["sensitivity_path"] = sens_path
if os.path.isdir(diag_dir):
    out["inputs"]["diagnostics_dir"] = diag_dir

# hash a minimal signature (useful for caching / auditing)
sig = hashlib.sha256(json.dumps({
    "features": out["inputs"]["features"],
    "window": out["inputs"]["window"],
    "k": out["k"]
}, sort_keys=True).encode()).hexdigest()
out["signature"] = sig

with open(OUT_PATH, "w") as f:
    json.dump(out, f, indent=2)

print(json.dumps({
    "status": "2.7 policy hooks exported",
    "out": OUT_PATH,
    "latest_label": latest_label,
    "g_scalar": round(out["confidence"]["aggressiveness_scalar_g"], 4),
}, indent=2))

# 3. Alpha Layer (Signals)

# 4. Portfolio Construction & Risk

# 5. RL Sizing Policy (PPO)

# 6. Backtesting (Backward Testing) — Rigor

# 7. Forward Testing (No Orders; Shadow Runs)

# 8. Cost Model & Execution Assumptions

# 9. Reproducibility & Testability

# 10. Visualization & Reporting

# 11. Automation Options (Optional, no trading)

# 12. Optional Alpaca Integration (disabled by default)

# 13. File/Module Structure (Colab-friendly)




```
/project
  config.yaml
  data/
    universe.csv
    features.parquet
    regime_labels.parquet
  models/
    lstm_*.pt / .h5
    gbm_*.txt
    stacker_*.pkl
    rl_policy_*.pkl
  runs/YYYY-MM-DD/
    signals.parquet
    weights.parquet
    hedges.parquet
    daily_pnl.csv
    risk.json
  reports/
    backtest_tearsheet.html
    forward_tearsheet_YYYY-MM.html
  src/
    data_loader.py
    feature_engineering.py
    regime.py
    models_lstm.py
    models_tabular.py
    stacking.py
    uncertainty.py
    portfolio_bl_rp.py
    hedging.py
    rl_policy.py
    backtest.py
    forward_shadow.py
    risk_metrics.py
    stats_tests.py  # DM, SPA/White RC, Sharpe inference
    monte_carlo.py  # block bootstrap
    reporting.py    # plots & HTML/PDF
  main.py          # CLI: daily-shadow / weekly-train / monthly-report
  notebook.ipynb   # Colab master: end-to-end run with toggles

```



# 14. More info

- Suggested stack: pandas, numpy, scikit-learn, lightgbm, xgboost, tensorflow/PyTorch (choose one for LSTM), hmmlearn, stable-baselines3, cvxpy (for BL/optimization), arch (optional), statsmodels, scipy, matplotlib/plotly.

Compute plan (fits $50–$100):

- S&P 100, 5–8 walk-forward windows.

- LSTM 1–2 layers (64–128 units), MC-dropout 20 samples.

- PPO with modest timesteps per window.

- 200–400 Monte Carlo bootstrap paths.

- 1–3 GPU hours on Colab Pro/Pro+; RAM < 24GB.

# 15. Build Order (fastest to value)

1. Data + Features + Regimes → validate leakage & plots.

2. Multifactor composite → baseline cross-sec L/S backtest.

3. GBM/MLP + LSTM → stacking + uncertainty; re-run backtest.

4. BL + RP + Dynamic hedge → re-run backtest & stress.

5. RL sizing → ablation vs no-RL; finalize backtest.

6. Forward shadow loop (daily), weekly retrain, monthly reports.

7. Automation (Actions/cron), optional Alpaca paper stub (off).

# 16. What you'll see in the first results
- Backtest tear sheet with OO-S equity curve, MC bands, by-regime tables, SPA/DM outcomes, VaR/CVaR & stress.

- Ablation:

  - Multifactor only → +ML → +ML+RL;

  - Market-neutral vs long-only w/ hedging;

  - Cost sensitivity 5–20 bps.

- A live forward dashboard (from Day 1) accumulating daily PnL + monthly report.



# 17. Forward-Testing Duration Recommendation

- Run at least 4 weeks forward shadow to confirm plumbing & stability.

- Prefer 8–12 weeks to evaluate regime adaptation, RL sizing behavior under drawdowns, and cost realism.

- Only after the forward period matches backtest risk/return within expected error bands should you consider paper-trading execution.



<details>
<summary><strong>Outline Details</strong></summary>

# Project Outline — Regime-Aware Multifactor + LSTM/Ensembles + RL (with rigorous back & forward testing)

## 0) Objectives & Success Criteria
**Primary objective:** Generate statistically significant pure alpha (market-neutral) with controlled drawdowns after transaction costs.  

**Secondary objective:** Build a repeatable process capable of ongoing, unattended forward testing that outputs monthly tear sheets.  

**Pass/Fail gates (OO-S):**  
- Annualized Sharpe ≥ 1.0 (cost-adjusted) across walk-forward windows.  
- SPA/White Reality Check non-rejection vs family of alternatives at 5–10% level.  
- Max DD ≤ 15–20% (tunable) in backtests.  
- Forward test (4–8+ weeks): positive return, rolling Sharpe > 0.8, tail losses consistent with backtest VaR/CVaR.  

---

## 1) Data & Universe

### 1.1 Universe
- S&P 100 equities (liquid, keeps compute sane).  
- Hedging instruments: SPY + sector ETFs (XLY, XLF, XLV, XLK, XLI, XLE, XLP, XLB, XLU, XLRE).  
- Source: Yahoo Finance (daily bars).  
- Lookback: 10–15 years if available (train 2012→, test recent).  

### 1.2 Features
- **Returns/vol:** log returns (1–60d lags), realized vol, ATR.  
- **Momentum:** 12–1, 6–1, 20d, trend filters (e.g., SMA cross, slope).  
- **Value:** B/P, E/P, CF/P, shareholder yield (latest available; forward-fill monthly/quarterly).  
- **Quality:** gross profitability, ROE, accruals, leverage, F-Score-like composite.  
- **Market context:** VIX, SPY vol, market breadth (% advancers, optional).  
- Leakage controls: strictly lag all features, align to t-1; winsorize & z-score cross-sectionally.  

### 1.3 Data Hygiene
- Survivorship-bias approach: use current S&P 100 for practicality; (optional) point-in-time later.  
- Corporate actions: use adjusted prices.  
- Missing fundamentals: impute conservatively or drop; record masks for model.  
- **Deliverables:** `features.parquet`, `universe.csv`, `meta.yaml`.  

---

## 2) Regime Modeling

### 2.1 HMM (2–3 states)
- Inputs: SPY daily returns/vol, VIX level/change, market breadth.  
- States: Risk-On, Risk-Off, Transition (labeled by average return/vol).  
- **Output:** daily regime label + posterior probabilities.  

### 2.2 Usage
- Regime-specific ensemble weights, turnover caps, and risk targets.  
- Momentum throttled in Risk-Off; quality emphasized.  
- **Deliverables:** `regime_labels.parquet`, regime plot.  

---

## 3) Alpha Layer (Signals)

### 3.1 Multifactor Composite
- Value/Momentum/Quality composites (winsorized, z-scored).  
- Per-regime blend fit with ridge.  
- **Output:** factor alpha score per asset/day.  

### 3.2 ML Overlays
- **LSTM:** 60-day sequences → t+5/t+10 returns; MC-dropout for uncertainty.  
- **Tabular ensembles:** LightGBM (primary), XGBoost, small MLP; also quantile versions.  
- **Stacking meta-learner:** ridge/LightGBM; OOF training within walk-forward train window.  
- **Output:** final forecast (mean) + uncertainty proxy.  

### 3.3 Uncertainty → Confidence
- Expected Sharpe proxy = mean / std_hat.  
- Bucket confidence for analytics.  
- **Deliverables:** `alpha_raw.parquet`, `alpha_ensemble.parquet`, feature importance charts.  

---

## 4) Portfolio Construction & Risk

### 4.1 Baseline Weights
- Cross-sectional L/S: long top decile, short bottom decile by forecasted Sharpe.  
- Beta-neutral, per-name and sector caps.  

### 4.2 Black–Litterman (BL)
- Prior: market-cap weights → implied μ.  
- Views: ensemble alphas scaled by uncertainty.  
- Posterior μ̂ → mean-variance with L2 & turnover penalty.  

### 4.3 Risk Parity & Vol Target
- Equalize risk across sector/factor clusters.  
- Target portfolio vol (8–12% ann.).  

### 4.4 Dynamic Hedging
- Daily orthogonalization vs SPY + sectors; hedge ratios adjustable by RL.  
- **Deliverables:** weights, exposures, hedge plots.  

---

## 5) RL Sizing Policy (PPO)

### 5.1 Role
- Scales risk target and tunes hedges.  

### 5.2 State
- Regime, vol, drawdown, alpha strength, uncertainty, turnover, betas, cost model.  

### 5.3 Reward
- PnL – costs – λ·CVaR_tail – κ·Δdrawdown – penalties.  

### 5.4 Training
- Train within walk-forward segments; fixed seeds.  
- **Deliverables:** `rl_policy.pkl`, diagnostics.  

---

## 6) Backtesting (Backward Testing) — Rigor

### 6.1 Walk-Forward Engine
- Rolling/expanding windows; purged & embargoed CV.  
- Refit all models per window; test daily with costs.  

### 6.2 Significance & Reality Checks
- DM test, SPA/White RC, Sharpe inference.  

### 6.3 Tail Risk & Stress
- VaR/CVaR; stress tests (2008/2020, vol shocks, liquidity cuts).  

### 6.4 Monte Carlo Robustness
- Block bootstrap; output PnL envelopes.  
- **Deliverables:** equity curves, DD charts, ablations.  

---

## 7) Forward Testing (Shadow Mode)

### 7.1 Daily Shadow Run
- No backfill; use latest models; log all artifacts.  

### 7.2 Retraining Cadence
- Weekly or bi-weekly; strict forward-only.  

### 7.3 Monthly Auto-Report
- Tear sheets with returns, Sharpe, DD, risk, regime PnL, VaR/CVaR.  

### 7.4 Duration
- Min: 4 weeks; Pref: 8–12 weeks.  
- **Deliverables:** daily run files, monthly reports.  

---

## 8) Cost Model & Execution Assumptions
- Costs: 10 bps round-trip (sweep 5–20).  
- Slippage: 1–2 bps; higher in Risk-Off.  
- Short borrow: 10–50 bps ann.  
- Liquidity caps: ≤5–10% ADV.  

---

## 9) Reproducibility & Testability
- Config-driven (`config.yaml`); fixed seeds.  
- Unit/integration tests for leakage, CV folds, NaNs, RL bounds.  
- Experiment tracking with CSV/JSON + git hash.  

---

## 10) Visualization & Reporting
- Equity curves with regime shading, rolling metrics, exposures, attribution, bucket PnL, by-regime performance, risk dashboards.  

---

## 11) Automation Options
- **Colab:** manual or scheduled;  
- **GitHub Actions:** nightly, weekly, monthly;  
- **VM + cron:** low-budget option.  

---

## 12) Optional Alpaca Integration
- Disabled by default; forward test never sends orders; later optional paper fills.  

</details>
