- [Johansen Cointegration Test: Learn How to Implement it in Python](https://blog.quantinsti.com/johansen-test-cointegration-building-stationary-portfolio/)
- [Unveiling Cointegration: Johansen Test Explained with Python Examples](https://medium.com/@cemalozturk/unveiling-cointegration-johansen-test-explained-with-python-examples-db8385219f1f)
- [Cointegration and Pairs Trading](https://letianzj.github.io/cointegration-pairs-trading.html)

In [1]:
import matplotlib.pyplot as plt
import numpy as np
import os
import pandas as pd
import statsmodels.api as sm

from itertools import combinations, permutations
from statsmodels.tsa.stattools import adfuller
from statsmodels.tsa.vector_ar.vecm import coint_johansen
from typing import Literal

In [2]:
# Import price data
# prices.txt has no header row; each line is split on exactly three spaces
# (a multi-character separator, which is why the python parsing engine is
# requested) into 50 columns named stock0..stock49.
df = pd.read_csv('prices.txt', engine='python', sep='   ', header=None, names=[f"stock{i}" for i in range(50)])
stocks = df.columns

# df_label is both the y-axis label used by the plotting helpers below and the
# switch that selects whether raw prices or log returns are analysed.
df_label = 'Price'
if df_label == 'Log Returns':
    # convert data to log returns
    # log1p(pct_change) == log(p_t / p_{t-1}); rows containing NaN (at least
    # the first row, which has no predecessor) are dropped afterwards.
    df = np.log1p(df.pct_change())
    df.dropna(axis=0, inplace=True)

# Bare expression: presumably here so the notebook cell displays the frame.
df

Unnamed: 0,stock0,stock1,stock2,stock3,stock4,stock5,stock6,stock7,stock8,stock9,...,stock40,stock41,stock42,stock43,stock44,stock45,stock46,stock47,stock48,stock49
0,13.46,71.65,48.46,50.52,52.10,13.00,18.98,47.71,69.49,49.96,...,32.64,55.76,14.46,58.94,36.71,52.62,49.33,36.22,49.00,56.09
1,13.48,72.10,48.52,50.50,52.06,12.95,18.95,47.84,69.73,49.93,...,32.52,55.97,14.44,59.81,36.64,52.58,49.20,36.27,48.84,56.08
2,13.47,72.35,48.48,50.62,51.80,12.79,18.98,47.98,69.60,49.33,...,32.48,56.34,14.50,59.04,36.89,52.49,49.48,36.39,48.56,55.90
3,13.53,72.51,48.42,50.75,51.66,12.66,18.96,48.74,69.54,49.67,...,32.59,56.32,14.40,58.73,36.94,52.40,49.42,36.41,49.00,56.14
4,13.64,71.99,48.40,50.65,51.97,12.62,18.89,48.88,69.68,49.46,...,32.64,56.32,14.36,59.01,37.03,52.44,49.79,36.42,48.14,55.90
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
745,10.32,63.28,45.54,43.35,52.64,7.91,17.30,55.30,67.83,58.73,...,28.81,79.78,10.85,61.76,30.12,48.51,82.09,34.95,35.80,52.57
746,10.32,63.34,45.56,43.28,52.73,7.87,17.26,54.92,67.95,58.62,...,28.78,80.46,10.77,61.49,29.85,48.40,81.43,34.99,35.58,53.10
747,10.32,63.23,45.55,43.25,52.66,7.83,17.32,54.67,67.94,59.15,...,28.73,81.15,10.75,60.36,29.77,48.41,81.90,35.00,35.45,53.21
748,10.20,63.12,45.56,43.19,52.51,7.71,17.33,55.18,67.99,59.64,...,28.73,82.22,10.81,59.24,29.61,48.42,81.33,35.19,35.57,53.01


In [3]:
# Ensure the output folders for the saved figures exist
for dirname in ['acf-pacf/', 'cadf/', 'johansen/']:
    # exist_ok turns the call into a no-op when the folder is already there
    os.makedirs(dirname, exist_ok=True)

#### Autocorrelation + Partial Autocorrelation

In [7]:
def acf_pacf_plot(data, lags=70, pacf_lags=5):
    """Save side-by-side ACF and PACF plots for a single series.

    Args:
        data (pd.Series): Named series to analyse. ``data.name`` is used as
            the figure title and as the output file name.
        lags (int): Number of lags drawn in the ACF panel. Defaults to 70.
            (This argument used to be ignored in favour of a hard-coded 100.)
        pacf_lags (int): Number of lags drawn in the PACF panel. Defaults to
            5, the value that was previously hard-coded.

    The figure is written to ``acf-pacf/<data.name>.png`` and then closed so
    that figures do not accumulate when the function is called in a loop.
    """
    values = data.values.squeeze()

    fig, ax = plt.subplots(nrows=1, ncols=2, figsize=(20, 5))
    sm.graphics.tsa.plot_acf(values, ax=ax[0], lags=lags)
    sm.graphics.tsa.plot_pacf(values, ax=ax[1], lags=pacf_lags)

    # Address the figure explicitly instead of relying on pyplot's notion of
    # the "current" figure.
    fig.suptitle(data.name)
    fig.savefig(f"acf-pacf/{data.name}.png")
    plt.close(fig)

# Plot ACF/PACF of all stocks
# One figure per column of df is saved under acf-pacf/ by acf_pacf_plot.
for stock in stocks:
    acf_pacf_plot(df[stock])

#### Cointegration Augmented Dickey-Fuller Test

In [43]:
def cadf_test(Y, X, maxlag=1, significance=0.1):
    """Run a cointegrating ADF test of ``Y`` regressed on ``X``.

    ``Y`` is regressed on ``X`` by OLS (no intercept is added) and an
    Augmented Dickey-Fuller test is applied to the regression residuals.

    Args:
        Y (pd.Series): Dependent series.
        X (pd.Series): Explanatory series; must be named, because the fitted
            coefficient is looked up by ``X.name``.
        maxlag (int): Maximum lag passed to ``adfuller``. Defaults to 1.
        significance (float): Largest ADF p-value that still counts as
            cointegrated. Defaults to 0.1.

    Returns:
        tuple: ``(hedge, confidence, t_stat, beta)`` where
            hedge is the series of OLS residuals (the spread),
            confidence is the strictest of 99, 95 or 90 whose critical value
            the test statistic passes, or None if none passes or the p-value
            exceeds ``significance``,
            t_stat is the ADF test statistic, and
            beta is the fitted coefficient on ``X`` (the hedge ratio).
    """
    # Hedge ratio
    fit = sm.OLS(Y, X).fit()
    hedge = fit.resid
    beta = fit.params[X.name]

    # Compute ADF test statistics
    # NOTE(review): adfuller's critical values are tabulated for an observed
    # series, not for estimated residuals; statsmodels.tsa.stattools.coint
    # may be the more appropriate test here -- confirm.
    adf = adfuller(hedge, maxlag)
    t_stat, p_val, crit_vals = adf[0], adf[1], adf[4]

    # Check co-integration
    # Test the strictest level first: the 1% critical value is the most
    # negative, so checking 10% first would make the 95/99 branches
    # unreachable and cap the reported confidence at 90.
    confidence = None
    if p_val <= significance:
        for level, key in ((99, '1%'), (95, '5%'), (90, '10%')):
            if t_stat <= crit_vals[key]:
                confidence = level
                break

    return hedge, confidence, t_stat, beta

def plot_cadf_test(hedge, Y, X, confidence, beta):
    """Save a two-panel figure summarising one CADF result.

    The left panel shows the spread with a horizontal line at its mean; the
    right panel shows ``Y``, ``X`` and ``X`` scaled by ``beta``.

    Args:
        hedge (pd.Series): Regression residuals (the spread) to plot.
        Y (pd.Series): Dependent series; ``Y.name`` appears in the title,
            legend and file name.
        X (pd.Series): Explanatory series; ``X.name`` is used likewise.
        confidence: Confidence level (in percent) shown in the title.
        beta: Coefficient used to scale ``X`` in the right panel.

    The figure is written to ``cadf/<Y.name>-<X.name>`` and then closed. The
    right panel's y-axis label is read from the module-level ``df_label``.
    """
    y_str, x_str = Y.name, X.name

    # Save spread plot for reference
    title = f"Cointegration between y={y_str} x={x_str}:\n{confidence}% confidence"
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(20, 5))

    # plot spread
    # The plotted quantity is the residual spread, not the hedge ratio (which
    # is the scalar beta), so the axis is labelled accordingly.
    ax1.plot(hedge)
    ax1.set_ylabel('Spread')
    ax1.axhline(hedge.mean(), color='black')

    # plot historical prices
    ax2.plot(Y, label=f"y={y_str}", color='blue')
    ax2.plot(X, label=f"x={x_str}", color='red', linestyle='--')
    ax2.plot(X * beta, label=f"Scaled {x_str}", color='pink')

    # Put the price axis on the right so it does not crowd the spread panel.
    ax2.set_ylabel(df_label)
    ax2.yaxis.set_label_position('right')
    ax2.yaxis.tick_right()

    ax2.legend(loc='lower left')

    # Address the figure explicitly instead of relying on pyplot's notion of
    # the "current" figure.
    fig.suptitle(title)
    fig.savefig(f"cadf/{y_str}-{x_str}")
    plt.close(fig)

In [44]:
# Conduct ADF test for all stocks
# confidence_threshold: minimum confidence (in percent) a result needs to be plotted.
# sig_threshold: largest ADF p-value that cadf_test accepts as cointegrated.
confidence_threshold = 90
sig_threshold = 0.01

# for s1, s2 in [('stock0', 'stock30')]:        # single pair
for s1, s2 in combinations(stocks, 2):
    # Run the regression in both directions: the residuals, and therefore the
    # test statistic, depend on which series is the dependent variable.
    hedge1, confidence1, t_stat1, beta1 = cadf_test(df[s1], df[s2], significance=sig_threshold)
    hedge2, confidence2, t_stat2, beta2 = cadf_test(df[s2], df[s1], significance=sig_threshold)

    # Choose the direction with the most negative (most significant) test statistic.
    # A pair is skipped unless BOTH directions were flagged as cointegrated;
    # if the two statistics are exactly equal, nothing is plotted.
    if not confidence1 or not confidence2:
        continue
    elif t_stat1 < t_stat2 and confidence1 >= confidence_threshold:
        plot_cadf_test(hedge1, df[s1], df[s2], confidence1, beta1)
    elif t_stat2 < t_stat1 and confidence2 >= confidence_threshold:
        plot_cadf_test(hedge2, df[s2], df[s1], confidence2, beta2)

#### Johansen Cointegration Test

In [None]:
def johansen_test(data, det_order=0, k_ar_diff=1, debug=False):
    """Run the Johansen trace test and return the last rejected rank hypothesis.

    The nulls "cointegration rank <= i" are tested in order for i = 0, 1, ...
    and testing stops at the first null that is not rejected at any tabulated
    level, as the sequential procedure requires. Without that stop, a later
    row could overwrite the result even though an earlier null had not been
    rejected.

    Args:
        data (array-like): Time series for the test, one column per variable
            (the caller in this file passes a two-column DataFrame).
        det_order (int): The order of the deterministic terms. Defaults to 0.
                        -1: (most restrictive) No constant or trend
                        0: Constant term only
                        1: (least restrictive) Constant and trend terms
        k_ar_diff (int): The number of lagged differences in the model.
            Defaults to 1.
        debug (bool): When True, print the trace statistics, the critical
            value table and the eigenvectors. Defaults to False.

    Returns:
        tuple: ``(confidence, rank)``. ``rank`` is the index ``i`` of the last
            rejected null "rank <= i" (so at least ``rank + 1`` cointegrating
            relations are indicated) and ``confidence`` is the highest of
            90, 95 or 99 at which that null is rejected. Both are None when
            even "rank <= 0" is not rejected.
    """

    # Compute Johansen test statistics
    jh_results = coint_johansen(data, det_order, k_ar_diff)

    trace_stat = jh_results.lr1
    crit_val_table = jh_results.cvt
    eigenvectors = jh_results.evec

    if debug:
        print(trace_stat)   # dim=(n,) | Trace statistic
        print(crit_val_table)   # dim=(n,3) | Critical value table (90%, 95%, 99%)
        print(eigenvectors)  # dim=(n,n) | column-wise eigenvectors

    # Check co-integration
    confidence, rank = None, None
    confidence_levels = [90, 95, 99]

    for i, trace in enumerate(trace_stat):
        passed = [
            level
            for level, crit in zip(confidence_levels, crit_val_table[i])
            if trace > crit
        ]
        if not passed:
            # "rank <= i" cannot be rejected: later rows are not meaningful.
            break
        confidence = passed[-1]
        rank = i

    return confidence, rank

def plot_johansen_test(data, confidence, rank):
    """Save a line plot of the tested series under ``johansen/``.

    Args:
        data (pd.DataFrame): Series that were tested, one per column. The
            first two column names form the output file name.
        confidence: Confidence level (in percent) shown in the title.
        rank: Rank value shown in the title.

    The y-axis label is read from the module-level ``df_label``. The figure
    is closed after saving.
    """
    series_names = data.columns

    # Title kept for reference alongside the saved plot
    heading = f"Cointegration between {series_names}:\n r={rank} up to {confidence}% confidence"

    fig = plt.figure()
    axes = fig.gca()

    # historical values, one line per column
    axes.plot(data, label=series_names)
    axes.set_ylabel(df_label)
    axes.legend()
    fig.suptitle(heading)

    fig.savefig(f"johansen/{data.columns[0]}-{data.columns[1]}")
    plt.close(fig)

In [None]:
# Conduct Johansen test for all stocks
# confidence_threshold: minimum confidence (in percent) a result needs to be plotted.
# rank_threshold: minimum rank value (as returned by johansen_test) to be plotted.
confidence_threshold = 99
rank_threshold = 0

# for s1, s2 in [('stock18', 'stock43')]:        # single pair
for s1, s2 in combinations(stocks, 2):
    confidence, rank = johansen_test(df[[s1, s2]], debug=True)

    # Compare against None explicitly. johansen_test reports rank 0 when the
    # null "rank <= 0" is rejected, and a plain truthiness test would discard
    # that valid result even though rank_threshold is 0.
    if confidence is None or rank is None:
        continue

    if confidence >= confidence_threshold and rank >= rank_threshold:
        plot_johansen_test(df[[s1, s2]], confidence, rank)