# Backtest of the Intraday TQQQ Strategy
The Strategy works as follows:
1. Before market open, calculate the Average True Range (ATR) of TQQQ over the last 14 days. 
Use 7.5% of this number as your stop width.
2. Wait for the first 5 minutes of price action (09:30-09:34 ET). If the move from the market open 
is positive, go long. If it's negative, go short.
3. Place your stop 7.5% of the ATR away from the 5-minute close.
4. If your stop isn't hit, close the position at the end of the trading session (16:00 ET).

## Imports, Setup, and Data Fetching

In [29]:
import os
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Optional, Tuple

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import pytz
import seaborn as sns
from polygon import RESTClient
from tqdm.notebook import tqdm

from config import settings

import warnings
# Suppress every warning for the rest of the session.
# NOTE(review): this is a blanket filter, so library deprecation warnings
# triggered by the code below will not be shown either.
warnings.filterwarnings('ignore')

# Set some pandas display options: show up to 100 rows / 100 columns
# before pandas truncates printed DataFrames.
pd.set_option('display.max_rows', 100)
pd.set_option('display.max_columns', 100)

# Initialize Polygon client. The API key is read from the project's
# `settings` object; the resulting module-level `client` is what the
# data-fetching functions below call `list_aggs` on.
client = RESTClient(api_key=settings.POLYGON.API_KEY)

In [30]:
def get_cache_dir() -> Path:
    """Return the relative ``data_cache`` directory, creating it if it is missing."""
    directory = Path("data_cache")
    if not directory.is_dir():
        # Only touch the filesystem when the directory is not there yet
        directory.mkdir(exist_ok=True)
    return directory

def get_cache_filename(symbol: str, timeframe: str, start_date: str, end_date: str) -> str:
    """Build the CSV cache file name ``<symbol>_<timeframe>_<start>_<end>.csv``."""
    parts = (symbol, timeframe, start_date, end_date)
    stem = "_".join(format(part) for part in parts)
    return stem + ".csv"

# Exchange time zone used to decide which bars belong to the opening window.
_MARKET_TZ = 'America/New_York'
_BAR_COLUMNS = ['Open', 'High', 'Low', 'Close', 'Volume']


def _load_intraday_cache(cache_file: Path) -> Optional[pd.DataFrame]:
    """Read one cached intraday CSV, or return None when it must be re-downloaded.

    A cache whose ``timestamp`` column is missing or numeric (raw epoch values
    instead of formatted datetimes) is rejected.
    """
    frame = pd.read_csv(cache_file)
    if 'timestamp' not in frame.columns or pd.api.types.is_numeric_dtype(frame['timestamp']):
        return None
    frame['timestamp'] = pd.to_datetime(frame['timestamp'])
    return frame.set_index('timestamp')[_BAR_COLUMNS]


def _fetch_intraday_bars(symbol: str, multiplier: int, start_date: str, end_date: str,
                         keep_bar, desc: str) -> pd.DataFrame:
    """Download minute aggregates and keep the bars accepted by ``keep_bar(exchange_time)``."""
    rows = []
    for agg in tqdm(client.list_aggs(
        ticker=symbol,
        multiplier=multiplier,
        timespan="minute",
        from_=start_date,
        to=end_date,
        limit=50000
    ), desc=desc):
        # Bar timestamps are epoch milliseconds; convert them to exchange time
        # so the hour/minute filter refers to the trading session, not to UTC.
        exchange_time = pd.Timestamp(agg.timestamp, unit='ms', tz='UTC').tz_convert(_MARKET_TZ)
        if keep_bar(exchange_time):
            rows.append({
                # Stored as tz-naive exchange wall-clock time (see get_historical_data)
                'timestamp': exchange_time.tz_localize(None),
                'Open': agg.open,
                'High': agg.high,
                'Low': agg.low,
                'Close': agg.close,
                'Volume': agg.volume
            })

    # Explicit columns keep the frame well-formed even when nothing was returned
    frame = pd.DataFrame(rows, columns=['timestamp'] + _BAR_COLUMNS)
    frame['timestamp'] = pd.to_datetime(frame['timestamp'])
    return frame.set_index('timestamp')


def get_historical_data(symbol: str, start_date: str, end_date: str) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """Get both 1-minute and 5-minute historical data from Polygon with caching.

    Parameters
    ----------
    symbol: str
        Ticker to download.
    start_date, end_date: str
        Date range passed to the Polygon aggregates endpoint and used in the
        cache file names.

    Returns
    -------
    (df_1min, df_5min)
        ``df_1min`` holds the 1-minute bars of the opening window (09:30-09:34)
        and ``df_5min`` the 5-minute bars from 09:35 onwards. Both frames have
        Open/High/Low/Close/Volume columns and a ``timestamp`` index.

    Notes
    -----
    The index holds tz-naive US/Eastern wall-clock times. Code that checks
    ``index.hour`` / ``index.minute`` (09:30, 09:34, 16:00) therefore sees
    exchange time, including when the index is later labelled with a time zone
    without shifting the clock values. The freshly fetched frames and the
    cached frames have the same layout.
    """
    cache_dir = get_cache_dir()

    # Cache filenames for both timeframes
    cache_1min = cache_dir / get_cache_filename(symbol, "1min", start_date, end_date)
    cache_5min = cache_dir / get_cache_filename(symbol, "5min", start_date, end_date)

    if cache_1min.exists() and cache_5min.exists():
        cached_1min = _load_intraday_cache(cache_1min)
        cached_5min = _load_intraday_cache(cache_5min)
        if cached_1min is not None and cached_5min is not None:
            print("Loading cached data...")
            return cached_1min, cached_5min
        print("Cached data is in an outdated format; refetching...")

    print("Fetching historical data from Polygon...")

    # 1-minute bars of the opening five minutes
    df_1min = _fetch_intraday_bars(
        symbol, 1, start_date, end_date,
        keep_bar=lambda ts: ts.hour == 9 and 30 <= ts.minute <= 34,
        desc="1-min data",
    )

    # 5-minute bars for the rest of the day (later bars are not filtered out here)
    df_5min = _fetch_intraday_bars(
        symbol, 5, start_date, end_date,
        keep_bar=lambda ts: ts.hour > 9 or (ts.hour == 9 and ts.minute >= 35),
        desc="5-min data",
    )

    # Cache the data; an empty download is not cached so a later call can retry
    if not df_1min.empty and not df_5min.empty:
        df_1min.to_csv(cache_1min)
        df_5min.to_csv(cache_5min)

    return df_1min, df_5min

In [31]:
def get_daily_data(symbol: str, start_date: str, end_date: str) -> pd.DataFrame:
    """Get daily data from Polygon with caching.

    A cache file younger than one day is reused; otherwise the daily bars are
    downloaded again and the cache is rewritten.

    Returns a DataFrame with Open/High/Low/Close/Volume columns indexed by a
    tz-naive ``timestamp`` (converted from the epoch-millisecond bar timestamps).
    When the API returns no bars, an empty frame with the same columns is
    returned and nothing is cached.
    """
    value_columns = ['Open', 'High', 'Low', 'Close', 'Volume']
    cache_file = get_cache_dir() / get_cache_filename(symbol, "daily", start_date, end_date)

    # Check if cached data exists and is fresh (less than 1 day old)
    if cache_file.exists():
        modified = pd.Timestamp.fromtimestamp(cache_file.stat().st_mtime)
        if (pd.Timestamp.now() - modified).days < 1:
            print("Loading cached daily data...")
            return pd.read_csv(cache_file, index_col='timestamp', parse_dates=True)

    print("Fetching daily data from Polygon...")
    bars = [
        {
            'timestamp': a.timestamp,
            'Open': a.open,
            'High': a.high,
            'Low': a.low,
            'Close': a.close,
            'Volume': a.volume
        }
        for a in tqdm(client.list_aggs(
            ticker=symbol,
            multiplier=1,
            timespan="day",
            from_=start_date,
            to=end_date,
            limit=50000
        ), desc="Daily data")
    ]

    if not bars:
        # No data: return a well-formed empty frame instead of failing on the
        # missing 'timestamp' column, and leave the cache untouched.
        return pd.DataFrame(columns=value_columns,
                            index=pd.DatetimeIndex([], name='timestamp'))

    df = pd.DataFrame(bars, columns=['timestamp'] + value_columns)
    df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
    df.set_index('timestamp', inplace=True)

    # Cache the data
    df.to_csv(cache_file)

    return df

def calculate_atr(daily_data: pd.DataFrame, lookback: int = 14) -> float:
    """Return the latest Average True Range of ``daily_data``.

    The true range of each row is the largest of High-Low, |High - previous
    Close| and |Low - previous Close| (terms that are NaN, such as the
    previous-close terms of the first row, are ignored). The ATR is the simple
    mean of the last ``lookback`` true ranges; the result is NaN when fewer
    than ``lookback`` rows are available. The input frame is not modified.
    """
    high = daily_data['High']
    low = daily_data['Low']
    previous_close = daily_data['Close'].shift(1)

    # One column per true-range candidate, reduced row-wise to the maximum
    candidates = pd.concat(
        [high - low, (high - previous_close).abs(), (low - previous_close).abs()],
        axis=1,
    )
    true_range = candidates.max(axis=1)

    # Simple moving average of the true range; only the last value is needed
    return true_range.rolling(window=lookback).mean().iloc[-1]

In [32]:
def generate_signals(df_1min: pd.DataFrame, min_move_atr_multiple: float = 0.1, atr: float = None) -> pd.DataFrame:
    """Flag the trade direction implied by the first five minutes of a session.

    Returns a copy of ``df_1min`` with two extra columns: ``Signal`` (0
    everywhere except the 09:34 bar, which gets +1 when its close is above the
    09:30 open and -1 when it is below) and ``Entry_Price`` (NaN except on the
    signal bar, where it holds the 09:34 close). If the 09:30 or the 09:34 bar
    is missing, or the move is exactly zero, no signal is set.

    ``min_move_atr_multiple`` and ``atr`` are accepted but not used by the
    current logic.
    """
    out = df_1min.copy()
    out['Signal'] = 0
    out['Entry_Price'] = np.nan

    bar_hour = out.index.hour
    bar_minute = out.index.minute

    # 09:30 bar: its open is the reference price for the session
    opening_bars = out[(bar_hour == 9) & (bar_minute == 30)]
    if opening_bars.empty:
        return out

    # 09:34 bar: its close ends the five-minute observation window
    closing_bars = out[(bar_hour == 9) & (bar_minute == 34)]
    if closing_bars.empty:
        return out

    session_open = opening_bars['Open'].iloc[0]
    window_close = closing_bars['Close'].iloc[0]
    move = window_close - session_open

    if move > 0:
        direction = 1    # long
    elif move < 0:
        direction = -1   # short
    else:
        return out       # flat (or undefined) move: no trade

    signal_time = closing_bars.index[0]
    out.loc[signal_time, 'Signal'] = direction
    out.loc[signal_time, 'Entry_Price'] = window_close
    return out

In [33]:
def backtest_strategy(symbol: str, start_date: str, end_date: str,
                     initial_capital: float,
                     stop_atr_multiple: float,
                     min_move_atr_multiple: float,
                     position_size_risk_pct: float,
                     transaction_cost_pct: float,
                     max_position_size_pct: float,
                     max_leverage: float,
                     min_trade_size: float,
                     slippage_bps: float) -> pd.DataFrame:
    """
    Run backtest of the strategy with improved position sizing and risk management

    For every day present in the 1-minute data, the ATR of the 14 preceding
    daily bars sets the stop width, the opening five minutes set the trade
    direction, and the trade is then followed on 5-minute bars until the stop
    is touched or the last bar before 16:00 closes.

    Parameters:
    -----------
    symbol: str
        Trading instrument symbol
    start_date: str
        Backtest start date
    end_date: str
        Backtest end date
    initial_capital: float
        Starting capital
    stop_atr_multiple: float
        Multiple of ATR for stop loss
    min_move_atr_multiple: float
        Minimum price move as multiple of ATR (forwarded to generate_signals)
    position_size_risk_pct: float
        Fraction of current capital put at risk per trade
    transaction_cost_pct: float
        Transaction cost as a fraction of traded value, charged on entry and exit
    max_position_size_pct: float
        Maximum position value as a fraction of current capital
    max_leverage: float
        Maximum allowed leverage
    min_trade_size: float
        Minimum trade size in dollars
    slippage_bps: float
        Slippage in basis points (1 bp = 0.01% of price), applied against the
        trade on both entry and exit

    Returns:
    --------
    pd.DataFrame
        One row per executed trade, indexed by ``Date``; empty when no trade
        was taken.
    """

    df_1min, df_5min = get_historical_data(symbol, start_date, end_date)

    print("Calculating ATR...")
    extended_start = pd.Timestamp(start_date) - pd.Timedelta(days=30)
    df_daily = get_daily_data(symbol, extended_start.strftime('%Y-%m-%d'), end_date)

    results = []
    current_capital = initial_capital

    # Basis points -> fraction of price (previously the raw bps value was used
    # as a fraction, so e.g. 5 bps multiplied the price by 6)
    slippage = slippage_bps / 10_000.0

    # Make indices timezone-aware
    df_1min.index = pd.to_datetime(df_1min.index, utc=True)
    df_5min.index = pd.to_datetime(df_5min.index, utc=True)

    unique_dates = sorted(set(df_1min.index.date))
    print(f"Found {len(unique_dates)} trading days to process")

    for day in tqdm(unique_dates, desc="Backtest progress"):
        try:
            # ATR from the 14 daily bars strictly before this day (no look-ahead)
            current_date = pd.Timestamp(day)
            prev_days = df_daily[df_daily.index < current_date].tail(14)
            if len(prev_days) < 14:
                continue

            atr = calculate_atr(prev_days)
            stop_width = atr * stop_atr_multiple

            # Without a positive, finite stop width the position size below
            # would be NaN or infinite and would corrupt the capital series
            if not np.isfinite(stop_width) or stop_width <= 0:
                continue

            # Get day's data
            day_start = pd.Timestamp(day).tz_localize('UTC')
            day_end = day_start + pd.Timedelta(days=1)

            day_1min = df_1min[(df_1min.index >= day_start) & (df_1min.index < day_end)]
            day_5min = df_5min[(df_5min.index >= day_start) & (df_5min.index < day_end)]

            if len(day_1min) == 0 or len(day_5min) == 0:
                continue

            # Generate signals
            signals = generate_signals(day_1min, min_move_atr_multiple, atr)
            entry_signals = signals[signals['Signal'] != 0]

            if len(entry_signals) == 0:
                continue

            # Get trade details
            entry_time = entry_signals.index[0]
            entry_price = entry_signals['Entry_Price'].iloc[0]
            trade_direction = entry_signals['Signal'].iloc[0]

            # Slippage works against the trade: longs buy higher, shorts sell lower
            entry_price = entry_price * (1 + slippage * trade_direction)

            # Calculate stop price
            stop_price = entry_price - (stop_width * trade_direction)

            # Position size from the amount of capital put at risk
            risk_amount = current_capital * position_size_risk_pct
            risk_per_share = abs(entry_price - stop_price)
            position_size = risk_amount / risk_per_share

            # Apply maximum position size constraint
            max_position = (current_capital * max_position_size_pct) / entry_price
            position_size = min(position_size, max_position)

            # Calculate initial trade value
            trade_value = position_size * entry_price

            # Apply leverage limit
            if trade_value > current_capital * max_leverage:
                trade_value = current_capital * max_leverage
                position_size = trade_value / entry_price

            # Check minimum trade size
            if trade_value < min_trade_size:
                continue

            # Entry cost is only computed here; it is charged once, through the
            # P&L below, and not at all if the day turns out to be untradeable
            entry_cost = trade_value * transaction_cost_pct

            # Track the trade on 5-minute bars that start after entry and
            # before 16:00, so the default exit is the session close rather
            # than the last bar of the day
            bars_after_entry = day_5min[day_5min.index > entry_time]
            session_bars = bars_after_entry[bars_after_entry.index.hour < 16]

            if len(session_bars) == 0:
                continue

            # Check for stop hit
            stop_hit = False
            exit_price = session_bars['Close'].iloc[-1]  # Default: close of the last session bar

            for _, row in session_bars.iterrows():
                # A stop fills at the stop level, or at the bar's open when the
                # price has already gapped through it - never at a better price
                if trade_direction == 1 and row['Low'] <= stop_price:
                    stop_hit = True
                    exit_price = min(row['Open'], stop_price)
                    break
                elif trade_direction == -1 and row['High'] >= stop_price:
                    stop_hit = True
                    exit_price = max(row['Open'], stop_price)
                    break

            # Apply slippage to exit price (again against the trade)
            exit_price = exit_price * (1 - slippage * trade_direction)

            # Calculate P&L
            exit_value = position_size * exit_price
            exit_cost = exit_value * transaction_cost_pct
            total_transaction_costs = entry_cost + exit_cost

            if trade_direction == 1:
                pnl = (exit_price - entry_price) * position_size - total_transaction_costs
            else:
                pnl = (entry_price - exit_price) * position_size - total_transaction_costs

            ret = pnl / trade_value
            current_capital += pnl

            # Ensure capital doesn't go negative
            current_capital = max(0, current_capital)

            results.append({
                'Date': day,
                'Direction': trade_direction,
                'Entry_Time': entry_time,
                'Entry_Price': entry_price,
                'Stop_Price': stop_price,
                'Exit_Price': exit_price,
                'Position_Size': position_size,
                'Trade_Value': trade_value,
                'Stop_Hit': stop_hit,
                'ATR': atr,
                'Stop_Width': stop_width,
                'PnL': pnl,
                'Return': ret,
                'Capital': current_capital,
                'Transaction_Costs': total_transaction_costs
            })

            # Break if capital is depleted
            if current_capital <= min_trade_size:
                print(f"Trading stopped due to insufficient capital on {day}")
                break

        except Exception as e:
            # Best effort per day: report the problem and move on to the next day
            print(f"Error on {day}: {str(e)}")
            continue

    results_df = pd.DataFrame(results)
    if len(results_df) > 0:
        results_df.set_index('Date', inplace=True)
        print(f"Successfully processed {len(results_df)} trades")
    else:
        print("No trades were generated during the backtest period")

    return results_df

In [34]:
def analyze_results(results: pd.DataFrame, initial_capital: float = 100000.0) -> Optional[Dict[str, float]]:
    """
    Analyze and display backtest results with comprehensive performance metrics

    Prints a performance report, shows two matplotlib figures and returns the
    headline metrics. ``results`` is modified in place: its index is converted
    to a DatetimeIndex, ``Return`` is replaced by the per-trade change of the
    ``Capital`` column, and ``Cumulative_Return``, ``Equity_Curve``, ``Peak``
    and ``Drawdown`` columns are added.

    Parameters:
    -----------
    results: pd.DataFrame
        DataFrame containing backtest results (one row per trade, with the
        PnL, Capital, Trade_Value, Position_Size, Transaction_Costs and
        Stop_Hit columns)
    initial_capital: float
        Initial capital used in backtest

    Returns:
    --------
    dict or None
        Summary metrics, or None when ``results`` is empty.
    """
    if len(results) == 0:
        print("No trades were executed in the backtest period")
        return None

    # Ensure index is datetime
    if not isinstance(results.index, pd.DatetimeIndex):
        results.index = pd.to_datetime(results.index)

    # Equity and returns come from the recorded account value. Compounding the
    # per-trade return on trade value would overstate every move, because a
    # trade only uses part of the capital.
    capital = results['Capital']
    capital_before = capital.shift(1)
    capital_before.iloc[0] = initial_capital  # the first trade starts from the initial capital

    results['Return'] = capital / capital_before - 1
    results['Cumulative_Return'] = capital / initial_capital - 1
    results['Equity_Curve'] = capital

    # Drawdown against the running peak; the peak starts at the initial
    # capital so a losing first trade registers as a drawdown
    results['Peak'] = results['Equity_Curve'].cummax().clip(lower=initial_capital)
    results['Drawdown'] = (results['Equity_Curve'] - results['Peak']) / results['Peak']

    # Trade Statistics
    pnl = results['PnL']
    winners = pnl[pnl > 0]
    non_winners = pnl[pnl <= 0]
    total_trades = len(results)
    win_rate = len(winners) / total_trades

    # Position and Capital Statistics
    avg_position_size = results['Position_Size'].mean()
    avg_trade_value = results['Trade_Value'].mean()
    avg_capital_usage = (results['Trade_Value'] / capital_before).mean()

    # P&L Statistics. PnL is already net of transaction costs, so the costs
    # must not be subtracted a second time.
    total_costs = results['Transaction_Costs'].sum()
    net_pnl = pnl.sum()

    # Average win/loss
    avg_win = winners.mean()
    avg_loss = non_winners.mean()
    max_win = pnl.max()
    max_loss = pnl.min()

    # Risk/Reward Metrics
    gross_profit = winners.sum()
    gross_loss = pnl[pnl < 0].sum()
    profit_factor = abs(gross_profit / gross_loss) if gross_loss < 0 else float('inf')

    # Stop Analysis
    stops_hit = len(results[results['Stop_Hit'] == True])
    stop_rate = stops_hit / total_trades

    # Portfolio Performance calculations
    final_capital = capital.iloc[-1]
    total_return = (final_capital - initial_capital) / initial_capital

    # Risk Metrics
    max_drawdown = results['Drawdown'].min() * 100
    avg_drawdown = results['Drawdown'].mean() * 100

    # Daily returns on a business-day grid: weekdays without a trade count as
    # zero-return days, weekends are left out so the 252-day annualisation
    # below matches the sampling frequency
    daily_returns = results['Return'].resample('B').sum().fillna(0)
    risk_free_rate = 0.02  # Assuming 2% risk-free rate
    excess_returns = daily_returns - risk_free_rate/252

    # Annualized return over the calendar span between first and last trade
    years = (results.index[-1] - results.index[0]).days / 365.25
    if years > 0:
        annualized_return = ((1 + total_return) ** (1/years) - 1) * 100
    else:
        # All trades on a single day: an annualised figure is undefined
        annualized_return = float('nan')

    # Risk Ratios
    volatility = daily_returns.std()
    downside_volatility = daily_returns[daily_returns < 0].std()
    annualized_volatility = volatility * np.sqrt(252) * 100
    sharpe_ratio = np.sqrt(252) * excess_returns.mean() / volatility if volatility > 0 else 0
    sortino_ratio = np.sqrt(252) * excess_returns.mean() / downside_volatility if downside_volatility > 0 else 0
    calmar_ratio = -annualized_return / max_drawdown if max_drawdown != 0 else 0

    # Maximum Consecutive Wins/Losses
    consecutive_wins = consecutive_losses = 0
    max_consecutive_wins = max_consecutive_losses = 0

    for trade_pnl in pnl:
        if trade_pnl > 0:
            consecutive_wins += 1
            consecutive_losses = 0
            max_consecutive_wins = max(max_consecutive_wins, consecutive_wins)
        else:
            consecutive_losses += 1
            consecutive_wins = 0
            max_consecutive_losses = max(max_consecutive_losses, consecutive_losses)

    # Print Results
    print(f"\n{'='*50}")
    print(f"Strategy Performance Analysis")
    print(f"{'='*50}")
    print(f"\nBacktest Period: {results.index[0].strftime('%Y-%m-%d')} to {results.index[-1].strftime('%Y-%m-%d')}")

    print(f"\n{'Trade Statistics':=^40}")
    print(f"Total Trades: {total_trades:,}")
    print(f"Win Rate: {win_rate:.2%}")
    print(f"Profit Factor: {profit_factor:.2f}")
    # Position_Size is a share count, not a dollar amount
    print(f"Average Position Size: {avg_position_size:,.2f} shares")
    print(f"Average Trade Value: ${avg_trade_value:,.2f}")
    print(f"Average Capital Usage: {avg_capital_usage:.2%}")
    print(f"Total Transaction Costs: ${total_costs:,.2f}")
    print(f"Stop-Loss Hit Rate: {stop_rate:.2%}")

    print(f"\n{'P&L Statistics':=^40}")
    print(f"Net P&L: ${net_pnl:,.2f}")
    print(f"Average Win: ${avg_win:,.2f}")
    print(f"Average Loss: ${avg_loss:,.2f}")
    print(f"Largest Win: ${max_win:,.2f}")
    print(f"Largest Loss: ${max_loss:,.2f}")
    print(f"Max Consecutive Wins: {max_consecutive_wins}")
    print(f"Max Consecutive Losses: {max_consecutive_losses}")

    print(f"\n{'Portfolio Performance':=^40}")
    print(f"Initial Capital: ${initial_capital:,.2f}")
    print(f"Final Capital: ${final_capital:,.2f}")
    print(f"Total Return: {total_return:.2%}")
    print(f"Annualized Return: {annualized_return:.2f}%")
    print(f"Annualized Volatility: {annualized_volatility:.2f}%")

    print(f"\n{'Risk Metrics':=^40}")
    print(f"Maximum Drawdown: {max_drawdown:.2f}%")
    print(f"Average Drawdown: {avg_drawdown:.2f}%")
    print(f"Sharpe Ratio: {sharpe_ratio:.2f}")
    print(f"Sortino Ratio: {sortino_ratio:.2f}")
    print(f"Calmar Ratio: {calmar_ratio:.2f}")

    # Plotting
    # Use a basic matplotlib style instead of seaborn
    plt.style.use('default')

    # Create figure with subplots
    fig, axes = plt.subplots(4, 1, figsize=(15, 20))

    # Equity Curve with Drawdown
    ax1 = axes[0]
    ax1.plot(results.index, results['Equity_Curve'], label='Equity Curve', color='blue')
    ax1.set_title('Equity Curve')
    ax1.set_xlabel('Date')
    ax1.set_ylabel('Portfolio Value ($)')
    ax1.grid(True)
    ax1.legend(loc='upper left')

    # Drawdown plot on twin axis
    ax1_dd = ax1.twinx()
    ax1_dd.fill_between(results.index, 0, results['Drawdown'] * 100,
                       color='red', alpha=0.3, label='Drawdown %')
    ax1_dd.set_ylabel('Drawdown %')
    ax1_dd.legend(loc='upper right')

    # Trade Values Over Time
    axes[1].plot(results.index, results['Trade_Value'], color='green')
    axes[1].set_title('Trade Values Over Time')
    axes[1].set_xlabel('Date')
    axes[1].set_ylabel('Trade Value ($)')
    axes[1].grid(True)

    # Monthly returns
    monthly_returns = results['Return'].resample('M').sum()
    axes[2].bar(monthly_returns.index, monthly_returns,
                color=['green' if x >= 0 else 'red' for x in monthly_returns])
    axes[2].set_title('Monthly Returns')
    axes[2].set_xlabel('Month')
    axes[2].set_ylabel('Return')
    axes[2].grid(True)
    plt.setp(axes[2].xaxis.get_majorticklabels(), rotation=45)

    # Distribution of returns
    axes[3].hist(results['Return'], bins=50, density=True, alpha=0.5, color='blue')
    axes[3].set_title('Distribution of Returns')
    axes[3].set_xlabel('Return')
    axes[3].set_ylabel('Frequency')
    axes[3].grid(True)

    plt.tight_layout()
    plt.show()

    # Additional rolling metrics plot
    fig, axes = plt.subplots(3, 1, figsize=(15, 15))

    # Rolling Sharpe Ratio (252-day)
    rolling_sharpe = np.sqrt(252) * results['Return'].rolling(252).mean() / results['Return'].rolling(252).std()
    axes[0].plot(rolling_sharpe.index, rolling_sharpe)
    axes[0].set_title('Rolling 252-day Sharpe Ratio')
    axes[0].grid(True)

    # Rolling 63-day (quarter) Win Rate
    rolling_winrate = results['PnL'].rolling(63).apply(lambda x: np.sum(x > 0) / len(x))
    axes[1].plot(rolling_winrate.index, rolling_winrate)
    axes[1].set_title('Rolling 63-day Win Rate')
    axes[1].grid(True)

    # Rolling 21-day Average Trade Value
    rolling_trade_value = results['Trade_Value'].rolling(21).mean()
    axes[2].plot(rolling_trade_value.index, rolling_trade_value)
    axes[2].set_title('Rolling 21-day Average Trade Value')
    axes[2].grid(True)

    plt.tight_layout()
    plt.show()

    return {
        'total_trades': total_trades,
        'win_rate': win_rate,
        'profit_factor': profit_factor,
        'net_pnl': net_pnl,
        'total_return': total_return,
        'annualized_return': annualized_return,
        'max_drawdown': max_drawdown,
        'sharpe_ratio': sharpe_ratio,
        'sortino_ratio': sortino_ratio,
        'calmar_ratio': calmar_ratio
    }

In [None]:
# Run the TQQQ backtest with the notebook's settings, then report on it
results = backtest_strategy(
    # Instrument and period
    symbol='TQQQ',
    start_date='2020-01-01',
    end_date='2025-02-26',
    # Signal and stop
    stop_atr_multiple=0.075,         # stop placed 7.5% of the ATR from entry
    min_move_atr_multiple=0.1,
    # Capital and position sizing
    initial_capital=100000,
    position_size_risk_pct=0.02,     # risk 2% of capital per trade
    max_position_size_pct=0.25,      # at most 25% of capital in one trade
    max_leverage=1.0,                # unlevered
    min_trade_size=1000,             # skip trades below $1000
    # Trading frictions
    transaction_cost_pct=0,          # commission-free trading of TQQQ with tastytrade
    slippage_bps=0,                  # no slippage assumed
)

analyze_results(results, initial_capital=100000)