# Quant DCA ETF Strategy 📈
Enhanced with equity curve and performance metrics visualizations.

# ETF Long-Term Buy & Hold Strategy Notebook
This notebook implements the SPY/QQQ DCA strategy with 200-day MA filtering.

In [374]:
import os,sys
from pathlib import Path

In [375]:
PROJECT_ROOT = Path.cwd().parents[0]
CWD = Path(os.getcwd())
# DUCKDB_PATH = PROJECT_ROOT / "data" / "processed" / "alpaca" / "minute.duckdb"
TBL_DAILY = PROJECT_ROOT / "data" / "processed" / "alpaca" / "price.duckdb"
TBL_MINUTE = PROJECT_ROOT / "data" / "processed" / "alpaca" / "minute.duckdb"

print(f"Project Root: {PROJECT_ROOT}")
print(f"Working directory: {CWD}")

Project Root: C:\Users\luyanda\workspace\QuantTrade
Working directory: C:\Users\luyanda\workspace\QuantTrade\notebooks


In [376]:
if PROJECT_ROOT not in sys.path:
    sys.path.append(PROJECT_ROOT)
from utils.data_fetch import get_price_data

## 1. Setup

In [377]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import duckdb

In [378]:
ETFS = ["SPY", "QQQ", "SPLV", "SHY"]
ALLOCATION_PER_MONTH = 500  # USD
CORE_ALLOCATION = {"SPY": 0.5, "QQQ": 0.5}
FALLBACK_ETFS = ["SPLV", "SHY"]
HOLD_PERIOD_MONTHS = 6
REBALANCE_FREQ = "Q"  # Quarterly
TREND_LOOKBACK = 200  # days for moving average filter
STARTING_CASH = 1
START_CASH = 0


## 2. Data Ingestion

In [379]:
# --- Ingest latest minute-level data ---
minute_data = {}

with duckdb.connect(str(TBL_MINUTE)) as con:
    for symbol in ETFS:
        table_name = f"minute_{symbol}"

        query = f"""
        SELECT timestamp,symbol,open,high,low,close,volume,trade_count,vwap
        FROM {table_name}
        """
        try:
            df = con.execute(query).fetchdf().sort_values("timestamp")
            df.set_index("timestamp", inplace=True)
            minute_data[symbol] = df
        except Exception as e:
            print(f"Failed to load {symbol}: {e}")

print(minute_data["SPY"].tail())

                          symbol    open     high      low   close   volume  \
timestamp                                                                     
2025-08-08 21:57:00+02:00    SPY  636.85  637.020  636.850  637.02  13125.0   
2025-08-08 21:58:00+02:00    SPY  637.00  637.085  636.965  637.08   9432.0   
2025-08-08 21:59:00+02:00    SPY  637.08  637.260  636.850  637.26  34887.0   
2025-08-08 22:00:00+02:00    SPY  637.21  637.210  637.210  637.21   1231.0   
2025-08-08 22:01:00+02:00    SPY  637.42  637.470  637.420  637.47    795.0   

                           trade_count        vwap  
timestamp                                           
2025-08-08 21:57:00+02:00        116.0  636.956506  
2025-08-08 21:58:00+02:00        110.0  637.033163  
2025-08-08 21:59:00+02:00        216.0  637.073073  
2025-08-08 22:00:00+02:00         11.0  637.210000  
2025-08-08 22:01:00+02:00          9.0  637.431612  


In [380]:
# --- Ingest latest daily data ---
# Load the last 200 days of data for each ETF from DuckDB
daily_data = {}

with duckdb.connect(TBL_DAILY) as con:
    for symbol in ETFS:
        table_name = f"daily_{symbol}"

        query = f"""
        SELECT timestamp,symbol,open,high,low,close,volume,trade_count,vwap
        FROM {table_name}
        ORDER BY timestamp DESC
        """
        try:
            df = con.execute(query).fetchdf().sort_values("timestamp")
            df.set_index("timestamp", inplace=True)
            daily_data[symbol] = df
        except Exception as e:
            print(f"Failed to load {symbol}: {e}")
print(daily_data["SPY"].tail())

                          symbol     open     high      low   close  \
timestamp                                                             
2025-08-06 06:00:00+02:00    SPY  629.060  633.410  628.165  632.70   
2025-08-06 06:00:00+02:00    SPY  629.060  633.410  628.165  632.70   
2025-08-07 06:00:00+02:00    SPY  636.255  636.955  629.195  632.36   
2025-08-07 06:00:00+02:00    SPY  636.255  636.955  629.195  632.36   
2025-08-08 06:00:00+02:00    SPY  634.110  637.635  633.780  637.26   

                              volume  trade_count        vwap  
timestamp                                                      
2025-08-06 06:00:00+02:00   858622.0      10661.0  631.772947  
2025-08-06 06:00:00+02:00   858622.0      10661.0  631.772947  
2025-08-07 06:00:00+02:00  1145664.0      12538.0  632.345962  
2025-08-07 06:00:00+02:00  1145664.0      12538.0  632.345962  
2025-08-08 06:00:00+02:00   839695.0      10065.0  636.469431  


In [381]:
# Combine all symbols into a wide DataFrame
daily_prices = pd.concat(
    {sym: df['close'] for sym, df in daily_data.items()}, axis=1
)
daily_prices.columns = ETFS  # Flatten MultiIndex


In [382]:
features_df = daily_prices.copy()

for symbol in ETFS:
    features_df[f"{symbol}_200ma"] = daily_prices[symbol].rolling(window=200).mean()
    features_df[f"{symbol}_signal"] = (daily_prices[symbol] > features_df[f"{symbol}_200ma"]).astype(int)


In [383]:
from datetime import datetime

# Combine all timestamps from both dictionaries
all_dates = []

for data_dict in [minute_data, daily_data]:
    for symbol, df in data_dict.items():
        if not df.empty:
            # Drop timezone if present
            index = df.index.tz_localize(None) if df.index.tz is not None else df.index
            all_dates.append(index.min())
            all_dates.append(index.max())

# Final range
START_DATE = min(all_dates)
END_DATE = max(all_dates)
total_days = (END_DATE - START_DATE).days

print(f"📅 Start Date: {START_DATE.date()}")
print(f"📅 End Date:   {END_DATE.date()}")
print(f"📆 Total days between start and end: {total_days}")


📅 Start Date: 2023-08-07
📅 End Date:   2025-08-08
📆 Total days between start and end: 732


## 3. Data Quality Checks

U.S. Market (SPY, QQQ, SPLV, SHY)
Assuming regular NYSE/Nasdaq trading hours:

| **Session**     | **Hours (ET)**   | **Duration** |
| --------------- | ---------------- | ------------ |
| Regular session | 09:30 – 16:00 ET | 6.5 hours    |
|                 |                  | 390 minutes  |

Expect around 390 rows per ETF

In [384]:
for symbol in ETFS:
    df = minute_data[symbol]
    print(f"\n🔍 {symbol}")
    print(f"  • Rows: {len(df)}")
    print(f"  • Date Range: {df.index.min().date()} → {df.index.max().date()}")
    print(f"  • Timezone-aware: {df.index.tz is not None}")
    # print(f"  • Missing 'close': {df['close'].isna().sum()}")

    # --- Drop timezone if needed ---
    df = df.copy()
    if df.index.tz is not None:
        df.index = df.index.tz_localize(None)

    # --- Identify all available intraday dates ---
    df["date"] = df.index.normalize()
    available_dates = df["date"].unique()

    # --- Construct full expected range (business days) ---
    expected_dates = pd.date_range(
        start=df.index.min().normalize(),
        end=df.index.max().normalize(),
        freq='B'
    )

    # --- Missing trading days entirely ---
    missing_dates = sorted(set(expected_dates) - set(available_dates))
    print(f"  • Missing Intraday Dates: {len(missing_dates)}")
    # if missing_dates:
    #     print("    Example:", missing_dates[:5])

    # --- Check for partial trading days (fewer than 390 rows) ---
    counts = df.groupby("date").size()
    partial_days = counts[counts < 390]
    print(f"  • Partial Intraday Days (<390 rows): {len(partial_days)}")
    # if not partial_days.empty:
    #     print("    Example:", partial_days.head())



🔍 SPY
  • Rows: 573345
  • Date Range: 2023-08-07 → 2025-08-08
  • Timezone-aware: True
  • Missing Intraday Dates: 22
  • Partial Intraday Days (<390 rows): 2

🔍 QQQ
  • Rows: 557705
  • Date Range: 2023-08-07 → 2025-08-08
  • Timezone-aware: True
  • Missing Intraday Dates: 22
  • Partial Intraday Days (<390 rows): 2

🔍 SPLV
  • Rows: 102582
  • Date Range: 2023-08-07 → 2025-08-08
  • Timezone-aware: True
  • Missing Intraday Dates: 22
  • Partial Intraday Days (<390 rows): 464

🔍 SHY
  • Rows: 76772
  • Date Range: 2023-08-07 → 2025-08-08
  • Timezone-aware: True
  • Missing Intraday Dates: 22
  • Partial Intraday Days (<390 rows): 492


In [385]:
for symbol in ETFS:
    df = daily_data[symbol]
    print(f"\n🔍 {symbol}")
    print(f"  • Rows: {len(df)}")
    print(f"  • Date Range: {df.index.min().date()} → {df.index.max().date()}")
    print(f"  • Timezone-aware: {df.index.tz is not None}")
    print(f"  • Missing 'close': {df['close'].isna().sum()}")



🔍 SPY
  • Rows: 1508
  • Date Range: 2023-08-07 → 2025-08-08
  • Timezone-aware: True
  • Missing 'close': 0

🔍 QQQ
  • Rows: 1508
  • Date Range: 2023-08-07 → 2025-08-08
  • Timezone-aware: True
  • Missing 'close': 0

🔍 SPLV
  • Rows: 1508
  • Date Range: 2023-08-07 → 2025-08-08
  • Timezone-aware: True
  • Missing 'close': 0

🔍 SHY
  • Rows: 1508
  • Date Range: 2023-08-07 → 2025-08-08
  • Timezone-aware: True
  • Missing 'close': 0


## 3. Feature Engineering

In [386]:
summary = []

for symbol in ETFS:
    minute_df = minute_data[symbol]
    daily_df = daily_data[symbol]

    # Calculate 200-day moving average from daily close
    ma_200 = daily_df['close'].rolling(window=200).mean().iloc[-1]

    # Get the latest 1-minute close price
    latest_price = minute_df['close'].iloc[-1]

    # Determine signal: Buy if price >= 200 MA
    signal = int(latest_price >= ma_200)

    summary.append({
        "Symbol": symbol,
        "Latest Price": latest_price,
        "200-day MA": ma_200,
        "Signal (1=Buy)": signal
    })

signal_df = pd.DataFrame(summary)
display(signal_df)


Unnamed: 0,Symbol,Latest Price,200-day MA,Signal (1=Buy)
0,SPY,637.47,605.8475,1
1,QQQ,575.66,536.4431,1
2,SPLV,73.555,72.7984,1
3,SHY,82.62,82.49825,1


## 5. Intraday (1-min) ETF Prices vs 200-day Moving Average

This chart visualizes the **intraday 1-minute price action** of each ETF (SPY, QQQ, SPLV, SHY) relative to its **long-term 200-day moving average (MA)**.

1. Load the most recent intraday prices (1-minute resolution)
2. Load the latest 200-day moving average from daily data
3. Plot both on the same chart for comparison
4. Indicate whether it's a good day to invest (i.e. price above 200-day MA)


The **200-day MA** serves as a trend-following filter:
- When **price is above** the 200-day MA, the asset is considered to be in a long-term **uptrend**.
- When **price is below** the 200-day MA, it signals a potential **downtrend or weakness**.


#### ✅ Allocation Rules:
- **If SPY is above its 200-day MA** → Allow contribution to SPY this month.
- **If SPY is below its 200-day MA** → Redirect SPY contribution to SPLV (or SHY).
- **If QQQ is above its 200-day MA** → Allow contribution to QQQ this month.
- **If QQQ is below its 200-day MA** → Redirect QQQ contribution to SPLV (or SHY).
- **If both SPY and QQQ are below their 200-day MAs** → Go fully defensive: allocate entire amount to SPLV or SHY, depending on which is above its MA.

#### 🕒 Why Intraday View?
Markets often gap or trend early in the session. Checking current price against the 200-day MA on a **1-minute basis** just before execution (e.g., around 10:00 AM ET) lets us make precise and **rule-based allocation decisions** on the day of purchase.


In [387]:
# --- Get today's intraday data ---
minute_df = minute_data[symbol].sort_index()
today = minute_df.index[-1].normalize()
minute_df = minute_df[minute_df.index.normalize() == today]


In [388]:
from plotly.subplots import make_subplots
import plotly.graph_objects as go

rows, cols = 2, 2

fig = make_subplots(
    rows=rows,
    cols=cols,
    subplot_titles=[f"{etf} Intraday vs 200MA" for etf in ETFS],
    shared_xaxes=False,
    shared_yaxes=False
)

for i, symbol in enumerate(ETFS):
    row = i // cols + 1
    col = i % cols + 1

    # --- Filter to today's intraday data ---
    minute_df = minute_data[symbol].sort_index()
    today = minute_df.index[-1].normalize()
    minute_df = minute_df[minute_df.index.normalize() == today]

    # --- Skip if no 'close' column ---
    if 'close' not in minute_df.columns:
        print(f"Skipping {symbol} – no 'close' column found.")
        continue

    # --- Get latest 200-day MA ---
    try:
        ma_value = features_df[f"{symbol}_200ma"].dropna().iloc[-1]
    except KeyError:
        print(f"Skipping {symbol} – 200-day MA not found in features.")
        continue
    
    # --- Plot intraday price ---
    fig.add_trace(
        go.Scatter(
            x=minute_df.index,
            y=minute_df['close'],
            mode='lines',
            name=f"{symbol} Intraday",
            line=dict(color='deepskyblue'),
            showlegend=False
        ),
        row=row,
        col=col
    )

    # --- Plot 200-day MA as flat line ---
    fig.add_trace(
        go.Scatter(
            x=minute_df.index,
            y=[ma_value] * len(minute_df),
            mode='lines',
            name='200-Day MA',
            line=dict(color='orange', dash='dash'),
            showlegend=False
        ),
        row=row,
        col=col
    )

# Layout tweaks
fig.update_layout(
    title_text="📈 Intraday (1-min) ETF Prices vs 200-Day Moving Average",
    height=800,
    template="plotly_dark"
)

fig.update_xaxes(title_text="Time")
fig.update_yaxes(title_text="Price (USD)")

# 🔥 REQUIRED: Display the chart
fig.show()


In [389]:
for symbol in ETFS:
    minute_df = minute_data[symbol].sort_index()
    latest_price = minute_df['close'].iloc[-1]
    ma_value = features_df[f"{symbol}_200ma"].dropna().iloc[-1]
    
    status = "✅ ABOVE" if latest_price > ma_value else "🚫 BELOW"
    print(f"{symbol}: {latest_price:.2f} is {status} the 200-day MA ({ma_value:.2f})")


SPY: 637.47 is ✅ ABOVE the 200-day MA (605.85)
QQQ: 575.66 is ✅ ABOVE the 200-day MA (536.44)
SPLV: 73.56 is ✅ ABOVE the 200-day MA (72.80)
SHY: 82.62 is ✅ ABOVE the 200-day MA (82.50)


## 5. Strategy Logic

Strategy Rules (Execution Logic)
- Invest $500/month:
  - Default split: 50% SPY, 50% QQQ

- 200-day MA Filter (per ETF):
  - If ETF ≥ 200 MA → ✅ buy normally
  - If ETF < 200 MA → 🔁 redirect that portion to SPLV or SHY, with preference to:
    - SPLV if above 200 MA
    - SHY if SPLV also < 200 MA
    - If both < 200 MA → Hold in cash until next month

- Dividends: Reinvest
- No selling of existing holdings for first 6 months

- Quarterly rebalancing of new contributions to maintain target weights

In [390]:
# --- Backtrader DCA Strategy (buys only) ---
import backtrader as bt
import pandas as pd
from typing import List, Dict

class DCABacktrader(bt.Strategy):
    params = dict(
        monthly_cash=500.0,         # contribution each month
        etfs=(),                    # tuple/list of symbols (must match data feed names)
        invest_day=1,               # day-of-month to invest
        log_trades=True,            # capture executions
    )

    def __init__(self):
        # Map datafeed name -> datafeed
        self.data_by_name = {d._name: d for d in self.datas}
        self.last_invest_month = None
        self.trade_log = []  # collected in notify_order
        self._first_bar_done = False

    def next(self):
        # invest once per calendar month on invest_day
        dt = self.datas[0].datetime.datetime(0)  # engine clock = first feed

        # inject first contribution on the first bar to avoid zero start
        if not self._first_bar_done:
            self.broker.add_cash(self.p.monthly_cash)
            self._first_bar_done = True

        if dt.day != self.p.invest_day:
            return
        if self.last_invest_month == (dt.year, dt.month):
            return

        self.last_invest_month = (dt.year, dt.month)

        # split the monthly cash equally across selected ETFs
        tickers = list(self.p.etfs)
        if not tickers:
            return
        cash_per_etf = float(self.p.monthly_cash) / len(tickers)

        for sym in tickers:
            data = self.data_by_name.get(sym)
            if data is None or len(data) == 0:
                continue
            price = float(data.close[0])
            if price <= 0:
                continue
            
            # add the monthly contribution to the broker cash on invest day
            self.broker.add_cash(self.p.monthly_cash)

            # size the buy 
            size = cash_per_etf / price

            # Market order; Backtrader fills next bar at open by default (or broker policy)
            self.buy(data=data, size=size)

    def notify_order(self, order):
        if order.status in [order.Completed, order.Partial]:
            d = order.data
            dt = bt.num2date(order.executed.dt).replace(tzinfo=None)
            side = 'BUY' if order.isbuy() else 'SELL'
            rec = dict(
                datetime=dt,
                symbol=d._name,
                side=side,
                size=order.executed.size,
                price=order.executed.price,
                value=order.executed.value,
                commission=order.executed.comm
            )
            if self.p.log_trades:
                self.trade_log.append(rec)


In [None]:
# --- Utilities to feed existing pandas OHLCV DataFrames into Backtrader ---
def _bt_feed_from_df(symbol: str, df: pd.DataFrame) -> bt.feeds.PandasData:
    """
    df must have datetime index and columns: ['open','high','low','close','volume'].
    Any extra columns are ignored.
    """
    use = df.copy()
    # normalize index
    use.index = pd.to_datetime(use.index)
    use.index.name = 'datetime'
    # keep required columns
    cols = [c for c in ['open','high','low','close','volume'] if c in use.columns]
    use = use[cols].dropna()
    return bt.feeds.PandasData(dataname=use, name=symbol)

def run_backtest_with_df(
    price_dfs: Dict[str, pd.DataFrame],
    etfs: List[str],
    start_cash: float = 10_000.0,
    monthly_cash: float = 500.0,
    commission_bps: float = 1.0  # 1bp = 0.01%
):
    """
    price_dfs: dict like {'SPY': df, 'QQQ': df, ...} (daily OHLCV recommended)
    etfs: list of tickers to DCA into (must be keys in price_dfs)
    """
    cerebro = bt.Cerebro()
    # add data feeds
    for sym in etfs:
        feed = _bt_feed_from_df(sym, price_dfs[sym])
        cerebro.adddata(feed)

    cerebro.broker.setcash(float(start_cash))
    # simple fixed commission in bps
    cerebro.broker.setcommission(commission=commission_bps/10000.0)

    cerebro.addstrategy(DCABacktrader, monthly_cash=monthly_cash, etfs=tuple(etfs))

    strat = cerebro.run(maxcpus=1)[0]  # single run, get strategy instance

    # Extract trade log as DataFrame
    trades = pd.DataFrame(strat.trade_log) if len(strat.trade_log) else pd.DataFrame(
        columns=['datetime','symbol','side','size','price','value','commission']
    )
    trades.sort_values('datetime', inplace=True)

    # also compute final portfolio value timeline (broker value per bar)
    # We’ll reconstruct from the first data feed's index
    # NOTE: Backtrader's broker value is accessible in next(), but we can rebuild from prices + recorded buys.
    # For simplicity here: just return trades; plotting uses original DataFrames.
    return dict(trades=trades, etfs=etfs)


In [412]:
def plot_dca_results(price_dfs: Dict[str, pd.DataFrame], results: dict, title="DCA Backtest"):
    etfs = results['etfs']
    trades = results['trades'].copy()

    rows = len(etfs)
    fig = make_subplots(
        rows=rows, cols=1,
        shared_xaxes=False,
        vertical_spacing=0.06,
        specs=[[{"secondary_y": True}] for _ in range(rows)],
        subplot_titles=[f"{sym}" for sym in etfs]
    )

    for i, sym in enumerate(etfs, start=1):
        df = price_dfs[sym].copy()
        df.index = pd.to_datetime(df.index)

        fig.add_trace(
            go.Candlestick(
                x=df.index, open=df['open'], high=df['high'],
                low=df['low'], close=df['close'],
                name=f"{sym} OHLC",
                showlegend=False
            ),
            row=i, col=1, secondary_y=False
        )

        if 'volume' in df.columns:
            fig.add_trace(
                go.Bar(
                    x=df.index, y=df['volume'],
                    name=f"{sym} Volume", opacity=0.3,
                    showlegend=False
                ),
                row=i, col=1, secondary_y=True
            )

        if not trades.empty:
            t_sym = trades[trades['symbol'] == sym]
            if not t_sym.empty:
                fig.add_trace(
                    go.Scatter(
                        x=t_sym['datetime'], y=t_sym['price'],
                        mode='markers',
                        marker=dict(symbol='triangle-up', size=10),
                        name=f"{sym} Buys",
                        marker_color='lime',
                        showlegend=False
                    ),
                    row=i, col=1, secondary_y=False
                )

        fig.update_yaxes(title_text="Price", row=i, col=1, secondary_y=False)
        fig.update_yaxes(title_text="Volume", row=i, col=1, secondary_y=True)

    # Disable rangeslider on ALL subplots (not just xaxis)
    fig.update_xaxes(rangeslider=dict(visible=False))

    fig.update_layout(
        height=rows * 650,
        title=title,
        template="plotly_dark",
        title_font=dict(size=20)
    )
    fig.show()



In [393]:
def to_bt_daily(df):
    """
    Make a clean daily OHLCV frame:
    - tz-naive datetime index
    - one row per calendar day
    - OHLC aggregated: O=first, H=max, L=min, C=last, V=sum
    """
    tmp = df.copy()

    # Ensure datetime index and drop tz
    idx = pd.to_datetime(tmp.index)
    try:
        idx = idx.tz_localize(None)
    except (TypeError, AttributeError):
        pass
    tmp.index = idx

    # Collapse duplicates to daily granularity
    tmp["__date__"] = tmp.index.normalize()
    out = (tmp
           .groupby("__date__")
           .agg(open=("open", "first"),
                high=("high", "max"),
                low=("low", "min"),
                close=("close", "last"),
                volume=("volume", "sum"))
           .sort_index())
    out.index.name = "datetime"
    return out.dropna()


In [394]:
# Convert daily_data to price_dfs for Backtrader
price_dfs = {}
for sym, df in daily_data.items():
    price_dfs[sym] = to_bt_daily(df)

# Quick audit
for sym, df in price_dfs.items():
    dups = pd.Index(df.index).duplicated(keep=False).sum()
    print(f"{sym}: {df.index.min().date()} → {df.index.max().date()}, rows={len(df)}, dup_idx={dups}")
    print(df.head(2))


SPY: 2023-08-07 → 2025-08-08, rows=504, dup_idx=0
              open     high     low   close     volume
datetime                                              
2023-08-07  448.69  450.860  448.01  450.68  2795358.0
2023-08-08  448.03  449.225  445.30  448.86  2931732.0
QQQ: 2023-08-07 → 2025-08-08, rows=504, dup_idx=0
              open    high     low   close     volume
datetime                                             
2023-08-07  373.99  375.29  371.56  375.24  1416480.0
2023-08-08  372.61  372.62  368.85  371.93  1499439.0
SPLV: 2023-08-07 → 2025-08-08, rows=504, dup_idx=0
             open    high     low   close    volume
datetime                                           
2023-08-07  62.41  62.635  62.280  62.550  190692.0
2023-08-08  62.45  62.450  61.985  62.245  116079.0
SHY: 2023-08-07 → 2025-08-08, rows=504, dup_idx=0
              open    high    low   close    volume
datetime                                           
2023-08-07  81.075  81.115  81.06  81.095  869619.0

In [413]:
# Example wiring:
# ETFS = ["SPY", "QQQ"]  # or ["STX40","STXIND"] if you feed JSE dataframes
# price_dfs should be a dict of daily OHLCV dfs for the ETFS above
# e.g., price_dfs = {sym: daily_data[sym][['open','high','low','close','volume']].dropna() for sym in ETFS}

results = run_backtest_with_df(
    price_dfs=price_dfs,
    etfs=ETFS,
    start_cash=1,
    monthly_cash=500,
    commission_bps=1.0
)

plot_dca_results(price_dfs, results, title="DCA Backtest – Candles, Buys & Volume")


## 6. Performance Evaluation

In [351]:
import numpy as np
import pandas as pd

def build_equity_curve(price_dfs: dict,
                       trades: pd.DataFrame,
                       etfs: list,
                       start_cash: float,
                       monthly_cash: float,
                       invest_day: int = 1):
    """
    Reconstruct daily equity curve from trades (BUY/SELL),
    adding monthly contributions and marking holdings to market.
    Robust to duplicate timestamps (collapses to one row per day).
    """
    if trades.empty:
        raise ValueError("No trades available to build equity curve.")

    # --- build per-ETF daily close (1 row per calendar day) ---
    closes_map = {}
    for sym in etfs:
        df = price_dfs[sym].copy()
        # ensure datetime index without tz
        idx = pd.to_datetime(df.index)
        try:
            idx = idx.tz_localize(None)
        except (TypeError, AttributeError):
            pass
        df.index = idx

        # normalize to date and collapse duplicates by last (you can use 'last'/'mean')
        df['__date__'] = df.index.normalize()
        s = df.groupby('__date__')['close'].last()  # pick the last close for the day
        s.index.name = 'date'
        closes_map[sym] = s.sort_index()

    # union of all dates across ETFs
    all_dates = sorted(set().union(*[s.index.tolist() for s in closes_map.values()]))
    dates = pd.DatetimeIndex(all_dates)

    # build closes matrix and forward-fill gaps
    closes = pd.DataFrame(index=dates)
    for sym, s in closes_map.items():
        closes[sym] = s.reindex(dates).ffill()

    # normalize trades to dates (BUY negative cash, SELL positive cash is handled below)
    t = trades.copy()
    t['datetime'] = pd.to_datetime(t['datetime']).dt.floor('D')
    t = t.sort_values('datetime')

    # state
    holdings = {sym: 0.0 for sym in etfs}
    cash = float(start_cash)

    equity = []
    contrib = []
    last_contrib_month = None

    for d in dates:
        # monthly contribution on invest_day
        if d.day == invest_day and (last_contrib_month != (d.year, d.month)):
            cash += monthly_cash
            contrib.append((d, monthly_cash))
            last_contrib_month = (d.year, d.month)

        # apply trades on this day
        todays = t[t['datetime'] == d]
        if not todays.empty:
            for _, row in todays.iterrows():
                side = row['side']
                sym  = row['symbol']
                qty  = float(row['size'])
                px   = float(row['price'])
                comm = float(row.get('commission', 0.0))

                if side == 'BUY':
                    holdings[sym] += qty
                    cash -= qty * px + comm
                elif side == 'SELL':
                    holdings[sym] -= qty
                    cash += qty * px - comm

        # mark to market
        mtm = 0.0
        for sym in etfs:
            px = closes.at[d, sym] if (sym in closes.columns and pd.notna(closes.at[d, sym])) else np.nan
            if np.isfinite(px):
                mtm += holdings[sym] * px

        equity.append((d, cash + mtm))

    eq = pd.DataFrame(equity, columns=['date', 'equity']).set_index('date')
    contrib_df = (pd.DataFrame(contrib, columns=['date','contribution'])
                    .set_index('date')) if contrib else pd.DataFrame(columns=['contribution'])
    return eq, contrib_df



In [352]:
# Metrics: CAGR, vol, Sharpe, max DD, TWR, IRR

import numpy as np

def perf_metrics(equity: pd.DataFrame, rf_annual: float = 0.0):
    """
    equity: DataFrame with column 'equity' daily (calendar-day) levels.
    rf_annual: annual risk-free (e.g., 0.045 for 4.5%)
    """
    eq = equity['equity'].astype(float).dropna()
    # use business days for return calc
    eq_b = eq.asfreq('B').ffill()
    rets = eq_b.pct_change().dropna()

    # annualization factors
    ann_factor = 252

    # CAGR
    n_years = (eq_b.index[-1] - eq_b.index[0]).days / 365.25
    cagr = (eq_b.iloc[-1] / eq_b.iloc[0])**(1/n_years) - 1 if n_years > 0 else np.nan

    # Vol & Sharpe
    vol = rets.std() * np.sqrt(ann_factor)
    rf_daily = (1 + rf_annual) ** (1/ann_factor) - 1
    excess = rets - rf_daily
    sharpe = (excess.mean() / rets.std()) * np.sqrt(ann_factor) if rets.std() > 0 else np.nan

    # Max Drawdown
    roll_max = eq_b.cummax()
    dd = eq_b / roll_max - 1
    max_dd = dd.min()

    # TWR (time-weighted) equals product of (1+returns)-1
    twr = (1 + rets).prod() - 1

    return dict(
        CAGR=cagr,
        AnnVol=vol,
        Sharpe=sharpe,
        MaxDrawdown=max_dd,
        TWR=twr,
        daily_returns=rets,
        drawdown=dd
    )


In [353]:
import numpy as np
import pandas as pd

def cashflow_series(trades: pd.DataFrame,
                    start_cash: float,
                    monthly_cash: float,
                    invest_day: int,
                    equity_last: float) -> pd.Series:
    """ Robust cashflow series: start cash (+), monthly contribs (+), buys (-), sells (+), terminal equity (+). """
    cf = {}

    if trades is None or trades.empty:
        today = pd.Timestamp.today().normalize()
        cf[today] = float(start_cash)
        cf[today + pd.Timedelta(days=365)] = float(equity_last)
        return pd.Series(cf).astype(float).sort_index()

    t = trades.copy()
    t['datetime'] = pd.to_datetime(t['datetime']).dt.floor('D')
    t = t.sort_values('datetime')

    start_date = t['datetime'].min().normalize()
    end_date   = t['datetime'].max().normalize()

    # starting cash as inflow
    cf[start_date] = cf.get(start_date, 0.0) + float(start_cash)

    # monthly contributions (month start)
    for d in pd.date_range(start=start_date, end=end_date, freq='MS'):
        cf[d] = cf.get(d, 0.0) + float(monthly_cash)

    # trades (BUY -, SELL +), include commissions
    for _, row in t.iterrows():
        amt = (-1 if row['side'] == 'BUY' else 1) * (float(row['price']) * float(row['size']))
        amt -= float(row.get('commission', 0.0) or 0.0)
        d = row['datetime']
        cf[d] = cf.get(d, 0.0) + amt

    # terminal inflow one day after last trade
    terminal_date = end_date + pd.Timedelta(days=1)
    cf[terminal_date] = cf.get(terminal_date, 0.0) + float(equity_last)

    s = pd.Series(cf).sort_index().astype(float)
    s = s.groupby(s.index).sum()  # collapse duplicate dates
    return s


def xirr_bisection(cashflows: pd.Series,
                   r_low: float = -0.999, r_high: float = 5.0,
                   tol: float = 1e-8, max_iter: int = 200) -> float:
    """
    Robust XIRR via bracketing + bisection. Returns annual rate (e.g. 0.12 for 12%).
    Domain restricted to [-0.999, 5.0] to avoid overflow.
    """
    if not isinstance(cashflows, pd.Series) or cashflows.empty:
        return np.nan

    cf = cashflows.groupby(cashflows.index).sum().astype(float).sort_index()
    if not ((cf > 0).any() and (cf < 0).any()):
        # no sign change possible
        return np.nan

    dates = pd.to_datetime(cf.index)
    t = (dates - dates.min()).days.values.astype(float) / 365.0
    a = cf.values.astype(float)

    # NPV(r) using log1p for stability
    def npv(r):
        # clamp r to valid domain of log1p
        if r <= -0.999:
            r = -0.999
        return np.sum(a * np.exp(-t * np.log1p(r)))

    f_low = npv(r_low)
    f_high = npv(r_high)

    # try to find a bracket with sign change; expand r_high if needed (up to a cap)
    expand = 0
    while f_low * f_high > 0 and r_high < 10.0 and expand < 10:
        r_high *= 1.5
        f_high = npv(r_high)
        expand += 1

    if f_low * f_high > 0:
        # still no sign change → no root in reasonable range
        return np.nan

    # Bisection
    for _ in range(max_iter):
        r_mid = 0.5 * (r_low + r_high)
        f_mid = npv(r_mid)
        if abs(f_mid) < tol or abs(r_high - r_low) < tol:
            return r_mid
        if f_low * f_mid <= 0:
            r_high, f_high = r_mid, f_mid
        else:
            r_low, f_low = r_mid, f_mid
    return r_mid  # best effort


In [366]:
# equity, drawdown, rolling returns
import plotly.graph_objects as go
from plotly.subplots import make_subplots

def plot_performance(equity: pd.DataFrame, dd: pd.Series, daily_rets: pd.Series, title="Portfolio Performance"):
    # 252d rolling return
    roll_ret = (1 + daily_rets).rolling(252).apply(lambda x: np.prod(1+x)-1, raw=False)

    fig = make_subplots(rows=2, cols=1, shared_xaxes=True,
                        vertical_spacing=0.06,
                        # subplot_titles=("Equity Curve", "Drawdown", "Rolling 1Y Return"))
                        subplot_titles=("Equity Curve", "Drawdown"))

    fig.add_trace(go.Scatter(x=equity.index, y=equity['equity'],
                             mode='lines', name='Equity'), row=1, col=1)

    fig.add_trace(go.Scatter(x=dd.index, y=dd.values,
                             mode='lines', name='Drawdown',
                             fill='tozeroy'), row=2, col=1)

    # fig.add_trace(go.Scatter(x=roll_ret.index, y=roll_ret.values,
    #                          mode='lines', name='Rolling 1Y'), row=3, col=1)

    fig.update_yaxes(title_text="Equity ($)", row=1, col=1)
    fig.update_yaxes(title_text="Drawdown", tickformat=".0%", row=2, col=1)
    # fig.update_yaxes(title_text="1Y Return", tickformat=".0%", row=3, col=1)
    fig.update_layout(height=900, template="plotly_dark", title=title)
    
    fig.show()


In [367]:
# print metrics + plot 

# Build price_dfs from your daily_data (if not already done)
price_dfs = {}
for sym, df in daily_data.items():
    tmp = df.copy()
    tmp.index = pd.to_datetime(tmp.index).tz_localize(None)
    price_dfs[sym] = tmp[['open','high','low','close','volume']].dropna()

# Run your backtest (you already have this)
results = run_backtest_with_df(
    price_dfs=price_dfs,
    etfs=ETFS,                 # e.g., ["SPY","QQQ"]
    start_cash=STARTING_CASH,
    monthly_cash=500,
    commission_bps=1.0
)

# Build equity curve from trades + prices
eq, contrib_df = build_equity_curve(
    price_dfs=price_dfs,
    trades=results['trades'],
    etfs=ETFS,
    start_cash=STARTING_CASH,
    monthly_cash=500,
    invest_day=1
)

# Metrics
m = perf_metrics(eq, rf_annual=0.0)
# print(f"CAGR:          {m['CAGR']:.2%}")
# print(f"Ann Vol:       {m['AnnVol']:.2%}")
print(f"Sharpe:        {m['Sharpe']:.2f}")
print(f"Max Drawdown:  {m['MaxDrawdown']:.2%}")
# print(f"TWR:           {m['TWR']:.2%}")

# # IRR / money-weighted
# cf = cashflow_series(results['trades'], start_cash=10_000, monthly_cash=500, invest_day=1, equity_last=eq['equity'].iloc[-1])
# irr = xirr(cf)
# print(f"IRR (XIRR):    {irr:.2%}")
cf = cashflow_series(
    results['trades'],
    start_cash=STARTING_CASH,
    monthly_cash=500,
    invest_day=1,
    equity_last=float(eq['equity'].iloc[-1])
)
# irr = xirr_bisection(cf)
# print(f"IRR (XIRR): {irr:.2%}")

# print(cf)


# Plots
plot_performance(eq, m['drawdown'], m['daily_returns'], title="DCA Portfolio – Equity, Drawdown, Rolling 1Y")


Sharpe:        0.70
Max Drawdown:  -8.48%
