In [2]:
from backtesting import Backtest, Strategy
import talib
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
import pandas as pd
import numpy as np
import yfinance as yf
from sklearn.naive_bayes import GaussianNB
from xgboost import XGBClassifier

np.random.seed(42)



## Load Data

In [3]:
df = pd.read_csv("../Data/^GSPC.csv")
df = df[df["Price"] != "Ticker"]
df = df[df["Price"] != "Date"]
df[['Close','High', 'Low', 'Open', 'Volume']]= df[['Close', 'High', 'Low', 'Open', 'Volume']].astype(float)
print(df.columns)
df['Price'] = pd.to_datetime(df['Price'])
df.set_index('Price', inplace=True)
df.index.name = None

Index(['Price', 'Close', 'High', 'Low', 'Open', 'Volume', 'Return', 'MA5',
       'MA10', 'MA20', 'Volatility', 'Momentum', 'Target'],
      dtype='object')


## Trading Function

In [4]:
def add_features(data):
    """Use existing features and add additional ones for ML model"""
    df = data.copy()
    
    # Use the existing indicators (MA5, MA10, MA20, Volatility, Momentum)
    # and create additional features from them
    
    # Normalized price distances from MAs
    df['X_MA5'] = (df.Close - df.MA5) / df.Close
    df['X_MA10'] = (df.Close - df.MA10) / df.Close
    df['X_MA20'] = (df.Close - df.MA20) / df.Close
    
    # MA crossovers
    df['X_MA5_10'] = (df.MA5 - df.MA10) / df.Close
    df['X_MA10_20'] = (df.MA10 - df.MA20) / df.Close
    
    # Use existing Volatility and Momentum
    df['X_Volatility'] = df.Volatility
    df['X_Momentum'] = df.Momentum
    
    # Relative return features
    df['X_Return'] = df.Return
    df['X_Return_5'] = df.Return.rolling(5).sum()
    
    # Volume features
    df['X_VOL_CHG'] = df.Volume.pct_change(5)
    
    # Use existing Target if not binary, otherwise keep as is
    if 'Target' in df.columns and set(df.Target.dropna().unique()) != {0, 1}:
        # Convert to our desired format if it's not already binary
        df['Target_orig'] = df.Target
        df['Target'] = np.where(df.Return.shift(-1) > 0.005, 1,    # Long if >0.5% up
                       np.where(df.Return.shift(-1) < -0.005, -1,  # Short if >0.5% down
                       0))                                         # No position if in between
    
    return df.dropna()

def get_X(data):
    """Return feature matrix X"""
    feature_columns = [col for col in data.columns if col.startswith('X_')]
    return data[feature_columns].values

def get_y(data):
    """Return target variable y"""
    return data.Target.values

def get_clean_Xy(data):
    """Return (X, y) cleaned of NaN values"""
    X = get_X(data)
    y = get_y(data).values
    isnan = np.isnan(y)
    X = X[~isnan]
    y = y[~isnan]
    return X, y


class XGBoostStrategy(Strategy):
    n_estimators = 100
    max_depth = 5
    learning_rate = 0.1
    price_delta = 0.01  # 1% for take-profit and stop-loss
    train_size = 500    # Number of bars to use for training
    
    def init(self):
        # Prepare the data with features
        self.last_train_bar = 0
        self.retrain_freq = 50
        self.all_data = add_features(self.data.df)
        
        # Initialize the model - XGBoost
        self.model = XGBClassifier(
            n_estimators=self.n_estimators,
            max_depth=self.max_depth,
            learning_rate=self.learning_rate,
            use_label_encoder=False,
            eval_metric='logloss',
            random_state=42
        )
        
        # Only train once we have enough data
        if len(self.all_data) >= self.train_size:
            train_data = self.all_data.iloc[:self.train_size]
            X_train = get_X(train_data)
            y_train = get_y(train_data)
            self.model.fit(X_train, y_train)
        
        # Track our own state
        self.current_position = 0  # 0: flat, 1: long, -1: short
        self.entry_price = None
        
        # To track our position in the dataframe
        self.bar_count = 0
    
    def next(self):
        # self.bar_count += 1
        
        # # Skip until we have enough data
        # if self.bar_count <= self.train_size:
        #     return
        
        # # Get current features
        # current_idx = self.bar_count - 1  # Adjust for 0-based indexing
        
        # # Make sure current_idx is within valid range
        # if current_idx >= len(self.all_data):
        #     return
            
        # current_features = get_X(self.all_data.iloc[current_idx:current_idx+1])
        
        # # Get model prediction
        # try:
        #     prediction = self.model.predict(current_features)[0]
        # except:
        #     # Handle potential errors in prediction
        #     return
        
        # # Current prices
        # close = self.data.Close[-1]
        
        # # Set take-profit and stop-loss levels
        # tp_long = close * (1 + self.price_delta)
        # sl_long = close * (1 - self.price_delta)
        # tp_short = close * (1 - self.price_delta)
        # sl_short = close * (1 + self.price_delta)
        
        # # Execute trades based on model prediction
        # if prediction == 1 and not self.position.is_long:
        #     # Close any existing short position first
        #     if self.position.is_short:
        #         self.position.close()
        #     # Enter long position
        #     self.buy(size=0.2, tp=tp_long, sl=sl_long)
            
        # elif prediction == -1 and not self.position.is_short:
        #     # Close any existing long position first
        #     if self.position.is_long:
        #         self.position.close()
        #     # Enter short position
        #     self.sell(size=0.2, tp=tp_short, sl=sl_short)
        
        # # Risk management for existing trades
        # for trade in self.trades:
        #     # If trade is open more than 5 days, tighten stop loss
        #     if self.data.index[-1] - trade.entry_time > pd.Timedelta(days=5):
        #         if trade.is_long:
        #             # Move stop loss up to entry price to avoid bigger losses
        #             trade.sl = max(trade.sl, trade.entry_price)
        #         else:
        #             # Move stop loss down to entry price to avoid bigger losses
        #             trade.sl = min(trade.sl, trade.entry_price)
        if len(self.data) < self.train_size:
            return
        
        # Initialize last_train_bar if it doesn't exist
        if not hasattr(self, 'last_train_bar'):
            self.last_train_bar = 0
        
        # Initialize retrain_freq if it doesn't exist
        if not hasattr(self, 'retrain_freq'):
            self.retrain_freq = 50  # Retrain every 50 bars by default
        
        # Implement _train_model if it doesn't exist
        if not hasattr(self, '_train_model'):
            def _train_model(self):
                """Train the XGBoost model on the latest data"""
                # Use a sliding window of training data
                train_end = len(self.all_data)
                train_start = max(0, train_end - self.train_size)
                
                if train_start >= train_end:
                    return  # Not enough data to train
                    
                train_data = self.all_data.iloc[train_start:train_end]
                
                # Get features and target
                X_train = get_X(train_data)
                y_train = get_y(train_data)
                
                # Make sure we have enough samples to train
                if len(X_train) < 10 or len(set(y_train)) < 2:
                    return  # Not enough data or not enough class diversity
                    
                # Train the model
                try:
                    self.model.fit(X_train, y_train)
                except Exception as e:
                    print(f"Error training model: {e}")
            
            # Add the method to the class instance
            import types
            self._train_model = types.MethodType(_train_model, self)
        
        # Retrain the model periodically
        if len(self.data) - self.last_train_bar >= self.retrain_freq:
            self._train_model()
            self.last_train_bar = len(self.data)
        
        # Update all data with the latest bar
        self.all_data = add_features(self.data.df)
        
        # Initialize required attributes if they don't exist
        if not hasattr(self, 'min_pred_confidence'):
            self.min_pred_confidence = 0.6  # Default threshold
            
        if not hasattr(self, 'tp_factor'):
            self.tp_factor = self.price_delta  # Default to existing price_delta
            
        if not hasattr(self, 'sl_factor'):
            self.sl_factor = self.price_delta  # Default to existing price_delta
        
        # Get prediction for current bar (ensure method exists or implement it)
        if not hasattr(self, '_get_prediction'):
            # Define a simple prediction method if it doesn't exist
            def _get_prediction(self):
                current_bar = self.all_data.iloc[-1:]
                if len(current_bar) == 0:
                    return 0, 0  # No data, no prediction
                    
                features = get_X(current_bar)
                
                # Check if we have a model trained
                if hasattr(self, 'model') and self.model is not None:
                    try:
                        prediction = self.model.predict(features)[0]
                        # Get prediction probabilities
                        probs = self.model.predict_proba(features)[0]
                        confidence = max(probs)
                        return prediction, confidence
                    except:
                        return 0, 0  # Error in prediction
                return 0, 0  # No model, no prediction
            
            # Add the method to the class instance
            import types
            self._get_prediction = types.MethodType(_get_prediction, self)
        
        # Same for position size calculation
        if not hasattr(self, '_calculate_position_size'):
            def _calculate_position_size(self, confidence):
                # Simple linear scaling based on confidence
                return min(0.5, confidence * 0.5)  # Max 50% of equity
            
            # Add the method to the class instance
            import types
            self._calculate_position_size = types.MethodType(_calculate_position_size, self)
        
        # Same for trade adjustment
        if not hasattr(self, '_adjust_existing_trades'):
            def _adjust_existing_trades(self):
                for trade in self.trades:
                    # If trade is open more than 5 days, tighten stop loss
                    if self.data.index[-1] - trade.entry_time > pd.Timedelta(days=5):
                        if trade.is_long:
                            # Move stop loss up to entry price to avoid bigger losses
                            trade.sl = max(trade.sl, trade.entry_price)
                        else:
                            # Move stop loss down to entry price to avoid bigger losses
                            trade.sl = min(trade.sl, trade.entry_price)
            
            # Add the method to the class instance
            import types
            self._adjust_existing_trades = types.MethodType(_adjust_existing_trades, self)
        
        # Get prediction and confidence
        prediction, confidence = self._get_prediction()
        
        # Only trade if confidence is above threshold
        if confidence >= self.min_pred_confidence:
            # Calculate position size based on confidence
            position_size = self._calculate_position_size(confidence)
            
            # Current price
            close = self.data.Close[-1]
            
            # Set take-profit and stop-loss levels
            tp_long = close * (1 + self.tp_factor)
            sl_long = close * (1 - self.sl_factor)
            tp_short = close * (1 - self.tp_factor)
            sl_short = close * (1 + self.sl_factor)
            
            # Execute trades based on model prediction
            if prediction == 1 and not self.position.is_long:
                # Close any existing short position first
                if self.position.is_short:
                    self.position.close()
                
                # Enter long position
                self.buy(size=position_size, tp=tp_long, sl=sl_long)
                self.current_position = 1
                self.entry_price = close
                
            elif prediction == -1 and not self.position.is_short:
                # Close any existing long position first
                if self.position.is_long:
                    self.position.close()
                
                # Enter short position
                self.sell(size=position_size, tp=tp_short, sl=sl_short)
                self.current_position = -1
                self.entry_price = close
            
            # For neutral prediction, consider closing positions
            elif prediction == 0 and (self.position.is_long or self.position.is_short):
                # Only close if confidence is high enough
                if confidence > 0.75:
                    self.position.close()
                    self.current_position = 0
                    self.entry_price = None
        
        # Adjust risk management for existing trades
        self._adjust_existing_trades()

In [5]:
## New XGBoostStrategy 
def init(self):
    # Prepare the data with features
    self.all_data = add_features(self.data.df)
    
    # Initialize the model - XGBoost
    self.model = XGBClassifier(
        n_estimators=self.n_estimators,
        max_depth=self.max_depth,
        learning_rate=self.learning_rate,
        use_label_encoder=False,
        eval_metric='logloss',
        random_state=42
    )
    
    # Initialize training parameters
    self.train_size = getattr(self, 'train_size', 500)  # Number of bars to use for training
    self.retrain_freq = getattr(self, 'retrain_freq', 50)  # Retrain every 50 bars
    self.last_train_bar = 0
    
    # Initialize trading parameters
    self.min_pred_confidence = getattr(self, 'min_pred_confidence', 0.6)
    self.tp_factor = getattr(self, 'tp_factor', self.price_delta)
    self.sl_factor = getattr(self, 'sl_factor', self.price_delta)
    
    # Track our own state
    self.current_position = 0  # 0: flat, 1: long, -1: short
    self.entry_price = None
    
    # Train the model if we have enough data
    if len(self.all_data) >= self.train_size:
        self._train_model()

def next(self):
    # Skip if we don't have enough data
    if len(self.data) < self.train_size:
        return
    
    # Retrain the model periodically
    if len(self.data) - self.last_train_bar >= self.retrain_freq:
        self._train_model()
        self.last_train_bar = len(self.data)
    
    # Update all data with the latest bar
    self.all_data = add_features(self.data.df)
    
    # Get prediction for current bar
    prediction, confidence = self._get_prediction()
    
    # Only trade if confidence is above threshold
    if confidence >= self.min_pred_confidence:
        # Calculate position size based on confidence
        position_size = self._calculate_position_size(confidence)
        
        # Current price
        close = self.data.Close[-1]
        
        # Set take-profit and stop-loss levels
        tp_long = close * (1 + self.tp_factor)
        sl_long = close * (1 - self.sl_factor)
        tp_short = close * (1 - self.tp_factor)
        sl_short = close * (1 + self.sl_factor)
        
        # Execute trades based on model prediction
        if prediction == 1 and not self.position.is_long:
            # Close any existing short position first
            if self.position.is_short:
                self.position.close()
            
            # Enter long position
            self.buy(size=position_size, tp=tp_long, sl=sl_long)
            self.current_position = 1
            self.entry_price = close
            
        elif prediction == -1 and not self.position.is_short:
            # Close any existing long position first
            if self.position.is_long:
                self.position.close()
            
            # Enter short position
            self.sell(size=position_size, tp=tp_short, sl=sl_short)
            self.current_position = -1
            self.entry_price = close
        
        # For neutral prediction, consider closing positions
        elif prediction == 0 and (self.position.is_long or self.position.is_short):
            # Only close if confidence is high enough
            if confidence > 0.75:
                self.position.close()
                self.current_position = 0
                self.entry_price = None
    
    # Adjust risk management for existing trades
    self._adjust_existing_trades()

def _train_model(self):
    """Train the XGBoost model on the latest data"""
    # Use a sliding window of training data
    train_end = len(self.all_data)
    train_start = max(0, train_end - self.train_size)
    
    if train_start >= train_end:
        return  # Not enough data to train
        
    train_data = self.all_data.iloc[train_start:train_end]
    
    # Get features and target
    X_train = get_X(train_data)
    y_train = get_y(train_data)
    
    # Make sure we have enough samples to train
    if len(X_train) < 10 or len(set(y_train)) < 2:
        return  # Not enough data or not enough class diversity
        
    # Train the model
    try:
        self.model.fit(X_train, y_train)
    except Exception as e:
        print(f"Error training model: {e}")

def _get_prediction(self):
    """Get prediction and confidence for current bar"""
    current_bar = self.all_data.iloc[-1:]
    if len(current_bar) == 0:
        return 0, 0  # No data, no prediction
        
    features = get_X(current_bar)
    
    # Check if we have a model trained
    if hasattr(self, 'model') and self.model is not None:
        try:
            prediction = self.model.predict(features)[0]
            # Get prediction probabilities
            probs = self.model.predict_proba(features)[0]
            confidence = max(probs)
            return prediction, confidence
        except:
            return 0, 0  # Error in prediction
    return 0, 0  # No model, no prediction

def _calculate_position_size(self, confidence):
    """Calculate position size based on prediction confidence"""
    # Simple linear scaling based on confidence
    return min(0.5, confidence * 0.5)  # Max 50% of equity

def _adjust_existing_trades(self):
    """Adjust risk management for existing trades"""
    for trade in self.trades:
        # If trade is open more than 5 days, tighten stop loss
        if self.data.index[-1] - trade.entry_time > pd.Timedelta(days=5):
            if trade.is_long:
                # Move stop loss up to entry price to avoid bigger losses
                trade.sl = max(trade.sl, trade.entry_price)
            else:
                # Move stop loss down to entry price to avoid bigger losses
                trade.sl = min(trade.sl, trade.entry_price)

In [6]:
class AllInStrategy(Strategy):
    def init(self):
        # Set the position size to 100% of equity
        self.position_size = 1.0
        
        # Keep track of whether we've already entered a position
        self.position_entered = False
        
    def next(self):
        # Only enter a position once, at the beginning
        if not self.position_entered:
            # Buy with all available capital
            self.buy(size=self.position_size)
            
            # Mark that we've entered our position
            self.position_entered = True
            

In [9]:
bt = Backtest(df, XGBoostStrategy, cash=10_000, commission=.0002, margin=0.05)
bt.run()

Parameters: { "use_label_encoder" } are not used.

Parameters: { "use_label_encoder" } are not used.

Parameters: { "use_label_encoder" } are not used.

Parameters: { "use_label_encoder" } are not used.

Parameters: { "use_label_encoder" } are not used.

Parameters: { "use_label_encoder" } are not used.

Parameters: { "use_label_encoder" } are not used.

Parameters: { "use_label_encoder" } are not used.

Parameters: { "use_label_encoder" } are not used.

Parameters: { "use_label_encoder" } are not used.

Parameters: { "use_label_encoder" } are not used.



Start                     2020-01-30 00:00:00
End                       2023-12-29 00:00:00
Duration                   1429 days 00:00:00
Exposure Time [%]                    26.13982
Equity Final [$]                  10934.46478
Equity Peak [$]                   27663.80806
Commissions [$]                    5944.14783
Return [%]                            9.34465
Buy & Hold Return [%]                45.25956
Return (Ann.) [%]                     2.30709
Volatility (Ann.) [%]                44.48774
CAGR [%]                              1.58786
Sharpe Ratio                          0.05186
Sortino Ratio                         0.07589
Calmar Ratio                          0.02748
Alpha [%]                           -12.43293
Beta                                  0.48117
Max. Drawdown [%]                   -83.95894
Avg. Drawdown [%]                   -22.16543
Max. Drawdown Duration      633 days 00:00:00
Avg. Drawdown Duration      164 days 00:00:00
# Trades                          

In [10]:
bt.plot

<bound method Backtest.plot of <backtesting.backtesting.Backtest object at 0x12c09d0a0>>