# Model for Pairs Mean Reversion Trading
This model uses the Backtrader framework to for backtesting our strategy. In this file we also run our linear regression analysis and do hyper parameter testing.

**Run this file to see how our model performs.**

In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from datetime import datetime
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LinearRegression
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error, r2_score
import backtrader as bt
from datetime import timedelta


Here we read the data into a DataModel. Our repository includes only the stock data that we use in our portfolio. We also split the data into testing and training data. We use 2022 and 2023 for training and 2024 for testing. The training data was used for parameter testing and learning the regression models, while training data is used for making trades on our final testing run.

In [2]:
class DataModel:
    def __init__(self, pairs: list):
        self.pairs = pairs
        self.data_dict = self.read_data()
        self.data_dict_train = None
        self.data_dict_test = None
        

    def read_data(self):
        data_dict = {}
        for pair in self.pairs:
            for ticker in pair:
                folder_name = "2022-2024 1min data"
                # read in data from csv
                df = pd.read_csv(f"{folder_name}\{ticker}_2022-2024_1min.csv", index_col=0, parse_dates=True)
                df.dropna(inplace=True)
                data_dict[ticker] = df

        dfs = [data_dict[ticker] for pair in self.pairs for ticker in pair]

        # Find common timestamps across all tickers
        common_timestamps = dfs[0].index
        for df in dfs[1:]:
            common_timestamps = common_timestamps.intersection(df.index)
        

        # Filter all DataFrames to only include common timestamps
        for pair in self.pairs:
            for ticker in pair:
                data_dict[ticker] = data_dict[ticker].loc[common_timestamps]
        
        return data_dict
    
    def split_train_test(self, train_year):
        data_dict_train = {}
        data_dict_test = {}
        for pair in self.pairs:
            for ticker in pair:
                df = self.data_dict[ticker]
                df_train = df[df.index.year < int(train_year)]
                data_dict_train[ticker] = df_train
                df_test = df[df.index.year > int(train_year) - 1]
                data_dict_test[ticker] = df_test
        self.data_dict_train = data_dict_train
        self.data_dict_test = data_dict_test


### Pair Trading Strategy
Here we define our strategy in the next() method. The backtester iterates through each timestep, then each pair, and checks the conditions and acts as follows:

spread is calculated as the difference between the actual value of Stock B and the predicted value of Stock B from the regression model. 

```plaintext
if in_position and spread switches signs:
    close positions
if not in_position:
    if spread > open_threshold:
        long Stock A
        short Stock B
    if spread < -open_threshold:
        short Stock A
        long Stock B
```

In [3]:

# Define the strategy
class PairTradingStrategy(bt.Strategy):
    params = (
        ('real_cash', None), ('reg_models', None), ('open_threshold', 0.1), ('size', 10), ('spread_data', None), ('interest_rate', None), ('test', None), ('leverage_size', None), ('commission_rate', None))  # 0.4 threshold, 10 size

    def __init__(self):
        self.spread_histories = []
        self.pairs = []
        self.previous_spreads = []
        self.datetimes = []
        self.reg_models = self.params.reg_models
        self.real_cash = self.params.real_cash
        self.initial_difference = self.broker.getvalue() - self.params.real_cash
        self.real_cash_history = []
        self.total_trade_value = 0
        
        last_dt = self.datas[0].datetime.datetime(-1).date()  # this gives a date
        self.cutoff_date = last_dt - timedelta(days=60)

        self.log_file = open("output_log.txt", "w")
        # self.min_real_cash = np.inf 
        
        for i in range(0, len(self.datas), 2):
            d0 = self.datas[i]
            d1 = self.datas[i+1]
            self.pairs.append((d0, d1))
            self.previous_spreads.append(0)
            self.spread_histories.append([])
            self.datetimes.append([])

        self.in_position = {}
        for i, (d0, d1) in enumerate(self.pairs):
            self.in_position[i] = False
            
            
    def log(self, txt):
        dt = self.datas[0].datetime.datetime(0)
        # print(f'{dt}: {txt}')
        self.log_file.write(f"{dt}: {txt}\n")

    def plot_spread(self):
        plt.figure(figsize=(4, 6))
        plt.plot(self.spread_histories)

    def next(self):
        """ Run this function at each time step """
        # stop if account value < 0
        cash = self.broker.getcash()
        value = self.broker.getvalue()

        self.real_cash = value - self.initial_difference
        self.real_cash_history.append(self.real_cash)

        current_date = self.datas[0].datetime.datetime(0).date()    
        
        for i, (d0, d1) in enumerate(self.pairs):

            predicted_B = self.reg_models[i].predict([[d0.close[0]]])[0]
            actual_B = d1.close[0]
            spread = actual_B - predicted_B
            previous_spread = self.previous_spreads[i]

            previous_spread = self.previous_spreads[i]
            self.spread_histories[i].append(spread)
            self.datetimes[i].append(self.datas[0].datetime.datetime(0))

            # Trading Logic
            # if not self.in_position[i] and current_date < self.cutoff_date:
            if self.in_position[i]:  # Exit trade when spread normalizes
                if previous_spread * spread < 0:
                    self.log(f"Closing positions for {d1._name} and {d0._name}, Previous spread: {previous_spread:.4f}, Spread: {spread:.4f}")
                    self.close(d1, price=d1.close[0])
                    self.close(d0, price=d0.close[0])
                    self.in_position[i] = False

                    # print(f'{self.datas[0].datetime.datetime(0)}, Real value: {self.real_cash:.2f}')
                    self.log_file.write(f'{self.datas[0].datetime.datetime(0)}, Real value: {self.real_cash:.2f}\n')
            if not self.in_position[i]:

                normalized_spread = spread / d1.close[0]

                if normalized_spread > self.params.open_threshold * self.params.spread_data[(d0._name, d1._name)]["std_dev"] and self.real_cash > 0:

                    beta = self.reg_models[i].coef_[0]
                    trading_size_B = int(self.params.size * self.params.spread_data[(d0._name, d1._name)]["penalty"] * (1/self.params.spread_data[(d0._name, d1._name)]["std_dev"]) * (1/d1.close[0]) * self.real_cash)
                    trading_size_A = int(trading_size_B * beta)

                    # print(f'{self.datas[0].datetime.datetime(0)}, Real value: {self.real_cash:.2f}')
                    self.log_file.write(f'{self.datas[0].datetime.datetime(0)}, Real value: {self.real_cash:.2f}\n')
                    
                    self.total_trade_value += int(trading_size_A * d0.close[0])
                    self.total_trade_value += int(trading_size_B * d1.close[0])
                    
                    self.log(f"Short {d1._name} @ {actual_B:.4f} for size {trading_size_B} and cash {int(trading_size_B * actual_B)}")
                    self.log(f"Long {d0._name} @ {d0.close[0]:.4f} for size {trading_size_A} and cash {int(trading_size_A * d0.close[0])}")
                    self.sell(data=d1, size=trading_size_B, price=d1.close[0])  # Short stock B
                    self.buy(data=d0, size=trading_size_A, price=d0.close[0])   # Long stock A
                    self.in_position[i] = True
                elif normalized_spread < -self.params.open_threshold * self.params.spread_data[(d0._name, d1._name)]["std_dev"] and self.real_cash > 0:

                    beta = self.reg_models[i].coef_[0]                    
                    trading_size_B = int(self.params.size * self.params.spread_data[(d0._name, d1._name)]["penalty"] * (1/self.params.spread_data[(d0._name, d1._name)]["std_dev"]) * (1/d1.close[0]) * self.real_cash)
                    trading_size_A = int(trading_size_B * beta)

                    # print(f'{self.datas[0].datetime.datetime(0)}, Real value: {self.real_cash:.2f}')
                    self.log_file.write(f'{self.datas[0].datetime.datetime(0)}, Real value: {self.real_cash:.2f}\n')

                    self.total_trade_value += int(trading_size_A * d0.close[0])
                    self.total_trade_value += int(trading_size_B * d1.close[0])

                    self.log(f"Long {d1._name} @ {actual_B:.4f} for size {trading_size_B} and cash {int(trading_size_B * actual_B)}")
                    self.log(f"Short {d0._name} @ {d0.close[0]:.4f} for size {trading_size_A} and cash {int(trading_size_A * d0.close[0])}")
                    self.buy(data=d1, size=trading_size_B, price=d1.close[0])   # Long stock B
                    self.sell(data=d0, size=trading_size_A, price=d0.close[0])  # Short stock A
                    self.in_position[i] = True
            

            self.previous_spreads[i] = (spread)
    
    def stop(self):
        for i, (d0, d1) in enumerate(self.pairs):
            if self.in_position[i]:
                value = self.broker.getvalue()

                predicted_B = self.reg_models[i].predict([[d0.close[0]]])[0]
                actual_B = d1.close[0]
                spread = actual_B - predicted_B
                self.log(f"Closing positions, Spread: {spread:.4f}, {d1._name} price: {d1.close[0]}, {d0._name} price: {d0.close[0]}")
                self.close(d1, price=d1.close[0])
                self.close(d0, price=d0.close[0])
                self.in_position[i] = False

                # print(f'Date: {self.datas[0].datetime.datetime(0)}, Cash: {self.real_cash:.2f}, Value: {value:.2f}')
                self.log_file.write(f'Date: {self.datas[0].datetime.datetime(0)}, Cash: {self.real_cash:.2f}, Value: {value:.2f}\n')
                # print("Total trade value: ", self.total_trade_value)
                self.log_file.write(f"Total trade value: {self.total_trade_value}\n")
        t = 1 if self.params.test else 2
        total_interest_payable = (1/6) * self.params.leverage_size * self.params.interest_rate * t * (6 * self.params.real_cash + t * self.real_cash - t * self.params.real_cash)
        commission_fee_payable = self.total_trade_value * self.params.commission_rate
        final_return = self.real_cash - total_interest_payable - commission_fee_payable
        return_rate = (final_return / self.params.real_cash) * (100) - 100
        print(f"Total interest fees: {total_interest_payable:.2f}")
        print(f"Commission fees: {commission_fee_payable:.2f}")
        print(f"Final return: {final_return:.2f}")
        print(f"Return rate: {return_rate:.2f}%")

        self.log_file.write(f"Total interest fees: {total_interest_payable:.2f}\n")
        self.log_file.write(f"Commission fee fees: {commission_fee_payable:.2f}\n")
        self.log_file.write(f"Final return: {final_return:.2f}\n")
        self.log_file.write(f"Return rate: {return_rate:.2f}%\n")
        self.log_file.close()
        
    def notify_trade(self, trade):
        """ Called when a trade is completed (i.e., when positions are closed) """
        if trade.isclosed:
            # Calculate the profit or loss of the trade
            pnl = round(trade.pnl, 2)  # PnL is returned by trade object
            self.log(f"Trade Closed | PnL: {pnl}")


Here is where we do the regression analysis, load the data into the backtester cerebro, initialize and run backtester model and track important data.

In [4]:
class BacktestingModel:
    
    def __init__(self, data_model, test=False):
        self.cerebro = bt.Cerebro()
        self.pairs = []
        self.reg_models = []
        self.open_threshold = None
        self.size = None
        self.data_model = data_model
        self.spread_data = None
        self.test = test
        self.real_cash = None
        self.interest_rate = None
        
        
    def add_pairs(self, pairs):
        self.pairs = pairs

    def set_parameters(self, cash=1000000000, real_cash=1000000, open_threshold=.2, size=50, commission=0.0, interest_rate=0.05, commission_rate=0.0001):
        self.cerebro.broker.set_cash(cash)
        self.cerebro.broker.setcommission(commission)
        self.open_threshold = open_threshold
        self.size = size
        self.real_cash = real_cash
        self.interest_rate = interest_rate
        self.commission_rate = commission_rate

    def add_strategy(self, strategy):
        self.cerebro.addstrategy(strategy, real_cash=self.real_cash, reg_models=self.reg_models, open_threshold=self.open_threshold, 
                                size=self.size, spread_data=self.spread_data, interest_rate=self.interest_rate, 
                                 test=self.test, leverage_size=self.get_leverage_size(self.size), commission_rate=self.commission_rate)

    def fit_regression_models(self):
        reg_models = []
        for i, (d0, d1) in enumerate(self.pairs):
            
            df_A = self.data_model.data_dict_train[self.pairs[i][0]]["close"]
            df_B = self.data_model.data_dict_train[self.pairs[i][1]]["close"]
            X = df_A.values.reshape(-1, 1)
            y = df_B.values.ravel()
            
            reg_model = LinearRegression()
            reg_model.fit(X, y)
            reg_models.append(reg_model)
        self.reg_models = reg_models

    def evaluate_regression_models(self):
        log_file = open("regression_evaluation_output.txt", "w")
        for i, (d0, d1) in enumerate(self.pairs):
            
            model = self.reg_models[i]
            X_test = df_A = self.data_model.data_dict_test[self.pairs[i][0]]["close"]
            X_test = df_A.values.reshape(-1, 1)
            y_pred = model.predict(X_test)
            y_test = self.data_model.data_dict_test[self.pairs[i][1]]["close"].values.ravel()

            mse = mean_squared_error(y_test, y_pred)  
            r2 = r2_score(y_test, y_pred)

            log_file.write(f"({d0}, {d1}) Regression Evaluation\n")
            log_file.write(f"Model Coefficient (Slope): {model.coef_[0]:.4f}\n")
            log_file.write(f"Model Intercept (Bias): {model.intercept_:.4f}\n")
            log_file.write(f"Mean Squared Error: {mse:.4f}\n")
            log_file.write(f"R² Score: {r2:.4f}\n\n")
            
            self.print_regression_line_graph(X_test, y_test, y_pred)

    def print_regression_line_graph(self, X_test, y_test, y_pred):
        plt.scatter(X_test, y_test, color='blue', label="Actual data", s=0.005)  # Scatter plot of actual data
        plt.plot(X_test, y_pred, color='red', linewidth=2, label="Regression line")  # Line for predictions
        plt.xlabel("Feature PAA Price ($)")
        plt.ylabel("Target PAGP Price ($)")
        plt.title("PAGP, PAA Linear Regression Model")
        plt.legend()
        plt.savefig(f"report figures/PAGP and PAA Regression Model graph.png")
        plt.close()
    
    def get_spread_data(self):
        spread_data = {}
        for i, (d0, d1) in enumerate(self.pairs):
            if self.test:
                y_actual = self.data_model.data_dict_test[self.pairs[i][1]]["close"]
                y_pred = self.reg_models[i].predict(self.data_model.data_dict_test[self.pairs[i][0]]["close"].values.reshape(-1, 1))
            else:
                y_actual = self.data_model.data_dict_train[self.pairs[i][1]]["close"]
                y_pred = self.reg_models[i].predict(self.data_model.data_dict_train[self.pairs[i][0]]["close"].values.reshape(-1, 1))

            df = pd.DataFrame(y_actual)
            df["predicted close"] = y_pred
            df["spread"] = df["close"] - df["predicted close"]
            df["normalized spread"] = df["spread"] / df["close"]
            std_dev = np.std(df["normalized spread"])

            spread_data[(d0, d1)] = {'spread_data': df, 'std_dev': std_dev}
        self.spread_data = spread_data

    def get_leverage_size(self, base_trading_size):
        sum = 0
        for i, (d0, d1) in enumerate(self.pairs):
            s0 = base_trading_size
            penalty = self.spread_data[(d0, d1)]["penalty"]
            std_dev = self.spread_data[(d0, d1)]["std_dev"]
            sum += 2 * s0 * penalty * (1/std_dev)
        print(f"Approximate leverage size: {sum:.2f}")
        return sum
        
            
    def calculate_penalty(self, penalty_threshold):
        correlations = pd.read_csv("correlations/correlation_data.csv")

        correlations.set_index(["Stock A", "Stock B"], inplace=True)
        for i, (d0, d1) in enumerate(self.pairs):
            corr_list = []
            curr_spread = self.spread_data[(d0, d1)]["spread_data"]["normalized spread"]
            
            for j, (s0, s1) in enumerate(self.pairs):
                
                curr_spread_2 = self.spread_data[(s0, s1)]["spread_data"]["normalized spread"]
                corr = np.corrcoef(curr_spread, curr_spread_2)[0, 1]
                corr_list.append(corr)
            corr_list = np.array(corr_list)
            corr_list = np.abs(corr_list)
            corr_list = corr_list - penalty_threshold
            corr_list = np.where(corr_list > 0, corr_list, 0)
            penalty = (1 - penalty_threshold) * (1 / np.sum(corr_list))
            self.spread_data[(d0, d1)]["penalty"] = penalty

    def load_data(self):
        for pair in self.pairs:
            for ticker in pair:
                if self.test:
                    df = self.data_model.data_dict_test[ticker]
                else:
                    df = self.data_model.data_dict_train[ticker]
                data_feed = bt.feeds.PandasData(dataname=df)
                self.cerebro.adddata(data_feed, name=ticker)
            
    def run(self):

        print(f'Starting Portfolio Value: {self.cerebro.broker.getvalue():.2f}')
        self.cerebro.run()
        print(f'Final Portfolio Value: {self.cerebro.broker.getvalue():.2f}')

    def print_save_results(self):
        strategy = self.cerebro.runstrats[0][0]
        real_cash_history = strategy.real_cash_history
        minimum_real_cash = min(real_cash_history)
        print(f"Minimum real cash: {minimum_real_cash:.2f}")
        self.plot_cash_history(real_cash_history)
        self.plot_normalized_spreads()
        self.plot_spreads()

    def plot_cash_history(self, real_cash_history):
        test_train = "Testing" if self.test else "Training"
        folder_name = "real cash graphs"
        dates = self.spread_data[self.pairs[0]]["spread_data"].index
        plt.figure(figsize=(15,5))
        plt.plot(dates, real_cash_history, label=f"Real Cash Growth During {test_train}", color='green')
        plt.xlabel("Date")
        plt.ylabel("Real Cash")
        plt.title(f"{test_train} Real Cash Growth")
        plt.legend()
        plt.savefig(f"{folder_name}/{test_train} real cash growth graph.png")
        plt.close()
    

    def plot_normalized_spreads(self):
        test_train = "Testing" if self.test else "Training"
        for i, (d0, d1) in enumerate(self.pairs):
        
            spread_history = self.spread_data[(d0, d1)]["spread_data"]["normalized spread"]
            dates = self.spread_data[(d0, d1)]["spread_data"].index
            threshold = self.open_threshold * self.spread_data[(d0, d1)]["std_dev"]
            folder_name = "normalized spread graphs"

            plt.figure(figsize=(20, 5))
            plt.plot(dates, spread_history, label="Normalized Spread", color='blue')
            plt.axhline(y=0, color='gray', linestyle='--', linewidth=1)  # Baseline
            plt.axhline(y=threshold, color='red', linestyle='dotted', label="Upper Open Threshold")
            plt.axhline(y=-threshold, color='red', linestyle='dotted', label="Lower Open Threshold")
            plt.xlabel("Date")
            plt.ylabel("Normalized Spread")
            plt.title(f"{self.pairs[i][0]} and {self.pairs[i][1]} Spread Over Time")
            plt.legend()
            plt.savefig(f"{folder_name}/{test_train} {self.pairs[i][0]} and {self.pairs[i][1]} spread.png")
            plt.close()

    def plot_spreads(self):
        strategy = self.cerebro.runstrats[0][0]  # Access the strategy instance
        spread_histories = strategy.spread_histories
        
        folder_name = "spread graphs"
        test_train = "Testing" if self.test else "Training"
        for i, spread_history in enumerate(spread_histories):
            dates = strategy.datetimes[i]
            plt.figure(figsize=(20, 5))
            plt.plot(dates, spread_history, label="Spread", color='blue')
            plt.axhline(y=0, color='gray', linestyle='--', linewidth=1)  # Baseline
            plt.axhline(y=strategy.params.open_threshold, color='red', linestyle='dotted', label="Upper Open Threshold")
            plt.axhline(y=-strategy.params.open_threshold, color='red', linestyle='dotted', label="Lower Open Threshold")
            plt.xlabel("Date")
            plt.ylabel("Spread")
            plt.title(f"{test_train} {self.pairs[i][0]} and {self.pairs[i][1]} Spread Over Time")
            plt.legend()
            plt.savefig(f"{folder_name}/{test_train} {self.pairs[i][0]} and {self.pairs[i][1]} spread.png")
            plt.close()

Here we set the pairs based on the highest correlation.

In [5]:
def set_pairs(num_pairs=None, threshold=None):
    pairs_df = pd.read_csv("correlations/correlation_data.csv", )
    pairs_df.columns= ['Stock A', 'Stock B', 'Correlation']
    if num_pairs:
        

        pairs_df = pairs_df[:num_pairs]
        stockA = pairs_df["Stock A"]
        stockB = pairs_df["Stock B"]
        pairs = [(stockA[i], stockB[i]) for i in range(num_pairs)]
    elif threshold:
        pairs_df = pairs_df[pairs_df["Correlation"] > threshold]
        stockA = pairs_df["Stock A"]
        stockB = pairs_df["Stock B"]
        pairs = [(stockA[i], stockB[i]) for i in range(len(stockA))]
    else:
        print("Define num_pairs OR threshold")
    return pairs


In [6]:
#### Setting pairs and parameters

### set parameters here
cash = 10000000
real_cash = 100000
open_threshold = 0.4
base_size = 0.012
interest_rate = 0.05
commission_rate = 0.0001

### setting pairs - gets the top num_pairs by correlation
num_pairs = 10
pair_corr_threshold = None

### set test = True for testing
test = True


In [7]:
##### DO NOT CHANGE BELOW
pairs = set_pairs(num_pairs, pair_corr_threshold)
print(pairs)
data_model = DataModel(pairs)
data_model.split_train_test(train_year="2024")
model = BacktestingModel(data_model,test=test)
model.add_pairs(pairs)
model.set_parameters(cash=cash, real_cash=real_cash, open_threshold=open_threshold, size=base_size)
model.fit_regression_models()
##### DO NOT CHANGE ABOVE


### uncomment to print regression evaluation results
model.evaluate_regression_models()

model.get_spread_data()
model.calculate_penalty(.75)


### uncomment to print leverage size
# model.get_leverage_size(base_size)

model.plot_normalized_spreads()

### only run this for backtesting
model.load_data()
model.add_strategy(PairTradingStrategy)
model.run()
model.print_save_results()
model.plot_spreads()


[('GOOGL', 'GOOG'), ('FOXA', 'FOX'), ('PAGP', 'PAA'), ('CHTR', 'LBRDK'), ('AVB', 'EQR'), ('LEN', 'DHI'), ('AMAT', 'LRCX'), ('SLG', 'VNO'), ('BRX', 'KIM'), ('VMC', 'MLM')]
Approximate leverage size: 24.16
Starting Portfolio Value: 10000000.00
Total interest fees: 124952.26
Commission fees: 2764.61
Final return: -7194.70
Return rate: -107.19%
Final Portfolio Value: 10020522.17
Minimum real cash: 96749.04
