In [4]:
import yfinance as yf
from scipy.stats import norm, zscore, halfnorm
import numpy as np
import pandas as pd
import math
import scipy as sq
import yahoo_fin.stock_info as si
from datetime import datetime, timedelta, date
import os


def print_full(x):
    """Print ``x`` with pandas' row limit raised to ``len(x)``.

    The ``display.max_rows`` option is changed only for the duration of the
    print. ``pd.option_context`` restores whatever value was active before the
    call, even if printing raises, instead of resetting to the pandas default.

    Parameters
    ----------
    x : object supporting ``len()``
        Typically a pandas Series or DataFrame.
    """
    with pd.option_context('display.max_rows', len(x)):
        print(x)

In [5]:
# ticker = yf.Ticker("ASO")
# todays_data = ticker.history(period='1y')
# todays_data

In [6]:
def get_current_price(symbol):
    """Return the first ``Close`` value of the one-day price history of ``symbol``.

    Parameters
    ----------
    symbol : str
        Ticker symbol passed to ``yf.Ticker``.

    Raises
    ------
    ValueError
        If the one-day history contains no rows.
    """
    closes = yf.Ticker(symbol).history(period='1d')['Close']
    if closes.empty:
        raise ValueError("No price history returned for %s" % symbol)
    # The history is indexed by date, so take the first row by position
    # explicitly; integer keys via ``[]`` on such a Series are deprecated.
    return closes.iloc[0]

def get_days_to_expiration(symbol):
    """Return the absolute number of calendar days between today and the
    first expiration listed by ``yf.Ticker(symbol).options``.

    The first entry of ``options`` is parsed as ``YYYY-MM-DD``. The result is
    an absolute value, so an expiration that has already passed also yields a
    positive count.
    """
    nearest_expiry = yf.Ticker(symbol).options[0]
    expiry_day = datetime.strptime(nearest_expiry, '%Y-%m-%d').date()
    today = datetime.now().date()
    # Whole-day distance between the two calendar dates, sign discarded.
    return abs((today - expiry_day).days)

def options_chain(symbol):
    """Return calls and puts of the first listed expiration of ``symbol`` as one frame.

    Added columns:

    * ``expirationDate`` - the expiration string taken from ``tk.options[0]``.
    * ``CALL`` - ``True`` for rows that came from the calls frame, ``False``
      for rows from the puts frame.
    * ``mark_price`` - midpoint of ``bid`` and ``ask``.

    ``contractSize``, ``currency``, ``change``, ``percentChange`` and
    ``lastTradeDate`` are dropped. The original row labels of the two frames
    are kept, so index values can repeat between calls and puts.

    Raises
    ------
    ValueError
        If no expirations are listed for ``symbol``.
    """
    tk = yf.Ticker(symbol)
    expirations = tk.options
    if not expirations:
        raise ValueError("No option expirations listed for %s" % symbol)
    nearest_expiry = expirations[0]

    chain = tk.option_chain(nearest_expiry)
    # Tag each side from the frame it came from. Searching the contract symbol
    # for the letter "C" misclassifies puts on tickers whose own name has a
    # "C" at position 5 or later. ``assign`` returns new frames, so the frames
    # held by ``chain`` are not modified.
    calls = pd.DataFrame(chain.calls).assign(expirationDate=nearest_expiry, CALL=True)
    puts = pd.DataFrame(chain.puts).assign(expirationDate=nearest_expiry, CALL=False)

    options = pd.concat([calls, puts])
    options[['bid', 'ask', 'strike']] = options[['bid', 'ask', 'strike']].apply(pd.to_numeric)
    options['mark_price'] = (options['bid'] + options['ask']) / 2
    options = options.drop(columns = ['contractSize', 'currency', 'change', 'percentChange', 'lastTradeDate'])
    return options

# options_chain("FB")

In [7]:
def get_expected_move(symbol):
    """Estimate the expected move of ``symbol`` up to its first listed expiration.

    All figures are fractions of the current price (first ``Close`` of the
    one-day history):

    0. IV move: mean implied volatility of the at-the-money (ATM) rows times
       ``sqrt(days_to_expiration / 365)``.
    1. ATM straddle price / current price.
    2. ``0.7 * straddle + 0.3 * first strangle`` / current price.
    3. Average of (0) and (1).

    The ATM strike is the listed strike closest to the current price. The
    first strangle is the call at the next strike above the ATM strike plus
    the put at the next strike below it. Prices use ``mark_price`` and fall
    back to ``lastPrice`` when the mark-based sum is 0. A strangle whose wing
    is missing contributes 0.

    Returns
    -------
    tuple of 4 floats, or the string ``"Error"``
        ``"Error"`` is returned when fewer than three distinct strikes are
        listed; callers must check for it before indexing the result.
    """
    stock = yf.Ticker(symbol)
    curr = stock.history(period='1d')['Close'].iloc[0]
    options = options_chain(symbol)

    strikes = sorted(set(options.strike))
    if len(strikes) < 3:
        return "Error"

    # Take the wings from the strikes adjacent to the ATM strike. Ranking all
    # strikes by distance and using ranks 2 and 3 can put both wings on the
    # same side of the ATM strike when strikes are unevenly spaced.
    atm_pos = min(range(len(strikes)), key=lambda i: abs(strikes[i] - curr))
    atm_strike = strikes[atm_pos]
    lower_strike = strikes[atm_pos - 1] if atm_pos > 0 else None
    higher_strike = strikes[atm_pos + 1] if atm_pos + 1 < len(strikes) else None

    strad = options.loc[options.strike == atm_strike]

    def straddle_price(column):
        # Two rows = call + put; a single row is doubled as a stand-in.
        quotes = strad[column]
        if len(quotes) == 2:
            return quotes.sum()
        if len(quotes) == 1:
            return quotes.sum() * 2
        return 0

    def leg_price(strike, is_call, column):
        # First quote of the requested side at ``strike``; None when absent.
        if strike is None:
            return None
        leg = options.loc[(options.strike == strike) & (options.CALL == is_call), column]
        return None if leg.empty else leg.iloc[0]

    def strangle_price(column):
        call_leg = leg_price(higher_strike, True, column)
        put_leg = leg_price(lower_strike, False, column)
        if call_leg is None or put_leg is None:
            return 0
        return call_leg + put_leg

    # IV expected move, as a fraction of price:
    # implied volatility x sqrt(days to expiration / 365)
    per = strad.impliedVolatility * math.sqrt(get_days_to_expiration(symbol) / 365)

    straddle_sum = straddle_price('mark_price')
    if straddle_sum == 0:
        straddle_sum = straddle_price('lastPrice')

    strangle_sum = strangle_price('mark_price')
    if strangle_sum == 0:
        strangle_sum = strangle_price('lastPrice')

    # Straddle expected move - 70% of ATM straddle + 30% of first strangle
    total = 0.7 * straddle_sum + 0.3 * strangle_sum

    # IV Expected, ATM Straddle, ATM Straddle + 1st Strangle, Avg of IV + ATM
    return (per.mean(), straddle_sum / curr, total / curr, (per.mean() + straddle_sum / curr) / 2)

# Example: expected-move estimates for SIG; the tuple is shown as the cell output.
get_expected_move("SIG")

(0.027347105286010337,
 0.03095975148623031,
 0.026979212009429272,
 0.029153428386120325)

In [20]:
def get_put_options(symbol, price_ceiling = -1):
    """Build a table of puts at or below ``price_ceiling`` for the first listed expiration.

    Parameters
    ----------
    symbol : str
        Ticker symbol.
    price_ceiling : float, optional
        Puts with a strike above this value are removed. The default ``-1``
        means "use the current price" (first ``Close`` of the one-day history).

    Returns
    -------
    pandas.DataFrame
        One row per put, sorted by strike descending with a fresh 0..n-1
        index. Added columns: ``mark`` (bid/ask midpoint), ``dist%`` (percent
        distance of the strike below the current price), the four expected-move
        percentages from ``get_expected_move``, ``expirationDate``,
        ``%probability_not_called`` and ``%gain_of_collateral``
        (``mark * 100 / strike``).

    Raises
    ------
    ValueError
        If no expirations are listed, or ``get_expected_move`` reports that
        it could not produce an estimate.
    """
    tk = yf.Ticker(symbol)
    expirations = tk.options
    if not expirations:
        raise ValueError("No option expirations listed for %s" % symbol)
    nearest_expiry = expirations[0]

    # Work on a copy so the frame returned by option_chain is not modified.
    puts = pd.DataFrame(tk.option_chain(nearest_expiry).puts).copy()
    curr_price = tk.history(period='1d')['Close'].iloc[0]

    puts[['bid', 'ask', 'strike']] = puts[['bid', 'ask', 'strike']].apply(pd.to_numeric)
    puts.insert(6, 'mark', (puts['bid'] + puts['ask']) / 2)
    puts.insert(12, 'dist%', ((curr_price - puts['strike']) / curr_price) * 100)

    puts = puts.drop(columns = ['contractSize', 'currency', 'change', 'percentChange', 'lastTradeDate', 'inTheMoney'])

    moves = get_expected_move(symbol)
    if isinstance(moves, str):
        # get_expected_move returns the string "Error" instead of a tuple when
        # it has too few strikes; indexing that string would yield garbage.
        raise ValueError("Expected move could not be estimated for %s" % symbol)
    puts['average_move%'] = moves[3] * 100
    puts['IV_move%'] = moves[0] * 100
    puts['straddle_move%'] = moves[1] * 100
    puts['strad_strang_move%'] = moves[2] * 100

    puts['expirationDate'] = nearest_expiry

    if price_ceiling == -1:
        price_ceiling = curr_price

    # Keep every row that is not strictly above the ceiling.
    puts = puts[~(puts['strike'] > price_ceiling)].copy()

    # Distance to the strike expressed in multiples of the average expected move.
    # NOTE(review): halfnorm.cdf gives P(|move| < distance) and so reads 0% at the
    # money; confirm this two-sided measure is intended rather than norm.cdf.
    move_multiples = puts['dist%'] / puts['average_move%']
    puts.insert(6, '%probability_not_called', (halfnorm.cdf(move_multiples) * 100).round(4))
    puts.insert(7, '%gain_of_collateral', (puts['mark'] * 100 / puts['strike']))

    puts = puts.sort_values(by=['strike'], ascending=False).reset_index(drop=True)
    return puts

# Example: build the put table for DOCU and display it as the cell output.
val = get_put_options("DOCU")
val

Unnamed: 0,contractSymbol,strike,lastPrice,bid,ask,mark,%probability_not_called,%gain_of_collateral,volume,openInterest,impliedVolatility,dist%,average_move%,IV_move%,straddle_move%,strad_strang_move%,expirationDate
0,DOCU220610P00087000,87.0,6.32,6.0,6.4,6.2,2.3848,7.126437,261.0,57,2.480961,0.412089,13.78525,12.975719,14.59478,14.242788,2022-06-10
1,DOCU220610P00086000,86.0,5.7,5.5,6.0,5.75,8.9914,6.686047,106.0,160,2.50684,1.556777,13.78525,12.975719,14.59478,14.242788,2022-06-10
2,DOCU220610P00085000,85.0,5.45,5.15,5.45,5.3,15.5365,6.235294,1802.0,358,2.522953,2.701466,13.78525,12.975719,14.59478,14.242788,2022-06-10
3,DOCU220610P00084000,84.0,4.88,4.8,5.0,4.9,21.9759,5.833333,158.0,484,2.549808,3.846155,13.78525,12.975719,14.59478,14.242788,2022-06-10
4,DOCU220610P00083000,83.0,4.45,4.35,4.6,4.475,28.268,5.391566,907.0,256,2.556644,4.990843,13.78525,12.975719,14.59478,14.242788,2022-06-10
5,DOCU220610P00082000,82.0,4.06,3.9,4.2,4.05,34.3738,4.939024,365.0,238,2.553715,6.135532,13.78525,12.975719,14.59478,14.242788,2022-06-10
6,DOCU220610P00081500,81.5,3.88,3.75,3.9,3.825,37.3457,4.693252,136.0,138,2.542972,6.707876,13.78525,12.975719,14.59478,14.242788,2022-06-10
7,DOCU220610P00081000,81.0,3.65,3.5,3.8,3.65,40.2582,4.506173,1059.0,695,2.550785,7.28022,13.78525,12.975719,14.59478,14.242788,2022-06-10
8,DOCU220610P00080000,80.0,3.31,3.25,3.45,3.35,45.8902,4.1875,3601.0,667,2.581058,8.424909,13.78525,12.975719,14.59478,14.242788,2022-06-10
9,DOCU220610P00079000,79.0,2.89,2.87,3.15,3.01,51.2438,3.810127,647.0,245,2.583988,9.569598,13.78525,12.975719,14.59478,14.242788,2022-06-10


In [19]:
def get_all_put_options(symbol, price_ceiling = -1):
    """Build a table of every put of the first listed expiration of ``symbol``.

    Unlike ``get_put_options`` no strikes are filtered out and the
    ``inTheMoney`` column is kept.

    Parameters
    ----------
    symbol : str
        Ticker symbol.
    price_ceiling : float, optional
        Accepted for signature compatibility; it does not affect the result.

    Returns
    -------
    pandas.DataFrame
        One row per put, sorted by strike descending with a fresh 0..n-1
        index. Added columns: ``mark`` (bid/ask midpoint), ``dist%`` (percent
        distance of the strike below the current price), the four expected-move
        percentages from ``get_expected_move``, ``expirationDate``,
        ``%probability_not_called`` and ``%gain_of_collateral``
        (``mark * 100 / strike``).

    Raises
    ------
    ValueError
        If no expirations are listed, or ``get_expected_move`` reports that
        it could not produce an estimate.
    """
    tk = yf.Ticker(symbol)
    expirations = tk.options
    if not expirations:
        raise ValueError("No option expirations listed for %s" % symbol)
    nearest_expiry = expirations[0]

    # Work on a copy so the frame returned by option_chain is not modified.
    puts = pd.DataFrame(tk.option_chain(nearest_expiry).puts).copy()
    curr_price = tk.history(period='1d')['Close'].iloc[0]

    puts[['bid', 'ask', 'strike']] = puts[['bid', 'ask', 'strike']].apply(pd.to_numeric)
    puts.insert(6, 'mark', (puts['bid'] + puts['ask']) / 2)
    puts.insert(12, 'dist%', ((curr_price - puts['strike']) / curr_price) * 100)

    puts = puts.drop(columns = ['contractSize', 'currency', 'change', 'percentChange', 'lastTradeDate'])

    moves = get_expected_move(symbol)
    if isinstance(moves, str):
        # get_expected_move returns the string "Error" instead of a tuple when
        # it has too few strikes; indexing that string would yield garbage.
        raise ValueError("Expected move could not be estimated for %s" % symbol)
    puts['average_move%'] = moves[3] * 100
    puts['IV_move%'] = moves[0] * 100
    puts['straddle_move%'] = moves[1] * 100
    puts['strad_strang_move%'] = moves[2] * 100

    puts['expirationDate'] = nearest_expiry

    puts = puts.sort_values(by=['strike'], ascending=False).reset_index(drop=True)

    # Distance to the strike expressed in multiples of the average expected move.
    # In-the-money puts have a negative distance, for which halfnorm.cdf is 0.
    move_multiples = puts['dist%'] / puts['average_move%']
    puts.insert(6, '%probability_not_called', (halfnorm.cdf(move_multiples) * 100).round(4))
    puts.insert(7, '%gain_of_collateral', (puts['mark'] * 100 / puts['strike']))
    return puts

# get_all_put_options("AAPL")

In [10]:
# Get Earnings Price Effect
# Returns Pandas DataFrame with Data of Market Close to Next Day Open and Close
def get_earnings_price_effect(symbol, period = "10y"):
    """Measure how the price of ``symbol`` moved around each past earnings date.

    For every entry of ``si.get_earnings_history(symbol)`` whose date appears
    in the price history, the move is measured from a reference close to the
    open and close of the following session:

    * announcement hour >= 9 (treated as after market close): reference is the
      close of the earnings day, reaction is the next row of the history;
    * announcement hour < 9 (treated as before market open): reference is the
      close of the previous row, reaction is the earnings day itself.

    Neighbouring sessions are taken from adjacent rows of the price history,
    so earnings next to a weekend or holiday are not skipped.

    Parameters
    ----------
    symbol : str
        Ticker symbol.
    period : str, optional
        History window passed to ``Ticker.history``.

    Returns
    -------
    pandas.DataFrame
        Indexed by earnings date string, with columns ``Symbol``,
        ``day_close_to_next_day_open``, ``%change_close_to_next_open``,
        ``day_close_to_next_day_close``, ``%change_close_to_next_close`` and a
        final ``"Absolute Average"`` row holding the mean absolute value of
        each numeric column.
    """
    hist = yf.Ticker(symbol).history(period=period)
    earn_hist = si.get_earnings_history(symbol)

    # Map each calendar date in the history to its row position so the
    # neighbouring session can be found by position.
    trading_days = pd.DatetimeIndex(hist.index).date
    position_of = {day: pos for pos, day in enumerate(trading_days)}

    data = {}
    for earning in earn_hist:
        announced = datetime.strptime(earning['startdatetime'], '%Y-%m-%dT%H:%M:%S.000Z')
        earnings_day = announced.date()
        day_pos = position_of.get(earnings_day)
        if day_pos is None:
            continue

        # If hour is < 9, count as BMO, else count as AMC.
        # NOTE(review): the timestamp carries a "Z" suffix, so this hour is
        # presumably UTC rather than exchange time - confirm the threshold.
        if announced.hour < 9:
            base_pos, reaction_pos = day_pos - 1, day_pos
        else:
            base_pos, reaction_pos = day_pos, day_pos + 1

        # No session before the first row / after the last row.
        if base_pos < 0 or reaction_pos >= len(hist):
            continue

        base_close = hist["Close"].iloc[base_pos]
        open_move = hist["Open"].iloc[reaction_pos] - base_close
        close_move = hist["Close"].iloc[reaction_pos] - base_close

        data[str(earnings_day)] = [open_move,
                                   open_move * 100 / base_close,
                                   close_move,
                                   close_move * 100 / base_close]

    move_columns = ['day_close_to_next_day_open', '%change_close_to_next_open',
                    'day_close_to_next_day_close', '%change_close_to_next_close']
    earn_data = pd.DataFrame.from_dict(data, orient='index', columns=move_columns)

    new_row = pd.DataFrame({column: earn_data[column].abs().mean() for column in move_columns},
                           index=["Absolute Average"])

    earn_data = pd.concat([earn_data, new_row])
    earn_data.insert(0, 'Symbol', symbol)

    return earn_data

# get_earnings_price_effect("FB")

In [11]:
# Symbol directories read from local CSV files (presumably the CBOE equity/index
# and weeklys listings, judging by the file names - verify the source).
indexOptions = pd.read_csv('cboesymboldirequityindex.csv').drop(columns=[' DPM Name', ' Post/Station'])
indexOptions.columns = ['name', 'symbol']

weeklyOptions = pd.read_csv('cboesymboldirweeklys.csv')
weeklyOptions.columns = ['name', 'symbol']

# Membership sets used for fast "is this ticker listed?" checks.
optionSet = set(indexOptions['symbol'])
weeklySet = set(weeklyOptions['symbol'])

In [12]:
def get_companies_with_earnings_today_AMC_or_tomm_BMO(add_days = 0):
    """List companies reporting after today's close or before tomorrow's open.

    "Today" is the current date shifted by ``add_days``. Earnings records come
    from ``si.get_earnings_for_date``; rows are kept when today's record has
    ``startdatetimetype == 'AMC'`` or tomorrow's has ``'BMO'``.

    Each remaining ticker must be in the module-level ``optionSet``, list at
    least one expiration, and have a combined call+put volume of at least
    2000 on its first expiration. Rows that fail those checks, or whose price
    is 5 or less, are removed.

    Parameters
    ----------
    add_days : int, optional
        Day offset applied to the current date.

    Returns
    -------
    pandas.DataFrame
        Sorted by ``volume`` descending with a fresh 0..n-1 index. Adds
        ``stock_price``, ``is_weekly`` (ticker is in ``weeklySet``),
        ``volume`` and ``openInterest``. When no company matches the two
        sessions an empty frame is returned.
    """
    curr_date = datetime.now().date() + timedelta(add_days)

    def reporting_in_session(day, session):
        # An empty lookup produces a frame without any columns, so guard the
        # column access instead of failing with a KeyError.
        frame = pd.DataFrame.from_dict(si.get_earnings_for_date(str(day)))
        if 'startdatetimetype' not in frame.columns:
            return frame.iloc[0:0]
        return frame[frame['startdatetimetype'] == session]

    comp = pd.concat([reporting_in_session(curr_date, 'AMC'),
                      reporting_in_session(curr_date + timedelta(1), 'BMO')])
    comp = comp.drop(columns = ['gmtOffsetMilliSeconds', 'quoteType', 'epsactual', 'epssurprisepct'],
                     errors='ignore')
    comp = comp.drop_duplicates().reset_index(drop=True)
    if comp.empty:
        return comp

    # Float placeholder: the column later receives rounded float prices.
    comp.insert(2, "stock_price", 0.0)
    comp.insert(3, "is_weekly", False)
    comp['volume'] = 0
    for ticker in comp['ticker'].tolist():
        if ticker not in optionSet:
            continue

        tk = yf.Ticker(ticker)
        expirations = tk.options
        if not expirations:
            continue

        chain = tk.option_chain(expirations[0])
        volume_sum = chain.calls.volume.sum() + chain.puts.volume.sum()

        # Skip thinly traded chains; their rows keep volume 0 and are dropped below.
        if volume_sum < 2000:
            continue

        open_interest_sum = chain.calls.openInterest.sum() + chain.puts.openInterest.sum()

        rows = comp['ticker'] == ticker
        comp.loc[rows, 'stock_price'] = round(tk.history(period='1d')['Close'].iloc[0], 2)
        comp.loc[rows, 'volume'] = volume_sum
        comp.loc[rows, 'openInterest'] = open_interest_sum
        comp.loc[rows, 'is_weekly'] = (ticker in weeklySet)

    # Remove rows that never received option data and low-priced stocks.
    rejected = (comp['volume'] == 0) | (comp['stock_price'] <= 5)
    comp = comp[~rejected]

    comp = comp.sort_values(by=['volume'], ascending=False).reset_index(drop=True)

    return comp

# Build today's earnings watch list (no day offset) and save it as an HTML table.
today_comp = get_companies_with_earnings_today_AMC_or_tomm_BMO(0)
today_comp.to_html("today_companies.html")

In [13]:
def get_IV_crush_for_puts():
    """Add next-day implied-volatility change to yesterday's saved put tables.

    Looks in ``<current year>/<yesterday's date>`` for ``companies.csv`` and
    one ``<ticker>.csv`` per company. For each ticker that has a saved table,
    fresh puts are fetched with ``get_put_options`` (capped at the saved stock
    price) and three things are added to the saved table, which is then
    written back to the same file:

    * ``nextday_impliedVolatility`` - the fresh implied volatility of the
      same strike;
    * ``IV_crush`` - ``(fresh IV - saved IV) / saved IV`` for rows whose saved
      probability of not being called is not 100;
    * an ``"Average"`` row holding the mean ``IV_crush``.

    Prints a message and returns ``None`` when the directory or
    ``companies.csv`` is missing.

    Raises
    ------
    KeyError
        If a saved table has neither ``%probability_not_called`` nor
        ``probability_not_called%``.
    """
    day_dir = os.path.join(str(date.today().year), str(date.today() + timedelta(-1)))

    if not os.path.exists(day_dir):
        print("Directory does not exist for IV Crush")
        return

    companies_path = os.path.join(day_dir, "companies.csv")
    if not os.path.isfile(companies_path):
        print("companies.csv does not exist for IV Crush")
        return

    # Read the company list once and iterate over its tickers, rather than
    # deriving the loop count from the number of files in the directory.
    tickers = pd.read_csv(companies_path)[['ticker', 'stock_price']]

    for ticker, saved_price in zip(tickers['ticker'], tickers['stock_price']):
        ticker_path = os.path.join(day_dir, ticker + '.csv')
        if not os.path.isfile(ticker_path):
            continue

        company = pd.read_csv(ticker_path)
        next_day = get_put_options(ticker, saved_price)[['strike', 'impliedVolatility']]

        # get_put_options writes '%probability_not_called'; the other spelling
        # is accepted for tables saved under that name.
        probability_column = next(
            (name for name in ('%probability_not_called', 'probability_not_called%')
             if name in company.columns), None)
        if probability_column is None:
            raise KeyError("No probability-not-called column in %s" % ticker_path)

        # Calculate IV for ATM and feasible OTM options
        baseline_iv = company.loc[company[probability_column] != 100, 'impliedVolatility']

        # Match fresh quotes to saved rows by strike, not by row position, so a
        # changed strike ladder cannot pair up different contracts.
        fresh_by_strike = next_day.drop_duplicates('strike').set_index('strike')['impliedVolatility']
        next_day_iv = company['strike'].map(fresh_by_strike)

        company['IV_crush'] = (next_day_iv - baseline_iv) / baseline_iv
        company['nextday_impliedVolatility'] = next_day_iv
        company.loc['Average'] = pd.Series({'IV_crush': company['IV_crush'].mean()})

        company.to_csv(ticker_path)
        
# get_IV_crush_for_puts()

In [14]:
def check_today(duration = "10y"):
    """Collect put tables and earnings price effects for today's reporting companies.

    For every company returned by
    ``get_companies_with_earnings_today_AMC_or_tomm_BMO(0)`` the put table is
    saved to ``<year>/<today>/<ticker>.csv``; the company list itself is saved
    once to ``<year>/<today>/companies.csv``. Nothing is written when the
    company list is empty.

    Parameters
    ----------
    duration : str, optional
        History window forwarded to ``get_earnings_price_effect``.

    Returns
    -------
    tuple of (pandas.DataFrame, pandas.DataFrame)
        ``(total, price)``: the put tables and the earnings price effects of
        all companies stacked together, each company followed by two blank
        separator rows labelled ``"Empty"`` and ``"Space"``.
    """
    today_comp = get_companies_with_earnings_today_AMC_or_tomm_BMO(0)
    total = pd.DataFrame()
    price = pd.DataFrame()
    outdir = os.path.join(str(date.today().year), str(date.today()))

    if not today_comp.empty:
        # makedirs also creates the year directory, which os.mkdir would not.
        os.makedirs(outdir, exist_ok=True)
        today_comp.to_csv(os.path.join(outdir, "companies.csv"))

    for _, company in today_comp.iterrows():
        ticker = company.ticker
        temp = get_put_options(ticker)
        temp.to_csv(os.path.join(outdir, ticker + ".csv"))

        # ignore_index renumbers earlier separator rows, so each company keeps
        # its own pair of blank rows in the combined table.
        total = pd.concat([total, temp], ignore_index=True)
        total.loc["Empty"] = ''
        total.loc["Space"] = ''

        price_effect = get_earnings_price_effect(ticker, duration)
        price_effect.loc["Empty"] = ''
        price_effect.loc["Space"] = ''
        price = pd.concat([price, price_effect])

    return (total, price)

In [21]:
# Daily run: export the company list, the combined put tables and the earnings
# price effects as HTML, then update the previous day's saved tables with IV crush.
# `today_comp` comes from the earlier cell that builds the earnings watch list.
today_comp.to_html("today_companies.html")
# check_today returns a (put tables, price effects) tuple; "3y" is the history window.
df = check_today("3y")
df[0].to_html('today_options.html')
df[1].to_html('today_price_effect.html')
get_IV_crush_for_puts()
print("Done")

Done
