# IV Surface

Content:

- [Load Data](#load-data)
- [Model](#Model) 
- [IV Code](#IV-code)
- [Calls IV](#Call-IV)
- [Puts IV](#Puts-IV)

In [1]:
import warnings
import time

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

from sklearn.preprocessing import StandardScaler
from sklearn.utils import shuffle

from scipy.optimize import fsolve
import scipy as sq
from scipy import stats

from typing import Tuple
from itertools import product

import torch
import torch.nn as nn
from torch.autograd import Variable
from torch.utils.data import Dataset, DataLoader

In [2]:
# Set seeds
torch.manual_seed(0)
np.random.seed(0)

In [3]:
synthetic_calls_path = '../data/heston_mc_synthetic_calls.csv'
synthetic_puts_path = '../data/heston_mc_synthetic_puts.csv'
call_sample_path = '../data/heston_calls_iv.csv'
put_sample_path = '../data/heston_puts_iv.csv'

In [4]:
def reduce_mem_usage(df):
    """ iterate through all the columns of a dataframe and modify the data type
        to reduce memory usage.        
    """    
    for col in df.columns:
        col_type = df[col].dtype
        
        if col_type != object:
            c_min = df[col].min()
            c_max = df[col].max()
            if str(col_type)[:3] == 'int':
                if c_min > np.iinfo(np.int8).min and c_max < np.iinfo(np.int8).max:
                    df[col] = df[col].astype(np.int8)
                elif c_min > np.iinfo(np.int16).min and c_max < np.iinfo(np.int16).max:
                    df[col] = df[col].astype(np.int16)
                elif c_min > np.iinfo(np.int32).min and c_max < np.iinfo(np.int32).max:
                    df[col] = df[col].astype(np.int32)
                elif c_min > np.iinfo(np.int64).min and c_max < np.iinfo(np.int64).max:
                    df[col] = df[col].astype(np.int64)  
            else:
                if c_min > np.finfo(np.float16).min and c_max < np.finfo(np.float16).max:
                    df[col] = df[col].astype(np.float16)
                elif c_min > np.finfo(np.float32).min and c_max < np.finfo(np.float32).max:
                    df[col] = df[col].astype(np.float32)
                else:
                    df[col] = df[col].astype(np.float64)
        else:
            df[col] = df[col].astype('category')
    
    return df

In [5]:
synthetic_calls = pd.read_csv(synthetic_calls_path, index_col=0)
synthetic_puts = pd.read_csv(synthetic_puts_path, index_col=0)

synthetic_calls = reduce_mem_usage(synthetic_calls)
synthetic_puts = reduce_mem_usage(synthetic_puts)

In [6]:
synthetic_options = pd.concat([synthetic_calls, synthetic_puts], axis=0)
synthetic_options = shuffle(synthetic_options, random_state=0)
synthetic_options = synthetic_options.reset_index()
synthetic_options = synthetic_options.drop('index', axis=1)

In [7]:
synthetic_options

Unnamed: 0,Price,Strike,Type,Kappa,Rho,Theta,Xi,V_0,Interest Rate,Time to Expiration,Option Price
0,100,63.0,P,1.877930,-0.603516,0.252441,0.050323,0.364258,0.065308,1.000977,0.002708
1,100,131.0,P,1.459961,-0.262451,0.310059,0.153198,0.338867,0.069153,0.363525,28.703125
2,100,105.0,C,0.247192,-0.742676,0.471924,0.463135,0.268311,0.040802,1.099609,2.324219
3,100,68.0,P,1.594727,-0.576172,0.099670,0.465088,0.366455,0.062683,1.072266,0.060577
4,100,77.0,C,1.563477,-0.328125,0.308838,0.409912,0.241455,0.011162,0.940430,24.359375
...,...,...,...,...,...,...,...,...,...,...,...
605995,100,71.0,P,1.039062,-0.757324,0.227661,0.312500,0.445068,0.016006,0.487061,1.505859
605996,100,57.0,C,1.905273,-0.276855,0.322266,0.061432,0.120117,0.025070,0.947754,41.687500
605997,100,135.0,C,0.971680,-0.711426,0.270508,0.438477,0.483887,0.097107,0.289307,0.180542
605998,100,64.0,P,1.767578,-0.417480,0.054260,0.137573,0.226440,0.070374,0.365479,0.001135


In [8]:
call_sample = pd.read_csv(call_sample_path, index_col=0)
put_sample = pd.read_csv(put_sample_path, index_col=0)

In [20]:
put_sample

Unnamed: 0,Price,Strike,Kappa,Rho,Theta,Xi,V_0,Interest Rate,Time to Expiration,C,P,Option Price
0,100,66,1.5,-0.05,0.45,0.3,0.25,0.02,0.443166,0,1,0.185020
1,100,71,1.5,-0.05,0.45,0.3,0.25,0.02,0.443166,0,1,0.321809
2,100,76,1.5,-0.05,0.45,0.3,0.25,0.02,0.443166,0,1,0.573322
3,100,81,1.5,-0.05,0.45,0.3,0.25,0.02,0.443166,0,1,0.828163
4,100,86,1.5,-0.05,0.45,0.3,0.25,0.02,0.443166,0,1,1.336206
...,...,...,...,...,...,...,...,...,...,...,...,...
845,100,126,1.5,-0.05,0.45,0.3,0.25,0.02,0.940808,0,1,24.916916
846,100,131,1.5,-0.05,0.45,0.3,0.25,0.02,0.940808,0,1,29.799266
847,100,136,1.5,-0.05,0.45,0.3,0.25,0.02,0.940808,0,1,34.710163
848,100,141,1.5,-0.05,0.45,0.3,0.25,0.02,0.940808,0,1,39.561434


## Preprocessing

In [9]:
synthetic_options = pd.get_dummies(synthetic_options, prefix='', prefix_sep='')

In [10]:
synthetic_options.drop('Option Price', axis=1).columns

Index(['Price', 'Strike', 'Kappa', 'Rho', 'Theta', 'Xi', 'V_0',
       'Interest Rate', 'Time to Expiration', 'C', 'P'],
      dtype='object')

In [11]:
input_sc = StandardScaler()
output_sc = StandardScaler()
input_data = input_sc.fit_transform(synthetic_options.drop('Option Price', axis=1))
output_data = output_sc.fit_transform(synthetic_options['Option Price'].values.reshape(-1, 1))

## Model

In [12]:
CUDA = torch.cuda.is_available()
device = 'cuda:0' if CUDA else 'cpu'

In [13]:
class ResBlock(nn.Module):

  def __init__(self, module):
    super(ResBlock, self).__init__()
    self.module = module

  def forward(self, x):
    return self.module(x) + x

In [14]:
class HiddenLayer(nn.Module):

  def __init__(self, layer_size, act_fn):
      super(HiddenLayer, self).__init__()
      
      if act_fn == 'ReLU':
        self.layer = nn.Sequential(
          nn.Linear(layer_size, layer_size),
          nn.ReLU())
      elif act_fn == 'LeakyReLU':
        self.layer = nn.Sequential(
          nn.Linear(layer_size, layer_size),
          nn.LeakyReLU())
      elif act_fn == 'ELU':
        self.layer = nn.Sequential(
          nn.Linear(layer_size, layer_size),
          nn.ELU())
    
  def forward(self, x):
    return self.layer(x)

In [15]:
class Net(nn.Module):

  def __init__(self, input_size, output_size, hidden_size, num_layers, act_fn):
    super(Net, self).__init__()
    self.input_size = input_size
    self.output_size = output_size
    self.hidden_size = hidden_size

    if act_fn == 'ReLU':
      self.initial_layer = nn.Sequential(
          nn.Linear(self.input_size, self.hidden_size),
          nn.ReLU())
    elif act_fn == 'LeakyReLU':
      self.initial_layer = nn.Sequential(
          nn.Linear(self.input_size, self.hidden_size),
          nn.LeakyReLU())
    elif act_fn == 'ELU':
      self.initial_layer = nn.Sequential(
          nn.Linear(self.input_size, self.hidden_size),
          nn.ELU())

    self.hidden_layers_list = []

    for i in range(num_layers // 2):
      self.hidden_layers_list.append(
          ResBlock(
            nn.Sequential(
                HiddenLayer(self.hidden_size, act_fn),
                HiddenLayer(self.hidden_size, act_fn)
            )
        )
      )

    self.hidden_layers = nn.Sequential(*self.hidden_layers_list)

    self.net = nn.Sequential(
        self.initial_layer,
        self.hidden_layers,
        nn.Linear(self.hidden_size, self.output_size)
    )
  
  def forward(self, x):
    return self.net(x)

In [16]:
def init_weights(m, init_m: str):

  @torch.no_grad()
  def init_uniform(m):
    if isinstance(m, nn.Linear):
      torch.nn.init.uniform_(m.weight)
      m.bias.data.fill_(0.01)

  @torch.no_grad()
  def init_normal(m):
    if isinstance(m, nn.Linear):
      torch.nn.init.normal_(m.weight)
      m.bias.data.fill_(0.01)

  @torch.no_grad()
  def init_xuniform(m):
    if isinstance(m, nn.Linear):
      torch.nn.init.xavier_uniform_(m.weight)
      m.bias.data.fill_(0.01)

  @torch.no_grad()
  def init_xnormal(m):
    if isinstance(m, nn.Linear):
      torch.nn.init.xavier_normal_(m.weight)
      m.bias.data.fill_(0.01)

  if init_m == 'uniform':
    m.apply(init_uniform)
  elif init_m == 'normal':
    m.apply(init_normal)
  elif init_m == 'xaiver uniform':
    m.apply(init_xuniform)
  elif init_m == 'xavier normal':
    m.apply(init_xnormal)

In [21]:
input_size = 11
output_size = 1
num_layers = 4
hidden_size = 600
batch_size = 585
epochs = 2000
lr = 0.000656
init_method = 'xaiver uniform'
act_fn = 'LeakyReLU'

model = Net(input_size, output_size, hidden_size, num_layers, act_fn)
init_weights(model, init_method)

loss_fn = nn.MSELoss()

In [18]:
class OptDataset(Dataset):

  def __init__(self, X, y):
    self.X = X
    self.y = y

  def __getitem__(self, idx):
    return self.X[idx], self.y[idx]

  def __len__(self):
    return len(self.X)

In [22]:
save_model_path = '../models/final_heston_model.chkpt'

model = Net(input_size, output_size, hidden_size, num_layers, act_fn)
model.load_state_dict(torch.load(save_model_path, map_location=device))
model = model.to(device)

RuntimeError: Error(s) in loading state_dict for Net:
	size mismatch for initial_layer.0.weight: copying a param with shape torch.Size([600, 10]) from checkpoint, the shape in current model is torch.Size([600, 11]).
	size mismatch for net.0.0.weight: copying a param with shape torch.Size([600, 10]) from checkpoint, the shape in current model is torch.Size([600, 11]).

## IV code

There two different approaches for derive IV:

**Closed form approximations**: usually are closed form solution derived directly from the Black Scholes model, they tend to work fine when finding IV of ATM options. As soon the underlying price departs from the strike price it very quickly loses accuracy. Close form solutions are: Brenner & Subrahmanyam formula, Bharadia, Christofides and Salkin formula, Corrado & Miller and others

**Numerical methods**: they involves the use of root finding algorithm for obtaining the volatility value $\sigma$ such that the function:

$$
f(\sigma) = C - g(\sigma)
$$

vanishes i.e. that the price of the call $C$ (or the put $P$) observed on the market is the same as the expressed by the value $g(\sigma)$ proposed by the algorithm.

In [None]:
def get_d1_d2(S, X, T, t, r, sigma):
    """
    Compute d1 and d2 values for the black-scholes pricing model


    :param S: underlying price
    :param X: option's strike price
    :param T: option's time to maturity (in years)
    :param t: current time (in years)
    :param r: interest rate
    :param sigma: underlying volatility
    :return: (d1, d2)
    """
    d1 = (np.log(S / X) + (r + sigma * sigma / 2.) * (T - t)) / (sigma * np.sqrt(T - t))
    d2 = d1 - sigma * np.sqrt(T - t)
    return d1, d2


def black_scholes(S, X, T, t, r, sigma, o_type: str = "C") -> np.single:
    """
    Compute option price using the black-scholes model

    :param S: underlying price
    :param X: option's strike price
    :param T: option's time to maturity (in years)
    :param t: current time (in years)
    :param r: interest rate (in percentual)
    :param sigma: underlying volatility
    :param o_type: option type, "C" for a call option and "P" for a put option
    :return: the black-scholes option price
    """
    d1, d2 = get_d1_d2(S, X, T, t, r, sigma)
    if o_type == "C":
        return S * stats.norm.cdf(d1, 0, 1) - X * np.exp(-r * (T - t)) * stats.norm.cdf(d2, 0, 1)
    else:
        return X * np.exp(-r * (T - t)) * stats.norm.cdf(-d2, 0, 1) - S * stats.norm.cdf(-d1, 0, 1)

### Newton method

In the newton method the starting point used is the inflecion point of $g(\sigma)$ which is $\sigma_F = \sqrt{\frac{2 |\log(S/K)|}{T}}$.

As shown in [Giuseppe Orlando et al](https://www.sciencedirect.com/science/article/pii/S0377042717300602) $\sigma_F$ gives faster performance comparing to others starting points.

In [None]:
def IV_newton(CallPutFlag, S, X, T, r, Option_Value):

    def obj_function(IV):
        result = Option_Value - black_scholes(S, X, T, 0, r, IV, CallPutFlag)
        return result

    x0 = np.sqrt((2 * np.abs(np.log(S/X))) / T)
    
    try:
      IV_Result = sq.optimize.newton(obj_function, x0=x0)
    except RuntimeError:
      IV_Result = np.nan
    except ValueError:
      IV_Result = np.nan

    return IV_Result

### Brent method

In [None]:
def IV_brent(CallPutFlag, S, X, T, r, Option_Value):

    def obj_function(IV):
        result = Option_Value - black_scholes(S, X, T, 0, r, IV, CallPutFlag)
        return result

    x0 = np.sqrt((2 * np.abs(np.log(S/X))) / T)
    
    try:
      IV_Result = sq.optimize.brenth(obj_function, a=0.01, b=2.5, xtol=0.000001)
    except RuntimeError:
      IV_Result = np.nan
    except ValueError:
      IV_Result = np.nan

    return IV_Result

### Brenner & Subrahmanyam formula

In [None]:
def iv_bs(CallPutFlag, S, X, T, r, Option_Value):
    if CallPutFlag == 'C':
      return (np.sqrt(2 * np.pi / T) * S / X) / 10
    else:
      return (np.sqrt(2 * np.pi / T) * X / S) / 10

## Calls IV

In [None]:
call_sample

In [None]:
call_sample_t = input_sc.transform(call_sample.drop('Option Price', axis=1))
call_sample_t = Variable(torch.Tensor(call_sample_t))

In [None]:
with torch.no_grad():
    pred = model(call_sample_t)

pred = output_sc.inverse_transform(pred.cpu().detach().numpy())
call_sample['Prediction'] = pred
call_sample['Prediction'] = np.abs(call_sample['Prediction'].values)
call_sample['Moneyness'] = call_sample.Price / call_sample.Strike

In [None]:
call_sample.sort_values(['Moneyness', 'Time to Expiration'], ascending = [True, True])

In [None]:
call_sample_iv = call_sample.copy()
call_sample_iv_ = call_sample.copy()
call_sample_iv['IV'] = np.nan
call_sample_iv['Model IV'] = np.nan
call_sample_iv_['IV'] = np.nan
call_sample_iv_['Model IV'] = np.nan

In [None]:
call_sample_iv['Model IV'] = call_sample_iv.apply(lambda x: IV_newton('C', x[0], x[1], x[8], x[7], x[12]), axis=1)
call_sample_iv['IV'] = call_sample_iv.apply(lambda x: IV_newton('C', x[0], x[1], x[8], x[7], x[11]), axis=1)
call_sample_iv_['Model IV'] = call_sample_iv_.apply(lambda x: IV_brent('C', x[0], x[1], x[8], x[7], x[12]), axis=1)
call_sample_iv_['IV'] = call_sample_iv_.apply(lambda x: IV_brent('C', x[0], x[1], x[8], x[7], x[11]), axis=1)

In [None]:
call_sample_iv = call_sample_iv.dropna()
call_sample_iv = call_sample_iv[call_sample_iv.IV != 0]
call_sample_iv = call_sample_iv[call_sample_iv['Model IV'] != 0]
call_sample_iv = call_sample_iv.sort_values(['Moneyness', 'Time to Expiration'], ascending = [True, True])
call_sample_iv

In [None]:
call_sample_iv_ = call_sample_iv_.dropna()
call_sample_iv_ = call_sample_iv_[call_sample_iv_.IV != 0]
call_sample_iv_ = call_sample_iv_[call_sample_iv_['Model IV'] != 0]
call_sample_iv_ = call_sample_iv_.sort_values(['Moneyness', 'Time to Expiration'], ascending = [True, True])
call_sample_iv_

In [None]:
X_axis = call_sample_iv.Moneyness
Y_axis = call_sample_iv['Time to Expiration']
Z_axis = call_sample_iv.IV
X_axis_ = call_sample_iv_.Moneyness
Y_axis_ = call_sample_iv_['Time to Expiration']
Z_axis_ = call_sample_iv_.IV

In [None]:
fig = plt.figure(figsize=(20, 10))
ax = fig.add_subplot(1, 2, 1, projection='3d')

ax.plot_trisurf(X_axis, Y_axis, Z_axis, 
                linewidth=0.2, antialiased=True, cmap='viridis')
ax.set_xlabel('Moneyness')
ax.set_ylabel('Time to expiration')
ax.set_zlabel('IV')
ax.set_title('IV Surface')
ax.view_init(30, 60)

ax = fig.add_subplot(1, 2, 2, projection='3d')

ax.plot_trisurf(X_axis, Y_axis, call_sample_iv['Model IV'], 
                linewidth=0.2, antialiased=True, cmap='viridis')
ax.set_xlabel('Moneyness')
ax.set_ylabel('Time to expiration')
ax.set_zlabel('IV')
ax.set_title('ANN\'s IV Surface')
ax.view_init(30, 60)

In [None]:
fig = plt.figure(figsize=(20, 10))
ax = fig.add_subplot(1, 2, 1, projection='3d')

ax.plot_trisurf(X_axis_, Y_axis_, call_sample_iv_['Model IV'], 
                linewidth=0.2, antialiased=True, cmap='binary')
ax.set_xlabel('Moneyness')
ax.set_ylabel('Time to expiration')
ax.set_zlabel('IV')
ax.set_title('Brent IV Surface')
ax.view_init(30, 60)

ax = fig.add_subplot(1, 2, 2, projection='3d')

ax.plot_trisurf(X_axis, Y_axis, call_sample_iv['Model IV'], 
                linewidth=0.2, antialiased=True, cmap='plasma')
ax.set_xlabel('Moneyness')
ax.set_ylabel('Time to expiration')
ax.set_zlabel('IV')
ax.set_title('Newton IV Surface')
ax.view_init(30, 60)

In [None]:
fig = plt.figure(figsize=(20, 10))
ax = fig.add_subplot(1, 2, 1, projection='3d')

ax.scatter(X_axis, Y_axis, call_sample_iv.IV)
ax.set_xlabel('Moneyness')
ax.set_ylabel('Time to expiration')
ax.set_zlabel('IV')
ax.set_title('IV')

ax = fig.add_subplot(1, 2, 2, projection='3d')

ax.scatter(X_axis, Y_axis, call_sample_iv['Model IV'])
ax.set_xlabel('Moneyness')
ax.set_ylabel('Time to expiration')
ax.set_zlabel('IV')
ax.set_title('ANN\'s IV')

plt.show()

## Puts IV

In [None]:
put_sample_t = input_sc.transform(put_sample.drop('Option Price', axis=1))
put_sample_t = Variable(torch.Tensor(put_sample_t))

In [None]:
with torch.no_grad():
    pred = model(put_sample_t)

pred = output_sc.inverse_transform(pred.cpu().detach().numpy())
put_sample['Prediction'] = pred
put_sample['Prediction'] = np.abs(put_sample['Prediction'].values)
put_sample['Moneyness'] = put_sample.Strike / put_sample.Price

In [None]:
put_sample_iv = put_sample.copy()
put_sample_iv_ = put_sample.copy()
put_sample_iv['IV'] = np.nan
put_sample_iv['Model IV'] = np.nan
put_sample_iv_['IV'] = np.nan
put_sample_iv_['Model IV'] = np.nan

In [None]:
put_sample_iv['Model IV'] = put_sample_iv.apply(lambda x: IV_newton('P', x[0], x[1], x[8], x[7], x[12]), axis=1)
put_sample_iv['IV'] = put_sample_iv.apply(lambda x: IV_newton('P', x[0], x[1], x[8], x[7], x[11]), axis=1)
put_sample_iv_['Model IV'] = put_sample_iv_.apply(lambda x: IV_brent('P', x[0], x[1], x[8], x[7], x[12]), axis=1)
put_sample_iv_['IV'] = put_sample_iv_.apply(lambda x: IV_brent('P', x[0], x[1], x[8], x[7], x[11]), axis=1)

In [None]:
put_sample_iv = put_sample_iv.dropna()
put_sample_iv = put_sample_iv[put_sample_iv.IV != 0]
put_sample_iv = put_sample_iv[put_sample_iv['Model IV'] != 0]
put_sample_iv = put_sample_iv.sort_values(['Moneyness', 'Time to Expiration'], ascending = [True, True])
put_sample_iv

In [None]:
put_sample_iv_ = put_sample_iv_.dropna()
put_sample_iv_ = put_sample_iv_[put_sample_iv_.IV != 0]
put_sample_iv_ = put_sample_iv_[put_sample_iv_['Model IV'] != 0]
put_sample_iv_ = put_sample_iv_.sort_values(['Moneyness', 'Time to Expiration'], ascending = [True, True])
put_sample_iv_

In [None]:
X_axis = put_sample_iv.Moneyness
Y_axis = put_sample_iv['Time to Expiration']
Z_axis = put_sample_iv.IV
X_axis_ = put_sample_iv_.Moneyness
Y_axis_ = put_sample_iv_['Time to Expiration']
Z_axis_ = put_sample_iv_.IV

In [None]:
fig = plt.figure(figsize=(20, 10))
ax = fig.add_subplot(1, 2, 1, projection='3d')

ax.plot_trisurf(X_axis, Y_axis, Z_axis, 
                linewidth=0.2, antialiased=True, cmap='viridis')
ax.set_xlabel('Moneyness')
ax.set_ylabel('Time to expiration')
ax.set_zlabel('IV')
ax.set_title('IV Surface')
ax.view_init(30, 60)

ax = fig.add_subplot(1, 2, 2, projection='3d')

ax.plot_trisurf(X_axis, Y_axis, put_sample_iv['Model IV'], 
                linewidth=0.2, antialiased=True, cmap='viridis')
ax.set_xlabel('Moneyness')
ax.set_ylabel('Time to expiration')
ax.set_zlabel('IV')
ax.set_title('ANN\'s IV Surface')
ax.view_init(30, 60)
plt.savefig('../results/IV-surface-heston.png')

In [None]:
fig = plt.figure(figsize=(20, 10))
ax = fig.add_subplot(1, 2, 1, projection='3d')

ax.plot_trisurf(X_axis_, Y_axis_, put_sample_iv_['Model IV'], 
                linewidth=0.2, antialiased=True, cmap='magma')
ax.set_xlabel('Moneyness')
ax.set_ylabel('Time to expiration')
ax.set_zlabel('IV')
ax.set_title('Brent IV Surface')
ax.view_init(30, 60)

ax = fig.add_subplot(1, 2, 2, projection='3d')

ax.plot_trisurf(X_axis, Y_axis, put_sample_iv['Model IV'], 
                linewidth=0.2, antialiased=True, cmap='magma')
ax.set_xlabel('Moneyness')
ax.set_ylabel('Time to expiration')
ax.set_zlabel('IV')
ax.set_title('Newton IV Surface')
ax.view_init(30, 60)