In [24]:
### Library Imports
import yfinance as yf
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import statsmodels.api as sm
import numpy as np
import itertools
import warnings
warnings.filterwarnings("ignore")
from statsmodels.tsa.vector_ar.vecm import coint_johansen
from statsmodels.tsa.stattools import adfuller


In [25]:
### Function to Import Stock Data
def import_stock_data(tickers, start_date, end_date):
    data = pd.DataFrame()
    if len([tickers]) == 1:
        data[tickers] = yf.download(tickers, start_date, end_date)['Adj Close']
        data = pd.DataFrame(data)
    else:
        for t in tickers:
            data[t] = yf.download(tickers, start_date, end_date)['Adj Close']
    
    # Reset index to include the Date as a column
    data = data.reset_index()

    return data

# Import Stock Data
tickers = ['XOM', 'CVX']
start_date = '2020-01-01'
end_date = '2025-01-01'
stock_data = import_stock_data(tickers, start_date, end_date)
print(stock_data.tail())

[*********************100%%**********************]  2 of 2 completed

           Date         XOM         CVX
1253 2024-12-24  143.839996  106.400002
1254 2024-12-26  143.979996  106.489998
1255 2024-12-27  144.000000  106.480003
1256 2024-12-30  143.070007  105.760002
1257 2024-12-31  144.839996  107.570000





In [26]:
### Function to Compute Log Prices
def compute_log_prices(data):
    # Ensure 'Date' is present now
    if 'Date' not in data.columns:
        raise KeyError("The 'Date' column is missing from the DataFrame!")

    # Set Date as index and apply log transformation
    data.set_index("Date", inplace = True)
    data = np.log(data)  # Apply log transformation

    # Reset index to restore 'Date' as a column
    data.reset_index(inplace=True)

    return data

log_prices = compute_log_prices(stock_data)
print(log_prices.head())

# Check for null vals
print(log_prices.isna().sum())  # Should return 0 for all columns
print(np.isinf(log_prices).sum())  # Should return 0 for all columns
print(log_prices.dtypes)  # Ensure all columns are float64


        Date       XOM       CVX
0 2020-01-02  4.577030  4.018744
1 2020-01-03  4.573566  4.010672
2 2020-01-06  4.570172  4.018320
3 2020-01-07  4.557320  4.010103
4 2020-01-08  4.545832  3.994908
Date    0
XOM     0
CVX     0
dtype: int64
Date    0
XOM     0
CVX     0
dtype: int64
Date    datetime64[ns]
XOM            float64
CVX            float64
dtype: object


In [28]:
### Check Cointegration
''' 
The matrix Π determines whether the time series are cointegrated:
    1. If Π has full rank (r = n), all series are stationary, no need for cointegration testing.
    2. If Π has rank 0, no cointegration exists, meaning the series move independently.
    3. If Π has reduced rank (0 < r < n), then there are r cointegrating relationships.
'''
def check_coint(data, r):
    # n = number of columns (number of time series)
    n = len(data.columns)
    
    # Case 1: No Cointegration
    if r == 0:
        print(f"Since the matrix Π has rank 0 (r = {r}), the time series are likely non-stationary and no cointegration exists.")
    # Case 2: Some Cointegration Exists
    elif 0 < r < n:
        print(f"Cointegration exists with {r} cointegrating relationships, meaning some assets share a long-term equilibrium.")
    # Case 3: Full Rank - All Series are Stationary
    elif r == n:
        print(f"Since the matrix Π has full rank (r = {n}), all time series are stationary, so cointegration testing is unnecessary.")
    # Error Handling
    else:
        print("Test did not run successfully. Please check your input values.")

# Example Usage
#check_coint(stock_data, 2)


In [33]:
### Test Cointegration - Johansen Test
''' 
The Johansen test is a statistical test used to determine the number of cointegrating relationships among multiple time series. 
The test uses Trace and Max Eigenvalue tests to check if a group of non-stationary time series share a stable, long-term equilibrium. 
If cointegration exists, the assets move together over time, making them suitable for pairs trading or statistical arbitrage.
'''
# https://medium.com/@cemalozturk/unveiling-cointegration-johansen-test-explained-with-python-examples-db8385219f1f
def johansen_test(data, det_order = 0, k_ar_diff = 1):
    """
    det_order (int): The order of deterministic terms.
                     -1: No constant or trend.
                      0: Constant term only.
                      1: Constant and trend terms.
    k_ar_diff (int): The number of lags to include in the VAR model.
    """
    try:
        # Ensure Date column is removed if it exists
        if 'Date' in data.columns:
            data = data.drop(columns = ['Date'])

        # Convert to NumPy array
        data_np = data.values

        # Run Johansen cointegration test
        result = coint_johansen(data_np, det_order, k_ar_diff)
        print(f'Johansen Test Results (det_order = {det_order})\n')

        # Print test statistics
        trace_stats = result.lr1
        max_eigenval_stats = result.lr2
        print('Trace Statistics:', trace_stats)
        print('Max Eigenvalue Statistics:', max_eigenval_stats)
        
        # Perform Trace Test and Max Eigenvalue Test at 1%, 5%, and 10% intervals
        print('\nCritical Values (Trace Test):')
        print(f"1%: {result.cvt[:, 0]}, 5%: {result.cvt[:, 1]}, 10%: {result.cvt[:, 2]}")

        print('\nCritical Values (Max Eigenvalue Test):')
        print(f"1%: {result.cvm[:, 0]}, 5%: {result.cvm[:, 1]}, 10%: {result.cvm[:, 2]}\n")

        # Determine the number of cointegrating relationships
        rank_est = sum(result.lr1 > result.cvt[:, 1])  # Compare trace test stats to 5% critical values
        print(f'Estimated number of cointegrating relationships: {rank_est}')

        # Return Cointegration
        check_coint(data, rank_est)

        return trace_stats, max_eigenval_stats, rank_est, result.cvt[:, 1], result.cvm[:, 1]
    
    except Exception as e:
        print(f'An error occurred during the Johansen test: {e}')
        return None

# Johansen Test Function Return 
log_prices_for_test = log_prices.drop(columns=['Date'], errors='ignore')
trace_stats, max_eigenval_stats, rank_est, crit_vals_trace, crit_vals_max_ev = johansen_test(log_prices_for_test, det_order = 0, k_ar_diff = 1)


Johansen Test Results (det_order = 0)

Trace Statistics: [10.22038781  0.53858908]
Max Eigenvalue Statistics: [9.68179873 0.53858908]

Critical Values (Trace Test):
1%: [13.4294  2.7055], 5%: [15.4943  3.8415], 10%: [19.9349  6.6349]

Critical Values (Max Eigenvalue Test):
1%: [12.2971  2.7055], 5%: [14.2639  3.8415], 10%: [18.52    6.6349]

Estimated number of cointegrating relationships: 0
Since the matrix Π has rank 0 (r = 0), the time series are likely non-stationary and no cointegration exists.


In [30]:
### Function to Compute P-Values and the Cointegration Score
def pvalues_coint_scores(trace_stats, crit_vals_trace):
    # Create empty array to store p-values and cointegration scores
    p_values = []
    coint_scores = []

    # Iterate over trace stats by 