<a href="https://colab.research.google.com/github/viki-m13/SPX-Target-Hit-Forecaster-Edge-vs-Baseline-Barrier-Model/blob/main/SPX_Target_Hit_Forecaster_%E2%80%94_Edge_vs_Baseline_Barrier_Model.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
#!/usr/bin/env python3
import numpy as np
import pandas as pd
import yfinance as yf

# =========================
# Config
# =========================
SECTOR_TICKERS = [
    "^GSPC"   # we only use ^GSPC here
]

START_DATE    = "2010-01-01"
END_DATE      = None          # latest
HORIZON_DAYS  = 30            # look-ahead window
ATR_WINDOW    = 14            # ATR length
VOL_WINDOW    = 30            # realized vol window
RSI_LENGTH    = 14            # RSI length
RANGE_WINDOW  = 60            # rolling high/low window for range position
POLY_WINDOW   = 63            # days for quadratic fit in convex momentum

MIN_MOVE      = 0.01          # minimum 1% away from current close (baseline)
# Gating thresholds (kept fixed in tuning)
RSI_LOW       = 40.0
RSI_HIGH      = 70.0
POS_LOW       = 0.2           # position in 60d range [0,1]
POS_HIGH      = 0.8

# Hyperparameter grids for WF CV optimization
LAMBDA_GRID   = [3.0, 5.0, 7.0]         # convex momentum weight
ATR_MULT_GRID = [0.5, 1.0, 1.5]         # ATR-based extra move
VOL_MIN_GRID  = [0.004, 0.006, 0.008]   # minimum 30d vol in log-return terms

N_FOLDS       = 5  # walk-forward folds


# =========================
# yfinance download (robust)
# =========================
def download_sector_ohlc() -> pd.DataFrame:
    """
    Download OHLC data from yfinance and reshape
    into a long DataFrame: [date, ticker, high, low, close].

    Handles both MultiIndex layouts:
      - (field, ticker), e.g. ('High','XLB')
      - (ticker, field), e.g. ('XLB','High')
    """
    data = yf.download(
        SECTOR_TICKERS,
        start=START_DATE,
        end=END_DATE,
        auto_adjust=True,
        progress=False,
    )

    if data.empty:
        raise ValueError("No data downloaded. Check tickers or dates.")

    frames = []

    if isinstance(data.columns, pd.MultiIndex):
        cols = data.columns
        for t in SECTOR_TICKERS:
            if ("High", t) in cols and ("Low", t) in cols and ("Close", t) in cols:
                hi = data[("High", t)]
                lo = data[("Low", t)]
                cl = data[("Close", t)]
            elif (t, "High") in cols and (t, "Low") in cols and (t, "Close") in cols:
                hi = data[(t, "High")]
                lo = data[(t, "Low")]
                cl = data[(t, "Close")]
            else:
                continue

            df_t = pd.DataFrame(
                {
                    "date": data.index,
                    "ticker": t,
                    "high": hi.values.astype(float),
                    "low": lo.values.astype(float),
                    "close": cl.values.astype(float),
                }
            )
            frames.append(df_t)
    else:
        # Single-ticker case fallback
        required = {"High", "Low", "Close"}
        if not required.issubset(set(data.columns)):
            raise ValueError(f"Missing OHLC columns: {required}")
        t = SECTOR_TICKERS[0]
        df_t = pd.DataFrame(
            {
                "date": data.index,
                "ticker": t,
                "high": data["High"].astype(float).values,
                "low": data["Low"].astype(float).values,
                "close": data["Close"].astype(float).values,
            }
        )
        frames.append(df_t)

    if not frames:
        raise ValueError("Failed to construct OHLC data for any tickers.")

    stacked = pd.concat(frames, ignore_index=True)
    stacked["date"] = pd.to_datetime(stacked["date"])
    stacked = stacked.sort_values(["ticker", "date"]).reset_index(drop=True)
    return stacked


# =========================
# Indicator helpers
# =========================
def compute_rsi(prices: pd.Series, length: int = RSI_LENGTH) -> np.ndarray:
    delta = prices.diff()
    gain = delta.clip(lower=0.0)
    loss = -delta.clip(upper=0.0)

    avg_gain = gain.rolling(length, min_periods=length).mean()
    avg_loss = loss.rolling(length, min_periods=length).mean()

    rs = avg_gain / avg_loss
    rsi = 100.0 - (100.0 / (1.0 + rs))
    return rsi.to_numpy()


def compute_convex_momentum(log_close: np.ndarray,
                            window: int,
                            lambda_convex: float) -> np.ndarray:
    """
    Convex momentum score:
      - Fit quadratic to log_price over last `window` days.
      - Score = slope at end + lambda_convex * curvature.
      - Positive score = accelerating uptrend; negative = flat/down.
    """
    n = len(log_close)
    cms = np.full(n, np.nan)
    x = np.arange(window)

    for i in range(window - 1, n):
        segment = log_close[i - window + 1: i + 1]
        # If any NaN, skip
        if np.isnan(segment).any():
            continue
        a, b, c = np.polyfit(x, segment, 2)
        slope_end = 2 * a * x[-1] + b   # derivative at last point
        cms[i] = slope_end + lambda_convex * a

    return cms


# =========================
# Core model: labels + gating (parametrized)
# =========================
def _build_labels_for_one_ticker(
    sub: pd.DataFrame,
    lambda_convex: float,
    atr_mult: float,
    vol_min: float,
    rsi_low: float = RSI_LOW,
    rsi_high: float = RSI_HIGH,
    pos_low: float = POS_LOW,
    pos_high: float = POS_HIGH,
    min_move: float = MIN_MOVE,
) -> pd.DataFrame:
    """
    For one ticker:
      - Compute baseline +min_move level and label.
      - Compute ATR-based target level (>= min_move).
      - Compute regime filters (convex momentum, vol, RSI, range position).
      - Define when we PREDICT and whether it was hit.

    No leakage in features:
      - All features (ATR, CMS, vol, RSI, range) use only current & past data.
      - Future highs are only used for labels (hit/miss).
    """
    sub = sub.sort_values("date").reset_index(drop=True).copy()
    high = sub["high"].to_numpy(float)
    low = sub["low"].to_numpy(float)
    close = sub["close"].to_numpy(float)
    n = len(sub)

    # --- Base computations ---
    log_close = np.log(close)

    # ATR
    prev_close = np.roll(close, 1)
    prev_close[0] = np.nan
    tr1 = high - low
    tr2 = np.abs(high - prev_close)
    tr3 = np.abs(low - prev_close)
    tr = np.nanmax(np.stack([tr1, tr2, tr3], axis=1), axis=1)
    atr = pd.Series(tr).rolling(ATR_WINDOW, min_periods=ATR_WINDOW).mean().to_numpy()

    # Realized vol on log returns
    ret = pd.Series(log_close).diff()
    vol30 = ret.rolling(VOL_WINDOW, min_periods=VOL_WINDOW).std().to_numpy()

    # RSI
    rsi = compute_rsi(pd.Series(close), length=RSI_LENGTH)

    # Position in 60d high/low range
    roll_max = pd.Series(high).rolling(RANGE_WINDOW, min_periods=RANGE_WINDOW).max().to_numpy()
    roll_min = pd.Series(low).rolling(RANGE_WINDOW, min_periods=RANGE_WINDOW).min().to_numpy()
    range_span = roll_max - roll_min
    pos_in_range = (close - roll_min) / np.where(range_span > 0, range_span, np.nan)

    # Convex momentum score (parameterized)
    cms = compute_convex_momentum(log_close, window=POLY_WINDOW,
                                  lambda_convex=lambda_convex)

    # --- Target definitions ---
    # Baseline level: +min_move over close (e.g., +1%)
    base_level = close * (1.0 + min_move)

    # Our target: max(min_move, ATR-based %) above close
    atr_pct = np.where(close > 0, atr / close, np.nan)
    extra_pct = atr_mult * atr_pct
    target_pct = np.maximum(min_move, extra_pct)   # >= min_move by construction
    target_level = close * (1.0 + target_pct)

    # --- Future max high over [t+1, t+HORIZON] ---
    D = HORIZON_DAYS
    fut = np.full((D, n), np.nan)
    for k in range(1, D + 1):
        fut[k - 1, :-k] = high[k:]
    fut_max = np.nanmax(fut, axis=0)

    # Valid labels where we have full history and full horizon
    valid = (~np.isnan(fut_max))

    # Baseline (min_move) reached?
    base_reached = np.full(n, np.nan)
    base_reached[valid] = (fut_max[valid] >= base_level[valid]).astype(float)

    # Our target reached?
    target_reached = np.full(n, np.nan)
    valid_target = valid & (~np.isnan(atr)) & (~np.isnan(target_pct))
    target_reached[valid_target] = (fut_max[valid_target] >= target_level[valid_target]).astype(float)

    # --- Gating (regime filter) ---
    cond_cms = cms > 0
    cond_vol = vol30 > vol_min
    cond_rsi = (rsi >= rsi_low) & (rsi <= rsi_high)
    cond_pos = (pos_in_range >= pos_low) & (pos_in_range <= pos_high)

    gating_cond = cond_cms & cond_vol & cond_rsi & cond_pos

    # Prediction flag for evaluation: must have valid target label
    predict_flag = gating_cond & valid_target

    out = pd.DataFrame(
        {
            "date": sub["date"],
            "ticker": sub["ticker"],
            "close": close,
            "high": high,
            "low": low,
            "atr": atr,
            "vol30": vol30,
            "rsi": rsi,
            "pos_in_range": pos_in_range,
            "cms": cms,
            "base_level_min_move": base_level,
            "target_pct": target_pct,
            "target_level": target_level,
            "horizon_days": HORIZON_DAYS,
            "valid": valid,
            "valid_target": valid_target,
            "base_reached": base_reached,        # baseline label
            "target_reached": target_reached,    # our target label
            "gating_cond": gating_cond,          # uses only past/present info
            "predict_flag": predict_flag,        # gating_cond & valid_target
        }
    )
    return out


def build_model_frame_with_params(
    df: pd.DataFrame,
    lambda_convex: float,
    atr_mult: float,
    vol_min: float,
    rsi_low: float = RSI_LOW,
    rsi_high: float = RSI_HIGH,
    pos_low: float = POS_LOW,
    pos_high: float = POS_HIGH,
    min_move: float = MIN_MOVE,
) -> pd.DataFrame:
    frames = []
    for t in sorted(df["ticker"].unique()):
        sub = df[df["ticker"] == t]
        if sub.empty:
            continue
        frames.append(
            _build_labels_for_one_ticker(
                sub,
                lambda_convex=lambda_convex,
                atr_mult=atr_mult,
                vol_min=vol_min,
                rsi_low=rsi_low,
                rsi_high=rsi_high,
                pos_low=pos_low,
                pos_high=pos_high,
                min_move=min_move,
            )
        )
    if not frames:
        return pd.DataFrame()
    return pd.concat(frames, ignore_index=True)


# =========================
# Walk-forward CV scoring
# =========================
def walk_forward_score(preds: pd.DataFrame, n_folds: int = N_FOLDS):
    """
    Walk-forward CV over time for a given parameter set.

    Metric:
      - precision = P(target_reached == 1 | predict_flag == 1) on validation slices
      - coverage  = (#predictions) / (#valid_target rows in validation slices)

    Aggregated across folds (time-ordered).
    """
    df = preds.copy()
    df = df[df["valid_target"]].sort_values("date").reset_index(drop=True)

    if df.empty:
        return {
            "precision": None,
            "coverage": 0.0,
            "n_preds": 0,
            "total_val": 0,
        }

    dates = sorted(df["date"].unique())
    n_dates = len(dates)
    if n_dates < n_folds + 1:
        # not enough data for this many folds
        n_folds = max(1, n_dates - 1)

    boundary_idxs = np.linspace(0, n_dates - 1, n_folds + 1, dtype=int)

    total_preds = 0
    total_hits = 0
    total_val_rows = 0

    for i in range(n_folds):
        val_start_idx = boundary_idxs[i] + 1
        val_end_idx = boundary_idxs[i + 1]
        if val_start_idx > val_end_idx:
            continue

        val_dates = dates[val_start_idx: val_end_idx + 1]
        df_val = df[df["date"].isin(val_dates)].copy()
        if df_val.empty:
            continue

        # Only consider rows where we actually fire
        mask_pred = df_val["predict_flag"].astype(bool)
        n_pred = mask_pred.sum()
        total_val_rows += len(df_val)

        if n_pred == 0:
            continue

        hits = df_val.loc[mask_pred, "target_reached"].sum()
        total_preds += n_pred
        total_hits += hits

    if total_preds == 0:
        return {
            "precision": None,
            "coverage": 0.0,
            "n_preds": 0,
            "total_val": total_val_rows,
        }

    precision = float(total_hits) / float(total_preds)
    coverage = float(total_preds) / float(total_val_rows) if total_val_rows > 0 else 0.0

    return {
        "precision": precision,
        "coverage": coverage,
        "n_preds": total_preds,
        "total_val": total_val_rows,
    }


def tune_hyperparams_wfcv(df: pd.DataFrame):
    """
    Grid search over (lambda_convex, atr_mult, vol_min) using walk-forward CV.

    Objective:
      1) Prefer configs with precision >= 0.9999 (~100%).
         Among those, choose the one with largest n_preds.
      2) If none reach that, choose highest precision;
         tie-breaker: higher coverage.
    """
    best = None
    best_params = None
    best_preds = None

    for lam in LAMBDA_GRID:
        for atr_mult in ATR_MULT_GRID:
            for vol_min in VOL_MIN_GRID:
                preds = build_model_frame_with_params(
                    df,
                    lambda_convex=lam,
                    atr_mult=atr_mult,
                    vol_min=vol_min,
                )

                score = walk_forward_score(preds, n_folds=N_FOLDS)
                prec = score["precision"]
                cov = score["coverage"]
                n_preds = score["n_preds"]

                if prec is None or n_preds == 0:
                    continue

                cand = {
                    "lambda_convex": lam,
                    "atr_mult": atr_mult,
                    "vol_min": vol_min,
                    "precision": prec,
                    "coverage": cov,
                    "n_preds": n_preds,
                    "total_val": score["total_val"],
                }

                if best is None:
                    best = cand
                    best_params = (lam, atr_mult, vol_min)
                    best_preds = preds
                    continue

                best_is_perfect = best["precision"] >= 0.9999
                cand_is_perfect = prec >= 0.9999

                # Ranking logic:
                # 1) Any ~perfect config beats non-perfect.
                # 2) Among perfect, higher n_preds wins.
                # 3) Among non-perfect, higher precision; tie-break coverage.
                if cand_is_perfect and not best_is_perfect:
                    best = cand
                    best_params = (lam, atr_mult, vol_min)
                    best_preds = preds
                elif cand_is_perfect and best_is_perfect:
                    if n_preds > best["n_preds"]:
                        best = cand
                        best_params = (lam, atr_mult, vol_min)
                        best_preds = preds
                elif (not cand_is_perfect) and (not best_is_perfect):
                    if (prec > best["precision"] or
                        (prec == best["precision"] and cov > best["coverage"])):
                        best = cand
                        best_params = (lam, atr_mult, vol_min)
                        best_preds = preds

    return best, best_params, best_preds


# =========================
# Evaluation
# =========================
def evaluate_model(preds: pd.DataFrame) -> None:
    """
    Compare:
      - baseline: unconditional prob that +min_move is hit in HORIZON_DAYS
      - our model: hit rate on days we actually predict (predict_flag == True)
    """
    # Baseline: all valid horizons
    base_mask = preds["valid"].astype(bool)
    base_used = preds[base_mask].copy()

    if base_used.empty:
        print("No valid baseline rows.")
        return

    baseline_hit = base_used["base_reached"].mean()
    print(f"\nBaseline: P(hit +{MIN_MOVE:.1%} within {HORIZON_DAYS}d | all days) = {baseline_hit:.4%}")

    # Our model: only on predicted days
    pred_mask = preds["predict_flag"].astype(bool)
    pred_used = preds[pred_mask].copy()

    if pred_used.empty:
        print("Model never fires. Loosen the gating thresholds.")
        return

    strategy_hit = pred_used["target_reached"].mean()
    coverage = len(pred_used) / len(base_used)

    print(f"Model:   P(hit target (>= {MIN_MOVE:.1%}) | model fires) = {strategy_hit:.4%}")
    print(f"Coverage (fraction of days with prediction): {coverage:.4%}")
    print(f"Lift vs baseline (absolute): {strategy_hit - baseline_hit:.4%}")

    # Per-ETF stats
    print("\nPer-ETF stats (only where model fires):")
    per_ticker = (
        pred_used.groupby("ticker")[["target_reached"]]
        .agg(["mean", "count"])
    )
    # flatten MultiIndex columns
    per_ticker.columns = ["_".join(col) for col in per_ticker.columns]
    per_ticker = per_ticker.rename(
        columns={"target_reached_mean": "hit_rate", "target_reached_count": "n_predictions"}
    )
    print(per_ticker.to_string(float_format=lambda x: f"{x:.4f}"))


# =========================
# Last 10 & current prediction
# =========================
def print_last_10_predictions(preds: pd.DataFrame) -> None:
    """
    Last 10 days where the model actually fired (predict_flag == True),
    with realized outcome.
    """
    pred_used = preds[preds["predict_flag"]].sort_values(["date", "ticker"])
    last10 = pred_used.tail(10)
    if last10.empty:
        print("\nNo model predictions to show.")
        return

    print("\nLast 10 model predictions (chronological):")
    cols = [
        "date", "ticker", "close", "target_pct", "target_level",
        "horizon_days", "base_reached", "target_reached"
    ]
    print(
        last10[cols].to_string(
            index=False,
            float_format=lambda x: f"{x:.4f}"
        )
    )


def print_current_predictions(preds: pd.DataFrame) -> None:
    """
    For each ETF, show the latest bar and whether we have a live signal now.
    - If gating_cond is True on the last date, print the target level.
    - If not, say 'no signal'.
    """
    rows = []
    for t in SECTOR_TICKERS:
        sub = preds[preds["ticker"] == t]
        if sub.empty:
            continue
        last = sub.iloc[-1]
        rows.append(
            {
                "ticker": t,
                "date": last["date"],
                "close": last["close"],
                "target_pct": last["target_pct"],
                "target_level": last["target_level"],
                "signal": bool(last["gating_cond"]),
            }
        )

    if not rows:
        print("\nNo data for current predictions.")
        return

    cur = pd.DataFrame(rows).sort_values("ticker")

    print("\nCurrent prediction status (latest bar per ETF):")
    printable = cur.copy()
    printable["signal"] = printable["signal"].map(
        lambda s: "PREDICT (level >= min target)" if s else "NO SIGNAL"
    )
    cols = ["ticker", "date", "close", "target_pct", "target_level", "signal"]
    print(
        printable[cols].to_string(
            index=False,
            float_format=lambda x: f"{x:.4f}"
        )
    )
    print(
        f"\nInterpretation: where 'PREDICT' is shown, the model predicts that "
        f"within the next {HORIZON_DAYS} trading days the HIGH will touch or "
        f"exceed 'target_level' (which is at least {MIN_MOVE:.1%} above the close). "
        f"Where 'NO SIGNAL' is shown, the regime filter is off and we make no prediction."
    )


# =========================
# Main
# =========================
if __name__ == "__main__":
    # 1) Download data
    df = download_sector_ohlc()
    print(f"Downloaded {len(df)} rows from yfinance.")

    # 2) Hyperparameter tuning via walk-forward CV
    print("\nRunning walk-forward CV hyperparameter tuning...")
    best_score, best_params, best_preds = tune_hyperparams_wfcv(df)

    if best_score is None or best_preds is None:
        raise RuntimeError("Hyperparameter search produced no valid configuration.")

    lam_best, atr_best, vol_best = best_params
    print("\n=== Best parameters from WFCV ===")
    print(f"lambda_convex: {lam_best}")
    print(f"ATR_MULT:      {atr_best}")
    print(f"VOL_MIN:       {vol_best}")
    print(f"WFCV precision: {best_score['precision']:.4f}")
    print(f"WFCV coverage:  {best_score['coverage']:.4%}")
    print(f"WFCV #preds:    {best_score['n_preds']} over {best_score['total_val']} validation rows")

    preds = best_preds
    print(f"\nTotal rows in model frame: {len(preds)}")
    print(f"Rows with valid horizon: {preds['valid'].sum()}")
    print(f"Rows with valid target: {preds['valid_target'].sum()}")
    print(f"Rows where model fires: {preds['predict_flag'].sum()}")

    # 3) Evaluate vs baseline on full history
    evaluate_model(preds)

    # 4) Last 10 model predictions (with realized outcome)
    print_last_10_predictions(preds)

    # 5) Current prediction per ETF (live view as of last bar)
    print_current_predictions(preds)

Downloaded 3993 rows from yfinance.

Running walk-forward CV hyperparameter tuning...


  fut_max = np.nanmax(fut, axis=0)
  fut_max = np.nanmax(fut, axis=0)
  fut_max = np.nanmax(fut, axis=0)
  fut_max = np.nanmax(fut, axis=0)
  fut_max = np.nanmax(fut, axis=0)
  fut_max = np.nanmax(fut, axis=0)
  fut_max = np.nanmax(fut, axis=0)
  fut_max = np.nanmax(fut, axis=0)
  fut_max = np.nanmax(fut, axis=0)
  fut_max = np.nanmax(fut, axis=0)
  fut_max = np.nanmax(fut, axis=0)
  fut_max = np.nanmax(fut, axis=0)
  fut_max = np.nanmax(fut, axis=0)
  fut_max = np.nanmax(fut, axis=0)
  fut_max = np.nanmax(fut, axis=0)
  fut_max = np.nanmax(fut, axis=0)
  fut_max = np.nanmax(fut, axis=0)
  fut_max = np.nanmax(fut, axis=0)
  fut_max = np.nanmax(fut, axis=0)
  fut_max = np.nanmax(fut, axis=0)
  fut_max = np.nanmax(fut, axis=0)
  fut_max = np.nanmax(fut, axis=0)
  fut_max = np.nanmax(fut, axis=0)
  fut_max = np.nanmax(fut, axis=0)
  fut_max = np.nanmax(fut, axis=0)
  fut_max = np.nanmax(fut, axis=0)



=== Best parameters from WFCV ===
lambda_convex: 7.0
ATR_MULT:      0.5
VOL_MIN:       0.008
WFCV precision: 0.4625
WFCV coverage:  6.3600%
WFCV #preds:    253 over 3978 validation rows

Total rows in model frame: 3993
Rows with valid horizon: 3992
Rows with valid target: 3979
Rows where model fires: 253

Baseline: P(hit +3.0% within 10d | all days) = 25.3257%
Model:   P(hit target (>= 3.0%) | model fires) = 46.2451%
Coverage (fraction of days with prediction): 6.3377%
Lift vs baseline (absolute): 20.9194%

Per-ETF stats (only where model fires):
        hit_rate  n_predictions
ticker                         
^GSPC     0.4625            253

Last 10 model predictions (chronological):
      date ticker     close  target_pct  target_level  horizon_days  base_reached  target_reached
2025-02-24  ^GSPC 5983.2500      0.0300     6162.7475            10        0.0000          0.0000
2025-02-28  ^GSPC 5954.5000      0.0300     6133.1350            10        0.0000          0.0000
2025-05-02  

  fut_max = np.nanmax(fut, axis=0)
