In [2]:
import pandas as pd
import numpy as np
import pandas as pd

import matplotlib.pyplot as plt
from statsmodels.tsa.stattools import coint
import statsmodels.api as sm
import yfinance as yf
from datetime import date, datetime, timedelta
import ast

In [5]:

def get_adj_close_history_df(tickers: list[str], start_date: str, end_date: str) -> pd.DataFrame:
    price_df = yf.download(
        tickers,
        start=start_date,
        end=end_date,
        progress=False,
        auto_adjust=True
    )['Close']
    return price_df.dropna()

def calculate_historical_gamma(
        pair_price_history_df: pd.DataFrame, 
        in_sample_start_date: datetime, 
        in_sample_end_date: datetime
) -> float:
    """
    Estimate gamma via OLS and return the spread = P1 - gamma * P2
    """
    date_mask = (pair_price_history_df.index >= in_sample_start_date) & (pair_price_history_df.index <= in_sample_end_date)
    pair_price_history_in_sample_df = pair_price_history_df[date_mask] 
    ticker1_price_series = pair_price_history_in_sample_df.iloc[:,0]
    ticker2_price_series = pair_price_history_in_sample_df.iloc[:,1]



    # Add constant to allow intercept in regression
    X = sm.add_constant(ticker2_price_series)
    model = sm.OLS(ticker1_price_series, X).fit()

    return float(model.params[1])

def calculate_spread(
        pair_price_history_df: pd.DataFrame,
        gamma: float,
        start_date: datetime, 
        end_date: datetime
) -> pd.Series:
    date_mask = (pair_price_history_df.index >= start_date) & (pair_price_history_df.index <= end_date)
    pair_price_history_out_of_sample_df = pair_price_history_df[date_mask] 
    ticker1_price_series = pair_price_history_out_of_sample_df.iloc[:,0]
    ticker2_price_series = pair_price_history_out_of_sample_df.iloc[:,1]

    return ticker1_price_series - gamma * ticker2_price_series

def calculate_trailing_zscore(spread: pd.Series, z_score_window_in_days: int, start_date: datetime, end_date: datetime) -> pd.Series:
    rolling_mean = spread.rolling(window=z_score_window_in_days).mean()
    rolling_std = spread.rolling(window=z_score_window_in_days).std()
    zscore_series = (spread - rolling_mean) / rolling_std
    zscore_filtered_series = zscore_series.loc[start_date:end_date]
    return zscore_filtered_series

def plot_zscore_zeries(zscore_series: pd.Series, ticker_pair: tuple[str,str]):
    plt.figure(figsize=(14,6))
    plt.plot(zscore_series.index, zscore_series.values, label='Z-Score', color='blue')

    # Horizontal lines for 1 and 2 standard deviations
    for level in [1, 2]:
        plt.axhline(level, color='gray', linestyle='--', linewidth=1, label=f'+{level}σ' if level==1 else None)
        plt.axhline(-level, color='gray', linestyle='--', linewidth=1, label=f'-{level}σ' if level==1 else None)

    # Only show one label per line for legend
    handles, labels = plt.gca().get_legend_handles_labels()
    by_label = dict(zip(labels, handles))
    plt.legend(by_label.values(), by_label.keys())

    plt.title(f'Trailing Z-Score of Spread (Out-of-Sample) {ticker_pair[0]}/{ticker_pair[1]}')
    plt.xlabel('Date')
    plt.ylabel('Z-Score')
    plt.grid(True)
    plt.show()

In [53]:
in_sample_start_date = datetime(2022,1,1)
in_sample_end_date = datetime(2023,12,31)

z_score_window_in_days = 25

out_of_sample_start_date = in_sample_end_date + timedelta(days=1)
out_of_sample_end_date = datetime(2025,1,1)

df = pd.read_csv('../data/valid_sp500_pairs.csv', header=None, names=['Pairs'])
df['Pairs'] = df['Pairs'].apply(ast.literal_eval)


results = []
for ticker_pair in df['Pairs'].to_list():
    # Get prices once for entire evaluation period
    pair_price_history_df = get_adj_close_history_df(ticker_pair, in_sample_start_date, out_of_sample_end_date)
    # calculate gamma over period where cointegration has been validated
    gamma = calculate_historical_gamma(pair_price_history_df, in_sample_start_date, in_sample_end_date)

    # Lagging the out of sample start date is to ensure that there are z scores for the first day of the out of sample period
    #TODO there is a bug here bc pair price history is in trading days but the timedelta calculates in calendar days.
    # doubling the window fixed it for now but it is not precise. 
    spread_series = calculate_spread(
        pair_price_history_df,
        gamma,
        out_of_sample_start_date - timedelta(days=z_score_window_in_days*2),
        out_of_sample_end_date
    )
    out_of_sample_z_score_series = calculate_trailing_zscore(
        spread_series,
        z_score_window_in_days,
        out_of_sample_start_date,
        out_of_sample_end_date
    )
    # plot_zscore_zeries(out_of_sample_z_score_series, ticker_pair)
    results.append([out_of_sample_z_score_series, ticker_pair])

  return float(model.params[1])
  return float(model.params[1])
  return float(model.params[1])
  return float(model.params[1])
  return float(model.params[1])
  return float(model.params[1])
  return float(model.params[1])
  return float(model.params[1])
  return float(model.params[1])
  return float(model.params[1])
  return float(model.params[1])
  return float(model.params[1])
  return float(model.params[1])
  return float(model.params[1])
  return float(model.params[1])
  return float(model.params[1])
  return float(model.params[1])
  return float(model.params[1])
  return float(model.params[1])
  return float(model.params[1])
  return float(model.params[1])
  return float(model.params[1])
  return float(model.params[1])
  return float(model.params[1])
  return float(model.params[1])
  return float(model.params[1])
  return float(model.params[1])
  return float(model.params[1])
  return float(model.params[1])
  return float(model.params[1])
  return float(model.params[1])
  return

One initial observation here is that applying the cointegration test over 5 years results in a very low frequency spread meaning there are generally fewer trading opportunities per year. This is potentially a good thing in the sense that it can be exploited to navigate the 10 day calendar hold restriction. Another observation is that the number of relationships to pair trade is combinatorally massive. The number of possible pairs is given by n(n-1)/2 and each of those pairs can be tested for cointegration at different time intervals. Obviously most won't be cointegrated but it provides optimism that perhaps there are pockets of the market where this strategy is not completely arb'd away.

In [14]:
def is_exit_condition_met(
        spread_z_score_series: pd.Series,
        t_minus_one_date: datetime,
        t_minus_two_date: datetime,
        exit_threshold: float
) -> bool:
    z_score_at_t_minus_one_date = spread_z_score_series.loc[t_minus_one_date]
    z_score_at_t_minus_two_date = spread_z_score_series.loc[t_minus_two_date]
    # upwards mean reversion across exit threshold
    if z_score_at_t_minus_one_date > exit_threshold and z_score_at_t_minus_two_date < exit_threshold:
        return True
    # downwards mean reversion across exit threshold
    if z_score_at_t_minus_one_date < exit_threshold and z_score_at_t_minus_two_date > exit_threshold:
        return True
    if np.isclose(z_score_at_t_minus_one_date, exit_threshold, atol=0.1):
        return True
    return False


def get_date_of_previous_day(
        spread_z_score_series: pd.Series,
        index: int,
) -> datetime:
    return spread_z_score_series.index[index - 1]


def set_weights_to_zero_at_current_date(
        weight_at_date_df: pd.DataFrame,
        date: datetime,
        ticker_one_weight_column_name: str,
        ticker_two_weight_column_name: str,
) -> pd.DataFrame:
    weight_at_date_df.loc[date, ticker_one_weight_column_name] = 0
    weight_at_date_df.loc[date, ticker_two_weight_column_name] = 0
    return weight_at_date_df


def enter_position_at_current_date(
        weight_at_date_df: pd.DataFrame,
        date: datetime,
        ticker_one_weight_column_name: str,
        ticker_two_weight_column_name: str,
        *,
        is_long_ticker_one: bool
)  -> pd.DataFrame:
    long_position_flag = 1
    short_position_flag = -1
    if is_long_ticker_one:
        weight_at_date_df.loc[date, ticker_one_weight_column_name] = long_position_flag
        weight_at_date_df.loc[date, ticker_two_weight_column_name] = short_position_flag
    else:
        weight_at_date_df.loc[date, ticker_one_weight_column_name] = short_position_flag
        weight_at_date_df.loc[date, ticker_two_weight_column_name] = long_position_flag
    return weight_at_date_df


def assign_weights_from_prev_date_to_cur_date(
        weight_at_date_df: pd.DataFrame,
        cur_date: datetime,
        t_minus_one_date: datetime,
) -> pd.DataFrame:
    weight_at_date_df.loc[cur_date] = weight_at_date_df.loc[t_minus_one_date]
    return weight_at_date_df


def create_trade_signals_for_spread_z_score_series(
        ticker_pair: tuple[str,str],
        spread_z_score_series: pd.Series,
) -> pd.DataFrame:
    ticker_one = ticker_pair[0]
    ticker_two = ticker_pair[1]
    weight_column_name_suffix = '_trade_side'
    ticker_one_weight_column_name = ticker_one + weight_column_name_suffix
    ticker_two_weight_column_name = ticker_two + weight_column_name_suffix

    weight_at_date_df = pd.DataFrame(
        index=spread_z_score_series.index,
        columns=[ticker_one_weight_column_name, ticker_two_weight_column_name]
    )
    entrance_threshold = 2
    exit_threshold = 0

    is_invested = False
    for i, (cur_date, z_score) in enumerate(spread_z_score_series.items()):
        # TODO there is a better way than to ignore the first two days. 
        # probably just extend the spread_z_score_series
        if i < 2:
            continue
        t_minus_one_date = spread_z_score_series.index[i-1]
        t_minus_two_date = spread_z_score_series.index[i-2]
        z_score_at_t_minus_one_date = spread_z_score_series.loc[t_minus_one_date]
    
        if z_score_at_t_minus_one_date >= entrance_threshold and not is_invested:
            weight_at_date_df = enter_position_at_current_date(
                weight_at_date_df,
                cur_date,
                ticker_one_weight_column_name,
                ticker_two_weight_column_name,
                is_long_ticker_one=True
            )
            is_invested = True

        elif is_invested and is_exit_condition_met(spread_z_score_series, t_minus_one_date, t_minus_two_date, exit_threshold):
            weight_at_date_df = set_weights_to_zero_at_current_date(
                                    weight_at_date_df,
                                    cur_date,
                                    ticker_one_weight_column_name,
                                    ticker_two_weight_column_name
                                )
            is_invested = False
        
        elif z_score_at_t_minus_one_date <= -entrance_threshold and not is_invested:
            weight_at_date_df = enter_position_at_current_date(
                weight_at_date_df,
                cur_date,
                ticker_one_weight_column_name,
                ticker_two_weight_column_name,
                is_long_ticker_one=False
            )
            is_invested = True
        else:
            weight_at_date_df = assign_weights_from_prev_date_to_cur_date(weight_at_date_df, cur_date, t_minus_one_date)
    
    return weight_at_date_df.fillna(0)

In [49]:
def calculate_hurst(series: pd.Series) -> float:
    """
    Calculates the Hurst exponent of a time series using the Rescaled Range (R/S) method.

    The Hurst exponent is a measure of the long-term memory of a time series.
    - H ≈ 0.5: Indicates a random walk (e.g., a stock price).
    - H > 0.5: Indicates a trending series (positive autocorrelation).
    - H < 0.5: Indicates a mean-reverting series (negative autocorrelation).

    This implementation divides the series into multiple non-overlapping sub-periods to
    produce a more robust estimate of the Hurst exponent.
    """

    # The minimum number of data points for a meaningful Hurst calculation is often
    # recommended to be at least 100 or higher.
    min_length = 100
    if len(series) < min_length:
        print(f"Input series length ({len(series)}) is too short. At least {min_length} data points are recommended.")
        return np.nan

    data = np.array(series)
    
    # Create logarithmically spaced window sizes (lags) for analysis.
    # This provides a better representation across different time scales
    max_lag = len(data) // 4
    if max_lag < 10:
        print("Series is too short for meaningful Hurst calculation with logarithmic lags.")
        return np.nan
        
    lags = np.unique(np.logspace(np.log10(10), np.log10(max_lag), num=15).astype(int))
    
    rescaled_range_list = []
    log_lags = []

    for lag in lags:
        # Divide the series into non-overlapping sub-periods of a given lag
        # The last segment is included if it is at least 80% of the target lag size
        segments = []
        for i in range(0, len(data), lag):
            segment = data[i:i + lag]
            if len(segment) >= lag * 0.8:
                segments.append(segment)
            
        if not segments:
            continue
        
        # Calculate R/S for each segment and average the results
        rs_per_segment = []
        for segment in segments:
            # Step 1: Calculate the mean for the segment
            mean = np.mean(segment)
            
            # Step 2: Calculate the cumulative deviation from the mean
            deviations = segment - mean
            cumulative_deviation = np.cumsum(deviations)
            
            # Step 3: Calculate the range (max - min of cumulative deviation)
            rng = np.max(cumulative_deviation) - np.min(cumulative_deviation)
            
            # Step 4: Calculate the standard deviation (using sample std, ddof=1)
            std_dev = np.std(segment, ddof=1)
            
            # Avoid division by zero
            if std_dev == 0:
                continue
            
            # Step 5: Calculate the rescaled range (R/S)
            rs_per_segment.append(rng / std_dev)
        
        if rs_per_segment:
            rescaled_range_list.append(np.mean(rs_per_segment))
            log_lags.append(np.log(lag))

    # Perform a linear regression on log(R/S) vs log(lag)
    if not log_lags or not rescaled_range_list or len(log_lags) < 2:
        print("Could not calculate Hurst exponent due to insufficient data for log-log plot.")
        return np.nan
    
    log_rs = np.log(rescaled_range_list)
    
    # Use numpy's polyfit to get the slope of the regression line
    # The slope is the Hurst exponent. Note: np.RankWarning is a warning, not an exception.
    hurst, _ = np.polyfit(log_lags, log_rs, 1)
    
    return hurst

In [None]:
hurst_scores = []
for result in results:
    hurst = calculate_hurst(result[0])
    break

Date
2024-01-02   -1.571638
2024-01-03   -1.915560
2024-01-04   -1.540165
2024-01-05   -1.555693
2024-01-08   -2.024071
                ...   
2024-12-24   -2.028349
2024-12-26   -1.939386
2024-12-27   -1.406473
2024-12-30   -1.792231
2024-12-31   -0.952278
Length: 252, dtype: float64


In [38]:
results[0][0]

Date
2024-01-02   -1.571638
2024-01-03   -1.915560
2024-01-04   -1.540165
2024-01-05   -1.555693
2024-01-08   -2.024071
                ...   
2024-12-24   -2.028349
2024-12-26   -1.939386
2024-12-27   -1.406473
2024-12-30   -1.792231
2024-12-31   -0.952278
Length: 252, dtype: float64

In [40]:
nvda_dkng_signals = create_trade_signals_for_spread_z_score_series(results[0][1], results[0][0])
nvda_dkng_signals

  return weight_at_date_df.fillna(0)


Unnamed: 0_level_0,NVDA_trade_side,DKNG_trade_side
Date,Unnamed: 1_level_1,Unnamed: 2_level_1
2024-01-02,0,0
2024-01-03,0,0
2024-01-04,0,0
2024-01-05,0,0
2024-01-08,0,0
...,...,...
2024-12-24,0,0
2024-12-26,-1,1
2024-12-27,-1,1
2024-12-30,-1,1


In [44]:
nvda_iot_signals = create_trade_signals_for_spread_z_score_series(results[1][1], results[1][0])
nvda_iot_signals

  return weight_at_date_df.fillna(0)


Unnamed: 0_level_0,NVDA_trade_side,IOT_trade_side
Date,Unnamed: 1_level_1,Unnamed: 2_level_1
2024-01-02,0,0
2024-01-03,0,0
2024-01-04,0,0
2024-01-05,0,0
2024-01-08,0,0
...,...,...
2024-12-24,0,0
2024-12-26,0,0
2024-12-27,0,0
2024-12-30,0,0


In [48]:
nvda_iot_signals.merge(nvda_dkng_signals,left_index=True,right_index=True)

Unnamed: 0_level_0,NVDA_trade_side_x,IOT_trade_side_x,NVDA_trade_side_y,IOT_trade_side_y
Date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
2024-01-02,0,0,0,0
2024-01-03,0,0,0,0
2024-01-04,0,0,0,0
2024-01-05,0,0,0,0
2024-01-08,0,0,0,0
...,...,...,...,...
2024-12-24,0,0,0,0
2024-12-26,0,0,0,0
2024-12-27,0,0,0,0
2024-12-30,0,0,0,0
