# 02 — Backtest & Results (Vectorized)

This notebook:

1. Loads `data/processed/adj_close_*.parquet` + `pair_screening.parquet` from Notebook 01
2. Generates long/short signals from spread z-score
3. Runs a **vectorized** backtest (daily close-to-close)
4. Reports performance metrics:
   - Annualized return
   - Sharpe ratio
   - Max drawdown
   - Win rate
   - Equity curve

It also includes diagnostics that help explain failure modes (non-stationarity / regime shifts).

> **Important:** This is research code (educational). It ignores borrow costs, hard-to-borrow constraints, corporate actions edge cases, and realistic execution/slippage.

In [1]:
import warnings
from pathlib import Path

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

warnings.filterwarnings("ignore")
pd.set_option("display.max_columns", 200)
pd.set_option("display.width", 140)

In [None]:
# Locate repo root + processed data
def find_repo_root(start: Path | None = None) -> Path:
    start = (start or Path.cwd()).resolve()
    candidates = [start] + list(start.parents)
    for p in candidates:
        if (p / "pyproject.toml").exists() or (p / "README.md").exists():
            if (p / "src").exists():
                return p
            return p
    return start

PROJECT_ROOT = find_repo_root()
PROCESSED_DIR = PROJECT_ROOT / "data" / "processed"
PROCESSED_DIR.mkdir(parents=True, exist_ok=True)

# Find the most recent adj_close parquet
adj_files = sorted(PROCESSED_DIR.glob("adj_close_*.parquet"))
if not adj_files:
    raise FileNotFoundError("No adj_close_*.parquet found. Run notebook 01 first.")
PRICE_FILE = adj_files[-1]
prices = pd.read_parquet(PRICE_FILE).dropna(how="any")

screen_file = PROCESSED_DIR / "pair_screening.parquet"
pair_screen = pd.read_parquet(screen_file) if screen_file.exists() else None

PRICE_FILE, prices.shape

## 1) Strategy definitions

In [2]:
import statsmodels.api as sm
from statsmodels.tsa.stattools import coint, adfuller

def estimate_hedge_ratio(y: pd.Series, x: pd.Series) -> float:
    x_ = sm.add_constant(x.values)
    model = sm.OLS(y.values, x_).fit()
    return float(model.params[1])

def compute_spread(y: pd.Series, x: pd.Series, beta: float) -> pd.Series:
    return y - beta * x

def zscore(series: pd.Series, lookback: int) -> pd.Series:
    mu = series.rolling(lookback).mean()
    sd = series.rolling(lookback).std()
    return (series - mu) / sd

def max_drawdown(equity: pd.Series) -> float:
    peak = equity.cummax()
    dd = equity / peak - 1.0
    return float(dd.min())

def sharpe_ratio(returns: pd.Series, periods_per_year: int = 252) -> float:
    r = returns.dropna()
    if r.std() == 0:
        return np.nan
    return float(np.sqrt(periods_per_year) * r.mean() / r.std())

def annualized_return(equity: pd.Series, periods_per_year: int = 252) -> float:
    equity = equity.dropna()
    if len(equity) < 2:
        return np.nan
    total = equity.iloc[-1] / equity.iloc[0]
    years = len(equity) / periods_per_year
    return float(total ** (1/years) - 1)

def win_rate(trade_pnl: pd.Series) -> float:
    trade_pnl = trade_pnl.dropna()
    if len(trade_pnl) == 0:
        return np.nan
    return float((trade_pnl > 0).mean())

def adf_pvalue(series: pd.Series) -> float:
    return float(adfuller(series.dropna().values, autolag="AIC")[1])

## 2) Signal generation + vectorized backtest

In [3]:
def backtest_pair(
    prices: pd.DataFrame,
    pair: tuple[str, str],
    lookback: int = 60,
    entry_z: float = 2.0,
    exit_z: float = 0.5,
    tc_bps: float = 5.0,   # transaction cost per trade leg in bps (very rough)
    capital: float = 1.0,
) -> dict:
    a, b = pair
    px_a = prices[a]
    px_b = prices[b]

    y = np.log(px_a)
    x = np.log(px_b)

    # Static hedge ratio (beta)
    beta = estimate_hedge_ratio(y, x)
    spread = compute_spread(y, x, beta)
    z = zscore(spread, lookback)

    # Positions: +1 means long A / short B*beta, -1 means short A / long B*beta
    pos = pd.Series(index=prices.index, dtype=float)
    pos[:] = 0.0

    # Simple state machine vectorized-ish:
    long = False
    short = False
    for t in range(len(z)):
        zi = z.iat[t]
        if np.isnan(zi):
            pos.iat[t] = 0.0
            continue

        if (not long) and (not short):
            if zi <= -entry_z:
                long = True
            elif zi >= entry_z:
                short = True

        else:
            # exit rules
            if long and zi >= -exit_z:
                long = False
            if short and zi <= exit_z:
                short = False

        pos.iat[t] = (1.0 if long else (-1.0 if short else 0.0))

    # Daily returns of each leg
    ret_a = px_a.pct_change()
    ret_b = px_b.pct_change()

    # Pair return: position * (ret_a - beta*ret_b)
    strat_ret_gross = pos.shift(1).fillna(0.0) * (ret_a - beta * ret_b)

    # Transaction costs: charge when position changes (turnover)
    turnover = pos.diff().abs().fillna(0.0)  # 1 when enter/exit, 2 when flip
    tc = (tc_bps / 1e4) * turnover * 2.0  # 2 legs cost (A + B)
    strat_ret = strat_ret_gross - tc

    equity = (1.0 + strat_ret.fillna(0.0)).cumprod() * capital

    # Approx trade PnL: realized on exits (position goes to 0)
    exit_mask = (pos.shift(1).fillna(0.0) != 0) & (pos == 0)
    trade_pnl = equity.pct_change().where(exit_mask)

    out = {
        "pair": f"{a}-{b}",
        "asset_a": a,
        "asset_b": b,
        "beta": beta,
        "lookback": lookback,
        "entry_z": entry_z,
        "exit_z": exit_z,
        "tc_bps": tc_bps,
        "coint_p": float(coint(y, x)[1]),
        "adf_p_spread": adf_pvalue(spread),
        "equity": equity,
        "daily_returns": strat_ret,
        "daily_returns_gross": strat_ret_gross,
        "position": pos,
        "spread": spread,
        "z": z,
        "trade_pnl": trade_pnl,
    }
    return out

## 3) Run backtests

In [4]:
# Choose pairs to test:
DEFAULT_PAIRS = [("NVDA", "JPM"), ("AMZN", "META")]

if pair_screen is not None and len(pair_screen) > 0:
    # Add top candidates by cointegration p-value
    top = pair_screen.sort_values("coint_p").head(5)
    top_pairs = list(zip(top["asset_a"], top["asset_b"]))
    TEST_PAIRS = DEFAULT_PAIRS + [p for p in top_pairs if p not in DEFAULT_PAIRS]
else:
    TEST_PAIRS = DEFAULT_PAIRS

TEST_PAIRS

NameError: name 'pair_screen' is not defined

In [None]:
PARAMS = dict(lookback=60, entry_z=2.0, exit_z=0.5, tc_bps=5.0)

runs = [backtest_pair(prices, pair=p, **PARAMS) for p in TEST_PAIRS]

summary_rows = []
for r in runs:
    eq = r["equity"]
    rets = r["daily_returns"]
    summary_rows.append({
        "pair": r["pair"],
        "coint_p": r["coint_p"],
        "adf_p_spread": r["adf_p_spread"],
        "annualized_return": annualized_return(eq),
        "sharpe": sharpe_ratio(rets),
        "max_drawdown": max_drawdown(eq),
        "win_rate": win_rate(r["trade_pnl"]),
        "equity_start": float(eq.iloc[0]),
        "equity_end": float(eq.iloc[-1]),
        "n_days": int(eq.dropna().shape[0]),
    })

summary = pd.DataFrame(summary_rows).sort_values("coint_p")
summary

## 4) Visualize equity curves + diagnostics

In [None]:
for r in runs:
    eq = r["equity"]
    fig, ax = plt.subplots(figsize=(12, 3))
    eq.plot(ax=ax, title=f"Equity Curve — {r['pair']} (tc={r['tc_bps']} bps)")
    plt.show()

    fig, ax = plt.subplots(figsize=(12, 3))
    r["z"].plot(ax=ax, title=f"Spread z-score — {r['pair']} (lookback={r['lookback']})")
    ax.axhline(r["entry_z"], linestyle="--")
    ax.axhline(-r["entry_z"], linestyle="--")
    ax.axhline(0, linestyle="-")
    plt.show()

    fig, ax = plt.subplots(figsize=(12, 2))
    r["position"].plot(ax=ax, title=f"Position (1=long spread, -1=short spread) — {r['pair']}")
    plt.show()

## 5) Portfolio aggregation (equal-weight across pairs)

In [None]:
# Align returns and equal-weight across pairs each day
rets_mat = pd.concat([r["daily_returns"].rename(r["pair"]) for r in runs], axis=1).fillna(0.0)
port_ret = rets_mat.mean(axis=1)
port_equity = (1.0 + port_ret).cumprod()

portfolio_metrics = {
    "annualized_return": annualized_return(port_equity),
    "sharpe": sharpe_ratio(port_ret),
    "max_drawdown": max_drawdown(port_equity),
    "equity_start": float(port_equity.iloc[0]),
    "equity_end": float(port_equity.iloc[-1]),
}
portfolio_metrics

In [None]:
fig, ax = plt.subplots(figsize=(12, 3))
port_equity.plot(ax=ax, title="Equal-weight portfolio equity (across tested pairs)")
plt.show()

# Drawdown
peak = port_equity.cummax()
dd = port_equity / peak - 1.0
fig, ax = plt.subplots(figsize=(12, 3))
dd.plot(ax=ax, title="Portfolio drawdown")
plt.show()

## 6) Interpretation helper: why cointegration can still fail

Common failure modes you can quantify:

- **Weak / unstable stationarity:** ADF p-values > 0.05, or rolling ADF that drifts upward.
- **Regime shifts:** rolling correlation drops; spread variance changes; beta drifts.
- **Static hedge ratio:** a single beta estimated over years can be badly wrong in sub-periods.

Below: rolling beta + rolling spread volatility for one pair.

In [None]:
PAIR_TO_DIAGNOSE = runs[0]["pair"]
diag = next(r for r in runs if r["pair"] == PAIR_TO_DIAGNOSE)

a, b = diag["asset_a"], diag["asset_b"]
y = np.log(prices[a])
x = np.log(prices[b])

window = 252

# Rolling beta via OLS
betas = []
idx = y.index
for i in range(window, len(y)+1):
    y_w = y.iloc[i-window:i]
    x_w = x.iloc[i-window:i]
    beta_w = estimate_hedge_ratio(y_w, x_w)
    betas.append(beta_w)
rolling_beta = pd.Series(betas, index=idx[window-1:])

spread = diag["spread"]
spread_vol = spread.rolling(window).std()

fig, ax = plt.subplots(figsize=(12, 3))
rolling_beta.plot(ax=ax, title=f"Rolling beta (window={window}) — {a} ~ beta*{b}")
plt.show()

fig, ax = plt.subplots(figsize=(12, 3))
spread_vol.plot(ax=ax, title=f"Rolling spread volatility (window={window}) — {a}-{b}")
plt.show()

## 7) Save results to `reports/`

In [None]:
REPORTS_DIR = PROJECT_ROOT / "reports"
REPORTS_DIR.mkdir(parents=True, exist_ok=True)

summary_path = REPORTS_DIR / "backtest_summary.csv"
summary.to_csv(summary_path, index=False)

portfolio_path = REPORTS_DIR / "portfolio_equity.csv"
port_equity.rename("equity").to_csv(portfolio_path)

print("Saved:")
print("-", summary_path)
print("-", portfolio_path)