In [1]:
import numpy as np
import pandas as pd
from scipy import stats
import matplotlib.pyplot as plt
import seaborn as sns
import yfinance as yf
from pytickersymbols import PyTickerSymbols
from pandas_datareader.stooq import StooqDailyReader
import re
from datetime import datetime
import warnings
warnings.filterwarnings("ignore")


plt.style.use("bmh")
plt.rcParams["figure.figsize"] = (15, 8)

### Prepare data

In this research, the stock symbols of the S&P500 composite will be selected by their sectors and current bidding prices (smaller than 500 USD).  
  
**NOTE**: the selection of the S&P500 stock universe is not accounted for Survivorship Bias. *This is not expected to affect the results of our arbitrage strategy as the strategy does not rely on the performances of the individual stocks*.

Data: 13 years of historical Daily Opens and Closes from Stooq.com

In [2]:
def get_stocks_by_sector_and_price(sectors_list, max_price):
    # create a PyTickerSymbols object
    stock_data = PyTickerSymbols()

    # get all the stocks of the S&P 500 index
    sp500_stocks = stock_data.get_stocks_by_index("S&P 500")

    # create a dictionary to store the industries
    stocks_from_sp500 = {}

    # loop through the stocks and get the ticker and industry for each one
    for stock in sp500_stocks:
        # get the ticker symbol from the symbol attribute
        ticker = stock["symbol"]
        # get the industry from the industries attribute
        industry = stock["industries"]
        # convert the industry to a string
        industry = str(industry)
        # use a regex to match the unique industry
        pattern = r"(" + "|".join(sectors_list) + ")"
        match = re.search(pattern, industry)
        # check if the match object is not None
        if match is not None:
            matched_industry = match.group(0)
            # if the industry is not in the dictionary, add it with an empty list
            if matched_industry not in stocks_from_sp500:
                stocks_from_sp500[matched_industry] = []
            # append the ticker to the list of the corresponding industry
            stocks_from_sp500[matched_industry].append(ticker)

    for _, stocks in stocks_from_sp500.items():
        for ticker in stocks:
            try:
                data = yf.Ticker(ticker).history(raise_errors=True)['Close']
                if data is not None or len(data) > 0:
                    price = data.iloc[-1]
                    if price >= max_price:
                        stocks_from_sp500[matched_industry].remove(ticker)
            except Exception as _:
                continue

    # loop through the items of the dictionary and print them
    total_stocks = 0
    for industry, stocks in stocks_from_sp500.items():
        # print the industry and a newline character
        print(industry + ":\n")
        # print the list of stocks and a newline character
        print(stocks, "\n")
        print('numer of stocks', len(stocks), "\n")
        total_stocks += len(stocks)
        print("--------------------------------------------------\n")
    print('total stocks: ', total_stocks, "\n")

    return stocks_from_sp500

In [3]:
sectors_list = ['Industrials', 'Consumer', 'Financial', 'Energy', 'Real Estate', 'Food', 'Pharmaceuticals', 'Healthcare', 'Materials']
tickers_by_sectors = get_stocks_by_sector_and_price(sectors_list, max_price=500.0)

Materials:

['LIN', 'DOW', 'APD', 'ALB', 'AVY', 'BLL', 'CE', 'CF', 'EMN', 'ECL', 'FMC', 'FCX', 'LYB', 'MLM', 'MOS', 'NEM', 'NUE', 'PKG', 'PPG', 'SEE', 'SHW', 'VMC', 'WRK', 'CTVA'] 

numer of stocks 24 

--------------------------------------------------

Consumer:

['CCL', 'HD', 'MCD', 'PG', 'WMT', 'WBA', 'DIS', 'AMZN', 'CHTR', 'CMCSA', 'DISH', 'DLTR', 'EXPE', 'MAR', 'ROST', 'SBUX', 'TSLA', 'TSCO', 'ULTA', 'CL', 'F', 'GM', 'LOW', 'TGT', 'AAP', 'ABC', 'BBY', 'BWA', 'CAH', 'KMX', 'CHD', 'CLX', 'CPRT', 'CMI', 'DRI', 'DG', 'DHI', 'FBHS', 'FOXA', 'GPC', 'HAS', 'HLT', 'IPG', 'KMB', 'KR', 'LEN', 'LKQ', 'MAS', 'MCK', 'MGM', 'MHK', 'NWL', 'NWSA', 'NCLH', 'OMC', 'PHM', 'RL', 'RCL', 'SYY', 'TTWO', 'TPR', 'TJX', 'VFC', 'WHR', 'WYNN', 'YUM', 'META'] 

numer of stocks 67 

--------------------------------------------------

Industrials:

['MMM', 'BA', 'CAT', 'AAL', 'CSX', 'FAST', 'FISV', 'PCAR', 'PAYX', 'EMR', 'FDX', 'HON', 'RTN', 'UNP', 'UPS', 'AMD', 'ALK', 'AME', 'AOS', 'BR', 'CHRW', 'CTAS', 'DE',

**NOTE**:  
Due to the [After Hour Trading Gap](https://www.investopedia.com/terms/g/gap.asp) phenomena that is inherent to the stock market, the common way of calculating daily returns by using today Close and yesterday Close should be avoided.  
In order to best appropriately model daily returns that is the most realistic to live trading, the daily log returns should be calculated as the following: *today Close - today Open*.

In [4]:
# # #NOTE: remove path later
# prices = pd.read_csv(r"C:\Users\Steven\OneDrive\FreeLance Works\Pairs-Trading-Statistical-Arbitrage\US_NYSE_Daily_2010.csv", parse_dates=[0], index_col=[0], engine='c')
# # get the selected tickers from the S&P 500
# tickers = []
# for _, stocks in tickers_by_sectors.items():
#     tickers.extend(stocks)
# # download historical data from stooq.com
# end = datetime.now().date()
# start = datetime(2010, 1, 1).date()
# data = StooqDailyReader(tickers, start, end, chunksize=100).read()
# data.drop(['High', 'Low', 'Volume'], axis=1, inplace=True)
# prices = data[::-1].copy()


In [4]:
# import
prices = pd.read_csv(r"C:\Users\Steven\OneDrive\FreeLance Works\Pairs-Trading-Statistical-Arbitrage\copulas_pairs\SP500_stock_data.csv", parse_dates=[0], index_col=[0], header=[0,1], engine='c')
prices.head()

Attributes,Close,Close,Close,Close,Close,Close,Close,Close,Close,Close,...,Open,Open,Open,Open,Open,Open,Open,Open,Open,Open
Symbols,LIN,DOW,APD,ALB,AVY,CE,CF,EMN,ECL,FMC,...,MAA,PLD,PSA,O,SBAC,UDR,VTR,VNO,WELL,WY
2010-01-04,,,61.5609,32.0135,28.0723,27.3038,13.9217,22.7835,39.5997,21.1501,...,32.4312,22.0269,52.5185,15.5413,33.4733,11.1372,22.7042,31.397,25.7504,10.4915
2010-01-05,,,61.0518,31.9568,28.256,27.9106,14.145,22.4922,38.995,21.0768,...,31.8861,21.8828,51.3045,15.6182,34.535,10.8917,22.2132,31.1412,25.6165,10.587
2010-01-06,,,60.5478,32.0135,28.1909,28.2166,14.7369,22.3411,38.8811,21.2808,...,31.952,21.8211,51.8577,15.7409,35.0336,10.9063,22.371,30.9236,25.8881,10.595
2010-01-07,,,60.2015,31.7856,28.4575,28.3646,14.5896,22.3948,39.2998,21.0174,...,31.8802,22.0906,51.312,15.7321,34.7894,10.7856,22.0544,30.6657,25.7464,10.6461
2010-01-08,,,60.5882,32.2654,29.0009,28.0992,14.9019,22.4571,39.6323,20.8292,...,32.0179,22.1552,51.1564,15.9659,35.58,10.7856,22.22,30.9868,25.6165,10.5763


The strategy algorithm will be demonstrated in ordered steps in this section:

### 1. Pair Selection
The first step is identifying potential pairs for trading.  

Pairs in top *90th percentile* of the highest [Kendall’s rank correlation coefficient](https://en.wikipedia.org/wiki/Kendall_rank_correlation_coefficient) (Kendall’s tau) between log-returns will be selected.  



In [5]:
def calc_rank_correlation(log_returns: pd.DataFrame) -> pd.Series:
    """
    Calculate the Kendall’s rank correlation coefficient statistic for pairwise comparison of columns in a DataFrame.
    
    Parameters:
        log_returns (pd.DataFrame): A DataFrame containing log returns of assets.
        
    Returns:
        pd.Series: A Series containing the Kendall’s rank correlation coefficient statistic for pairwise comparison of columns.
    """
    tau_ser = pd.Series(name="tau")
    for s1 in log_returns.columns:
        for s2 in log_returns.columns:
            if (s1 != s2) and (f"{s2}-{s1}" not in tau_ser.index):
                try:
                    tau_ser.loc[f"{s1}-{s2}"] = stats.kendalltau(
                        log_returns[s1].values, log_returns[s2].values
                    )[0]
                except:
                    continue
    return tau_ser


def parse_pair(pair):
    """
    Parse a pair string into two separate strings.

    Parameters:
        pair (str): A string representing a pair of values separated by a hyphen.

    Returns:
        tuple: A tuple containing two strings, the first string before the hyphen and the second string after the hyphen.
    """
    s1 = pair[: pair.find("-")]
    s2 = pair[pair.find("-") + 1 :]
    return s1, s2


def preprocess_data(data: pd.DataFrame):
    """Clean and log-transform the price dataframe."""  
    # Impute missing or infinite values
    data.replace([np.inf, -np.inf], np.nan, inplace=True)
    # Interpolate missing values
    data.interpolate(source='spline', order=5, limit_direction='both', inplace=True)
    # drop nan values
    data.dropna(inplace=True)
    # Compute Log-Returns
    try:
        df = np.log(data)
        log_returns = pd.DataFrame(columns=df.columns.get_level_values(1).unique(), index=df.index)
        for col in df.columns.get_level_values(1).unique():
            close_prices = df.Close[col]
            open_prices = df.Open[col]
            log_returns[col] =  close_prices - open_prices
        return log_returns
    
    except ValueError as e:
        print(f"Log-transforming data FAILED!!!\
                \n{type(e).__name__}: {str(e)}\n")
        return     
    
    

In [6]:
train_data = prices.iloc[:252*3] # create train dataset of 3 years

log_returns = preprocess_data(train_data)
kendall_T = calc_rank_correlation(log_returns)
top_kendall_T = kendall_T[kendall_T > kendall_T.quantile(0.9)]

# Select pairs
selected_assets = []
selected_pairs = []
for pair in reversed(top_kendall_T.index):
    s1, s2 = parse_pair(pair)
    if (s1 not in selected_assets) and (s2 not in selected_assets):
        selected_assets.append(s1)
        selected_assets.append(s2)
        selected_pairs.append(pair)

In [7]:
print(selected_pairs)

[]


### 2. Fit Marginals  
Find the best fitted marginal distributions for individual currencies within each pair.
The five following four parametric distributions will be used: Normal, Laplace, Student’s t, Logistic and Extreme.  
The best fitted distributions will be chosen based on Bayesian Information Criterion (BIC), and then those that pass the Kolmogorov-Smirnov goodness-of-fit test will be carried on as our selected distributions. 

In [8]:
def fit_marginals(
    log_returns: pd.DataFrame, selected_assets: list[str]
) -> dict:
    """
    Fits marginal distributions to the given log returns data for selected assets.

    Parameters:
    - log_returns: A pandas DataFrame containing the log returns data.
    - selected_assets: A list of strings representing the selected assets.

    Returns:
    - fitted_marginals: A dictionary mapping each stock to the best fitted marginal distribution.
    """
    fitted_marginals = {}
    for stock in selected_assets:
        
        data = log_returns[stock]
        best_pval = 0.0

        for dist in [
            stats.norm,
            stats.laplace,
            stats.t,
            stats.genhyperbolic,
            stats.genextreme,
        ]:
            try:
                params = dist.fit(data)
                dist_fit = dist(*params)
                ks_pval = stats.kstest(data, dist_fit.cdf, N=len(data))[1]
                if ks_pval > best_pval:
                    best_dist = dist_fit
                    best_pval = ks_pval
            except:
                continue
            
        fitted_marginals[stock] = best_dist

    return fitted_marginals

In [9]:
fitted_marginals = fit_marginals(log_returns, selected_assets)
# First 5 fitted marginals
for stock, dist in list(fitted_marginals.items())[:5]:
    print(f"Stock: '{stock}', Marginal Distribution: {dist.dist.name.upper()}.")

### 3. Fit Copulas  
First, individual assets' log returns are converted to their Uniform samples (CDFs) via applying the Probability Integral Transform to their best fitted marginal distributions earlier.  
Then fitting each pair's log return CDFs to seven Copula types: Gaussian, N13, N14, Clayton, Gumbel, Frank and Joe.
Copulas with the either smallest Akaike Information Criterion (AIC) or the Bayesian Information Criterion (BIC) will be selected for the two-dimensional Kolmogorov-Smirnov test.  
The two-dimensional Kolmogorov-Smirnov test compares the join-CDFs between 2 pairs of marginal distributions and the Null Hypothesis is that they are identical (the github repo of its Python implementation is [here](https://github.com/syrte/ndtest/blob/master/ndtest.py)).  
The top **10** Copulas which pass the 2d-KS test with the highest P-values will beour selected Copulas for trading.  
  
  
(OR The top **10** Copulas which pass the 2d-KS test with the highest P-values while also have the lowest AIC and BIC scores will be our selected Copulas for trading.
)

In [10]:
import copulas # Thanks to courtesy of Alexander Pavlov at https://github.com/financialnoob
import ndtest

def fit_copulas(
    log_returns: pd.DataFrame, fitted_marginals: dict, selected_pairs: list[tuple[str]]
) -> tuple[pd.DataFrame, dict]:
    """
    Fits copulas to log returns data.

    Args:
        log_returns (pd.DataFrame): A DataFrame containing log returns data.
        fitted_marginals (dict): A dictionary of fitted marginal distributions.
        selected_pairs (list): A list of selected pairs of assets.

    Returns:
        Tuple[pd.DataFrame, dict]: A tuple containing two objects:
            - copulas_df: A DataFrame with fit statistics for each pair of assets.
            - fitted_copulas: A dictionary of fitted copula models for each pair of assets.
    """
    copulas_df = pd.DataFrame(index=selected_pairs, columns=["KS_pvalue"])
    fitted_copulas = {}

    for pair in selected_pairs:
        s1, s2 = parse_pair(pair)

        dist_s1 = fitted_marginals[s1]
        dist_s2 = fitted_marginals[s2]

        # apply probability integral transform
        u = dist_s1.cdf(log_returns[s1])
        v = dist_s2.cdf(log_returns[s2])

        best_aic = np.inf
        best_bic = np.inf
        best_copula = None

        for copula in [
            copulas.GaussianCopula(),
            copulas.N13Copula(),
            copulas.N14Copula(),
            copulas.ClaytonCopula(),
            copulas.GumbelCopula(),
            copulas.FrankCopula(),
            copulas.JoeCopula(),
        ]:
            try:
                copula.fit(u, v)
                L = copula.log_likelihood(u, v)
                aic = 2 * copula.num_params - 2 * L
                bic = copula.num_params * np.log(len(u)) - 2 * L
                if aic < best_aic or bic < best_bic:
                    best_aic = aic
                    best_bic = bic
                    best_copula = copula
                    # calculate KS-pvalue
                    smp = best_copula.sample(size=len(u))  # generate sample from fit copula
                    s_u = smp[:, 0]
                    s_v = smp[:, 1]
                    ks_pval = ndtest.ks2d2s(u, v, s_u, s_v)
                    if ks_pval > 0.2: # 20% threshold indicates significant as the authors recommended.
                        copulas_df.loc[pair, "KS_pvalue"] = ks_pval
                        fitted_copulas[pair] = {"s1": dist_s1, "s2": dist_s2, "copula": best_copula}
            except:
                continue

    return copulas_df, fitted_copulas

In [11]:
copulas_df, fitted_copulas = fit_copulas(log_returns, fitted_marginals, selected_pairs)
if copulas_df.empty:
    print("No Copulas satisfied the 2D-KS test.")
# sort the copulas_df by the lowest aic, bic and the highest KS_pvalue
# sorted_copulas_df = copulas_df.sort_values(
#     by=["KS_pvalue", "bic", "aic"], ascending=[False, True, True]
# )
# sort the copulas_df by the highest KS_pvalue
sorted_copulas_df = copulas_df.sort_values(
    by=["KS_pvalue"], ascending=[False]
)

top_copulas_df = sorted_copulas_df.iloc[:10]

top_copulas_df

No Copulas satisfied the 2D-KS test.


Unnamed: 0,KS_pvalue


Now that we are familiar with the trading algorithm's processes, let's neatly combine them into a single function


In [12]:

def formation(data: pd.DataFrame, use_num=False, num_pairs=10, top_pct: int = 0.9) -> dict[str, dict]:
    """
    Generate a formation based on the given data.

    Parameters:
    - data: A pandas DataFrame containing the input data.
    - top_pct: An integer representing the top percentile of tau values
    and the best fitted marginals and corresponding copulas.

    Returns:
    - result_dict: A dictionary of the selected pairs of assets
    and the best fitted marginals and corresponding copulas.
    """
    log_returns = preprocess_data(data)
    hoeffdingD_ser = calc_rank_correlation(log_returns)
    top_hoeffdingD_ser = hoeffdingD_ser[hoeffdingD_ser > hoeffdingD_ser.quantile(top_pct)]

    selected_assets = []
    selected_pairs = []
    for pair in top_hoeffdingD_ser.index:
        s1, s2 = parse_pair(pair)
        if (s1 not in selected_assets) and (s2 not in selected_assets):
            selected_assets.append(s1)
            selected_assets.append(s2)
            selected_pairs.append(pair)

    fitted_marginals = fit_marginals(log_returns, selected_assets)
    copulas_df, fitted_copulas = fit_copulas(log_returns, fitted_marginals, selected_pairs)
    # if all failed 2d-KS-test's criteria, return empty dict
    if copulas_df.empty:
        return {}
    # sort the copulas_df by the highest KS_pvalue
    sorted_copulas_df = copulas_df.sort_values(
        by=["KS_pvalue"], ascending=[False]
    )
    if use_num:
        top_copulas_df = sorted_copulas_df.iloc[:num_pairs]
    else: # get the top percentile
        p90 = int(len(sorted_copulas_df) * top_pct)
        top_copulas_df = sorted_copulas_df.iloc[:p90]
    # match the dictionaries in the dictionary fitted_copulas
    # with the top 90 percentile in top_copulas_df
    result_dict = {}
    for pair in top_copulas_df.index:
        if pair in fitted_copulas.keys():
            result_dict[pair] = fitted_copulas[pair]

    return result_dict


# Backtest
Trade decisions are made based on the conditional probabilities of each cryptocurrency pairs' log-returns. By defining Upper and Lower thresholds ($b_{up}$ & $b_{lo}$), we can determine the current return of one stock is too high, given the second stock's current return. This is demonstrated as the following:
- **Opening** rules:  
-- IF $P(U_1\le u_1 | U_2 = u_2) \le b_{lo}$ AND $P(U_2\le u_2 | U_1 = u_1) \ge b_{up}$,  
 then stock 1 is undervalued, and stock 2 is overvalued. Hence we long the spread. (1 in position)  
-- IF $P(U_2\le u_2 | U_1 = u_1) \le b_{lo}$ AND $P(U_1\le u_1 | U_2 = u_2) \ge b_{up}$,    
 then stock 2 is undervalued, and stock 1 is overvalued. Hence we short the spread. (-1 in position)
- **Exit** rule:  
If BOTH/EITHER conditional probabilities cross the boundary of 0.5, then we exit the position, as we consider the position no longer valid. (0 in position). 

### Methodology: 
Select the 
Lower Bound: $b_{lo}$ = **0.05**  
Upper Bound: $b_{up}$ = **0.95**  
Training period length: **3 years**.  
Trading period length: **3 months**.  
Leverage ratio: **1:20**.  
Transaction & commission fees: **0.25% per position**. 

In [16]:
train_window = 252*3 # 3 years
trade_window = 63 # 3 months
algo_returns = {}
top_copulas_dfs = []
for i in range(train_window, len(prices)-trade_window, trade_window):
    print("\n##################################################################################\n")
    print("Train period:", prices.index[i-train_window], "to", prices.index[i])
    print("Test period:", prices.index[i], "to", prices.index[i+trade_window+1])
    for sector in sectors_list:
        ticker_cols = prices.columns.get_level_values(1).unique()
        # find the indexing position of tickers_by_sectors[sector] in ticker_cols
        indx = np.where(ticker_cols.isin(tickers_by_sectors[sector]))[0]
        train_closes = prices.Close.iloc[i-train_window:i, indx]
        train_opens = prices.Open.iloc[i-train_window:i, indx]
        train = pd.concat([train_closes, train_opens], axis=1, keys=['Close', 'Open'])
        returns_form = preprocess_data(train)
        test_closes = prices.Close.iloc[i:i+trade_window+1, indx]
        test_opens = prices.Open.iloc[i:i+trade_window+1, indx]
        test = pd.concat([test_closes, test_opens], axis=1, keys=['Close', 'Open'])
        returns_trade = preprocess_data(test)
        # Get the fitted copula for the sector
        results = formation(train, use_num=True, num_pairs=1)

        selected_pair = list(results.keys())
        cl = 0.99 # confidence level
        fee = 0 # trading fee

        for pair in selected_pairs:
            s1,s2 = parse_pair(pair)
                    
            dist_s1 = results[pair]['s1']
            dist_s2 = results[pair]['s2']
            best_copula = results[pair]['copula']
            print(f"Pair: {pair}, Fitted copula: {best_copula.name}, in {sector.upper()}.\n")   
            # calculate conditional probabilities
            prob_s1 = []
            prob_s2 = []

            for u,v in zip(dist_s1.cdf(returns_trade[s1]), dist_s2.cdf(returns_trade[s2])):
                prob_s1.append(best_copula.cdf_u_given_v(u,v))
                prob_s2.append(best_copula.cdf_v_given_u(u,v))
                
            probs_trade = pd.DataFrame(np.vstack([prob_s1, prob_s2]).T, index=returns_trade.index, columns=[s1, s2])

            # calculate positions
            positions = pd.DataFrame(index=probs_trade.index, columns=probs_trade.columns)
            long = False
            short = False

            for t in positions.index:    
                # if long position is open
                if long:
                    if (probs_trade.loc[t][s1] >= 0.3) or (probs_trade.loc[t][s2] <= 0.7):
                        positions.loc[t] = [0-fee, 0-fee] # Add fee to close positions
                        long = False
                    else:
                        positions.loc[t] = [1, -1]

                # if short position is open
                elif short:
                    if (probs_trade.loc[t][s1] <= 0.7) or (probs_trade.loc[t][s2] >= 0.3):
                        positions.loc[t] = [0-fee, 0-fee]
                        short = False
                    else:
                        positions.loc[t] = [-1, 1]

                # if no positions are open
                else:
                    if (probs_trade.loc[t][s1] <= (1-cl)) and (probs_trade.loc[t][s2] >= cl):
                        # open long position
                        positions.loc[t] = [1-fee, -1-fee] # Add fee to open positions
                        long = True
                    elif (probs_trade.loc[t][s1] >= cl) and (probs_trade.loc[t][s2] <= (1-cl)):
                        # open short positions
                        positions.loc[t] = [-1-fee, 1-fee]
                        short = True
                    else:
                        positions.loc[t] = [0, 0]
                
            # calculate returns
            algo_ret = (returns_trade * positions.shift()).sum(axis=1)
            # append algo_ret to algo_returns
            if pair not in algo_returns.keys():
                algo_returns[pair] = []
            algo_returns[pair].append(algo_ret)


##################################################################################

Train period: 2010-01-04 00:00:00 to 2013-01-04 00:00:00
Test period: 2013-01-04 00:00:00 to 2013-04-09 00:00:00

##################################################################################

Train period: 2010-04-06 00:00:00 to 2013-04-08 00:00:00
Test period: 2013-04-08 00:00:00 to 2013-07-09 00:00:00

##################################################################################

Train period: 2010-07-06 00:00:00 to 2013-07-08 00:00:00
Test period: 2013-07-08 00:00:00 to 2013-10-07 00:00:00

##################################################################################

Train period: 2010-10-04 00:00:00 to 2013-10-04 00:00:00
Test period: 2013-10-04 00:00:00 to 2014-01-07 00:00:00

##################################################################################

Train period: 2011-01-03 00:00:00 to 2014-01-06 00:00:00
Test period: 2014-01-06 00:00:00 to 2014-04-08 00:00:00

#########