### INPUTS & IMPORTS

In [1]:
import sqlite3
import pandas as pd
import logging
from datetime import datetime
import backtrader as bt
import pyfolio as pf
from sklearn.linear_model import LinearRegression
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error

# Configure logging
log_file = 'backtest_logs.log'
logger = logging.getLogger()
logger.setLevel(logging.INFO)

console_handler = logging.StreamHandler()
file_handler = logging.FileHandler(log_file)
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
console_handler.setFormatter(formatter)
file_handler.setFormatter(formatter)
logger.addHandler(console_handler)
logger.addHandler(file_handler)

# #INPUT DATA FROM 1_clean_download_data.ipynb

# # Define the dates
# start_year = 2020
# start_month = 1
# start_day = 1

# end_year = 2023
# end_month = 12
# end_day = 30

# Constants
DB_PATH = '1_financial_data.db'
EXCEL_PATH = "Complete-List-of-Biotech-Stocks-Listed-on-NASDAQ-Jan-1-24.xlsx"
START_DATE = datetime(2020, 1, 1)
END_DATE = datetime(2022, 12, 30)
INITIAL_CASH = 1000000
STAKE_SIZE = 100
LIVE_START_DATE = datetime(2023, 1, 1)



### FUNCTIONS NEEDED

In [2]:
# **** Signal Library ****
class SignalLibrary:
    """Encapsulates all signal generation logic."""
    
    @staticmethod
    def rsi(data, period):
        return bt.indicators.RelativeStrengthIndex(data.close, period=period)
    
    @staticmethod
    def ema(data, period):
        return bt.indicators.ExponentialMovingAverage(data.close, period=period)
    
    @staticmethod
    def crossover(short_ema, long_ema):
        return bt.indicators.CrossOver(short_ema, long_ema)

# **** Custom Data Feed ****
class CustomDataFeed(bt.feeds.PandasData):
    params = (
        ('datetime', None),
        ('open', 'bid'),
        ('high', 'ask'),
        ('low', 'bid'),
        ('close', 'prc'),
        ('volume', 'vol'),
        ('openinterest', None),
    )

# **** Strategy Class ****
class SignalBasedStrategy(bt.Strategy):
    params = (
        ('rsi_period', 14),
        ('ema_short_period', 12),
        ('ema_long_period', 26),
    )
    
    def __init__(self):
        # Initialize indicators using the SignalLibrary
        self.rsi = SignalLibrary.rsi(self.data, self.params.rsi_period)
        self.ema_short = SignalLibrary.ema(self.data, self.params.ema_short_period)
        self.ema_long = SignalLibrary.ema(self.data, self.params.ema_long_period)
        self.crossover = SignalLibrary.crossover(self.ema_short, self.ema_long)

    def next(self):
        ticker = self.data._name
        if self.crossover > 0 and not self.position:
            self.buy()
            logger.info(f"{ticker} - BUY: {self.data.datetime.date(0)} | Close: {self.data.close[0]} | RSI: {self.rsi[0]}")
        elif self.crossover < 0 and self.position:
            self.sell()
            logger.info(f"{ticker} - SELL: {self.data.datetime.date(0)} | Close: {self.data.close[0]} | RSI: {self.rsi[0]}")
        elif self.rsi > 70 and self.position:
            self.sell()
            logger.info(f"{ticker} - RSI Overbought SELL: {self.data.datetime.date(0)} | Close: {self.data.close[0]} | RSI: {self.rsi[0]}")
        elif self.rsi < 30 and not self.position:
            self.buy()
            logger.info(f"{ticker} - RSI Oversold BUY: {self.data.datetime.date(0)} | Close: {self.data.close[0]} | RSI: {self.rsi[0]}")

# **** Linear Regression Model ****
def train_linear_regression(data):
    """
    Train a Linear Regression model using historical data.
    """
    logger.info("Training Linear Regression model...")
    
    # Handle missing values by dropping rows with NaN
    data = data.dropna(subset=['bid', 'ask', 'vol', 'prc'])
    
    # Define features and target variable
    X = data[['bid', 'ask', 'vol']]  # Example features; adjust based on your dataset
    y = data['prc']  # Target variable
    
    # Split into training and testing sets
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42, shuffle=False)
    
    # Train the model
    model = LinearRegression()
    model.fit(X_train, y_train)
    
    # Evaluate the model on the test set
    y_pred = model.predict(X_test)
    mse = mean_squared_error(y_test, y_pred)
    
    logger.info(f"Linear Regression Model Trained: MSE={mse:.4f}")
    
    return model

# **** Utility Functions ****
def load_tickers(file_path):
    tickers_df = pd.read_excel(file_path)
    return tickers_df['Ticker'].dropna().unique().tolist()

def load_data_from_db(db_path):
    conn = sqlite3.connect(db_path)
    equities_data = pd.read_sql('SELECT * FROM equities_data', conn)
    conn.close()
    return equities_data

def filter_data(data, tickers, start_date, end_date):
    data['date'] = pd.to_datetime(data['date'])
    filtered_data = data[data['ticker'].isin(tickers)]
    return filtered_data[(filtered_data['date'] >= start_date) & (filtered_data['date'] <= end_date)]

def prepare_data_for_backtest(df):
    df = df[['date', 'bid', 'ask', 'vol', 'prc']].sort_values(by='date')
    df.ffill(inplace=True)
    df.set_index('date', inplace=True)
    return df

# **** Backtest Runner ****
def run_backtest(start_date, end_date, tickers, strategy_class, **strategy_params):
    cerebro = bt.Cerebro()
    cerebro.addstrategy(strategy_class, **strategy_params)

    # Load and train linear regression model (if applicable)
    equities_data = load_data_from_db(DB_PATH)
    model = train_linear_regression(equities_data)

    # Filter data for the selected tickers and date range
    filtered_data = filter_data(equities_data, tickers, start_date, end_date)
    prepared_data = prepare_data_for_backtest(filtered_data)

    # Add data to Backtrader
    data_feed = CustomDataFeed(dataname=prepared_data)
    cerebro.adddata(data_feed)
    
    # Set initial conditions
    cerebro.broker.set_cash(INITIAL_CASH)
    cerebro.addsizer(bt.sizers.FixedSize, stake=STAKE_SIZE)
    cerebro.addanalyzer(bt.analyzers.PyFolio, _name='pyfolio')

    # Run the backtest
    logger.info('Running backtest...')
    results = cerebro.run()

    # Extract PyFolio results for performance analysis
    pyfolio_analyzer = results[0].analyzers.pyfolio
    returns, positions, transactions = pyfolio_analyzer.get_pf_items()[:3]

    # Generate performance tear sheet
    pf.create_full_tear_sheet(returns, positions=positions, transactions=transactions, live_start_date=LIVE_START_DATE)

    logger.info(f"Final Portfolio Value: ${cerebro.broker.getvalue():.2f}")

### MAIN EXECUTION

In [3]:
# **** Main Execution ****
def main():
    # Load tickers from Excel file
    tickers = load_tickers(EXCEL_PATH)
    logger.info(f"Loaded {len(tickers)} tickers.")

    # Run backtest with given parameters
    run_backtest(
        START_DATE,
        END_DATE,
        tickers,
        SignalBasedStrategy,
        rsi_period=10,
        ema_short_period=10,
        ema_long_period=30,
    )

if __name__ == "__main__":
    main()


2024-12-28 19:35:16,765 - INFO - Loaded 760 tickers.
2024-12-28 19:35:17,611 - INFO - Training Linear Regression model...
2024-12-28 19:35:17,699 - INFO - Linear Regression Model Trained: MSE=0.0050


KeyError: 'ticker'

In [None]:
# %% [markdown]
### Imports and Setup

# %%
import sqlite3
import pandas as pd
import numpy as np
import logging
from datetime import datetime
import backtrader as bt
import pyfolio as pf
from sklearn.linear_model import LinearRegression
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error

# Configure logging
log_file = 'backtest_logs.log'
logger = logging.getLogger()
logger.setLevel(logging.INFO)

console_handler = logging.StreamHandler()
file_handler = logging.FileHandler(log_file)
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
console_handler.setFormatter(formatter)
file_handler.setFormatter(formatter)
logger.addHandler(console_handler)
logger.addHandler(file_handler)

# Constants
DB_PATH = '1_financial_data.db'
EXCEL_PATH = "Complete-List-of-Biotech-Stocks-Listed-on-NASDAQ-Jan-1-24.xlsx"
START_DATE = datetime(2020, 1, 1)
END_DATE = datetime(2022, 12, 30)
INITIAL_CASH = 1000000
STAKE_SIZE = 100
LIVE_START_DATE = datetime(2023, 1, 1)

# %% [markdown]
### Data Loading and Preparation

# %%
def load_data_from_db():
    """Load all datasets from the database"""
    conn = sqlite3.connect(DB_PATH)
    equities_data = pd.read_sql('SELECT * FROM equities_data', conn)
    volume_threshold_options_data = pd.read_sql('SELECT * FROM volume_threshold_options_data', conn)
    merged_data = pd.read_sql('SELECT * FROM merged_data', conn)
    conn.close()
    return equities_data, volume_threshold_options_data, merged_data

# %%
# Load the data
equities_data, volume_threshold_options_data, merged_data = load_data_from_db()

# Convert date columns to datetime
for df in [equities_data, volume_threshold_options_data, merged_data]:
    df['date'] = pd.to_datetime(df['date'])

# %% [markdown]
### Custom Data Feed Implementation

# %%

class EnhancedCustomDataFeed(bt.feeds.PandasData):
    """Enhanced data feed that handles both equity and options data"""
    
    lines = ('impl_volatility', 'delta', 'gamma', 'vega', 'theta',)  # Define new lines for options data
    
    params = (
        ('datetime', 'date'),    # Map to 'date' column
        ('open', 'bid'),         # Using bid as open
        ('high', 'ask'),         # Using ask as high
        ('low', 'bid'),          # Using bid as low
        ('close', 'prc'),        # Using prc as close
        ('volume', 'vol'),       # Volume from 'vol' column
        ('openinterest', None),  # Not present in data
        # Additional fields mapping
        ('impl_volatility', 'impl_volatility'),
        ('delta', 'delta'),
        ('gamma', 'gamma'),
        ('vega', 'vega'),
        ('theta', 'theta'),
    )

# %% [markdown]
### Strategy Implementation

# %%
class EnhancedSignalBasedStrategy(bt.Strategy):
    params = (
        ('rsi_period', 14),
        ('ema_short_period', 12),
        ('ema_long_period', 26),
    )
    
    def __init__(self):
        # Initialize indicators
        self.rsi = bt.indicators.RSI(self.data.close, period=self.params.rsi_period)
        self.ema_short = bt.indicators.EMA(self.data.close, period=self.params.ema_short_period)
        self.ema_long = bt.indicators.EMA(self.data.close, period=self.params.ema_long_period)
        self.crossover = bt.indicators.CrossOver(self.ema_short, self.ema_long)
        
        # Track metrics
        self.impl_volatility = self.data.impl_volatility if hasattr(self.data, 'impl_volatility') else None
        self.delta = self.data.delta if hasattr(self.data, 'delta') else None
        
    def next(self):
        if not self.position:  # If we're not in the market
            if self.crossover > 0 and self.rsi < 70:  # Golden cross and not overbought
                self.buy()
                self.log(f"BUY EXECUTED {self.data.close[0]:.2f}")
        
        else:  # If we're in the market
            if self.crossover < 0 or self.rsi > 70:  # Death cross or overbought
                self.sell()
                self.log(f"SELL EXECUTED {self.data.close[0]:.2f}")
    
    def log(self, txt, dt=None):
        dt = dt or self.datas[0].datetime.date(0)
        logger.info(f'{dt.isoformat()} {txt}')

# %% [markdown]
### Backtest Runner

# %%
def prepare_data_for_backtest(data_df, ticker=None):
    """Prepare data for backtesting"""
    if ticker:
        data_df = data_df[data_df['ticker'] == ticker]
    
    # Ensure all required columns exist
    required_columns = ['date', 'bid', 'ask', 'vol', 'prc']
    for col in required_columns:
        if col not in data_df.columns:
            raise ValueError(f"Missing required column: {col}")
    
    # Sort and set index
    data_df = data_df.sort_values('date')
    data_df.set_index('date', inplace=True)
    
    # Forward fill missing values
    data_df.ffill(inplace=True)
    
    return data_df

# %%
def run_backtest(data_df, strategy_class, ticker=None, **strategy_params):
    """Run backtest with the specified data and strategy"""
    cerebro = bt.Cerebro()
    
    # Prepare data
    prepared_data = prepare_data_for_backtest(data_df, ticker)
    
    # Add data feed
    data_feed = EnhancedCustomDataFeed(
        dataname=prepared_data,
        fromdate=START_DATE,
        todate=END_DATE
    )
    cerebro.adddata(data_feed)
    
    # Add strategy
    cerebro.addstrategy(strategy_class, **strategy_params)
    
    # Set broker parameters
    cerebro.broker.set_cash(INITIAL_CASH)
    cerebro.addsizer(bt.sizers.FixedSize, stake=STAKE_SIZE)
    
    # Add analyzers
    cerebro.addanalyzer(bt.analyzers.PyFolio, _name='pyfolio')
    cerebro.addanalyzer(bt.analyzers.SharpeRatio, _name='sharpe')
    cerebro.addanalyzer(bt.analyzers.DrawDown, _name='drawdown')
    
    # Run backtest
    logger.info(f'Starting Portfolio Value: ${cerebro.broker.getvalue():.2f}')
    results = cerebro.run()
    logger.info(f'Final Portfolio Value: ${cerebro.broker.getvalue():.2f}')
    
    # Get analysis
    strat = results[0]
    pyfolio_analyzer = strat.analyzers.pyfolio.get_analysis()
    sharpe_ratio = strat.analyzers.sharpe.get_analysis()
    drawdown = strat.analyzers.drawdown.get_analysis()
    
    return results, pyfolio_analyzer, sharpe_ratio, drawdown

# %% [markdown]
### Main Execution

# %%
def main():
    # # Run backtest on equities data
    # logger.info("Running backtest on equities data...")
    # results_equity, pyfolio_equity, sharpe_equity, drawdown_equity = run_backtest(
    #     equities_data,
    #     EnhancedSignalBasedStrategy,
    #     rsi_period=10,
    #     ema_short_period=10,
    #     ema_long_period=30
    # )
    
    # Run backtest on merged data (includes options metrics)
    logger.info("Running backtest on merged data...")
    results_merged, pyfolio_merged, sharpe_merged, drawdown_merged = run_backtest(
        merged_data,
        EnhancedSignalBasedStrategy,
        rsi_period=10,
        ema_short_period=10,
        ema_long_period=30
    )
    
    # Print results
    # print("\nEquities Backtest Results:")
    # print(f"Sharpe Ratio: {sharpe_equity['sharperatio']:.2f}")
    # print(f"Max Drawdown: {drawdown_equity['max']['drawdown']:.2%}")
    
    print("\nMerged Data Backtest Results:")
    print(f"Sharpe Ratio: {sharpe_merged['sharperatio']:.2f}")
    print(f"Max Drawdown: {drawdown_merged['max']['drawdown']:.2%}")

if __name__ == "__main__":
    main()



In [None]:
# # %% [markdown]
# # ### INPUTS & IMPORTS

# # %%
# import sqlite3
# import pandas as pd
# import logging
# from datetime import datetime
# import backtrader as bt
# import pyfolio as pf
# from sklearn.linear_model import LinearRegression
# from sklearn.model_selection import train_test_split
# from sklearn.metrics import mean_squared_error

# # Configure logging
# log_file = 'backtest_logs.log'
# logger = logging.getLogger()
# logger.setLevel(logging.INFO)

# console_handler = logging.StreamHandler()
# file_handler = logging.FileHandler(log_file)
# formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
# console_handler.setFormatter(formatter)
# file_handler.setFormatter(formatter)
# logger.addHandler(console_handler)
# logger.addHandler(file_handler)

# # Constants
# DB_PATH = '1_financial_data.db'
# EXCEL_PATH = "Complete-List-of-Biotech-Stocks-Listed-on-NASDAQ-Jan-1-24.xlsx"
# START_DATE = datetime(2020, 1, 1)
# END_DATE = datetime(2022, 12, 30)
# INITIAL_CASH = 1000000
# STAKE_SIZE = 100
# LIVE_START_DATE = datetime(2023, 1, 1)

# # %% [markdown]
# # ### FUNCTIONS NEEDED

# # %%
# # **** Signal Library ****
# class SignalLibrary:
#     """Encapsulates all signal generation logic."""
    
#     @staticmethod
#     def rsi(data, period):
#         return bt.indicators.RelativeStrengthIndex(data.close, period=period)
    
#     @staticmethod
#     def ema(data, period):
#         return bt.indicators.ExponentialMovingAverage(data.close, period=period)
    
#     @staticmethod
#     def crossover(short_ema, long_ema):
#         return bt.indicators.CrossOver(short_ema, long_ema)

# # **** Custom Data Feed ****
# class CustomDataFeed(bt.feeds.PandasData):
#     params = (
#         ('datetime', None),
#         ('open', 'bid'),
#         ('high', 'ask'),
#         ('low', 'bid'),
#         ('close', 'prc'),
#         ('volume', 'vol'),
#         ('openinterest', None),
#     )

# # **** Strategy Class ****
# class SignalBasedStrategy(bt.Strategy):
#     params = (
#         ('rsi_period', 14),
#         ('ema_short_period', 12),
#         ('ema_long_period', 26),
#     )
    
#     def __init__(self):
#         # Initialize indicators using the SignalLibrary
#         self.rsi = SignalLibrary.rsi(self.data, self.params.rsi_period)
#         self.ema_short = SignalLibrary.ema(self.data, self.params.ema_short_period)
#         self.ema_long = SignalLibrary.ema(self.data, self.params.ema_long_period)
#         self.crossover = SignalLibrary.crossover(self.ema_short, self.ema_long)

#     def next(self):
#         ticker = self.data._name
#         if self.crossover > 0 and not self.position:
#             self.buy()
#             logger.info(f"{ticker} - BUY: {self.data.datetime.date(0)} | Close: {self.data.close[0]} | RSI: {self.rsi[0]}")
#         elif self.crossover < 0 and self.position:
#             self.sell()
#             logger.info(f"{ticker} - SELL: {self.data.datetime.date(0)} | Close: {self.data.close[0]} | RSI: {self.rsi[0]}")
#         elif self.rsi > 70 and self.position:
#             self.sell()
#             logger.info(f"{ticker} - RSI Overbought SELL: {self.data.datetime.date(0)} | Close: {self.data.close[0]} | RSI: {self.rsi[0]}")
#         elif self.rsi < 30 and not self.position:
#             self.buy()
#             logger.info(f"{ticker} - RSI Oversold BUY: {self.data.datetime.date(0)} | Close: {self.data.close[0]} | RSI: {self.rsi[0]}")

# # **** Utility Functions ****
# def load_tickers(file_path):
#     tickers_df = pd.read_excel(file_path)
#     return tickers_df['Ticker'].dropna().unique().tolist()

# def load_data_from_db(db_path):
#     conn = sqlite3.connect(db_path)
#     equities_data = pd.read_sql('SELECT * FROM equities_data', conn)
#     conn.close()
#     return equities_data

# def filter_data(data, tickers, start_date, end_date):
#     data['date'] = pd.to_datetime(data['date'])
#     filtered_data = data[data['ticker'].isin(tickers)]
#     return filtered_data[(filtered_data['date'] >= start_date) & (filtered_data['date'] <= end_date)]

# def prepare_data_for_backtest(df):
#     df = df[['date', 'bid', 'ask', 'vol', 'prc']].sort_values(by='date')
#     df.ffill(inplace=True)
#     df.set_index('date', inplace=True)
#     return df

# # %% [markdown]
# # ### BACKTEST RUNNER

# # %%
# def run_backtest(start_date, end_date, tickers, strategy_class, **strategy_params):
#     cerebro = bt.Cerebro()
#     cerebro.addstrategy(strategy_class, **strategy_params)

#     # Load data from database and filter it for backtesting
#     equities_data = load_data_from_db(DB_PATH)
#     filtered_data = filter_data(equities_data, tickers, start_date, end_date)
    
#     # Prepare data for Backtrader and add to Cerebro
#     prepared_data = prepare_data_for_backtest(filtered_data)
#     data_feed = CustomDataFeed(dataname=prepared_data)
#     cerebro.adddata(data_feed)

#     # Set initial portfolio conditions
#     cerebro.broker.set_cash(INITIAL_CASH)
#     cerebro.addsizer(bt.sizers.FixedSize, stake=STAKE_SIZE)
    
#     # Add PyFolio analyzer for performance evaluation
#     cerebro.addanalyzer(bt.analyzers.PyFolio, _name='pyfolio')

#     # Run the backtest and analyze results
#     logger.info('Running backtest...')
#     results = cerebro.run()

#     # Extract PyFolio results for performance analysis
#     pyfolio_analyzer = results[0].analyzers.pyfolio
#     returns, positions, transactions = pyfolio_analyzer.get_pf_items()[:3]

#     # Generate performance tear sheet using PyFolio
#     pf.create_full_tear_sheet(returns, positions=positions, transactions=transactions,
#                               live_start_date=LIVE_START_DATE)

# # %% [markdown]
# # ### MAIN EXECUTION

# # %%
# def main():
#     # Load tickers from Excel file and run the backtest with specified strategy parameters.
#     tickers = load_tickers(EXCEL_PATH)
    
#     run_backtest(
#         START_DATE,
#         END_DATE,
#         tickers,
#         SignalBasedStrategy,
#         rsi_period=10,
#         ema_short_period=10,
#         ema_long_period=30,
#     )

# if __name__ == "__main__":
#     main()
