In [1]:
# ! pip install yfinance statsmodels plotly backtesting



In [2]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import yfinance as yf
import plotly.graph_objects as go
import warnings
from statsmodels.tsa.stattools import adfuller, coint
from sklearn.linear_model import LinearRegression
from scipy.stats import norm
from backtesting import Backtest , Strategy
from plotly.subplots import make_subplots
%matplotlib inline
warnings.filterwarnings("ignore")



**A list of tickers representing futures commodities chosen for this project**

In [3]:
comodities_tickers = ['AAPL', 'MSFT', 'GOOGL', 'AMZN', 'NVDA', 'TSLA', 'META', 'BRK-A', 'JPM', 'V',
                      'PG', 'MA', 'HD', 'UNH', 'KO', 'PEP', 'DIS', 'NFLX', 'CMCSA', 'ADBE']

# **Data preprocessing and cleaning**

In [4]:
data = pd.DataFrame()
for i in comodities_tickers:
  data[i]=  yf.Ticker(i).history(start = '2015-01-01',
                           end = '2025-10-30',
                           interval = '1d',
                           actions = True,
                           auto_adjust = True).Close
data.head()

Unnamed: 0_level_0,AAPL,MSFT,GOOGL,AMZN,NVDA,TSLA,META,BRK-A,JPM,V,PG,MA,HD,UNH,KO,PEP,DIS,NFLX,CMCSA,ADBE
Date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1
2015-01-02 00:00:00-05:00,24.237555,39.858448,26.296135,15.426,0.483012,14.620667,77.969337,223600.0,46.720951,61.340973,66.518341,80.105682,79.901878,85.048485,29.783407,68.548538,85.943535,4.984857,22.256113,72.339996
2015-01-05 00:00:00-05:00,23.554747,39.491924,25.795088,15.1095,0.474853,14.006,76.717064,220980.0,45.270489,59.986973,66.202065,77.852455,78.22554,83.647621,29.783407,68.033203,84.687576,4.731143,21.802227,71.980003
2015-01-06 00:00:00-05:00,23.556959,38.912285,25.15848,14.7645,0.460456,14.085333,75.683434,220450.0,44.096657,59.600422,65.900528,77.684181,77.986046,83.478813,30.009571,67.51783,84.238403,4.650143,21.517813,70.529999
2015-01-07 00:00:00-05:00,23.887278,39.406689,25.08449,14.921,0.459257,14.063333,75.683434,223480.0,44.163944,60.398952,66.246216,78.89257,80.658981,84.331161,30.384169,69.492157,85.100136,4.674286,21.498339,71.110001
2015-01-08 00:00:00-05:00,24.805079,40.565948,25.171888,15.023,0.476533,14.041333,77.701004,226680.0,45.150864,61.209042,67.003761,80.11969,82.443474,88.356606,30.751675,70.755104,85.980202,4.778,21.965864,72.919998


In [5]:
data.isnull().sum().tolist()
# Output là danh sách tổng các giá trị còn thiếu cho mỗi cột trên

[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]

In [6]:
for i in data.columns:
  if data[i].isnull().sum()/len(data[i])> 0.01:    # Xóa các cột có tỷ lệ phần trăm các giá trị bị thiếu vượt quá 1%
    del data[i]
data.interpolate(limit_direction="both",inplace=True)   # Fill các dữ liệu thiếu còn lại bằng cách sử dụng interploation (nội suy)
data.tail()

Unnamed: 0_level_0,AAPL,MSFT,GOOGL,AMZN,NVDA,TSLA,META,BRK-A,JPM,V,PG,MA,HD,UNH,KO,PEP,DIS,NFLX,CMCSA,ADBE
Date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1
2025-10-23 00:00:00-04:00,259.328583,519.587524,253.080002,221.089996,182.149857,448.980011,734.0,735600.0,294.540009,345.276093,151.153,573.77002,382.555725,360.450012,69.452187,151.509995,113.029999,111.359001,29.299999,354.119995
2025-10-24 00:00:00-04:00,262.565491,522.631836,259.920013,224.210007,186.249619,433.720001,738.359985,738500.0,300.440002,346.693298,152.490005,573.669983,384.195099,362.5,69.223785,151.550003,111.68,109.469002,29.280001,353.519989
2025-10-27 00:00:00-04:00,268.549652,530.5271,269.269989,226.970001,191.47934,452.420013,750.820007,732650.0,304.149994,347.132416,151.740005,572.359985,382.794159,365.980011,69.571342,152.630005,112.339996,109.456001,29.42,357.799988
2025-10-28 00:00:00-04:00,268.739471,541.057373,267.470001,229.25,201.018814,460.549988,751.440002,722495.0,305.359985,346.214233,151.369995,565.929993,383.271088,367.839996,69.670654,150.119995,111.650002,110.25,29.280001,359.910004
2025-10-29 00:00:00-04:00,269.438812,540.53833,274.570007,230.300003,207.028473,461.51001,751.669983,712900.0,305.51001,340.605347,148.770004,554.580017,375.610657,355.26001,67.873268,146.160004,110.239998,110.041,28.530001,337.859985


**Identification of non-stationary assets to fulfill the cointegration property requirement**

In [7]:
train_data , test_data = data[:round(len(data)*0.8)],data[round(len(data)*0.8):]
non_stationary = []
for i in train_data.columns:
  if adfuller(data[i])[1]> 0.01:
    non_stationary.append(i)
non_stationary

['AAPL',
 'MSFT',
 'GOOGL',
 'AMZN',
 'NVDA',
 'TSLA',
 'META',
 'BRK-A',
 'JPM',
 'V',
 'PG',
 'MA',
 'HD',
 'UNH',
 'KO',
 'PEP',
 'DIS',
 'NFLX',
 'CMCSA',
 'ADBE']

**Selection of pairs exhibiting cointegration**

In [8]:
pair_coint = pd.DataFrame({'asset1':[], 'asset2':[], 'score':[], 'pvalue':[]})
# creating dataframe to store pairs that are cointegrated
for y in non_stationary:
  for x in non_stationary:
    if y != x:
      score, pvalue, _ = coint(train_data[y],train_data[x])
      if pvalue <= 0.05:
        pair_coint.loc[len(pair_coint.index)] = [y , x, score, pvalue]

list1 = [sorted([i,y]) for i , y in zip(pair_coint.asset1, pair_coint.asset2)]
index = []
list2 = []
# the code below consist of removing duplicated pairs
for i in list1:
  if list1.count(i)>1:
    list1.remove(i)

for i,y in zip(np.array(list1)[:,0], np.array(list1)[:,1]):
  row = pair_coint[(pair_coint['asset1'] == i) & (pair_coint['asset2'] == y)].index.tolist()
  index.append(row)

for i in index:
  for y in i:
    list2.append(y)
new_pair = pair_coint.loc[list2]
new_pair

Unnamed: 0,asset1,asset2,score,pvalue
11,HD,MSFT,-3.962476,0.008117
2,GOOGL,TSLA,-3.746096,0.01597
0,MSFT,PG,-3.652555,0.021062
10,MA,V,-4.799205,0.000379
11,HD,MSFT,-3.962476,0.008117
3,GOOGL,HD,-3.674049,0.019781
13,HD,PG,-3.570694,0.026622
19,PEP,UNH,-3.370846,0.045693
17,KO,UNH,-3.920393,0.009295
19,PEP,UNH,-3.370846,0.045693


**We define a fucntion that calculate the spread between two cointegrated pairs using rolling**

In [9]:
from statsmodels.regression.rolling import RollingOLS
import statsmodels.api as sm

def spread(y, x):
    # Thêm hằng số Alpha vào biến X
    X_exog = sm.add_constant(train_data[x])

    # Chạy hồi quy trượt (Rolling OLS) với cửa sổ 60 ngày
    # Thay vì hồi quy trên toàn bộ dữ liệu một lúc
    model = RollingOLS(train_data[y], X_exog, window=60)
    rres = model.fit()
    params = rres.params

    # spread = y - (alpha + beta * x)
    # params['const'] là alpha, params[x] là beta
    spread_series = train_data[y] - (params['const'] + params[x] * train_data[x])

    return spread_series.dropna()

In [10]:
def eligible_pair(pair):
    drop_list = []

    for i, (x, y) in enumerate(zip(pair.asset1.tolist(), pair.asset2.tolist())):
        try:
            s = spread(x, y)

            # Chuyển về Series và loại bỏ NaN/Inf để tránh lỗi tính toán
            s = pd.Series(s).replace([np.inf, -np.inf], np.nan).dropna()

            if len(s) < 30:
                drop_list.append(i)
                continue

            adf_result = adfuller(s)
            p_value = adf_result[1]

            if p_value > 0.05:
                drop_list.append(i)

        except Exception as e:
            print(f"Error processing pair {x}-{y}: {e}")
            drop_list.append(i)

    return pair.drop(pair.index[drop_list])

eligible_pair = eligible_pair(new_pair)
eligible_pair

Unnamed: 0,asset1,asset2,score,pvalue
11,HD,MSFT,-3.962476,0.008117
2,GOOGL,TSLA,-3.746096,0.01597
0,MSFT,PG,-3.652555,0.021062
10,MA,V,-4.799205,0.000379
11,HD,MSFT,-3.962476,0.008117
3,GOOGL,HD,-3.674049,0.019781
13,HD,PG,-3.570694,0.026622
19,PEP,UNH,-3.370846,0.045693
17,KO,UNH,-3.920393,0.009295
19,PEP,UNH,-3.370846,0.045693


## Variance Ratio Test - Lo and MacKinlay:
The LO-Mackinlay test is used to evaluate the efficiency of a time series or determine if it follows a random walk process. we need to identify the minimum p-value and retrieves the corresponding maximum z-statistic, which represents the highest significance level for the LO-Mackinlay variance test ratio

In [11]:
def rolling_variance_ratio(df):
  spread = df.diff().dropna()
  variance = spread.var()
  max_t = 40
  variance_ratio_test = pd.DataFrame({'variance':[], 'variance_increments': [], 'variance_ratio': [], 'standard_eroor':[],'z_stat':[],'pvalue':[]})
  for t in range(2,max_t):
    v_t = spread.rolling(t).sum().var()
    variance_ratio = v_t/(t*variance) -1
    standard_error = np.sqrt(2*(2*t - 1)*(t-1)/(3*t*len(spread)))
    z_stat = variance_ratio/standard_error
    p_value = 2*(1- norm.cdf(abs(z_stat)))
    variance_ratio_test.loc[len(variance_ratio_test.index)] = [variance, v_t, variance_ratio, standard_error, z_stat, p_value]

  min_pvalue = min(variance_ratio_test['pvalue'])
  max_z_stat =  variance_ratio_test[variance_ratio_test['pvalue'] == min_pvalue]['z_stat'].values # retreving the minimum pvalue with the corresponding z_stat

  return max_z_stat

In [12]:
# If any of the maximum z-statistics are greater than or equal to 0 (indicating mean-reverting behavior)
def eligible(pair):
  list1 = []
  for i, (x,y) in enumerate(zip(pair.asset1.tolist(),pair.asset2.tolist())):
    max_z_stat = rolling_variance_ratio(spread(x,y))
    if  (max_z_stat >= 0).any():
      list1.append(i)
  return pair.drop(pair.index[list1])
eligible = eligible(eligible_pair)
eligible

Unnamed: 0,asset1,asset2,score,pvalue
11,HD,MSFT,-3.962476,0.008117
2,GOOGL,TSLA,-3.746096,0.01597
0,MSFT,PG,-3.652555,0.021062
10,MA,V,-4.799205,0.000379
11,HD,MSFT,-3.962476,0.008117
3,GOOGL,HD,-3.674049,0.019781
19,PEP,UNH,-3.370846,0.045693
17,KO,UNH,-3.920393,0.009295
19,PEP,UNH,-3.370846,0.045693
18,KO,PEP,-3.881252,0.010526


Matrix of eligible pairs of assets, exhibiting mean-reverting behavior

In [13]:
# Extract the relevant columns
matrix_data = eligible[['asset1', 'asset2', 'pvalue']]

# Create matrix plot
fig = go.Figure(data=go.Scatter(
    x=matrix_data['asset2'],
    y=matrix_data['asset1'],   #
    mode='markers',
    marker=dict(
        color=matrix_data['pvalue'],
        colorscale='Viridis',
        size=20,
        colorbar=dict(title='P-value')
    ),
    text=matrix_data['pvalue'],
    hovertemplate='<b>Asset 1</b>: %{y}<br><b>Asset 2</b>: %{x}<br><b>P-value</b>: %{text:.4f}',
))

# Set axis labels
fig.update_xaxes(title_text='Asset 2')
fig.update_yaxes(title_text='Asset 1')

# Show figure
fig.show()

We retreive pairs that demonstrate a fundamental relationship, which will enable us to conduct backtesting

In [14]:
list_pairs = [['MA','V'],['ADBE','AMZN'],['KO','PEP'],['GOOGL','TSLA']]

In [15]:
#  calculates the z-score of the spread between two pairs
def zscore_spread(y,x):
  spread_mavg = spread(y,x)
  spread_mavg60 = spread(y,x).rolling(window=60, center=False).mean()
  spread_std_60 = spread(y,x).rolling(window=60, center=False).std()
  zscore_60 = (spread_mavg - spread_mavg60)/spread_std_60
  return zscore_60

In [16]:
# Calculate the threshold values
def get_thresholds(data):
    std_dev = np.std(data)
    threshold_upper = np.mean(data) + 1.7*std_dev
    threshold_lower = np.mean(data) - 1.7*std_dev
    return threshold_upper, threshold_lower

We generate trading signals for two pairs of assets. In this strategy we need to make sure that two pairs move differently in the same time. so if the spread exceeds the upper threshold and the corresponding asset's price has increased over the past three periods, a short position (-1) is assigned to that asset, and vice versa.

In [17]:
def signal(new_data, y_ticker, x_ticker):
    """
    Hàm sinh tín hiệu nhận vào DataFrame và tên 2 mã chứng khoán.
    """
    df = new_data.copy()
    threshold = 1.7

    # Logic Momentum: Giá hiện tại so với 3 ngày trước
    # Dùng đúng tên ticker được truyền vào
    y_momentum_up = df[y_ticker] > df[y_ticker].shift(3)
    y_momentum_down = df[y_ticker] < df[y_ticker].shift(3)

    # --- Tín hiệu ---
    # Short Spread (Bán Y): Z > 1.7 và Y đang tăng
    cond_short_y = (df['zscore'] > threshold) & y_momentum_up

    # Long Spread (Mua Y): Z < -1.7 và Y đang giảm
    cond_long_y = (df['zscore'] < -threshold) & y_momentum_down

    conditions = [cond_long_y, cond_short_y]
    choices = [1, -1] # 1: Buy, -1: Sell

    df['position_y'] = np.select(conditions, choices, default=0)
    df['position_x'] = -df['position_y'] # Hedge ngược chiều

    return df

In [18]:
def visualize_pair(y, x):
    # --- PHẦN 1: CHUẨN BỊ DỮ LIỆU (Giữ logic đúng để không bị lỗi code) ---
    # 1. Tính toán Spread và Z-score
    spread_series = spread(y, x)
    window = 60
    zscore = (spread_series - spread_series.rolling(window).mean()) / spread_series.rolling(window).std()

    # 2. Tạo DataFrame tổng hợp
    df_input = pd.DataFrame(index=spread_series.index)
    df_input[y] = train_data[y]
    df_input[x] = train_data[x]
    df_input['zscore'] = zscore
    df_input = df_input.dropna()

    # 3. Gọi hàm signal (Truyền đủ 3 tham số)
    data = signal(df_input, y, x)

    # Thiết lập ngưỡng (như mẫu của bạn)
    threshold_upper = 1.7
    threshold_lower = -1.7

    # --- PHẦN 2: VẼ BIỂU ĐỒ (Giống hệt mẫu bạn yêu cầu) ---
    # Layout: 3 hàng (Row 1: Y, Row 2: Spread, Row 3: X)
    fig_pair = make_subplots(rows=3, cols=1, vertical_spacing=0.025, row_heights=[1, 1.4, 1],
                             subplot_titles=(f'Price: {y}', 'Spread Z-Score', f'Price: {x}'))

    # === ROW 1: Cổ phiếu Y ===
    fig_pair.add_trace(go.Scatter(x=data.index, y=data[y], name=y), row=1, col=1)

    # Điểm Mua Y
    fig_pair.add_trace(go.Scatter(mode='markers', name=f'Buy {y}',
                                  x=data[data['position_y'] == 1].index,
                                  y=data[data['position_y'] == 1][y],
                                  marker=dict(color='green', size=10, symbol='triangle-up')), row=1, col=1)
    # Điểm Bán Y
    fig_pair.add_trace(go.Scatter(mode='markers', name=f'Sell {y}',
                                  x=data[data['position_y'] == -1].index,
                                  y=data[data['position_y'] == -1][y],
                                  marker=dict(color='red', size=10, symbol='triangle-down')), row=1, col=1)

    # === ROW 3: Cổ phiếu X ===
    fig_pair.add_trace(go.Scatter(x=data.index, y=data[x], name=x), row=3, col=1)

    # Điểm Mua X
    fig_pair.add_trace(go.Scatter(mode='markers', name=f'Buy {x}',
                                  x=data[data['position_x'] == 1].index,
                                  y=data[data['position_x'] == 1][x],
                                  marker=dict(color='green', size=10, symbol='triangle-up')), row=3, col=1)
    # Điểm Bán X
    fig_pair.add_trace(go.Scatter(mode='markers', name=f'Sell {x}',
                                  x=data[data['position_x'] == -1].index,
                                  y=data[data['position_x'] == -1][x],
                                  marker=dict(color='red', size=10, symbol='triangle-down')), row=3, col=1)

    # === ROW 2: Spread (Z-Score) ===
    # Đường Z-Score
    fig_pair.add_trace(go.Scatter(x=data.index, y=data['zscore'], mode='lines', name='Spread (Z-score)',
                                  line=dict(color='skyblue', dash='dashdot')), row=2, col=1)

    # Đường trung bình (Mean = 0)
    fig_pair.add_trace(go.Scatter(x=data.index, y=[0] * len(data), mode='lines', name='Mean',
                                  line=dict(color='black', dash='dash')), row=2, col=1)

    # Ngưỡng trên (Upper Threshold)
    fig_pair.add_trace(go.Scatter(x=data.index, y=[threshold_upper] * len(data), mode='lines',
                                  name='Upper Threshold', line=dict(color='red', dash='dash')), row=2, col=1)

    # Ngưỡng dưới (Lower Threshold)
    fig_pair.add_trace(go.Scatter(x=data.index, y=[threshold_lower] * len(data), mode='lines',
                                  name='Lower Threshold', line=dict(color='green', dash='dash')), row=2, col=1)

    # Update Layout
    fig_pair.update_layout(title=f'Pair Trading Strategy: {y} and {x}',
                           hovermode='x unified', title_x=0.45, height=900, showlegend=True)

    fig_pair.show()

In [19]:
#  Present Graph of each pair
for i in list_pairs:
  visualize_pair(i[0], i[1])

# Backtesting Strategy

In [20]:
class generating_signal:
  def __init__(self,first_pair, second_pair,test_data):
    self.first_pair = first_pair
    self.second_pair = second_pair
    self.test_data = test_data
    self.test_data1 = yf.Ticker(self.first_pair).history(period = '2y',
                           interval = '1d',
                           actions = True,
                           auto_adjust = True)
    self.test_data2 = yf.Ticker(self.second_pair).history(period = '2y',
                           interval = '1d',
                           actions = True,
                           auto_adjust = True)

    self.test_data.index = pd.to_datetime(self.test_data.index,format="%Y-%m-%d",utc=True)
    self.test_data1.index = pd.to_datetime(self.test_data1.index,format="%Y-%m-%d",utc=True)
    self.test_data2.index = pd.to_datetime(self.test_data2.index,format="%Y-%m-%d",utc=True)
    self.test_data1 = self.test_data1.loc[self.test_data.index[0]:]
    self.test_data2 = self.test_data2.loc[self.test_data.index[0]:]
    self.common = self.test_data1.index.intersection(self.test_data2.index)
    self.test_data1 = self.test_data1.loc[self.common]
    self.test_data2 = self.test_data2.loc[self.common]
    if True:
      self.signal()

  def pair_spread(self):
    # Add constant to the second asset's close prices
    X_exog = sm.add_constant(self.test_data2.Close)

    # Perform Rolling OLS regression with a window of 60
    # y = self.test_data1.Close, X = X_exog (self.test_data2.Close with constant)
    model = RollingOLS(self.test_data1.Close, X_exog, window=60)
    rres = model.fit()
    params = rres.params

    # Calculate spread = Y - (alpha + beta * X)
    # params['const'] is alpha, params['Close'] is beta for self.test_data2.Close
    # Need to handle potential NaN values from rolling window
    self.spread = self.test_data1.Close - (params['const'] + params['Close'] * self.test_data2.Close)
    return self.spread

  def zscore_spread(self):
    self.spread_mavg = self.pair_spread()
    self.spread_mavg60 = self.spread_mavg.rolling(window=60, center=False).mean()
    self.spread_std_60 = self.spread_mavg.rolling(window=60, center=False).std()
    self.zscore_60 = (self.spread_mavg - self.spread_mavg60)/self.spread_std_60
    return self.zscore_60.dropna()

  def get_thresholds(self):
    zscore = self.zscore_spread()
    self.std_dev = np.std(zscore)
    self.threshold_upper = np.mean(zscore) + 1.7*self.std_dev
    self.threshold_lower = np.mean(zscore) - 1.7*self.std_dev
    return self.threshold_upper, self.threshold_lower

  def signal(self):

    self.spread_data = self.zscore_spread()
    self.threshold_upper, self.threshold_lower = self.get_thresholds()

    # Align test_data1 and test_data2 to the index of spread_data
    aligned_test_data1_close = self.test_data1.Close.loc[self.spread_data.index]
    aligned_test_data2_close = self.test_data2.Close.loc[self.spread_data.index]

    self.test_data1['position_y'] = 0
    self.test_data2['position_x'] = 0

    # Use iterrows or iterate over the aligned index to avoid direct integer indexing issues
    for i, date in enumerate(self.spread_data.index):
      if i == 0: # Skip the first element for comparison with i-1
          continue

      # Access data using the aligned Series
      current_spread_zscore = self.spread_data.iloc[i]
      prev_test_data1_close = aligned_test_data1_close.iloc[i-1]
      current_test_data1_close = aligned_test_data1_close.iloc[i]
      prev_test_data2_close = aligned_test_data2_close.iloc[i-1]
      current_test_data2_close = aligned_test_data2_close.iloc[i]

      if (current_spread_zscore >= self.threshold_upper) and (current_test_data1_close > prev_test_data1_close):
        self.test_data1.loc[date, 'position_y'] = -1
      if (current_spread_zscore <= self.threshold_lower) and (current_test_data1_close < prev_test_data1_close):
        self.test_data1.loc[date, 'position_y'] = 1

      if (current_spread_zscore <= self.threshold_lower) and (current_test_data2_close > prev_test_data2_close) :
        self.test_data2.loc[date, 'position_x'] = -1
      if (current_spread_zscore >= self.threshold_upper) and (current_test_data2_close < prev_test_data2_close):
        self.test_data2.loc[date, 'position_x'] = 1

## Backtest first asset



In [21]:
asset1 = generating_signal('MA','V',test_data).test_data1
def SIGNAL1():
  return asset1.position_y
def SINGAL2():
  return asset1.position_x

In [22]:
class  Pairstrading(Strategy):
  def init(self):
    super().init()
    self.signal = self.I(SIGNAL1)
    self.cooldown = 0

  def next(self):
    super().next()
    price = self.data.Close[-1]

    if self.signal == 1 and self.cooldown == 0:
      self.buy(tp = 1.2*price , sl = 0.95*price, size = 0.1)
      self.cooldown = 3   # new trades can be taken only after 3 bars

    elif self.signal== -1 and self.cooldown == 0:
      self.sell(tp = 0.8*price , sl = 1.05*price, size = 0.1)
      self.cooldown = 3  # new trades can be taken only after 3 bars

    if self.cooldown > 0 :
      self.cooldown -= 1

In [23]:
bt = Backtest(asset1,Pairstrading,cash=100000,commission=0.001, exclusive_orders=True)
stat = bt.run()
evaluation = pd.DataFrame(stat)
evaluation

Unnamed: 0,0
Start,2023-12-05 05:00:00+00:00
End,2025-12-05 05:00:00+00:00
Duration,731 days 00:00:00
Exposure Time [%],41.749503
Equity Final [$],102451.685528
Equity Peak [$],102892.865704
Commissions [$],257.160214
Return [%],2.451686
Buy & Hold Return [%],35.949307
Return (Ann.) [%],1.220857


In [24]:
bt.plot()

## Backtest Second asset

In [25]:
asset2 = generating_signal('MA','V',test_data).test_data2
def SIGNAL1():
  return asset2.position_y
def SIGNAL2():
  return asset2.position_x

In [26]:
class  Pairstrading(Strategy):
  def init(self):
    super().init()
    self.signal = self.I(SIGNAL2) # for second assets
    self.cooldown = 0

  def next(self):
    super().next()
    price = self.data.Close[-1]

    if self.signal == 1 and self.cooldown == 0:
      self.buy(tp = 1.2*price , sl = 0.95*price, size = 0.1)
      self.cooldown = 3   # new trades can be taken only after 3 bars

    elif self.signal== -1 and self.cooldown == 0:
      self.sell(tp = 0.8*price , sl = 1.05*price, size = 0.1)
      self.cooldown = 3  # new trades can be taken only after 3 bars

    if self.cooldown > 0 :
      self.cooldown -= 1

In [27]:
bt = Backtest(asset2,Pairstrading,cash=100000,commission=0.001, exclusive_orders=True)
stat = bt.run()
evaluation = pd.DataFrame(stat)
evaluation

Unnamed: 0,0
Start,2023-12-05 05:00:00+00:00
End,2025-12-05 05:00:00+00:00
Duration,731 days 00:00:00
Exposure Time [%],4.17495
Equity Final [$],98594.660017
Equity Peak [$],100000.0
Commissions [$],78.971358
Return [%],-1.40534
Buy & Hold Return [%],32.867483
Return (Ann.) [%],-0.706553


In [28]:
bt.plot()

In [29]:
! git add .
! git commit -m "Update pairs trading"
! git push

[main bcf8299] Update pairs trading
 1 file changed, 185 insertions(+), 662 deletions(-)


To https://github.com/tamnguyen-2905/CF_Trang22110234_Thanh22110203_Tam22110193.git
   3ad2eb5..bcf8299  main -> main
