In [13]:
# Step 1: Preprocess the SPY data to create a clean CSV file for Backtrader
import pandas as pd
import os

def preprocess_spy_data():
    """
    Read the original SPY data and create a clean CSV file for Backtrader
    """
    # Read the original CSV file
    # spy_file = r'../03_quant_fundamentals/spy_data.csv'
    spy_file = r'../04_S&P500_quant_analysis/01_data/S&P500_D_1789-05-01_2025-09-17.csv'
    
    # Read the file skipping the first 3 header rows
    df = pd.read_csv(spy_file, skiprows=3, header=None)
    
    # Set proper column names
    df.columns = ['Date', 'Open', 'High', 'Low', 'Close', 'Volume']
    
    # Convert Date column to datetime
    df['Date'] = pd.to_datetime(df['Date'])
    
    # Sort by date (should already be sorted)
    df = df.sort_values('Date')
    
    # Save the clean data
    clean_file = 'spy_data_clean.csv'
    df.to_csv(clean_file, index=False)
    
    print(f"Clean data saved to: {clean_file}")
    print(f"Data shape: {df.shape}")
    print(f"Date range: {df['Date'].min()} to {df['Date'].max()}")
    print("\nFirst few rows:")
    print(df.head())
    
    return clean_file

# Create the clean data file
print("=== STEP 1: DATA PREPROCESSING ===")
clean_data_file = preprocess_spy_data()
print("✓ Data preprocessing completed!")

=== STEP 1: DATA PREPROCESSING ===
Clean data saved to: spy_data_clean.csv
Data shape: (39530, 6)
Date range: 1789-07-01 00:00:00 to 2025-09-17 00:00:00

First few rows:
        Date  Open  High   Low  Close  Volume
0 1789-07-01  0.50  0.50  0.50   0.50     0.0
1 1789-08-01  0.50  0.51  0.50   0.51     0.0
2 1789-09-01  0.51  0.51  0.50   0.51     0.0
3 1789-10-01  0.51  0.51  0.51   0.51     0.0
4 1789-11-01  0.51  0.51  0.50   0.50     0.0
✓ Data preprocessing completed!


In [14]:
"""
Step 2: Simple Backtrader Strategy Backtest Example

Indicators and Conditions used only as a reference, this is not a tradable strategy.

STRATEGY TYPE: RSI_MA_Combined
DIRECTION: BUY
INDICATORS: [RSI(28), RSI(21), SMA(100), SMA(50), SMA(200)]

ENTRY CONDITIONS:
  close < SMA(100) and RSI(28) < 30

EXIT CONDITIONS:
  RSI(21) < 25 and SMA(50) < SMA(200)

DETAILED CONDITIONS:
  Entry Logic: close < SMA(100) and RSI(28) < 30
  Exit Logic: RSI(21) < 25 and SMA(50) < SMA(200)
"""

from __future__ import (absolute_import, division, print_function,
                        unicode_literals)

import datetime
import backtrader as bt
import backtrader.feeds as btfeeds


class RSIMAStrategy(bt.Strategy):

    params = (
        ('rsi_entry_period', 28),  # Period RSI for entry
        ('rsi_exit_period', 21),   # Period RSI for exit
        ('sma_entry_period', 100), # Period SMA for entry
        ('sma_mid_period', 50),    # Period SMA medium for exit
        ('sma_long_period', 200),  # Period SMA long for exit
        ('rsi_entry_threshold', 30), # Soglia RSI for entry
        ('rsi_exit_threshold', 25),  # Soglia RSI for exit
        ('printlog', False),       # Disabled logging to speed up execution
    )

    def __init__(self):
        # Entry technical indicators
        self.rsi_entry = bt.talib.RSI(self.data.close, timeperiod=self.p.rsi_entry_period)
        self.sma_entry = bt.talib.SMA(self.data.close, timeperiod=self.p.sma_entry_period)

        # Exit technical indicators
        self.rsi_exit = bt.talib.RSI(self.data.close, timeperiod=self.p.rsi_exit_period)
        self.sma_mid = bt.talib.SMA(self.data.close, timeperiod=self.p.sma_mid_period)
        self.sma_long = bt.talib.SMA(self.data.close, timeperiod=self.p.sma_long_period)

        # Tracking variables
        self.order = None
        self.buyprice = None
        self.buycomm = None

    def log(self, txt, dt=None, doprint=False):
        """Logging function for this strategy"""
        if self.params.printlog or doprint:
            dt = dt or self.datas[0].datetime.date(0)
            print(f'{dt.isoformat()}, {txt}')

    def notify_order(self, order):
        if order.status in [order.Submitted, order.Accepted]:
            # Buy/Sell order submitted/accepted to/by broker - Nothing to do
            return

        # Check if an order has been completed
        if order.status in [order.Completed]:
            if order.isbuy():
                self.log(
                    f'BUY EXECUTED, Price: {order.executed.price:.2f}, '
                    f'Cost: {order.executed.value:.2f}, '
                    f'Comm: {order.executed.comm:.2f}')

                self.buyprice = order.executed.price
                self.buycomm = order.executed.comm
            else:  # Sell
                self.log(
                    f'SELL EXECUTED, Price: {order.executed.price:.2f}, '
                    f'Cost: {order.executed.value:.2f}, '
                    f'Comm: {order.executed.comm:.2f}')

            self.bar_executed = len(self)

        elif order.status in [order.Canceled, order.Margin, order.Rejected]:
            self.log('Order Canceled/Margin/Rejected')

        # Write down: no pending order
        self.order = None

    def notify_trade(self, trade):
        if not trade.isclosed:
            return

        self.log(f'OPERATION PROFIT, GROSS {trade.pnl:.2f}, NET {trade.pnlcomm:.2f}')

    def next(self):
        # # Log reduced for debug
        # if len(self) % 100 == 0:  # Log every 100 days
        #     self.log(f'Day {len(self)}: Close: {self.data.close[0]:.2f}, '
        #             f'RSI28: {self.rsi_entry[0]:.2f}, SMA100: {self.sma_entry[0]:.2f}')

        # Different Intervals
        # Every 50 days (more frequent updates)
        if len(self) % 50 == 0:
            self.log(f'Day {len(self)}: Progress update...')

        # # Every 250 days (quarterly updates)
        # if len(self) % 250 == 0:
        #     self.log(f'Day {len(self)}: Quarterly report...')

        # # Monthly updates (approx 21 trading days)
        # if len(self) % 21 == 0:
        #     self.log(f'Day {len(self)}: Monthly update...')

        # # Conditional Logging Based on Events
        # def next(self):
        # # Regular progress (every 100 days)
        # if len(self) % 100 == 0:
        #     self.log(f'Day {len(self)}: Close: {self.data.close[0]:.2f}')
        
        # # Always log important events
        # if not self.position and self.should_buy():
        #     self.log('🔥 BUY SIGNAL DETECTED!', doprint=True)
        
        # # Log extreme market moves
        # daily_change = (self.data.close[0] / self.data.close[-1] - 1) * 100
        # if abs(daily_change) > 5:  # >5% move
        #     self.log(f'⚡ Large move: {daily_change:.1f}%', doprint=True)


        # Check if an order is pending ... if yes, we cannot send a 2nd one
        if self.order:
            return

        # Check if we are in the market
        if not self.position:
            # ENTRY CONDITIONS: close < SMA(100) and RSI(28) < 30
            entry_sma_condition = self.data.close[0] < self.sma_entry[0]
            entry_rsi_condition = self.rsi_entry[0] < self.p.rsi_entry_threshold

            # Combined entry condition
            if entry_sma_condition and entry_rsi_condition:
                self.log(f'BUY CREATE - Entry conditions met: '
                        f'Close: {self.data.close[0]:.2f} < SMA100: {self.sma_entry[0]:.2f}, '
                        f'RSI28: {self.rsi_entry[0]:.2f} < {self.p.rsi_entry_threshold}')
                # Keep track of the created order to avoid a 2nd order
                self.order = self.buy()

        else:
            # EXIT CONDITIONS: RSI(21) < 25 and SMA(50) < SMA(200)
            exit_rsi_condition = self.rsi_exit[0] < self.p.rsi_exit_threshold
            exit_sma_condition = self.sma_mid[0] < self.sma_long[0]

            if exit_rsi_condition and exit_sma_condition:
                self.log(f'SELL CREATE - Exit conditions met: '
                        f'RSI21: {self.rsi_exit[0]:.2f} < {self.p.rsi_exit_threshold}, '
                        f'SMA50: {self.sma_mid[0]:.2f} < SMA200: {self.sma_long[0]:.2f}')
                # SELL, SELL, SELL!!! (with all possible default parameters)
                self.order = self.sell()

    def stop(self):
        self.log(f'Final Portfolio Value: {self.broker.getvalue():.2f}', doprint=True)


def run_backtest():
    """
    Main function to run the backtest
    """
    # Initialize Cerebro
    cerebro = bt.Cerebro()

    # Read the clean data to get date range
    df = pd.read_csv('spy_data_clean.csv')
    df['Date'] = pd.to_datetime(df['Date'])
    
    # Get min and max dates from the DataFrame
    fromdate = df['Date'].min()
    todate = df['Date'].max()
    
    print(f"Using data from {fromdate.strftime('%Y-%m-%d')} to {todate.strftime('%Y-%m-%d')}")

    # Load the data
    data = btfeeds.GenericCSVData(
        # Using the clean SPY data file
        dataname='spy_data_clean.csv',
        dtformat=('%Y-%m-%d'),  # Date format without time
        datetime=0,  # Date column is the first column (index 0)
        open=1,      # Open column is the second column (index 1)
        high=2,      # High column is the third column (index 2)
        low=3,       # Low column is the fourth column (index 3)
        close=4,     # Close column is the fifth column (index 4)
        volume=5,    # Volume column is the sixth column (index 5)
        openinterest=-1,  # No open interest data
        headers=True,  # Skip the header row
        # Date range - use actual min/max from the data
        fromdate=fromdate,
        todate=todate,
    )

    # Add the data to Cerebro
    cerebro.adddata(data)

    # Add the strategy
    cerebro.addstrategy(RSIMAStrategy)

    # Set the initial capital
    cerebro.broker.setcash(100000.0)

    # Set the commission - 0.1% per trade
    cerebro.broker.setcommission(commission=0.001)

    # Add analyzers for performance
    cerebro.addanalyzer(bt.analyzers.TradeAnalyzer, _name="trades")
    cerebro.addanalyzer(bt.analyzers.SharpeRatio, _name="sharpe")
    cerebro.addanalyzer(bt.analyzers.DrawDown, _name="drawdown")
    cerebro.addanalyzer(bt.analyzers.Returns, _name="returns")

    # Add additional analyzers as requested
    cerebro.addanalyzer(bt.analyzers.AnnualReturn, _name="annual_returns")
    cerebro.addanalyzer(bt.analyzers.TimeDrawDown, _name="time_drawdown")
    cerebro.addanalyzer(bt.analyzers.Returns, _name="detailed_returns")

    # Print the initial portfolio value
    print(f'Starting Portfolio Value: {cerebro.broker.getvalue():.2f}')

    # Run the backtest
    results = cerebro.run()

    # Print the final portfolio value
    print(f'Final Portfolio Value: {cerebro.broker.getvalue():.2f}')

    # Extract and print the statistics
    strat = results[0]

    print('\n=== TRADE ANALYSIS ===')
    trade_analysis = strat.analyzers.trades.get_analysis()
    if trade_analysis:
        # Safe handling of trade analysis data using try/except
        try:
            total_trades = trade_analysis.total.total
        except (KeyError, AttributeError):
            total_trades = 0
        print(f"Total Trades: {total_trades}")

        # Safe handling for winning trades
        won_trades = 0
        gross_profit = 0.0
        try:
            won_trades = trade_analysis.won.total
            try:
                gross_profit = trade_analysis.won.pnl.total
            except (KeyError, AttributeError):
                gross_profit = 0.0
        except (KeyError, AttributeError):
            pass
        print(f"Winning Trades: {won_trades}")

        # Safe handling for losing trades
        lost_trades = 0
        gross_loss = 0.0
        try:
            lost_trades = trade_analysis.lost.total
            try:
                gross_loss = trade_analysis.lost.pnl.total
            except (KeyError, AttributeError):
                gross_loss = 0.0
        except (KeyError, AttributeError):
            pass
        print(f"Losing Trades: {lost_trades}")

        print(f"Gross Profit: {gross_profit:.2f}")
        print(f"Gross Loss: {gross_loss:.2f}")

        # Calculate win rate if there are trades
        if total_trades > 0:
            win_rate = (won_trades / total_trades) * 100
            print(f"Win Rate: {win_rate:.2f}%")

        # Profit Factor
        if abs(gross_loss) > 0:
            profit_factor = abs(gross_profit / gross_loss)
            print(f"Profit Factor: {profit_factor:.2f}")

        # # Avg Profit for Trade
        # if total_trades > 0:
        #     avg_profit_per_trade = (gross_profit + gross_loss) / total_trades
        #     print(f"Avg Profit for Trade: {avg_profit_per_trade:.2f}")

        # Net Profit
        net_profit = gross_profit + gross_loss  # gross_loss è già negativo
        print(f"Net Profit: {net_profit:.2f}")
    else:
        print("No trade data available")

    print('\n=== PERFORMANCE METRICS ===')
    sharpe_ratio = strat.analyzers.sharpe.get_analysis()
    if sharpe_ratio and 'sharperatio' in sharpe_ratio:
        print(f"Sharpe Ratio: {sharpe_ratio['sharperatio']:.4f}")

    drawdown = strat.analyzers.drawdown.get_analysis()
    if drawdown:
        print(f"Max Drawdown: {drawdown['max']['drawdown']:.2f}%")
        print(f"Max Drawdown Money: ${drawdown['max']['moneydown']:.2f}")

    returns = strat.analyzers.returns.get_analysis()
    if returns:
        print(f"Total Return: {returns['rtot']*100:.2f}%")
        print(f"Average Return: {returns['ravg']*100:.4f}%")

    # Display results from additional analyzers
    print('\n=== ADDITIONAL ANALYTICS ===')

    # Annual Returns Analysis
    annual_returns = strat.analyzers.annual_returns.get_analysis()
    if annual_returns:
        print("Annual Returns:")
        for year, return_val in annual_returns.items():
            print(f"  {year}: {return_val*100:.2f}%")

    # Time DrawDown Analysis
    time_drawdown = strat.analyzers.time_drawdown.get_analysis()
    if time_drawdown:
        print(f"\nTime DrawDown Analysis:")
        if 'maxdrawdownperiod' in time_drawdown:
            print(f"  Max DrawDown Period: {time_drawdown['maxdrawdownperiod']} days")
        if 'maxdrawdown' in time_drawdown:
            print(f"  Max DrawDown Value: {time_drawdown['maxdrawdown']:.2f}%")

    # Detailed Returns Analysis
    detailed_returns = strat.analyzers.detailed_returns.get_analysis()
    if detailed_returns:
        print(f"\nDetailed Returns Analysis:")
        print(f"  Total Compound Return (rtot): {detailed_returns.get('rtot', 0)*100:.4f}%")
        print(f"  Average Return (ravg): {detailed_returns.get('ravg', 0)*100:.6f}%")
        if 'rnorm' in detailed_returns:
            print(f"  Annualized/Normalized Return (rnorm): {detailed_returns['rnorm']*100:.4f}%")
        if 'rnorm100' in detailed_returns:
            print(f"  Annualized Return in 100% (rnorm100): {detailed_returns['rnorm100']:.4f}%")

    return cerebro, results


print("=== STEP 2: STRATEGY AND BACKTEST FUNCTION DEFINED ===")
print("✓ RSI+MA Strategy class created")
print("✓ Backtest function ready")
print("✓ Ready to run backtest!")


=== STEP 2: STRATEGY AND BACKTEST FUNCTION DEFINED ===
✓ RSI+MA Strategy class created
✓ Backtest function ready
✓ Ready to run backtest!


In [15]:
# Step 3: Execute the backtest
print("=== STEP 3: RUNNING BACKTEST ===")
cerebro, results = run_backtest()

=== STEP 3: RUNNING BACKTEST ===
Using data from 1789-07-01 to 2025-09-17
Starting Portfolio Value: 100000.00
Using data from 1789-07-01 to 2025-09-17
Starting Portfolio Value: 100000.00
2025-09-16, Final Portfolio Value: 106649.54
Final Portfolio Value: 106649.54

=== TRADE ANALYSIS ===
Total Trades: 66
Winning Trades: 24
Losing Trades: 41
Gross Profit: 1059.87
Gross Loss: -218.58
Win Rate: 36.36%
Profit Factor: 4.85
Net Profit: 841.29

=== PERFORMANCE METRICS ===
Sharpe Ratio: -6.1130
Max Drawdown: 1.16%
Max Drawdown Money: $1219.53
Total Return: 6.44%
Average Return: 0.0002%

=== ADDITIONAL ANALYTICS ===
Annual Returns:
  1789: 0.00%
  1790: 0.00%
  1791: 0.00%
  1792: 0.00%
  1793: 0.00%
  1794: 0.00%
  1795: 0.00%
  1796: 0.00%
  1797: 0.00%
  1798: 0.00%
  1799: 0.00%
  1800: 0.00%
  1801: 0.00%
  1802: 0.00%
  1803: 0.00%
  1804: 0.00%
  1805: 0.00%
  1806: 0.00%
  1807: 0.00%
  1808: 0.00%
  1809: 0.00%
  1810: 0.00%
  1811: 0.00%
  1812: 0.00%
  1813: 0.00%
  1814: 0.00%
  181