In [None]:
#Relevant imports.

from datetime import datetime
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from scipy import stats
from pandas.tseries.holiday import USFederalHolidayCalendar

from pandas_datareader import data as pdr
import yfinance as yf

%matplotlib inline

In [None]:
class Option(object):
    '''Create object to store option characteristics and methods.
    
    
    This class allows the user to create an Option object with desired parameters, and call a number
    of functions as well as access data for their own use.
    
    INPUTS: assets (list), weights (list), strike price K (int), tenor t in yrs (float),
    risk-free rate r (float), startdate of data collection (str YYYY-MM-DD), enddate (str YYYY-MM-DD).'''
    
    def __init__(self, assets, weights, K, t, flag, rate, startdate, enddate):
        '''Initialize Option object storing methods and information.'''
        
        self.assets = assets
        self.weights = weights
        self.strike = K
        self.tenor = t
        self.flag = flag
        self.rate = rate
        
        assetdata = get_data(assets, startdate, enddate)
        bsdata = black_scholes_pricer(assetdata, weights, K, t, flag, rate)
        self.price_history = bsdata["Option Price"]
        self.price = self.price_history[-1].round(2)
        self.greeks = bsdata["Option Greeks"]
        self.basket_price_history = bsdata["Basket Price"]
        self.basket_price = self.basket_price_history[-1].round(2)
        self.dates = bsdata["Dates"]
        self.corr = bsdata["Correlations"]
        self.basket_vol = bsdata["Volatility"]
        self.data = assetdata
    
    
    def backtest(self, buydate):
        '''Calculate and return profit/loss for option purchase at input date.'''
        
        #Start by getting holiday calendar to avoid exceptions during backtest calculations.
        buydate = np.datetime64(buydate, 'D')
        if not buydate in self.dates:
            raise KeyError("Input buy date outside of Option date range.")
            
        cal = USFederalHolidayCalendar()
        holidaycal = cal.holidays(start=str(np.datetime_as_string(self.dates[0], unit="D")),
                                  end=str(np.datetime_as_string(self.dates[-1], unit="D")))
        
        profits = np.where(self.basket_price_history - self.strike >= 0,
                           self.basket_price_history - self.strike,
                           0)
        prof_dict = dict(zip(self.dates.astype('datetime64[D]'), profits))
        price_dict = dict(zip(self.dates.astype('datetime64[D]'), self.price_history))
        busday_diff = np.busday_count(buydate, buydate + np.timedelta64(int(self.tenor*365), 'D'),
                                      holidays=np.array(holidaycal.to_pydatetime(), dtype='datetime64[D]'))
        try:
            return (prof_dict[np.busday_offset(buydate, busday_diff,
                                              holidays=holidaycal.to_pydatetime().astype('datetime64[D]'),
                                              roll='forward')] - price_dict[buydate]).round(2)
        except KeyError:
            print("Invalid key: " , buydate)
            temp = np.busday_offset(buydate, busday_diff,
                                              holidays=holidaycal.to_pydatetime().astype('datetime64[D]'),
                                              roll='forward')
            return (prof_dict[np.busday_offset(temp, 1,
                                    holidays=holidaycal.to_pydatetime().astype('datetime64[D]'),
                                    roll='forward')] - price_dict[buydate]).round(2)
    
    def plot_backtest(self):
        '''Create graph of backtested profit/loss for relevant dates.'''
        
        arr = []
        offset = np.busday_count(self.dates[-1].astype('datetime64[D]') - np.timedelta64(int(self.tenor*365), 'D'),
                                 self.dates[-1].astype('datetime64[D]'))
        for date in self.dates[:-offset]:
            arr.append(self.backtest(date))
        fig, ax = plt.subplots(1, figsize=(21,7))
        sns.lineplot(self.dates.astype('datetime64')[:-offset], arr)
        ax.set_xlabel("Buy Date")
        ax.set_ylabel("Backtested PnL")
        ax.set_title("Historical PnL of Basket Option")
        return (arr, self.dates[:-offset])
        
    
    def hedge(self):
        '''Return relevant option Greeks to hedge.'''
        
        #Delta calculation will come directly out of black scholes pricer.
        delta_risk = self.greeks["Delta"][-1]
        
        #Vega calculation involves repricing with higher/lower vol and using central difference method.
        upper_vol_shock = black_scholes(self.basket_price_history, self.basket_vol + 0.01,
                                       self.strike, self.tenor, self.flag, self.rate)["Price"][-1]
        lower_vol_shock = black_scholes(self.basket_price_history, self.basket_vol - 0.01,
                                       self.strike, self.tenor, self.flag, self.rate)["Price"][-1]
        vega_risk = ((upper_vol_shock - lower_vol_shock)/(2*0.01))
        
        #Correlation risk calculation comes from repricing using higher/lower avg correlation.
        #We recalculate the porfolio vol here rather than recycling Option method.
        #See in future if this can be adjusted.
        adj_vol = [np.std(self.data[col].pct_change()*np.sqrt(252)) for col in self.data.columns]
        qprime = np.multiply(np.eye(len(self.weights)), adj_vol)
        upper_corr_adj = 0.01*np.ones((len(self.weights), len(self.weights))) - 0.01*np.eye(len(self.weights))
        lower_corr_adj = -0.01*np.ones((len(self.weights), len(self.weights))) + 0.01*np.eye(len(self.weights))
        upper_covar = np.matmul(np.matmul(qprime, self.corr + upper_corr_adj), qprime)
        lower_covar = np.matmul(np.matmul(qprime, self.corr + lower_corr_adj), qprime)
        upper_corr_shock_vol = np.sqrt(np.matmul(np.matmul(self.weights, upper_covar), np.transpose(self.weights)))
        lower_corr_shock_vol = np.sqrt(np.matmul(np.matmul(self.weights, lower_covar), np.transpose(self.weights)))
        upper_corr_shock = black_scholes(self.basket_price_history, upper_corr_shock_vol,
                                    self.strike, self.tenor, self.flag, self.rate)["Price"][-1]
        lower_corr_shock = black_scholes(self.basket_price_history, lower_corr_shock_vol,
                                    self.strike, self.tenor, self.flag, self.rate)["Price"][-1]
        corr_risk = ((upper_corr_shock - lower_corr_shock)/(2*0.01)) #*self.strike
        
        print("Delta risk: ", delta_risk.round(2))
        print("Vega risk: ", vega_risk.round(2))
        print("Correlation risk: ", corr_risk.round(2))
        
    def most_recent_backtest_date(self):
        '''Return most recent purchase date.'''
        
        cal = USFederalHolidayCalendar()
        holidaycal = cal.holidays(start=str(np.datetime_as_string(self.dates[0], unit="D")),
                                  end=str(np.datetime_as_string(self.dates[-1], unit="D")))
        recent = self.dates[-1].astype('datetime64[D]')
        busday_diff = np.busday_count(recent, recent - np.timedelta64(int(self.tenor*365), 'D'),
                                      holidays=np.array(holidaycal.to_pydatetime(), dtype='datetime64[D]'))
        final = np.busday_offset(recent, busday_diff,
                                        holidays=holidaycal.to_pydatetime().astype('datetime64[D]'),
                                        roll='forward')
        return pd.to_datetime(str(final)).strftime('%Y-%m-%d')
        
        

In [None]:
def get_data(assets, data_start, data_end):
    '''Fetch and return asset data using yfinance open-source module.'''
    
    if assets is None:
        raise ValueError("No basket of securities provided.")
    if type(assets) == str:
        raise TypeError("Basket must be passed in as a list, not string.")
    if data_start is None:
        raise ValueError("No start date for data collection provided.")
    if data_end is None:
        raise ValueError("No end date for data collection provided.")
    
    if datetime.strptime(data_start, "%Y-%m-%d") > datetime.strptime(data_end, "%Y-%m-%d"):
        raise ValueError("End date cannot be before start date.")
        
    
    yf.pdr_override()
    data = {}
    for asset in assets:
        try:
            prices = pdr.get_data_yahoo(asset, start=data_start,
                                        end=data_end)["Close"]
            data[asset] = prices
        except ValueError:
            print("Date format is incorrect: ensure they are in string YYYY-MM-DD format.")
    return pd.DataFrame(data)

In [None]:
def black_scholes_pricer(assetdata, weights, K, t, flag, rate):
    '''Return Black-Scholes price information and associated option information.'''
    
    vols, corr = portfolio_vol(assetdata, weights)
    price = np.sum([np.multiply(weight, assetdata[col]) for (weight, col) in zip(weights, assetdata)], axis=0)
    bs = black_scholes(price, vols, K, t, flag, rate)
    return {"Option Price": bs["Price"], "Option Greeks": bs["Greeks"],
            "Basket Price": price, "Correlations": corr,
            "Volatility": vols, "Dates": assetdata.index.values}

In [None]:
def black_scholes(S, vol, K, t, flag, r):
    ''' Return Black-Scholes price and Greeks for given option parameters.'''
    
    d1 = np.log(S/K + t*(r + 0.5*vol**2))/(vol*np.sqrt(t))
    d2 = d1 - vol*np.sqrt(t)
    price = 0
    greeks = {}
    if flag == 'C':
        greeks['Delta'] = stats.norm.cdf(d1)
        greeks['Gamma'] = stats.norm.pdf(d1)/(S*vol*np.sqrt(t))
        greeks['Vega'] = S*stats.norm.pdf(d1)*np.sqrt(t)
        greeks['Theta'] = -(S*stats.norm.pdf(d1)*vol)/(2*np.sqrt(t)) - r*K*np.exp(-r*t)*stats.norm.cdf(d2)
        price = S*stats.norm.cdf(d1) - K*np.exp(-r*t)*stats.norm.cdf(d2)
        #return {"Price": S*stats.norm.cdf(d1) - K*np.exp(-r*t)*stats.norm.cdf(d2), "Greeks": greeks}
    else:
        greeks['Delta'] = stats.norm.cdf(d1) -1
        greeks['Gamma'] = stats.norm.pdf(d1)/(S*vol*np.sqrt(t))
        greeks['Vega'] = S*stats.norm.pdf(d1)*np.sqrt(t)
        greeks['Theta'] = -(S*stats.norm.pdf(d1)*vol)/(2*np.sqrt(t)) + r*K*np.exp(-r*t)*stats.norm.cdf(-d2)
        price = K*np.exp(-r*t)*stats.norm.cdf(-d2) - S*stats.norm.cdf(-d1)
        #return {"Price": K*np.exp(-r*t)*stats.norm.cdf(-d2) - S*stats.norm.cdf(-d1), "Greeks": greeks}
    return {"Price": price, "Greeks": greeks}

In [None]:
def portfolio_vol(data, weights):
    ''' Return portfolio volatility of input basket.'''
    
    volatility = [np.std(data[col].pct_change()*np.sqrt(252)) for col in data.columns]
    q = np.multiply(np.eye(len(weights)), volatility)
    corrs = data.corr(method='pearson')
    covar = np.matmul(np.matmul(q, corrs), q)
    return (np.sqrt(np.matmul(np.matmul(weights, covar), np.transpose(weights))), corrs)

In [None]:
#MAIN CALLS and TESTING

#Change the basket/weights definition as needed.
test_basket = ['BABA', 'JD', 'PDD', 'NTES', 'BIDU', 'BILI', 'TCOM']
test_weights = [0.2508, 0.1754, 0.1697, 0.1144, 0.1039, 0.093, 0.0928]

#Define the start and end dates of data fetched.
START = "2020-01-01"
END = "2021-08-25"

china_tech = Option(test_basket, test_weights, 120, 1/4, 'C', 0.013, START, END)

#Display option price at end date passed into pricer.
print("Latest option price:")
print(china_tech.price)

#Display option price history.
#sns.lineplot(megacaptech.dates, megacaptech.price_history)

#Display option basket price history.
#sns.lineplot(megacaptech.dates, megacaptech.basket_price_history)

#Display backtested PnL of option purchased at input date.
BACKTEST_DATE = "2020-08-17" #just to demo function. else, input str date YYYY-MM-DD.
print("Profit/loss if purchased on "+ BACKTEST_DATE + ": ")
print(china_tech.backtest(BACKTEST_DATE))

#Display option delta, vega, correlation risks.
print("RISK TO HEDGE:")
china_tech.hedge()

#Plot backtested profit/loss.
print("GRAPHING BACKTEST...")
china_tech_backtest, ctbdates = china_tech.plot_backtest()
print("DONE")

In [None]:
#Secondary function calls on Option object. Adjust these as necessary.
fig, ax = plt.subplots(1, figsize=(15, 5))
print(china_tech.basket_price_history[-1])
sns.lineplot(china_tech.dates, china_tech.basket_price_history, ax = ax)
ax.set_title('China Tech Basket Price History')
ax.set_xlabel('Date')
ax.set_ylabel('Price')

In [None]:
mchi = Option(['MCHI'], [1], 80, 1/4, 'C', 0.013, START, END)
kweb = Option(['KWEB'], [1], 55, 1/4, 'C', 0.013, START, END)
fig2, ax2 = plt.subplots(1, figsize=(15,5))
fig3, ax3 = plt.subplots(1, figsize=(15,5))
ax2.set_title('MCHI price history')
ax3.set_title('KWEB price history')
print(mchi.price, kweb.price)
print(mchi.basket_price_history[-1], kweb.basket_price_history[-1])
sns.lineplot(mchi.dates, mchi.basket_price_history, ax = ax2)
sns.lineplot(kweb.dates, kweb.basket_price_history, ax=ax3)

In [None]:
#Volatilities:
print(china_tech.basket_vol.round(3))
print(kweb.basket_vol.round(3))
print(mchi.basket_vol.round(3))

In [None]:
kweb_backtest, kwbdates = kweb.plot_backtest()

In [None]:
#relative comparisons of KWEB vs basket
f, a = plt.subplots(1, figsize=(21, 7))
sns.lineplot(china_tech.dates, china_tech.basket_price_history/max(china_tech.basket_price_history), ax=a, label='Basket')
sns.lineplot(kweb.dates, kweb.basket_price_history/max(kweb.basket_price_history), ax=a, label='KWEB')
a.set_title("Max-Normalized Basket Price")
a.legend(loc='best')

In [None]:
g, b = plt.subplots(1, figsize=(21, 7))
sns.lineplot(china_tech.dates, china_tech.backl, ax=b, label='Basket')
sns.lineplot(china_tech.dates, pd.Series(china_tech.basket_price_history).ewm(span=5).mean(), ax=b, label='5 Day EWMA')
sns.lineplot(china_tech.dates, pd.Series(china_tech.basket_price_history).ewm(span=20).mean(), ax=b, label='20 Day EWMA')

b.set_title('Basket Option vs EWMA')
b.legend(loc='best')

In [None]:
kweb.hedge()

In [None]:
h, c = plt.subplots(1, figsize=(21,7))
sns.lineplot(ctbdates, np.array(china_tech_backtest)/china_tech.strike, ax=c, label='Basket Option')
sns.lineplot(kwbdates, np.array(kweb_backtest)/kweb.strike, ax=c, label='KWEB Option')
c.set_title('Normalized Historical Profit/Loss')
c.set_xlabel('Buy Date')
c.legend(loc='best')