In [None]:
%pip install --upgrade fyers_helper TA-Lib

In [2]:
import pandas as pd

url = 'https://en.wikipedia.org/wiki/NIFTY_50'
tables = pd.read_html(url)
nifty50_df = tables[1]

tickers = [f"NSE:{t}-EQ" for t in nifty50_df['Symbol'].to_list()]

In [3]:
from fyers_helper import prepare_data, load_stock_data
import datetime as dt
import os
from google.colab import userdata

end_date = dt.datetime.now()
start_date = dt.datetime(2015, 1, 1)

os.environ['FYERS_CONFIG'] = userdata.get('FYERS_CONFIG')

file_paths = prepare_data(tickers, "1D", start_date=start_date, end_date=end_date, path=data_path, overwrite=False)
loaded_data = load_stock_data(file_paths, data_path, "1D")

Downloading data: 100%|██████████| 50/50 [00:01<00:00, 29.81ticker/s]
Loading 1Dmin data: 100%|██████████| 50/50 [00:18<00:00,  2.65it/s]


In [99]:
df = pd.concat(loaded_data, names=["Stock", "Date"]).reset_index()

In [100]:
import talib as ta

# Calculate RSI per stock
df['SMA20'] = df.groupby('Stock')['Close'].transform(lambda x: ta.SMA(x, timeperiod=20))
df['SMA20_Perc_Distance'] = (df['Close'] - df['SMA20']) / df['SMA20'] * 100

df['Entry_Price'] = df.groupby('Stock')['Close'].shift(-1)
df['Entry_Time'] = df.groupby('Stock')['Date'].shift(-1)

# df = df[df.Date >= '2024-01-01 05:30:00']

In [101]:
signals = (
    df[df.Close < df.SMA20].groupby('Date', group_keys=False).apply(lambda x: x.nsmallest(5, "SMA20_Perc_Distance"))
      .sort_values(['Date', 'SMA20_Perc_Distance']).reset_index(drop=True).dropna(subset=['Entry_Price'])
)

  df[df.Close < df.SMA20].groupby('Date', group_keys=False).apply(lambda x: x.nsmallest(5, "SMA20_Perc_Distance"))


In [136]:
from dataclasses import dataclass, field, asdict
from typing import List, Optional

leverage = 1
initial_capital = 200000
max_positions_num = 40
capital_buckets = [initial_capital/max_positions_num] * max_positions_num
tp_perc = 5
max_holding_days = 10
rebalance_perc = 3
active_positions = {}
closed_positions = []


@dataclass
class Trade:
  entry_time: str
  entry_price: float
  quantity: int

@dataclass
class Position:
  stock: str
  entry_time: str
  avg_entry_price: float = 0.0
  quantity: int = 0
  exit_time: str = None
  exit_price: float = 0.0
  sl: float = 0.0
  tp: float = 0.0
  trades: List[Trade] = field(default_factory=list)
  pnl: float = 0.0
  tax: float = 0.0

  def exit_margin(self):
    return (self.avg_entry_price * self.quantity)/leverage - self.tax + self.pnl

  def close(self, exit_time, exit_price):
    self.exit_time = exit_time
    self.exit_price = exit_price
    if self.quantity < 0:
      print(self)
      raise Exception("Quantity can't be negative")
    self.pnl = (exit_price - self.avg_entry_price) * self.quantity
    self.calculate_taxes()

  def calculate_taxes(self):
    extry_taxes = 0
    for trade in self.trades:
      stt = abs(trade.quantity) * trade.entry_price * 0.025/100
      transaction_charges = (abs(trade.quantity) * trade.entry_price * 0.00322/100)
      gst = (stt + transaction_charges) * 18/100
      extry_taxes += stt + transaction_charges + gst

    transaction_charges = abs(self.quantity) * self.exit_price * 0.00322/100
    gst = transaction_charges * 18/100
    stamp_duty = abs(self.quantity) * self.exit_price * 0.003/100
    exit_taxes = transaction_charges + gst + stamp_duty

    self.tax = extry_taxes + exit_taxes

  def rebalance_position(self):
    total_cost = 0
    total_qty = 0
    for t in self.trades:
      total_cost += t.entry_price * t.quantity
      total_qty += t.quantity
    if total_qty == 0:
      print(self)
    self.avg_entry_price = total_cost / total_qty
    self.quantity = total_qty
    self.tp = self.avg_entry_price * (1 + tp_perc / 100)

  def add_trade(self, trade: Trade):
    self.trades.append(trade)
    self.rebalance_position()

def select_stocks_for_entry(_df: pd.DataFrame):
  selected_stocks = []
  for _, row in _df.iterrows():
    if active_positions.get(row.Stock, None) is None:
      selected_stocks.append(row)
  return selected_stocks[:2]

def get_capital():
  if len(capital_buckets) == 0:
    return None
  return capital_buckets.pop()

def has_capital():
  return len(capital_buckets) > 0

def update_capital_buckets(c = 0, _date = None):
  global capital_buckets

  if len(capital_buckets) > 0:
    capital_buckets = [(sum(capital_buckets) + c)/len(capital_buckets)] * len(capital_buckets)

def init_new_position(selected_stock, capital: float, _date):
  if selected_stock is not None:
    qty = int(capital * leverage / selected_stock.Entry_Price)
    if qty < 0:
      print(selected_stock.Entry_Price, capital, leverage)
      raise Exception("Quantity can't be negative")

    if qty == 0:
      return None
    position = Position(selected_stock.Stock, selected_stock.Entry_Time)
    trade = Trade(selected_stock.Entry_Time, selected_stock.Entry_Price, qty)
    position.add_trade(trade)

    remaining_capital = capital - (qty * trade.entry_price) / leverage
    update_capital_buckets(remaining_capital, _date)
    return position

def avg_out_active_positions(k, v):
  global active_positions, capital_buckets
  if len(active_positions) == 0:
    return

  apdf = (
      pd.DataFrame([asdict(p) for p in active_positions.values() if p is not None]).set_index('stock')
      .join(v.rename(columns={'Stock': 'stock'}).set_index('stock')[['Close', 'Entry_Time', 'Entry_Price']])
  )
  apdf['perc_change'] = (apdf.Close - apdf.avg_entry_price)/apdf.avg_entry_price * 100
  rebalance_stock = apdf[apdf.perc_change <= -rebalance_perc].sort_values('perc_change')
  if len(rebalance_stock) == 0:
    return

  rebalance_stock = rebalance_stock.iloc[0]

  capital = get_capital()
  if capital is not None:
    position = active_positions[rebalance_stock.name]
    qty = position.quantity / len(position.trades)

    trade = Trade(rebalance_stock.Entry_Time, rebalance_stock.Entry_Price, qty)
    position.add_trade(trade)
    remaining_capital = capital - (qty * trade.entry_price)/ leverage
    update_capital_buckets(remaining_capital, k)

def book_profit(k, v):
  global capital_buckets
  if len(active_positions) == 0:
    return

  apdf = (
      pd.DataFrame([asdict(p) for p in active_positions.values() if p is not None]).set_index('stock')
        .join(v.rename(columns={'Stock': 'stock'}).set_index('stock')[['High', 'Close', 'Date']])
  )
  apdf = apdf[apdf.High >= apdf.tp]
  apdf['perc_change'] = (apdf.Close - apdf.avg_entry_price)/apdf.avg_entry_price * 100

  selected_stocks = apdf.sort_values('perc_change', ascending=False)
  if len(selected_stocks) > 0:
    selected_stock = selected_stocks.iloc[0]
    position = active_positions[selected_stock.name]
    position.close(selected_stock.Date, selected_stock.tp)
    closed_positions.append(position)
    active_positions[position.stock] = None

    capital_buckets += [position.exit_margin() / len(position.trades)] * len(position.trades)
    update_capital_buckets(_date=k)

def backtest():
  for k, v in df.groupby('Date'):
    global active_positions, capital_buckets

    old_len = len(capital_buckets)
    post_new_entry_len = 0
    post_avg_len = 0
    if has_capital():
      selected_stocks = select_stocks_for_entry(signals[signals.Date == k])
      if len(selected_stocks) > 0:
        # for s in selected_stocks:
        capital = get_capital()
        if capital is None:
          break
        position = init_new_position(selected_stocks[0], capital, k)
        if position is not None:
          active_positions[position.stock] = position
          post_new_entry_len = len(capital_buckets)
        else:
          capital_buckets.append(capital)
      else:
        avg_out_active_positions(k, v)
        post_avg_len = len(capital_buckets)

    # else:
    #   print(k, sum([len(p.trades) for p in active_positions.values() if p is not None]), len(capital_buckets))

    book_profit(k, v)
    post_book_profit_len = len(capital_buckets)

    capital_in_use = sum([len(p.trades) for p in active_positions.values() if p is not None])
    if str(k.date()) >= '2015-07-13':
      print(k.date(), old_len, post_new_entry_len, post_avg_len, post_book_profit_len, capital_in_use, capital_buckets)
      print()
    # if str(k.date()) == '2024-01-03':
    #   break

backtest()


2015-07-13 6 5 0 5 35 [np.float64(5795.769493506415), np.float64(5795.769493506415), np.float64(5795.769493506415), np.float64(5795.769493506415), np.float64(5795.769493506415)]

2015-07-14 5 0 4 4 36 [np.float64(7066.511866883018), np.float64(7066.511866883018), np.float64(7066.511866883018), np.float64(7066.511866883018)]

2015-07-15 4 0 3 4 36 [np.float64(8215.358154989048), np.float64(8215.358154989048), np.float64(8215.358154989048), np.float64(8215.358154989048)]

2015-07-16 4 0 3 4 36 [np.float64(9389.341941260216), np.float64(9389.341941260216), np.float64(9389.341941260216), np.float64(9389.341941260216)]

2015-07-17 4 0 3 5 35 [np.float64(9650.727116964541), np.float64(9650.727116964541), np.float64(9650.727116964541), np.float64(9650.727116964541), np.float64(9650.727116964541)]

2015-07-20 5 0 4 5 35 [np.float64(11558.862130543463), np.float64(11558.862130543463), np.float64(11558.862130543463), np.float64(11558.862130543463), np.float64(11558.862130543463)]

2015-07-21 5 4

In [141]:
from dataclasses import  asdict
pd.DataFrame([asdict(p) for p in closed_positions]).sort_values(['entry_time']).to_csv(f'{root_path}/closed_positions.csv', index=False)


In [94]:
[len(p.trades) for p in active_positions.values() if p is not None], len(capital_buckets)

([1, 4, 3, 1, 1, 1, 3, 1, 1, 1, 3, 1, 1, 1, 1, 1, 2, 1, 6, 1, 4, 1], 0)

In [None]:
signals[signals.Date == '2025-07-02 05:30:00']

Unnamed: 0,Stock,Date,Open,High,Low,Close,Volume,RSI,Entry_Price,Entry_Time
7913,NSE:UNITDSPR-EQ,2025-07-02 05:30:00,1403.9,1406.6,1380.4,1383.4,1464003,30.145129,1381.7,2025-07-03 05:30:00


In [None]:
no_stocks_found_days

281

In [None]:
signals.to_csv(f'{root_path}/signals.csv', index=False)

In [None]:
closed_positions

NameError: name 'closed_positions' is not defined