## Backtesting for trading strategy 

### Fetching data and making a CSV file using Zerodha API

#### Getting logging credentials ready

In [140]:
#Generating URl for starting session
from kiteconnect import KiteConnect
api_key = "h7cjmsoda9maddt6"
api_secret = "0kz75ghjnzuja1d2jbs6se0j146k4os7"
kite = KiteConnect(api_key=api_key)
login_url = kite.login_url()
print(login_url)


https://kite.zerodha.com/connect/login?api_key=h7cjmsoda9maddt6&v=3


In [141]:
#Generating the Access Token
from kiteconnect import KiteConnect

api_key = 'h7cjmsoda9maddt6'
api_secret = '0kz75ghjnzuja1d2jbs6se0j146k4os7'
request_token = 'hLFpQYqxhs0oYp3Q8IPnTWMcbEX8TxD2'

kite = KiteConnect(api_key=api_key)
data = kite.generate_session(request_token, api_secret=api_secret)
access_token = data["access_token"]

print(f"Access token: {access_token}")


Access token: q8VKQp1ek4hoQcoQReXZxbXsEXfXxZ8g


#### Fetching data and making candle_data.csv

In [145]:
import csv
import datetime as dt
from kiteconnect import KiteConnect

#Zerodha API credentials
api_key = 'h7cjmsoda9maddt6'
api_secret = '0kz75ghjnzuja1d2jbs6se0j146k4os7'
access_token = 'q8VKQp1ek4hoQcoQReXZxbXsEXfXxZ8g'
kite = KiteConnect(api_key=api_key)
kite.set_access_token(access_token)

#Time frame for the data and instrment token
instrument_token = 'NSE:NIFTY 50'
start_date = dt.date(2015, 1, 1)
end_date = dt.date(2023, 6, 1)
num_intervals = int((end_date - start_date).days * 24 * 60 / 5)

#Since Zerodha API only defines 100 requests a time, we will make multiple requests
max_days_per_request = 100
num_requests = int(num_intervals / (max_days_per_request * 24 * 60 / 5)) + 1

#The list to store all the data
all_data = []

#Fetching historical data in small propoetions
for i in range(num_requests):
    #Calculating the start and end dates for each request
    request_start_date = start_date + dt.timedelta(days=i * max_days_per_request)
    request_end_date = request_start_date + dt.timedelta(days=max_days_per_request)

    #Requesting historical data from Zerodha API
    data = kite.historical_data(
        instrument_token=256265,
        from_date=request_start_date,
        to_date=request_end_date,
        interval='5minute'
    )
    all_data.extend(data)

#Creating the CSV file
filename = 'candle_data.csv'
with open(filename, 'w', newline='') as file:
    writer = csv.writer(file)
    writer.writerow(['Timestamp', 'Open', 'High', 'Low', 'Close', 'Volume'])
    for candle in all_data:
        timestamp = candle['date']
        open_price = candle['open']
        high_price = candle['high']
        low_price = candle['low']
        close_price = candle['close']
        volume = candle['volume']
        writer.writerow([timestamp, open_price, high_price, low_price, close_price, volume])

print(f"CSV file '{filename}' created successfully!")


CSV file 'candle_data.csv' created successfully!


### Using the CSV file for Backtesting 

#### Data Manipulation

In [146]:
import pandas as pd
df = pd.read_csv("candle_data.csv")
df.head()

Unnamed: 0,Timestamp,Open,High,Low,Close,Volume
0,2015-01-09 09:15:00+05:30,8285.45,8301.3,8285.45,8301.2,0
1,2015-01-09 09:20:00+05:30,8300.5,8303.0,8293.25,8301.0,0
2,2015-01-09 09:25:00+05:30,8301.65,8302.55,8286.8,8294.15,0
3,2015-01-09 09:30:00+05:30,8294.1,8295.75,8280.65,8288.5,0
4,2015-01-09 09:35:00+05:30,8289.1,8290.45,8278.0,8283.45,0


In [147]:
df = df.drop('Volume', axis=1)
df.rename(columns={'Timestamp': 'Date'}, inplace=True)
df['EMA'] = df['Close'].ewm(span=5).mean()
df.head()

Unnamed: 0,Date,Open,High,Low,Close,EMA
0,2015-01-09 09:15:00+05:30,8285.45,8301.3,8285.45,8301.2,8301.2
1,2015-01-09 09:20:00+05:30,8300.5,8303.0,8293.25,8301.0,8301.08
2,2015-01-09 09:25:00+05:30,8301.65,8302.55,8286.8,8294.15,8297.797368
3,2015-01-09 09:30:00+05:30,8294.1,8295.75,8280.65,8288.5,8293.935385
4,2015-01-09 09:35:00+05:30,8289.1,8290.45,8278.0,8283.45,8289.91019


#### Performing Backtesting

In [151]:
import pandas as pd
import datetime
class BacktestStrategy:
    #Default Constructor
    def __init__(self, df):
        self.df = df
        self.sell_positions = []
        self.buy_positions = []
        self.max_trades_per_day = 3
        self.nifty_points_limit = 50
        self.buy_trades_count = 0
        self.sell_trades_count = 0
        #To ensure no more buy or sell occurs before the Exit
        self.exit_flag_buy = True
        self.exit_flag_sell = True
        
    def backtest(self):
        prev_day = None
        for i in range(len(self.df)):
            row = self.df.iloc[i]
            curr_date_str = row['Date']
            curr_day = datetime.strptime(curr_date_str, '%Y-%m-%d %H:%M:%S%z').date()
            #Ensuring to only make 3 trades in a day
            if curr_day != prev_day:
                self.buy_trades_count = 0
                self.sell_trades_count = 0
                
            if self.buy_trades_count < self.max_trades_per_day and self.is_buy_candle(row) and self.exit_flag_buy:
                self.update_buy_positions(row)
                self.buy_trades_count += 1
            
            if self.sell_trades_count < self.max_trades_per_day and self.is_sell_candle(row) and self.exit_flag_sell:
                self.update_sell_positions(row)
                self.sell_trades_count += 1
            prev_day = curr_day

    #Buy alert candle condition
    def is_buy_candle(self, row):
        return row['Open'] < row['EMA'] and row['Low'] < row['EMA'] and row['High'] < row['EMA'] and row['Close'] < row['EMA']
    #Sell alert candle condition
    def is_sell_candle(self, row):
        return row['Open'] > row['EMA'] and row['Low'] > row['EMA'] and row['High'] > row['EMA'] and row['Close'] > row['EMA']
    
    #Defining the Buy Positions
    def update_buy_positions(self, row):
        if not self.is_buy_candle(row):
            return
        self.exit_flag_buy = False
        alert_candle = row['Low']
        buy_price = row['High']
        stop_loss = alert_candle
        target = buy_price + 3 * (buy_price - stop_loss)
        self.buy_positions.append({'date': row['Date'], 'entry': buy_price, 'stop_loss': stop_loss, 'target': target})
        #Exit for Buy setup
        exit_df = self.df[self.df['Date'] > row['Date']]
        for i in range(len(exit_df)):
            row = exit_df.iloc[i]
            high = row['High']
            low = row['Low']

            if low <= stop_loss:
                self.buy_positions.append({'exit_time': row['Date'], 'exit':stop_loss, 'condition': 'loss'})
                self.exit_flag_buy = True
                break
            elif high >= target:
                self.buy_positions.append({'exit_time': row['Date'], 'exit':target, 'condition': 'profit'})
                self.exit_flag_buy = True
                break

    #Defining the Sell Positions
    def update_sell_positions(self, row):
        self.exit_flag_sell = False
        alert_candle = row['High']
        sell_price = row['Low']
        stop_loss = alert_candle
        target = sell_price - 3 * (stop_loss - sell_price)
        self.sell_positions.append({'date': row['Date'], 'entry': sell_price, 'stop_loss': stop_loss, 'target': target})
        #Exit for Sell setup
        exit_df = self.df[self.df['Date'] > row['Date']]
        for i in range(len(exit_df)):
            row = exit_df.iloc[i]
            high = row['High']
            low = row['Low']
            
            if high >= stop_loss:
                self.sell_positions.append({'exit_time': row['Date'], 'exit':stop_loss, 'condition': 'loss'})
                self.exit_flag_sell = True
                break
            elif low <= target:
                self.sell_positions.append({'exit_time': row['Date'], 'exit':target, 'condition': 'profit'})
                self.exit_flag_sell = True
                break
 
    #Generating Dataframes
    def get_buy_positions_dataframe(self):
        return pd.DataFrame(self.buy_positions, columns=['date', 'entry', 'stop_loss', 'target', 'exit_time', 'exit', 'condition'])
    
    def get_sell_positions_dataframe(self):
        return pd.DataFrame(self.sell_positions, columns=['date', 'entry', 'stop_loss', 'target', 'exit_time', 'exit', 'condition'])

#Backtesting the data
strategy = BacktestStrategy(df)
strategy.backtest()

buy_positions_df = strategy.get_buy_positions_dataframe()
sell_positions_df = strategy.get_sell_positions_dataframe()

#Data Manipulation
columns_to_shift = ['exit_time', 'exit', 'condition']
buy_positions_df[columns_to_shift] = buy_positions_df[columns_to_shift].shift(-1)
sell_positions_df[columns_to_shift] = sell_positions_df[columns_to_shift].shift(-1)

#Droping the NaN rows
buy_positions_df = buy_positions_df.dropna()
sell_positions_df = sell_positions_df.dropna()

#Reseting indices
buy_positions_df = buy_positions_df.reset_index(drop=True)
sell_positions_df = sell_positions_df.reset_index(drop=True)

print("Sell positions:")
sell_positions_df
print("Buy positions:")
buy_positions_df

Sell positions:
Buy positions:


Unnamed: 0,date,entry,stop_loss,target,exit_time,exit,condition
0,2015-01-09 09:40:00+05:30,8288.30,8277.40,8321.00,2015-01-09 09:50:00+05:30,8277.40,loss
1,2015-01-09 10:10:00+05:30,8281.60,8274.95,8301.55,2015-01-09 11:20:00+05:30,8274.95,loss
2,2015-01-09 10:45:00+05:30,8283.85,8279.50,8296.90,2015-01-09 10:50:00+05:30,8279.50,loss
3,2015-01-12 09:25:00+05:30,8268.60,8254.45,8311.05,2015-01-12 12:20:00+05:30,8254.45,loss
4,2015-01-12 10:40:00+05:30,8285.80,8280.80,8300.80,2015-01-12 10:50:00+05:30,8280.80,loss
...,...,...,...,...,...,...,...
6037,2023-06-01 11:05:00+05:30,18564.95,18557.65,18586.85,2023-06-01 11:25:00+05:30,18557.65,loss
6038,2023-06-01 12:05:00+05:30,18549.10,18535.80,18589.00,2023-06-01 12:10:00+05:30,18535.80,loss
6039,2023-06-02 10:00:00+05:30,18505.05,18494.30,18537.30,2023-06-02 10:05:00+05:30,18494.30,loss
6040,2023-06-02 10:05:00+05:30,18502.90,18485.85,18554.05,2023-06-02 10:15:00+05:30,18485.85,loss


### Creating CSV files for Buy and Sell setups

In [152]:
filename = 'buy_data.csv'
buy_positions_df.to_csv(filename, index=False)

print(f"CSV file '{filename}' created successfully!")

CSV file 'buy_data.csv' created successfully!


In [153]:
filename = 'sell_data.csv'
sell_positions_df.to_csv(filename, index=False)

print(f"CSV file '{filename}' created successfully!")

CSV file 'sell_data.csv' created successfully!
