# Pairs Selection using Cointegration Tests & Kalman Filter

## Imports & Settings

In [1]:
import warnings
warnings.filterwarnings('ignore')

In [2]:
import os
import sys
import csv
import requests
import io
from collections import Counter

from datetime import date
from dataclasses import dataclass, asdict
from time import time
from collections import defaultdict
from pathlib import Path

import numpy as np
import pandas as pd
import pandas_datareader.data as web

from scipy.stats import pearsonr, spearmanr

from pykalman import KalmanFilter

import statsmodels.api as sm
import statsmodels.tsa.api as tsa
from statsmodels.tsa.stattools import acf, q_stat, adfuller, coint
from statsmodels.tsa.vector_ar.vecm import coint_johansen
from statsmodels.tsa.api import VAR

import backtrader as bt
from backtrader.feeds import PandasData

import pyfolio as pf

import matplotlib.pyplot as plt
import matplotlib as mpl
import seaborn as sns

In [3]:
idx = pd.IndexSlice

In [4]:
DATA_PATH = Path('..', 'data') 

In [5]:
def format_time(t):
    m_, s = divmod(t, 60)
    h, m = divmod(m_, 60)
    return f'{h:>02.0f}:{m:>02.0f}:{s:>02.0f}'

### Johansen Test Critical Values

In [6]:
critical_values = {0: {.9: 13.4294, .95: 15.4943, .99: 19.9349},
                   1: {.9: 2.7055, .95: 3.8415, .99: 6.6349}}

In [7]:
trace0_cv = critical_values[0][.95] # critical value for 0 cointegration relationships
trace1_cv = critical_values[1][.95] # critical value for 1 cointegration relationship

## Load Data

In [8]:
DATA_PATH = Path('..', 'data') 
STORE = DATA_PATH / 'stooq' / 'daily.h5'

### Load Stock Prices

In [9]:
stocks = pd.read_hdf('data.h5', 'stocks/close').loc['2015':]
stocks.info()

<class 'pandas.core.frame.DataFrame'>
DatetimeIndex: 1258 entries, 2015-01-02 to 2019-12-31
Columns: 172 entries, AA.US to YUM.US
dtypes: float64(172)
memory usage: 1.7 MB


### Load ETF Data

In [10]:
etfs = pd.read_hdf('data.h5', 'etfs/close').loc['2015':]
etfs.info()

<class 'pandas.core.frame.DataFrame'>
DatetimeIndex: 1258 entries, 2015-01-02 to 2019-12-31
Columns: 138 entries, AAXJ.US to YCS.US
dtypes: float64(138)
memory usage: 1.3 MB


### Load Ticker Dictionary

In [11]:
# def get_ticker_dict():
#     with pd.HDFStore(STORE) as store:
#         return (store['us/nysemkt/stocks/symbols']
#                 .append(store['us/nyse/stocks/symbols'])
#                 .append(store['us/nyse/etfs/symbols'])
#                 .append(store['us/nasdaq/etfs/symbols'])
#                 .append(store['us/nasdaq/stocks/symbols'])
#                 .append(store['us/nysemkt/stocks/symbols'])
#                 .drop_duplicates()
#                 .set_index('symbol')
#                 .squeeze()
#                 .to_dict())
    
# names = get_ticker_dict()
# with pd.HDFStore('data.h5') as store:
#     store.put('symbols', pd.Series(names))    

In [30]:
names = pd.read_hdf('data.h5', 'symbols').to_dict()

In [31]:
pd.Series(names).count()

8766

## Precompute Cointegration

In [51]:
def test_cointegration(etfs, stocks, test_end, lookback=2):
    start = time()
    results = []
    test_start = test_end - pd.DateOffset(years=lookback) + pd.DateOffset(days=1)
    etf_symbols = etfs.columns.tolist()
    etf_data = etfs.loc[str(test_start):str(test_end)]

    stock_symbols = stocks.columns.tolist()
    stock_data = stocks.loc[str(test_start):str(test_end)]
    n = len(etf_symbols) * len(stock_symbols)
    j = 0
    for i, s1 in enumerate(etf_symbols, 1):
        for s2 in stock_symbols:
            j += 1
            if j % 1000 == 0:
                print(f'\t{j:5,.0f} ({j/n:3.1%}) | {time() - start:.2f}')
            df = etf_data.loc[:, [s1]].dropna().join(stock_data.loc[:, [s2]].dropna(), how='inner')
            with warnings.catch_warnings():
                warnings.simplefilter('ignore')
                var = VAR(df)
                lags = var.select_order()
                result = [test_end, s1, s2]
                order = lags.selected_orders['aic']
                result += [coint(df[s1], df[s2], trend='c')[1], coint(df[s2], df[s1], trend='c')[1]]

            cj = coint_johansen(df, det_order=0, k_ar_diff=order)
            result += (list(cj.lr1) + list(cj.lr2) + list(cj.evec[:, cj.ind[0]]))
            results.append(result)
    return results

### Define Test Periods

In [None]:
dates = stocks.loc['2016-12':'2019-6'].resample('Q').last().index
dates

### Run Tests

In [None]:
test_results = []
columns = ['test_end', 's1', 's2', 'eg1', 'eg2',
           'trace0', 'trace1', 'eig0', 'eig1', 'w1', 'w2']

for test_end in dates:
    print(test_end)
    result = test_cointegration(etfs, stocks, test_end=test_end)
    test_results.append(pd.DataFrame(result, columns=columns))

pd.concat(test_results).to_hdf('backtest.h5', 'cointegration_test')

2016-12-31 00:00:00
	1,000/23,736 | 57.51
	2,000/23,736 | 114.75
	3,000/23,736 | 171.62
	4,000/23,736 | 231.43
	5,000/23,736 | 290.16
	6,000/23,736 | 349.03
	7,000/23,736 | 406.42
	8,000/23,736 | 463.57
	9,000/23,736 | 520.84
	10,000/23,736 | 577.56
	11,000/23,736 | 635.62
	12,000/23,736 | 698.26
	13,000/23,736 | 777.39
	14,000/23,736 | 850.12
	15,000/23,736 | 915.75
	16,000/23,736 | 973.28
	17,000/23,736 | 1029.81
	18,000/23,736 | 1087.44
	19,000/23,736 | 1144.51
	20,000/23,736 | 1201.44
	21,000/23,736 | 1261.56
	22,000/23,736 | 1321.64
	23,000/23,736 | 1381.51
2017-03-31 00:00:00
	1,000/23,736 | 59.16
	2,000/23,736 | 119.15
	3,000/23,736 | 180.16
	4,000/23,736 | 240.25
	5,000/23,736 | 303.54
	6,000/23,736 | 363.46
	7,000/23,736 | 421.88
	8,000/23,736 | 480.44
	9,000/23,736 | 539.07
	10,000/23,736 | 598.31
	11,000/23,736 | 656.78
	12,000/23,736 | 715.70
	13,000/23,736 | 774.62
	14,000/23,736 | 833.20
	15,000/23,736 | 891.63
	16,000/23,736 | 950.34
	17,000/23,736 | 1009.76
	18,000/23,7

#### Reload  Test Results

In [None]:
test_results = pd.read_hdf('backtest.h5', 'cointegration_test')
test_results.info()

## Identify Cointegrated Pairs

### Significant Johansen Trace Statistic

In [None]:
test_results['joh_sig'] = ((test_results.trace0 > trace0_cv) &
                           (test_results.trace1 > trace1_cv))

In [None]:
test_results.joh_sig.value_counts(normalize=True)

### Significant Engle Granger Test

In [None]:
test_results['eg'] = test_results[['eg1', 'eg2']].min(axis=1)
test_results['s1_dep'] = test_results.eg1 < test_results.eg2
test_results['eg_sig'] = (test_results.eg < .05)

In [None]:
test_results.eg_sig.value_counts(normalize=True)

### Comparison Engle-Granger vs Johansen

In [None]:
test_results['coint'] = (test_results.eg_sig & test_results.joh_sig)
test_results.coint.value_counts(normalize=True)

In [None]:
test_results = test_results.drop(['eg1', 'eg2', 'trace0', 'trace1', 'eig0', 'eig1'], axis=1)
test_results.info()

### Comparison

In [None]:
ax = test_results.groupby('test_end').coint.mean().to_frame('# Pairs').plot()
ax.axhline(.05, lw=1, ls='--', c='k');

### Select Candidate Pairs

In [None]:
def select_candidate_pairs(data):
    candidates = data[data.joh_sig | data.eg_sig]
    candidates['y'] = candidates.apply(lambda x: x.s1 if x.s1_dep else x.s2, axis=1)
    candidates['x'] = candidates.apply(lambda x: x.s2 if x.s1_dep else x.s1, axis=1)
    return candidates.drop(['s1_dep', 's1', 's2'], axis=1)

In [None]:
candidates = select_candidates(test_results)
candidates.to_hdf('backtest.h5', 'candidates')

In [None]:
candidates = pd.read_hdf('backtest.h5', 'candidates')
candidates.info()

#### # Candidates over Time

In [None]:
candidates.groupby('test_end').size().plot(figsize=(8,5));

#### Most Common Pairs 

In [None]:
with pd.HDFStore('data.h5') as store:
    print(store.info())

In [None]:
with pd.HDFStore('backtest.h5') as store:
    print(store.info())

In [None]:
counter = Counter()
for s1, s2 in zip(candidates[candidates.joh_sig & candidates.eg_sig].y, 
                  candidates[candidates.joh_sig & candidates.eg_sig].x):
    if s1 > s2:
        counter[(s2, s1)] += 1
    else: 
        counter[(s1, s2)] += 1

In [None]:
most_common_pairs = pd.DataFrame(counter.most_common(10))
most_common_pairs = pd.DataFrame(most_common_pairs[0].values.tolist(), columns=['s1', 's2'])
most_common_pairs

In [None]:
with pd.HDFStore('backtest.h5') as store:
    prices = store['prices'].close.unstack('symbol').ffill(limit=5)
    symbols = store['symbols'].set_index('symbol').squeeze().to_dict()

In [None]:
cnt = pd.Series(counter).reset_index()
cnt.columns = ['s1', 's2', 'n']
cnt['name1'] = cnt.s1.map(names)
cnt['name2'] = cnt.s2.map(names)
cnt.nlargest(10, columns='n')

In [None]:
fig, axes = plt.subplots(ncols=2, figsize=(14, 5))
for i in range(2):
    s1, s2 = most_common_pairs.at[i, 's1'], most_common_pairs.at[i, 's2']
    prices.loc[:, [s1, s2]].rename(columns=symbols).plot(secondary_y=symbols[s2], ax=axes[i])
    axes[i].grid(False)
    axes[i].set_xlabel('')
    
fig.tight_layout()
fig.savefig('figures/common_pairs', dpi=300)

## Get Entry and Exit Dates 

### Smooth prices using Kalman filter

In [None]:
def KFSmoother(prices):
    """Estimate rolling mean"""
    
    kf = KalmanFilter(transition_matrices=np.eye(1),
                      observation_matrices=np.eye(1),
                      initial_state_mean=0,
                      initial_state_covariance=1,
                      observation_covariance=1,
                      transition_covariance=.05)

    state_means, _ = kf.filter(prices.values)
    return pd.Series(state_means.flatten(),
                     index=prices.index)

In [None]:
smoothed_prices = prices.apply(KFSmoother)
smoothed_prices.to_hdf('tmp.h5', 'smoothed')

In [None]:
smoothed_prices = pd.read_hdf('tmp.h5', 'smoothed')

### Compute rolling hedge ratio using Kalman Filter

In [None]:
def KFHedgeRatio(x, y):
    """Estimate Hedge Ratio"""
    delta = 1e-3
    trans_cov = delta / (1 - delta) * np.eye(2)
    obs_mat = np.expand_dims(np.vstack([[x], [np.ones(len(x))]]).T, axis=1)

    kf = KalmanFilter(n_dim_obs=1, n_dim_state=2,
                      initial_state_mean=[0, 0],
                      initial_state_covariance=np.ones((2, 2)),
                      transition_matrices=np.eye(2),
                      observation_matrices=obs_mat,
                      observation_covariance=2,
                      transition_covariance=trans_cov)

    state_means, _ = kf.filter(y.values)
    return -state_means

### Estimate mean reversion half life

In [None]:
def estimate_half_life(spread):
    X = spread.shift().iloc[1:].to_frame().assign(const=1)
    y = spread.diff().iloc[1:]
    beta = (np.linalg.inv(X.T@X)@X.T@y).iloc[0]
    halflife = int(round(-np.log(2) / beta, 0))
    return max(halflife, 1)

### Compute Spread & Bollinger Bands

In [None]:
def get_spread(candidates, prices):
    pairs = []
    half_lives = []

    periods = pd.DatetimeIndex(sorted(candidates.test_end.unique()))
    start = time()
    for p, test_end in enumerate(periods, 1):
        start_iteration = time()

        period_candidates = candidates.loc[candidates.test_end == test_end, ['y', 'x']]
        trading_start = test_end + pd.DateOffset(days=1)
        t = trading_start - pd.DateOffset(years=2)
        T = trading_start + pd.DateOffset(months=6) - pd.DateOffset(days=1)
        max_window = len(prices.loc[t: test_end].index)
        period_symbols = period_candidates.y.append(period_candidates.x).unique()
        print(test_end.date(), len(period_candidates))
        for i, (y, x) in enumerate(zip(period_candidates.y, period_candidates.x), 1):
            if i % 1000 == 0:
                msg = f'{i:5.0f} | {time() - start_iteration:7.1f} | {time() - start:10.1f}'
                print(msg)
            pair = prices.loc[t: T, [y, x]]
            pair['hedge_ratio'] = KFHedgeRatio(y=KFSmoother(prices.loc[t: T, y]),
                                               x=KFSmoother(prices.loc[t: T, x]))[:, 0]
            pair['spread'] = pair[y].add(pair[x].mul(pair.hedge_ratio))
            half_life = estimate_half_life(pair.spread.loc[t: test_end])

            spread = pair.spread.rolling(window=min(2 * half_life, max_window))
            pair['z_score'] = pair.spread.sub(spread.mean()).div(spread.std())
            pairs.append(pair.loc[trading_start: T].assign(s1=y, s2=x, period=p, pair=i).drop([x, y], axis=1))

            half_lives.append([test_end, y, x, half_life])
    return pairs, half_lives

In [None]:
candidates = pd.read_hdf('backtest.h5', 'candidates')
candidates.info()

In [None]:
pairs, half_lives = get_spread(candidates, smoothed_prices)

### Collect Results

#### Half Lives

In [None]:
hl = pd.DataFrame(half_lives, columns=['test_end', 's1', 's2', 'half_life'])
hl.info()

In [None]:
hl.half_life.describe()

In [None]:
hl.to_hdf('backtest.h5', 'half_lives')

#### Pair Data

In [None]:
pair_data = pd.concat(pairs)
pair_data.info(null_counts=True)

In [None]:
pair_data.to_hdf('backtest.h5', 'pair_data')

In [None]:
pair_data = pd.read_hdf('backtest.h5', 'pair_data')

### Identify Long & Short Entry and Exit Dates

In [None]:
def get_trades(data):
    pair_trades = []
    for i, ((period, s1, s2), pair) in enumerate(data.groupby(['period', 's1', 's2']), 1):
        if i % 100 == 0:
            print(i)

        first3m = pair.first('3M').index
        last3m = pair.last('3M').index

        entry = pair.z_score.abs() > 2
        entry = ((entry.shift() != entry)
                 .mul(np.sign(pair.z_score))
                 .fillna(0)
                 .astype(int)
                 .sub(2))

        exit = (np.sign(pair.z_score.shift().fillna(method='bfill'))
                != np.sign(pair.z_score)).astype(int) - 1

        trades = (entry[entry != -2].append(exit[exit == 0])
                  .to_frame('side')
                  .sort_values(['date', 'side'])
                  .squeeze())
        if not isinstance(trades, pd.Series):
            continue
        try:
            trades.loc[trades < 0] += 2
        except:
            print(type(trades))
            print(trades)
            print(pair.z_score.describe())
            break

        trades = trades[trades.abs().shift() != trades.abs()]
        window = trades.loc[first3m.min():first3m.max()]
        extra = trades.loc[last3m.min():last3m.max()]
        n = len(trades)

        if window.iloc[0] == 0:
            if n > 1:
                print('shift')
                window = window.iloc[1:]
        if window.iloc[-1] != 0:
            extra_exits = extra[extra == 0].head(1)
            if extra_exits.empty:
                continue
            else:
                window = window.append(extra_exits)

        trades = pair[['s1', 's2', 'hedge_ratio', 'period', 'pair']].join(window.to_frame('side'), how='right')
        trades.loc[trades.side == 0, 'hedge_ratio'] = np.nan
        trades.hedge_ratio = trades.hedge_ratio.ffill()
        pair_trades.append(trades)
    return pair_trades

In [None]:
pair_trade_data = pd.concat(pair_trades)
pair_trade_data.info()

In [None]:
pair_trade_data.head()

In [None]:
trades = pair_trade_data['side'].copy()
trades.loc[trades!= 0] =1
trades.loc[trades==0] = -1
trades.sort_index().cumsum().plot();

In [None]:
pair_trade_data.to_hdf('backtest.h5', 'pair_trades')