In [44]:
import warnings
import numpy as np
import pandas as pd
import yfinance as yf
import pandas_ta as ta
from scipy.stats import spearmanr
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import KFold
from sklearn.linear_model import Ridge, ElasticNet
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error, mean_absolute_error

warnings.filterwarnings("ignore")

In [36]:
def _extract_adj_close(px_raw: pd.DataFrame) -> pd.Series:
    
    if px_raw.empty:
        raise ValueError("Empty price frame from yfinance.")
    
    cols = px_raw.columns
    if not isinstance(cols, pd.MultiIndex):
        if 'Adj Close' not in px_raw.columns:
            raise ValueError(f"'Adj Close' not in columns: {list(px_raw.columns)}")
        s = px_raw['Adj Close']
    else:
        # usual multi-ticker layout: level 0 = field, level 1 = ticker
        if 'Adj Close' in cols.get_level_values(0):
            sub = px_raw['Adj Close']
        elif 'Adj Close' in cols.get_level_values(1):
            sub = px_raw.xs('Adj Close', axis=1, level=1)
        else:
            raise ValueError("Could not find 'Adj Close' in MultiIndex columns.")
        if isinstance(sub, pd.DataFrame):
            if sub.shape[1] != 1:
                raise ValueError(f"Multiple tickers detected: {list(sub.columns)}. Fetch one ticker.")
            s = sub.iloc[:, 0]
        else:
            s = sub

    return s.dropna().rename('adj_close')

def _log_returns_no_nan(price: pd.Series) -> pd.Series:
    """Vectorized log returns starting at row 2 (never creates a NaN)."""
    v = price.astype(float).to_numpy()
    if v.size < 2:
        return pd.Series(index=price.index[0:0], dtype=float, name='r_t')
    lr = np.log(v[1:] / v[:-1])
    return pd.Series(lr, index=price.index[1:], name='r_t')

def _rsi14(price: pd.Series) -> pd.Series:
    rsi = ta.rsi(price, length=14)
    if isinstance(rsi, pd.DataFrame):
        rsi = rsi.iloc[:, 0]
    if rsi is None or rsi.isna().all():
        delta = price.diff()
        gain = delta.clip(lower=0)
        loss = -delta.clip(upper=0)
        n = 14
        avg_gain = gain.ewm(alpha=1/n, adjust=False, min_periods=n).mean()
        avg_loss = loss.ewm(alpha=1/n, adjust=False, min_periods=n).mean()
        rs = avg_gain / avg_loss
        rsi = 100 - (100 / (1 + rs))
    return rsi.rename('rsi_14')

def create_dataset(ticker="^GSPC", start_date="2015-01-01", end_date="2024-03-31",
                   horizon=1, lookback_days=40):
    """
    Fetches data and engineers features/labels with no NaNs by construction.
    - Log returns start at row 2 (no top NaN).
    - Warmup handled by rolling 'min_periods', final dropna keeps only valid rows.
    """
    print(f"Creating dataset for {ticker} from {start_date} to {end_date}…")

    fetch_start = pd.to_datetime(start_date) - pd.Timedelta(days=lookback_days)
    px_raw = yf.download(ticker, start=fetch_start, end=end_date, auto_adjust=False, progress=False)
    if px_raw.empty:
        raise ValueError(f"No data downloaded for {ticker}. Check ticker/dates.")

    price = _extract_adj_close(px_raw)

    # r_t (no NaN at top), align price to r_t index
    r_t = _log_returns_no_nan(price)
    price = price.loc[r_t.index]

    # base frame aligned from row 2 onward
    base = pd.DataFrame({'adj_close': price, 'r_t': r_t}, index=r_t.index)

    # --- Features (as-of t) ---
    feats = pd.DataFrame(index=base.index)

    # Lagged returns
    for lag in [1, 2, 5, 10]:
        feats[f'r_t_{lag}'] = base['r_t'].shift(lag)

    # Rolling stats on returns (full windows only)
    for w in [5, 10, 20]:
        feats[f'vol_{w}']  = base['r_t'].rolling(w, min_periods=w).std(ddof=0)
        feats[f'mean_{w}'] = base['r_t'].rolling(w, min_periods=w).mean()

    # RSI(14) on price (as-of t)
    feats['rsi_14'] = _rsi14(base['adj_close'])

    # 20-day high/low distances (full windows)
    roll_max = base['adj_close'].rolling(20, min_periods=20).max()
    roll_min = base['adj_close'].rolling(20, min_periods=20).min()
    feats['dist_high_20'] = base['adj_close'] / roll_max - 1.0
    feats['dist_low_20']  = base['adj_close'] / roll_min - 1.0

    # Calendar features
    feats['dow']   = feats.index.dayofweek   # 0..6 (Mon..Sun)
    feats['month'] = feats.index.month       # 1..12

    # --- Label: y_t = r_{t+1} ---
    y = base['r_t'].shift(-horizon).rename('label')

    # --- Final alignment ---
    full = pd.concat([feats, y], axis=1).dropna()
    full = full.loc[pd.to_datetime(start_date):]  # cut off warmup padding before start_date

    if full.empty:
        raise SystemError("Empty dataset after alignment. Check date range and that enough data exists.")

    X = full.drop(columns='label')
    y = full['label']
    r_t_aligned = base['r_t'].reindex(X.index)

    print(f"Dataset ready. Features shape: {X.shape}, Label shape: {y.shape}")
    return X, y, r_t_aligned

In [40]:
class WalkForwardExpanding:
    def __init__(self, step=250, min_train=500):
        self.step = int(step)
        self.min_train = int(min_train)
    def split(self, X):
        n = len(X)
        for t_end in range(self.min_train, n, self.step):
            te_start, te_stop = t_end, min(t_end + self.step, n)
            if te_start >= te_stop:
                break
            yield np.arange(0, t_end), np.arange(te_start, te_stop)

class PurgedKFoldEmbargo:
    def __init__(self, n_splits=5, purge=20, embargo=5):
        self.n_splits = int(n_splits)
        self.purge = int(purge)
        self.embargo = int(embargo)
    def split(self, X):
        n = len(X)
        idx = np.arange(n)
        fold_sizes = np.full(self.n_splits, n // self.n_splits, dtype=int)
        fold_sizes[: n % self.n_splits] += 1
        start = 0
        for size in fold_sizes:
            stop = start + size
            test_idx = idx[start:stop]

            mask = np.ones(n, dtype=bool)
            # remove test itself
            mask[start:stop] = False
            # purge: remove neighbors around test
            ps = max(0, start - self.purge)
            pe = min(n, stop + self.purge)
            mask[ps:pe] = False
            # embargo: remove days right after test
            es = stop
            ee = min(n, stop + self.embargo)
            mask[es:ee] = False

            train_idx = idx[mask]
            yield train_idx, test_idx
            start = stop

# metrics 

def info_coefficient(y_true, y_pred):
    if len(y_true) < 2:
        return np.nan
    return spearmanr(y_true, y_pred).correlation

def hit_rate(y_true, y_pred):
    return (np.sign(y_true) == np.sign(y_pred)).mean()

# evaluator

def evaluate_cv(model, X: pd.DataFrame, y: pd.Series, splitter, scale=True, random_state=42):
    yhat_chunks, ytrue_chunks, idx_chunks = [], [], []

    for tr, te in splitter.split(X):
        # --- 1) slice
        X_tr, X_te = X.iloc[tr], X.iloc[te]
        y_tr, y_te = y.iloc[tr], y.iloc[te]

        # --- 2) scale within-train only
        if scale:
            scaler = StandardScaler().fit(X_tr)
            X_tr = scaler.transform(X_tr)
            X_te = scaler.transform(X_te)

        # --- 3) reinit model fresh (no state carry)
        est = model.__class__(**getattr(model, "get_params")())
        if hasattr(est, "random_state"):
            est.random_state = random_state

        # --- 4) fit & predict
        est.fit(X_tr, y_tr)
        yhat = est.predict(X_te)

        # --- 5) collect
        yhat_chunks.append(yhat)
        ytrue_chunks.append(y_te.values)
        idx_chunks.append(y_te.index)

    # --- 6) concatenate OOS
    y_pred = np.concatenate(yhat_chunks) if yhat_chunks else np.array([])
    y_true = np.concatenate(ytrue_chunks) if ytrue_chunks else np.array([])
    idx = pd.Index(np.concatenate([i.values for i in idx_chunks])) if idx_chunks else pd.Index([])

    # --- 7) metrics
    if len(y_true) == 0:
        raise RuntimeError("Splitter produced no test samples. Check your min_train/step or n_splits/purge/embargo.")

    metrics = {
        "MSE": mean_squared_error(y_true, y_pred),
        "MAE": mean_absolute_error(y_true, y_pred),
        "IC": info_coefficient(y_true, y_pred),
        "HitRate": hit_rate(y_true, y_pred),
        "N_test": len(y_true)
    }
    y_pred_series = pd.Series(y_pred, index=idx).sort_index()
    return metrics, y_pred_series

# runner to compare models 

def compare_models_splitters(X, y, models: dict, splitters: dict, scale=True):
    rows = []
    oos_preds = {}
    for s_name, splitter in splitters.items():
        for m_name, model in models.items():
            metrics, yhat = evaluate_cv(model, X, y, splitter, scale=scale)
            rows.append({"Splitter": s_name, "Model": m_name, **metrics})
            oos_preds[(s_name, m_name)] = yhat
    results = pd.DataFrame(rows).sort_values(["Splitter", "Model"]).reset_index(drop=True)
    return results, oos_preds

# verification

def inspect_splits(splitter, X, k=3):
    i = 0
    for tr, te in splitter.split(X):
        print(f"Split {i}: train[{len(tr)}] {X.index[tr[0]].date()} → {X.index[tr[-1]].date()}  "
              f"| test[{len(te)}] {X.index[te[0]].date()} → {X.index[te[-1]].date()}")
        i += 1
        if i >= k: break


In [46]:
splitters = {
    "KFold(lie)": KFold(n_splits=5, shuffle=True, random_state=42),
    "WalkFwd":    WalkForwardExpanding(step=250, min_train=500),
    "Purged+Emb": PurgedKFoldEmbargo(n_splits=5, purge=20, embargo=5),
}

inspect_splits(splitters["WalkFwd"], X)
inspect_splits(splitters["Purged+Emb"], X)

models = {
    "Ridge": Ridge(alpha=1.0, random_state=42),
    "ElasticNet": ElasticNet(alpha=0.001, l1_ratio=0.2, max_iter=5000, random_state=42),
    "RF": RandomForestRegressor(n_estimators=200, max_depth=4, random_state=42, n_jobs=-1),
}

# Compare
results, oos_preds = compare_models_splitters(X, y, models, splitters, scale=True)
results

Split 0: train[500] 2015-01-02 → 2016-12-23  | test[250] 2016-12-27 → 2017-12-21
Split 1: train[750] 2015-01-02 → 2017-12-21  | test[250] 2017-12-22 → 2018-12-20
Split 2: train[1000] 2015-01-02 → 2018-12-20  | test[250] 2018-12-21 → 2019-12-18
Split 0: train[1839] 2016-12-05 → 2024-03-27  | test[465] 2015-01-02 → 2016-11-03
Split 1: train[1819] 2015-01-02 → 2024-03-27  | test[465] 2016-11-04 → 2018-09-11
Split 2: train[1819] 2015-01-02 → 2024-03-27  | test[465] 2018-09-12 → 2020-07-17


Unnamed: 0,Splitter,Model,MSE,MAE,IC,HitRate,N_test
0,KFold(lie),ElasticNet,0.000132,0.007502,0.01411,0.503873,2324
1,KFold(lie),RF,0.000133,0.007423,-0.034345,0.53012,2324
2,KFold(lie),Ridge,0.000133,0.0076,0.015086,0.505164,2324
3,Purged+Emb,ElasticNet,0.000133,0.007528,-0.001885,0.518072,2324
4,Purged+Emb,RF,0.000132,0.007472,-0.053592,0.523236,2324
5,Purged+Emb,Ridge,0.000137,0.007661,-0.003904,0.501721,2324
6,WalkFwd,ElasticNet,0.000148,0.007788,-0.006127,0.495066,1824
7,WalkFwd,RF,0.000157,0.007817,-0.030391,0.516447,1824
8,WalkFwd,Ridge,0.000153,0.007949,-0.025892,0.491228,1824
