In [1]:
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from itertools import combinations
import statsmodels.api as sm
from statsmodels.tsa.stattools import coint, adfuller
from pykalman import KalmanFilter
from math import sqrt
import matplotlib.pyplot as plt
import warnings
warnings.filterwarnings('ignore')
from IPython.display import display
from joblib import Parallel, delayed

In [11]:
# ─── PARAMETERS ─────────────────────────────────────────────────────────────
START_DATE = "2018-01-01"
END_DATE   = "2025-01-01"
OUT_CSV    = "vn_top500_closes_2018_2025.csv"
EXCHANGE   = "HOSE,HNX,UPCOM"
SOURCES     = ['VCI', 'SSI']

TRAIN_FRAC = 0.7       # 70% train / 30% test
MIN_COMMON = 100       # min days overlap
P_THRESH   = 0.05      # cointegration p-value threshold
ENTRY_Z    = 1.25      # entry z-score
EXIT_Z     = 0.5       # exit z-score
MIN_HL     = 5
MAX_HL     = 252
COMMISSION = 0.002    # 0.2% round-trip split per side
SLIPPAGE   = 0.0015 
MIN_COVERAGE_PCT = 0.90   # require ≥90% of the “best” non-NaN days
WF_TRAIN_WINDOW = 12    # Training window in months for walk-forward
WF_TEST_WINDOW = 3      # Test window in months for walk-forward
TRADING_DAYS_PER_MONTH = 21


In [28]:
class ImprovedWalkForwardBacktester:
    def __init__(self, px_data, config):
        self.px = px_data.copy()
        self.config = config.copy()
        # defaults
        self.config.setdefault('corr_threshold', 0.3)
        self.config.setdefault('n_jobs', -1)
        self.config.setdefault('max_pairs', 5)
        self.wf_results = []
        self._kalman_cache = {}

    def find_cointegrated_pairs(self, data, min_obs=MIN_COMMON):
        log_px = np.log(data.replace(0, np.nan))
        corr = log_px.corr().abs()
        syms = corr.columns
        # pre‐filter by correlation
        cand = [
            (s1, s2)
            for i, s1 in enumerate(syms)
            for s2 in syms[i+1:]
            if corr.at[s1, s2] > self.config['corr_threshold']
        ]
        def test_pair(s1, s2):
            x = log_px[s1].dropna()
            y = log_px[s2].dropna()
            idx = x.index.intersection(y.index)
            if len(idx) < min_obs or x.loc[idx].std()==0 or y.loc[idx].std()==0:
                return None
            _, p, _ = coint(x.loc[idx], y.loc[idx])
            return (s1, s2, p) if p < self.config['p_thresh'] else None

        results = Parallel(n_jobs=self.config['n_jobs'])(
            delayed(test_pair)(s1, s2) for s1, s2 in cand
        )
        pairs = [r for r in results if r is not None]
        return sorted(pairs, key=lambda x: x[2])

    def kalman_filter_smooth(self, series):
        key = (series.name, series.index[0], series.index[-1])
        if key not in self._kalman_cache:
            raw_std = series.pct_change().std()
            obs_std = raw_std if (np.isfinite(raw_std) and raw_std > 0.1) else 0.1
            kf = KalmanFilter(
                transition_matrices=[1],
                observation_matrices=[1],
                initial_state_mean=series.iloc[0],
                initial_state_covariance=1.0,
                observation_covariance=obs_std,
                transition_covariance=obs_std * 0.1
            )
            m, _ = kf.filter(series.values)
            self._kalman_cache[key] = pd.Series(m.flatten(), index=series.index)
        return self._kalman_cache[key]

    def kalman_regression(self, x, y, delta=1e-3):
        raw_std = y.pct_change().std()
        obs_std = raw_std if (np.isfinite(raw_std) and raw_std > 0.1) else 0.1
        trans_cov = delta/(1 - delta) * np.eye(2)
        obs_mat   = np.expand_dims(np.vstack([x, np.ones(len(x))]).T, axis=1)
        kf = KalmanFilter(
            n_dim_obs=1, n_dim_state=2,
            initial_state_mean=[0, 0],
            initial_state_covariance=np.eye(2),
            transition_matrices=np.eye(2),
            observation_matrices=obs_mat,
            observation_covariance=obs_std,
            transition_covariance=trans_cov
        )
        means, _ = kf.filter(y.values)
        return means[:,0]

    def test_pair_strategy(self, s1, s2, params, data):
        df = data[[s1, s2]].dropna()
        x, y = df[s1], df[s2]
        mx, my = self.kalman_filter_smooth(x), self.kalman_filter_smooth(y)
        hr = -self.kalman_regression(mx, my)
        spread = y + hr * x

        # half-life
        lag = spread.shift(1)
        regdf = pd.concat([spread - lag, lag], axis=1).dropna()
        if regdf.iloc[:,1].std() == 0:
            return None
        beta = sm.OLS(regdf.iloc[:,0], sm.add_constant(regdf.iloc[:,1])).fit().params[1]
        hl = max(MIN_HL, min(int(round(-np.log(2)/beta)), MAX_HL))

        # z-score
        m = spread.rolling(hl).mean()
        s = spread.rolling(hl).std().replace(0,1e-8)
        z = (spread - m) / s

        e_z, x_z = params['entry_z'], params['exit_z']
        long_sig  = (z < -e_z).astype(int)
        short_sig = (z >  e_z).astype(int)
        pos       = (long_sig - short_sig).shift(1).ffill().fillna(0)

        notional = abs(hr)*x + y
        ret      = spread.diff() / notional.shift(1)
        strat    = ret * pos
        cost     = pos.diff().abs() * (self.config['commission'] + self.config['slippage'])
        net_ret  = strat - cost
        eq       = (1 + net_ret.fillna(0)).cumprod()

        ann_ret = net_ret.mean()*252
        ann_vol = net_ret.std()*sqrt(252)
        sharpe  = ann_ret/ann_vol if ann_vol>0 else np.nan
        win_rt  = (net_ret>0).mean()
        pf      = net_ret[net_ret>0].sum() / -net_ret[net_ret<0].sum() if (net_ret<0).any() else np.nan
        max_dd  = ((eq.cummax()-eq)/eq.cummax()).max()
        total   = eq.iloc[-1] - 1
        cagr    = eq.iloc[-1]**(365.25/len(eq)) - 1
        ntrades = int(pos.diff().abs().sum())

        return {
            'pair':          f"{s1}/{s2}",
            'equity_curve':  eq,
            'entry_z':       e_z,
            'exit_z':        x_z,
            'window_start':  data.index[0],
            'window_end':    data.index[-1],
            'total_return':  total,
            'annual_return': ann_ret,
            'volatility':    ann_vol,
            'sharpe':        sharpe,
            'win_rate':      win_rt,
            'profit_factor': pf,
            'max_drawdown':  max_dd,
            'cagr':          cagr,
            'num_trades':    ntrades,
            'half_life':     hl
        }

    def optimize_thresholds(self, s1, s2, train_data):
        best_sh, best_e, best_x = -np.inf, None, None
        for e in self.config['entry_z_grid']:
            for x in self.config['exit_z_grid']:
                perf = self.test_pair_strategy(s1, s2, {'entry_z':e,'exit_z':x}, train_data)
                if perf and perf['sharpe'] > best_sh:
                    best_sh, best_e, best_x = perf['sharpe'], e, x
        return best_e, best_x

    def walk_forward_analysis(self):
        train_off = pd.DateOffset(months=self.config['wf_train_window'])
        test_off  = pd.DateOffset(months=self.config['wf_test_window'])
        dates = pd.Series(self.px.index).sort_values()
        start, end = dates.min(), dates.max()
        ts, te = start, start + train_off
        te2 = te + test_off

        while te2 <= end:
            train_data = self.px[ts:te]
            test_data  = self.px[te:te2]

            if len(train_data) >= MIN_COMMON:
                pairs = self.find_cointegrated_pairs(train_data)
                top5  = pairs[:self.config['max_pairs']]
                optimized = Parallel(n_jobs=self.config['n_jobs'])(
                    delayed(self.optimize_thresholds)(s1,s2,train_data)
                    for s1, s2, _ in top5
                )
                for (s1,s2,_),(e_z,x_z) in zip(top5, optimized):
                    if e_z is None: continue
                    perf = self.test_pair_strategy(s1, s2, {'entry_z':e_z,'exit_z':x_z}, test_data)
                    if perf: self.wf_results.append(perf)

            ts += test_off; te += test_off; te2 += test_off
            self._kalman_cache.clear()

        return pd.DataFrame(self.wf_results)

In [31]:
# ─── USAGE ───────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    # 1) load & clean
    px = pd.read_csv("/Users/lephuongnghi/Desktop/quant/personal/pairs_trading/notebooks/data_vn/vn_top500_closes_2018_2025_90pct.csv", index_col=0, parse_dates=True)
    px = px.where(px>0).dropna(axis='columns').astype(float)

    config = {
        'p_thresh':        0.05,
        'entry_z_grid':   [1.0, 1.25, 1.5],
        'exit_z_grid':    [0.25, 0.5, 0.75],
        'commission':      0.002,
        'slippage':        0.0015,
        'wf_train_window': 12,
        'wf_test_window':  3,
        'max_pairs':       5,
        'corr_threshold': 0.3,
        'n_jobs':         -1
    }

    wfb = ImprovedWalkForwardBacktester(px, config)
    results_df = wfb.walk_forward_analysis()

    # ─── AGGREGATE ACROSS WINDOWS ────────────────────────────────────────────

    # build a per-pair aggregation
    agg = (
        results_df
        .groupby('pair')
        .agg({
            'sharpe':        'mean',
            'total_return':  'mean',
            'annual_return': 'mean',
            'volatility':    'mean',
            'win_rate':      'mean',
            'profit_factor': 'mean',
            'max_drawdown':  'mean',
            'cagr':          'mean',
            'num_trades':    'sum',
            'half_life':     'mean'
        })
    )

    # pick the top 10 by average Sharpe
    top10 = agg.sort_values('sharpe', ascending=False).head(10)

    print("\nTop 10 pairs by *average* walk-forward Sharpe (aggregated):")
    print(top10)

    # optional: show all windows for these top-10 pairs
    breakdown = (
        results_df[results_df['pair'].isin(top10.index)]
        .sort_values(['pair','window_end','sharpe'], ascending=[True,True,False])
    )
    print("\nBreakdown of all windows for those top 10 pairs:")
    print(breakdown)


Top 10 pairs by *average* walk-forward Sharpe (aggregated):
           sharpe  total_return  annual_return  volatility  win_rate  \
pair                                                                   
VNR/BWS  5.934985      0.613273       2.039449    0.343632  0.225806   
VLB/SZL  4.492795      0.208808       0.827697    0.184228  0.266667   
BWS/TBC  3.435212      0.268135       1.025515    0.298530  0.193548   
VLB/GHC  3.369607      0.108846       0.450326    0.133644  0.150000   
PAC/GVT  3.238195      0.126928       0.474111    0.146412  0.166667   
VLB/SBA  3.114980      0.132782       0.496173    0.159286  0.196970   
SHP/LAS  2.971638      0.143752       0.564125    0.189836  0.206349   
VLB/DTD  2.894960      0.155851       0.645480    0.228037  0.216667   
GVT/EIC  2.875242      0.174121       0.674217    0.230243  0.244789   
VLB/S99  2.782218      0.118935       0.495726    0.178176  0.150000   

         profit_factor  max_drawdown       cagr  num_trades  half_life  
p

In [55]:
pair_window_counts = results_df['pair'].value_counts().rename('window_count')
    
# Filter pairs with >=3 windows and calculate their stats
stable_pairs = (
    results_df[results_df['pair'].isin(pair_window_counts[pair_window_counts >= 2].index)]
    .groupby('pair')
    .agg({
        'window_start': 'count',  # Same as window_count but ensures alignment
        'sharpe': ['mean', 'std'],
        'total_return': 'mean',
        'half_life': 'mean',
        'volatility': 'mean',
        'max_drawdown': 'mean'
        })
    )
stable_pairs.columns = ['window_count', 'sharpe_mean', 'sharpe_std', 
                        'total_return', 'half_life', 'volatility', 'max_drawdown']
    
# Sort by Sharpe consistency (high mean, low std)
stable_pairs['sharpe_consistency'] = stable_pairs['sharpe_mean'] / stable_pairs['sharpe_std']
stable_pairs = stable_pairs.sort_values('sharpe_consistency', ascending=False)

print(f"\nPairs surviving ≥2 test windows ({len(stable_pairs)}/{len(pair_window_counts)}):")
print(stable_pairs)

# ─── TOP PERFORMERS ─────────────────────────────────────────────────────
# Optional: Breakdown of all windows for stable pairs
stable_breakdown = results_df[results_df['pair'].isin(stable_pairs.index)]
print("\nDetailed performance across windows for stable pairs:")
print(stable_breakdown.sort_values(['pair', 'window_start']))


Pairs surviving ≥2 test windows (10/105):
         window_count  sharpe_mean  sharpe_std  total_return  half_life  \
pair                                                                      
BWS/BTP             2     1.749337    0.054600      0.049589        5.0   
VLB/DTD             2     2.894960    0.537179      0.155851        5.0   
GVT/EIC             2     2.875242    1.005653      0.174121        5.0   
DVN/IVS             2     1.242660    0.457381      0.039765        5.0   
VLB/APF             2     2.104824    1.125893      0.117631        5.0   
BWS/DP3             2     0.662862    1.281509      0.024317        5.0   
BWS/LAS             2     0.379272    3.911578      0.043959        5.0   
SHP/LHC             2    -0.350053    1.650212     -0.007365        5.0   
SHP/DDV             2    -0.517894    0.443010     -0.032373        5.0   
BWS/DTD             2    -1.624305    0.989764     -0.052003        5.0   

         volatility  max_drawdown  sharpe_consistency  
