# FX Skew Divergence Signal Investigation — GBP Focus

**Key finding from v1:** When a short-dated tenor moves but a longer tenor *hasn't yet moved*,
GBPUSD spot moves in the direction implied by the short tenor. The edge disappears once the long tenor catches up.

**v2 finding:** 1M vs 3M divergence works (t=5.98 at best). 
**v3 question:** Can 1W options detect the move even *earlier* than 1M?

**Signal concept:** `divergence = (fast tenor rho moved) AND (slow tenor rho still quiet)`

**Tenor pairs tested:**
- **1W -> 1M** (1W moved, 1M hasn't yet — earliest possible entry)
- **1W -> 3M** (1W moved, 3M hasn't yet — maximum divergence)
- **1M -> 3M** (1M moved, 3M hasn't yet — the v2 baseline)

| Section | Content |
|---------|--------|
| 0 | Setup & Configuration |
| 1 | Load & Prepare GBP Data |
| 2 | Reproduce Key Finding — Cascade Timing |
| 3 | Build Divergence Signal Engine |
| 4 | Divergence Signal Evaluation — Grid Search |
| 5 | Ablation: Divergence vs Fast-Only vs Slow-Quiet |
| 6 | Optimal MA Window Deep-Dive |
| 7 | Event Study — Divergence Events |
| 8 | Signal Magnitude — Does Size Matter? |
| 9 | Alpha & Nu Confirmation |
| 10 | Backtest — Best Divergence Signal |
| 11 | Robustness & Reality Check |
| 12 | Summary & Next Steps |

---
## Section 0: Setup & Configuration

In [None]:
import warnings
from pathlib import Path

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
from scipy import stats

warnings.filterwarnings('ignore')
pd.set_option('future.no_silent_downcasting', True)
plt.style.use('seaborn-v0_8-whitegrid')
plt.rcParams.update({'figure.figsize': (14, 6), 'font.size': 11})

DATA_DIR = Path.home() / 'trade_data' / 'ETFTrader'
FX_SPOT_DIR = DATA_DIR / 'fx_spot'
FX_SABR_DIR = DATA_DIR / 'fx_sabr'

# Focus currency
CCY = 'GBP'
PAIR = 'GBPUSD'

# Tenors — now includes 1W for earlier detection
TENORS = ['1W', '1M', '3M', '6M']
TARGET_DTE = {'1W': 7, '1M': 30, '3M': 91, '6M': 182}

# Tenor pairs: (fast, slow) — fast moves first, slow should be quiet
TENOR_PAIRS = [
    ('1W', '1M'),   # 1W moved, 1M hasn't yet (earliest entry)
    ('1W', '3M'),   # 1W moved, 3M hasn't yet (max divergence)
    ('1M', '3M'),   # 1M moved, 3M hasn't yet (v2 baseline)
]

# MA windows to test for divergence detection
MA_WINDOWS = [3, 5, 7, 10, 14, 21]

# Forward return horizons
HORIZONS = {'1d': 1, '5d': 5, '10d': 10, '21d': 21}

# Rho boundary filter
RHO_BOUNDARY = 0.98

RANDOM_STATE = 42
COLORS_TENOR = {'1W': '#9b59b6', '1M': '#e74c3c', '3M': '#3498db', '6M': '#2ecc71'}
COLORS_PAIR = {'1W1M': '#9b59b6', '1W3M': '#e67e22', '1M3M': '#3498db'}

print(f'Currency: {CCY} ({PAIR})')
print(f'Tenors:   {TENORS}')
print(f'Tenor pairs: {[f"{f}->{s}" for f, s in TENOR_PAIRS]}')
print(f'MA windows to test: {MA_WINDOWS}')
print(f'Horizons: {list(HORIZONS.keys())}')

---
## Section 1: Load & Prepare GBP Data

In [None]:
# --- Load and clean SABR params ---
sabr_raw = pd.read_parquet(FX_SABR_DIR / 'sabr_params_historical.parquet')
sabr_raw['date'] = pd.to_datetime(sabr_raw['timestamp']).dt.normalize()

sabr = sabr_raw[(sabr_raw['currency'] == CCY) & (sabr_raw['tenor_bucket'].isin(TENORS))].copy()
print(f'GBP SABR rows (raw): {len(sabr)}')

# Remove boundary rho (calibration failures)
boundary_mask = sabr['rho'].abs() > RHO_BOUNDARY
print(f'Boundary rho removed: {boundary_mask.sum()} ({boundary_mask.mean():.1%})')
sabr = sabr[~boundary_mask].copy()

# Deduplicate: keep row closest to target DTE per (date, tenor)
sabr['target_dte'] = sabr['tenor_bucket'].map(TARGET_DTE)
sabr['dte_diff'] = (sabr['dte'] - sabr['target_dte']).abs()
sabr = sabr.sort_values('dte_diff').drop_duplicates(['date', 'tenor_bucket'], keep='first')
sabr = sabr.drop(columns=['target_dte', 'dte_diff'])

# Pivot wide
df = sabr.pivot_table(index='date', columns='tenor_bucket',
                      values=['rho', 'alpha', 'nu'], aggfunc='first')
df.columns = [f'{p}_{t}' for p, t in df.columns]
df = df.reset_index().sort_values('date')

print(f'\nPivoted: {len(df)} dates')
for t in TENORS:
    col = f'rho_{t}'
    if col in df.columns:
        print(f'  {t}: {df[col].notna().sum()} dates with rho')

has_1M_3M = df['rho_1M'].notna() & df['rho_3M'].notna()
print(f'\n  Both 1M+3M: {has_1M_3M.sum()} dates (primary analysis set)')

In [None]:
# --- Load GBPUSD spot and compute forward returns ---
spot_raw = pd.read_parquet(FX_SPOT_DIR / f'{PAIR}.parquet')
spot_raw['date'] = pd.to_datetime(spot_raw['date']).dt.normalize()
spot_raw = spot_raw.sort_values('date')

for label, days in HORIZONS.items():
    spot_raw[f'fwd_ret_{label}'] = spot_raw['close'].shift(-days) / spot_raw['close'] - 1

# Merge
merge_cols = ['date', 'close'] + [c for c in spot_raw.columns if c.startswith('fwd_')]
df = df.merge(spot_raw[merge_cols], on='date', how='left')
df = df.rename(columns={'close': 'spot'})

print(f'Merged: {len(df)} rows, spot coverage: {df["spot"].notna().sum()}')
print(f'Date range: {df["date"].min().date()} to {df["date"].max().date()}')

# --- Per-pair data masks ---
mask_pair = {}
for fast, slow in TENOR_PAIRS:
    m = df[f'rho_{fast}'].notna() & df[f'rho_{slow}'].notna()
    pair_label = f'{fast}{slow}'
    mask_pair[(fast, slow)] = m
    print(f'  {fast}->{slow} overlap: {m.sum()} dates')

# Legacy alias for downstream code
has_1M_3M = mask_pair.get(('1M', '3M'), df['rho_1M'].notna() & df['rho_3M'].notna())

# Quick look
sample_cols = ['date', 'spot', 'rho_1W', 'rho_1M', 'rho_3M', 'fwd_ret_5d']
avail = [c for c in sample_cols if c in df.columns]
print(f'\nLast 5 rows with 1W+1M+3M:')
mask_all3 = df['rho_1M'].notna() & df['rho_3M'].notna()
if 'rho_1W' in df.columns:
    mask_all3 = mask_all3 & df['rho_1W'].notna()
print(df.loc[mask_all3, avail].tail().to_string(index=False))

---
## Section 2: Reproduce the Key Finding — Cascade Timing

v1 of this notebook tested cascades (1M steepens/flattens → 3M follows). The striking result:
**the edge exists at the 1M trigger and vanishes by the time 3M confirms.**
This section reproduces that finding to motivate the divergence approach.

In [None]:
# 2a. Detect cascades across tenor pairs
# Compute 5d rho changes for all tenors
for tenor in TENORS:
    col = f'rho_{tenor}'
    if col in df.columns:
        df[f'{col}_chg5d'] = df[col].diff(5)

# Steepening/flattening thresholds (p25) for each tenor
thresholds = {}
for tenor in ['1W', '1M', '3M']:
    chg_col = f'rho_{tenor}_chg5d'
    if chg_col not in df.columns:
        continue
    chg = df[chg_col].dropna()
    if len(chg) < 20:
        continue
    thresholds[tenor] = {'p25': chg.quantile(0.25), 'p75': chg.quantile(0.75)}
    print(f'rho_{tenor} 5d change: p25={thresholds[tenor]["p25"]:.4f}, p75={thresholds[tenor]["p75"]:.4f}')

# Mark threshold-crossing events
for tenor in thresholds:
    chg_col = f'rho_{tenor}_chg5d'
    df[f'steep_{tenor}'] = (df[chg_col] < thresholds[tenor]['p25']).astype(float)
    df.loc[df[chg_col].isna(), f'steep_{tenor}'] = np.nan
    df[f'flat_{tenor}'] = (df[chg_col] > thresholds[tenor]['p75']).astype(float)
    df.loc[df[chg_col].isna(), f'flat_{tenor}'] = np.nan

# Cascade detection function (parameterised by trigger/confirm tenor)
def detect_cascades(df, direction, trigger_tenor='1M', confirm_tenor='3M', max_lag=10):
    """Detect fast->slow cascades. direction='steep' or 'flat'."""
    sig_t = f'{direction}_{trigger_tenor}'
    sig_c = f'{direction}_{confirm_tenor}'
    if sig_t not in df.columns or sig_c not in df.columns:
        return []
    cascades = []
    used = set()
    for i in range(len(df)):
        if df[sig_t].iloc[i] != 1.0 or df['date'].iloc[i] in used:
            continue
        for j in range(i + 1, min(i + max_lag + 1, len(df))):
            if df[sig_c].iloc[j] == 1.0:
                cascades.append({'idx_trigger': i, 'idx_confirm': j,
                                 'lag': j - i, 'date_trigger': df['date'].iloc[i]})
                used.add(df['date'].iloc[i])
                break
    return cascades

# Detect cascades for each tenor pair
all_cascades = {}
for fast, slow in TENOR_PAIRS:
    for direction, label in [('steep', 'bearish'), ('flat', 'bullish')]:
        key = (fast, slow, direction)
        c = detect_cascades(df, direction, trigger_tenor=fast, confirm_tenor=slow)
        all_cascades[key] = c
        if c:
            print(f'{label.title()} cascade {fast}->{slow}: {len(c)} events')

# Legacy references for the 1M->3M pair
cascades_steep = all_cascades.get(('1M', '3M', 'steep'), [])
cascades_flat = all_cascades.get(('1M', '3M', 'flat'), [])

In [None]:
# 2b. THE KEY RESULT: returns at trigger vs at confirmation — now for ALL pairs
print('=== CASCADE TIMING: Edge at Trigger vs at Confirmation ===\n')
print(f'{"Pair":<8s} {"Signal":<18s} {"Entry":<18s} {"Horizon":<8s} '
      f'{"Mean ret":>10s} {"t":>7s} {"p":>7s} {"Hit%":>6s} {"n":>4s}')
print('-' * 105)

for fast, slow in TENOR_PAIRS:
    for label, direction, sign in [('Bearish', 'steep', -1), ('Bullish', 'flat', +1)]:
        cascades = all_cascades.get((fast, slow, direction), [])
        if not cascades:
            continue
        pair_str = f'{fast}->{slow}'
        for entry, idx_key in [(f'At {fast} trigger', 'idx_trigger'),
                                (f'At {slow} confirm', 'idx_confirm')]:
            for h_label in ['5d', '10d']:
                ret_col = f'fwd_ret_{h_label}'
                rets = []
                for c in cascades:
                    idx = c[idx_key]
                    if idx < len(df) and pd.notna(df[ret_col].iloc[idx]):
                        rets.append(df[ret_col].iloc[idx] * sign)
                if len(rets) > 3:
                    rets = np.array(rets)
                    t, p = stats.ttest_1samp(rets, 0)
                    hit = np.mean(rets > 0)
                    print(f'{pair_str:<8s} {label:<18s} {entry:<18s} {h_label:<8s} '
                          f'{rets.mean()*10000:>+9.1f}bps {t:>7.2f} {p:>7.3f} '
                          f'{hit:>5.0%} {len(rets):>4d}')

print('\n>>> Compare: does 1W trigger capture the edge EARLIER than 1M trigger?')
print('>>> Key: look for strong t-stats at "trigger" that vanish at "confirm".')

In [None]:
# 2c. Visual: event study overlay — now includes 1W trigger where available
fig, axes = plt.subplots(1, 2, figsize=(16, 6))
event_window = 21

for ax, (label, direction, cascade_sign) in zip(axes,
        [('Bullish cascade (fast flattens)', 'flat', +1),
         ('Bearish cascade (fast steepens)', 'steep', -1)]):

    # Plot each tenor pair's trigger timing
    for fast, slow in TENOR_PAIRS:
        pair_label = f'{fast}{slow}'
        color = COLORS_PAIR.get(pair_label, 'black')
        cascades = all_cascades.get((fast, slow, direction), [])
        if not cascades:
            continue
        # At trigger
        paths = []
        for c in cascades:
            idx = c['idx_trigger']
            if idx + event_window < len(df):
                fwd = df['spot'].iloc[idx:idx + event_window + 1].values
                if not np.any(np.isnan(fwd)):
                    paths.append((fwd / fwd[0] - 1) * 100 * cascade_sign)
        if paths:
            paths = np.array(paths)
            avg = paths.mean(axis=0)
            sem = paths.std(axis=0) / np.sqrt(len(paths))
            days = range(event_window + 1)
            ax.plot(days, avg, color=color, linewidth=2, linestyle='-',
                    label=f'@ {fast} trigger (n={len(paths)})')
            ax.fill_between(days, avg - 1.96*sem, avg + 1.96*sem,
                            alpha=0.08, color=color)

    # Also plot 3M confirm for 1M->3M as dashed reference
    cascades_ref = all_cascades.get(('1M', '3M', direction), [])
    if cascades_ref:
        paths = []
        for c in cascades_ref:
            idx = c['idx_confirm']
            if idx + event_window < len(df):
                fwd = df['spot'].iloc[idx:idx + event_window + 1].values
                if not np.any(np.isnan(fwd)):
                    paths.append((fwd / fwd[0] - 1) * 100 * cascade_sign)
        if paths:
            paths = np.array(paths)
            avg = paths.mean(axis=0)
            days = range(event_window + 1)
            ax.plot(days, avg, color='gray', linewidth=1.5, linestyle='--',
                    label=f'@ 3M confirm (n={len(paths)})')

    ax.axhline(0, color='gray', linestyle='--', linewidth=0.8)
    ax.set_xlabel('Days after entry')
    ax.set_ylabel('Cumulative signed GBPUSD return (%)')
    ax.set_title(label, fontweight='bold')
    ax.legend(fontsize=8)

fig.suptitle('Cascade Event Study: 1W vs 1M Trigger vs 3M Confirmation\n'
             'Earlier trigger = more edge captured?',
             fontweight='bold')
fig.tight_layout()
plt.show()

---
## Section 3: Build the Divergence Signal Engine

**Divergence = (fast tenor rho moved significantly) AND (slow tenor rho still quiet)**

Test across three tenor pairs: 1W->1M, 1W->3M, 1M->3M.
Multiple detection methods and MA windows to find the optimal timing.

In [None]:
# 3a. Multi-window detection metrics for ALL tenors used in pairs
# For each window N, compute:
#   - deviation from MA: rho - rho.rolling(N).mean()
#   - simple change: rho - rho.shift(N)
#   - MA crossover: fast MA - slow MA (for N >= 5)

# Collect all tenors that appear in any pair
pair_tenors = sorted(set(t for pair in TENOR_PAIRS for t in pair))

detection_cols = {}  # (tenor, method, window) -> column name

for tenor in pair_tenors:
    rho_col = f'rho_{tenor}'
    if rho_col not in df.columns:
        continue

    for N in MA_WINDOWS:
        # Deviation from MA
        col_dev = f'rho_{tenor}_dev{N}d'
        if col_dev not in df.columns:
            df[col_dev] = df[rho_col] - df[rho_col].rolling(N, min_periods=max(2, N//2)).mean()
        detection_cols[(tenor, 'dev', N)] = col_dev

        # Simple change
        col_chg = f'rho_{tenor}_chg{N}d'
        if col_chg not in df.columns:
            df[col_chg] = df[rho_col] - df[rho_col].shift(N)
        detection_cols[(tenor, 'chg', N)] = col_chg

        # MA crossover (fast=min(3,N) vs slow=N, only if N >= 5)
        if N >= 5:
            fast_w = min(3, N // 2)
            col_cross = f'rho_{tenor}_cross{N}d'
            if col_cross not in df.columns:
                df[col_cross] = (df[rho_col].rolling(fast_w, min_periods=2).mean()
                                 - df[rho_col].rolling(N, min_periods=max(3, N//2)).mean())
            detection_cols[(tenor, 'cross', N)] = col_cross

print(f'Detection metrics computed: {len(detection_cols)} columns')
for tenor in pair_tenors:
    n = sum(1 for k in detection_cols if k[0] == tenor)
    print(f'  {tenor} metrics: {n}')

# Show distributions for key metrics per tenor
print(f'\nKey detection metric distributions:')
for tenor in pair_tenors:
    mask_t = df[f'rho_{tenor}'].notna()
    for method in ['dev', 'chg']:
        for N in [3, 5, 7]:
            key = (tenor, method, N)
            if key in detection_cols:
                col = detection_cols[key]
                vals = df.loc[mask_t, col].dropna()
                if len(vals) > 10:
                    print(f'  {col}: p10={vals.quantile(0.1):.4f}, p25={vals.quantile(0.25):.4f}, '
                          f'p75={vals.quantile(0.75):.4f}, p90={vals.quantile(0.9):.4f} (n={len(vals)})')

In [None]:
# 3b. "Slow tenor quiet" condition
# For each slow tenor in our pairs, compute quiet thresholds
# "Quiet" = |slow tenor change| is below median or p25 of absolute changes

# Identify all slow tenors from TENOR_PAIRS
slow_tenors = sorted(set(slow for _, slow in TENOR_PAIRS))

quiet_thresholds = {}  # (slow_tenor, method, window, level) -> threshold value

for slow_tenor in slow_tenors:
    print(f'{slow_tenor} "quiet" thresholds (absolute change below this = quiet):\n')
    for method in ['dev', 'chg']:
        for N in MA_WINDOWS:
            key = (slow_tenor, method, N)
            if key not in detection_cols:
                continue
            col = detection_cols[key]
            # Use dates where slow tenor has data
            vals = df.loc[df[f'rho_{slow_tenor}'].notna(), col].dropna().abs()
            if len(vals) < 20:
                continue
            for level, q in [('med', 0.5), ('q25', 0.25)]:
                thresh = vals.quantile(q)
                quiet_thresholds[(slow_tenor, method, N, level)] = thresh
            print(f'  |rho_{slow_tenor}_{method}{N}d|: median={vals.quantile(0.5):.4f}, '
                  f'p25={vals.quantile(0.25):.4f} (n={len(vals)})')
    print()

print(f'Total quiet thresholds: {len(quiet_thresholds)}')
for st in slow_tenors:
    n = sum(1 for k in quiet_thresholds if k[0] == st)
    print(f'  {st}: {n} thresholds')

In [None]:
# 3c. Compose divergence signals for ALL tenor pairs
# divergence = (fast tenor metric exceeds threshold) AND (slow tenor is quiet)

divergence_signals = {}  # name -> (mask, expected_sign)

for fast_tenor, slow_tenor in TENOR_PAIRS:
    pair_label = f'{fast_tenor}{slow_tenor}'
    pair_mask = mask_pair[(fast_tenor, slow_tenor)]
    n_pair_dates = pair_mask.sum()

    if n_pair_dates < 30:
        print(f'Skipping {pair_label}: only {n_pair_dates} overlap dates')
        continue

    for method_fast in ['dev', 'chg', 'cross']:
        for win_fast in MA_WINDOWS:
            key_fast = (fast_tenor, method_fast, win_fast)
            if key_fast not in detection_cols:
                continue
            col_fast = detection_cols[key_fast]
            vals_fast = df.loc[pair_mask, col_fast].dropna()
            if len(vals_fast) < 30:
                continue

            for q_label, q in [('p25', 0.25), ('p10', 0.10)]:
                thresh_lo = vals_fast.quantile(q)       # bearish: fast drops below this
                thresh_hi = vals_fast.quantile(1 - q)   # bullish: fast rises above this

                # --- Divergence signals (with slow-tenor quiet filter) ---
                for method_slow in ['dev', 'chg']:
                    for win_slow in MA_WINDOWS:
                        key_slow = (slow_tenor, method_slow, win_slow)
                        if key_slow not in detection_cols:
                            continue
                        col_slow = detection_cols[key_slow]

                        for quiet_level in ['med', 'q25']:
                            qt_key = (slow_tenor, method_slow, win_slow, quiet_level)
                            if qt_key not in quiet_thresholds:
                                continue
                            qt = quiet_thresholds[qt_key]
                            quiet_mask = df[col_slow].abs() < qt

                            # Bearish divergence: fast drops, slow quiet
                            name_bear = (f'bear_div_{pair_label}_{method_fast}{win_fast}d'
                                         f'_{q_label}_{method_slow}{win_slow}d_{quiet_level}')
                            mask_bear = (df[col_fast] < thresh_lo) & quiet_mask & pair_mask
                            if mask_bear.sum() >= 5:
                                divergence_signals[name_bear] = (mask_bear, -1)

                            # Bullish divergence: fast rises, slow quiet
                            name_bull = (f'bull_div_{pair_label}_{method_fast}{win_fast}d'
                                         f'_{q_label}_{method_slow}{win_slow}d_{quiet_level}')
                            mask_bull = (df[col_fast] > thresh_hi) & quiet_mask & pair_mask
                            if mask_bull.sum() >= 5:
                                divergence_signals[name_bull] = (mask_bull, +1)

                # --- Fast-tenor-only signals (for ablation) ---
                name_fast_bear = f'bear_{fast_tenor}only_{method_fast}{win_fast}d_{q_label}'
                mask_fast_bear = (df[col_fast] < thresh_lo) & pair_mask
                if mask_fast_bear.sum() >= 5 and name_fast_bear not in divergence_signals:
                    divergence_signals[name_fast_bear] = (mask_fast_bear, -1)

                name_fast_bull = f'bull_{fast_tenor}only_{method_fast}{win_fast}d_{q_label}'
                mask_fast_bull = (df[col_fast] > thresh_hi) & pair_mask
                if mask_fast_bull.sum() >= 5 and name_fast_bull not in divergence_signals:
                    divergence_signals[name_fast_bull] = (mask_fast_bull, +1)

# Summary
n_div = sum(1 for k in divergence_signals if '_div_' in k)
n_only = sum(1 for k in divergence_signals if 'only' in k.lower())
print(f'Total signals: {len(divergence_signals)}')
print(f'  Divergence: {n_div}')
print(f'  Fast-only (ablation): {n_only}')

# Breakdown by pair
for fast, slow in TENOR_PAIRS:
    pair_label = f'{fast}{slow}'
    n_pair = sum(1 for k in divergence_signals if f'_div_{pair_label}_' in k)
    print(f'  {fast}->{slow} divergence: {n_pair}')
n_1w = sum(1 for k in divergence_signals if '1Wonly' in k)
n_1m = sum(1 for k in divergence_signals if '1Monly' in k)
print(f'  1W-only: {n_1w}, 1M-only: {n_1m}')

---
## Section 4: Divergence Signal Evaluation — Grid Search

In [None]:
# 4a. Evaluate all signals
def evaluate_signals(signal_dict, df, horizons=('5d', '10d')):
    """Evaluate all signals: mean signed return, t-stat, p-value, hit rate."""
    results = []
    for name, (mask, expected_sign) in signal_dict.items():
        mask_clean = mask.fillna(False).astype(bool)
        events = df.index[mask_clean]
        for h in horizons:
            ret_col = f'fwd_ret_{h}'
            rets = df.loc[events, ret_col].dropna()
            if len(rets) < 5:
                continue
            signed = rets * expected_sign
            t, p = stats.ttest_1samp(signed, 0)
            # Extract tenor pair from signal name
            pair = 'unknown'
            for fast, slow in TENOR_PAIRS:
                pl = f'{fast}{slow}'
                if f'_div_{pl}_' in name:
                    pair = f'{fast}->{slow}'
                    break
            if 'only' in name.lower():
                pair = name.split('_')[1].replace('only', '-only')  # e.g. '1W-only'
            results.append({
                'signal': name, 'horizon': h,
                'direction': 'bear' if expected_sign == -1 else 'bull',
                'pair': pair,
                'n': len(rets),
                'mean_bps': signed.mean() * 10000,
                'hit_rate': (signed > 0).mean(),
                't_stat': t, 'p_value': p,
            })
    res = pd.DataFrame(results).sort_values('t_stat', ascending=False)
    n_tests = len(res)
    res['p_bonf'] = (res['p_value'] * n_tests).clip(upper=1.0)
    res['sig_bonf'] = res['p_bonf'] < 0.05
    return res

results_df = evaluate_signals(divergence_signals, df)

n_tests = len(results_df)
n_sig = (results_df['p_value'] < 0.05).sum()
n_bonf = results_df['sig_bonf'].sum()

print(f'=== GRID SEARCH: {n_tests} tests ===')
print(f'Significant at p<0.05 (raw):  {n_sig}')
print(f'Significant after Bonferroni: {n_bonf}')
print(f'Expected false positives:     {n_tests * 0.05:.1f}')

# Separate divergence vs fast-only results
div_results = results_df[results_df['signal'].str.contains('_div_')]
only_results = results_df[results_df['signal'].str.contains('only')]

# Breakdown by pair
print(f'\n--- By tenor pair ---')
for fast, slow in TENOR_PAIRS:
    pair_label = f'{fast}{slow}'
    pair_div = div_results[div_results['signal'].str.contains(f'_div_{pair_label}_')]
    n_sig_pair = (pair_div['p_value'] < 0.05).sum()
    print(f'  {fast}->{slow}: {len(pair_div)} tests, p<0.05: {n_sig_pair}')

print(f'\n--- Fast-only signals ---')
for tenor in sorted(set(f for f, _ in TENOR_PAIRS)):
    t_only = only_results[only_results['signal'].str.contains(f'{tenor}only')]
    n_sig_t = (t_only['p_value'] < 0.05).sum()
    print(f'  {tenor}-only: {len(t_only)} tests, p<0.05: {n_sig_t}')

print(f'\nTop 15 divergence signals by t-stat:\n')
cols = ['signal', 'pair', 'horizon', 'n', 'mean_bps', 'hit_rate', 't_stat', 'p_value', 'sig_bonf']
print(div_results[cols].head(15).to_string(index=False, float_format='%.3f'))

In [None]:
# 4b. Heatmap: best t-stat by tenor pair x detection method x window

def parse_signal_name(name):
    """Extract components from the generalised signal name."""
    parts = {}
    parts['is_div'] = '_div_' in name
    parts['direction'] = 'bear' if name.startswith('bear') else 'bull'
    if parts['is_div']:
        after_div = name.split('_div_')[1]
        # First token is pair label (e.g., '1W1M', '1M3M')
        tokens = after_div.split('_')
        parts['pair'] = tokens[0]
        # Find the fast tenor pair
        for ft, st in TENOR_PAIRS:
            if tokens[0] == f'{ft}{st}':
                parts['fast_tenor'] = ft
                parts['slow_tenor'] = st
                break
        # tokens[1] = method+window (e.g., 'chg3d')
        mw = tokens[1]
        for m in ['cross', 'dev', 'chg']:
            if mw.startswith(m):
                parts['method_fast'] = m
                parts['win_fast'] = int(mw[len(m):-1])
                break
        parts['threshold'] = tokens[2]  # 'p25' or 'p10'
        # tokens[3] = slow method+window (e.g., 'dev5d')
        sm = tokens[3]
        for m in ['cross', 'dev', 'chg']:
            if sm.startswith(m):
                parts['method_slow'] = m
                parts['win_slow'] = int(sm[len(m):-1])
                break
        parts['quiet_level'] = tokens[4] if len(tokens) > 4 else 'med'
    else:
        # Fast-only signal: e.g., 'bull_1Wonly_chg3d_p25'
        after = name.split('only_')[1] if 'only_' in name else ''
        tokens = after.split('_')
        mw = tokens[0]
        for m in ['cross', 'dev', 'chg']:
            if mw.startswith(m):
                parts['method_fast'] = m
                parts['win_fast'] = int(mw[len(m):-1])
                break
        parts['threshold'] = tokens[1] if len(tokens) > 1 else 'p25'
        # Extract tenor
        for tenor in ['1W', '1M', '3M']:
            if f'{tenor}only' in name:
                parts['fast_tenor'] = tenor
                break
    return parts

# Per-pair heatmap: method x window -> best t-stat
n_pairs = len(TENOR_PAIRS)
fig, axes = plt.subplots(1, n_pairs, figsize=(6 * n_pairs, 5))
if n_pairs == 1:
    axes = [axes]
methods = ['dev', 'chg', 'cross']
windows = MA_WINDOWS

for ax, (fast, slow) in zip(axes, TENOR_PAIRS):
    pair_label = f'{fast}{slow}'
    pair_div = div_results[(div_results['signal'].str.contains(f'_div_{pair_label}_')) &
                           (div_results['horizon'] == '5d')]

    grid = np.full((len(methods), len(windows)), np.nan)
    for _, row in pair_div.iterrows():
        try:
            p = parse_signal_name(row['signal'])
        except (ValueError, IndexError, KeyError):
            continue
        m_idx = methods.index(p['method_fast']) if p.get('method_fast') in methods else -1
        w_idx = windows.index(p['win_fast']) if p.get('win_fast') in windows else -1
        if m_idx >= 0 and w_idx >= 0:
            if np.isnan(grid[m_idx, w_idx]) or row['t_stat'] > grid[m_idx, w_idx]:
                grid[m_idx, w_idx] = row['t_stat']

    im = ax.imshow(grid, cmap='RdYlGn', aspect='auto', vmin=-2, vmax=4)
    ax.set_xticks(range(len(windows)))
    ax.set_xticklabels([f'{w}d' for w in windows])
    ax.set_yticks(range(len(methods)))
    ax.set_yticklabels(['Deviation', 'Change', 'MA Cross'])
    ax.set_xlabel(f'{fast} Detection Window')
    ax.set_title(f'{fast}->{slow} divergence\n(best t-stat, 5d horizon)',
                 fontweight='bold', color=COLORS_PAIR.get(pair_label, 'black'))
    for i in range(len(methods)):
        for j in range(len(windows)):
            if not np.isnan(grid[i, j]):
                ax.text(j, i, f'{grid[i,j]:.1f}', ha='center', va='center',
                        fontsize=9, fontweight='bold' if grid[i,j] > 1.96 else 'normal')

plt.colorbar(im, ax=axes, shrink=0.8, label='t-statistic')
fig.suptitle('Divergence Signal t-stats by Tenor Pair, Method, and Window', fontweight='bold')
fig.tight_layout()
plt.show()

In [None]:
# 4c. Top 20 overall — with tenor pair annotation
print('=== TOP 20 SIGNALS (ALL TYPES) ===\n')
cols = ['signal', 'pair', 'horizon', 'n', 'mean_bps', 'hit_rate', 't_stat', 'p_value', 'sig_bonf']
top20 = results_df[cols].head(20)
print(top20.to_string(index=False, float_format='%.3f'))

# Count by type in top 20
print(f'\n--- Top 20 breakdown ---')
for fast, slow in TENOR_PAIRS:
    pair_label = f'{fast}{slow}'
    n = sum(f'_div_{pair_label}_' in s for s in top20['signal'])
    if n > 0:
        print(f'  {fast}->{slow} divergence: {n}')
n_only_top = sum('only' in s for s in top20['signal'])
if n_only_top > 0:
    print(f'  Fast-only: {n_only_top}')

# KEY COMPARISON: best signal per pair
print(f'\n=== BEST SIGNAL PER TENOR PAIR (5d horizon) ===\n')
div_5d = div_results[div_results['horizon'] == '5d']
for fast, slow in TENOR_PAIRS:
    pair_label = f'{fast}{slow}'
    pair_best = div_5d[div_5d['signal'].str.contains(f'_div_{pair_label}_')]
    if len(pair_best) > 0:
        row = pair_best.iloc[0]
        print(f'  {fast}->{slow}: t={row["t_stat"]:.2f}, hit={row["hit_rate"]:.0%}, '
              f'n={row["n"]}, edge={row["mean_bps"]:+.1f}bps')
        print(f'     {row["signal"]}')

---
## Section 5: Ablation — Does the "Slow Quiet" Filter Help?

The critical test: compare each divergence signal to its fast-tenor-only counterpart.
If divergence > fast-only, the slow quiet filter genuinely adds value.
Test this separately for each tenor pair (1W->1M, 1W->3M, 1M->3M).

In [None]:
# 5a. Paired comparison: divergence vs fast-only for each tenor pair
ablation_pairs = []

only_5d = only_results[only_results['horizon'] == '5d'].copy()
div_5d = div_results[div_results['horizon'] == '5d'].copy()

for _, row_only in only_5d.iterrows():
    name_only = row_only['signal']
    # Extract the fast tenor and method/window from the only-signal name
    # e.g., 'bear_1Wonly_chg3d_p25' -> look for 'bear_div_1W??_chg3d_p25_*'
    for fast_tenor in ['1W', '1M']:
        if f'{fast_tenor}only' not in name_only:
            continue
        # The method+window+threshold part after 'only_'
        suffix = name_only.split('only_')[1]  # e.g., 'chg3d_p25'

        # Find matching divergence signals for this fast tenor across all slow tenors
        for _, slow_tenor in [(f, s) for f, s in TENOR_PAIRS if f == fast_tenor]:
            pair_label = f'{fast_tenor}{slow_tenor}'
            prefix = f'{row_only["direction"]}_div_{pair_label}_{suffix}'
            matching = div_5d[div_5d['signal'].str.startswith(prefix)]
            if len(matching) > 0:
                best_div = matching.sort_values('t_stat', ascending=False).iloc[0]
                ablation_pairs.append({
                    'fast_signal': name_only,
                    'div_signal': best_div['signal'],
                    'pair': f'{fast_tenor}->{slow_tenor}',
                    't_fast': row_only['t_stat'],
                    't_div': best_div['t_stat'],
                    'delta_t': best_div['t_stat'] - row_only['t_stat'],
                    'n_fast': row_only['n'],
                    'n_div': best_div['n'],
                    'hit_div': best_div['hit_rate'],
                })

abl_df = pd.DataFrame(ablation_pairs).sort_values('delta_t', ascending=False)

print('=== ABLATION: Divergence vs Fast-Only ===\n')
print(f'Cases where divergence improves on fast-only: '
      f'{(abl_df["delta_t"] > 0).sum()} / {len(abl_df)}')
print(f'Mean delta t-stat: {abl_df["delta_t"].mean():+.3f}')

# Per-pair summary
print(f'\n--- By tenor pair ---')
for fast, slow in TENOR_PAIRS:
    pair_str = f'{fast}->{slow}'
    pair_abl = abl_df[abl_df['pair'] == pair_str]
    if len(pair_abl) > 0:
        improved = (pair_abl['delta_t'] > 0).mean() * 100
        mean_dt = pair_abl['delta_t'].mean()
        print(f'  {pair_str}: improves {improved:.0f}% of cases, avg delta-t: {mean_dt:+.3f}')

print(f'\nTop improvements (slow quiet adds most value):\n')
cols = ['pair', 'fast_signal', 'div_signal', 't_fast', 't_div', 'delta_t', 'n_fast', 'n_div', 'hit_div']
print(abl_df[cols].head(15).to_string(index=False, float_format='%.3f'))

In [None]:
# 5b. Ablation bar chart — grouped by tenor pair
top_pairs_abl = abl_df.head(min(15, len(abl_df)))
if len(top_pairs_abl) > 0:
    fig, ax = plt.subplots(figsize=(14, max(5, len(top_pairs_abl) * 0.45)))

    y = range(len(top_pairs_abl))
    labels = []
    pair_colors = []
    for _, r in top_pairs_abl.iterrows():
        short = r['fast_signal'].replace('bull_', 'B:').replace('bear_', 'S:')
        labels.append(f"[{r['pair']}] {short}")
        pc = r['pair'].replace('->', '')
        pair_colors.append(COLORS_PAIR.get(pc, '#2ecc71'))

    ax.barh([i - 0.15 for i in y], top_pairs_abl['t_fast'].values, height=0.3,
            color='#95a5a6', label='Fast-only', alpha=0.8)
    for i, (_, r) in enumerate(top_pairs_abl.iterrows()):
        pc = r['pair'].replace('->', '')
        ax.barh(i + 0.15, r['t_div'], height=0.3,
                color=COLORS_PAIR.get(pc, '#2ecc71'), alpha=0.8)

    # Legend entries for pairs
    from matplotlib.patches import Patch
    handles = [Patch(color='#95a5a6', label='Fast-only')]
    for fast, slow in TENOR_PAIRS:
        pl = f'{fast}{slow}'
        handles.append(Patch(color=COLORS_PAIR.get(pl, '#2ecc71'),
                             label=f'{fast}->{slow} divergence'))

    ax.set_yticks(list(y))
    ax.set_yticklabels(labels, fontsize=8)
    ax.set_xlabel('t-statistic (5d horizon)')
    ax.axvline(1.96, color='red', linestyle='--', linewidth=0.8, label='p=0.05')
    ax.axvline(0, color='black', linewidth=0.5)
    ax.set_title('Ablation: Does the Slow Quiet Filter Improve the Signal?\n'
                 'Colored > Gray means quiet filter adds value', fontweight='bold')
    ax.legend(handles=handles, loc='lower right', fontsize=8)
    fig.tight_layout()
    plt.show()

    # Summary stats
    for fast, slow in TENOR_PAIRS:
        pair_str = f'{fast}->{slow}'
        pair_abl = abl_df[abl_df['pair'] == pair_str]
        if len(pair_abl) > 0:
            improved = (pair_abl['delta_t'] > 0).mean() * 100
            print(f'{pair_str}: slow quiet filter improves {improved:.0f}% of cases, '
                  f'avg delta-t: {pair_abl["delta_t"].mean():+.3f}')

---
## Section 6: Optimal MA Window Deep-Dive

Fix the best detection method and sweep the MA windows to find the sweet spot.

In [None]:
# 6a. Determine best method per pair and sweep fast-tenor window
method_counts = {}
for _, row in div_results.head(30).iterrows():
    try:
        p = parse_signal_name(row['signal'])
        m = p.get('method_fast', 'chg')
        method_counts[m] = method_counts.get(m, 0) + 1
    except (ValueError, IndexError, KeyError):
        pass

best_method = max(method_counts, key=method_counts.get) if method_counts else 'chg'
print(f'Method frequency in top-30 divergence signals: {method_counts}')
print(f'Best fast detection method: {best_method}')

# Compare t-stat vs fast window for each pair
fig, axes = plt.subplots(1, len(TENOR_PAIRS), figsize=(6 * len(TENOR_PAIRS), 5))
if len(TENOR_PAIRS) == 1:
    axes = [axes]
sweep_windows = MA_WINDOWS

for ax, (fast, slow) in zip(axes, TENOR_PAIRS):
    pair_label = f'{fast}{slow}'
    pair_color = COLORS_PAIR.get(pair_label, 'black')

    for h_label, ls in [('5d', '-'), ('10d', '--')]:
        t_stats = []
        for w in sweep_windows:
            prefix = f'bull_div_{pair_label}_{best_method}{w}d_p25'
            matching = results_df[(results_df['signal'].str.startswith(prefix)) &
                                  (results_df['horizon'] == h_label)]
            if len(matching) > 0:
                t_stats.append(matching['t_stat'].max())
            else:
                # Try bear too
                prefix_b = f'bear_div_{pair_label}_{best_method}{w}d_p25'
                matching_b = results_df[(results_df['signal'].str.startswith(prefix_b)) &
                                        (results_df['horizon'] == h_label)]
                t_stats.append(matching_b['t_stat'].max() if len(matching_b) > 0 else np.nan)

        ax.plot(sweep_windows, t_stats, 'o-', color=pair_color, linewidth=2,
                markersize=8, linestyle=ls, label=f'{h_label} horizon')

    ax.axhline(1.96, color='red', linestyle='--', linewidth=0.8, alpha=0.5, label='p=0.05')
    ax.axhline(0, color='gray', linewidth=0.5)
    ax.set_xlabel(f'{fast} Detection Window (days)')
    ax.set_ylabel('t-statistic')
    ax.set_title(f'{fast}->{slow}: t-stat vs {fast} window\n'
                 f'(method={best_method}, threshold=p25)', fontweight='bold')
    ax.legend(fontsize=9)
    ax.set_xticks(sweep_windows)

fig.tight_layout()
plt.show()

In [None]:
# 6b. 2D heatmap: fast window x slow quiet window for best pair
# Find which pair has the best overall signal
best_pair = None
best_pair_t = -999
for fast, slow in TENOR_PAIRS:
    pair_label = f'{fast}{slow}'
    pair_div_5d = div_results[(div_results['signal'].str.contains(f'_div_{pair_label}_')) &
                              (div_results['horizon'] == '5d')]
    if len(pair_div_5d) > 0 and pair_div_5d.iloc[0]['t_stat'] > best_pair_t:
        best_pair_t = pair_div_5d.iloc[0]['t_stat']
        best_pair = (fast, slow)

fig, axes = plt.subplots(1, 2, figsize=(14, 5))
if best_pair:
    fast_bp, slow_bp = best_pair
    pair_label_bp = f'{fast_bp}{slow_bp}'

    for ax, (direction, h_label) in zip(axes, [('bull', '5d'), ('bear', '5d')]):
        grid = np.full((len(sweep_windows), len(sweep_windows)), np.nan)
        for i, w_fast in enumerate(sweep_windows):
            for j, w_slow in enumerate(sweep_windows):
                prefix = f'{direction}_div_{pair_label_bp}_{best_method}{w_fast}d_p25'
                matching = results_df[
                    (results_df['signal'].str.startswith(prefix)) &
                    (results_df['signal'].str.contains(f'\\w+{w_slow}d', regex=True)) &
                    (results_df['horizon'] == h_label)
                ]
                if len(matching) > 0:
                    grid[i, j] = matching['t_stat'].max()

        im = ax.imshow(grid, cmap='RdYlGn', aspect='auto', vmin=-1, vmax=4)
        ax.set_xticks(range(len(sweep_windows)))
        ax.set_xticklabels([f'{w}d' for w in sweep_windows])
        ax.set_yticks(range(len(sweep_windows)))
        ax.set_yticklabels([f'{w}d' for w in sweep_windows])
        ax.set_xlabel(f'{slow_bp} Quiet Window')
        ax.set_ylabel(f'{fast_bp} Detection Window')
        ax.set_title(f'{direction.upper()} {fast_bp}->{slow_bp}, {h_label}\n'
                     f't-stat (method={best_method})', fontweight='bold')
        for i in range(len(sweep_windows)):
            for j in range(len(sweep_windows)):
                if not np.isnan(grid[i, j]):
                    ax.text(j, i, f'{grid[i,j]:.1f}', ha='center', va='center',
                            fontsize=8, fontweight='bold' if grid[i,j] > 1.96 else 'normal',
                            color='white' if abs(grid[i,j]) > 2.5 else 'black')

    plt.colorbar(im, ax=axes, shrink=0.8, label='t-statistic')
    fig.suptitle(f'2D Grid Search: {fast_bp} Window x {slow_bp} Quiet Window ({pair_label_bp})',
                 fontweight='bold')
else:
    axes[0].text(0.5, 0.5, 'No divergence signals found', transform=axes[0].transAxes, ha='center')

fig.tight_layout()
plt.show()

# Print best overall
best_overall = div_results.iloc[0] if len(div_results) > 0 else None
if best_overall is not None:
    print(f'\nBest overall divergence signal: {best_overall["signal"]}')
    print(f'  Pair: {best_overall["pair"]}, Horizon: {best_overall["horizon"]}, '
          f't={best_overall["t_stat"]:.2f}, p={best_overall["p_value"]:.4f}, '
          f'hit={best_overall["hit_rate"]:.0%}, n={best_overall["n"]}')

---
## Section 7: Event Study — Divergence Events

In [None]:
# 7a. Cumulative return paths — best divergence per pair vs fast-only
best_bull_div = div_results[div_results['direction'] == 'bull'].iloc[0]['signal'] if len(div_results[div_results['direction'] == 'bull']) > 0 else None
best_bear_div = div_results[div_results['direction'] == 'bear'].iloc[0]['signal'] if len(div_results[div_results['direction'] == 'bear']) > 0 else None

fig, axes = plt.subplots(1, 2, figsize=(16, 6))
event_window = 21

for ax, (sig_name, label, color) in zip(axes,
        [(best_bull_div, 'Bullish divergence', 'green'),
         (best_bear_div, 'Bearish divergence', 'red')]):
    if sig_name is None or sig_name not in divergence_signals:
        ax.text(0.5, 0.5, 'No signal', transform=ax.transAxes, ha='center')
        continue

    # Find matching fast-only signal
    try:
        p = parse_signal_name(sig_name)
        ft = p.get('fast_tenor', '1M')
        method = p.get('method_fast', 'chg')
        win = p.get('win_fast', 5)
        thresh = p.get('threshold', 'p25')
        name_only = f'{p["direction"]}_{ft}only_{method}{win}d_{thresh}'
    except (ValueError, IndexError, KeyError):
        name_only = None

    for sig, lbl, c, ls in [(sig_name, 'Divergence', color, '-'),
                             (name_only, 'Fast-only', 'gray', '--')]:
        if sig is None or sig not in divergence_signals:
            continue
        m, s = divergence_signals[sig]
        m = m.fillna(False).astype(bool)
        events = df.index[m]

        paths = []
        for idx in events:
            loc = df.index.get_loc(idx)
            if loc + event_window < len(df):
                fwd = df['spot'].iloc[loc:loc + event_window + 1].values
                if not np.any(np.isnan(fwd)):
                    paths.append((fwd / fwd[0] - 1) * 100 * s)

        if paths:
            paths = np.array(paths)
            avg = paths.mean(axis=0)
            sem = paths.std(axis=0) / np.sqrt(len(paths))
            days = range(event_window + 1)
            ax.plot(days, avg, color=c, linewidth=2, linestyle=ls,
                    label=f'{lbl} (n={len(paths)})')
            ax.fill_between(days, avg - 1.96*sem, avg + 1.96*sem, alpha=0.1, color=c)

    ax.axhline(0, color='gray', linestyle='--', linewidth=0.8)
    ax.set_xlabel('Days after signal')
    ax.set_ylabel('Cumulative signed return (%)')
    ax.set_title(f'{label}\n(solid=divergence, dashed=fast-only)', fontweight='bold')
    ax.legend(fontsize=9)

fig.suptitle('Event Study: Divergence vs Fast-Only', fontweight='bold')
fig.tight_layout()
plt.show()

In [None]:
# 7b. Decay profile for top 3 divergence signals
fine_horizons = [1, 2, 3, 5, 7, 10, 14, 21]
for h in fine_horizons:
    col = f'fwd_ret_{h}d_fine'
    if col not in df.columns:
        df[col] = df['spot'].shift(-h) / df['spot'] - 1

seen = set()
top_div_signals = []
for _, row in div_results.iterrows():
    if row['signal'] not in seen and len(top_div_signals) < 3:
        top_div_signals.append(row['signal'])
        seen.add(row['signal'])

fig, ax = plt.subplots(figsize=(12, 6))
colors_decay = plt.cm.Set1(np.linspace(0, 0.5, max(1, len(top_div_signals))))

for sig_name, color in zip(top_div_signals, colors_decay):
    if sig_name not in divergence_signals:
        continue
    mask, sign = divergence_signals[sig_name]
    mask = mask.fillna(False).astype(bool)
    events = df.index[mask]

    edges = []
    for h in fine_horizons:
        rets = df.loc[events, f'fwd_ret_{h}d_fine'].dropna() * sign
        edges.append(rets.mean() * 10000 if len(rets) > 3 else np.nan)

    # Extract pair for label
    try:
        p = parse_signal_name(sig_name)
        pair_str = p.get('pair', '')
        short_name = f"[{pair_str}] {sig_name.split('_div_')[1][:30]}" if '_div_' in sig_name else sig_name[:35]
    except (ValueError, IndexError, KeyError):
        short_name = sig_name[:35]
    ax.plot(fine_horizons, edges, 'o-', color=color, linewidth=1.5,
            markersize=6, label=short_name)

ax.axhline(0, color='gray', linestyle='--', linewidth=0.8)
ax.set_xlabel('Holding period (days)')
ax.set_ylabel('Mean signed return (bps)')
ax.set_title('Signal Decay: Edge vs Holding Period (top divergence signals)', fontweight='bold')
ax.legend(fontsize=8)
ax.set_xticks(fine_horizons)
fig.tight_layout()
plt.show()

print('Optimal holding period (peak edge):')
for sig_name in top_div_signals:
    if sig_name not in divergence_signals:
        continue
    mask, sign = divergence_signals[sig_name]
    mask = mask.fillna(False).astype(bool)
    events = df.index[mask]
    best_h, best_edge = 0, 0
    for h in fine_horizons:
        rets = df.loc[events, f'fwd_ret_{h}d_fine'].dropna() * sign
        if len(rets) > 3 and rets.mean() > best_edge:
            best_edge = rets.mean()
            best_h = h
    try:
        p = parse_signal_name(sig_name)
        pair_str = p.get('pair', '')
    except (ValueError, IndexError, KeyError):
        pair_str = ''
    short = sig_name.replace('bull_div_', 'B:').replace('bear_div_', 'S:')[:40]
    print(f'  [{pair_str}] {short}: peak at {best_h}d ({best_edge*10000:.1f}bps)')

---
## Section 8: Signal Magnitude — Does Size Matter?

In [None]:
# 8a. Tercile analysis: split by magnitude of fast-tenor move
best_sig = top_div_signals[0] if top_div_signals else None

if best_sig and best_sig in divergence_signals:
    mask, sign = divergence_signals[best_sig]
    mask = mask.fillna(False).astype(bool)

    try:
        p = parse_signal_name(best_sig)
        ft = p.get('fast_tenor', '1M')
        method = p.get('method_fast', 'chg')
        win = p.get('win_fast', 5)
        col_fast = detection_cols.get((ft, method, win), f'rho_{ft}_chg5d')
    except (ValueError, IndexError, KeyError):
        col_fast = 'rho_1M_chg5d'

    event_df = df.loc[mask, [col_fast, 'fwd_ret_5d', 'fwd_ret_10d']].dropna()

    if len(event_df) >= 9:
        event_df['magnitude'] = event_df[col_fast].abs()
        event_df['tercile'] = pd.qcut(event_df['magnitude'], 3,
                                       labels=['Small', 'Medium', 'Large'])

        fig, axes = plt.subplots(1, 2, figsize=(12, 5))
        for ax, h in zip(axes, ['5d', '10d']):
            ret_col = f'fwd_ret_{h}'
            means = event_df.groupby('tercile', observed=False)[ret_col].mean() * sign * 10000
            counts = event_df.groupby('tercile', observed=False)[ret_col].count()
            colors_t = ['#3498db', '#f39c12', '#e74c3c']
            bars = ax.bar(range(3), means.values, color=colors_t)
            ax.set_xticks(range(3))
            ax.set_xticklabels(means.index)
            ax.set_ylabel('Mean signed return (bps)')
            ax.set_title(f'{h} forward return by {p.get("fast_tenor","fast")} move magnitude',
                         fontweight='bold')
            ax.axhline(0, color='black', linewidth=0.5)
            for bar, n in zip(bars, counts.values):
                ax.text(bar.get_x() + bar.get_width()/2, bar.get_height(),
                        f'n={n}', ha='center', va='bottom', fontsize=9)

        fig.suptitle(f'Does Fast Tenor Move Size Matter? ({best_sig[:50]})', fontweight='bold')
        fig.tight_layout()
        plt.show()
    else:
        print(f'Insufficient events for tercile analysis (n={len(event_df)})')

In [None]:
# 8b. Continuous: fast-tenor magnitude vs forward return
if best_sig and best_sig in divergence_signals:
    mask, sign = divergence_signals[best_sig]
    mask = mask.fillna(False).astype(bool)

    try:
        p = parse_signal_name(best_sig)
        ft = p.get('fast_tenor', '1M')
        method = p.get('method_fast', 'chg')
        win = p.get('win_fast', 5)
        col_fast = detection_cols.get((ft, method, win), f'rho_{ft}_chg5d')
    except (ValueError, IndexError, KeyError):
        col_fast = 'rho_1M_chg5d'

    fig, axes = plt.subplots(1, 2, figsize=(14, 5))
    for ax, h in zip(axes, ['5d', '10d']):
        ret_col = f'fwd_ret_{h}'
        valid = df.loc[mask, [col_fast, ret_col]].dropna()
        if len(valid) < 10:
            ax.text(0.5, 0.5, f'n={len(valid)}', transform=ax.transAxes, ha='center')
            continue
        x = valid[col_fast].abs()
        y = valid[ret_col] * sign * 100
        ax.scatter(x, y, alpha=0.4, s=20, color='steelblue')
        slope, intercept, r, p_val, se = stats.linregress(x, y)
        x_line = np.linspace(x.min(), x.max(), 100)
        ax.plot(x_line, slope * x_line + intercept, 'k-', linewidth=1.5)
        ax.set_xlabel(f'|{ft} rho {method}{win}d|')
        ax.set_ylabel(f'{h} signed return (%)')
        ax.set_title(f'r={r:.3f}, p={p_val:.3f}, n={len(valid)}', fontsize=10)
        ax.axhline(0, color='gray', linestyle='--', linewidth=0.5)

    fig.suptitle(f'Fast Tenor Move Magnitude vs Forward Return\n'
                 f'Positive slope = bigger divergence -> bigger spot move', fontweight='bold')
    fig.tight_layout()
    plt.show()

---
## Section 9: Alpha & Nu Confirmation

In [None]:
# 9a. Does adding alpha/nu improve the divergence signal?
for param in ['alpha', 'nu']:
    for tenor in pair_tenors:
        col = f'{param}_{tenor}'
        if col in df.columns:
            for N in [5, 10]:
                chg_col = f'{col}_chg{N}d'
                if chg_col not in df.columns:
                    df[chg_col] = df[col].diff(N)

if best_sig and best_sig in divergence_signals:
    mask_base, sign = divergence_signals[best_sig]
    mask_base = mask_base.fillna(False).astype(bool)

    # Determine which fast tenor the best signal uses
    try:
        p = parse_signal_name(best_sig)
        ft = p.get('fast_tenor', '1M')
    except (ValueError, IndexError, KeyError):
        ft = '1M'

    print(f'=== ALPHA/NU CONFIRMATION (base: {best_sig[:60]}) ===\n')
    print(f'{"Condition":<45s} {"5d ret":>10s} {"t":>7s} {"p":>7s} {"hit%":>6s} {"n":>5s}')
    print('-' * 85)

    # Use the pair mask for this signal's fast tenor
    pair_m = df[f'rho_{ft}'].notna()
    configs = [('Base divergence only', mask_base)]

    for N in [5, 10]:
        acol = f'alpha_{ft}_chg{N}d'
        if acol in df.columns:
            vals = df.loc[pair_m, acol].dropna()
            if len(vals) > 10:
                thresh = vals.quantile(0.75)
                configs.append((f'+ alpha_{ft} rises ({N}d, p75)', mask_base & (df[acol] > thresh)))

    for N in [5, 10]:
        ncol = f'nu_{ft}_chg{N}d'
        if ncol in df.columns:
            vals = df.loc[pair_m, ncol].dropna()
            if len(vals) > 10:
                thresh = vals.quantile(0.75)
                configs.append((f'+ nu_{ft} rises ({N}d, p75)', mask_base & (df[ncol] > thresh)))

    acol = f'alpha_{ft}_chg5d'
    if acol in df.columns:
        vals = df.loc[pair_m, acol].dropna()
        if len(vals) > 10:
            thresh = vals.quantile(0.25)
            configs.append((f'+ alpha_{ft} falls (5d, p25)', mask_base & (df[acol] < thresh)))

    for label, mask in configs:
        rets = df.loc[mask, 'fwd_ret_5d'].dropna() * sign
        if len(rets) >= 5:
            t, p_val = stats.ttest_1samp(rets, 0)
            hit = (rets > 0).mean()
            print(f'{label:<45s} {rets.mean()*10000:>+9.1f}bps {t:>7.2f} {p_val:>7.3f} {hit:>5.0%} {len(rets):>5d}')
        else:
            print(f'{label:<45s}    too few events (n={len(rets)})')

---
## Section 10: Backtest — Best Divergence Signal

In [None]:
# 10a. Determine optimal hold period and run backtest
HOLD_DAYS = 5
if best_sig and best_sig in divergence_signals:
    mask, sign = divergence_signals[best_sig]
    mask = mask.fillna(False).astype(bool)
    events = df.index[mask]
    best_h, best_edge = 5, 0
    for h in fine_horizons:
        rets = df.loc[events, f'fwd_ret_{h}d_fine'].dropna() * sign
        if len(rets) > 3 and rets.mean() > best_edge:
            best_edge = rets.mean()
            best_h = h
    HOLD_DAYS = best_h
    print(f'Optimal hold period: {HOLD_DAYS} days')

COST_PER_TRADE_PCT = 0.00005

bt = df[['date', 'spot']].dropna().copy().reset_index(drop=True)
bt['daily_ret'] = bt['spot'].pct_change()
bt['position'] = 0.0

bull_mask = pd.Series(False, index=df.index)
bear_mask = pd.Series(False, index=df.index)
if best_bull_div and best_bull_div in divergence_signals:
    bull_mask = divergence_signals[best_bull_div][0].fillna(False).astype(bool)
if best_bear_div and best_bear_div in divergence_signals:
    bear_mask = divergence_signals[best_bear_div][0].fillna(False).astype(bool)

hold_until_idx = -1
for i in range(len(bt)):
    if i <= hold_until_idx:
        bt.iloc[i, bt.columns.get_loc('position')] = bt.iloc[i-1, bt.columns.get_loc('position')]
        continue
    date_i = bt['date'].iloc[i]
    if date_i in df['date'].values:
        df_idx = df.index[df['date'] == date_i]
        if len(df_idx) > 0:
            idx = df_idx[0]
            if idx in bull_mask.index and bull_mask.get(idx, False):
                bt.iloc[i, bt.columns.get_loc('position')] = 1.0
                hold_until_idx = min(i + HOLD_DAYS, len(bt) - 1)
            elif idx in bear_mask.index and bear_mask.get(idx, False):
                bt.iloc[i, bt.columns.get_loc('position')] = -1.0
                hold_until_idx = min(i + HOLD_DAYS, len(bt) - 1)

bt['signal_ret'] = bt['position'].shift(1) * bt['daily_ret']
bt['pos_change'] = bt['position'].diff().abs()
bt['cost'] = bt['pos_change'] * COST_PER_TRADE_PCT
bt['net_ret'] = bt['signal_ret'] - bt['cost']
bt['cum_gross'] = (1 + bt['signal_ret'].fillna(0)).cumprod()
bt['cum_net'] = (1 + bt['net_ret'].fillna(0)).cumprod()
bt['cum_bh'] = (1 + bt['daily_ret'].fillna(0)).cumprod()

n_trades = (bt['pos_change'] > 0).sum()
active_days = (bt['position'] != 0).sum()
sr_gross = bt['signal_ret'].mean() / bt['signal_ret'].std() * np.sqrt(252) if bt['signal_ret'].std() > 0 else 0
sr_net = bt['net_ret'].mean() / bt['net_ret'].std() * np.sqrt(252) if bt['net_ret'].std() > 0 else 0
max_dd = (bt['cum_net'] / bt['cum_net'].cummax() - 1).min()

# Identify pair info for the signals used
bull_pair = 'n/a'
bear_pair = 'n/a'
try:
    if best_bull_div:
        bp = parse_signal_name(best_bull_div)
        bull_pair = bp.get('pair', 'n/a')
    if best_bear_div:
        bp = parse_signal_name(best_bear_div)
        bear_pair = bp.get('pair', 'n/a')
except (ValueError, IndexError, KeyError):
    pass

print(f'\n=== BACKTEST: Best Divergence Signal, hold {HOLD_DAYS}d ===')
print(f'  Bull signal [{bull_pair}]: {best_bull_div}')
print(f'  Bear signal [{bear_pair}]: {best_bear_div}')
print(f'  Trades:       {n_trades}')
print(f'  Active days:  {active_days} / {len(bt)} ({active_days/len(bt):.0%})')
print(f'  Gross Sharpe: {sr_gross:.2f}')
print(f'  Net Sharpe:   {sr_net:.2f}')
print(f'  Total gross:  {(bt["cum_gross"].iloc[-1]-1)*100:+.2f}%')
print(f'  Total net:    {(bt["cum_net"].iloc[-1]-1)*100:+.2f}%')
print(f'  Max drawdown: {max_dd*100:.2f}%')

In [None]:
# 10b. Equity curve
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(16, 8), sharex=True,
                                gridspec_kw={'height_ratios': [3, 1]})

ax1.plot(bt['date'], (bt['cum_gross']-1)*100, color='blue', linewidth=1.2, label='Gross')
ax1.plot(bt['date'], (bt['cum_net']-1)*100, color='darkblue', linewidth=1.5, label='Net (0.5pip)')
ax1.plot(bt['date'], (bt['cum_bh']-1)*100, color='gray', linewidth=0.8, alpha=0.5, label='Buy & hold')
ax1.axhline(0, color='black', linewidth=0.5)
ax1.set_ylabel('Cumulative return (%)')
ax1.set_title(f'GBPUSD Backtest: Divergence Signal, hold={HOLD_DAYS}d\n'
              f'Gross Sharpe={sr_gross:.2f}, Net Sharpe={sr_net:.2f}, Trades={n_trades}',
              fontweight='bold')
ax1.legend()

ax2.fill_between(bt['date'], bt['position'], 0, alpha=0.4,
                 where=bt['position'] > 0, color='green', label='Long')
ax2.fill_between(bt['date'], bt['position'], 0, alpha=0.4,
                 where=bt['position'] < 0, color='red', label='Short')
ax2.set_ylabel('Position')
ax2.set_xlabel('Date')
ax2.legend(loc='upper right', fontsize=9)
ax2.xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m'))
fig.tight_layout()
plt.show()

---
## Section 11: Robustness & Reality Check

In [None]:
# 11a. Sub-period consistency
print('=== SUB-PERIOD CONSISTENCY ===\n')
mid_date = df['date'].median()
first_half = df[df['date'] <= mid_date]
second_half = df[df['date'] > mid_date]
print(f'Period 1: {first_half["date"].min().date()} to {first_half["date"].max().date()} ({len(first_half)} days)')
print(f'Period 2: {second_half["date"].min().date()} to {second_half["date"].max().date()} ({len(second_half)} days)')

top_for_robustness = []
seen_r = set()
for _, row in div_results[div_results['horizon'] == '5d'].iterrows():
    if row['signal'] not in seen_r and len(top_for_robustness) < 5:
        top_for_robustness.append(row['signal'])
        seen_r.add(row['signal'])

print(f'\n{"Signal":<50s} {"P1":>10s} {"P2":>10s} {"Consistent?":>12s}')
print('-' * 85)

for sig_name in top_for_robustness:
    if sig_name not in divergence_signals:
        continue
    mask, sign = divergence_signals[sig_name]
    mask = mask.fillna(False).astype(bool)
    means = []
    for half_df in [first_half, second_half]:
        events = half_df.index[mask.reindex(half_df.index, fill_value=False)]
        rets = half_df.loc[events, 'fwd_ret_5d'].dropna() * sign
        means.append(rets.mean() * 10000 if len(rets) > 2 else np.nan)
    consistent = 'YES' if (not np.isnan(means[0]) and not np.isnan(means[1])
                           and means[0] * means[1] > 0) else 'NO'
    p1 = f'{means[0]:+.1f}bps' if not np.isnan(means[0]) else 'n/a'
    p2 = f'{means[1]:+.1f}bps' if not np.isnan(means[1]) else 'n/a'
    short = sig_name[:48]
    print(f'{short:<50s} {p1:>10s} {p2:>10s} {consistent:>12s}')

In [None]:
# 11b. Random signal comparison
rng = np.random.RandomState(RANDOM_STATE)
n_random = 1000

# Use all dates that have spot data for the random baseline
mask_any = df['spot'].notna()
ret_5d_vals = df.loc[mask_any, 'fwd_ret_5d'].dropna().values

if best_sig and best_sig in divergence_signals:
    m, s = divergence_signals[best_sig]
    n_target = m.fillna(False).astype(bool).sum()
else:
    n_target = 30

random_tstats = []
for _ in range(n_random):
    idx = rng.choice(len(ret_5d_vals), size=min(n_target, len(ret_5d_vals)), replace=False)
    sample = ret_5d_vals[idx]
    if np.std(sample) > 0:
        t = np.mean(sample) / (np.std(sample) / np.sqrt(len(sample)))
        random_tstats.append(abs(t))

our_t = abs(div_results.iloc[0]['t_stat']) if len(div_results) > 0 else 0
pctile = (np.array(random_tstats) < our_t).mean() * 100

fig, ax = plt.subplots(figsize=(10, 5))
ax.hist(random_tstats, bins=50, alpha=0.6, color='gray', edgecolor='black', linewidth=0.5)
ax.axvline(our_t, color='red', linewidth=2,
           label=f'Our best (|t|={our_t:.2f}, {pctile:.0f}th pctile)')
ax.set_xlabel('|t-statistic| of random entry rules')
ax.set_ylabel('Count')
ax.set_title(f'Random Signal Comparison ({n_random} random, n={n_target})', fontweight='bold')
ax.legend()
fig.tight_layout()
plt.show()

---
## Section 12: Summary & Next Steps

In [None]:
print('=' * 70)
print('INVESTIGATION SUMMARY: GBP SKEW DIVERGENCE SIGNAL')
print('=' * 70)

print(f'\n1. KEY FINDING (Section 2)')
print(f'   When a fast tenor moves but a slower tenor hasn\'t yet, GBPUSD spot')
print(f'   moves in the direction implied by the fast tenor.')

print(f'\n2. DATA')
print(f'   Currency: {CCY} ({PAIR})')
print(f'   Date range: {df["date"].min().date()} to {df["date"].max().date()}')
for fast, slow in TENOR_PAIRS:
    n_overlap = mask_pair[(fast, slow)].sum()
    print(f'   {fast}->{slow} overlap: {n_overlap} dates')

if len(div_results) > 0:
    n_tests_div = len(div_results)
    n_sig_div = (div_results['p_value'] < 0.05).sum()
    n_bonf_div = div_results['sig_bonf'].sum()

    print(f'\n3. DIVERGENCE SIGNAL GRID SEARCH ({n_tests_div} tests)')
    print(f'   Significant at p<0.05: {n_sig_div}')
    print(f'   After Bonferroni:      {n_bonf_div}')
    print(f'   Expected false pos:    {n_tests_div * 0.05:.1f}')

    # Per-pair best signals
    print(f'\n4. BEST SIGNAL PER TENOR PAIR')
    div_5d_summary = div_results[div_results['horizon'] == '5d']
    for fast, slow in TENOR_PAIRS:
        pair_label = f'{fast}{slow}'
        pair_best = div_5d_summary[div_5d_summary['signal'].str.contains(f'_div_{pair_label}_')]
        if len(pair_best) > 0:
            row = pair_best.iloc[0]
            print(f'   {fast}->{slow}: t={row["t_stat"]:.2f}, hit={row["hit_rate"]:.0%}, '
                  f'n={row["n"]}, edge={row["mean_bps"]:+.1f}bps')

    print(f'\n5. OVERALL BEST DIVERGENCE SIGNALS')
    for i, (_, row) in enumerate(div_results.head(5).iterrows()):
        flag = ' ***' if row.get('sig_bonf', False) else ''
        print(f'   {i+1}. [{row.get("pair","?")}] {row["signal"]}')
        print(f'      {row["horizon"]}: edge={row["mean_bps"]:+.1f}bps, '
              f'hit={row["hit_rate"]:.0%}, t={row["t_stat"]:.2f}, '
              f'p={row["p_value"]:.4f}, n={row["n"]}{flag}')

print(f'\n6. ABLATION: SLOW QUIET FILTER VALUE')
if len(abl_df) > 0:
    for fast, slow in TENOR_PAIRS:
        pair_str = f'{fast}->{slow}'
        pair_abl = abl_df[abl_df['pair'] == pair_str]
        if len(pair_abl) > 0:
            improved_pct = (pair_abl['delta_t'] > 0).mean() * 100
            avg_delta = pair_abl['delta_t'].mean()
            print(f'   {pair_str}: improves {improved_pct:.0f}% of cases, avg delta-t: {avg_delta:+.3f}')

print(f'\n7. OPTIMAL TIMING')
print(f'   Hold period: {HOLD_DAYS} days')
print(f'   Best detection method: {best_method}')

print(f'\n8. BACKTEST')
print(f'   Gross Sharpe: {sr_gross:.2f}')
print(f'   Net Sharpe:   {sr_net:.2f}')
print(f'   Max drawdown: {max_dd*100:.1f}%')
print(f'   Trades:       {n_trades}')

print(f'\n9. KEY QUESTION: CAN 1W GET US IN EARLIER?')
# Compare best 1W vs 1M based signals
best_1w = div_5d_summary[div_5d_summary['signal'].str.contains('_div_1W')]
best_1m = div_5d_summary[div_5d_summary['signal'].str.contains('_div_1M3M')]
if len(best_1w) > 0 and len(best_1m) > 0:
    t_1w = best_1w.iloc[0]['t_stat']
    t_1m = best_1m.iloc[0]['t_stat']
    if t_1w > t_1m:
        print(f'   YES: Best 1W signal (t={t_1w:.2f}) beats best 1M->3M (t={t_1m:.2f})')
    else:
        print(f'   NO: Best 1M->3M (t={t_1m:.2f}) still beats best 1W signal (t={t_1w:.2f})')
    print(f'   Best 1W: {best_1w.iloc[0]["signal"]}')
    print(f'   Best 1M: {best_1m.iloc[0]["signal"]}')
elif len(best_1w) > 0:
    print(f'   1W signals found (best t={best_1w.iloc[0]["t_stat"]:.2f}) but no 1M->3M for comparison')
else:
    print(f'   No 1W signals passed minimum event threshold')

print(f'\n10. HONEST ASSESSMENT')
if len(div_results) > 0:
    if n_bonf_div > 0:
        print(f'   STRONG: {n_bonf_div} signals survive Bonferroni.')
    elif n_sig_div > n_tests_div * 0.05 * 2:
        print(f'   PROMISING: {n_sig_div} significant (expected {n_tests_div*0.05:.1f}).')
    else:
        print(f'   CAUTIOUS: Signal count near chance level. Small samples remain a concern.')

print(f'\n11. NEXT STEPS')
print(f'   - Apply to EUR (more 3M/6M data)')
print(f'   - Pool GBP+EUR for more statistical power')
print(f'   - Explore 2W tenor (different expiry phase from 1W)')
print(f'   - Fine-tune around the best MA window')
print(f'   - Paper trade with live data')