In [1]:
# Vol Surface Constructor

# This project constructs a volatility surface for options data from Yahoo Finance.
# K-Nearest Neighbours is used to detect outlier IV's, and an Radial Basis Function (with smoothing) is used for 3D interpolation of the surface.


# Step 1 -> Data Collection - collect options data from YF

# Step 2 -> Data Cleaning - e.g. removing option that:
                                                # have ultra low IV
                                                # have NA values
                                                # are illiquid
                                                # are extremely OTM/ITM

# Step 3 -> Outlier Detection - moneyness and maturity data is normalised and neighbours are identified
#                             - IVs that are more than X std deviations away from average are not considered in interpolation (this helps keep the surface smooth)

# Step 4 -> RBF Interpolation and plotting - a meshgrid for interpolation from normalised moneyness and maturity data is constructed
#                                          - the rbf interpolator calcualts model IV values at meshgrid points, meshgrid points are then rescaled
#                                          - surface is plotted using Plotly's 3D Surface and Scatter classes

In [2]:
import yfinance as yf
import pandas as pd
import numpy as np
import datetime as dt
from sklearn.neighbors import NearestNeighbors
from sklearn.preprocessing import StandardScaler
from sklearn.preprocessing import RobustScaler
from scipy.stats import zscore
from scipy.interpolate import Rbf
import plotly.graph_objects as go

In [3]:
# INPUTS

ticker = 'SPY' # ticker of option we want (from yahoo finance)
open_interest = 1000 # open interest measures the number of active option contracts, I filter out (illiquid) options with open_interest below this value
outlier_penalty = 1.5 # if an IV is more than "__" STD's away from its neighbours, it is considered an 'Outlier'
k = 10   # no. of neighbours in KNN for outlier detection
smooth_input = 0.5  # used for RBF Interpolation (higher value means smoother surface, but captures less data)
minIV = 0.0001   # IVs below this value will be removed

In [4]:
# Step 1 -> Data Collection

def import_data(ticker):
    stock_data = yf.Ticker(ticker)
    expiration_dates = stock_data.options
    option_data = []

    for exp in expiration_dates:
        option_chain = stock_data.option_chain(exp)
        
        calls = option_chain.calls
        puts = option_chain.puts
        
        calls['expiration_date'] = exp
        calls['type'] = 'call'
        puts['expiration_date'] = exp
        puts['type'] = 'put'
        
        option_data.append(calls)
        option_data.append(puts)

    options_df = pd.DataFrame(pd.concat(option_data, ignore_index=True))

    return options_df

options_df = import_data(ticker)


In [5]:
# Step 2 -> Data Cleaning

def clean_data(options_df, open_interest):
    options_df["expiration_date"] = pd.to_datetime(options_df["expiration_date"])

    options_df = options_df[~options_df.isnull().any(axis=1)]

    options_df = options_df.drop_duplicates(subset="contractSymbol")

    options_df.drop(columns=["lastTradeDate","change","percentChange","volume","inTheMoney","contractSize","currency"],inplace=True, errors="ignore")

    current_price = yf.Ticker(ticker).history(period="1d").Close.iloc[-1]
    options_df["moneyness"] = options_df["strike"] / current_price
    options_df = options_df[(options_df["moneyness"] <= 1.2) & (options_df["moneyness"] >= 0.8)]

    options_df["time_to_maturity"] = (options_df.expiration_date - pd.to_datetime("today")).dt.days
    options_df = options_df[(options_df["time_to_maturity"] <= 180) & (options_df["time_to_maturity"] > 0)]

    options_df = options_df[options_df.openInterest > open_interest] 

    options_df = options_df[options_df["impliedVolatility"] > minIV]

    return options_df


def option_type_filter(options_df, option_type):
    return options_df[options_df['type'] == option_type]


options_df = clean_data(options_df)

calls_df = option_type_filter(options_df, 'call')
puts_df = option_type_filter(options_df, 'put')


In [6]:
# Step 3 -> Outlier Detection 

def outlier_detection(df, outlier_penalty):

    scaler = StandardScaler()

    scaled_data = scaler.fit_transform(df[['moneyness', 'time_to_maturity']])

    IV = df['impliedVolatility'].values

    knn = NearestNeighbors(n_neighbors=k)
    knn.fit(scaled_data)

    distances, indices = knn.kneighbors(scaled_data)

    avg_IV_neighbors = []
    for i in range(len(df)):
        neighbor_IVs = IV[indices[i]]
        avg_IV_neighbors.append(np.mean(neighbor_IVs))

    iv_diff = np.abs(IV - avg_IV_neighbors)

    threshold = outlier_penalty * np.std(iv_diff)

    outliers = iv_diff > threshold
    df['Outlier'] = outliers


    return df


outlier_detection(calls_df, outlier_penalty)
outlier_detection(puts_df, outlier_penalty)

calls_df_no_nan = calls_df[calls_df.Outlier == False]
puts_df_no_nan = puts_df[puts_df.Outlier == False]

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df['Outlier'] = outliers
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df['Outlier'] = outliers


In [9]:
# Step 4 -> RBF interpolation and plotting

# RBFs assign each meshgrid point a weight that diminishes the further it is from the target 
# The interpolated value at the target is a weighted sum of the known IV's

def construct_vol_surface(df, df_no_nan, ticker, smooth_input, option_type):
    
    scaler = RobustScaler()

    features_no_nan = df_no_nan[['moneyness', 'time_to_maturity']].values

    scaled_features_no_nan = scaler.fit_transform(features_no_nan)

    scaled_moneyness = np.linspace(scaled_features_no_nan[:, 0].min(), scaled_features_no_nan[:, 0].max(), 50)
    scaled_maturity = np.linspace(scaled_features_no_nan[:, 1].min(), scaled_features_no_nan[:, 1].max(), 50)

    scaled_moneyness_grid, scaled_maturity_grid = np.meshgrid(scaled_moneyness, scaled_maturity)

    rbf_interpolator = Rbf(scaled_features_no_nan[:, 0], scaled_features_no_nan[:, 1], df_no_nan['impliedVolatility'], function='linear', smooth=smooth_input)
    interpolated_volatility_scaled = rbf_interpolator(scaled_moneyness_grid, scaled_maturity_grid)

    rescaled_features = scaler.inverse_transform(np.column_stack([scaled_moneyness_grid.ravel(), scaled_maturity_grid.ravel()]))
    rescaled_features = rescaled_features.reshape(scaled_moneyness_grid.shape + (2,))  

    volatility_surface = go.Surface(
        x=rescaled_features[:, :, 0],  
        y=rescaled_features[:, :, 1],  
        z=interpolated_volatility_scaled.reshape(scaled_moneyness_grid.shape),  
        opacity=0.7,
        name='Smoothed Volatility Surface',
        colorscale="Cividis")

    outlier_points = df[df["Outlier"] == True]

    outlier_scatter = go.Scatter3d(
        x=outlier_points['moneyness'],
        y=outlier_points['time_to_maturity'],
        z=outlier_points['impliedVolatility'],  
        mode='markers',
        marker=dict(size=2,color='red'),
        name='Outliers',
        text=outlier_points["Outlier"],
        hovertemplate=(
            'Moneyness: %{x}<br>'
            'Time to Maturity: %{y}<br>'
            'Implied Volatility: %{z}<br>'
            'Outlier: %{text}<br>'))

    non_outlier_points = df[df["Outlier"] == False]
    non_outlier_scatter = go.Scatter3d(
        x=non_outlier_points['moneyness'],
        y=non_outlier_points['time_to_maturity'],
        z=non_outlier_points['impliedVolatility'],  
        mode='markers',
        marker=dict(size=2,color="black"),
        name='Market Data',
        text=non_outlier_points["Outlier"],
        hovertemplate=(
            'Moneyness: %{x}<br>'
            'Time to Maturity: %{y}<br>'
            'Implied Volatility: %{z}<br>'
            'Outlier: %{text}<br>'))

    fig = go.Figure(data=[outlier_scatter, non_outlier_scatter, volatility_surface])
    fig.update_layout(
        title=f"Volatility Surface - {ticker} {option_type}",
        scene=dict(
            xaxis_title='Moneyness',
            yaxis_title='Time to Maturity',
            zaxis_title='Implied Volatility'),
        legend=dict(
            orientation="h",  
            yanchor="bottom", 
            y=1,  
            xanchor="center", 
            x=0.5))

    fig.show()


construct_vol_surface(calls_df, calls_df_no_nan, ticker, smooth_input, option_type="Calls")
construct_vol_surface(puts_df, puts_df_no_nan, ticker, smooth_input, option_type="Puts")


