In [1]:
# Import necessary libraries
import copy
from typing import Any, Dict, Iterator, List, Optional, Type

import numpy as np
import pandas as pd
from tqdm import trange
import yfinance as yf

# Graphing libraries
import plotly.express as px
import plotly.graph_objs as go
from plotly.subplots import make_subplots

# Regression analysis
import statsmodels.api as sm

In [2]:
class Strategy:
    """
    Base class for trading strategies.

    Holds a name and a private copy of the input data; subclasses supply
    the signal and position logic.
    """

    # Shared message for the two hooks subclasses are expected to override.
    _ABSTRACT_MESSAGE = "Subclasses should implement this method"

    def __init__(self, name: str, data: pd.DataFrame):
        """
        Build a strategy around its own copy of the data.

        Parameters:
        name (str): Label stored as ``strategy_name``.
        data (pd.DataFrame): Input data; deep-copied so later changes to the
            caller's object do not affect this strategy.
        """
        private_copy = copy.deepcopy(data)
        self.data = private_copy
        self.strategy_name = name

    def calculate_signal(self) -> None:
        """
        Compute the trading signal (hook for subclasses).

        Raises:
        NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError(self._ABSTRACT_MESSAGE)

    def calculate_positions(self) -> pd.DataFrame:
        """
        Compute the trading positions (hook for subclasses).

        Returns:
        pd.DataFrame: The positions, when implemented by a subclass.

        Raises:
        NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError(self._ABSTRACT_MESSAGE)

In [3]:
class StrategyManager:
    """
    Runs a collection of strategies and merges their position tables.
    """

    def __init__(self, strategies: List["Strategy"]):
        """
        Initialize a StrategyManager instance.

        Parameters:
        strategies (List[Strategy]): Strategy *instances* (not classes). Each
            must provide ``calculate_signal()`` and ``calculate_positions()``.
        """
        self.strategies = strategies

    def calculate_all_signals(self) -> None:
        """
        Calculate signals for all strategies, in list order.
        """
        for strategy in self.strategies:
            strategy.calculate_signal()

    def calculate_all_positions(self) -> pd.DataFrame:
        """
        Calculate positions for all strategies and stack them into one DataFrame.

        Returns:
        pd.DataFrame: The row-wise concatenation of every strategy's positions,
            with duplicate column labels dropped (first occurrence kept). An
            empty DataFrame when there are no strategies.
        """
        frames = [strategy.calculate_positions() for strategy in self.strategies]
        if not frames:
            # pd.concat raises on an empty list; keep the "no strategies" case benign.
            return pd.DataFrame()

        # One concat over all frames instead of growing a frame inside the loop:
        # avoids repeated copying and concatenating onto an empty placeholder.
        all_positions = pd.concat(frames, axis=0)

        # Drop duplicate columns, keeping the first occurrence
        return all_positions.loc[:, ~all_positions.columns.duplicated()]

In [4]:
class Backtest:
    """
    Bar-by-bar backtest over a price table.

    For each new row of ``base_data`` the backtest books PnL on the positions
    held at the previous row, moves positions to the strategies' targets for
    the current row (recording the resulting trades), and then re-runs the
    strategies. The whole run happens inside ``__init__``.
    """

    def __init__(self, base_data: pd.DataFrame, strategies: List["Strategy"], weights: Optional[Dict[str, float]] = None):
        """
        Initialize a Backtest instance and run it.

        Parameters:
        base_data (pd.DataFrame): Price table; one column per ticker, indexed by time.
        strategies (List[Strategy]): Strategy instances to run.
        weights (Optional[Dict[str, float]]): Weight per strategy name, used by
            ``compute_pnl``. ``None`` or an empty dict gives every strategy weight 1.
        """
        self.base_data = copy.deepcopy(base_data)
        self.trades = pd.DataFrame(columns=['time', 'book', 'ticker', 'price', 'units'])
        self.positions = pd.DataFrame(columns=['time', 'book', 'ticker', 'units'])
        self._generator = self._dataframe_generator()
        self.strategy_manager = StrategyManager(strategies)
        # Same schema the strategies return, so filtering on 'book' is always valid.
        self.next_positions = pd.DataFrame(columns=['time', 'book', 'ticker', 'units'])
        self.pnl = pd.DataFrame(columns=['time', 'book', 'ticker', 'pnl'])
        self.cumulative_pnl = pd.DataFrame(columns=['time', 'book', 'pnl'])
        self.weights = weights if weights else {strategy.strategy_name: 1 for strategy in strategies}
        self.run_backtest()

    @staticmethod
    def _append_rows(frame: pd.DataFrame, rows: list) -> pd.DataFrame:
        """
        Return ``frame`` with ``rows`` (a list of row lists) appended.

        When ``frame`` is still empty the new rows are returned on their own:
        concatenating onto an empty, object-typed placeholder is deprecated in
        pandas and would force every column to object dtype.
        """
        new_rows = pd.DataFrame(rows, columns=frame.columns)
        if frame.empty:
            return new_rows
        return pd.concat([frame, new_rows], ignore_index=True)

    def _dataframe_generator(self) -> Iterator[pd.DataFrame]:
        """
        Generator to yield the base data incrementally.

        Yields:
        pd.DataFrame: The first 1, 2, ..., len(base_data) rows of the base data.
        """
        for i in range(1, len(self.base_data) + 1):
            yield self.base_data.iloc[:i]

    def next(self) -> bool:
        """
        Advance ``self.data`` to the next incremental slice.

        Returns:
        bool: True if there is more data, False otherwise.
        """
        try:
            self.data = next(self._generator)
            return True
        except StopIteration:
            return False

    def update_pnl(self) -> None:
        """
        Book PnL for the latest bar on the positions held at the previous bar.

        PnL per (book, ticker) is the price change between the two most recent
        rows of ``self.data`` times the units held at the penultimate row.
        """
        if len(self.data) < 2:
            return

        latest_date = self.data.index[-1]
        penultimate_date = self.data.index[-2]
        held = self.positions[self.positions['time'] == penultimate_date]
        if held.empty:
            return

        # Walk the rows actually held rather than the books x tickers cross
        # product: a book does not necessarily hold every ticker that some
        # other book holds, and looking up a missing pair would fail.
        rows = []
        for book, ticker, units in zip(held['book'], held['ticker'], held['units']):
            price_change = self.data.loc[latest_date, ticker] - self.data.loc[penultimate_date, ticker]
            rows.append([latest_date, book, ticker, price_change * units])

        # One append per bar instead of one per (book, ticker).
        self.pnl = self._append_rows(self.pnl, rows)

    def update_positions_and_trades(self) -> None:
        """
        Move positions to the targets for the latest bar and record the trades.

        For each target row dated at the latest bar, the traded quantity is the
        target units minus the units held at the penultimate bar (0 if none);
        non-zero differences are recorded as trades at the latest price.
        """
        # NOTE(review): positions first appear on the third bar because of this
        # threshold; `> 1` would also work mechanically - confirm the intent.
        if len(self.data) <= 2:
            return

        latest_date = self.data.index[-1]
        penultimate_date = self.data.index[-2]

        targets = self.next_positions[self.next_positions['time'] == latest_date]
        held = self.positions[self.positions['time'] == penultimate_date]

        # (book, ticker) -> units held at the previous bar; first match wins.
        previous_units = {}
        for book, ticker, units in zip(held['book'], held['ticker'], held['units']):
            previous_units.setdefault((book, ticker), units)

        for book, ticker, units in zip(targets['book'], targets['ticker'], targets['units']):
            trade_units = units - previous_units.get((book, ticker), 0)
            if trade_units != 0:
                trade_price = self.data.loc[latest_date, ticker]
                self.add_trade(latest_date, book, ticker, trade_price, trade_units)

            self.update_position(latest_date, book, ticker, units)

    def run_strategies(self) -> None:
        """
        Run all strategies to calculate signals and target positions.
        """
        self.strategy_manager.calculate_all_signals()
        self.next_positions = self.strategy_manager.calculate_all_positions()

    def run_backtest(self) -> None:
        """
        Run the backtest over every remaining bar, then index PnL by time.
        """
        while self.next():
            self.update_pnl()
            self.update_positions_and_trades()
            self.run_strategies()

        # Format PnL (guarded so a repeated call does not fail on a missing column)
        if 'time' in self.pnl.columns:
            self.pnl.set_index('time', inplace=True)

    def add_trade(self, time: pd.Timestamp, book: str, ticker: str, buy_price: float, units: float) -> None:
        """
        Add a trade to the trades DataFrame.

        Parameters:
        time (pd.Timestamp): The time of the trade.
        book (str): The book of the trade.
        ticker (str): The ticker of the trade.
        buy_price (float): The price of the trade.
        units (float): The units of the trade (negative for a sale).
        """
        self.trades = self._append_rows(self.trades, [[time, book, ticker, buy_price, units]])

    def update_position(self, time: pd.Timestamp, book: str, ticker: str, units: float) -> None:
        """
        Insert or overwrite the position for (time, book, ticker).

        Parameters:
        time (pd.Timestamp): The time of the position.
        book (str): The book of the position.
        ticker (str): The ticker of the position.
        units (float): The units of the position.
        """
        existing = (
            (self.positions['book'] == book)
            & (self.positions['ticker'] == ticker)
            & (self.positions['time'] == time)
        )
        if existing.any():
            self.positions.loc[existing, 'units'] = units
        else:
            self.positions = self._append_rows(self.positions, [[time, book, ticker, units]])

    def compute_pnl_book(self) -> pd.DataFrame:
        """
        Compute the PnL for each book.

        Returns:
        pd.DataFrame: PnL summed over tickers; rows are times, columns are books.
        """
        return self.pnl.groupby(['time', 'book'])['pnl'].sum().unstack()

    def compute_cumulative_pnl_book(self) -> pd.DataFrame:
        """
        Compute the cumulative PnL for each book.

        Returns:
        pd.DataFrame: Running PnL total per book; rows are times, columns are books.
        """
        per_time_and_book = self.pnl.groupby(['time', 'book'])['pnl'].sum()
        return per_time_and_book.groupby(level='book').cumsum().unstack()

    def compute_pnl(self) -> pd.DataFrame:
        """
        Compute the weighted PnL.

        Each row's PnL is multiplied by its book's weight; books without an
        entry in ``self.weights`` are left unscaled.

        Returns:
        pd.DataFrame: A single 'pnl' column of weighted PnL summed per time.
        """
        pnl = self.pnl.copy()
        row_weights = pnl['book'].map(self.weights).astype(float).fillna(1.0)
        # Multiply by raw values: rows share duplicate time labels, so label
        # alignment is neither needed nor wanted here.
        pnl['pnl'] = pnl['pnl'] * row_weights.to_numpy()
        return pnl.groupby('time')[['pnl']].sum()

    def compute_cumulative_pnl(self) -> pd.DataFrame:
        """
        Compute the cumulative weighted PnL.

        Returns:
        pd.DataFrame: The running total of ``compute_pnl()``.
        """
        pnl = self.compute_pnl()
        return pnl[['pnl']].cumsum()

    def export_excel(self, filename: str = "backtest_results.xlsx") -> None:
        """
        Export the backtest results to an Excel file.

        Writes sheets 'Prices', 'Positions', 'Trades', 'PnL', and one sheet per
        strategy named after the strategy.

        Parameters:
        filename (str): The name of the Excel file.
        """
        with pd.ExcelWriter(filename) as writer:
            # Export prices
            self.base_data.to_excel(writer, sheet_name='Prices', index=True)

            # Export positions
            self.positions.to_excel(writer, sheet_name='Positions', index=False)

            # Export trades
            self.trades.to_excel(writer, sheet_name='Trades', index=False)

            # Prepare PnL data
            pnl_ticker = self.pnl.pivot_table(index='time', columns='ticker', values='pnl', aggfunc='sum')
            pnl_ticker.columns = [f'PnL_{col}' for col in pnl_ticker.columns]
            pnl_book = self.compute_pnl_book()
            pnl_book.columns = [f'PnL_{col}' for col in pnl_book.columns]
            cumulative_pnl_book = self.compute_cumulative_pnl_book()
            cumulative_pnl_book.columns = [f'Cumulative_PnL_{col}' for col in cumulative_pnl_book.columns]
            cumulative_pnl = self.compute_cumulative_pnl()
            cumulative_pnl.columns = ['Cumulative_PnL']

            # Write PnL
            pnl_df = pnl_ticker.join(pnl_book).join(cumulative_pnl_book).join(cumulative_pnl)
            pnl_df.to_excel(writer, sheet_name='PnL')

            # Export each strategy
            for strategy in self.strategy_manager.strategies:
                name = strategy.strategy_name

                # Filter positions for the current strategy
                book_position = self.positions[self.positions['book'] == name].pivot_table(index='time', columns='ticker', values='units', aggfunc='sum')

                # Create a copy of the base data with the relevant columns
                strategy_sheet = self.base_data.loc[:, self.base_data.columns.isin(book_position.columns.to_list())]

                # Join book_position with a suffix
                strategy_sheet = strategy_sheet.join(book_position, rsuffix='_Position')

                # Join PnL and Cumulative PnL data. reindex() yields an empty
                # (NaN) column instead of a KeyError for a strategy with no PnL.
                strategy_sheet = strategy_sheet.join(pnl_book.reindex(columns=[f'PnL_{name}']))
                strategy_sheet = strategy_sheet.join(cumulative_pnl_book.reindex(columns=[f'Cumulative_PnL_{name}']))

                # Export to Excel
                strategy_sheet.to_excel(writer, sheet_name=name)

        print(f'Backtest results exported to {filename}')

In [5]:
class MovingAverageStrategy(Strategy):
    """
    Moving-average crossover strategy applied independently to each ticker.

    The signal is 1.0 where the short moving average is above the long one and
    0.0 otherwise; positions are +10 units on a 1.0 signal and -10 units on a
    0.0 signal, so the strategy is always long or short.
    """

    def __init__(self, name: str, data: pd.DataFrame, short_window: int, long_window: int):
        """
        Initialize the strategy and precompute both moving averages.

        Parameters:
        name (str): The name of the strategy (also used as the book name).
        data (pd.DataFrame): Price table; every column is treated as a ticker.
        short_window (int): Rolling window length of the short moving average.
        long_window (int): Rolling window length of the long moving average.
        """
        super().__init__(name, data)
        self.short_window = short_window
        self.long_window = long_window
        self.tickers = data.columns

        # Compute moving averages for each ticker
        for ticker in self.tickers:
            prices = self.data[ticker]
            self.data[f'{self.strategy_name}_{ticker}_short_ma'] = prices.rolling(window=self.short_window).mean()
            self.data[f'{self.strategy_name}_{ticker}_long_ma'] = prices.rolling(window=self.long_window).mean()

    def calculate_signal(self) -> None:
        """
        Store a 0.0/1.0 crossover signal column for every ticker.

        The first ``short_window`` rows are always 0.0. Rows where either
        moving average is NaN also get 0.0, because the comparison is False.
        """
        for ticker in self.tickers:
            prefix = f'{self.strategy_name}_{ticker}'
            signal = np.where(self.data[f'{prefix}_short_ma'] > self.data[f'{prefix}_long_ma'], 1.0, 0.0)
            signal[:self.short_window] = 0.0
            # Assign the finished array as a whole column. Writing through
            # chained indexing (df[col][n:] = ...) is unreliable and has no
            # effect on the frame under pandas Copy-on-Write.
            self.data[f'{prefix}_signal'] = signal

    def calculate_positions(self) -> pd.DataFrame:
        """
        Turn the signals into target positions.

        Returns:
        pd.DataFrame: One row per (time, ticker) with columns 'time', 'book',
            'ticker', 'units'; units are 10 where the signal is 1.0, else -10.
        """
        positions_list = []
        for ticker in self.tickers:
            positions = pd.DataFrame({
                'time': self.data.index,
                'book': self.strategy_name,
                'ticker': ticker,
                'units': np.where(self.data[f'{self.strategy_name}_{ticker}_signal'] == 1.0, 10, -10)
            })
            positions_list.append(positions)

        if not positions_list:
            # pd.concat raises on an empty list; return an empty table instead.
            return pd.DataFrame(columns=['time', 'book', 'ticker', 'units'])
        return pd.concat(positions_list)

In [6]:
# Example usage
if __name__ == "__main__":
    # Universe and sample period
    tickers = ['AAPL', 'TSLA', 'OXY']
    start_date, end_date = '2023-01-01', '2024-01-01'

    # Download each ticker separately and keep only its close, renamed to the ticker
    data_dict = {}
    for ticker in tickers:
        data = yf.download(ticker, start=start_date, end=end_date)
        data_dict[ticker] = data[['Close']].rename(columns={'Close': ticker})

    # Side-by-side price table: one column per ticker
    data = pd.concat(data_dict.values(), axis=1)

    # Two crossover strategies with different window lengths
    strategies = [
        MovingAverageStrategy(name="ShortMovingAverageStrategy", data=data, short_window=7, long_window=21),
        MovingAverageStrategy(name="LongMovingAverageStrategy", data=data, short_window=20, long_window=60),
    ]

    # Constructing the Backtest also runs it; the two books are weighted equally
    backtest = Backtest(
        base_data=data,
        strategies=strategies,
        weights={"ShortMovingAverageStrategy": 0.5, "LongMovingAverageStrategy": 0.5},
    )

[*********************100%%**********************]  1 of 1 completed
[*********************100%%**********************]  1 of 1 completed
[*********************100%%**********************]  1 of 1 completed


In [7]:
# Write prices, positions, trades, PnL and one sheet per strategy to an Excel workbook
backtest.export_excel('test_file.xlsx')

Backtest results exported to test_file.xlsx


In [8]:
# Line chart of cumulative PnL over time, one series per book
px.line(backtest.compute_cumulative_pnl_book())

Display

In [12]:
class DisplayBacktest:
    """
    Plotly visualisations for a finished backtest.
    """

    def __init__(self, backtest: 'Backtest'):
        """
        Initialize a DisplayBacktest instance.

        Parameters:
        backtest (Backtest): The backtest instance to be visualized.
        """
        self.backtest = backtest

    def plot_book(self, book: str, exclude_non_traded: bool = False) -> None:
        """
        Plot the prices, positions, and trades for a specific book.

        Prices and trade markers use the primary y-axis; positions (units held)
        use the secondary y-axis.

        Parameters:
        book (str): The book to plot.
        exclude_non_traded (bool): Whether to exclude tickers the book never traded.
        """
        # Create fig with secondary y-axis
        fig = make_subplots(specs=[[{"secondary_y": True}]])

        all_trades = self.backtest.trades
        book_trades = all_trades[all_trades['book'] == book]
        all_positions = self.backtest.positions
        book_positions = all_positions[all_positions['book'] == book]

        # Select tickers to plot
        if exclude_non_traded:
            tickers = book_trades['ticker'].unique().tolist()
        else:
            tickers = self.backtest.base_data.columns

        # Plot prices & positions
        for ticker in tickers:
            # Prices
            fig.add_trace(go.Scatter(x=self.backtest.base_data[ticker].index,
                                     y=self.backtest.base_data[ticker],
                                     mode='lines',
                                     name=f'Price {ticker}',
                                     legendgroup=ticker,
                                     hoverinfo='name+y'),
                          secondary_y=False)

            # Positions, drawn as steps on the secondary axis
            ticker_positions = book_positions[book_positions['ticker'] == ticker]
            fig.add_trace(go.Scatter(x=ticker_positions['time'],
                                     y=ticker_positions['units'],
                                     mode='lines',
                                     line=dict(shape='hv', dash='dot'),
                                     name=f'Position {ticker}',
                                     legendgroup=ticker,
                                     hoverinfo='name+y'),
                          secondary_y=True)

        # Add scatter plots for trades
        trades_long = book_trades[book_trades['units'] > 0]
        trades_short = book_trades[book_trades['units'] < 0]

        for ticker in tickers:
            ticker_trades_long = trades_long[trades_long['ticker'] == ticker]
            ticker_trades_short = trades_short[trades_short['ticker'] == ticker]

            fig.add_trace(go.Scatter(x=ticker_trades_long['time'],
                                     y=ticker_trades_long['price'],
                                     mode='markers',
                                     marker=dict(symbol='arrow-up', color='green', size=10),
                                     name=f'Long Trades {ticker}',
                                     legendgroup=ticker,
                                     hoverinfo='name+y'),
                          secondary_y=False)

            fig.add_trace(go.Scatter(x=ticker_trades_short['time'],
                                     y=ticker_trades_short['price'],
                                     mode='markers',
                                     marker=dict(symbol='arrow-down', color='red', size=10),
                                     name=f'Short Trades {ticker}',
                                     legendgroup=ticker,
                                     hoverinfo='name+y'),
                          secondary_y=False)

        fig.update_layout(title=f'{book}',
                          xaxis_title='Time',
                          yaxis_title='Price',
                          yaxis2_title='Position')

        fig.show()

    def plot_cumulative_pnl_per_book(self) -> None:
        """
        Plot the cumulative PnL for each book, one line per book.
        """
        cumulative_pnl_book = self.backtest.compute_cumulative_pnl_book()
        fig = go.Figure()

        for book in cumulative_pnl_book.columns:
            fig.add_trace(go.Scatter(x=cumulative_pnl_book.index,
                                     y=cumulative_pnl_book[book],
                                     mode='lines',
                                     name=f'Book {book}'))

        fig.update_layout(title='Cumulative PnL per Book',
                          xaxis_title='Time',
                          yaxis_title='Cumulative PnL')
        fig.show()

    def plot_cumulative_pnl(self) -> None:
        """
        Plot the cumulative weighted PnL across all books.
        """
        cumulative_pnl = self.backtest.compute_cumulative_pnl().reset_index()
        fig = go.Figure()

        fig.add_trace(go.Scatter(x=cumulative_pnl['time'],
                                 y=cumulative_pnl['pnl'],
                                 mode='lines',
                                 name='Cumulative PnL'))

        fig.update_layout(title='Cumulative PnL',
                          xaxis_title='Time',
                          yaxis_title='Cumulative PnL')
        fig.show()

    def plot_individual_pnl(self) -> None:
        """
        Plot the per-bar PnL for each book, one line per book.
        """
        # Aggregate over tickers first: the raw PnL table has one row per
        # (time, book, ticker), which would put several points on the same
        # timestamp of a book's line.
        pnl_book = self.backtest.compute_pnl_book()
        fig = go.Figure()

        for book in pnl_book.columns:
            fig.add_trace(go.Scatter(x=pnl_book.index,
                                     y=pnl_book[book],
                                     mode='lines',
                                     name=f'Book {book}'))

        fig.update_layout(title='Individual PnL per Book',
                          xaxis_title='Time',
                          yaxis_title='PnL')
        fig.show()

# Example of usage:
# display = DisplayBacktest(backtest)
# display.plot_book('ShortMovingAverageStrategy')
# display.plot_cumulative_pnl_per_book()
# display.plot_cumulative_pnl()
# display.plot_individual_pnl()