## Transform a stock option input file into a bigquery dataset
- Calcualte equilibrium stock price where call and put implied volatility are equal
- Isolate strikes that have a bid and are within +- 2 standard deviations of equilibrium price
- Calculate moneyness for all strikes filtered above
- Calculate implied volatility of all strikes filtered above
- generate json formatted output
- write daily output files to bigquery
- for table schema: see ddl_stock_market.ipynb

<b>Data Source:</b> https:\\historicaloptiondata.com

## Codebase

In [39]:
#declare dependencies and constants
from google.cloud import bigquery
import pandas as pd
import pandas_market_calendars as mcal
import datetime
import math
import mibian
import scipy
import json
import random
import sys

STOCK_SYMBOLS = ['SPY']
PROJECT_ID = 'expiry-week'
DATASET_ID = 'option_quotes'
JOB_ID = '' #supplied as an arguement when running program

In [40]:
def get_interest_rate(quote_date):
    "return the fed funds rate that was in effect on the supplied quote date"
    df_fedfunds = pd.read_csv('gs://expiry-week-data/options/FEDFUNDS.csv', parse_dates=['DATE'])
    df_fedfunds = df_fedfunds[df_fedfunds['DATE'].dt.date <= quote_date]
    target_index = df_fedfunds['DATE'].idxmax()
    return df_fedfunds.loc[target_index]['FEDFUNDS']


In [41]:
def calc_call_iv(stock_price, strike_price, interest_rate, days_to_expiry, call_price):
    """
    calculate the implied volatility of a call option
    - return annualized implied volatility as a decimal value
    """
    bs = mibian.BS([stock_price, strike_price, interest_rate, days_to_expiry], callPrice=call_price)
    return bs.impliedVolatility / 100

In [42]:
def calc_put_iv(stock_price, strike_price, interest_rate, days_to_expiry, put_price):
    """
    calculate the implied volatility of a put option
    - return annualized implied volatility as a decimal value
    """
    bs = mibian.BS([stock_price, strike_price, interest_rate, days_to_expiry], putPrice=put_price)
    return bs.impliedVolatility / 100

In [43]:
def center_underlying_price(stock_price, strike_price, interest_rate, days_to_expiry, call_price, put_price):
    """
    calculate equilibrium implied volatility and the adjusted underlying price at which it is acheived
    this is the underlying price at which both call and put implied volatility are equal
    note: interest_rate needs be passed as a percent (e.g 5 = 5%)
    """
    #get starting call and put implied volatility adjusted for time to expiry
    call_iv = calc_call_iv(stock_price, strike_price, interest_rate, days_to_expiry, call_price)       
    put_iv = calc_put_iv(stock_price, strike_price, interest_rate, days_to_expiry, put_price)
    
    #calculate implied volatility difference adjusted for time to expiry
    iv_diff = abs(put_iv - call_iv) * math.sqrt(days_to_expiry / 365) 

    adj_lower = stock_price  * math.exp(-iv_diff) 
    adj_upper = stock_price  * math.exp(iv_diff)
    adj_stock_price = stock_price
    
    if put_iv > call_iv:
        #stock price is above equilibrium price
        adj_stock_price = round((stock_price + adj_lower) / 2, 2)
    else:
        #stock price is below equilibrium price
        adj_stock_price = round((stock_price + adj_upper) / 2, 2)
           
    for i in range(100):
        call_iv = calc_call_iv(adj_stock_price, strike_price, interest_rate, days_to_expiry, call_price)
        put_iv = calc_put_iv(adj_stock_price, strike_price, interest_rate, days_to_expiry, put_price)     
       
        prev_adj_stock_price = adj_stock_price
        if put_iv > call_iv:
            #adjusted stock price is to high
            adj_upper = adj_stock_price
        else:
            #adjusted stock price is to low
            adj_lower = adj_stock_price        
            
        adj_stock_price = round((adj_lower + adj_upper) / 2, 2)
        
        #once the adjusted stock price stops changing no further price convergence is possible and we are done
        #average the call and put implied volatilities to estimate the implied volatility at the equilibrium price        
        if adj_stock_price == prev_adj_stock_price:
            atm_iv = (call_iv + put_iv) / 2
            return adj_lower, atm_iv     
                
    #throw error if convergence was not achieved
    print('strike_price:', strike_price,'days_to_expiry:', days_to_expiry)
    raise Exception("stock price failed to converge on an equilibrium price")

In [44]:
def merge_options(df_expiry):
    """
    merge calls and puts for a given expiry date and quote date using strike price as the key
    - this will produce a straddle layout and keep only strike prices that exist on both the call and put side
    """

    df_calls = df_expiry[df_expiry['option_type'] == 'call'].reset_index(drop=True)
    df_calls.rename(columns={'bid': 'call_bid', 'ask': 'call_ask', 'volume': 'call_volume', 'open_interest' : 'call_open_interest'}, inplace=True)
    del df_calls['option_type']

    df_puts = df_expiry[df_expiry['option_type'] == 'put'][['bid', 'ask', 'volume', 'open_interest', 'strike_price']].reset_index(drop=True)
    df_puts.rename(columns={'bid': 'put_bid', 'ask': 'put_ask', 'volume': 'put_volume', 'open_interest' : 'put_open_interest'}, inplace=True)

    df_straddle = pd.merge(df_calls, df_puts, on='strike_price', how='inner')
    return df_straddle


In [107]:
def calc_put_moneyness(atm_price, atm_iv, strike_price, days_to_expiry):
    """
    calculate the probability that a put option will close in the money on expiry date
    Note that the probability of a call option with the same strike price closing in the money
    will be 1 - the probability of the put option closing in the money    
    """
    iv_to_expiry = atm_iv * math.sqrt(days_to_expiry / 365)
    zscore = math.log(strike_price /  atm_price) / iv_to_expiry
    norm_cdf = scipy.stats.norm.cdf(zscore)
    return norm_cdf
    

In [97]:
def create_base_record(quote_date, quote_week, underlying_price):
    """Generate the non repeating part of an option record"""
    base_record = {}
    base_record['quote_date'] = quote_date.strftime('%Y-%m-%d')
    base_record['quote_week'] = quote_week 
    base_record['underlying_price'] = underlying_price
    base_record['wt_call_volume'] = 0
    base_record['wt_put_volume'] = 0
    base_record['wt_call_open_interest'] = 0
    base_record['wt_put_open_interest'] = 0
    base_record['expiry_dates'] = []
    base_record['sampling_key'] = round(random.random(), 5)
    return base_record


In [84]:
def create_expiry_record(expiry_date, days_to_expiry, atm_price, atm_iv):
    """Generate an expiry week record"""
    expiry_record = {}
    expiry_record['expiry_date'] = expiry_date.strftime('%Y-%m-%d')
    expiry_record['days_to_expiry'] = days_to_expiry
    expiry_record['atm_price'] = round(atm_price, 2)
    expiry_record['atm_iv'] = round(atm_iv, 3)
    expiry_record['wt_call_volume'] = 0
    expiry_record['wt_put_volume'] = 0
    expiry_record['wt_call_open_interest'] = 0
    expiry_record['wt_put_open_interest'] = 0
    expiry_record['strike_prices'] = []
    return expiry_record
    

In [109]:
def create_strike_record(row):
    """Generate a strike price record"""
    strike_record = {}
    strike_record['strike_price'] = row['strike_price']
       
    #call attributes
    call_moneyness = round(1 - row['put_moneyness'], 3)
    strike_record['call_bid'] = row['call_bid']
    strike_record['call_ask'] = row['call_ask']
    strike_record['wt_call_volume'] = int(row['call_volume'] * call_moneyness)
    strike_record['wt_call_open_interest'] = int(row['call_open_interest'] * call_moneyness)
    strike_record['call_moneyness'] = call_moneyness
    strike_record['call_iv'] = round(row['call_iv'], 3)
    
     #put attributes
    strike_record['put_bid'] = row['put_bid']
    strike_record['put_ask'] = row['put_ask']
    strike_record['wt_put_volume'] = int(row['put_volume'] * row['put_moneyness'])
    strike_record['wt_put_open_interest'] = int(row['put_open_interest'] * row['put_moneyness'])
    strike_record['put_moneyness'] = round(row['put_moneyness'], 3)
    strike_record['put_iv'] = round(row['put_iv'], 3)
        
    return strike_record
    

In [81]:
def process_expiry_date(df_expiry, quote_week, interest_rate):
    """
    read and process options for a given expiry date
    """
    #transpose calls and puts into a straddle layout
    df_straddle = merge_options(df_expiry)

    #find nearest strike price
    target_index = abs(df_straddle['strike_price'] - df_straddle['underlying_price']).idxmin()
    df_strike = df_straddle.loc[target_index]

    #get option values at target strike price
    quote_date = df_strike['quote_date']
    expiry_date = df_strike['expiry_date']
    underlying_price = df_strike['underlying_price']
    strike_price = df_strike['strike_price']
    days_to_expiry = (df_strike['expiry_date'] -  df_strike['quote_date']).days 
    call_price = (df_strike['call_bid'] + df_strike['call_ask']) / 2
    put_price = (df_strike['put_bid'] + df_strike['put_ask']) / 2

    #find adjusted underlying price and Implied Volatility where call and put implied volatility are the same
    atm_price, atm_iv = center_underlying_price(underlying_price, strike_price, interest_rate, 
        days_to_expiry, call_price, put_price)    
  
    #calculate put moneyness using at-the-money price and implied volatility
    #exclude options that are far out-of-the-money or deep in-the-money
    df_straddle['put_moneyness'] = df_straddle.apply(lambda x: calc_put_moneyness(atm_price, atm_iv, x['strike_price'], days_to_expiry), axis=1)
    df_straddle = df_straddle[(df_straddle['put_moneyness'] > .05) & (df_straddle['put_moneyness'] < .95)].reset_index(drop=True)

    #calculate call implied volatilities using atm_price as the current price
    df_straddle['call_iv'] = df_straddle.apply(lambda x: calc_call_iv(atm_price, x['strike_price'], \
        interest_rate, days_to_expiry, (x['call_bid'] + x['call_ask']) / 2), axis=1)
  
    #calculate put implied volatilities using atm_price as the current price
    df_straddle['put_iv'] = df_straddle.apply(lambda x: calc_put_iv(atm_price, x['strike_price'], \
        interest_rate, days_to_expiry, (x['put_bid'] + x['put_ask']) / 2), axis=1)
  
    #create base record
    expiry_record = create_expiry_record(expiry_date, days_to_expiry, atm_price, atm_iv)

    #add strike price records
    for index, row in df_straddle.iterrows():
        strike_record =  create_strike_record(row)
        expiry_record['strike_prices'].append(strike_record)
        
    #calculate totals for weighted volume and weighted open interest
    for strike_record in expiry_record['strike_prices']:
        expiry_record['wt_call_volume'] += strike_record['wt_call_volume']
        expiry_record['wt_put_volume'] += strike_record['wt_put_volume']
        expiry_record['wt_call_open_interest'] += strike_record['wt_call_open_interest']
        expiry_record['wt_put_open_interest'] += strike_record['wt_put_open_interest']
 
    return expiry_record

    

In [103]:
def process_quote_date(df_daily, quote_date, quote_week, symbol):
    """
    process options for a given symbol and quote date
    - save results to disk in newline delimited json format
    - load saved file to bigquery dataset    
    """
    #get fed funds rate that was effective on the supplied quote date
    interest_rate = get_interest_rate(quote_date)
    
    #create base record for quote date
    underlying_price = df_daily['underlying_price'].iloc[0]
    base_record =  create_base_record(quote_date, quote_week, underlying_price)
    
    #process each expiry date
    expiry_dates = df_daily['expiry_date'].unique()
    
    for expiry_date in expiry_dates:
        df_expiry = df_daily[df_daily['expiry_date'] == expiry_date].reset_index(drop=True)
        expiry_record = process_expiry_date(df_expiry, quote_week, interest_rate)
        base_record['expiry_dates'].append(expiry_record)
        
    #calculate grand totals for weighted volume and weighted open interest
    for expiry_record in base_record['expiry_dates']:
        base_record['wt_call_volume'] += expiry_record['wt_call_volume']
        base_record['wt_put_volume'] += expiry_record['wt_put_volume']
        base_record['wt_call_open_interest'] += expiry_record['wt_call_open_interest']
        base_record['wt_put_open_interest'] += expiry_record['wt_put_open_interest']
          
    #converty dictionary to ndjson format
    ndjson_record = json.dumps(base_record) + '\n'
     
    #save option record to disk
    filepath = 'temp/{}_{}.ndjson'.format(symbol.upper(), JOB_ID)
    with open(filepath, 'a') as f:
        f.write(ndjson_record)
  

In [52]:
def end_of_week_dates():
    """
    return a list of end of week trading days between 2010-06-04 and the current date
    the index of each item will be the week number relative to the start date
    NOTE: 2010-06-04 is the first date the SPY weekly options began trading
    """
    eow_dates = []
    start_date = datetime.date(2010, 6, 4)
    end_date = datetime.datetime.today().date()
    date_offsets = [4, 3, 2, 1, 0, 6, 5]
    offset = date_offsets[start_date.weekday()]
    next_date = start_date + datetime.timedelta(days=offset)
   
    #load stock exchange holidays
    nyse = mcal.get_calendar('NYSE')
    market_holidays = nyse.holidays().holidays
   
    while next_date <= end_date:
        #subtract 1 day from next_date if it is a market holiday
        if next_date in market_holidays:
            adj_date = next_date - datetime.timedelta(days=1)
            eow_dates.append(adj_date)
        else:
            eow_dates.append(next_date)
        next_date += datetime.timedelta(days=7)
    return eow_dates
  

In [53]:
def load_to_bq(symbol):
    """load transformed data into bigquery"""

    client = bigquery.Client()
    table_id = '{}.{}.{}'.format(PROJECT_ID, DATASET_ID, symbol.upper()) 
   
    job_config = bigquery.LoadJobConfig(
        wrute_disposition=bigquery.WriteDisposition.WRITE_APPEND,
        source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON
    )

    filepath = 'temp/{}_{}.ndjson'.format(symbol.upper(), JOB_ID)
    with open(filepath, 'rb') as source_file:
        job = client.load_table_from_file(source_file, table_id, job_config=job_config)

    job.result()  # Waits for the job to complete.
    
    table = client.get_table(table_id)  # Make an API request.
    print("Loaded {} rows to table {}".format(table.num_rows, table_id))
  

In [54]:
def main(in_filepath):
    """
    capture options data on a weekly cadence
    file path is path to options input file
    example: gs://expiry-week-data/options/SPY_2020.csv
    """
    #load file into a dataframe
    input_columns = [0,1,5,6,7,8,10,11,12,13]

    column_names = ['underlying_symbol', 'underlying_price', 'option_type', 'expiry_date', 'quote_date',
        'strike_price', 'bid', 'ask', 'volume', 'open_interest']

    #exclude options that have already expired and options with no meaningful bid
    df_options = pd.read_csv(in_filepath, usecols=input_columns, names=column_names, header=0, parse_dates=[3,4])
    df_options = df_options[(df_options['underlying_symbol'].isin(STOCK_SYMBOLS)) & 
        (df_options['expiry_date'] > df_options['quote_date']) &
        (df_options['bid'] >= .05)]  

    return df_options
    
    #get all the end of week quote dates that exist in the input file
    #also get the corresponding week number where 2010-06-04 = 0
    start_date = df_options['quote_date'].dt.date.min()
    end_date = df_options['quote_date'].dt.date.max()
    eow_dates = end_of_week_dates()
    eow_indexes = [i for i, x in enumerate(eow_dates) if x >= start_date and x <= end_date]
    
    #process each stock symbol
    for symbol in STOCK_SYMBOLS:

        #process end of week quote dates
        for i in eow_indexes:
            df_daily = df_options[df_options['quote_date'].dt.date == eow_dates[i]]
            process_quote_date(df_daily, eow_dates[i], i, symbol)
            
        #upload records to bigquery
        load_to_bq(symbol)
        

In [33]:
#RUN PROGRAM
"""
process end-of-day stock options file
arguements:
argv[1] = Job ID (used to generate a unique local filename for storing intermediate results)
argv[2] = Input filepath (example: gs://expiry-week-data/options/SPY_2015.csv)
"""
#read in arguements if running .py program from command line
if sys.argv[0] == 'capture_options.py':
    JOB_ID = argv[1]
    main(sys.argv[2])

#supply arguements manually if running inside JupyterLab
else:
    JOB_ID = '01'
    in_filepath = 'gs://expiry-week-data/options/SPY_2015.csv'
    df_options = main(in_filepath)
    print('All Done!')

All Done!


## Code Exploration Section

In [111]:
JOB_ID = '01'
quote_date = datetime.date(2015, 7 ,2)
eow_dates = end_of_week_dates()
quote_week =  eow_dates.index(quote_date)
symbol = 'SPY'
df_test = df_options[df_options['quote_date'] == '2015-07-02']
base_record = process_quote_date(df_test, quote_date, quote_week, symbol)
print('All Done!')

CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 5.48 µs
All Done!


In [110]:
import os
temp_files = os.listdir('temp')
for item in temp_files:
    filepath = 'temp/{}'.format(item)
    os.remove(filepath)

os.listdir('temp')

[]

In [112]:
symbol = 'SPY'
filepath = 'temp/{}_{}.ndjson'.format(symbol.upper(), JOB_ID)
with open(filepath, 'r') as f:
    records = f.readlines()
    
record = json.loads(records[-1])
print(json.dumps(record, indent=4))


{
    "quote_date": "2015-07-02",
    "quote_week": 265,
    "underlying_price": 207.32,
    "wt_call_volume": 238988,
    "wt_put_volume": 334863,
    "wt_call_open_interest": 1150927,
    "wt_put_open_interest": 2365850,
    "expiry_dates": [
        {
            "expiry_date": "2015-07-10",
            "days_to_expiry": 8,
            "atm_price": 207.38,
            "atm_iv": 0.181,
            "wt_call_volume": 79585,
            "wt_put_volume": 140038,
            "wt_call_open_interest": 61054,
            "wt_put_open_interest": 164250,
            "strike_prices": [
                {
                    "strike_price": 198.5,
                    "call_bid": 8.67,
                    "call_ask": 9.66,
                    "wt_call_volume": 231,
                    "wt_call_open_interest": 231,
                    "call_moneyness": 0.949,
                    "call_iv": 0.221,
                    "put_bid": 0.28,
                    "put_ask": 0.3,
                    "wt_put_vo