In [3]:
from datetime import datetime, timedelta
from typing import Dict

import backtrader as bt
import numpy as np
import pandas as pd
import yfinance as yf

In [4]:
def get_sp500_tickers():
    """Return the S&P 500 ticker symbols listed on Wikipedia.

    Parses the HTML tables of the "List of S&P 500 companies" page with
    ``pd.read_html`` and returns the ``Symbol`` column of the first table
    as a plain Python list.
    """
    wiki_url = 'https://en.wikipedia.org/wiki/List_of_S%26P_500_companies'
    # The first table on the page is the one that carries the 'Symbol' column.
    constituents = pd.read_html(wiki_url)[0]
    symbols = constituents['Symbol']
    return symbols.tolist()
# Fetch the ticker list once at notebook level; later cells read `sp500_tickers`.
sp500_tickers = get_sp500_tickers()
# Report how many symbols were returned.
print(f"S&P 500 tickers: {len(sp500_tickers)}")

S&P 500 tickers: 503


In [None]:
# Download price data for the first two tickers of the list between the given start and end dates.
data = yf.download(sp500_tickers[:2], start='2020-01-01', end='2023-12-31')
# Select the 'AOS' cross-section from column level 1 and display it as the cell output.
# NOTE(review): 'AOS' is hard-coded — presumably it is one of sp500_tickers[:2]; confirm,
# otherwise this lookup fails when the ticker list changes.
data.xs('AOS', axis=1, level=1)

  data = yf.download(sp500_tickers[:2], start='2020-01-01', end='2023-12-31')
[*********************100%***********************]  2 of 2 completed


Price,Close,High,Low,Open,Volume
Date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
2020-01-02,42.994770,43.183777,42.490748,43.120775,1093200
2020-01-03,42.616756,42.751763,42.076736,42.445751,883200
2020-01-06,42.886768,42.904766,42.085732,42.418749,1088400
2020-01-07,42.598763,42.967778,42.427753,42.661765,868200
2020-01-08,42.535755,43.030778,42.472753,42.679761,1119100
...,...,...,...,...,...
2023-12-22,79.236603,79.527446,78.761546,79.449885,449400
2023-12-26,79.352936,79.556528,78.936054,79.207510,420400
2023-12-27,79.653488,79.886164,79.197826,79.478979,441700
2023-12-28,79.731041,79.924944,79.333553,79.430501,452800


In [None]:
class TechnicalIndicators:
    """
    Stateless collection of technical-indicator helpers for price series.

    Every method is a ``@staticmethod`` that takes one or more ``pd.Series``
    and returns either a ``pd.Series`` or a dict of named ``pd.Series``
    aligned on the input index. Rolling-window calculations yield ``NaN``
    for rows where the window is not yet full.
    """

    @staticmethod
    def moving_average(data: pd.Series, window: int) -> pd.Series:
        """Calculate the simple moving average over ``window`` rows."""
        return data.rolling(window=window).mean()

    @staticmethod
    def exponential_moving_average(data: pd.Series, window: int) -> pd.Series:
        """Calculate the exponential moving average with ``span=window``."""
        return data.ewm(span=window).mean()

    @staticmethod
    def returns(data: pd.Series, periods: int = 1) -> pd.Series:
        """Calculate the percentage change versus the value ``periods`` rows earlier."""
        return data.pct_change(periods=periods)

    @staticmethod
    def future_returns(data: pd.Series, periods: int = 1) -> pd.Series:
        """Calculate the forward return over the next ``periods`` rows.

        The result at row ``t`` is ``(data[t + n] - data[t]) / data[t]`` with
        ``n = abs(periods)``; the last ``n`` rows are ``NaN``.

        The sign of ``periods`` is ignored: the horizon is always taken into
        the future. Callers that pass a negative horizon (e.g. ``-5``) get the
        same values as before, while a positive horizon no longer produces a
        backward-looking, sign-flipped return.
        """
        horizon = abs(periods)
        future_price = data.shift(periods=-horizon)
        return (future_price - data) / data

    @staticmethod
    def volatility(data: pd.Series, window: int) -> pd.Series:
        """Calculate rolling volatility (standard deviation of 1-row returns).

        The rolling standard deviation is scaled by ``sqrt(252)`` to annualise it.
        """
        returns = data.pct_change()
        return returns.rolling(window=window).std() * np.sqrt(252)  # Annualized

    @staticmethod
    def rsi(data: pd.Series, window: int = 14) -> pd.Series:
        """Calculate the Relative Strength Index.

        Average gain and average loss are plain rolling means of the positive
        and negative parts of the row-to-row difference.
        """
        delta = data.diff()
        gain = (delta.where(delta > 0, 0)).rolling(window=window).mean()
        loss = (-delta.where(delta < 0, 0)).rolling(window=window).mean()
        rs = gain / loss
        return 100 - (100 / (1 + rs))

    @staticmethod
    def macd(data: pd.Series, fast: int = 12, slow: int = 26, signal: int = 9) -> Dict[str, pd.Series]:
        """Calculate MACD (Moving Average Convergence Divergence).

        Returns a dict with keys ``'MACD'`` (fast EMA minus slow EMA),
        ``'MACD_Signal'`` (EMA of the MACD line) and ``'MACD_Histogram'``
        (MACD line minus signal line).
        """
        ema_fast = data.ewm(span=fast).mean()
        ema_slow = data.ewm(span=slow).mean()
        macd_line = ema_fast - ema_slow
        signal_line = macd_line.ewm(span=signal).mean()
        histogram = macd_line - signal_line

        return {
            'MACD': macd_line,
            'MACD_Signal': signal_line,
            'MACD_Histogram': histogram
        }

    @staticmethod
    def bollinger_bands(data: pd.Series, window: int = 20, num_std: float = 2) -> Dict[str, pd.Series]:
        """Calculate Bollinger Bands.

        Returns a dict with keys ``'BB_Upper'``, ``'BB_Middle'``, ``'BB_Lower'``,
        ``'BB_Width'`` (upper minus lower) and ``'BB_Position'`` (distance of the
        value from the middle band, in units of the band half-width).
        """
        sma = data.rolling(window=window).mean()
        std = data.rolling(window=window).std()
        half_width = std * num_std
        # Build each band once and reuse it for the width.
        upper = sma + half_width
        lower = sma - half_width

        return {
            'BB_Upper': upper,
            'BB_Middle': sma,
            'BB_Lower': lower,
            'BB_Width': upper - lower,
            'BB_Position': (data - sma) / half_width
        }

    @staticmethod
    def average_true_range(high: pd.Series, low: pd.Series, close: pd.Series, window: int = 14) -> pd.Series:
        """Calculate Average True Range.

        True range is the largest of: high minus low, |high - previous close|
        and |low - previous close|. ATR is its rolling mean over ``window`` rows.
        """
        prev_close = close.shift()
        high_low = high - low
        high_close = np.abs(high - prev_close)
        low_close = np.abs(low - prev_close)

        true_range = np.maximum(high_low, np.maximum(high_close, low_close))
        return true_range.rolling(window=window).mean()

    @staticmethod
    def stochastic_oscillator(high: pd.Series, low: pd.Series, close: pd.Series,
                            k_window: int = 14, d_window: int = 3) -> Dict[str, pd.Series]:
        """Calculate Stochastic Oscillator.

        Returns a dict with ``'Stoch_K'`` (position of the close inside the
        rolling low/high range, scaled to 0-100) and ``'Stoch_D'`` (rolling
        mean of ``Stoch_K`` over ``d_window`` rows).
        """
        lowest_low = low.rolling(window=k_window).min()
        highest_high = high.rolling(window=k_window).max()

        k_percent = 100 * ((close - lowest_low) / (highest_high - lowest_low))
        d_percent = k_percent.rolling(window=d_window).mean()

        return {
            'Stoch_K': k_percent,
            'Stoch_D': d_percent
        }




class TestStrategy(bt.Strategy):
    """Minimal strategy that prints the close of the second data feed on every bar."""

    def log(self, txt, dt=None):
        """Print ``txt`` prefixed with a date in ISO format.

        When ``dt`` is not supplied (or is falsy) the current date of
        ``self.datas[1]`` is used.
        """
        if not dt:
            dt = self.datas[1].datetime.date(0)
        stamp = dt.isoformat()
        print('%s, %s' % (stamp, txt))

    def __init__(self):
        # Keep a handle on the "close" line of the second feed (self.datas[1]);
        # next() reads the current bar from it.
        second_feed = self.datas[1]
        self.dataclose = second_feed.close

    def next(self):
        # Log the closing price of the current bar of the referenced series.
        current_close = self.dataclose[0]
        self.log('Close, %.2f' % current_close)

def run_backtest(sp500_tickers, start_date='2020-01-01', end_date='2023-12-31'):
    """
    Run a smoke-test backtest that logs closing prices via ``TestStrategy``.

    Downloads price data for the first two tickers of ``sp500_tickers`` with
    yfinance, feeds each one to a backtrader ``Cerebro`` engine and runs
    ``TestStrategy`` over them. Portfolio value is printed before and after
    the run.

    Args:
        sp500_tickers: Sequence of ticker symbols; only the first two are used.
        start_date: Start date passed to ``yf.download``.
        end_date: End date passed to ``yf.download``.

    Returns:
        None. All results are printed.
    """
    cerebro = bt.Cerebro()

    # Add a strategy
    cerebro.addstrategy(TestStrategy)

    # TestStrategy reads self.datas[1], so two feeds are loaded.
    sample_tickers = sp500_tickers[:2]

    for ticker in sample_tickers:
        try:
            data = yf.download(ticker, start=start_date, end=end_date)
            # Flatten (field, ticker) MultiIndex columns to plain field names;
            # leave frames that already have flat columns untouched.
            if isinstance(data.columns, pd.MultiIndex):
                data = data.xs(ticker, axis=1, level=1)
            if len(data) > 100:  # Ensure sufficient data
                bt_data = bt.feeds.PandasData(
                    dataname=data,
                    name=ticker
                )
                cerebro.adddata(bt_data)
        except Exception as exc:
            # Best effort: skip the ticker, but say why instead of hiding the failure.
            print(f"Skipping {ticker}: {exc}")
            continue

    # Set initial cash
    cerebro.broker.setcash(100000.0)

    # Set commission
    cerebro.broker.setcommission(commission=0.001)  # 0.1% commission

    print('Starting Portfolio Value: %.2f' % cerebro.broker.getvalue())

    cerebro.run()

    print('Final Portfolio Value: %.2f' % cerebro.broker.getvalue())

    return

In [None]:
# Run the backtest on the ticker list; progress and results are printed by the function.
run_backtest(sp500_tickers)

  data = yf.download(ticker, start=start_date, end=end_date)
[*********************100%***********************]  1 of 1 completed
  data = yf.download(ticker, start=start_date, end=end_date)
[*********************100%***********************]  1 of 1 completed


Starting Portfolio Value: 100000.00
2020-01-02, Close, 42.99
2020-01-03, Close, 42.62
2020-01-06, Close, 42.89
2020-01-07, Close, 42.60
2020-01-08, Close, 42.54
2020-01-09, Close, 42.35
2020-01-10, Close, 42.86
2020-01-13, Close, 43.69
2020-01-14, Close, 42.52
2020-01-15, Close, 41.95
2020-01-16, Close, 42.87
2020-01-17, Close, 42.24
2020-01-21, Close, 42.32
2020-01-22, Close, 41.85
2020-01-23, Close, 42.04
2020-01-24, Close, 40.89
2020-01-27, Close, 39.43
2020-01-28, Close, 40.94
2020-01-29, Close, 39.88
2020-01-30, Close, 39.32
2020-01-31, Close, 38.63
2020-02-03, Close, 38.26
2020-02-04, Close, 38.72
2020-02-05, Close, 39.64
2020-02-06, Close, 39.18
2020-02-07, Close, 38.00
2020-02-10, Close, 38.91
2020-02-11, Close, 39.83
2020-02-12, Close, 40.08
2020-02-13, Close, 39.82
2020-02-14, Close, 39.75
2020-02-18, Close, 39.80
2020-02-19, Close, 39.91
2020-02-20, Close, 40.29
2020-02-21, Close, 39.90
2020-02-24, Close, 38.77
2020-02-25, Close, 37.06
2020-02-26, Close, 35.84
2020-02-27, Cl

In [None]:



# Example usage function
# NOTE: this definition reuses the name of the earlier price-logging run_backtest
# and replaces it once this cell has been executed.
def run_backtest(model, sp500_tickers, start_date='2020-01-01', end_date='2023-12-31',
                 feature_calculator=None):
    """
    Run the backtest with your XGBoost model

    Loads price data for the first 50 tickers, runs ``XGBoostStrategy`` with
    the supplied model wired in, prints summary statistics and plots the run.

    Args:
        model: Fitted model exposing ``predict``; handed to the strategy via
            ``set_model``. With ``None`` the strategy never trades.
        sp500_tickers: Sequence of ticker symbols; only the first 50 are used.
        start_date: Start date passed to ``yf.download``.
        end_date: End date passed to ``yf.download``.
        feature_calculator: Callable that turns an OHLCV DataFrame into a
            feature vector. Defaults to the module-level ``calculate_features``.

    Returns:
        The list of strategy instances returned by ``cerebro.run()``.
    """
    if feature_calculator is None:
        feature_calculator = calculate_features

    # Download and add data for S&P 500 stocks (sample)
    sample_tickers = list(sp500_tickers[:50])  # Use first 50 for demo

    # Cerebro instantiates the strategy itself, so the model is injected through
    # a thin subclass; otherwise XGBoostStrategy.model stays None and every
    # rebalance is a no-op.
    class _ConfiguredStrategy(XGBoostStrategy):
        def __init__(self):
            super().__init__()
            self.set_model(model, feature_calculator, sample_tickers)

    cerebro = bt.Cerebro()

    # Add strategy
    cerebro.addstrategy(_ConfiguredStrategy, top_n=10, rebalance_freq=5)

    for ticker in sample_tickers:
        try:
            data = yf.download(ticker, start=start_date, end=end_date)
            # Flatten (field, ticker) MultiIndex columns to plain field names,
            # matching what the price-logging run_backtest does.
            if isinstance(data.columns, pd.MultiIndex):
                data = data.xs(ticker, axis=1, level=1)
            if len(data) > 100:  # Ensure sufficient data
                bt_data = bt.feeds.PandasData(
                    dataname=data,
                    name=ticker
                )
                cerebro.adddata(bt_data)
        except Exception as exc:
            # Best effort: skip the ticker, but say why instead of hiding the failure.
            print(f"Skipping {ticker}: {exc}")
            continue

    # Set initial cash
    cerebro.broker.setcash(100000.0)

    # Set commission
    cerebro.broker.setcommission(commission=0.001)  # 0.1% commission

    # Add analyzers
    cerebro.addanalyzer(bt.analyzers.SharpeRatio, _name='sharpe')
    cerebro.addanalyzer(bt.analyzers.Returns, _name='returns')
    cerebro.addanalyzer(bt.analyzers.DrawDown, _name='drawdown')

    print(f'Starting Portfolio Value: {cerebro.broker.getvalue():.2f}')

    # Run backtest
    results = cerebro.run()

    print(f'Final Portfolio Value: {cerebro.broker.getvalue():.2f}')

    # Print results
    strat = results[0]
    sharpe = strat.analyzers.sharpe.get_analysis()["sharperatio"]
    # The analyzer yields None when the ratio cannot be computed; ':.2f' would raise on it.
    if sharpe is None:
        print('Sharpe Ratio: n/a')
    else:
        print(f'Sharpe Ratio: {sharpe:.2f}')
    print(f'Total Return: {strat.analyzers.returns.get_analysis()["rtot"]:.2%}')
    # DrawDown already reports percentage points, so the '%' format spec
    # (which multiplies by 100) must not be applied.
    max_drawdown = strat.analyzers.drawdown.get_analysis()["max"]["drawdown"]
    print(f'Max Drawdown: {max_drawdown:.2f}%')

    # Plot results
    cerebro.plot(style='candlestick')

    return results

# Write a stub module to 'xgboost_backtrader_strategy.py' in the current working directory.
# NOTE(review): the text written below holds only a header, the import lines and a
# "[Include all the code above here]" placeholder — none of the strategy code is
# actually copied into the file, despite the "Strategy code saved" message printed after.
with open('xgboost_backtrader_strategy.py', 'w') as f:
    f.write('''
# XGBoost + Backtrader Strategy Implementation
# This file contains the complete strategy code

import backtrader as bt
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import yfinance as yf

# [Include all the code above here]

# To use this strategy:
# 1. Train your XGBoost model
# 2. Define your feature calculation function
# 3. Get S&P 500 ticker list
# 4. Run: results = run_backtest(model, sp500_tickers)
''')

# Print a confirmation followed by usage instructions.
print("Strategy code saved to 'xgboost_backtrader_strategy.py'")
print("\nTo use this strategy:")
print("1. Replace calculate_features() with your actual feature engineering")
print("2. Load your trained XGBoost model")
print("3. Get S&P 500 ticker list")
print("4. Call run_backtest(model, sp500_tickers)")

In [None]:
class XGBoostStrategy(bt.Strategy):
    """
    Periodically rebalance into the stocks with the highest predicted return.

    Every ``rebalance_freq`` calls to ``next()`` the strategy closes all open
    positions, asks the model for one prediction per ticker and buys an
    equal-weighted basket of the ``top_n`` highest predictions. Until
    ``set_model()`` has been called the strategy does nothing.
    """
    params = (
        ('top_n', 10),  # Number of top stocks to buy
        ('rebalance_freq', 5),  # Rebalance every 5 trading days (1 week)
    )

    def __init__(self):
        self.model = None  # Your trained XGBoost model
        self.feature_calculator = None  # Your feature calculation function
        self.rebalance_counter = 0
        self.current_positions = []
        self.sp500_tickers = []  # List of S&P 500 tickers

    def set_model(self, model, feature_calculator, sp500_tickers):
        """Set the XGBoost model and feature calculator

        Args:
            model: Object exposing ``predict``; called with a one-row batch.
            feature_calculator: Callable taking an OHLCV DataFrame and
                returning the feature vector for the model.
            sp500_tickers: Ticker names to score; each is matched against the
                names of the loaded data feeds.
        """
        self.model = model
        self.feature_calculator = feature_calculator
        self.sp500_tickers = sp500_tickers

    def next(self):
        """Count bars and trigger a rebalance every ``rebalance_freq`` calls."""
        # Rebalance every week (5 trading days)
        if self.rebalance_counter % self.params.rebalance_freq == 0:
            self.rebalance_portfolio()

        self.rebalance_counter += 1

    def rebalance_portfolio(self):
        """Rebalance portfolio based on XGBoost predictions"""
        if self.model is None:
            return

        # Close all current positions
        for data in self.datas:
            if self.getposition(data).size != 0:
                self.close(data=data)

        # Get predictions for all stocks
        predictions = self.get_predictions()

        if len(predictions) == 0:
            return

        # Sort by predicted returns and get top N
        top_stocks = predictions.nlargest(self.params.top_n)

        if len(top_stocks) == 0:
            # e.g. top_n == 0 or only NaN predictions: nothing to buy, and
            # the division below would fail.
            return

        # Calculate position size (equal weight).
        # The close orders issued above have not executed yet, so getcash()
        # would exclude the money still tied up in those positions and every
        # second rebalance would buy (almost) nothing. Everything is being
        # liquidated, so size against the total portfolio value instead.
        # Commission and price moves between order creation and execution can
        # still push the last order over the available cash.
        investable = self.broker.getvalue()
        position_size = investable / len(top_stocks)

        # Buy top predicted stocks
        for ticker, predicted_return in top_stocks.items():
            data = self.get_data_by_name(ticker)
            if data is not None:
                size = int(position_size / data.close[0])
                if size > 0:
                    self.buy(data=data, size=size)
                    print(f"Buying {size} shares of {ticker} at {data.close[0]:.2f} (predicted return: {predicted_return:.4f})")

    def get_predictions(self):
        """Get XGBoost predictions for all stocks

        Returns:
            ``pd.Series`` indexed by ticker holding the model prediction for
            every ticker that has a data feed with more than 50 bars and whose
            features could be computed. Empty when nothing could be scored.
        """
        predictions = {}

        for ticker in self.sp500_tickers:
            data = self.get_data_by_name(ticker)
            if data is not None and len(data) > 50:  # Ensure enough data for features
                try:
                    # Calculate features for current stock
                    features = self.calculate_features(data, ticker)
                    if features is not None:
                        # Get prediction
                        pred = self.model.predict([features])[0]
                        predictions[ticker] = pred
                except Exception as e:
                    print(f"Error predicting for {ticker}: {e}")
                    continue

        # Explicit dtype keeps an empty result numeric.
        return pd.Series(predictions, dtype=float)

    @staticmethod
    def calculate_all_indicators(stock_data: pd.DataFrame, ticker=None) -> pd.DataFrame:
        """
        Calculate all technical indicators for all stocks

        This code previously sat, unreachable, after the ``return`` of
        ``get_predictions`` because its ``def`` line was commented out.

        Args:
            stock_data: DataFrame whose columns are a two-level MultiIndex of
                (price field, ticker) with at least the fields ``Close``,
                ``High``, ``Low`` and ``Volume``.
            ticker: Optional single ticker to restrict the calculation to.
                When omitted, every ticker found in column level 1 is processed.

        Returns:
            DataFrame indexed by (``Date``, ``Ticker``) with one column per
            indicator, or an empty DataFrame when no ticker could be processed.
        """
        all_indicators = []

        print("Calculating technical indicators...")

        if ticker is None:
            tickers = stock_data.columns.get_level_values(1).unique().tolist()
        else:
            tickers = [ticker]

        for ticker in tickers:
            try:
                # Basic price data
                close = stock_data.loc[:, ('Close', ticker)]
                high = stock_data.loc[:, ('High', ticker)]
                low = stock_data.loc[:, ('Low', ticker)]
                volume = stock_data.loc[:, ('Volume', ticker)]

                # Create a DataFrame for this stock's indicators, aligned on the
                # index of the input frame (not on an unrelated global).
                indicators_df = pd.DataFrame(index=stock_data.index)
                indicators_df['Ticker'] = ticker
                indicators_df['Close'] = close
                indicators_df['High'] = high
                indicators_df['Low'] = low
                indicators_df['Volume'] = volume

                # Future Returns (negative horizon = look forward)
                for period in [-1, -5, -10, -21, -63]:
                    indicators_df[f'Future_Return_{abs(period)}d'] = TechnicalIndicators.future_returns(close, period)

                # Moving Averages
                for window in [5, 10, 20, 50, 100, 200]:
                    indicators_df[f'SMA_{window}'] = TechnicalIndicators.moving_average(close, window)
                    indicators_df[f'EMA_{window}'] = TechnicalIndicators.exponential_moving_average(close, window)

                # Returns
                for period in [1, 5, 10, 21, 63]:
                    indicators_df[f'Return_{period}d'] = TechnicalIndicators.returns(close, period)

                # Volatility
                for window in [10, 21, 63]:
                    indicators_df[f'Volatility_{window}d'] = TechnicalIndicators.volatility(close, window)

                # RSI
                indicators_df['RSI_14'] = TechnicalIndicators.rsi(close, 14)
                indicators_df['RSI_21'] = TechnicalIndicators.rsi(close, 21)

                # MACD
                macd_data = TechnicalIndicators.macd(close)
                for key, value in macd_data.items():
                    indicators_df[key] = value

                # Bollinger Bands
                bb_data = TechnicalIndicators.bollinger_bands(close)
                for key, value in bb_data.items():
                    indicators_df[key] = value

                # ATR
                indicators_df['ATR_14'] = TechnicalIndicators.average_true_range(high, low, close, 14)

                # Stochastic Oscillator
                stoch_data = TechnicalIndicators.stochastic_oscillator(high, low, close)
                for key, value in stoch_data.items():
                    indicators_df[key] = value

                # Volume indicators
                indicators_df['Volume_SMA_20'] = TechnicalIndicators.moving_average(volume, 20)
                indicators_df['Volume_Ratio'] = volume / indicators_df['Volume_SMA_20']

                # Price momentum
                indicators_df['Momentum_10'] = close / close.shift(10) - 1
                indicators_df['Momentum_21'] = close / close.shift(21) - 1

                # Price position relative to moving averages
                indicators_df['Price_vs_SMA20'] = close / indicators_df['SMA_20'] - 1
                indicators_df['Price_vs_SMA50'] = close / indicators_df['SMA_50'] - 1

                all_indicators.append(indicators_df)
                print(f"✓ Calculated indicators for {ticker}")

            except Exception as e:
                print(f"✗ Error calculating indicators for {ticker}: {str(e)}")

        # Combine all data
        if all_indicators:
            # Name the row index 'Date' whatever it was called before, then
            # build the (Date, Ticker) MultiIndex.
            combined_df = (
                pd.concat(all_indicators, ignore_index=False)
                .rename_axis('Date')
                .reset_index()
                .set_index(['Date', 'Ticker'])
            )

            print(f"\nFinal dataset shape: {combined_df.shape}")
            print(f"Columns: {list(combined_df.columns)}")

            return combined_df
        else:
            print("No data to combine!")
            return pd.DataFrame()

    def calculate_features(self, data, ticker):
        """Calculate features for a given stock data

        Builds a 50-row OHLCV DataFrame from the bars at offsets -50..-1 of
        ``data`` (the current bar at offset 0 is not included) and passes it
        to ``self.feature_calculator``.

        Returns:
            Whatever the feature calculator returns, or ``None`` when the
            frame could not be built or the calculator raised.
        """
        try:
            # Convert backtrader data to pandas DataFrame
            lookback = range(-50, 0)
            df = pd.DataFrame({
                'Open': [data.open[i] for i in lookback],
                'High': [data.high[i] for i in lookback],
                'Low': [data.low[i] for i in lookback],
                'Close': [data.close[i] for i in lookback],
                'Volume': [data.volume[i] for i in lookback]
            })

            # Use your feature calculator function
            features = self.feature_calculator(df)
            return features
        except Exception as exc:
            # Best effort: the ticker is skipped, but the reason is reported.
            print(f"Error calculating features for {ticker}: {exc}")
            return None

    def get_data_by_name(self, name):
        """Get data feed by ticker name

        Returns the first feed in ``self.datas`` whose ``_name`` equals
        ``name``, or ``None`` when there is no such feed.
        """
        for data in self.datas:
            if data._name == name:
                return data
        return None

# Example feature calculation function (replace with your actual function)
def calculate_features(df, n_features=40):
    """
    Calculate your 40 features from yfinance data
    Replace this with your actual feature engineering

    Args:
        df: DataFrame with at least ``Close`` and ``Volume`` columns, ordered
            oldest to newest; features are taken from the last row.
        n_features: Length of the returned vector (default 40).

    Returns:
        List of exactly ``n_features`` values. The first nine are real example
        features (returns, price vs. moving average, volatility, volume ratio,
        RSI); the rest are padded with ``0.0``. Padding is deterministic so
        that identical input always produces identical model input.
    """
    close = df['Close']
    volume = df['Volume']
    daily_return = close.pct_change()
    last_close = close.iloc[-1]

    # Technical indicators (simplified)
    # RSI approximation based on plain 14-row rolling means
    delta = close.diff()
    gain = (delta.where(delta > 0, 0)).rolling(14).mean()
    loss = (-delta.where(delta < 0, 0)).rolling(14).mean()
    rs = gain / loss
    rsi = 100 - (100 / (1 + rs))

    # Example features - replace with your actual features
    features = [
        # Price-based features
        close.pct_change(1).iloc[-1],   # 1-day return
        close.pct_change(5).iloc[-1],   # 5-day return
        close.pct_change(20).iloc[-1],  # 20-day return
        # Moving averages
        last_close / close.rolling(10).mean().iloc[-1] - 1,  # Price vs 10-day MA
        last_close / close.rolling(20).mean().iloc[-1] - 1,  # Price vs 20-day MA
        # Volatility
        daily_return.rolling(10).std().iloc[-1],  # 10-day volatility
        daily_return.rolling(20).std().iloc[-1],  # 20-day volatility
        # Volume features
        volume.iloc[-1] / volume.rolling(10).mean().iloc[-1],  # Volume vs average
        rsi.iloc[-1],
    ]

    # Add more features to reach 40...
    # For now, pad with constant dummy features (replace with actual features).
    # Zeros are used instead of random numbers so results are reproducible.
    features.extend([0.0] * (n_features - len(features)))

    return features[:n_features]

