In [1]:
import asyncio
import datetime as dt
import math
from typing import Literal

import matplotlib.pyplot as plt
import mplfinance as mpf
import numpy as np
import pandas as pd
import pandas_market_calendars as mcal
import plotly.graph_objects as go
import polars as pl
from dash import Dash, dcc, html
from plotly.subplots import make_subplots

nse = mcal.get_calendar("NSE")

In [2]:
pd.set_option("display.max_rows", 25_000)
pd.set_option("display.max_columns", 500)
pl.Config.set_tbl_cols(500)
pl.Config.set_tbl_rows(10_000)

pd.options.display.float_format = "{:.4f}".format

In [3]:
import sys

sys.path.append("..")
from tooling.enums import AssetClass, Index, Spot, StrikeSpread
from tooling.fetch import fetch_option_data, fetch_spot_data
from tooling.filter import find_atm, option_tool

In [4]:
async def get_expiry(f_today, index):

    if index == 'bnf':    
        if (f_today <= dt.date(2024, 1, 25)) and (f_today >= dt.date(2024, 1, 18)):
            f_expiry = dt.date(2024, 1, 25)
        elif (f_today <= dt.date(2024, 1, 31)) and (f_today >= dt.date(2024, 1, 26)):
            f_expiry = dt.date(2024, 1, 31)
        elif (f_today <= dt.date(2024, 2, 22)) and (f_today >= dt.date(2024, 2, 29)):
            f_expiry = dt.date(2024, 2, 29)
        elif (f_today <= dt.date(2024, 3, 25)) and (f_today >= dt.date(2024, 3, 27)):
            f_expiry = dt.date(2024, 2, 27)
        elif f_today < dt.date(2023, 9, 1):
            days_to_thursday = (3 - f_today.weekday()) % 7
            nearest_thursday = f_today + dt.timedelta(days=days_to_thursday)
            f_expiry = nearest_thursday
            if nse.valid_days(start_date=nearest_thursday, end_date=nearest_thursday).empty:
                f_expiry = nearest_thursday - dt.timedelta(days=1)
        elif f_today >= dt.date(2023, 9, 1):
            if f_today.day < 24:
                days_to_wednesday = (2 - f_today.weekday()) % 7
                nearest_wednesday = f_today + dt.timedelta(days=days_to_wednesday)
                f_expiry = nearest_wednesday
                if nse.valid_days(
                    start_date=nearest_wednesday, end_date=nearest_wednesday
                ).empty:
                    f_expiry = nearest_wednesday - dt.timedelta(days=1)
            else:
                days_to_thursday = (3 - f_today.weekday()) % 7
                nearest_thursday = f_today + dt.timedelta(days=days_to_thursday)
                f_expiry = nearest_thursday
                if nse.valid_days(
                    start_date=nearest_thursday, end_date=nearest_thursday
                ).empty:
                    f_expiry = nearest_thursday - dt.timedelta(days=1)
        return f_expiry

    elif index == 'nifty':
        days_to_thursday = (3 - f_today.weekday()) % 7
        nearest_thursday = f_today + dt.timedelta(days=days_to_thursday)
        f_expiry = nearest_thursday
        if nse.valid_days(start_date=nearest_thursday, end_date=nearest_thursday).empty:
            f_expiry = nearest_thursday - dt.timedelta(days=1)
        return f_expiry

    elif index == 'finnifty' or index == 'fnf':
        days_to_thursday = (1 - f_today.weekday()) % 7
        nearest_thursday = f_today + dt.timedelta(days=days_to_thursday)
        f_expiry = nearest_thursday
        if nse.valid_days(start_date=nearest_thursday, end_date=nearest_thursday).empty:
            f_expiry = nearest_thursday - dt.timedelta(days=1)
        return f_expiry

    elif index == 'midcpnifty' or index == 'midcp':
        days_to_thursday = (0 - f_today.weekday()) % 7
        nearest_thursday = f_today + dt.timedelta(days=days_to_thursday)
        f_expiry = nearest_thursday
        if nse.valid_days(start_date=nearest_thursday, end_date=nearest_thursday).empty:
            f_expiry = nearest_thursday - dt.timedelta(days=1)
        return f_expiry

async def get_expiry_nifty(f_today):

    days_to_thursday = (3 - f_today.weekday()) % 7
    nearest_thursday = f_today + dt.timedelta(days=days_to_thursday)
    f_expiry = nearest_thursday
    if nse.valid_days(start_date=nearest_thursday, end_date=nearest_thursday).empty:
        f_expiry = nearest_thursday - dt.timedelta(days=1)
    return f_expiry


async def get_option_contract_name(symbol, strike, expiry, opt_type):
    temp = "0"
    mth = expiry.month

    if (expiry + dt.timedelta(days=7)).month != expiry.month:
        date_string = expiry.strftime("%y%b").upper()
        return f"{symbol}{date_string}{strike}{opt_type}"
    else:
        if expiry.day <= 9:
            date_string = f"{expiry.year - 2000}{mth}{temp}{expiry.day}"
        else:
            date_string = f"{expiry.year - 2000}{mth}{expiry.day}"
        return f"{symbol}{date_string}{strike}{opt_type}"


def get_option_contract_name2(symbol, strike, expiry, opt_type):
    temp = "0"
    mth = expiry.month

    if (expiry + dt.timedelta(days=7)).month != expiry.month:
        date_string = expiry.strftime("%y%b").upper()
        return f"{symbol}{date_string}{strike}{opt_type}"
    else:
        if expiry.day <= 9:
            date_string = f"{expiry.year - 2000}{mth}{temp}{expiry.day}"
        else:
            date_string = f"{expiry.year - 2000}{mth}{expiry.day}"
        return f"{symbol}{date_string}{strike}{opt_type}"

In [11]:
bnf_1min = pd.read_csv("../data/nifty_wave.csv")
bnf_1min["datetime"] = pd.to_datetime(bnf_1min["time"])
bnf_1min = bnf_1min[bnf_1min["datetime"].dt.year >= 2017]

In [12]:
bnf_1min.tail()

Unnamed: 0,time,open,high,low,close,MA,Plot,Zero line,MACD Signal,datetime
17188,2024-12-05T09:15:00+05:30,24512.2,24539.95,24396.2,24400.1,24258.1475,59.4254,0,59.4254,2024-12-05 09:15:00+05:30
17189,2024-12-05T10:15:00+05:30,24400.35,24441.75,24295.55,24368.05,24259.3125,33.7507,0,33.7507,2024-12-05 10:15:00+05:30
17190,2024-12-05T11:15:00+05:30,24367.6,24604.9,24330.1,24589.9,24266.8637,70.0144,0,70.0144,2024-12-05 11:15:00+05:30
17191,2024-12-05T12:15:00+05:30,24589.7,24648.6,24560.75,24631.1,24275.7887,99.4746,0,99.4746,2024-12-05 12:15:00+05:30
17192,2024-12-05T13:15:00+05:30,24633.85,24677.2,24570.75,24650.5,24285.1175,118.7053,0,118.7053,2024-12-05 13:15:00+05:30


In [13]:
def resample(
    data: pl.DataFrame, timeframe, offset: dt.timedelta | None = None
) -> pl.DataFrame:
    return (
        data.set_sorted("datetime")
        .group_by_dynamic(
            index_column="datetime",
            every=timeframe,
            period=timeframe,
            label="left",
            offset=offset,
        )
        .agg(
            [
                pl.col("open").first().alias("open"),
                pl.col("high").max().alias("high"),
                pl.col("low").min().alias("low"),
                pl.col("close").last().alias("close"),
                pl.col("volume").sum().alias("volume"),
            ]
        )
    )


# ohlc_resampled = resample(pl.DataFrame(bnf_1min), '7d', pd.Timedelta(days=4))
# ohlc_resampled

In [14]:
bnf_1min["datetime"] = pd.to_datetime(bnf_1min["datetime"])
list_of_traded_dates = set(bnf_1min["datetime"].dt.date)
list_of_traded_dates

{datetime.date(2020, 9, 28),
 datetime.date(2018, 6, 25),
 datetime.date(2023, 5, 12),
 datetime.date(2023, 8, 28),
 datetime.date(2023, 12, 13),
 datetime.date(2023, 7, 10),
 datetime.date(2022, 1, 5),
 datetime.date(2022, 2, 15),
 datetime.date(2019, 9, 26),
 datetime.date(2023, 4, 27),
 datetime.date(2023, 9, 11),
 datetime.date(2020, 5, 29),
 datetime.date(2019, 2, 6),
 datetime.date(2022, 2, 4),
 datetime.date(2017, 4, 19),
 datetime.date(2020, 2, 5),
 datetime.date(2024, 3, 1),
 datetime.date(2023, 11, 1),
 datetime.date(2018, 10, 10),
 datetime.date(2023, 11, 3),
 datetime.date(2020, 4, 1),
 datetime.date(2021, 1, 21),
 datetime.date(2024, 4, 8),
 datetime.date(2024, 3, 19),
 datetime.date(2019, 9, 17),
 datetime.date(2022, 9, 30),
 datetime.date(2021, 4, 7),
 datetime.date(2022, 8, 11),
 datetime.date(2024, 6, 20),
 datetime.date(2021, 8, 20),
 datetime.date(2023, 1, 23),
 datetime.date(2023, 5, 29),
 datetime.date(2022, 12, 16),
 datetime.date(2017, 6, 23),
 datetime.date(2017

In [16]:
import pandas as pd

def calculate_signals(df, n, rsi_period, rsi_overbought, ema_length):
    # Calculate RSI
    def calculate_rsi(series, period):
        delta = series.diff()
        gain = (delta.where(delta > 0, 0)).rolling(window=period).mean()
        loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean()
        rs = gain / loss
        rsi = 100 - (100 / (1 + rs))
        return rsi

    df['RSI'] = calculate_rsi(df['close'], rsi_period)

    # Calculate the previous n candles' low
    df['Prev_N_Low'] = df['low'].rolling(window=n).min().shift(1)
    df['ema'] = df['close'].ewm(span=ema_length, adjust=False).mean()

    # Signal generation logic
    df['Sell_Signal'] = (
        (df['close'] < df['Prev_N_Low']) &
        (df['RSI'] > rsi_overbought)
    )

    return df

In [17]:
bnf_1min.head()

Unnamed: 0,time,open,high,low,close,MA,Plot,Zero line,MACD Signal,datetime
3497,2017-01-02T09:15:00+05:30,8212.0,8212.0,8141.0,8154.8,8037.0038,51.3767,0,51.3767,2017-01-02 09:15:00+05:30
3498,2017-01-02T10:15:00+05:30,8155.15,8161.8,8133.8,8146.8,8040.765,41.7773,0,41.7773,2017-01-02 10:15:00+05:30
3499,2017-01-02T11:15:00+05:30,8146.95,8159.05,8137.5,8150.3,8044.7463,35.4942,0,35.4942,2017-01-02 11:15:00+05:30
3500,2017-01-02T12:15:00+05:30,8150.85,8166.55,8144.95,8165.35,8049.305,34.1609,0,34.1609,2017-01-02 12:15:00+05:30
3501,2017-01-02T13:15:00+05:30,8165.8,8195.6,8160.8,8192.45,8054.37,38.7247,0,38.7247,2017-01-02 13:15:00+05:30


In [18]:
lev_ = 8
portfolio_value = 10000000
lot_size_ = 75
slippage_ = 0.0002

In [19]:
def backtest_short(df, n, rsi_period, rsi_overbought, ema_length):
    df = df.reset_index(drop=True)
    trade_book = []
    in_trade = False
    points = 0
    is_trailing_active = False
    trailing_sl = 0
    df = calculate_signals(df, n, rsi_period, rsi_overbought, ema_length)
    # print(df[df['Sell_Signal']].to_string())
    for i in range(n, len(df)):
        # print(i, df.loc[i], df.loc[i, 'open'])
        # break
        if df.loc[i, 'Sell_Signal'] and not in_trade:
            # Entry Triggered
            entry_price = df.loc[i, 'close']
            entry_time = df.loc[i, 'datetime']
            initial_sl = df.loc[i-n:i, 'high'].max()
            sl_in_points = initial_sl - entry_price
            target = 5 * sl_in_points
            in_trade = True

        if in_trade:
            if df.loc[i, 'high'] < df.loc[i, 'ema']:
                is_trailing_active = True
            if is_trailing_active:
                trailing_sl = df.loc[i, 'ema']
            # print(df.loc[i, 'open'], initial_sl)
            # break
            if df.loc[i, 'open'] > initial_sl and not points:
                # Gap condition
                if df.loc[i, 'close'] < initial_sl:
                    initial_sl = df.loc[i, 'high']
                else:
                    exit_price = df.loc[i, 'close']
                    exit_time = df.loc[i, 'datetime']
                    in_trade = False
                    remark = 'Gap SL'
                    points = entry_price - exit_price
            if df.loc[i, 'high'] > initial_sl and not points:
                # Initial SL Hit
                exit_price = df.loc[i, 'close']
                exit_time = df.loc[i, 'datetime']
                in_trade = False
                remark = 'Initial SL'
                points = entry_price - exit_price
            if is_trailing_active:
                if df.loc[i, 'close'] > trailing_sl and not points:
                    # TSL Hit
                    exit_price = df.loc[i, 'close']
                    exit_time = df.loc[i, 'datetime']
                    in_trade = False
                    remark = 'Trailing SL'
                    points = entry_price - exit_price
            if df.loc[i, 'low'] < entry_price - target and not points:
                # Target Hit
                exit_price = entry_price - target
                exit_time = df.loc[i, 'datetime']
                in_trade = False
                remark = 'Target'
                points = entry_price - exit_price

        if points:
            index_lev = lev_
            qty = int(round(portfolio_value * index_lev / entry_price / lot_size_)) * lot_size_
            slippage = slippage_ * (entry_price + exit_price)
            final_points = points - slippage
            trade = {
                "Trade Type": "Short",
                "Entry Time": entry_time,
                "Entry Price": entry_price,
                "Initial SL": initial_sl,
                "Trailing SL": trailing_sl,
                "Exit Time": exit_time,
                "Exit Price": exit_price,
                "Points Captured": points,
                "Slippage in Points": slippage,
                "After Costs": final_points,
                "PnL": final_points * qty,
                "Remarks": remark,
                "Qty": qty,
                "Leverage": index_lev,
                "ROI%": (final_points * qty / portfolio_value) * 100,
                "Trade Year": entry_time.year,
                "Trade Month": entry_time.month,
            }
            # print(trade)
            trade_book.append(trade)
            points = 0
            in_trade = False
            is_trailing_active = False
            trailing_sl = 0

    return pd.DataFrame(trade_book)
            

In [22]:
tb = backtest_short(bnf_1min, n=3, rsi_period=5, rsi_overbought=50, ema_length=14)

KeyError: 3

In [None]:
def generate_stats(tb_expiry, variation):
    stats_df8 = pd.DataFrame(
        index=range(2018, 2025),
        columns=[
            "Total ROI",
            "Total Trades",
            "Win Rate",
            "Avg Profit% per Trade",
            "Avg Loss% per Trade",
            "Max Drawdown",
            "ROI/DD Ratio",
            "Variation",
        ],
    )
    combined_df_sorted = tb_expiry
    # combined_df_sorted = tb_expiry_ce
    # combined_df_sorted = tb_expiry_pe
    
    # Iterate over each year
    for year in range(2018, 2025):
        # Filter trades for the current year
        year_trades = combined_df_sorted[(combined_df_sorted["Trade Year"] == year)]
    
        # Calculate total ROI
        total_roi = year_trades["ROI%"].sum()
    
        # Calculate total number of trades
        total_trades = len(year_trades)
    
        # Calculate win rate
        win_rate = (year_trades["ROI%"] > 0).mean() * 100
    
        # Calculate average profit per trade
        avg_profit = year_trades[year_trades["ROI%"] > 0]["ROI%"].mean()
    
        # Calculate average loss per trade
        avg_loss = year_trades[year_trades["ROI%"] < 0]["ROI%"].mean()
    
        # Calculate maximum drawdown
        max_drawdown = (
            year_trades["ROI%"].cumsum() - year_trades["ROI%"].cumsum().cummax()
        ).min()
    
        # Calculate ROI/DD ratio
        roi_dd_ratio = total_roi / abs(max_drawdown)

        variation = variation
    
        # Store the statistics in the DataFrame
        stats_df8.loc[year] = [
            total_roi,
            total_trades,
            win_rate,
            avg_profit,
            avg_loss,
            max_drawdown,
            roi_dd_ratio,
            variation,
        ]
    
    # Calculate overall statistics
    overall_total_roi = stats_df8["Total ROI"].sum()
    overall_total_trades = stats_df8["Total Trades"].sum()
    overall_win_rate = (combined_df_sorted["ROI%"] > 0).mean() * 100
    overall_avg_profit = combined_df_sorted[combined_df_sorted["ROI%"] > 0]["ROI%"].mean()
    overall_avg_loss = combined_df_sorted[combined_df_sorted["ROI%"] < 0]["ROI%"].mean()
    overall_max_drawdown = (
        combined_df_sorted["ROI%"].cumsum() - combined_df_sorted["ROI%"].cumsum().cummax()
    ).min()
    overall_roi_dd_ratio = overall_total_roi / abs(overall_max_drawdown)
    overall_variation = variation
    
    # Store the overall statistics in the DataFrame
    stats_df8.loc["Overall"] = [
        overall_total_roi,
        overall_total_trades,
        overall_win_rate,
        overall_avg_profit,
        overall_avg_loss,
        overall_max_drawdown,
        overall_roi_dd_ratio,
        overall_variation,
    ]
    return {overall_roi_dd_ratio : stats_df8}

In [21]:
variation = 1
stats = generate_stats(tb, variation)
for x, y in stats.items():
    final_stats = y

final_stats

NameError: name 'generate_stats' is not defined

In [431]:
tb[tb['Trade Year'] > 2023]

Unnamed: 0,Trade Type,Entry Time,Entry Price,Initial SL,Trailing SL,Exit Time,Exit Price,Points Captured,Slippage in Points,After Costs,PnL,Remarks,Qty,Leverage,ROI%,Trade Year,Trade Month
186,Short,2024-01-05 10:15:00+05:30,48166.65,48381.95,48076.049,2024-01-05 14:45:00+05:30,48167.6,-0.95,9.6334,-10.5834,-10953.8449,Trailing SL,1035,5,-0.1095,2024,1
187,Short,2024-01-12 14:45:00+05:30,47659.95,47873.7,0.0,2024-01-15 09:15:00+05:30,48073.55,-413.6,9.5733,-423.1734,-444332.0175,Initial SL,1050,5,-4.4433,2024,1
188,Short,2024-01-18 12:15:00+05:30,45862.65,46184.05,45926.3041,2024-01-19 09:15:00+05:30,46084.8,-222.15,9.1947,-231.3447,-253322.4958,Trailing SL,1095,5,-2.5332,2024,1
189,Short,2024-01-19 10:45:00+05:30,45930.15,46249.85,45831.7843,2024-01-20 09:15:00+05:30,45873.5,56.65,9.1804,47.4696,51979.2503,Trailing SL,1095,5,0.5198,2024,1
190,Short,2024-01-24 11:15:00+05:30,45057.2,45485.5,45006.9758,2024-01-24 14:15:00+05:30,45098.75,-41.55,9.0156,-50.5656,-56127.8105,Trailing SL,1110,5,-0.5613,2024,1
191,Short,2024-01-25 09:45:00+05:30,44824.35,45148.2,44702.1564,2024-01-25 14:45:00+05:30,44872.45,-48.1,8.9697,-57.0697,-63347.3448,Trailing SL,1110,5,-0.6335,2024,1
192,Short,2024-01-31 12:15:00+05:30,45915.65,46179.75,0.0,2024-02-01 11:45:00+05:30,46098.95,-183.3,9.2015,-192.5015,-210789.0987,Initial SL,1095,5,-2.1079,2024,1
193,Short,2024-02-12 09:45:00+05:30,45376.35,45748.5,45055.1683,2024-02-13 09:15:00+05:30,45139.25,237.1,9.0516,228.0484,249713.0418,Trailing SL,1095,5,2.4971,2024,2
194,Short,2024-02-13 15:15:00+05:30,45419.85,45750.4,45304.8074,2024-02-14 10:45:00+05:30,45361.35,58.5,9.0781,49.4219,54116.9586,Trailing SL,1095,5,0.5412,2024,2
195,Short,2024-03-20 14:15:00+05:30,46283.95,46655.55,0.0,2024-03-21 09:15:00+05:30,46809.25,-525.3,9.3093,-534.6093,-577378.0656,Gap SL,1080,5,-5.7738,2024,3


In [None]:
stats_dictionary = {}
rsi_lengths = [4, 5, 6, 7, 8, 9, 12, 15]
for i in range(2, 5):
    for j in rsi_lengths:
        for k in range(30, 66, 5):
            for l in range(5, 36, 5):
                variation = f'n {i}, rsi {j}, threshold {k}, ema {l}'
                print(variation)
                tb = backtest_short(bnf_1min, i, j, k, l)
                if len(tb) > 0:
                    stats = generate_stats(tb, variation)
                    for x, y in stats.items():
                        if x > 8:
                            final_stats = y
                            print(final_stats)
                            stats_dictionary[x] = y


n 2, rsi 4, threshold 30, ema 5
n 2, rsi 4, threshold 30, ema 10
n 2, rsi 4, threshold 30, ema 15
n 2, rsi 4, threshold 30, ema 20
n 2, rsi 4, threshold 30, ema 25
n 2, rsi 4, threshold 30, ema 30
n 2, rsi 4, threshold 30, ema 35
n 2, rsi 4, threshold 35, ema 5
n 2, rsi 4, threshold 35, ema 10
n 2, rsi 4, threshold 35, ema 15
n 2, rsi 4, threshold 35, ema 20
n 2, rsi 4, threshold 35, ema 25
n 2, rsi 4, threshold 35, ema 30
n 2, rsi 4, threshold 35, ema 35
n 2, rsi 4, threshold 40, ema 5
n 2, rsi 4, threshold 40, ema 10
n 2, rsi 4, threshold 40, ema 15
n 2, rsi 4, threshold 40, ema 20
n 2, rsi 4, threshold 40, ema 25
n 2, rsi 4, threshold 40, ema 30
n 2, rsi 4, threshold 40, ema 35
n 2, rsi 4, threshold 45, ema 5
n 2, rsi 4, threshold 45, ema 10
n 2, rsi 4, threshold 45, ema 15
n 2, rsi 4, threshold 45, ema 20
n 2, rsi 4, threshold 45, ema 25
n 2, rsi 4, threshold 45, ema 30
n 2, rsi 4, threshold 45, ema 35
n 2, rsi 4, threshold 50, ema 5
n 2, rsi 4, threshold 50, ema 10
n 2, rsi 4, thr