# 🔬 CARIA-SR Rigorous Validation (Anti-Overfitting)

**This notebook addresses overfitting concerns:**
1. **Walk-Forward Cross-Validation** (train/test split with gap)
2. **Permutation Tests** (compare to randomized signal)
3. **Purge + Embargo** (60-day gap to prevent leakage)
4. **Proper Baseline** (VIX-only vs VIX+Peak)
5. **Out-of-Sample Performance Only**

In [None]:
# @title 1. Setup
!pip install -q yfinance pandas numpy scipy scikit-learn statsmodels seaborn matplotlib pyarrow

from google.colab import drive
drive.mount('/content/drive')

import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import yfinance as yf
import warnings
from datetime import datetime
from tqdm import tqdm
import statsmodels.formula.api as smf
from sklearn.covariance import LedoitWolf

# Silence every warning category for the rest of the session.
# NOTE(review): this presumably also hides convergence warnings from the
# model fits further down - confirm that is intended.
warnings.filterwarnings('ignore')
# Seed NumPy's global RNG so np.random-based steps repeat within a fresh run.
np.random.seed(42)
sns.set_style('whitegrid')

# Output root on the mounted Drive; figures/ and tables/ are created if missing.
WORK_DIR = '/content/drive/MyDrive/CARIA_Rigorous'
os.makedirs(f'{WORK_DIR}/figures', exist_ok=True)
os.makedirs(f'{WORK_DIR}/tables', exist_ok=True)

# Parameters
# END_DATE is taken from the wall clock, so the sample grows on every re-run.
START_DATE = "1995-01-01"
END_DATE = datetime.now().strftime("%Y-%m-%d")

# Cross-validation params
TRAIN_YEARS = 5
TEST_YEARS = 1
PURGE_DAYS = 60  # Gap between train and test
# NOTE(review): EMBARGO_DAYS is not referenced anywhere else in the visible
# notebook; the 22-row horizon is hard-coded where the target is built.
EMBARGO_DAYS = 22  # Forward return horizon

print(f"‚úÖ Output: {WORK_DIR}")

In [None]:
# @title 2. Download Data

# S&P 500 sectors for cross-sectional analysis
# NOTE(review): presumably several of these ETFs started trading after
# START_DATE, so early rows contain NaN columns - verify against the download.
SECTORS = ['XLF', 'XLK', 'XLE', 'XLV', 'XLI', 'XLY', 'XLP', 'XLB', 'XLU', 'XLRE', 'XLC']

print("Downloading data...")
# Only the 'Close' field of each download is kept.
# NOTE(review): whether 'Close' is split/dividend adjusted depends on the
# installed yfinance version's auto_adjust default - confirm.
sectors = yf.download(SECTORS, start=START_DATE, end=END_DATE, progress=False)['Close']
market = yf.download(['^VIX', 'SPY', 'TLT'], start=START_DATE, end=END_DATE, progress=False)['Close']

# Rename to the column names used by the rest of the notebook.
vix = market['^VIX'].rename('volatility')
spy = market['SPY'].rename('price')
tlt = market['TLT'].rename('tlt')

print(f"\n‚úÖ Sectors: {sectors.shape}")
print(f"‚úÖ Market: {START_DATE} to {END_DATE}")

In [None]:
# @title 3. Core Functions

def cov_to_corr(S):
    """Turn a covariance matrix into a symmetrised correlation matrix.

    Each entry is divided by the product of the two standard deviations
    (square roots of the diagonal). A zero standard deviation is replaced by
    1e-10 to avoid dividing by zero, the result is averaged with its transpose,
    and any NaN/inf that remains is mapped to a finite number.
    """
    sigma = np.sqrt(np.diag(S))
    sigma[sigma == 0] = 1e-10
    scale = np.outer(sigma, sigma)
    upper = S / scale
    lower = S.T / scale
    return np.nan_to_num((upper + lower) / 2)

def calc_absorption_ratio(returns, window=252, k_frac=0.2, step=5, min_assets=5):
    """Calculate the rolling Absorption Ratio (AR) of a panel of returns.

    Every ``step`` rows, the ``window`` rows that end just *before* the
    evaluation row are used to estimate a Ledoit-Wolf covariance matrix, which
    is converted to a correlation matrix. The AR is the share of the total
    eigenvalue mass carried by the largest ``ceil(k_frac * n_assets)``
    eigenvalues.

    Parameters
    ----------
    returns : pd.DataFrame
        One column per asset; columns that are entirely NaN are ignored.
    window : int
        Number of trailing rows used for each estimate.
    k_frac : float
        Fraction of eigenvalues counted in the numerator (at least one).
    step : int
        Distance in rows between two consecutive estimates.
    min_assets : int
        Windows with fewer complete columns than this are skipped.

    Returns
    -------
    pd.Series
        Indexed like ``returns``. Values are forward-filled between estimates.
        Rows before the first estimate stay NaN on purpose: back-filling them
        would stamp a number estimated from later data onto earlier dates
        (look-ahead) and flatten the rolling statistics computed downstream.
    """
    returns = returns.dropna(axis=1, how='all')
    ar_series = pd.Series(index=returns.index, dtype=float)

    # Not enough history for even one window: nothing to estimate.
    if len(returns) <= window:
        return ar_series

    lw = LedoitWolf()

    for t in range(window, len(returns), step):
        # Keep only assets with a complete history inside this window, so no
        # further NaN filling is needed.
        W = returns.iloc[t - window:t].dropna(axis=1)
        if W.shape[1] < min_assets:
            continue
        X = W.values - W.values.mean(axis=0)
        try:
            C = cov_to_corr(lw.fit(X).covariance_)
            w = np.sort(np.linalg.eigvalsh(C))[::-1]
        except (np.linalg.LinAlgError, ValueError):
            # Best effort: a degenerate window (e.g. non-finite returns) is
            # skipped and the previous estimate is carried forward.
            continue
        # Clip tiny negative eigenvalues caused by round-off.
        w = np.maximum(w, 1e-10)
        k = max(1, int(np.ceil(k_frac * len(w))))
        ar_series.iloc[t] = np.sum(w[:k]) / np.sum(w)

    return ar_series.ffill()

def prepare_dataset(sectors, vix, spy, tlt, memory_window=60, horizon=22, z_window=252):
    """Prepare the full modelling dataset with all features and the target.

    Parameters
    ----------
    sectors : pd.DataFrame
        Sector price levels, one column per sector.
    vix, spy, tlt : pd.Series
        Volatility index, equity price and bond price series.
    memory_window : int
        Look-back (rows) of the rolling maximum that defines ``peak``.
    horizon : int
        Forward-return horizon of the target, in rows of the merged frame.
    z_window : int
        Look-back (rows) of the rolling mean/std used for the AR z-score.

    Returns
    -------
    pd.DataFrame
        Columns ``ar``, ``ar_z``, ``peak``, ``volatility``, ``price``, ``tlt``
        and ``ret_fwd``, with every row containing a NaN removed. The last
        ``horizon`` rows are therefore dropped because their target is unknown.
    """
    returns = np.log(sectors).diff()
    ar = calc_absorption_ratio(returns)

    # Z-score of the absorption ratio against its own trailing history.
    ar_z = (ar - ar.rolling(z_window).mean()) / ar.rolling(z_window).std()

    # Peak memory: highest z-score seen over the trailing memory window.
    peak = ar_z.rolling(memory_window).max()

    # Merge on dates present in the AR series, the VIX and the equity price;
    # the bond series is aligned to that index and forward-filled.
    idx = ar.dropna().index.intersection(vix.index).intersection(spy.index)
    df = pd.DataFrame({
        'ar': ar.loc[idx],
        'ar_z': ar_z.loc[idx],
        'peak': peak.loc[idx],
        'volatility': vix.loc[idx],
        'price': spy.loc[idx],
        'tlt': tlt.reindex(idx).ffill()
    }).dropna()

    # Target: return from row t to row t + horizon. It is computed after the
    # dropna above, so the horizon counts rows of the merged frame.
    df['ret_fwd'] = df['price'].pct_change(horizon).shift(-horizon)

    return df.dropna()

print("‚úÖ Functions defined")

In [None]:
# @title 4. Prepare Dataset
# Build the modelling frame once; later cells read the notebook-global `df`.
print("Calculating structural metrics...")
df = prepare_dataset(sectors, vix, spy, tlt)
print(f"\n‚úÖ Dataset: {len(df)} observations")
print(f"   Period: {df.index.min().date()} to {df.index.max().date()}")
# Last expression of the cell: shown as the cell output in a notebook.
df.head()

In [None]:
# @title 5. Walk-Forward Cross-Validation (THE KEY TEST)

def _pinball_loss(y_true, y_pred, q):
    """Return the summed quantile (check) loss of ``y_pred`` at level ``q``."""
    resid = np.asarray(y_true, dtype=float) - np.asarray(y_pred, dtype=float)
    return float(np.sum(np.where(resid >= 0, q * resid, (q - 1) * resid)))


def _oos_quantile_r2(formula, train, test, q):
    """Fit a quantile regression on ``train`` and return its pseudo-R2 on ``test``.

    The score is ``1 - loss(model) / loss(null)``, where the null prediction is
    the unconditional ``q``-quantile of the training target. It can be negative
    when the model predicts worse than that constant. NaN is returned when the
    null loss is not positive.
    """
    fitted = smf.quantreg(formula, train).fit(q=q)
    model_loss = _pinball_loss(test['ret_fwd'], fitted.predict(test), q)
    null_loss = _pinball_loss(test['ret_fwd'], train['ret_fwd'].quantile(q), q)
    if null_loss <= 0:
        return np.nan
    return 1.0 - model_loss / null_loss


def walk_forward_cv(df, train_years=5, test_years=1, purge_days=60,
                    q=0.05, vix_threshold=20, min_obs=50):
    """
    Time-series cross-validation with a purge gap and out-of-sample scoring.

    For each fold:
    1. Fit both quantile regressions on the low-volatility rows of the
       training block (``train_years`` * 252 rows).
    2. Skip ``purge_days`` rows so training targets do not reach into the
       test block.
    3. Score both fitted models on the low-volatility rows of the test block
       (``test_years`` * 252 rows) with an out-of-sample pseudo-R2.
    4. Roll forward by one test block.

    Scoring on data the models were not fitted on matters here: the two models
    are nested, so an in-sample comparison cannot favour the smaller one and
    would make the win rate uninformative.

    Parameters
    ----------
    df : pd.DataFrame
        Needs columns ``volatility``, ``peak`` and ``ret_fwd`` and a
        datetime-like index.
    train_years, test_years : int
        Block lengths in years of 252 rows.
    purge_days : int
        Rows dropped between the training and the test block.
    q : float
        Quantile level of the regressions.
    vix_threshold : float
        Only rows with ``volatility`` below this value are used.
    min_obs : int
        Minimum number of low-volatility rows required in each block.

    Returns
    -------
    pd.DataFrame
        One row per completed fold. ``improvement`` is the absolute difference
        ``r2_peak - r2_vix``. The columns are present even when no fold
        completes.
    """
    columns = ['fold', 'train_end', 'test_start', 'test_end', 'n_train',
               'n_test', 'r2_vix', 'r2_peak', 'improvement']
    results = []

    train_days = train_years * 252
    test_days = test_years * 252
    fold_span = train_days + purge_days + test_days

    # Number of complete folds that fit; +1 because the first fold needs
    # exactly fold_span rows.
    n_folds = max(0, (len(df) - fold_span) // test_days + 1)
    print(f"Running {n_folds} walk-forward folds...")

    for fold in range(n_folds):
        train_start = fold * test_days
        train_end = train_start + train_days
        test_start = train_end + purge_days
        test_end = test_start + test_days

        train = df.iloc[train_start:train_end]
        test = df.iloc[test_start:test_end]

        # Same low-vol regime filter on both sides of the split.
        train_lowvol = train[train['volatility'] < vix_threshold]
        test_lowvol = test[test['volatility'] < vix_threshold]

        if len(train_lowvol) < min_obs or len(test_lowvol) < min_obs:
            continue

        try:
            # Model A: VIX only (proper baseline)
            r2_vix = _oos_quantile_r2('ret_fwd ~ volatility',
                                      train_lowvol, test_lowvol, q)
            # Model B: VIX + Peak (our signal)
            r2_peak = _oos_quantile_r2('ret_fwd ~ volatility + peak',
                                       train_lowvol, test_lowvol, q)
        except Exception as e:
            print(f"   Fold {fold} error: {e}")
            continue

        if not (np.isfinite(r2_vix) and np.isfinite(r2_peak)):
            continue

        results.append({
            'fold': fold,
            'train_end': train.index[-1].date(),
            'test_start': test.index[0].date(),
            'test_end': test.index[-1].date(),
            'n_train': len(train_lowvol),
            'n_test': len(test_lowvol),
            'r2_vix': r2_vix,
            'r2_peak': r2_peak,
            # Always an absolute difference, so fold values share one unit.
            'improvement': r2_peak - r2_vix
        })

    return pd.DataFrame(results, columns=columns)

# Run cross-validation
cv_results = walk_forward_cv(df, TRAIN_YEARS, TEST_YEARS, PURGE_DAYS)

print(f"\n{'='*60}")
print("WALK-FORWARD CROSS-VALIDATION RESULTS")
print(f"{'='*60}")
print(f"\nFolds completed: {len(cv_results)}")

if len(cv_results) == 0:
    # An empty result may carry no columns at all, so do not index into it.
    print("No fold completed - not enough data for the chosen train/purge/test sizes.")
else:
    # Per-fold flag: did the Peak model score higher than the VIX-only one?
    peak_wins = cv_results['r2_peak'] > cv_results['r2_vix']
    print(f"\nMean R¬≤ (VIX only):      {cv_results['r2_vix'].mean():.5f}")
    print(f"Mean R¬≤ (VIX + Peak):    {cv_results['r2_peak'].mean():.5f}")
    print(f"Mean Improvement:        {cv_results['improvement'].mean():.1%}")
    print(f"\nFolds where Peak > VIX:  {peak_wins.sum()}/{len(cv_results)}")
    print(f"Win Rate:                {peak_wins.mean():.1%}")

cv_results.to_csv(f'{WORK_DIR}/tables/WalkForward_CV.csv', index=False)

In [None]:
# @title 6. Permutation Test (Is Our Signal Better Than Random?)

def permutation_test(df, n_permutations=500):
    """
    Compare the real 'peak' signal with randomly shuffled copies of itself.

    Both quantile regressions (q=0.05) are fitted on the rows with
    ``volatility`` below 20. The 'peak' column is then shuffled
    ``n_permutations`` times and the larger model is refitted each time.

    The p-value is ``(1 + #{perm >= real}) / (1 + n_valid)``. Counting the
    observed statistic as one of the draws keeps the p-value away from an
    exact zero, which a finite number of permutations cannot support.

    NOTE(review): the shuffle treats rows as exchangeable. The target is an
    overlapping forward return and 'peak' is a rolling maximum, so both are
    serially dependent; a block or circular-shift permutation would probably
    be more conservative - confirm before relying on the p-value.

    Parameters
    ----------
    df : pd.DataFrame
        Needs columns ``volatility``, ``peak`` and ``ret_fwd``.
    n_permutations : int
        Number of shuffles; uses NumPy's global random state.

    Returns
    -------
    dict
        ``real_improvement``, ``perm_mean``, ``perm_std``, ``perm_95th``,
        ``p_value``, ``significant`` and ``n_valid`` (permutations whose fit
        succeeded). The three ``perm_*`` entries are NaN when no permutation
        could be fitted.
    """
    # Filter low-vol regime
    test_df = df[df['volatility'] < 20].copy()

    # Real improvement
    r2_vix_real = smf.quantreg('ret_fwd ~ volatility', test_df).fit(q=0.05).prsquared
    r2_peak_real = smf.quantreg('ret_fwd ~ volatility + peak', test_df).fit(q=0.05).prsquared
    real_improvement = r2_peak_real - r2_vix_real

    # Permutation distribution
    perm_improvements = []
    n_failed = 0

    # One working copy is enough: only the 'peak' column changes per draw.
    shuffled_df = test_df.copy()
    peak_values = test_df['peak'].values

    print(f"Running {n_permutations} permutations...")
    for i in range(n_permutations):
        # Shuffle peak signal (break temporal structure)
        shuffled_df['peak'] = np.random.permutation(peak_values)

        try:
            r2_perm = smf.quantreg('ret_fwd ~ volatility + peak', shuffled_df).fit(q=0.05).prsquared
        except Exception:
            # Best effort: a failed fit is skipped, but counted and reported.
            n_failed += 1
        else:
            perm_improvements.append(r2_perm - r2_vix_real)

        if (i+1) % 100 == 0:
            print(f"   {i+1}/{n_permutations}")

    if n_failed:
        print(f"   {n_failed} permutation fits failed and were skipped")

    perm_improvements = np.array(perm_improvements, dtype=float)
    n_valid = len(perm_improvements)

    # P-value with the +1 correction (the observed value counts as a draw).
    n_extreme = int((perm_improvements >= real_improvement).sum())
    p_value = (n_extreme + 1) / (n_valid + 1)

    if n_valid > 0:
        perm_mean = perm_improvements.mean()
        perm_std = perm_improvements.std()
        perm_95th = np.percentile(perm_improvements, 95)
    else:
        perm_mean = perm_std = perm_95th = float('nan')

    return {
        'real_improvement': real_improvement,
        'perm_mean': perm_mean,
        'perm_std': perm_std,
        'perm_95th': perm_95th,
        'p_value': p_value,
        'significant': p_value < 0.05,
        'n_valid': n_valid
    }

# Run the permutation test on the full dataset; `perm_results` is read again
# by the final summary cell.
perm_results = permutation_test(df)

print(f"\n{'='*60}")
print("PERMUTATION TEST RESULTS")
print(f"{'='*60}")
# real_improvement is the R2 difference between the Peak and VIX-only models.
print(f"\nReal Improvement (R¬≤):    {perm_results['real_improvement']:.5f}")
print(f"Random Mean:              {perm_results['perm_mean']:.5f}")
print(f"Random 95th Percentile:   {perm_results['perm_95th']:.5f}")
print(f"\nP-value:                  {perm_results['p_value']:.4f}")
print(f"Significant (p < 0.05):   {'‚úÖ YES' if perm_results['significant'] else '‚ùå NO'}")

In [None]:
# @title 7. Parameter Stability Test (Is 60-day window robust?)

def test_memory_windows(df, windows=(20, 40, 60, 90, 120), purge_days=60):
    """
    Test different memory windows with out-of-sample evaluation.

    The first 70% of the rows (minus ``purge_days`` rows at the end, so that
    forward-looking targets do not reach into the test part) are used to fit
    the quantile regressions; the last 30% are used only for scoring.

    For each window the rolling maximum of ``ar_z`` is computed on the whole
    frame before splitting. A rolling maximum only looks backwards, so this
    leaks nothing, and it keeps the scored rows the same for every window
    instead of discarding a window-dependent warm-up at the start of the test
    part.

    The score is the pseudo-R2 ``1 - loss(model) / loss(null)`` on the test
    rows, where the null prediction is the training 5% quantile of the target.
    Scoring out of sample matters because the two models are nested: in sample
    the larger model cannot lose.

    Parameters
    ----------
    df : pd.DataFrame
        Needs columns ``ar_z``, ``volatility`` and ``ret_fwd``.
    windows : iterable of int
        Memory windows (rows) to evaluate.
    purge_days : int
        Rows removed from the end of the training part.

    Returns
    -------
    pd.DataFrame
        Columns ``window``, ``r2_vix``, ``r2_peak``, ``improvement`` and
        ``beats_baseline``; the columns exist even when no window qualifies.
    """
    q = 0.05
    columns = ['window', 'r2_vix', 'r2_peak', 'improvement', 'beats_baseline']

    def pinball(y_true, y_pred):
        # Summed quantile (check) loss at level q.
        resid = np.asarray(y_true, dtype=float) - np.asarray(y_pred, dtype=float)
        return float(np.sum(np.where(resid >= 0, q * resid, (q - 1) * resid)))

    split = int(len(df) * 0.7)
    train_stop = max(0, split - purge_days)

    results = []

    for w in windows:
        # Recalculate peak with this window on the full history.
        data = df.copy()
        data['peak_w'] = data['ar_z'].rolling(w).max()

        train = data.iloc[:train_stop]
        test = data.iloc[split:]

        train_lowvol = train[train['volatility'] < 20].dropna()
        test_lowvol = test[test['volatility'] < 20].dropna()

        if len(test_lowvol) < 100 or len(train_lowvol) < 100:
            continue

        try:
            null_loss = pinball(test_lowvol['ret_fwd'],
                                train_lowvol['ret_fwd'].quantile(q))
            if null_loss <= 0:
                continue

            fit_vix = smf.quantreg('ret_fwd ~ volatility', train_lowvol).fit(q=q)
            fit_peak = smf.quantreg('ret_fwd ~ volatility + peak_w', train_lowvol).fit(q=q)

            r2_vix = 1.0 - pinball(test_lowvol['ret_fwd'], fit_vix.predict(test_lowvol)) / null_loss
            r2_peak = 1.0 - pinball(test_lowvol['ret_fwd'], fit_peak.predict(test_lowvol)) / null_loss
        except Exception as exc:
            # Best effort: report the window that failed instead of hiding it.
            print(f"   Window {w} error: {exc}")
            continue

        results.append({
            'window': w,
            'r2_vix': r2_vix,
            'r2_peak': r2_peak,
            'improvement': r2_peak - r2_vix,
            'beats_baseline': bool(r2_peak > r2_vix)
        })

    return pd.DataFrame(results, columns=columns)

# Evaluate every candidate memory window; `window_results` is read again by
# the plotting and summary cells.
window_results = test_memory_windows(df)

print(f"\n{'='*60}")
print("PARAMETER STABILITY TEST (Out-of-Sample)")
print(f"{'='*60}")
print(window_results.to_string(index=False))

# Check if results are robust across windows
if len(window_results) > 0:
    # Number of windows whose Peak model scored above the VIX-only model.
    wins = window_results['beats_baseline'].sum()
    print(f"\nWindows that beat baseline: {wins}/{len(window_results)}")
    # All windows win -> robust; at least half -> moderate; otherwise fragile.
    if wins == len(window_results):
        print("‚úÖ ROBUST: Signal works across all memory windows")
    elif wins >= len(window_results) / 2:
        print("‚ö†Ô∏è MODERATE: Signal works for some windows")
    else:
        print("‚ùå FRAGILE: Signal is parameter-dependent")

In [None]:
# @title 8. Minsky Hedge (Out-of-Sample Only)

def minsky_hedge_oos(df, train_pct=0.7, entry_peak=1.0, exit_peak=0.5, vix_entry=20):
    """
    Backtest the Minsky hedge on the held-out tail of ``df`` only.

    The rows after the first ``train_pct`` share form the test slice. Nothing
    is fitted on the training part here; the thresholds are taken as given.

    Rule (evaluated on each row's closing values):
    - enter the hedge when ``volatility < vix_entry`` and ``peak > entry_peak``;
    - leave it when ``peak < exit_peak``;
    - a NaN ``peak`` yields "no hedge" for that row without changing the state.

    The signal of row t is applied to the return of row t + 1. Applying it to
    row t itself would trade on information that only exists after that
    return has been realised (one-row look-ahead).

    Returns are simple returns (``pct_change``) so that compounding with
    ``(1 + r).cumprod()`` reproduces the price path.

    Parameters
    ----------
    df : pd.DataFrame
        Needs columns ``price``, ``tlt``, ``peak`` and ``volatility`` and a
        datetime-like index.
    train_pct : float
        Share of rows skipped at the start.
    entry_peak, exit_peak : float
        Thresholds on ``peak`` for entering / leaving the hedge.
    vix_entry : float
        Entry is only allowed while ``volatility`` is below this value.

    Returns
    -------
    dict
        Drawdowns, CAGRs and return/volatility ratios for buy-and-hold and
        for the strategy, ``hedge_time`` (share of rows actually held in the
        hedge) and ``test_df`` with the intermediate columns. ``in_hedge`` is
        the end-of-row signal, ``position`` the lagged state that earns the
        row's return.

    Raises
    ------
    ValueError
        If the test slice is empty.
    """
    split = int(len(df) * train_pct)
    test = df.iloc[split:].copy()
    if test.empty:
        raise ValueError("minsky_hedge_oos: the out-of-sample slice is empty")

    test['daily_ret'] = test['price'].pct_change()
    test['tlt_ret'] = test['tlt'].pct_change()

    # State machine over end-of-row observations.
    in_hedge = False
    states = []

    for peak, vol in zip(test['peak'].to_numpy(), test['volatility'].to_numpy()):
        if pd.isna(peak):
            states.append(False)
            continue

        if not in_hedge:
            in_hedge = bool(vol < vix_entry and peak > entry_peak)
        elif peak < exit_peak:
            in_hedge = False

        states.append(in_hedge)

    signal = np.array(states, dtype=bool)

    # Lag by one row: the state decided at the close of t is held during t+1.
    position = np.zeros(len(signal), dtype=bool)
    position[1:] = signal[:-1]

    test['in_hedge'] = signal
    test['position'] = position
    test['strat_ret'] = np.where(position, test['tlt_ret'], test['daily_ret'])

    test['cum_bnh'] = (1 + test['daily_ret'].fillna(0)).cumprod()
    test['cum_strat'] = (1 + test['strat_ret'].fillna(0)).cumprod()

    years = len(test) / 252

    dd_bnh = (test['cum_bnh'] / test['cum_bnh'].cummax() - 1).min()
    dd_strat = (test['cum_strat'] / test['cum_strat'].cummax() - 1).min()

    cagr_bnh = test['cum_bnh'].iloc[-1]**(1/years) - 1
    cagr_strat = test['cum_strat'].iloc[-1]**(1/years) - 1

    vol_bnh = test['daily_ret'].std() * np.sqrt(252)
    vol_strat = test['strat_ret'].std() * np.sqrt(252)

    # Ratio of CAGR to annualised volatility (no risk-free rate subtracted).
    sharpe_bnh = cagr_bnh / vol_bnh if vol_bnh > 0 else 0
    sharpe_strat = cagr_strat / vol_strat if vol_strat > 0 else 0

    return {
        'test_start': test.index[0].date(),
        'test_end': test.index[-1].date(),
        'dd_bnh': dd_bnh,
        'dd_strat': dd_strat,
        'dd_reduction': dd_bnh - dd_strat,
        'cagr_bnh': cagr_bnh,
        'cagr_strat': cagr_strat,
        'sharpe_bnh': sharpe_bnh,
        'sharpe_strat': sharpe_strat,
        'hedge_time': np.mean(position),
        'test_df': test
    }

# Backtest with the function's default thresholds; `hedge_results` is read
# again by the plotting and summary cells.
hedge_results = minsky_hedge_oos(df)

print(f"\n{'='*60}")
print("MINSKY HEDGE (OUT-OF-SAMPLE ONLY)")
print(f"{'='*60}")
print(f"\nTest Period: {hedge_results['test_start']} to {hedge_results['test_end']}")
# Fixed-width table: metric name, benchmark, strategy, difference.
print(f"\n{'Metric':<20} {'Benchmark':>12} {'Minsky':>12} {'Diff':>12}")
print("-" * 56)
print(f"{'Max Drawdown':<20} {hedge_results['dd_bnh']:>12.1%} {hedge_results['dd_strat']:>12.1%} {hedge_results['dd_reduction']:>12.1%}")
print(f"{'CAGR':<20} {hedge_results['cagr_bnh']:>12.1%} {hedge_results['cagr_strat']:>12.1%} {hedge_results['cagr_strat']-hedge_results['cagr_bnh']:>12.1%}")
print(f"{'Sharpe':<20} {hedge_results['sharpe_bnh']:>12.2f} {hedge_results['sharpe_strat']:>12.2f} {hedge_results['sharpe_strat']-hedge_results['sharpe_bnh']:>12.2f}")
print(f"\nTime in Hedge: {hedge_results['hedge_time']*100:.1f}%")

In [None]:
# @title 9. Visualization

# 2x2 figure summarising the results of the earlier cells. Reads the
# notebook globals cv_results, window_results and hedge_results.
fig, axes = plt.subplots(2, 2, figsize=(14, 10))

# 1. Walk-Forward Results
# One bar per fold, green when the improvement is positive, red otherwise.
ax = axes[0, 0]
ax.bar(cv_results['fold'], cv_results['improvement'] * 100, 
       color=['green' if x > 0 else 'red' for x in cv_results['improvement']])
ax.axhline(y=0, color='black', linestyle='-', linewidth=1)
ax.set_xlabel('Fold')
ax.set_ylabel('Improvement (%)')
ax.set_title('Walk-Forward CV: Out-of-Sample Improvement per Fold')

# 2. Parameter Stability
# Windows are cast to str so the bars are evenly spaced categories.
# NOTE(review): the plotted values are multiplied by 100, but the y-label
# does not mention a percentage scale.
ax = axes[0, 1]
ax.bar(window_results['window'].astype(str), window_results['improvement'] * 100,
       color=['green' if x else 'red' for x in window_results['beats_baseline']])
ax.axhline(y=0, color='black', linestyle='-', linewidth=1)
ax.set_xlabel('Memory Window (days)')
ax.set_ylabel('R¬≤ Improvement')
ax.set_title('Parameter Stability (Out-of-Sample)')

# 3. Equity Curves (OOS only)
# Log y-axis; each legend entry carries that curve's maximum drawdown.
ax = axes[1, 0]
test_df = hedge_results['test_df']
ax.plot(test_df.index, test_df['cum_bnh'], label=f"Buy&Hold (DD:{hedge_results['dd_bnh']:.1%})")
ax.plot(test_df.index, test_df['cum_strat'], label=f"Minsky (DD:{hedge_results['dd_strat']:.1%})")
ax.set_yscale('log')
ax.legend()
ax.set_title('Minsky Hedge: Out-of-Sample Equity Curves')

# 4. Drawdown comparison
# Drawdown = cumulative value relative to its running maximum, minus one.
ax = axes[1, 1]
dd_bnh = test_df['cum_bnh'] / test_df['cum_bnh'].cummax() - 1
dd_strat = test_df['cum_strat'] / test_df['cum_strat'].cummax() - 1
ax.fill_between(test_df.index, dd_bnh, 0, alpha=0.3, label='Buy&Hold DD')
ax.plot(test_df.index, dd_strat, color='blue', label='Minsky DD')
ax.legend()
ax.set_title('Drawdown Comparison (OOS)')

# Save before show so the file is written from the fully laid-out figure.
plt.tight_layout()
plt.savefig(f'{WORK_DIR}/figures/Rigorous_Validation.png', dpi=300)
plt.show()

In [None]:
# @title 10. Final Summary

# Final report: collects the results of the earlier cells, prints a verdict
# and stores the headline numbers as a one-row CSV.
print("\n" + "="*70)
print("üî¨ CARIA-SR RIGOROUS VALIDATION SUMMARY")
print("="*70)

# Walk-forward statistics. With no completed fold the rates are NaN rather
# than an error; NaN fails every comparison in the verdict below.
n_cv_folds = len(cv_results)
if n_cv_folds > 0:
    fold_wins = cv_results['r2_peak'] > cv_results['r2_vix']
    win_rate = fold_wins.mean()
    n_fold_wins = fold_wins.sum()
    mean_improvement = cv_results['improvement'].mean()
else:
    win_rate = float('nan')
    n_fold_wins = 0
    mean_improvement = float('nan')

print("\nüìä WALK-FORWARD CROSS-VALIDATION:")
print(f"   Win Rate: {win_rate:.1%} ({n_fold_wins}/{n_cv_folds} folds)")
print(f"   Mean OOS Improvement: {mean_improvement:.1%}")

print("\nüìä PERMUTATION TEST:")
print(f"   P-value: {perm_results['p_value']:.4f}")
print(f"   Significant: {'‚úÖ YES' if perm_results['significant'] else '‚ùå NO'}")

# Share of memory windows that beat the baseline; NaN when none was evaluated
# (avoids dividing by zero).
n_windows = len(window_results)
if n_windows > 0:
    n_window_wins = window_results['beats_baseline'].sum()
    stable = n_window_wins / n_windows
else:
    n_window_wins = 0
    stable = float('nan')

print("\nüìä PARAMETER STABILITY:")
print(f"   Windows that work: {n_window_wins}/{n_windows} ({stable:.0%})")

print("\nüìä MINSKY HEDGE (OOS):")
print(f"   DD Reduction: {hedge_results['dd_reduction']:.1%}")
print(f"   Sharpe Improvement: {hedge_results['sharpe_strat'] - hedge_results['sharpe_bnh']:.2f}")

# Overall verdict
# Robust needs all three checks; "some promise" needs a majority of folds or
# a significant permutation test; anything else is flagged.
print("\n" + "="*70)
print("VERDICT:")
if win_rate > 0.6 and perm_results['significant'] and stable > 0.5:
    print("‚úÖ SIGNAL IS ROBUST - Not overfitting")
elif win_rate > 0.5 or perm_results['significant']:
    print("‚ö†Ô∏è SIGNAL SHOWS SOME PROMISE - But needs more data")
else:
    print("‚ùå SIGNAL MAY BE OVERFITTING - Use with caution")
print("="*70)

# Save all results
summary = {
    'cv_win_rate': win_rate,
    'perm_pvalue': perm_results['p_value'],
    'perm_significant': perm_results['significant'],
    'param_stability': stable,
    'oos_dd_reduction': hedge_results['dd_reduction'],
    'oos_sharpe_diff': hedge_results['sharpe_strat'] - hedge_results['sharpe_bnh']
}
pd.DataFrame([summary]).to_csv(f'{WORK_DIR}/tables/Final_Summary.csv', index=False)
print(f"\n‚úÖ Results saved to {WORK_DIR}")