# In [1]:
import yfinance as yf
import pandas as pd
import pandas_ta as ta


# In [3]:

class StockBacktester:
    """Backtest a long-only pivot-breakout strategy on one symbol's intraday bars.

    Pipeline (see ``run_backtest``): load the bars from a CSV file, build
    daily OHLC, derive daily pivot levels, add VWMA/RSI, attach the previous
    session's pivot levels to every intraday bar, flag entries/exits and
    finally replay those flags into a list of trades.

    Attributes:
        stock_symbol: File name of the symbol's CSV inside ``data_dir``.
        interval: Stored only; no method of this class reads it.
        period: Stored only; no method of this class reads it.
        position_size: Cash budget used to size each entry.
        data_dir: Directory that holds the per-symbol CSV files.
        stop_loss_pct: Fractional drop from the entry price that forces an exit.
        target_pct: Fractional rise from the entry price that forces an exit.
        stock_data: Intraday bars plus every derived column.
        df_daily: Daily OHLC, replaced by the pivot levels after
            ``calculate_pivots``.
        results: Trades of the last ``run_backtest`` call as a DataFrame.
        trades: Trades of the last ``execute_trades`` call as dicts.
    """

    def __init__(self, stock_symbol, interval='1h', period='2y', position_size=100000,
                 data_dir='Trading_codes/nifty_200/data', stop_loss_pct=0.015,
                 target_pct=0.015):
        self.stock_symbol = stock_symbol
        self.interval = interval
        self.period = period
        self.position_size = position_size
        self.data_dir = data_dir
        self.stop_loss_pct = stop_loss_pct
        self.target_pct = target_pct
        self.stock_data = None
        self.df_daily = None
        self.results = None
        self.trades = []

    def download_stock_data(self):
        """
        Load the symbol's bars from ``<data_dir>/<stock_symbol>`` into ``stock_data``.

        Despite the method name nothing is downloaded: the data comes from a
        local CSV file.
        """
        # Resampling needs a DatetimeIndex, which a plain read_csv does not
        # produce. Assumes the first CSV column holds the bar timestamps --
        # TODO confirm against the files on disk.
        bars = pd.read_csv(
            f'{self.data_dir}/{self.stock_symbol}',
            index_col=0,
            parse_dates=True,
        )
        if not isinstance(bars.index, pd.DatetimeIndex):
            # parse_dates can leave the index unparsed (e.g. mixed UTC
            # offsets); force the conversion so a bad file fails here.
            bars.index = pd.to_datetime(bars.index, utc=True)
        self.stock_data = bars.sort_index()

    def resample_to_daily(self):
        """
        Aggregate ``stock_data`` into one OHLC row per calendar day (``df_daily``).

        Days without any bar produce all-NaN rows and are dropped, so
        consecutive rows are consecutive sessions present in the data.
        """
        self.df_daily = self.stock_data.resample('D').agg({
            'Open': 'first',
            'High': 'max',
            'Low': 'min',
            'Close': 'last'
        }).dropna()

    def calculate_pivots(self):
        """
        Replace ``df_daily`` by the pivot levels derived from each day's H/L/C.

        Columns produced: PP, R1-R3, S1-S3, TC and BC. Each row is computed
        from that same day's high/low/close.
        """
        high = self.df_daily['High']
        low = self.df_daily['Low']
        close = self.df_daily['Close']
        day_range = high - low

        # Pivot Point (PP)
        pivot = (high + low + close) / 3
        self.df_daily['PP'] = pivot

        # Resistance levels
        self.df_daily['R1'] = 2 * pivot - low
        self.df_daily['R2'] = pivot + day_range
        self.df_daily['R3'] = self.df_daily['R1'] + day_range

        # Support levels
        self.df_daily['S1'] = 2 * pivot - high
        self.df_daily['S2'] = pivot - day_range
        self.df_daily['S3'] = self.df_daily['S1'] - day_range

        # NOTE(review): the usual CPR definition is BC = (H + L) / 2 and
        # TC = 2 * PP - BC. The formulas below are kept as originally written
        # because nothing in this class reads TC/BC -- confirm the intent.
        self.df_daily['TC'] = (high + low) / 2
        self.df_daily['BC'] = (pivot + self.df_daily['TC']) / 2

        # Only the derived levels are kept.
        self.df_daily = self.df_daily.drop(columns=['Open', 'High', 'Low', 'Close'], errors='ignore')

    def add_technical_indicators(self):
        """
        Add 14-period VWMA and RSI columns (computed by pandas_ta) to ``stock_data``.
        """
        self.stock_data['VWMA'] = ta.vwma(self.stock_data['Close'], self.stock_data['Volume'], length=14)
        self.stock_data['RSI'] = ta.rsi(self.stock_data['Close'], length=14)

    def merge_pivots_with_stock_data(self):
        """
        Attach the previous session's PP and R1 to every intraday bar.

        Adds the columns ``PP``, ``R1`` and ``%CPR`` (distance between R1 and
        PP as a percentage of PP) to ``stock_data``. Bars of the first session
        have no previous session and receive NaN.
        """
        # A session's levels depend on its final high/low/close, so they are
        # only known once it is over: shift by one row to avoid look-ahead.
        known_levels = self.df_daily[['PP', 'R1']].shift(1)

        # Daily rows are stamped at midnight while intraday bars are not, so
        # an index-equality join would match nothing; look up by calendar day.
        sessions = self.stock_data.index.normalize()
        per_bar = known_levels.reindex(sessions)

        self.stock_data['PP'] = per_bar['PP'].to_numpy()
        self.stock_data['R1'] = per_bar['R1'].to_numpy()
        self.stock_data['%CPR'] = (
            (self.stock_data['R1'] - self.stock_data['PP']) / self.stock_data['PP'] * 100
        )

    def _previous_session_pp(self):
        """Return, per bar, the PP value that applied during the preceding session."""
        sessions = self.stock_data.index.normalize()
        pp_by_session = self.stock_data['PP'].groupby(sessions).first()
        previous = pp_by_session.shift(1).reindex(sessions)
        return pd.Series(previous.to_numpy(), index=self.stock_data.index)

    def generate_signals(self):
        """
        Add the ``Buy_Entry``, ``Entry_Price`` and ``Buy_Exit`` columns.

        Entry: RSI above 60, %CPR below 0.5, PP higher than in the previous
        session, and VWMA crossing above R1 on this bar.

        Exit: close below R1, or close at/below the stop level, or close
        at/above the target level, both measured from ``Entry_Price``.

        ``Entry_Price`` is the close of the most recent entry signal. A signal
        that fires while a position is already open also moves it, although
        ``execute_trades`` ignores that signal.
        """
        data = self.stock_data

        # PP is constant within a session, so comparing with the previous bar
        # could only ever be true on a session's first bar; compare with the
        # previous session's value instead.
        pivot_rising = data['PP'] > self._previous_session_pp()

        # VWMA crosses above R1: above now, at or below on the previous bar.
        vwma_cross_up = (
            (data['VWMA'] > data['R1']) &
            (data['VWMA'].shift(1) <= data['R1'].shift(1))
        )

        data['Buy_Entry'] = (
            (data['RSI'] > 60) &
            (data['%CPR'] < 0.5) &
            pivot_rising &
            vwma_cross_up
        )

        # Float column (NaN before the first signal) so the arithmetic below
        # works; initialising it with None would create an object column.
        data['Entry_Price'] = data['Close'].where(data['Buy_Entry']).ffill()

        stop_level = data['Entry_Price'] * (1 - self.stop_loss_pct)
        target_level = data['Entry_Price'] * (1 + self.target_pct)

        # Leaving the stop/target band forces an exit; requiring the close to
        # stay inside the band would never act as a stop or a target.
        data['Buy_Exit'] = data['Entry_Price'].notna() & (
            (data['Close'] < data['R1']) |
            (data['Close'] <= stop_level) |
            (data['Close'] >= target_level)
        )

    def execute_trades(self):
        """
        Replay ``Buy_Entry`` / ``Buy_Exit`` into ``self.trades``.

        At most one position is open at a time and every fill happens at the
        bar's close. ``self.trades`` is rebuilt from scratch on each call. A
        position still open on the last bar is left open (no closing record).
        """
        data = self.stock_data
        closes = data['Close'].to_numpy()
        entry_flags = data['Buy_Entry'].to_numpy(dtype=bool)
        exit_flags = data['Buy_Exit'].to_numpy(dtype=bool)
        stamps = data.index

        self.trades = []
        shares = 0
        cost = 0.0

        # Bar 0 has no previous bar and therefore can never carry a crossover
        # signal, so the replay starts at bar 1.
        for i in range(1, len(data)):
            price = float(closes[i])

            if shares == 0:
                # Skip missing/non-positive prices: they cannot be sized.
                if not entry_flags[i] or not price > 0:
                    continue
                quantity = int(self.position_size // price)
                if quantity == 0:
                    # One share costs more than the budget: nothing to buy.
                    continue
                shares = quantity
                cost = shares * price
                self.trades.append({
                    'Type': 'Buy',
                    'Shares': shares,
                    'Price': price,
                    'Value': cost,
                    'Date': stamps[i]
                })
            elif exit_flags[i]:
                proceeds = shares * price
                self.trades.append({
                    'Type': 'Sell',
                    'Shares': shares,
                    'Price': price,
                    'Value': proceeds,
                    'Profit': proceeds - cost,
                    'Date': stamps[i]
                })
                shares = 0
                cost = 0.0

    def run_backtest(self):
        """
        Run the whole pipeline and return the trades as a DataFrame.

        The same DataFrame is also stored in ``self.results``.
        """
        self.download_stock_data()
        self.resample_to_daily()
        self.calculate_pivots()
        self.add_technical_indicators()
        self.merge_pivots_with_stock_data()
        self.generate_signals()
        self.execute_trades()
        self.results = pd.DataFrame(self.trades)
        return self.results


class MultipleStockBacktester:
    """Run a ``StockBacktester`` for every symbol of a list and collect the results.

    NOTE(review): a later cell of this file declares this class a second time
    with the same body; the later declaration is the one in effect.
    """

    def __init__(self, stock_list, position_size=100000):
        self.backtest_results = {}
        self.position_size = position_size
        self.stock_list = stock_list

    def run(self):
        """
        Backtest each symbol in turn, storing its trades under the symbol's key.

        Results are written one symbol at a time, so entries stored before a
        failing symbol remain in ``backtest_results``.
        """
        for symbol in self.stock_list:
            backtester = StockBacktester(symbol, position_size=self.position_size)
            trades = backtester.run_backtest()
            self.backtest_results[symbol] = trades

    def get_results(self):
        """
        Return the symbol -> trades mapping filled by ``run`` (the dict itself, not a copy).
        """
        collected = self.backtest_results
        return collected


# In [4]:
class MultipleStockBacktester:
    """Batch driver: one ``StockBacktester`` run per symbol, results keyed by symbol.

    NOTE(review): this is a re-declaration of a class already defined earlier
    in this file with the same body; being the later one, it is the
    definition in effect.
    """

    def __init__(self, stock_list, position_size=100000):
        # Symbols to process and the cash budget handed to each backtester.
        self.stock_list, self.position_size = stock_list, position_size
        # Filled by run(): symbol -> DataFrame returned by run_backtest().
        self.backtest_results = {}

    def run(self):
        """
        Execute the backtest symbol by symbol and record each outcome.
        """
        results = self.backtest_results
        budget = self.position_size
        for ticker in self.stock_list:
            results[ticker] = StockBacktester(ticker, position_size=budget).run_backtest()

    def get_results(self):
        """
        Give access to the results gathered so far (same dict object on every call).
        """
        return self.backtest_results

