In [None]:
import os
from dotenv import load_dotenv
load_dotenv()

import pickle
if not os.path.exists("tmp"):
    os.mkdir("tmp")
if not os.path.exists("fig"):
	os.mkdir("fig")

from typing import cast
from datetime import datetime
from tqdm.auto import tqdm

import polars as pl
import pandas as pd
import numpy as np
import yfinance as yf
from sqlalchemy import create_engine, text, TextClause

import plotly.graph_objects as go
import plotly.express as px
from plotly.subplots import make_subplots
pd.options.plotting.backend = "plotly"

In [None]:
# SQL CONNECTION

CONNECTION_URI = "mysql+mysqlconnector://root@localhost:3306/options"
engine = create_engine(CONNECTION_URI)
with engine.connect() as conn:
	print(conn.execute(text("SELECT VERSION();")).scalar_one())

from copy import deepcopy
def explain_plan(query: TextClause|str, engine=engine):
	if isinstance(query, TextClause):
		query = deepcopy(query)
		query.text = "EXPLAIN PLAN " + query.text
	else:
		query = text(f"EXPLAIN PLAN {query}")
	with engine.connect() as conn:
		result = conn.execute(query).scalars().all()
	print("\n".join(result))
 
with engine.connect() as conn:
	conn.execute(text("CALL DOLT_PULL();"))

In [None]:
# FETCH DATA

from alpaca.data.historical import StockHistoricalDataClient
from alpaca.data.requests import StockBarsRequest
from alpaca.data.timeframe import TimeFrame, TimeFrameUnit
from alpaca.data.enums import DataFeed, Adjustment

def within_threshold(param: str, val: float, threshold = 0.1):    
	return f"{param} BETWEEN {val - threshold} AND {val + threshold}"

def fetch_data(symbol: str, start_date: datetime, end_date: datetime, engine=engine):
	query = text(f"""
	SELECT
		oc.date,
		DATEDIFF(oc.expiration, oc.date) as ttm,
		oc.vol
	FROM option_chain as oc
	WHERE
		oc.date BETWEEN :start_date AND :end_date
		AND oc.act_symbol = :symbol
		AND (
			{within_threshold("oc.delta", 0.50)}
			OR {within_threshold("oc.delta", -0.50)}
		)
		AND oc.bid > 0
		AND oc.ask > oc.bid
		AND ( (oc.bid + oc.ask) / 2 ) >= 0.10
	ORDER BY oc.date ASC
	""").bindparams(symbol=symbol, start_date=start_date, end_date=end_date)

	# daily resolution option chain data
	df_option_chain = pl.read_database(query, engine).set_sorted("date")
	if df_option_chain.is_empty():
		raise ValueError("No option chain data found for the given symbol and date range.")

	# 1h resolution OHLCV data
	client = StockHistoricalDataClient(api_key=os.environ.get("APCA_API_KEY_ID"), secret_key=os.environ.get("APCA_API_SECRET_KEY"))
	request_params = StockBarsRequest(
		symbol_or_symbols=symbol,
		timeframe=TimeFrame(1, TimeFrameUnit.Hour), # type: ignore
		start=start_date,
		end=end_date,
		feed=DataFeed.IEX,
		adjustment=Adjustment.SPLIT,
	)
	bars = client.get_stock_bars(request_params)
	df_ohlcv = pl.from_pandas(bars.df.loc[symbol].reset_index()) # type: ignore
	return df_ohlcv, df_option_chain

In [None]:
# MAKE STRATEGY

YEAR = 252
MONTH = 21
WEEK = 5
BARS_PER_DAY = 7 # 1h bars

def make_strategy(
    df_ohlcv: pl.DataFrame,
    df_option_chain: pl.DataFrame,
    band_std_dev: float,
    vol_z_hl: int,
    vol_z_entry_threshold: float,
    atr_period: int,
    atr_multiplier: float,
    vol_z_eps=0.01,
    commission=0.0001,
    slippage=0.0002,
    lag_period=1,
    long_only=False,
) -> pl.DataFrame:
    
	vol_ma = pl.col("vol").ewm_mean(half_life=vol_z_hl)
 
	vol_sd = pl.col("vol").ewm_std(half_life=vol_z_hl)
 
	df_vol = (
		df_option_chain
		.group_by("date")
		.agg(pl.col("vol").filter(pl.col("ttm") <= 30).mean())
		.sort("date")
		.with_columns(pl.col("vol").shift(1)) # avoid lookahead bias
		.with_columns(
			vol_d = pl.col("vol") * np.sqrt(1 / YEAR),
			vol_ma = vol_ma * np.sqrt(1 / YEAR),
			vol_z = (pl.col("vol") - vol_ma) / (vol_sd + vol_z_eps)
		)
		.with_columns(is_high_vol = (pl.col("vol_z") >= vol_z_entry_threshold))
		.drop_nulls()
	)
 
	day_open = pl.col("open").first().over("date")
	vol_move_dist = day_open * pl.col("vol_d") * band_std_dev
	df = (
		df_ohlcv
		.with_columns(date = pl.col("timestamp").dt.date())
		.join(df_vol, on="date", how="left")
		.sort("timestamp")
		.fill_null(strategy="forward")
		.with_columns(
			# --- Indicators ---
			lower_band = day_open - vol_move_dist,
			upper_band = day_open + vol_move_dist,
			atr = (pl.max_horizontal([
				pl.col("high") - pl.col("low"),
				(pl.col("high") - pl.col("close").shift(1)).abs(),
				(pl.col("low") - pl.col("close").shift(1)).abs()
			])).rolling_mean(window_size=atr_period)
		)
		.with_columns(
			# ensure day_open is only used AFTER the first bar of the day
			cross_upper = (pl.col("high") > pl.col("upper_band")).shift(1) & (pl.col("high") < pl.col("upper_band")),
			cross_lower = (pl.col("low") < pl.col("lower_band")).shift(1) & (pl.col("low") > pl.col("lower_band"))
		)
		.with_columns(
			# --- Signals ---
			raw_long_entry = (pl.col("is_high_vol") & pl.col("cross_lower")).shift(lag_period),
			raw_short_entry = (pl.col("is_high_vol") & pl.col("cross_upper")).shift(lag_period)
		)
	).drop_nulls()

	# Initialize columns
	df = df.with_columns(
		final_position = pl.lit(0).cast(pl.Int8),
		active_stop = pl.lit(np.nan).cast(pl.Float64),
		exit_marker = pl.lit(False).cast(pl.Boolean)
	)

	# State variables
	current_pos = 0
	highest_price = 0.0  # For long trailing stop
	lowest_price = 0.0   # For short trailing stop

	for i in range(1, len(df)):
		curr_open = cast(float, df[i, "open"])
		prev_atr = cast(float, df[i-1, "atr"])
		
		# --- EXIT LOGIC ---
		if current_pos == 1:
			stop_level = highest_price - (prev_atr * atr_multiplier)
			df[i, "active_stop"] = stop_level
			
			if curr_open < stop_level:
				current_pos = 0
				df[i, "exit_marker"] = True
			else:
				highest_price = max(highest_price, curr_open)
				
		elif current_pos == -1:
			stop_level = lowest_price + (prev_atr * atr_multiplier)
			df[i, "active_stop"] = stop_level
			
			if curr_open > stop_level:
				current_pos = 0
				df[i, "exit_marker"] = True
			else:
				lowest_price = min(lowest_price, curr_open)
		# --- ENTRY LOGIC (Only if flat) ---
		if current_pos == 0:
			if df[i, "raw_long_entry"]:
				current_pos = 1
				highest_price = curr_open
			elif not long_only and df[i, "raw_short_entry"]:
				current_pos = -1
				lowest_price = curr_open

		df[i, "final_position"] = current_pos
  
	position_change = (pl.col("final_position") != pl.col("final_position").shift(1)).fill_null(False).alias("position_change")
	df = df.with_columns(
		position_change,
		long_entry = position_change & (pl.col("final_position") == 1),
		short_entry = position_change & (pl.col("final_position") == -1)
	)
 
	log_ret = pl.col("close").log().diff().fill_null(0.0).alias("log_ret")
	cum_ret = log_ret.cum_sum().alias("cum_ret")
 
	cost = commission + slippage
	strat_ret_gross = (log_ret * pl.col("final_position")).alias("strat_ret_gross")
	strat_ret_net = (strat_ret_gross - (pl.col("position_change") * cost)).alias("strat_ret_net")
	strat_cum_ret = strat_ret_net.cum_sum().alias("strat_cum_ret")
 
	df = df.with_columns(
		log_ret,
		cum_ret,
		strat_ret_net,
		strat_cum_ret
	)
	return df

In [None]:
# STRATEGY METRICS

def compute_drawdown(cum_log_ret: np.typing.ArrayLike):
	peak = np.maximum.accumulate(cum_log_ret)
	return np.exp(cum_log_ret - peak) - 1

def compute_strategy_stats(log_ret: np.ndarray, bench_log_ret: np.ndarray|None = None, risk_free_rate=0.0) -> dict:
	total_ret = np.exp(np.sum(log_ret)) - 1

	annual_factor = BARS_PER_DAY * YEAR
	annual_ret = np.exp(np.mean(log_ret) * annual_factor) - 1
	annual_vol = np.std(log_ret) * np.sqrt(annual_factor)
 
	downside_rets = log_ret[log_ret < 0]
	downside_vol = np.std(downside_rets) * np.sqrt(annual_factor) if len(downside_rets) > 0 else 0
 
	sharpe = (annual_ret - risk_free_rate) / annual_vol if annual_vol != 0 else 0
	sortino = (annual_ret - risk_free_rate) / downside_vol if downside_vol != 0 else 0

	dd = compute_drawdown(np.cumsum(log_ret))
	max_dd = np.min(dd)
	avg_dd = np.mean(dd)

	if bench_log_ret is not None:
		cov_matrix = np.cov(log_ret, bench_log_ret)
		beta = cov_matrix[0, 1] / cov_matrix[1, 1]
		alpha_daily = np.mean(log_ret) - (beta * np.mean(bench_log_ret))
		ann_alpha = (1 + alpha_daily) ** annual_factor - 1
	else:
		beta = np.nan
		ann_alpha = np.nan

	return {
		"Total Return (%)": total_ret * 100,
		"Annualized Return (%)": annual_ret * 100,
		"Annualized Volatility (%)": annual_vol * 100,
		"Sharpe Ratio": sharpe,
		"Sortino Ratio": sortino,
		"Max Drawdown (%)": max_dd * 100,
		"Avg Drawdown (%)": avg_dd * 100,
		"Annualized Alpha (%)": ann_alpha * 100,
		"Beta (vs Asset)": beta,
	}

In [None]:
# STRATEGY RESULTS

def plot_strategy(df: pl.DataFrame, strategy_params: dict):
	width=0.75

	x_axis = np.arange(n := len(df))
	dd_bench = compute_drawdown(df["cum_ret"].to_numpy())
	dd_strat = compute_drawdown(df["strat_cum_ret"].to_numpy())


	bench_stats = compute_strategy_stats(df["log_ret"].to_numpy())
	strat_stats = compute_strategy_stats(df["strat_ret_net"].to_numpy(), df["log_ret"].to_numpy())
	data = [
		go.Table(
			header=dict(values=["Metric", "Buy & Hold", "Strategy"], align="left", font=dict(size=11),),
			cells=dict(
				values=[
					list(bench_stats.keys()),
					[ f"{v:.2f}" if not np.isnan(v) else "" for v in bench_stats.values() ],
					[ f"{v:.2f}" if not np.isnan(v) else "" for v in strat_stats.values() ],
				],
				align="left",
				font=dict(size=11),
			),
			domain=dict(x=[width+0.01, 1], y=[0, 1])
		),
		# Price and Indicators
		go.Candlestick(
			x=x_axis, open=df["open"], high=df["high"], low=df["low"], close=df["close"],
			name="Price", xaxis="x", yaxis="y"
		),
		go.Scatter(
			x=x_axis, y=df["upper_band"], line_shape="hv",
			line_color="rgba(173, 216, 230, 0.8)", 
			name=f"+{strategy_params['band_std_dev']}\u03C3 Vol Band", legendgroup="Vol Bands",
			xaxis="x", yaxis="y"
		),
		go.Scatter(
			x=x_axis, y=df["lower_band"], line_shape="hv",
			line_color="rgba(173, 216, 230, 0.8)", fill="tonexty", fillcolor="rgba(173, 216, 230, 0.3)",
			name=f"-{strategy_params['band_std_dev']}\u03C3 Vol Band", legendgroup="Vol Bands",
			xaxis="x", yaxis="y"
		),
		go.Scatter(
			x=x_axis, y=df["active_stop"],
			line_color="firebrick", line_shape="hv", opacity=0.8,
			name=f"{strategy_params['atr_multiplier']}x ATR({strategy_params['atr_period']}) Trailing Stop",
			connectgaps=False , hoverinfo="skip", legendgroup="Trades",
			xaxis="x", yaxis="y"
		),
		go.Scatter(
			x=x_axis[df["long_entry"]], y=df.filter(pl.col("long_entry"))["low"] * 0.995,
			mode="markers", marker=dict(symbol="triangle-up", size=10, color="green"),
			name="Long Entry", legendgroup="Trades", hoverinfo="skip",
			xaxis="x", yaxis="y"
		),
		go.Scatter(
			x=x_axis[df["short_entry"]], y=df.filter(pl.col("short_entry"))["high"] * 1.005,
			mode="markers", marker=dict(symbol="triangle-down", size=10, color="red"),
			name="Short Entry", legendgroup="Trades", hoverinfo="skip",
			xaxis="x", yaxis="y"
		),
        go.Scatter(
			x=x_axis, y=df["vol_d"],
			line_color=px.colors.qualitative.Plotly[0], name="Vol", showlegend=False,
			xaxis="x", yaxis="y2"
		),
        go.Scatter(
			x=x_axis, y=df["vol_ma"],
			line_color=px.colors.qualitative.Plotly[1], name=f"Vol EMA({strategy_params['vol_z_hl']})", showlegend=False,
			xaxis="x", yaxis="y2"
		),
		go.Scatter(
			x=x_axis, y=df["vol_z"],
			line_color=px.colors.qualitative.Plotly[0], name="Vol Z-Score", showlegend=False,
			xaxis="x", yaxis="y3"
		),
		go.Scatter(
			x=x_axis, y=np.repeat(strategy_params["vol_z_entry_threshold"], len(df)),
			line_color="gray", showlegend=False, hoverinfo="skip",
			xaxis="x", yaxis="y3"
		),
		go.Scatter(
			x=x_axis, y=df["final_position"],
			line_color=px.colors.qualitative.Plotly[0], fill="tozeroy", name="Position", showlegend=False,
			xaxis="x", yaxis="y4"
		),
		# Cumulative Returns
		go.Scatter(
			x=x_axis, y=df["cum_ret"],
			line_color="purple",
			name="B&H Returns", legendgroup="B&H",
			xaxis="x", yaxis="y5"
		),
		go.Scatter(
			x=x_axis, y=df["strat_cum_ret"],
			line_color="red",
			name="Strategy Returns", legendgroup="Strategy",
			xaxis="x", yaxis="y5"
		),
		# Drawdown
		go.Scatter(
			x=x_axis, y=dd_bench,
			line_color="purple", fill="tozeroy",
			name="B&H Drawdown", legendgroup="B&H",
			xaxis="x", yaxis="y6"
		),
		go.Scatter(
			x=x_axis, y=dd_strat,
			line_color="red", fill="tozeroy",
			name="Strategy Drawdown", legendgroup="Strategy",
			xaxis="x", yaxis="y6"
		),
	]

	def pad(min_val, max_val, pad: float):
		r = max_val - min_val
		return min_val - r * pad, max_val + r * pad

	def jump_to(a: int, b: int, padx=0.05, pady=0.1):
		s_start = max(a, 0)
		s_end = min(b, n)
		s = slice(s_start, s_end)
		
		bench_w = df["cum_ret"][s].to_numpy() - cast(float, df[s_start, "cum_ret"])
		strat_w = df["strat_cum_ret"][s].to_numpy() - cast(float, df[s_start, "strat_cum_ret"])
		
		dd_bench_w = compute_drawdown(bench_w)
		dd_strat_w = compute_drawdown(strat_w)
		
		bench_stats_w = compute_strategy_stats(df["log_ret"][s].to_numpy())
		strat_stats_w = compute_strategy_stats(df["strat_ret_net"][s].to_numpy(), df["log_ret"][s].to_numpy())

		# Format values for the table cells
		table_values = [
			list(bench_stats_w.keys()),
			[f"{v:.2f}" if not np.isnan(v) else "" for v in bench_stats_w.values()],
			[f"{v:.2f}" if not np.isnan(v) else "" for v in strat_stats_w.values()],
		]
    
		plot_indices = list(range(len(data)-4, len(data)))
		indices = [0] + plot_indices
		restyle = {
			"cells.values": [table_values] + [None] * len(plot_indices),
			"y": [None] + [bench_w, strat_w, dd_bench_w, dd_strat_w],
			"x": [None] + [x_axis[s]] * len(plot_indices),
		}

		# Layout updates
		relayout = {
			"xaxis.range": pad(a, b, padx),
			"yaxis.range": pad(df[s, ["low","lower_band"]].min_horizontal().min(), df[s, ["high","upper_band"]].max_horizontal().max(), pady),
			"yaxis2.range": pad(df[s, "vol_d"].min(), df[s, "vol_d"].max(), pady),
			"yaxis5.range": pad(min(bench_w.min(), strat_w.min()), max(bench_w.max(), strat_w.max()), pady),
			"yaxis6.range": [min(dd_bench_w.min(), dd_strat_w.min()) * (1 + pady), 0.005],
		}
		return [restyle, relayout, indices]

	spacing=0.01
	rh = (rh := np.array([3, 1.5, 1, 1, 2, 1])[::-1]) / np.sum(rh)
	ydomains = [(np.sum(rh[:i])+(spacing * (i != 0)), np.sum(rh[:i+1])-(spacing * (i != len(rh)-1))) for i in range(len(rh))][::-1]

	layout = dict(
		hoversubplots="axis",
		hovermode="x",
		grid=dict(rows=len(rh), columns=1),
		legend=dict(
			orientation="h",
			yanchor="top", y=-0.01,
			xanchor="left", x=0,
		),
		xaxis=dict(
			matches="x", domain=[0, width],
			showticklabels=False, showgrid=False,
			showspikes=True, spikemode="across", spikethickness=1,
			tickvals=x_axis, ticktext=df["timestamp"].dt.strftime("%a %d %b %Y %H:%M %Z").to_list(),
			rangeslider=dict(visible=False)
		),
		yaxis =dict(domain=ydomains[0], title="Price", automargin=True),
		yaxis2=dict(domain=ydomains[1], title=f"Vol", automargin=True),
		yaxis3=dict(domain=ydomains[2], title="Vol Z-Score", automargin=True),
		yaxis4=dict(domain=ydomains[3], title="Position", automargin=True),
		yaxis5=dict(domain=ydomains[4], title="Returns", automargin=True),
		yaxis6=dict(domain=ydomains[5], title="Drawdown", automargin=True),
		updatemenus=[dict(
			type="buttons", direction="right", x=width, y=1.06,
			buttons=[
				dict(label="All", method="update", args=jump_to(0, n)),
				dict(label="252D", method="update", args=jump_to(n - BARS_PER_DAY * YEAR, n)),
				dict(label="63D", method="update", args=jump_to(n - BARS_PER_DAY * MONTH * 3, n)),
				dict(label="21D", method="update", args=jump_to(n - BARS_PER_DAY * MONTH, n)),
				dict(label="10D", method="update", args=jump_to(n - BARS_PER_DAY * WEEK * 2, n)),
				dict(label="5D", method="update", args=jump_to(n - BARS_PER_DAY * WEEK, n)),
			]
		)]
	)

	fig = go.Figure(data=data, layout=layout)
	return fig


In [None]:
# MONTE CARLO SIMULATION

def stationary_bootstrap_monte_carlo(log_ret: np.ndarray, avg_block_size=BARS_PER_DAY * 10, n_simulations=1000, random_state=np.random.RandomState(42)):
    n_steps = len(log_ret)
    simulated_paths: list[np.ndarray] = [None] * n_simulations # type: ignore
    simulated_dd: list[np.ndarray] = [None] * n_simulations # type: ignore
    
    # p is the probability of starting a new block
    p = 1.0 / avg_block_size
    
    for i in range(n_simulations):
        indices = np.zeros(n_steps, dtype=int)
        # Pick the very first day randomly
        current_idx = random_state.randint(0, n_steps)
        indices[0] = current_idx
        
        for t in range(1, n_steps):
            if random_state.rand() < p:
                # 'Jump' to a new random starting point (New Block)
                current_idx = random_state.randint(0, n_steps)
            else:
                # Stay in the current block (Next consecutive day)
                current_idx = (current_idx + 1) % n_steps
            
            indices[t] = current_idx
            
        simulated_returns = log_ret[indices]
        log_ret_path = np.cumsum(simulated_returns)
        simulated_paths[i] = log_ret_path
        simulated_dd[i] = compute_drawdown(log_ret_path)
        
    return simulated_paths, simulated_dd

def plot_monte_carlo(mc_paths: list[np.ndarray], mc_dd: list[np.ndarray], cum_log_ret: np.ndarray, show_paths=100):
	x_axis = np.arange(len(cum_log_ret))
 
	final_equity = [path[-1] for path in mc_paths]
	max_dd = [np.min(dd) for dd in mc_dd]
 
	path_p5 = np.percentile(mc_paths, 5, axis=0)
	path_p50 = np.percentile(mc_paths, 50, axis=0)
	path_p95 = np.percentile(mc_paths, 95, axis=0)
 
	dd_p5 = np.percentile(mc_dd, 5, axis=0)
	dd_p50 = np.percentile(mc_dd, 50, axis=0)
	dd_p95 = np.percentile(mc_dd, 95, axis=0)
	
	max_dd_p5 = np.percentile(max_dd, 5)
	max_dd_p50 = np.percentile(max_dd, 50)
	max_dd_p95 = np.percentile(max_dd, 95)
 
	dd_r = compute_drawdown(cum_log_ret)
 
	fig = make_subplots(rows=2, cols=2, shared_xaxes="columns", shared_yaxes="rows", column_widths=[4, 1], subplot_titles=["Simulation Cum. Log Returns", "Final Return Distribution", "Simulation Drawdowns", "Max DD Distribution"], horizontal_spacing=0.02, vertical_spacing=0.05)
	for i in range(show_paths):
		c = px.colors.qualitative.Plotly[i % len(px.colors.qualitative.Plotly)]
		fig.add_trace(
			go.Scatter(
				x=x_axis,
				y=mc_paths[i],
				line_color=c,
				opacity=0.2,
				hoverinfo="skip",
				showlegend=False
			), row=1, col=1)
		fig.add_trace(
			go.Scatter(
				x=x_axis,
				y=mc_dd[i],
				line_color=c,
				opacity=0.2,
				hoverinfo="skip",
				showlegend=False
			), row=2, col=1)
  
		
	fig.add_traces([
		go.Scatter(x=x_axis, y=path_p95, line_color="gray", name="95th Percentile", legendgroup="95"),
		go.Scatter(x=x_axis, y=path_p50, line_color="blue", name="50th Percentile", legendgroup="50"),
		go.Scatter(x=x_axis, y=path_p5, line_color="gray", name="5th Percentile", legendgroup="5"),
		go.Scatter(x=x_axis, y=cum_log_ret, line_color="red", name="Realized Strategy Path", legendgroup="R"),
	], rows=1, cols=1)
 
	fig.add_traces([
		go.Scatter(x=x_axis, y=dd_p95, line_color="gray", name="Drawdown 95th Percentile", legendgroup="95"),
		go.Scatter(x=x_axis, y=dd_p50, line_color="blue", name="Drawdown 50th Percentile", legendgroup="50"),
		go.Scatter(x=x_axis, y=dd_p5, line_color="gray", name="Drawdown 5th Percentile", legendgroup="5"),
		go.Scatter(x=x_axis, y=dd_r, line_color="red", fill="tozeroy", name="Realized Drawdown", legendgroup="R"),
	], rows=2, cols=1)

	fig.add_trace(go.Histogram(y=final_equity, nbinsx=50, histnorm="probability", marker_color="purple", showlegend=False), row=1, col=2)
	fig.add_hline(y=path_p95[-1], line_color="gray", row=1, col=2)
	fig.add_hline(y=path_p50[-1], line_color="blue", line_dash="dot", row=1, col=2)
	fig.add_hline(y=path_p5[-1], line_color="gray", row=1, col=2)
	fig.add_hline(y=cum_log_ret[-1], line_color="red", row=1, col=2)

	fig.add_trace(go.Histogram(y=max_dd, nbinsx=50, histnorm="probability", marker_color="purple", showlegend=False), row=2, col=2)
	fig.add_hline(y=max_dd_p95, line_color="gray", row=2, col=2)
	fig.add_hline(y=max_dd_p50, line_color="blue", line_dash="dot", row=2, col=2)
	fig.add_hline(y=max_dd_p5, line_color="gray", row=2, col=2)
	fig.add_hline(y=np.min(dd_r), line_color="red", row=2, col=2)
 
	fig.update_xaxes(showgrid=False, showticklabels=False)

	return fig

In [None]:
# LAG SENSITIVITY

from statsmodels.regression.linear_model import OLS

def run_lag_sensitivity(min_lag: int, max_lag: int, df_ohlcv: pl.DataFrame, df_option_chain: pl.DataFrame, strategy_params: dict):
	results = []
	paths = []
 
	for lag_period in (pbar := tqdm(range(min_lag, max_lag + 1))):
		try:
			df = make_strategy(**strategy_params, lag_period=lag_period, df_ohlcv=df_ohlcv, df_option_chain=df_option_chain)
			stats = compute_strategy_stats(df["strat_ret_net"].to_numpy(), df["cum_ret"].to_numpy())
			paths.append((lag_period, df["strat_cum_ret"]))
			results.append({"lag_period": lag_period, **stats})
			pbar.set_postfix_str(f"sharpe={stats['Sharpe Ratio']:.2f}, annual_ret={stats['Annualized Return (%)']:.2f}%")
			
		except Exception as e:
			print(f"Error in iteration {lag_period}: {e}")

	return pd.DataFrame(results), paths


def plot_lag_sensitivity(lag_results: pd.DataFrame, lag_paths: list[tuple[int, pl.Series]]):
	fig = make_subplots(rows=4, cols=1, subplot_titles=["Paths", "Sharpe Ratio", "Annualized Alpha (%)", "Beta (vs Asset)"], row_heights=[1.5, 1, 1, 1], vertical_spacing=0.075)
	fig.add_traces([
		go.Scatter(
			x=np.arange(len(path)),
			y=path,
			line_color=px.colors.sample_colorscale("Blues_r", [lag / lag_results["lag_period"].max()])[0],
			opacity=0.7,
			mode="lines",
			name=f"Lag {lag}",
			hovertemplate=f"Lag Period: {lag}<br>Total Return: {path[-1]:.2%}",
			showlegend=False
		) for lag, path in lag_paths
	])
 
	fig.add_trace(
        go.Scatter(
            x=[None], y=[None],
            mode='markers',
            marker=dict(
                colorscale="Blues_r",
                showscale=True,
                cmin=lag_results["lag_period"].min(),
                cmax=lag_results["lag_period"].max(),
                colorbar=dict(title="Lag Period", y=0.5, len=0.8),
            ),
            hoverinfo='none',
            showlegend=False
        )
    )

	for row, metric in enumerate(["Sharpe Ratio", "Annualized Alpha (%)", "Beta (vs Asset)"], start=2):
		
		X = lag_results["lag_period"].to_numpy().reshape(-1, 1)
		y = lag_results[metric].to_numpy()
		model = OLS(y, np.hstack([np.ones_like(X), X])).fit()
		y_pred = model.predict(np.hstack([np.ones_like(X), X]))
	
		fig.add_traces([
			go.Scatter(
				x=lag_results["lag_period"],
				y=lag_results[metric],
				name=metric,
				marker_color=px.colors.qualitative.Plotly[row],
				mode="markers",
				showlegend=False,
			),
			go.Scatter(
				x=lag_results["lag_period"],
				y=y_pred,
				name="OLS Fit",
				hovertemplate=f"{model.params[0]:.2f} {'+' if model.params[1] >= 0 else '-'} {abs(model.params[1]):.4f} * Lag Period",
				line_color=px.colors.qualitative.Plotly[row],
				mode="lines",
				showlegend=False,
			),
	], rows=row, cols=1)

	return fig

In [None]:
# PARAMETER SENSITIVITY ANALYSIS

from itertools import combinations

def run_parameter_sensitivity(df_ohlcv: pl.DataFrame, df_option_chain: pl.DataFrame, iterations=500, random_state=np.random.RandomState(42)):
	results = []
	paths = []
 
	for i in (pbar := tqdm(range(iterations))):
		params={
			"band_std_dev": random_state.uniform(0.5, 2.0),
			"vol_z_hl": random_state.randint(3, MONTH),
			"vol_z_entry_threshold": random_state.uniform(-1.0, 1.0),
			"atr_period": BARS_PER_DAY * random_state.randint(3, MONTH),
			"atr_multiplier": round(random_state.uniform(1.5, 4.0), 1)
		}

		try:
			df = make_strategy(**params, df_ohlcv=df_ohlcv, df_option_chain=df_option_chain)
			stats = compute_strategy_stats(df["strat_ret_net"].to_numpy())
			paths.append((tuple(params.values()), df["strat_cum_ret"]))
			results.append({**params, **stats})
			pbar.set_postfix_str(f"sharpe={stats['Sharpe Ratio']:.2f}, annual_ret={stats['Annualized Return (%)']:.2f}%")
			
		except Exception as e:
			print(f"Error in iteration {i}: {e}")

	return pd.DataFrame(results), paths


def plot_parameter_sensitivity(sensitivity_results: pd.DataFrame, metric="Sharpe Ratio"):
	params = ["band_std_dev", "vol_z_hl", "vol_z_entry_threshold", "atr_period", "atr_multiplier"]
	param_pairs = list(combinations(params, 2))
	for param in params:
		sensitivity_results[f"{param}_bin"] = pd.cut(sensitivity_results[param], bins=6)

	fig = make_subplots(rows=5, cols=2, subplot_titles=[f"x={x} vs. y={y}" for x,y in param_pairs], horizontal_spacing=0.1, vertical_spacing=0.06)

	for i, (param_x, param_y) in enumerate(param_pairs):
		row = (i // 2) + 1
		col = (i % 2) + 1
		
		hm = (
			sensitivity_results
			.pivot_table(index=f"{param_y}_bin", columns=f"{param_x}_bin", values=metric, aggfunc="mean", observed=True)
			.fillna(0)
		)
	
		
		fig.add_trace(
			go.Heatmap(
				z=hm.values,
				x=[f"({i.left:.2f}, {i.right:.2f}]" for i in hm.columns],
				y=[f"({i.left:.2f}, {i.right:.2f}]" for i in hm.index],
				texttemplate="%{z:.2f}",
				textfont_size=10,
				hovertemplate=
					f"{param_x}: %{{x}}<br>"
					f"{param_y}: %{{y}}<br>"
					f"{metric}: %{{z:.2f}}<extra></extra>",
				coloraxis="coloraxis"
			),
			row=row, col=col
		)
		
	fig.update_xaxes(tickfont_size=9)
	fig.update_yaxes(tickfont_size=9)
	fig.update_annotations(font_size=12)
	fig.update_layout(coloraxis=dict(colorbar=dict(title=metric, y=0.5, len=0.8), colorscale="RdBu", cmid=0))
 
	return fig

In [None]:
symbol = "SPY"
start_date = datetime(year=2020, month=1, day=1)
end_date = datetime.today()

BAND_STD_DEV = 0.8
VOL_Z_HL = WEEK
VOL_Z_ENTRY_THRESHOLD = 0
ATR_PERIOD = WEEK * BARS_PER_DAY
ATR_MULTIPLIER = 3.0

strategy_params = {
	"band_std_dev": BAND_STD_DEV,
	"vol_z_hl": VOL_Z_HL,
	"vol_z_entry_threshold": VOL_Z_ENTRY_THRESHOLD,
	"atr_period": ATR_PERIOD,
	"atr_multiplier": ATR_MULTIPLIER
}

if "loaded" not in globals() or loaded != symbol: # type: ignore
	print (f"Fetching {symbol} data")
	df_ohlcv, df_option_chain = fetch_data(symbol, start_date, end_date)
	loaded = symbol
 
df = make_strategy(df_ohlcv, df_option_chain, **strategy_params) # type: ignore

In [None]:
# PLOT STRATEGY

fig = plot_strategy(df, strategy_params) \
    .update_layout(title=f"{symbol}: Implied Vol. Band Strategy<br>{start_date.strftime("%b %Y")} to {end_date.strftime("%b %Y")}", width=1200, height=900)
fig.write_image(f"fig/{symbol}_strategy.jpg")
fig.show()

In [None]:
# PLOT MONTE CARLO

AVG_BLOCK_SIZE_D = 3 * WEEK
mc_paths, mc_dd = stationary_bootstrap_monte_carlo(df["strat_ret_net"].to_numpy(), AVG_BLOCK_SIZE_D * BARS_PER_DAY)

fig = plot_monte_carlo(mc_paths, mc_dd, df["strat_cum_ret"].to_numpy(), show_paths=100) \
    .update_layout(title=f"{symbol}: Monte Carlo Simulation of Strategy Returns", width=1200, height=600)
fig.write_image(f"fig/{symbol}_monte_carlo.jpg")
fig.show()

In [None]:
# PLOT LAG SENSITIVITY

lag_results, lag_paths = run_lag_sensitivity(1, WEEK * BARS_PER_DAY, df_ohlcv, df_option_chain, strategy_params)

fig = plot_lag_sensitivity(lag_results, lag_paths) \
    .update_layout(title=f"{symbol}: Lag Sensitivity Analysis", width=1000, height=700)
fig.write_image(f"fig/{symbol}_lag_sensitivity.jpg")
fig.show()

In [None]:
# PLOT PARAMETER SENSITIVITY 

OVERWRITE = False
if not os.path.exists(path := f"tmp/{symbol}_sensitivity_results.pkl") or OVERWRITE:
	sensitivity_results, paths = run_parameter_sensitivity(df_ohlcv, df_option_chain)
	with open(path, "wb") as f:
		pickle.dump((sensitivity_results, paths), f)
else:
	with open(path, "rb") as f:
		sensitivity_results, paths = pickle.load(f)
  
fig = plot_parameter_sensitivity(sensitivity_results, metric="Sharpe Ratio") \
	.update_layout(title=f"{symbol}: Parameter Sensitivity Analysis", width=1000, height=1200)
fig.write_image(f"fig/{symbol}_parameter_sensitivity.jpg")
fig.show()