In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from scipy import stats
import seaborn as sns
from hurst import compute_Hc
import statsmodels.tsa.stattools as ts
import statsmodels.api as sm
from statsmodels.tsa.stattools import adfuller
import scipy.optimize as spop
from sklearn.linear_model import LinearRegression
from statistics import mean
from statsmodels.tsa.stattools import coint
import json
from numpy import cumsum, log, polyfit, sqrt, std
from numpy.random import randn
from scipy.stats import norm

In [None]:
plt.style.use('seaborn')
sns.set_style('darkgrid')
prices = pd.read_csv('etfs.csv')
#prices.columns
prices['Date'] = pd.to_datetime(prices['Date'])
prices = prices.set_index('Date')
prices = prices.dropna(axis=1)
prices.head()
returns = np.log(prices).diff().dropna()

In [None]:
P_VALUE_THRESHOLD = 0.05
HURST_THRESHOLD = 0.5
TRADING_PERIOD = 253

In [None]:
form_start = '2011-01-01'
form_end = '2016-12-31'
trade_start = '2017-01-01'
trade_end = '2019-12-31'

prices_form = prices[form_start:form_end]
prices_trade = prices[trade_start:trade_end]
returns_form = returns.loc[form_start:form_end]
returns_trade = returns.loc[trade_start:trade_end]

In [None]:
def engle_granger_cointegration_test(X, Y):
    # Calculate the cointegration test statistics and p-value using the coint() function
    _, pvalue, _ = coint(X, Y)
    return pvalue

   

In [None]:
def hurst(X):
    """Returns the Hurst Exponent of the time series vector X"""
    # Create the range of lag values
    lags = range(2, 100)
    # Calculate the array of the variances of the lagged differences
    tau = [np.sqrt(np.std(np.subtract(X[lag:], X[:-lag]))) for lag in lags]
    # Use polyfit to estimate the Hurst exponent
    poly = np.polyfit(np.log(lags), np.log(tau), 1)
    return poly[0]*2.0

def calculate_hurst_of_spread(X, Y):
    # Calculate the spread between X and Y
    spread = X - Y

    # Check if spread has any zeroes or NaNs
    if np.any(np.isnan(spread)) or np.any(spread == 0):
        return np.nan

    # Calculate the Hurst exponent of the spread
    hurst_exp = hurst(spread)

    return hurst_exp

In [None]:
def count_crosses(X, Y):
    spread = X - Y
    mean_spread = np.mean(spread)
    cross_count = 0
    for i in range(len(spread)-1):
        if (spread[i] - mean_spread) * (spread[i+1] - mean_spread) < 0:
            cross_count += 1
    return cross_count

In [None]:
import numpy as np
from statsmodels.tsa.stattools import adfuller

def calculate_half_life(X, Y):
    spread = X - Y

    delta_spread = np.diff(spread)

    lagged_spread = spread[:-1]

    lagged_spread_with_constant = np.column_stack((lagged_spread, np.ones_like(lagged_spread)))

    coeffs = np.linalg.lstsq(lagged_spread_with_constant, delta_spread, rcond=None)[0]

    half_life = -np.log(2) / coeffs[0]

    return half_life



In [None]:
def calculate_correlation(time_series1, time_series2):
    return stats.pearsonr(time_series1, time_series2)

In [None]:
"""The null hypothesis of DF test is that
there is a unit root in an AR model, which implies
that the data series is not stationary."""

def adf_test(time_series1):
    dftest = adfuller(time_series1)
    return dftest

In [None]:
def parse_pair(pair):
    s1 = pair[:pair.find('-')]
    s2 = pair[pair.find('-')+1:]
    return s1,s2

In [None]:
# ADF for stationarity
results_adf = pd.DataFrame(columns=["ADF"])
selected_ticks = []

for s1 in returns_form.columns:
    if (f'{s1}' not in results_adf.index):
        results_adf.loc[f'{s1}'] = adf_test(returns_form[s1])[1]
    if adf_test(returns_form[s1])[1]< P_VALUE_THRESHOLD:
        selected_ticks.append(f'{s1}')

In [None]:
#Pearson's R
selected_pairs = []
results = pd.DataFrame(columns=["Pearson's R"])
for s1 in selected_ticks:
    for s2 in selected_ticks:
        if (s1!=s2) and (f'{s2}-{s1}' not in results.index):
            results.loc[f'{s1}-{s2}'] = calculate_correlation(returns_form[s1], returns_form[s2])[0]
        if calculate_correlation(returns_form[s1], returns_form[s2])[0] > 0.9 and calculate_correlation(returns_form[s1], returns_form[s2])[0] != 1 :
            selected_pairs.append(f'{s1}-{s2}')

In [None]:
#Engle Granger
selected_pairs_1 = []
#h = []
for pair in selected_pairs:
    s1, s2 = parse_pair(pair)
    interpolated_form_1 = prices_form[s1].interpolate()
    interpolated_form_2 = prices_form[s2].interpolate()
    if engle_granger_cointegration_test(interpolated_form_1, interpolated_form_2)<P_VALUE_THRESHOLD:
            selected_pairs_1.append(f'{s1}-{s2}')
            #he = calculate_hurst_of_spread(interpolated_form_1, interpolated_form_2)
            #h.append(he)

In [None]:
#Half life
selected_pairs_2 = []
hl = []
for pair in selected_pairs_1:
    s1, s2 = parse_pair(pair)
    interpolated_form_1 = prices_form[s1].interpolate()
    interpolated_form_2 = prices_form[s2].interpolate()
    if calculate_half_life(interpolated_form_1, interpolated_form_2) > 1 and calculate_half_life(interpolated_form_1, interpolated_form_2) < 252 :
            selected_pairs_2.append(f'{s1}-{s2}')
            h = calculate_half_life(interpolated_form_1, interpolated_form_2)
            hl.append(h)

In [None]:
# Mean crosses
selected_pairs_3 = []
crosses = []
for pair in selected_pairs_2:
    s1, s2 = parse_pair(pair)
    interpolated_form_1 = prices_form[s1].interpolate()
    interpolated_form_2 = prices_form[s2].interpolate()
    if count_crosses(interpolated_form_1, interpolated_form_2) >= 12:
            selected_pairs_3.append(f'{s1}-{s2}')
            count = count_crosses(interpolated_form_1, interpolated_form_2)
            crosses.append(count)

In [None]:

array = selected_pairs_3

with open('stat_pairs.json', 'w') as f:
    json.dump(array, f)