In [1]:
import numpy as np
import pandas as pd
import pyexcel
import statsmodels
import quandl

In [2]:
# Column titles of the pair trading results table
headers = [
    'Date', 'commodity1', 'commodity2', 'zscore', 'signal', 'status',
    'buy price', 'sell signal', 'mtm', 'pnl', 'Cointegration test',
]

# Rows holding the result of the pair trading strategy; header row first
data = [headers]

# Limit for the z-score
threshold = 1.75

# Window bounds used in the calculation of the rolling average and std
start, end = 80, 90

# Order status of the preceding data point
prev_status = ""

# Mark-to-market: a measure of the fair value of accounts
mtm = ""

# Buy/sell prices for the preceding and the current data point
prev_sell_price = prev_buy_price = ""
sell_price = buy_price = ""

# Stop loss and take profit
sl, tp = -10000, 20000

# Lot size of data1 (n) and of data2 (m)
n = m = 5000


In [3]:
# this is the code in blueshift for pairs trading
"""
    Title: Pair Trading Strategy Code Template
    Description: The pair trading strategy is implemented after using a
    cointegration test and z-score.
    Dataset: US Equities

    ############################# DISCLAIMER #############################
    This is a strategy template only and should not be
    used for live trading without appropriate backtesting and tweaking of
    the strategy parameters.
    ######################################################################
"""

# Import numpy and pandas
import numpy as np
import pandas as pd

# Import statsmodels
import statsmodels.api as stat
import statsmodels.tsa.stattools as ts


# Import blueshift libraries
from blueshift.api import (
                            symbol,
                            order_target_percent,
                            schedule_function,
                            date_rules,
                            time_rules,
                            get_datetime
                        )


def initialize(context):
    """
        Store the strategy configuration and the empty trade state on
        `context`, then schedule `rebalance` to run every day shortly
        before the market close.
    """
    # The two legs of the pair
    context.security_1 = symbol('AAPL')
    context.security_2 = symbol('AMZN')

    # The lookback for calculating the hedge ratio
    context.lookback = 100

    # Fraction of the portfolio wealth to use
    context.pf_fraction = 1.0

    # Window bounds passed to the z-score calculation
    context.end = context.lookback - 1
    context.start = context.end - 10

    # Trade bookkeeping: every field starts out as an empty string
    for field in (
            'status',
            'prev_status',
            'mtm',
            'prev_sell_price',
            'sell_price',
            'prev_buy_price',
            'buy_price',
    ):
        setattr(context, field, "")

    # The stop loss and take profit criteria
    context.SL, context.TP = -0.03, 0.075

    # The standard deviation multiplier
    context.threshold = 1

    # Lot/Quantity size: N is applied to security_1, M to security_2
    context.N = 0.5
    context.M = 0.3

    # Run the rebalance function daily, 5 minutes before the close
    schedule_function(
        rebalance,
        date_rule=date_rules.every_day(),
        time_rule=time_rules.market_close(minutes=5),
    )


def cointegration_test(x, y):
    """
        Run a cointegration check on two price frames.

        The 'close' column of `x` is regressed on the 'close' column of `y`
        with OLS, and the regression residuals are passed to the ADF test.
        The return value is whatever `ts.adfuller` returns for them.
    """
    ols_fit = stat.OLS(x['close'], y['close']).fit()

    # The residuals of the regression are treated as the spread
    residuals = ols_fit.resid

    return ts.adfuller(residuals)


def zscore_cal(data1, data2, start, end):
    """
        A function to find the z-score of the current log spread.

        Parameters:
            data1, data2: objects whose 'close' entry holds the close prices
                of the first and second security (a DataFrame column, a
                Series or a plain sequence).
            start, end: positional bounds. Rows start..end-1 form the
                reference window; row `end` is the current observation.

        Returns:
            (current spread - window mean) / window std, where the spread is
            log(close1 / close2); 0 when the window std is not positive.
    """

    close1 = pd.Series(data1['close'])
    close2 = pd.Series(data2['close'])

    # .iloc keeps every lookup positional. Plain `series[end]` treats an
    # integer as a label (or relies on a deprecated positional fallback),
    # which disagrees with the positional slice used for the window.
    window_spread = np.log(close1.iloc[start:end] / close2.iloc[start:end])

    # Mean and stdev of the spread over the reference window
    mvavg_old = np.mean(window_spread)
    std_old = np.std(window_spread)

    # Spread of the current observation
    current_spread = np.log(close1.iloc[end] / close2.iloc[end])

    # Guard against a flat (or undefined) window, which has no usable std
    zscore = (current_spread - mvavg_old) / std_old if std_old > 0 else 0

    return zscore


def signal_cal(zscore, threshold, adftest):
    """
        Translate the z-score into a trading signal.

        Returns 'SELL' when the z-score is above `threshold`, 'BUY' when it
        is below `-threshold`, and 'No position' otherwise. A directional
        signal is only given when `adftest` equals 'Yes'.
    """

    if zscore > threshold:
        # Spread is stretched upwards and is expected to fall to the mean
        direction = 'SELL'
    elif zscore < -threshold:
        # Spread is stretched downwards and is expected to rise to the mean
        direction = 'BUY'
    else:
        direction = 'No position'

    # Without cointegration there is no trade, whatever the z-score says
    return direction if adftest == 'Yes' else 'No position'


def status_cal(prev_status, mtm, SL, TP, signal, adftest):
    """
        Work out the trade status for the current data point.

        Returns `signal` when the previous status was empty or an exit
        ("SL", "TP", "CB"). Otherwise returns "CB" when `adftest` is "No",
        "" when `mtm` is empty, "SL" when `mtm` is below `SL`, "TP" when
        `mtm` is above `TP`, and `prev_status` in every other case.
    """

    # No open trade carried over: the fresh signal decides
    if prev_status in ("", "SL", "TP", "CB"):
        return signal

    # Break in the cointegration status of the pair
    if adftest == "No":
        return "CB"

    # Nothing to mark to market
    if mtm == "":
        return ""

    # Stop loss status
    if mtm < SL:
        return "SL"

    # Take profit status
    if mtm > TP:
        return "TP"

    # Still inside both limits: keep the previous status
    return prev_status


# Calculating buy price
def buy_price_cal(
                    prev_status,
                    prev_buy_price,
                    buy_price,
                    signal,
                    status,
                    data1,
                    data2,
                    end
                 ):
    """
        Determine the buy price recorded for the current data point.

        Parameters:
            prev_status, status: trade status of the previous and the
                current data point.
            prev_buy_price: buy price recorded for the previous data point.
            buy_price: kept for call compatibility; its incoming value is
                not used.
            signal: current trading signal.
            data1, data2: objects whose 'close' entry holds the close prices
                of the first and second security.
            end: position of the current observation in the close prices.

        Returns:
            `prev_buy_price` when the status did not change; "" when the
            status is an exit or empty, or when there is no BUY/SELL signal;
            otherwise the current close of the security being bought
            (first security for "BUY", second security for "SELL").
    """

    # Unchanged status: the position and its entry price carry over
    if status == prev_status:
        return prev_buy_price

    # Exit or empty status: no entry price to record
    if status in ("SL", "TP", "CB", ""):
        return ""

    # Positional lookup: plain `series[end]` would treat the integer as a
    # label (or use a deprecated positional fallback) on a pandas Series.
    if signal == "BUY":
        # Buying the spread buys the first security
        return pd.Series(data1['close']).iloc[end]

    if signal == "SELL":
        # Selling the spread buys the second security
        return pd.Series(data2['close']).iloc[end]

    # No signal hence no buy price
    return ""


# Calculating sell price
def sell_price_cal(
                    prev_status,
                    prev_sell_price,
                    sell_price,
                    signal,
                    status,
                    data1,
                    data2,
                    end
                  ):
    """
        Determine the sell price recorded for the current data point.

        Parameters:
            prev_status, status: trade status of the previous and the
                current data point.
            prev_sell_price: sell price recorded for the previous data point.
            sell_price: kept for call compatibility; its incoming value is
                not used.
            signal: current trading signal.
            data1, data2: objects whose 'close' entry holds the close prices
                of the first and second security.
            end: position of the current observation in the close prices.

        Returns:
            `prev_sell_price` when the status did not change; "" when the
            status is an exit or empty, or when there is no BUY/SELL signal;
            otherwise the current close of the security being sold
            (second security for "BUY", first security for "SELL").
    """

    # Unchanged status: the position and its entry price carry over
    if status == prev_status:
        return prev_sell_price

    # Exit or empty status: no entry price to record
    if status in ("SL", "TP", "CB", ""):
        return ""

    # Positional lookup: plain `series[end]` would treat the integer as a
    # label (or use a deprecated positional fallback) on a pandas Series.
    if signal == "BUY":
        # Buying the spread sells the second security
        return pd.Series(data2['close']).iloc[end]

    if signal == "SELL":
        # Selling the spread sells the first security
        return pd.Series(data1['close']).iloc[end]

    # No signal hence no sell price either
    return ""


# Calculating mtm
def mtm_cal(
            data1,
            data2,
            prev_status,
            prev_sell_price,
            prev_buy_price,
            M,
            N,
            end
           ):
    """
        Mark the open pair trade to market, as a fraction of its entry value.

        Parameters:
            data1, data2: objects whose 'close' entry holds the close prices
                of the first and second security.
            prev_status: status of the preceding data point.
            prev_sell_price, prev_buy_price: entry prices of the sold and
                the bought leg.
            M, N: lot sizes; N belongs to the first security and M to the
                second one.
            end: position of the current observation in the close prices.

        Returns:
            The profit of both legs divided by the entry value of the trade
            when `prev_status` is "BUY" or "SELL"; "" otherwise.
    """

    if prev_status not in ("BUY", "SELL"):
        return ""

    # Positional lookup of the current closes
    close1 = pd.Series(data1['close']).iloc[end]
    close2 = pd.Series(data2['close']).iloc[end]

    if prev_status == "BUY":
        # Long the first security (bought at prev_buy_price, lot N),
        # short the second one (sold at prev_sell_price, lot M)
        mtm = (prev_sell_price - close2)*M + (close1 - prev_buy_price)*N
        entry_value = prev_sell_price*M + prev_buy_price*N
    else:
        # Short the first security (sold at prev_sell_price, lot N),
        # long the second one (bought at prev_buy_price, lot M).
        # Each entry price must be compared with the current close of the
        # SAME security and weighted with that security's lot size.
        mtm = (prev_sell_price - close1)*N + (close2 - prev_buy_price)*M
        entry_value = prev_sell_price*N + prev_buy_price*M

    return mtm/entry_value


def rebalance(context, data):
    """
        A function to rebalance the portfolio. This function is called by the
        schedule_function in the initialize function.

        It fetches the close history of both securities, derives the
        cointegration flag, the z-score, the signal, the mark-to-market
        value and the trade status, places the matching orders and finally
        stores status and prices on `context` for the next call. The
        function returns early, without trading, when fetching the history
        raises IndexError.
    """

    try:
        # Fetch the last `lookback` bars for the first security
        # NOTE(review): the frequency is '1m' although the function is
        # scheduled once a day - confirm this is the intended bar size.
        data1 = data.history(
            context.security_1,
            ['close'],
            context.lookback,
            '1m')
    except IndexError:
        return

    try:
        # Fetch the last `lookback` bars for the second security
        data2 = data.history(
            context.security_2,
            ['close'],
            context.lookback,
            '1m')
    except IndexError:
        return

    # Check for cointegration: index 1 of the test result is compared
    # against the 5% level
    c_t = cointegration_test(data1, data2)
    if c_t[1] <= 0.05:
        adftest = "Yes"
    else:
        adftest = "No"

    # Calculate z-score for the spread
    zscore = zscore_cal(data1, data2, context.start, context.end)

    # Generating trading signals
    signal = signal_cal(zscore, context.threshold, adftest)

    # Calculating mtm of the trade carried over from the previous call
    context.mtm = mtm_cal(
                            data1,
                            data2,
                            context.prev_status,
                            context.prev_sell_price,
                            context.prev_buy_price,
                            context.M,
                            context.N,
                            context.end
                         )

    # Assigning status
    context.status = status_cal(
                                context.prev_status,
                                context.mtm,
                                context.SL,
                                context.TP,
                                signal,
                                adftest
                               )

    # Assigning buy_price
    context.buy_price = buy_price_cal(
                                        context.prev_status,
                                        context.prev_buy_price,
                                        context.buy_price,
                                        signal,
                                        context.status,
                                        data1,
                                        data2,
                                        context.end
                                     )

    # Assigning sell_price
    context.sell_price = sell_price_cal(
                                        context.prev_status,
                                        context.prev_sell_price,
                                        context.sell_price,
                                        signal,
                                        context.status,
                                        data1,
                                        data2,
                                        context.end
                                       )

    print("{} Status: {}".format(get_datetime(), context.status))

    # Place the order
    if context.status == "BUY":
        # Buy the spread: positive target on security_1, negative target
        # on security_2
        print("{} Going long on {}".format(get_datetime(), context.security_1))
        order_target_percent(
                                context.security_1,
                                context.pf_fraction*context.N
                            )
        # The target below is negative, so the log must say "short"
        print("{} Going short on {}".format(get_datetime(), context.security_2))
        order_target_percent(
                                context.security_2,
                                -context.pf_fraction*context.M
                            )

    elif context.status == "SELL":
        # Sell the spread: negative target on security_1, positive target
        # on security_2
        print("{} Going short on {}".format(get_datetime(), context.security_1))
        order_target_percent(
                                context.security_1,
                                -context.pf_fraction*context.N
                            )
        # The target below is positive, so the log must say "long"
        print("{} Going long on {}".format(get_datetime(), context.security_2))
        order_target_percent(
                                context.security_2,
                                context.pf_fraction*context.M
                            )

    elif context.status in ["SL", "TP", "CB"]:
        # Stop loss, take profit or cointegration break: flatten both legs
        print("{} Exiting position in {}".format(get_datetime(), context.security_1))
        order_target_percent(context.security_1, 0)
        print("{} Exiting position in {}".format(get_datetime(), context.security_2))
        order_target_percent(context.security_2, 0)

    # Assigning the previous values for the next call
    context.prev_sell_price = context.sell_price
    context.prev_status = context.status
    context.prev_buy_price = context.buy_price


ModuleNotFoundError: No module named 'blueshift'

In [24]:
import yfinance as yf
import numpy as np
import pandas as pd
import datetime 
import math

# Daily price history for the two legs of the pair.
# NOTE(review): `startdate` and `enddate` are not defined in this cell or in
# any cell above it. The only visible assignment is in the later backtesting
# cell (2024 dates), while the output printed below this cell covers
# 2021-01-14 to 2022-12-30 - so the values used here came from kernel state
# that is no longer in the notebook. Define them before this line so the
# cell survives Restart & Run All.
data1 = yf.download("TCS.NS",start=startdate,end=enddate,interval='1d')
data2 = yf.download("INFY.NS",start=startdate,end=enddate,interval='1d')
def df_merge(data1,data2):
    """
        Join the price frames of the two securities on their index.

        The Open/High/Low/Close/Volume columns are renamed to lower case
        with a '1' suffix for `data1` and a '2' suffix for `data2`, so the
        merged frame can later hold the spread, its std deviation and the
        z-score next to both price sets.
    """
    price_fields = ('Open', 'High', 'Low', 'Close', 'Volume')

    first_leg = data1.rename(
        columns={field: field.lower() + '1' for field in price_fields})
    second_leg = data2.rename(
        columns={field: field.lower() + '2' for field in price_fields})

    return first_leg.merge(second_leg, left_index=True, right_index=True)

df = df_merge(data1,data2)

def calc_zscore(df):
    """
        Add the spread, its 10-row rolling mean and std, and the z-score.

        Parameters:
            df: frame with at least the 'close1' and 'close2' columns.

        Returns:
            A new frame with the 'spread', 'spread_mean', 'spread_std' and
            'z_score' columns added, the open/high/low/adjusted-close
            columns removed where present, and every row containing a
            missing value dropped. The frame passed in is left untouched,
            so the calling cell can be re-run safely.
    """
    spread = df['close1'] - df['close2']
    rolling_spread = spread.rolling(window=10)
    spread_mean = rolling_spread.mean()
    spread_std = rolling_spread.std()

    unused_columns = ['open1','open2','high1','high2','low1','low2','Adj Close_x','Adj Close_y']

    return (
        df.assign(
            spread=spread,
            spread_mean=spread_mean,
            spread_std=spread_std,
            z_score=(spread - spread_mean)/spread_std,
        )
        # errors='ignore': the columns may already be gone, e.g. when the
        # function is applied to its own result
        .drop(columns=unused_columns, errors='ignore')
        .dropna(axis=0)
    )

df = calc_zscore(df)
print(df)
df.to_csv('file1.csv')


[*********************100%%**********************]  1 of 1 completed


[*********************100%%**********************]  1 of 1 completed

                 close1  volume1       close2   volume2       spread  \
Date                                                                   
2021-01-14  3250.699951  6931542  1370.500000  27521697  1880.199951   
2021-01-15  3233.350098  4131692  1344.949951  15018441  1888.400146   
2021-01-18  3221.750000  4160906  1312.050049   8861765  1909.699951   
2021-01-19  3260.699951  2975735  1316.650024   5585744  1944.049927   
2021-01-20  3308.800049  3453446  1339.449951   8225838  1969.350098   
...                 ...      ...          ...       ...          ...   
2022-12-26  3252.899902   870157  1502.400024   4115459  1750.499878   
2022-12-27  3259.500000   835883  1514.849976   4860076  1744.650024   
2022-12-28  3257.100098   910795  1510.150024   5029860  1746.950073   
2022-12-29  3268.750000  1037927  1517.550049   4624745  1751.199951   
2022-12-30  3256.699951  1163131  1508.199951   5060544  1748.500000   

            spread_mean  spread_std   z_score  
Date           




In [4]:
# Backtest of the pair strategy using the backtesting package
from backtesting import Backtest,Strategy
import talib
import yfinance as yf
import datetime
import numpy as np
import math
import pandas as pd

# Date range of the backtest
startdate = datetime.datetime(2024, 7, 28)
enddate = datetime.datetime(2024, 9, 24)

# 15-minute bars for both legs of the pair, requested in the same order
# as before: TCS first, INFY second
data1, data2 = (
    yf.download(ticker, start=startdate, end=enddate, interval='15m')
    for ticker in ('TCS.NS', 'INFY.NS')
)

# data1 = yf.download('ADANIPOWER.NS',start=startdate,end=enddate,interval='15m')
# data2 = yf.download('TATAPOWER.NS',start=startdate,end=enddate,interval='15m')

def load_data(data1,data2):
    """
        Join the two price frames on their index for the backtest.

        The first frame keeps its Open/High/Low/Close/Volume column names;
        the same columns of the second frame get a '2' suffix.
    """
    second_leg_names = {
        field: field + '2'
        for field in ('Open', 'High', 'Low', 'Close', 'Volume')
    }
    second_leg = data2.rename(columns=second_leg_names)

    return data1.merge(second_leg, left_index=True, right_index=True)


def optim_func(series):
    """
        Score one backtest result for the optimizer.

        Results with fewer than 20 trades score -1; all others score their
        final equity. (series['Sharpe Ratio'] was considered as an
        alternative score.)
    """
    return -1 if series['# Trades'] < 20 else series['Equity Final [$]']
    
class Pairs(Strategy):
    """
        Strategy combining the momentum of both securities with the z-score
        of their price spread. Orders are placed through self.buy/self.sell
        with stop-loss and take-profit levels derived from self.data.Close.
    """
    # Lower price level, as a percentage of the current Close
    stlo = 99
    # Upper price level, as a percentage of the current Close
    tkpr = 101
    # Period passed to talib.MOM for both momentum indicators
    mom_period = 10

    def init(self):
        """
            Register the momentum indicators and the rolling z-score of the
            spread between Close and Close2.
        """
        self.mom1 = self.I(talib.MOM,self.data.Close,timeperiod=self.mom_period)
        self.mom2 = self.I(talib.MOM,self.data.Close2,timeperiod=self.mom_period)
        # Close * Volume for each security; not read anywhere else in this class
        self.vwap1 = self.data.Close*self.data.Volume
        self.vwap2 = self.data.Close2*self.data.Volume2
        # Price spread and its 10-row rolling mean/std, combined into a z-score
        self.spread = self.data.Close-self.data.Close2
        self.spread_mean = self.I(lambda x: pd.Series(x).rolling(window=10).mean(), self.spread)
        self.spread_std = self.I(lambda x: pd.Series(x).rolling(window=10).std(), self.spread)
        self.z_score = self.I(lambda x, y, z: (x - y) / z, self.spread, self.spread_mean, self.spread_std)

    def next(self):
        """
            Close the current position and open a new one when momentum and
            z-score line up; do nothing otherwise.
        """
        # NOTE(review): the conditions compare whole indicator objects with 0;
        # presumably the backtesting arrays evaluate to their latest value
        # in boolean context - confirm against the backtesting docs.
        # First momentum negative, z-score positive, second momentum positive:
        # buy with the stop at stlo% and the target at tkpr% of Close
        if (self.mom1<0 and self.z_score>0 and self.mom2>0):
            self.position.close()
            self.buy(sl=((self.stlo*self.data.Close)/100) , tp=((self.tkpr*self.data.Close)/100))

        # Mirror case: sell with the target at stlo% and the stop at tkpr%
        # of Close
        elif (self.mom1>0 and self.z_score<0 and self.mom2<0):
            self.position.close()
            self.sell(tp=((self.stlo*self.data.Close)/100),sl=((self.tkpr*self.data.Close)/100))

def main():
    """
        Run the Pairs backtest on the merged price data, optimize the
        stlo/tkpr parameters with `optim_func` as the score, then print the
        resulting statistics and plot the backtest.
    """
    merged_prices = load_data(data1,data2)
    backtest = Backtest(merged_prices, Pairs, cash=100000)
    backtest.run()

    # Each range holds a single value, so one combination is evaluated
    search_space = {
        'stlo': range(98,99,1),
        'tkpr': range(101,102,1),
    }
    stats = backtest.optimize(maximize=optim_func, **search_space)

    print(stats)
    backtest.plot()

main()

[*********************100%%**********************]  1 of 1 completed
[*********************100%%**********************]  1 of 1 completed


  0%|          | 0/1 [00:00<?, ?it/s]

Start                     2024-07-29 09:15:00
End                       2024-09-23 15:15:00
Duration                     56 days 06:00:00
Exposure Time [%]                        58.0
Equity Final [$]                105112.713301
Equity Peak [$]                 105740.960996
Return [%]                           5.112713
Buy & Hold Return [%]               -3.120744
Return (Ann.) [%]                   36.907757
Volatility (Ann.) [%]               16.016859
Sharpe Ratio                         2.304307
Sortino Ratio                        5.770513
Calmar Ratio                        15.688379
Max. Drawdown [%]                   -2.352554
Avg. Drawdown [%]                   -0.539763
Max. Drawdown Duration       17 days 04:15:00
Avg. Drawdown Duration        1 days 11:51:00
# Trades                                   25
Win Rate [%]                             48.0
Best Trade [%]                       2.007574
Worst Trade [%]                     -1.010837
Avg. Trade [%]                    

  formatter=DatetimeTickFormatter(days=['%d %b', '%a %d'],
  formatter=DatetimeTickFormatter(days=['%d %b', '%a %d'],
  .resample(resample_rule, label='left')
  fig = gridplot(
  fig = gridplot(
