In [3]:
pip install "pandas<2.2"

Collecting pandas<2.2
  Downloading pandas-2.1.4-cp312-cp312-macosx_11_0_arm64.whl.metadata (18 kB)
Downloading pandas-2.1.4-cp312-cp312-macosx_11_0_arm64.whl (10.6 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m10.6/10.6 MB[0m [31m53.5 MB/s[0m eta [36m0:00:00[0m [36m0:00:01[0m
[?25hInstalling collected packages: pandas
  Attempting uninstall: pandas
    Found existing installation: pandas 2.2.2
    Uninstalling pandas-2.2.2:
      Successfully uninstalled pandas-2.2.2
Successfully installed pandas-2.1.4
Note: you may need to restart the kernel to use updated packages.


In [None]:
import pandas as pd
import numpy as np
from scipy.interpolate import interp1d
import yfinance as yf
import quantstats as qs
from datetime import datetime

In [None]:
# --- CONFIGURATION ---
# Everything a reader might want to tune lives in this cell.

# CSV of historical option quotes loaded by the driver cell with pd.read_csv.
# The code in this notebook reads the columns 'quote_date', 'expiration', 'bid', 'ask',
# 'strike', 'option_type' and 'underlying_price' from it.
HISTORICAL_DATA_FILE = 'spy_options_historical_2023-01-01_to_2024-12-31.csv'
# Continuously compounded rate used in calculate_variance_strike for the forward
# (F = S * exp(r * T)) and for the exp(r * T) factor on the option strip.
RISK_FREE_RATE = 0.05
# Base size handed to VarSwapBacktester; each new swap's notional is this value
# multiplied by 1 / (strike vol in percent).
BASE_NOTIONAL = 5_000_000
# Starting capital of the backtest; mark-to-market P&L is added to it.
INITIAL_CAPITAL = 1_000_000
# First and last date of the business-day calendar the backtest iterates over.
# NOTE(review): the data file name suggests quotes run through 2024-12-31, while END_DATE
# stops at the end of 2023 -- confirm this is intentional.
START_DATE = '2023-01-03'
END_DATE = '2023-12-29'

In [None]:
def calculate_variance_strike(chain_for_one_expiry, risk_free_rate=None):
    """Estimate the variance-swap strike for one expiry, quoted as a volatility in percent.

    The strike is replicated from a discrete strip of out-of-the-money options:

        variance = (2 / T) * exp(r * T) * sum(dK_i / K_i**2 * Q_i) - (1 / T) * (F / K0 - 1)**2

    where F = S * exp(r * T), K0 is the highest listed strike at or below F, and Q_i is the
    mid price of the put (K_i < K0), of the call (K_i > K0), or the mean of every quote
    listed at K0 itself.

    The strike spacing dK_i is half the distance between the two neighbouring strikes, and the
    plain distance to the single neighbour at either end of the strip. Earlier revisions used a
    backward difference that assigned the lowest put a spacing equal to its own strike and the
    first out-of-the-money call a spacing of zero, which distorted the result.

    Parameters
    ----------
    chain_for_one_expiry : pandas.DataFrame
        Quotes for a single expiry with the columns 'strike', 'option_type' ('P' or 'C'),
        'mid_price', 'time_to_expiry' and 'underlying_price'. The last two are read from the
        first row only.
    risk_free_rate : float, optional
        Continuously compounded rate r. Defaults to the module-level RISK_FREE_RATE.

    Returns
    -------
    float
        100 * sqrt(variance), or 0 when the chain is empty, T or S is not positive, no strike
        lies at or below the forward, or the computed variance is not positive.
    """
    if chain_for_one_expiry.empty:
        return 0

    rate = RISK_FREE_RATE if risk_free_rate is None else risk_free_rate
    T = chain_for_one_expiry['time_to_expiry'].iloc[0]
    S = chain_for_one_expiry['underlying_price'].iloc[0]
    if T <= 0 or S <= 0:
        return 0

    F = S * np.exp(rate * T)
    strikes = chain_for_one_expiry['strike']
    at_or_below_forward = strikes[strikes <= F]
    if at_or_below_forward.empty:
        return 0
    K0 = at_or_below_forward.max()
    if K0 <= 0:
        return 0

    # Keep only the out-of-the-money side at each strike, plus every quote at K0.
    option_type = chain_for_one_expiry['option_type']
    in_strip = (
        ((option_type == 'P') & (strikes < K0))
        | ((option_type == 'C') & (strikes > K0))
        | (strikes == K0)
    )
    # One price per strike, sorted by strike (groupby sorts its keys); the K0 entry
    # becomes the average of the quotes listed there.
    strip = chain_for_one_expiry.loc[in_strip].groupby('strike')['mid_price'].mean().dropna()
    strip = strip[strip.index > 0]  # guard the 1 / K**2 weight

    strip_strikes = strip.index.to_numpy(dtype=float)
    strip_prices = strip.to_numpy(dtype=float)
    if len(strip_strikes) >= 2:
        # Central differences inside the strip, one-sided differences at both ends.
        delta_K = np.gradient(strip_strikes)
    else:
        # A lone strike has no spacing and therefore contributes nothing.
        delta_K = np.zeros_like(strip_strikes)

    strip_sum = float(np.sum(delta_K / strip_strikes**2 * strip_prices))
    variance = (2 / T) * np.exp(rate * T) * strip_sum - (1 / T) * (F / K0 - 1)**2
    return np.sqrt(variance) * 100 if variance > 0 else 0

def get_variance_curve_for_date(target_date, all_options_data):
    """Build the variance-strike term structure observed on one quote date.

    Parameters
    ----------
    target_date : pandas.Timestamp
        Quote date to select; compared for equality against the 'quote_date' column.
    all_options_data : pandas.DataFrame
        Option quotes with datetime columns 'quote_date' and 'expiration', numeric 'bid' and
        'ask', plus whatever columns calculate_variance_strike reads.

    Returns
    -------
    scipy.interpolate.interp1d or None
        Linear interpolator (extrapolating outside the quoted range) from calendar days to
        expiry to the variance strike in volatility percent. None when the date has no usable
        quotes or fewer than two expiries produce a positive strike.
    """
    daily_data = all_options_data[all_options_data['quote_date'] == target_date]
    if daily_data.empty:
        return None

    daily_data = daily_data.copy()
    # Keep the whole-day count as an integer. Recovering it later via
    # int(time_to_expiry * 365.25) truncates after a float round-trip and can land one day low.
    daily_data['days_to_expiry'] = (daily_data['expiration'] - daily_data['quote_date']).dt.days
    daily_data['time_to_expiry'] = daily_data['days_to_expiry'] / 365.25
    daily_data['mid_price'] = (daily_data['bid'] + daily_data['ask']) / 2.0

    # Drop expired / same-day expiries and quotes without a positive mid price.
    usable = daily_data[(daily_data['time_to_expiry'] > 0.001) & (daily_data['mid_price'] > 0)]
    if usable.empty:
        return None

    results = []
    for _, chain in usable.groupby('expiration'):
        var_strike = calculate_variance_strike(chain)
        if var_strike > 0:
            results.append({
                'Days': int(chain['days_to_expiry'].iloc[0]),
                'Variance_Strike_Vol': var_strike,
            })
    if len(results) < 2:
        return None

    results_df = pd.DataFrame(results).drop_duplicates(subset='Days').sort_values('Days')
    if len(results_df) < 2:
        return None
    return interp1d(results_df['Days'], results_df['Variance_Strike_Vol'], kind='linear', fill_value="extrapolate")

In [None]:
class VarSwapBacktester:
    """Backtest of a weekly-rolled ladder of short 30-day variance swaps.

    Each swap is a dict with the keys 'inception_date', 'expiry_date', 'tenor_T' (calendar
    days), 'strike_vol' (volatility percent), 'notional' and 'current_mtm'. A swap's value is
    notional * (strike variance - current variance), so the book profits when variance comes
    in below the strike. Daily changes in that value are added to `capital`.
    """

    TRADING_DAYS_PER_YEAR = 252  # annualization factor for daily squared log returns
    SWAP_TENOR_DAYS = 30         # calendar-day tenor of every new swap
    MAX_OPEN_SWAPS = 4           # at this many open swaps the nearest expiry is closed before a roll
    ROLL_WEEKDAY = 4             # pandas weekday number for Friday

    def __init__(self, start_date, end_date, all_options_data, initial_capital, base_notional):
        """Store the configuration and download SPY closes for the realized-variance leg.

        Parameters
        ----------
        start_date, end_date : str or datetime-like
            Inclusive bounds of the business-day calendar the backtest walks.
        all_options_data : pandas.DataFrame
            Option quotes handed to get_variance_curve_for_date.
        initial_capital : float
            Starting capital.
        base_notional : float
            Scaled by 1 / strike vol (percent) to size each new swap.
        """
        self.start_date = pd.to_datetime(start_date)
        self.end_date = pd.to_datetime(end_date)
        self.all_options_data = all_options_data
        self.capital = initial_capital
        self.base_notional = base_notional
        self.portfolio = []
        self.equity_curve = []
        self.dates = pd.bdate_range(self.start_date, self.end_date)
        self.latest_var_curve = None
        self.latest_curve_date = None
        # yfinance treats `end` as exclusive; ask for one extra day so the final
        # backtest date has a close.
        spy_df = yf.download(
            'SPY',
            start=self.start_date - pd.Timedelta(days=60),
            end=self.end_date + pd.Timedelta(days=1),
        )
        closes = spy_df['Close']
        if isinstance(closes, pd.DataFrame):
            # Some yfinance versions return one 'Close' column per ticker.
            closes = closes.iloc[:, 0]
        self.spy_hist = closes

    def get_variance_curve(self, date):
        """Return the variance curve for `date`, falling back to the last curve built.

        The result for the most recent successful date is cached. When no curve can be built
        for `date`, the previously cached (older) curve is returned instead; None is returned
        only if no curve has been built yet.
        """
        if date == self.latest_curve_date:
            return self.latest_var_curve
        curve = get_variance_curve_for_date(date, self.all_options_data)
        if curve is not None:
            self.latest_var_curve = curve
            self.latest_curve_date = date
        return self.latest_var_curve

    def _realized_leg(self, start, end):
        """Return (sum of squared daily log returns, number of returns) over [start, end]."""
        window = self.spy_hist.loc[start:end]
        if isinstance(window, pd.DataFrame):
            window = window.iloc[:, 0]
        if len(window) < 2:
            return 0.0, 0
        log_returns = np.log(window / window.shift(1)).dropna()
        return float((log_returns**2).sum()), len(log_returns)

    def mark_to_market(self, today):
        """Revalue every open swap as of `today` and add the change in value to capital.

        The current variance of a swap blends the annualized realized variance since inception
        with the implied variance for the remaining days, weighted by elapsed and remaining
        calendar days. Once the tenor has elapsed only the realized leg is used.
        """
        if not self.portfolio:
            return
        var_curve_today = self.get_variance_curve(today)
        if var_curve_today is None:
            return

        total_pnl = 0.0
        for swap in self.portfolio:
            prev_mtm = swap.get('current_mtm', 0)
            T_total_days = swap['tenor_T']

            # Returns after expiry must not enter the realized leg.
            accrual_end = min(today, swap.get('expiry_date', today))
            sum_sq_returns, n_returns = self._realized_leg(swap['inception_date'], accrual_end)
            # Annualize by the number of daily returns actually observed. Dividing by elapsed
            # calendar days while multiplying by 252 would understate realized variance.
            if n_returns > 0:
                annualized_realized_var = sum_sq_returns / n_returns * self.TRADING_DAYS_PER_YEAR
            else:
                annualized_realized_var = 0.0

            t_elapsed_days = (today - swap['inception_date']).days
            t_remaining_days = T_total_days - t_elapsed_days
            if t_remaining_days <= 0:
                current_var = annualized_realized_var
            else:
                implied_var_remaining = (float(var_curve_today(t_remaining_days)) / 100)**2
                current_var = (t_elapsed_days / T_total_days) * annualized_realized_var + \
                              (t_remaining_days / T_total_days) * implied_var_remaining

            strike_var = (swap['strike_vol'] / 100)**2
            mtm_value = swap['notional'] * (strike_var - current_var)
            swap['current_mtm'] = mtm_value
            total_pnl += mtm_value - prev_mtm
        self.capital += total_pnl

    def execute_rolls(self, today):
        """Drop expired swaps and, on the roll weekday, open a new swap at the 30-day strike.

        When the book already holds MAX_OPEN_SWAPS swaps, the one with the nearest expiry is
        removed first. The new notional is base_notional * (1 / strike vol in percent).
        """
        self.portfolio = [s for s in self.portfolio if s['expiry_date'] > today]
        if today.weekday() != self.ROLL_WEEKDAY:
            return
        if len(self.portfolio) >= self.MAX_OPEN_SWAPS:
            self.portfolio.sort(key=lambda x: x['expiry_date'])
            self.portfolio.pop(0)
        var_curve_today = self.get_variance_curve(today)
        if var_curve_today is None:
            return
        # interp1d returns a 0-d array; store a plain float in the swap record.
        new_strike_vol = float(var_curve_today(self.SWAP_TENOR_DAYS))
        if new_strike_vol <= 0:
            return
        new_notional = self.base_notional * (1 / new_strike_vol)
        new_swap = {
            'inception_date': today,
            'expiry_date': today + pd.Timedelta(days=self.SWAP_TENOR_DAYS),
            'tenor_T': self.SWAP_TENOR_DAYS,
            'strike_vol': new_strike_vol,
            'notional': new_notional,
            'current_mtm': 0,
        }
        self.portfolio.append(new_swap)
        print(f"  {today.date()}: Rolled. New 30D swap @ {new_strike_vol:.2f}% Notional ${new_notional:,.0f}")

    def run_backtest(self):
        """Walk the business-day calendar and return the equity curve.

        Returns
        -------
        pandas.DataFrame
            Indexed by 'date' with a single 'capital' column; empty when no day produced a
            curve. Days before the first available curve are skipped; later days without any
            curve repeat the previous capital.
        """
        for today in self.dates:
            curve = self.get_variance_curve(today)
            if curve is None:
                if self.equity_curve:
                    self.equity_curve.append({'date': today, 'capital': self.equity_curve[-1]['capital']})
                continue
            # Revalue first so a swap that has just expired settles before it is dropped.
            self.mark_to_market(today)
            self.execute_rolls(today)
            self.equity_curve.append({'date': today, 'capital': self.capital})
        if not self.equity_curve:
            return pd.DataFrame()
        return pd.DataFrame(self.equity_curve).set_index('date')

In [None]:
def generate_performance_report(equity_df):
    """Print a summary of the backtest and, when possible, write a QuantStats report.

    Parameters
    ----------
    equity_df : pandas.DataFrame
        Equity curve with a 'capital' column, as returned by VarSwapBacktester.run_backtest.

    Side effects
    ------------
    Prints progress messages. When at least one finite return exists, writes
    'strategy_report.html' via QuantStats and draws the equity plot against SPY.
    Returns None in every case.
    """
    print("\n--- PERFORMANCE ANALYSIS ---")

    # Guard: fewer than two observations cannot produce a return series.
    has_rows = not equity_df.empty
    if not has_rows or len(equity_df) < 2:
        print("Backtest complete, but not enough data to generate a performance report.")
        if has_rows:
            print(f"Final Portfolio Value: ${equity_df['capital'].iloc[-1]:,.2f}")
        return

    # Simple returns with infinities (division by a zero capital) and NaNs removed.
    raw_returns = equity_df['capital'].pct_change()
    returns = raw_returns.replace([np.inf, -np.inf], np.nan).dropna()

    if returns.empty:
        print("No valid returns were generated after cleaning. Cannot create performance report.")
        print(f"Final Portfolio Value: ${equity_df['capital'].iloc[-1]:,.2f}")
        return

    # QuantStats receives the returns on a daily PeriodIndex.
    # NOTE(review): presumably a compatibility workaround for the pinned pandas / quantstats
    # combination -- confirm it is still required.
    returns = returns.to_period('D')

    print("Generating QuantStats report...")
    qs.reports.html(
        returns,
        benchmark='SPY',
        output='strategy_report.html',
        title='Variance Swap Strategy Performance',
    )
    print("\nFull performance report saved to 'strategy_report.html'")
    print(f"\nFinal Portfolio Value: ${equity_df['capital'].iloc[-1]:,.2f}")
    qs.plots.equity(returns, benchmark='SPY', title='Strategy Equity Curve vs. SPY')


if __name__ == "__main__":
    # Driver: load the quotes, normalize their dates, run the backtest, report.
    print("Loading historical options data...")
    all_data = pd.read_csv(HISTORICAL_DATA_FILE)

    print("Normalizing date columns...")
    # Parse as UTC, drop the timezone, and truncate to midnight so the columns compare
    # equal to the backtester's business-day timestamps.
    for col in ('quote_date', 'expiration'):
        parsed_utc = pd.to_datetime(all_data[col], utc=True)
        all_data[col] = parsed_utc.dt.tz_convert(None).dt.normalize()
    print("Data loaded and normalized.")

    print("\nInstantiating backtester...")
    backtester = VarSwapBacktester(
        START_DATE,
        END_DATE,
        all_data,
        INITIAL_CAPITAL,
        BASE_NOTIONAL,
    )

    print("Running backtest...")
    equity_df = backtester.run_backtest()
    print("Backtest complete.")

    generate_performance_report(equity_df)