In [1]:
import pandas as pd
import os
import matplotlib.pyplot as plt
import numpy as np
from riskfolio import Portfolio
from pandas.tseries.holiday import USFederalHolidayCalendar
import pandas_market_calendars as mcal
from datetime import datetime, timedelta
from pathlib import Path
import seaborn as sns
from scipy.stats import skew
import scipy.optimize as sco
from scipy.optimize import minimize
from scipy.stats import norm
import matplotlib.lines as mlinesv
import statsmodels.api as sm
from statsmodels.tsa.stattools import adfuller

LOADING DATA

In [2]:
def load_price_data(
    start_date='2020-01-01',
    end_date=datetime.today(),
    assets=["Stock", "Bond", "Commodity", "forex"],  # or use "All" for everything
    path_stock="data\\master_stock_data.csv",
    path_bond="data\\master_bond_etf_data.csv",
    path_commodity="data\\master_commodity_etf_data.csv",
    path_forex="data\\master_forex_data.csv"
):
    if end_date is None:
        end_date = datetime.now()

    # Convert to Timestamps
    start_date = pd.to_datetime(start_date)
    end_date = pd.to_datetime(end_date)

    def load_and_filter(filepath, asset_type):
        df = pd.read_csv(filepath, parse_dates=['Date'])
        df = df[(df['Date'] > start_date) & (df['Date'] <= end_date)]
        df['AssetType'] = asset_type
        return df

    # Normalize asset input
    if isinstance(assets, str):
        assets = [assets]
    if "All" in assets:
        assets = ["Stock", "Bond", "Commodity"]

    combined = []

    if "Stock" in assets:
        combined.append(load_and_filter(path_stock, "Stock"))

    if "Bond" in assets:
        try:
            combined.append(load_and_filter(path_bond, "Bond"))
        except FileNotFoundError:
            print(f"Warning: {path_bond} not found. Skipping Bond data.")

    if "Commodity" in assets:
        try:
            combined.append(load_and_filter(path_commodity, "Commodity"))
        except FileNotFoundError:
            print(f"Warning: {path_commodity} not found. Skipping Commodity data.")
            
    if "forex" in assets:
        try:
            combined.append(load_and_filter(path_forex, "Commodity"))
        except FileNotFoundError:
            print(f"Warning: {path_forex} not found. Skipping forex data.")

    if not combined:
        raise ValueError("No valid asset types selected or no files found.")

    final_df = pd.concat(combined, ignore_index=True)
    final_df.sort_values(by="Date", inplace=True)

    return final_df


ASSET SELECTION

In [26]:
def filter_by_correlation(price_df, corr_threshold=0.3, pairs=False):
    df = price_df.copy()
    if 'Date' not in df.columns or 'Symbol' not in df.columns or 'Close' not in df.columns:
        raise ValueError("Input DataFrame must contain 'Date', 'Symbol', and 'Close' columns.")
    
    df['Date'] = pd.to_datetime(df['Date'])
    df = df.sort_values('Date')

    returns = df.pivot(index='Date', columns='Symbol', values='Close').pct_change().dropna()
    corr_matrix = returns.corr()

    if pairs:
        selected_pairs = []
        for asset1 in corr_matrix.columns:
            for asset2 in corr_matrix.columns:
                if asset1 != asset2 and corr_matrix.loc[asset1, asset2] <= corr_threshold:
                # Avoid duplicates (pairs like (AAPL, MSFT) and (MSFT, AAPL))
                    if (asset2, asset1) not in selected_pairs:
                        selected_pairs.append((asset1, asset2))
                        
        selected = [asset for pair in selected_pairs for asset in pair]
        selected = list(set(selected))  # Remove duplicates
        selected_corr_matrix = corr_matrix.loc[selected, selected]
        
        return selected_pairs, selected_corr_matrix
                    
                    
    else:
        selected = []               
        for asset in corr_matrix.columns:
            
            if all(abs(corr_matrix.loc[asset, other]) < corr_threshold for other in selected):
                selected.append(asset)
                
        selected_corr_matrix = corr_matrix.loc[selected, selected]

        return selected, selected_corr_matrix
    
def cointegration_test(asset1, asset2):
    """
    Perform the Engle-Granger two-step cointegration test.
    
    1. Regress asset1 on asset2.
    2. Test the residuals for stationarity using the ADF test.
    
    Parameters:
    - asset1: Series representing the first asset (price or returns).
    - asset2: Series representing the second asset (price or returns).
    
    Returns:
    - p_value: p-value from the ADF test on the residuals (indicating cointegration).
    - is_cointegrated: Boolean indicating whether the pair is cointegrated.
    """
    
    # Step 1: Run a linear regression of asset1 on asset2
    asset1 = sm.add_constant(asset1)  # Add a constant for the intercept term
    model = sm.OLS(asset2, asset1)  # OLS regression model
    result = model.fit()  # Fit the model
    
    # Step 2: Get the residuals (errors from the regression)
    residuals = result.resid
    
    beta = result.params.iloc[1] # Get the slope coefficient (beta)
    
    # Step 3: Perform the ADF test on the residuals
    adf_stat, p_value, _, _, critical_values, _ = adfuller(residuals)
    
    # Step 4: Cointegration condition - if p-value < 0.05, the pair is cointegrated
    is_cointegrated = p_value < 0.05
    
    return p_value, is_cointegrated, beta

def test_cointegration_on_pairs(price_df, selected_pairs):
    """
    Test cointegration for a list of asset pairs and return only the cointegrated pairs.
    
    Parameters:
    - price_df: DataFrame with asset prices ('Date', 'Symbol', 'Close').
    - selected_pairs: List of tuples containing asset pairs.
    
    Returns:
    - cointegrated_pairs: List of cointegrated pairs.
    """
    cointegrated_pairs = []
    betas = {}

    for pair in selected_pairs:
        asset1_symbol, asset2_symbol = pair

        # Get the price data for the two assets
        asset1 = price_df[price_df['Symbol'] == asset1_symbol][['Date', 'Close']]
        asset2 = price_df[price_df['Symbol'] == asset2_symbol][['Date', 'Close']]

        # Align the assets on the same dates (drop missing values)
        combined_data = pd.merge(asset1, asset2, on='Date', suffixes=('_1', '_2')).dropna()
        
        # Get the aligned price data
        asset1_aligned = combined_data['Close_1']
        asset2_aligned = combined_data['Close_2']

        # Perform the cointegration test
        p_value, is_cointegrated, beta = cointegration_test(asset1_aligned, asset2_aligned)

        # Only store cointegrated pairs
        if is_cointegrated:
            cointegrated_pairs.append(pair)
            betas[pair] = beta

    return cointegrated_pairs, betas

In [27]:
df = load_price_data(end_date='2025-01-01')

selected, selected_corr_matrix = filter_by_correlation(df, corr_threshold=-0.3, pairs=True)
cointegrated_pairs, betas = test_cointegration_on_pairs(df, selected)

  returns = df.pivot(index='Date', columns='Symbol', values='Close').pct_change().dropna()


BASE STRATEGY 

In [None]:
def generate_signals_for_pairs(price_df, pairs, window=20, entry_threshold=2.0, exit_threshold=0.5):
    """
    Generate pair trading signals for multiple asset pairs.

    Args:
        price_df (pd.DataFrame): Long-format price data with columns ['Date', 'Symbol', 'Close'].
        pairs (list of tuples): List of (symbol1, symbol2) pairs.
        window (int): Rolling window for z-score.
        entry_threshold (float): Z-score entry threshold.
        exit_threshold (float): Z-score exit threshold.

    Returns:
        dict: keys = pair tuple, values = dict with:
              - 'signals': pd.Series of signals (1, 0, -1) indexed by Date
              - 'spread': pd.Series of spread values indexed by Date
              - 'z_score': pd.Series of z-score indexed by Date
              - 'beta': float hedge ratio from regression
              - 'alpha': float intercept from regression
    """
    results = {}

    for sym1, sym2 in pairs:
        # Extract prices for each symbol
        prices1 = price_df[price_df['Symbol'] == sym1][['Date', 'Close']].set_index('Date').sort_index()
        prices2 = price_df[price_df['Symbol'] == sym2][['Date', 'Close']].set_index('Date').sort_index()

        # Align on dates (inner join)
        combined = prices1.join(prices2, lsuffix='_1', rsuffix='_2').dropna()

        if len(combined) < window:
            # Not enough data to compute rolling stats, skip
            continue

        # Step 1: Compute spread = residuals of regressing sym1 on sym2
        X = sm.add_constant(combined['Close_2'])
        model = sm.OLS(combined['Close_1'], X).fit()
        alpha, beta = model.params

        spread = combined['Close_1'] - (alpha + beta * combined['Close_2'])

        # Step 2: Generate signals from spread using rolling z-score
        rolling_mean = spread.rolling(window=window).mean()
        rolling_std = spread.rolling(window=window).std()
        z_score = (spread - rolling_mean) / rolling_std

        # Initialize signals series
        signals = pd.Series(0, index=spread.index)

        # Shift z-score by 1 to avoid lookahead
        z_score_lagged = z_score.shift(1)

        # Entry signals
        signals[z_score_lagged > entry_threshold] = -1
        signals[z_score_lagged < -entry_threshold] = 1

        # Forward fill signals to hold position until exit
        signals_ffill = signals.copy()
        signals_ffill[signals_ffill == 0] = np.nan
        signals_ffill = signals_ffill.ffill().fillna(0)

        # Exit signals (also based on lagged z-score)
        signals_ffill[(signals_ffill == 1) & (z_score_lagged > -exit_threshold)] = 0
        signals_ffill[(signals_ffill == -1) & (z_score_lagged < exit_threshold)] = 0

        # Save results
        results[(sym1, sym2)] = {
            'signals': signals_ffill,
            'spread': spread,
            'z_score': z_score,
            'alpha': alpha,
            'beta': beta,
        }

    return results

In [30]:
pair_signals = generate_signals_for_pairs(df, cointegrated_pairs)

WEIGHTING PORTFOLIO

In [32]:
def allocate_pair_trade_capital(total_capital, price1, price2, beta):
    """
    Determine number of shares to long/short for pair trading.

    Args:
        total_capital (float): Total capital to allocate to the pair.
        price1 (float): Price of first asset.
        price2 (float): Price of second asset.
        beta (float): Hedge ratio (regression coefficient).

    Returns:
        dict: Number of shares to long/short for sym1 and sym2.
    """
    # Normalize beta in absolute terms
    abs_beta = abs(beta)

    # Dollar allocation per leg: proportionally divide based on |1| and |β|
    capital_sym1 = total_capital / (1 + abs_beta)
    capital_sym2 = total_capital - capital_sym1

    # Shares = capital / price
    shares_sym1 = capital_sym1 / price1
    shares_sym2 = capital_sym2 / price2

    return {
        'sym1_shares': shares_sym1,
        'sym2_shares': shares_sym2,
        'sym1_capital': capital_sym1,
        'sym2_capital': capital_sym2,
    }


RISK MANAGEMENT

METRICS LOGGING

BACKTESTING

In [None]:
def backtest_pair_trading_with_capital(price_df, pair_signals_dict, capital_allocations):
    price_df = price_df.sort_index()
    all_dates = sorted(price_df.index.unique())

    portfolio_returns = []
    portfolio_dates = []

    for i in range(len(all_dates) - 1):
        date_t = all_dates[i]
        date_t1 = all_dates[i + 1]

        daily_return = 0
        total_capital = sum(capital_allocations.values())
        weighted_return_sum = 0

        for (sym1, sym2), data in pair_signals_dict.items():
            signals = data['signals']
            beta = data['beta']

            if date_t not in signals.index:
                continue

            signal = signals.loc[date_t]
            if signal == 0:
                continue

            try:
                price_t_sym1 = price_df.loc[(date_t, sym1), 'Close']
                price_t1_sym1 = price_df.loc[(date_t1, sym1), 'Close']
                price_t_sym2 = price_df.loc[(date_t, sym2), 'Close']
                price_t1_sym2 = price_df.loc[(date_t1, sym2), 'Close']
            except KeyError:
                continue

            ret_sym1 = (price_t1_sym1 / price_t_sym1) - 1
            ret_sym2 = (price_t1_sym2 / price_t_sym2) - 1

            pair_ret = signal * (ret_sym1 - beta * ret_sym2)

            # Scale pair return by capital allocated
            cap = capital_allocations.get((sym1, sym2), 0)
            weighted_return_sum += pair_ret * cap

        # Normalize portfolio return by total capital invested
        if total_capital > 0:
            daily_return = weighted_return_sum / total_capital
        else:
            daily_return = 0

        portfolio_dates.append(date_t1)
        portfolio_returns.append(daily_return)

    return pd.Series(portfolio_returns, index=portfolio_dates).sort_index()


In [34]:
df = backtest_pair_trading(df, pair_signals)

In [35]:
df.head()

1    0
2    0
3    0
4    0
5    0
dtype: int64

OTHER FUNCTIONS

PORTFOLIO PERFORMANCE TRACKING

PLOTTING