In [38]:
import os
import math
import re
import functools
import itertools
from dataclasses import dataclass, field
import pandas as pd
pd.set_option("display.max_columns", None)
import numpy as np
from scipy.stats import norm
from scipy.optimize import minimize
import matplotlib.pyplot as plt

Add any general utility functions as static member functions to this class. They can be called as `Util.fn`.

In [39]:
class Util:
    def __init__(self):
        raise TypeError("Non-instantiable class")
        
    def __new__(self, *args, **kwargs):
        raise TypeError("Non-instantiable class")
        
    @staticmethod
    def make_regex_group_disjunction(coll):
        return "|".join(map(lambda x: f"({str(x)})", coll))

In [56]:
class OptionsData:
    s_data_dir = "data"
    s_default_filename = "isx2010C.xls"
    
    def __init__(self, filename=s_default_filename, clean=True):
        filepath = os.path.join(self.s_data_dir, filename)
        if not os.path.isfile(filepath):
            faulty_filepath = filepath
            filepath = os.path.join(self.s_data_dir, self.s_default_filename)
            print(f"Warning: could not find {faulty_filepath!r}; proceeding with {filepath!r}")
        self.__m_sheet_df_dict = pd.read_excel(filepath, sheet_name=None)
        if clean:
            for key, val in self.__m_sheet_df_dict.items():
                self.__m_sheet_df_dict[key] = self.__clean_df(val)
                
    def __get_item__(self, key):
        return self.__m_sheet_df_dict[key]
    
    def get_sheet_df_dict(self):
        return self.__m_sheet_df_dict
    
    def get_df_with_strikes(self, E, sheet_name=""):
        if not sheet_name:
            sheet_name = list(self.__m_sheet_df_dict.keys())[0]
            print(f"Warning: sheet name not specified; proceeding with {sheet_name!r}")
        df = self.__m_sheet_df_dict[sheet_name]
        common = ["T", "T_norm", "S", "r"]
        strikes = E if type(E) is list or type(E) is tuple else [E]
        cols = [*common, *map(lambda x: str(int(x)), strikes)]
        return df[cols]
    
    def __clean_df(self, df):
        # Discard rows where no options data is available.
        df = df.dropna(how="all")
        # Rename the columns according to the following convention:
        #  T = Time to Maturity
        #  S = Price of the Underlying
        #  r = Risk-Free Interest Rate
        df = df.rename(lambda x: self.__rename_df_cols(str(x), df), axis="columns")
        # Adjust the interest rate properly.
        df["r"] = df["r"] / 100
        # Add new column with annual-normalized T (252 = no. trading days in a year).
        df["T_norm"] = df["T"] / 252
        # Re-arrange the columns.
        common = ["S", "r", "T", "T_norm"]
        cols = [*common, *filter(lambda x: re.search("[0-9]+", x), df.columns.astype(str))]
        return df[cols]
    
    def __rename_df_cols(self, col_name, df):
        ncol = len(df.columns)
        # Time to maturity | (price of the underlying | risk-free rate).
        regex = r"(?P<T>[0-9]+(-[0-9]{2}){2} ([0-9]{2}:){2}[0-9]{2})|(?P<Sr>Unnamed: (?P<idx>[0-9]+))"
        match = re.match(regex, col_name)
        if not match:
            return col_name
        if match["T"]:
            return "T"
        elif match["Sr"]:
            col_idx = int(match["idx"])
            # Third last depicts the price of the underlying...
            if col_idx == ncol - 3:
                return "S"
            # ...and the second last the risk free rate.
            elif col_idx == ncol - 2:
                return "r"


In [57]:
data = OptionsData()
data.get_df_with_strikes(340).tail()



Unnamed: 0,T,T_norm,S,r,340
81,5,0.019841,524.29,0.0006,185.5
82,4,0.015873,527.93,0.0006,188.55
83,3,0.011905,529.59,0.0006,190.1
84,2,0.007937,524.11,0.0005,184.1
85,1,0.003968,,,


A class encapsulating the Black-Scholes-Merton model and related computations, such as Greeks. Can create instances from `pd.Series` objects (as returned by pd.DataFrame.iterrows) via the `BSM.make_from_series` factory method.

In [96]:
@dataclass(frozen=True)
class BSM:
    S: float
    E: float
    r: float
    T: float
    C_obs: float
    sigma: float = 1.0
    d1: float = field(init=False)
    d2: float = field(init=False)
    
    def __post_init__(self):
        S, E, r, T, C_obs, sigma = self.S, self.E, self.r, self.T, self.C_obs, self.sigma
        sigma, *_ = minimize(self.__implied_vol_objective, sigma, args=(S, E, r, T, C_obs))['x']
        object.__setattr__(self, "sigma", sigma)
        eps = np.finfo(float).eps
        d1 = (math.log(S / E) + (r + 0.5 * self.sigma**2) * T) / (sigma * math.sqrt(T) + eps)
        object.__setattr__(self, "d1", d1)
        d2 = self.d1 - math.sqrt(self.sigma**2 * T)
        object.__setattr__(self, "d2", d2)
        
    @staticmethod
    def make_from_series(ser, E, sigma=1.0):
        ser = ser.filter(regex=Util.make_regex_group_disjunction(["S", int(E), "r", "T_norm"]), axis="index")
        assert ser.shape[0] == 4, f"Error: The series should have an index of form [S, E, r, T_norm], got {ser.index}."
        S, r, T, C_obs = ser.array
        return BSM(S, E, r, T, C_obs, sigma=sigma)
    
    @functools.cached_property
    def delta(self):
        return norm.cdf(self.d1)
    
    @functools.cached_property
    def gamma(self):
        return norm.pdf(self.d1) / (self.S * self.sigma * math.sqrt(self.T))
    
    @functools.cached_property
    def theta(self):
        S, E, r, T, sigma, d1, d2 = self.S, self.E, self.r, self.T, self.sigma, self.d1, self.d2
        return -0.5 * S * norm.pdf(d1) * sigma / math.sqrt(T) - r * E * math.exp(-r * T) * norm.cdf(d2)
    
    @functools.cached_property
    def vega(self):
        return self.S * math.sqrt(self.T) * norm.pdf(self.d1)
    
    def __implied_vol_objective(self, sigma0, S, E, r, T, C_obs):
        eps = np.finfo(float).eps
        d1 = (math.log(S / E) + (r + 0.5 * sigma0**2) * T) / (math.sqrt(sigma0**2 * T) + eps)
        d2 = d1 - math.sqrt(sigma0**2 * T)
        C = norm.cdf(d1) * S - E * math.exp(-r * T) * norm.cdf(d2)
        return 0.5 * (C - C_obs)**2


In [97]:
class Hedger:
    pass

In [122]:
E = 340
schedule = 2
df = data.get_df_with_strikes(E).dropna()

import time

t0 = time.time()
# t = 0:
rows_iterator = iter(df.to_dict("records"))
row = next(rows_iterator)
print(row)
bsm_prev = BSM.make_from_series(row, E)
long_prev = bsm_prev.C_obs
delta_factor = bsm_prev.delta
short_prev = delta_factor * bsm_prev.S

# 0 < t < T:
mse = 0.0
for t, row in rows_iterator:
    bsm = BSM.make_from_series(row, E)
    long = bsm.C_obs
    dlong = long - long_prev
    short = delta_factor * bsm.S
    dshort = short - short_prev
    mse += (dlong - dshort)**2
    long_prev = long
    bsm_prev = bsm
    # Rehedge?
    if t % schedule == 0:
        delta_factor = bsm.delta
        short_prev = delta_factor * bsm.S
    else:
        short_prev = short

mse /= df.shape[0]

t1 = time.time()
print(f"Took {(t1 - t0)*1000:.2f} ms")

{'T': 86, 'T_norm': 0.3412698412698413, 'S': 491.34, 'r': 0.0011, '340': 152.2}


AttributeError: 'dict' object has no attribute 'filter'

In [112]:
print(f"Single option delta hedging {mse=:.2f}")

Single option delta hedging mse=21.88
