# SPY Intraday Momentum Strategy Implementation

Based on the paper "Beat the Market: An Effective Intraday Momentum Strategy for S&P500 ETF (SPY)"

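This notebook implements the paper's intraday algorithm: a position is opened when the price breaks out of the current noise band (the Noise Area bounds), and it is closed by a trailing stop built from the band and the VWAP, or at the market close.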

In [48]:
import pandas as pd
import numpy as np
from math import floor
import matplotlib.pyplot as plt
from datetime import datetime, time, timedelta, date

## Helper Functions

In [49]:
def calculate_vwap_incrementally(prices, volumes):
    """
    Build the running VWAP series for paired prices and volumes.

    Parameters:
        prices: Iterable of prices
        volumes: Iterable of volumes, paired element-wise with prices

    Returns:
        List with one VWAP value per (price, volume) pair, each computed from
        all pairs seen so far. While the cumulative volume is not positive,
        the current price is used in place of the VWAP.
    """
    total_volume = 0
    total_notional = 0
    running_vwaps = []

    for px, qty in zip(prices, volumes):
        total_volume += qty
        total_notional += px * qty
        # No positive volume accumulated yet -> fall back to the current price
        running_vwaps.append(total_notional / total_volume if total_volume > 0 else px)

    return running_vwaps

In [50]:
def _close_trade(position, position_size, entry_time, exit_time, entry_price, exit_price, exit_reason):
    """Build the trade record for closing a long (position == 1) or short (otherwise) position."""
    if position == 1:
        side = 'Long'
        pnl = position_size * (exit_price - entry_price)
    else:
        side = 'Short'
        pnl = position_size * (entry_price - exit_price)

    return {
        'entry_time': entry_time,
        'exit_time': exit_time,
        'side': side,
        'entry_price': entry_price,
        'exit_price': exit_price,
        'pnl': pnl,
        'exit_reason': exit_reason
    }


def simulate_day(day_df, prev_close, allowed_times, position_size, debug=False):
    """
    Simulate trading for a single day using curr.band + VWAP strategy

    Entries and stop-outs are only evaluated on bars whose 'Time' is in
    allowed_times; the trailing stop itself is updated on every bar while a
    position is open. At most one trade is opened per day, and a position that
    is still open after the last bar is closed at that bar's Close.

    Parameters:
        day_df: DataFrame with minute data for a single day. Columns read:
            'Time', 'Close', 'Volume', 'UpperBound', 'LowerBound', 'DateTime'
            (plus 'Date', 'Open', 'upper_ref', 'lower_ref' when debug is True)
        prev_close: Previous day's closing price (only used in debug output)
        allowed_times: List of times when trading is allowed (e.g., ['10:00', '10:30', ...])
        position_size: Number of shares to trade
        debug: Whether to print debug information

    Returns:
        List of trade dictionaries with entry_time, exit_time, side, entry_price,
        exit_price, pnl, exit_reason
    """
    position = 0  # 0: no position, 1: long, -1: short
    entry_price = np.nan
    trailing_stop = np.nan
    trade_entry_time = None
    trades = []
    trade_completed = False  # Flag to track if a trade has been completed for the day

    # Running totals for the intraday VWAP. Keeping them here avoids recomputing
    # the whole cumulative series on every bar (which was quadratic in bars).
    cum_vol = 0
    cum_pv = 0

    if debug:
        print(f"\nSimulating day: {day_df['Date'].iloc[0]}")
        print(f"Open: {day_df['Open'].iloc[0]}, Prev Close: {prev_close}")
        print(f"Upper Ref: {day_df['upper_ref'].iloc[0]}, Lower Ref: {day_df['lower_ref'].iloc[0]}")

    for _, row in day_df.iterrows():
        current_time = row['Time']
        price = row['Close']
        volume = row['Volume']
        upper = row['UpperBound']
        lower = row['LowerBound']

        # VWAP up to and including this bar; fall back to the bar price until
        # some positive volume has accumulated.
        cum_vol += volume
        cum_pv += price * volume
        vwap = cum_pv / cum_vol if cum_vol > 0 else price

        at_allowed_time = current_time in allowed_times

        # Entry signals at allowed times only if no trade has been completed for the day
        if position == 0 and at_allowed_time and not trade_completed:
            if price > upper:
                # Long entry
                position = 1
                entry_price = price
                trade_entry_time = row['DateTime']
                # Trailing stop: max(UpperBound, VWAP)
                trailing_stop = max(upper, vwap)

                if debug:
                    print(f"LONG ENTRY at {current_time}: Price={price:.2f}, Upper={upper:.2f}, VWAP={vwap:.2f}, Stop={trailing_stop:.2f}")

            elif price < lower:
                # Short entry
                position = -1
                entry_price = price
                trade_entry_time = row['DateTime']
                # Trailing stop: min(LowerBound, VWAP)
                trailing_stop = min(lower, vwap)

                if debug:
                    print(f"SHORT ENTRY at {current_time}: Price={price:.2f}, Lower={lower:.2f}, VWAP={vwap:.2f}, Stop={trailing_stop:.2f}")

        if position == 0:
            continue

        # Update trailing stop and check for exit signals.
        # NOTE(review): this also runs on the entry bar, so an entry whose price
        # is on the wrong side of VWAP is closed immediately at zero P&L and
        # uses up the day's single trade - confirm this is intended.
        if position == 1:  # Long position
            new_stop = max(upper, vwap)
            # Only update in favorable direction (raise the stop)
            trailing_stop = max(trailing_stop, new_stop)

            if debug and at_allowed_time:
                print(f"LONG UPDATE at {current_time}: Price={price:.2f}, Upper={upper:.2f}, VWAP={vwap:.2f}, Stop={trailing_stop:.2f}")

            stopped_out = price < trailing_stop and at_allowed_time
        else:  # Short position
            new_stop = min(lower, vwap)
            # Only update in favorable direction (lower the stop)
            trailing_stop = min(trailing_stop, new_stop)

            if debug and at_allowed_time:
                print(f"SHORT UPDATE at {current_time}: Price={price:.2f}, Lower={lower:.2f}, VWAP={vwap:.2f}, Stop={trailing_stop:.2f}")

            stopped_out = price > trailing_stop and at_allowed_time

        if stopped_out:
            trade = _close_trade(position, position_size, trade_entry_time, row['DateTime'],
                                 entry_price, price, 'Stop Loss')
            trades.append(trade)

            if debug:
                if position == 1:
                    print(f"LONG EXIT at {current_time}: Price={price:.2f}, Stop={trailing_stop:.2f}, P&L=${trade['pnl']:.2f}")
                else:
                    print(f"SHORT EXIT at {current_time}: Price={price:.2f}, Stop={trailing_stop:.2f}, P&L=${trade['pnl']:.2f}")

            position = 0
            trailing_stop = np.nan
            trade_completed = True  # Mark that a trade has been completed for the day

    # Close any open position at the end of the day
    if position != 0:
        last_bar = day_df.iloc[-1]
        last_price = last_bar['Close']
        last_time = last_bar['Time']

        trade = _close_trade(position, position_size, trade_entry_time, last_bar['DateTime'],
                             entry_price, last_price, 'Market Close')
        trades.append(trade)

        if debug:
            if position == 1:
                print(f"LONG EXIT at {last_time} (CLOSE): Price={last_price:.2f}, P&L=${trade['pnl']:.2f}")
            else:
                print(f"SHORT EXIT at {last_time} (CLOSE): Price={last_price:.2f}, P&L=${trade['pnl']:.2f}")

    return trades

## Main Backtest Function

In [51]:
def run_backtest(data_path, initial_capital=100000, lookback_days=14, start_date=None, end_date=None, debug_days=None):
    """
    Run the backtest on SPY data
    
    Parameters:
        data_path: Path to the SPY minute data CSV file
        initial_capital: Initial capital for the backtest
        lookback_days: Number of days to use for calculating the Noise Area
        start_date: Start date for the backtest (datetime.date)
        end_date: End date for the backtest (datetime.date)
        debug_days: List of dates to print detailed debug information for
        
    Returns:
        DataFrame with daily results
        DataFrame with monthly results
    """
    # Load and process data
    print(f"Loading data from {data_path}...")
    spy_df = pd.read_csv(data_path, parse_dates=['DateTime'])
    spy_df.sort_values('DateTime', inplace=True)
    
    # Extract date and time components
    spy_df['Date'] = spy_df['DateTime'].dt.date
    spy_df['Time'] = spy_df['DateTime'].dt.strftime('%H:%M')
    
    # Filter data by date range if specified
    if start_date is not None:
        spy_df = spy_df[spy_df['Date'] >= start_date]
        print(f"Filtered data to start from {start_date}")
    
    if end_date is not None:
        spy_df = spy_df[spy_df['Date'] <= end_date]
        print(f"Filtered data to end at {end_date}")
    
    # Get previous day's close for each day
    spy_df['prev_close'] = spy_df.groupby('Date')['Close'].transform('last').shift(1)
    
    # Calculate daily open price (first price of the day)
    spy_df['day_open'] = spy_df.groupby('Date')['Open'].transform('first')
    
    # Calculate Open_day as per the paper: max/min of yesterday's close and today's open
    # For upper bound: max(Open, prev_close)
    # For lower bound: min(Open, prev_close)
    spy_df['upper_ref'] = spy_df.apply(lambda row: max(row['day_open'], row['prev_close']) 
                                      if not pd.isna(row['prev_close']) else row['day_open'], axis=1)
    spy_df['lower_ref'] = spy_df.apply(lambda row: min(row['day_open'], row['prev_close']) 
                                      if not pd.isna(row['prev_close']) else row['day_open'], axis=1)
    
    # Calculate return from open for each minute (using day_open for consistency)
    spy_df['ret'] = spy_df['Close'] / spy_df['day_open'] - 1
    
    # Calculate the Noise Area boundaries
    print("Calculating Noise Area boundaries...")
    # Pivot to get time-of-day in columns
    pivot = spy_df.pivot(index='Date', columns='Time', values='ret').abs()
    # Calculate rolling average of absolute returns for each time-of-day
    # This ensures we're using the previous 14 days for each time point
    sigma = pivot.rolling(window=lookback_days, min_periods=1).mean().shift(1)
    # Convert back to long format
    sigma = sigma.stack().reset_index(name='sigma')
    
    # Merge sigma back to the main dataframe
    spy_df = pd.merge(spy_df, sigma, on=['Date', 'Time'], how='left')
    
    # Fill NaN values in sigma with a reasonable default
    spy_df['sigma'] = spy_df['sigma'].fillna(0.001)  # 0.1% default
    
    # Calculate upper and lower boundaries of the Noise Area using the correct reference prices
    spy_df['UpperBound'] = spy_df['upper_ref'] * (1 + spy_df['sigma'])
    spy_df['LowerBound'] = spy_df['lower_ref'] * (1 - spy_df['sigma'])
    
    # Define allowed trading times (semi-hourly intervals)
    allowed_times = [
        '09:30', '10:00', '10:30', '11:00', '11:30', '12:00', '12:30',
        '13:00', '13:30', '14:00', '14:30', '15:00', '15:30', '16:00'
    ]
    
    # Initialize backtest variables
    capital = initial_capital
    daily_results = []
    all_trades = []
    
    # Run the backtest day by day
    print("Running backtest...")
    unique_dates = sorted(spy_df['Date'].unique())
    
    for i, trade_date in enumerate(unique_dates):
        # Get data for the current day
        day_spy = spy_df[spy_df['Date'] == trade_date].copy()
        day_spy = day_spy.sort_values('DateTime').reset_index(drop=True)
        
        # Skip days with insufficient data
        if len(day_spy) < 10:  # Arbitrary threshold
            print(f"{trade_date}: Insufficient data, skipping")
            daily_results.append({
                'Date': trade_date,
                'capital': capital,
                'daily_return': 0
            })
            continue
        
        # Get the previous day's close
        prev_close = day_spy['prev_close'].iloc[0] if not pd.isna(day_spy['prev_close'].iloc[0]) else None
        
        # Get the opening price for the day
        open_price = day_spy['day_open'].iloc[0]
        
        # Calculate position size (fixed, no VIX adjustment)
        position_size = floor(capital / open_price)
        
        # Skip days with insufficient capital
        if position_size <= 0:
            print(f"{trade_date}: Insufficient capital, skipping")
            daily_results.append({
                'Date': trade_date,
                'capital': capital,
                'daily_return': 0
            })
            continue
        
        # Check if this is a debug day
        debug = debug_days is not None and trade_date in debug_days
        
        # Simulate trading for the day
        trades = simulate_day(day_spy, prev_close, allowed_times, position_size, debug=debug)
        
        # Calculate daily P&L
        day_pnl = sum(trade['pnl'] for trade in trades)
        
        # Update capital and calculate daily return
        capital_start = capital
        capital += day_pnl
        daily_return = day_pnl / capital_start
        
        # Store daily results
        daily_results.append({
            'Date': trade_date,
            'capital': capital,
            'daily_return': daily_return
        })
        
        # Store trades
        for trade in trades:
            trade['Date'] = trade_date
            all_trades.append(trade)
        
        # Print detailed trade information
        trade_details = []
        for trade in trades:
            entry_time = trade['entry_time'].strftime('%H:%M')
            exit_time = trade['exit_time'].strftime('%H:%M')
            side = trade['side']
            entry_price = trade['entry_price']
            exit_price = trade['exit_price']
            pnl = trade['pnl']
            exit_reason = trade['exit_reason']
            
            trade_detail = f"{entry_time}-{exit_time} {side} {entry_price:.2f}->{exit_price:.2f} ${pnl:.2f} ({exit_reason})"
            trade_details.append(trade_detail)
        
        trade_details_str = ', '.join(trade_details) if trade_details else "No trades"
        print(f"{trade_date}: {trade_details_str}, P&L=${day_pnl:.2f}, Return={daily_return*100:.2f}%")
    
    # Create daily results DataFrame
    daily_df = pd.DataFrame(daily_results)
    daily_df['Date'] = pd.to_datetime(daily_df['Date'])
    daily_df.set_index('Date', inplace=True)
    
    # Calculate monthly returns
    monthly = daily_df.resample('ME').first()[['capital']].rename(columns={'capital': 'month_start'})
    monthly['month_end'] = daily_df.resample('ME').last()['capital']
    monthly['monthly_return'] = monthly['month_end'] / monthly['month_start'] - 1
    
    # Print monthly returns
    print("\nMonthly Returns:")
    print(monthly[['month_start', 'month_end', 'monthly_return']])
    
    # Calculate overall performance
    total_return = capital / initial_capital - 1
    print(f"\nTotal Return: {total_return*100:.2f}%")
    
    # Create trades DataFrame
    trades_df = pd.DataFrame(all_trades)
    
    return daily_df, monthly, trades_df

## Run the Backtest

In [52]:
# Backtest spy_all.csv over the 2022-2024 window (both end dates inclusive)
start_date = date(2022, 1, 1)
end_date = date(2024, 12, 31)
daily_results, monthly_results, trades = run_backtest(
    'spy_all.csv',
    initial_capital=100000,
    lookback_days=14,
    start_date=start_date,
    end_date=end_date,
)

Loading data from spy_all.csv...
Filtered data to start from 2022-01-01
Filtered data to end at 2024-12-31
Calculating Noise Area boundaries...
Running backtest...
2022-01-03: 10:00-10:30 Short 475.62->476.23 $-127.49 (Stop Loss), P&L=$-127.49, Return=-0.13%
2022-01-04: 09:30-10:00 Long 479.27->479.61 $70.68 (Stop Loss), P&L=$70.68, Return=0.07%
2022-01-05: No trades, P&L=$0.00, Return=0.00%
2022-01-06: 09:30-10:30 Short 468.02->468.28 $-55.38 (Stop Loss), P&L=$-55.38, Return=-0.06%
2022-01-07: No trades, P&L=$0.00, Return=0.00%
2022-01-10: 09:30-11:30 Short 462.74->459.87 $614.27 (Stop Loss), P&L=$614.27, Return=0.61%
2022-01-11: 09:30-10:30 Short 464.90->464.71 $40.85 (Stop Loss), P&L=$40.85, Return=0.04%
2022-01-12: No trades, P&L=$0.00, Return=0.00%
2022-01-13: 09:30-10:00 Long 472.12->472.17 $11.14 (Stop Loss), P&L=$11.14, Return=0.01%
2022-01-14: 09:30-10:00 Short 461.42->463.83 $-520.56 (Stop Loss), P&L=$-520.56, Return=-0.52%
2022-01-18: No trades, P&L=$0.00, Return=0.00%
2022-

## Debug Specific Days

In [None]:
# Re-run the backtest over the same start_date/end_date window, printing
# detailed debug output for the days listed in debug_days.
# debug_days holds datetime.date objects; with this notebook's imports they are
# built as date(year, month, day), e.g. [date(2022, 1, 20), date(2022, 1, 21)].
debug_days = [date(2022, 1, 20), date(2022, 1, 21)]
debug_results, debug_monthly, debug_trades = run_backtest(
    'spy_all.csv',
    initial_capital=100000,
    lookback_days=14,
    start_date=start_date,
    end_date=end_date,
    debug_days=debug_days,
)

## Performance Analysis

In [45]:
# Monthly returns as percentages, labelled 'YYYY-MM' for readability
monthly_formatted = monthly_results.assign(
    monthly_return=lambda frame: frame['monthly_return'] * 100  # Convert to percentage
)
monthly_formatted.index = monthly_formatted.index.strftime('%Y-%m')
print("Monthly Returns (%):\n")
print(monthly_formatted['monthly_return'])

Monthly Returns (%):

Date
2022-01   -2.223133
2022-02   -1.974951
2022-03   -1.756902
2022-04   -1.487445
2022-05   -1.662809
2022-06   -0.921494
2022-07   -0.263176
2022-08   -1.616580
2022-09   -2.239205
2022-10   -2.126119
2022-11   -0.163398
2022-12   -1.254027
2023-01    0.034320
2023-02   -2.193802
2023-03   -1.310435
2023-04   -0.861629
2023-05   -0.943770
2023-06   -1.328424
2023-07   -0.129328
2023-08   -1.841216
2023-09   -1.269363
2023-10   -0.966114
2023-11   -0.167482
2023-12   -0.706988
2024-01   -1.293613
2024-02   -0.728251
2024-03   -0.507279
2024-04   -0.507311
2024-05   -0.528117
2024-06   -0.596255
2024-07   -0.704898
2024-08   -2.330985
2024-09   -1.301438
2024-10   -0.939264
2024-11   -0.643695
2024-12   -1.074174
Name: monthly_return, dtype: float64


In [46]:
# Compound the daily returns within each calendar year, expressed in percent
annual_returns = (
    daily_results['daily_return']
    .resample('YE')
    .apply(lambda year_returns: (1 + year_returns).prod() - 1)
    * 100
)
print("Annual Returns (%):\n")
print(annual_returns)

Annual Returns (%):

Date
2022-12-31   -17.656745
2023-12-31   -11.460245
2024-12-31   -11.230606
Freq: YE-DEC, Name: daily_return, dtype: float64


In [47]:
# Summarise the trade log: counts, win/loss rates, average P&L and holding time
if len(trades) > 0:
    trades['duration'] = trades['exit_time'] - trades['entry_time']

    # Boolean masks; trades with exactly zero P&L fall in neither group
    winners = trades['pnl'] > 0
    losers = trades['pnl'] < 0

    print("Trade Statistics:\n")
    print(f"Total Trades: {len(trades)}")
    print(f"Winning Trades: {winners.sum()} ({winners.mean()*100:.1f}%)")
    print(f"Losing Trades: {losers.sum()} ({losers.mean()*100:.1f}%)")
    print(f"Average P&L per Trade: ${trades['pnl'].mean():.2f}")
    print(f"Average Duration: {trades['duration'].mean()}")

Trade Statistics:

Total Trades: 212
Winning Trades: 20 (9.4%)
Losing Trades: 173 (81.6%)
Average P&L per Trade: $-166.42
Average Duration: 0 days 00:53:29.150943396
