In [1]:
import pandas as pd
import numpy as np
from datetime import datetime 
import warnings
warnings.filterwarnings("ignore")        
import datetime
import BS
import Config
import matplotlib.pyplot as plt
from pymongo import MongoClient
from scipy.optimize import bisect
from scipy import interpolate
import time
import math
pd.set_option('display.max_columns', None)
client = MongoClient(Config.DB_Hostname, Config.DB_Port)

MUST_INCLUDE=["NIFTY","BANKNIFTY"]

def IVOL(C,S,T,K,opt_type):
    
    def Calc_Call(sig):
        return BS.bs_call(S,K,T/252,0.07,sig)-C
    
    def Calc_Put(sig):
        return BS.bs_put(S,K,T/252,0.07,sig)-C
    
    sigma = 0.5
    if opt_type=="CE":
        
        while BS.bs_call(S,K,T/252,0.07,sigma)>C:
            sigma /= 2

        while BS.bs_call(S,K,T/252,0.07,sigma)<C:
            sigma *= 2

        hi = sigma
        lo = hi/2
        return bisect(Calc_Call,lo,hi)    
    
    elif opt_type=="PE":
        
        while BS.bs_put(S,K,T/252,0.07,sigma)>C:
            sigma /= 2

        while BS.bs_put(S,K,T/252,0.07,sigma)<C:
            sigma *= 2

        hi = sigma
        lo = hi/2
        try:
            return bisect(Calc_Put,lo,hi)
        except:
            return np.nan
    
    else:
        
        return 0


def Vol_Surf(df, hdays=[]):
    
    try:
        bhavcopy=df.copy()
        bhavcopy.EXPIRY_DT=pd.to_datetime(bhavcopy.EXPIRY_DT)
        bhavcopy.TIMESTAMP=pd.to_datetime(bhavcopy.TIMESTAMP)
        bhavcopy=bhavcopy[bhavcopy.SYMBOL.isin(MUST_INCLUDE)]
        TIMESTAMP=bhavcopy.TIMESTAMP.to_list()[0]
        bhavcopy["Time_To_Expiry"]=bhavcopy.EXPIRY_DT.apply(lambda x: np.busday_count(TIMESTAMP.date(), x.date(), holidays=hdays))
        bhavcopy=bhavcopy[bhavcopy.Time_To_Expiry!=0]
        bhavcopy=bhavcopy[bhavcopy.OPEN!=0]
        bhavcopy=bhavcopy[bhavcopy.CLOSE!=0]
        bhavcopy["closest_opt_expiry"]=bhavcopy.SYMBOL.map(bhavcopy[bhavcopy.OPTION_TYP!="XX"].sort_values("Time_To_Expiry").groupby("SYMBOL").head(1).set_index('SYMBOL').Time_To_Expiry.to_dict())
        bhavcopy["closest_fut_expiry"]=bhavcopy.SYMBOL.map(bhavcopy[bhavcopy.OPTION_TYP=="XX"].sort_values("Time_To_Expiry").groupby("SYMBOL").head(1).set_index('SYMBOL').Time_To_Expiry.to_dict())
        bhavcopy=pd.concat([bhavcopy[(bhavcopy.OPTION_TYP!="XX")&(bhavcopy.Time_To_Expiry==bhavcopy.closest_opt_expiry)],
                   bhavcopy[(bhavcopy.OPTION_TYP=="XX")&(bhavcopy.Time_To_Expiry==bhavcopy.closest_fut_expiry)]])
        bhavcopy["Fut_Close_Ref"]=bhavcopy.SYMBOL.map(bhavcopy[bhavcopy.OPTION_TYP=="XX"].groupby('SYMBOL').min().CLOSE.to_dict())
        bhavcopy["Close_Strike_Diff"]=np.where(bhavcopy.OPTION_TYP=="XX",0,bhavcopy.Fut_Close_Ref - bhavcopy.STRIKE_PR)
        bhavcopy["UNIQUE"]=bhavcopy.SYMBOL+bhavcopy.EXPIRY_DT.astype(str)+bhavcopy.STRIKE_PR.astype(str)+bhavcopy.OPTION_TYP.astype(str)
        bhavcopy=bhavcopy[bhavcopy.UNIQUE.isin(np.concatenate([bhavcopy[(bhavcopy.OPTION_TYP=="CE")&(bhavcopy.Close_Strike_Diff<=0)].sort_values('Close_Strike_Diff', ascending=False).groupby(['SYMBOL','EXPIRY_DT']).head(10).UNIQUE,
        bhavcopy[(bhavcopy.OPTION_TYP=="PE")&(bhavcopy.Close_Strike_Diff>=0)].sort_values('Close_Strike_Diff').groupby(['SYMBOL','EXPIRY_DT']).head(10).UNIQUE,bhavcopy[bhavcopy.OPTION_TYP=="XX"].UNIQUE]))]
        bhavcopy["IVOL"]=bhavcopy[['CLOSE','Fut_Close_Ref','Time_To_Expiry','STRIKE_PR','OPTION_TYP']].apply(
        lambda x : IVOL(x.CLOSE,x.Fut_Close_Ref,x.Time_To_Expiry,x.STRIKE_PR,x.OPTION_TYP),axis=1)

        symbols=bhavcopy.SYMBOL.unique()
        summary=pd.DataFrame(index=symbols,columns=["curv_tan","expected_return","expected_variance","expected_skew",
                                                   "expected_kurt","strike_conc_1st_moment","strike_conc_2nd_moment",
                                                   "strike_conc_3rd_moment","strike_conc_4th_moment","vol_neg5","vol_neg4",
                                                   "vol_neg3","vol_neg2","vol_neg1","vol_0","vol_pos1","vol_pos2","vol_pos3",
                                                   "vol_pos4","vol_pos5"])

        curv_tan=[]
        expected_return=[]
        expected_variance=[]
        expected_skew=[]
        expected_kurt=[]
        vol_surf=[]
        strike_conc_1st_moment=[]
        strike_conc_2nd_moment=[]
        strike_conc_3rd_moment=[]
        strike_conc_4th_moment=[]

        for i in range(11):
            vol_surf.append([])

        for sym in symbols:
            
            bhav_sym=bhavcopy[(bhavcopy.SYMBOL==sym)&(bhavcopy.OPTION_TYP!="XX")]
            if sym == "NIFTY":
                bhav_sym.to_csv('bhav_sym.csv')
                
            T=bhav_sym.Time_To_Expiry.unique()[0]
            fut_ref=bhav_sym.Fut_Close_Ref.unique()[0]
            strike_points=[]
            ivols=[]
            fitted_ivols=np.array([])
            fitted_strike=np.array([])
            x_points=[]
            y_points=[]
            pdf=[]
            temp_s=False
            strike_increment=0.1
            strike_lower_bound=-5.1
            strike_upper_bound=5.2
            for s,iv in sorted(zip(bhav_sym.Close_Strike_Diff,bhav_sym.IVOL)):

                if temp_s==True and abs(previous_s-s)<1e-5:
                    ivols[-1]=(ivols[-1]+iv)/2
                    continue
                if temp_s==False:
                    temp_s=True
                previous_s=s
                strike_points.append(-s)
                ivols.append(iv)

            for s, iv in sorted(zip(strike_points,ivols)):
                x_points.append(s)
                y_points.append(iv)

            tck = interpolate.splrep(x_points, y_points)
            fitted_strike=np.arange(strike_lower_bound,strike_upper_bound,strike_increment)
            fitted_ivols=[max(interpolate.splev(s*fut_ref/100, tck),0.01) for s in fitted_strike]
            curv_tan.append((fitted_ivols[51]-fitted_ivols[49])/((fitted_strike[51]-fitted_strike[49])/100))                
            pdf=[BS.bs_call(fut_ref,(1+(k/100))*fut_ref,T/252,0.07,iv) for k,iv in zip(fitted_strike,fitted_ivols)]
            pdf=[max((pdf[i+1]-2*pdf[i]+pdf[i-1])/((strike_increment*fut_ref/100)**2),0) for i in range(1,len(pdf)-1)]
            
            try:
                pdf=[p/sum(pdf) for p in pdf]
            except Exception as e:
                try:
                    print(e,"using default pdf")
                    pdf=[2/(len(pdf)-1) if i > (len(pdf)-1)/2 else 0 for i,p in enumerate(pdf)]
                except Exception as e:
                    print(e,"Check PDF")
                    pdf=[1/len(pdf) for p in pdf]
                
            fitted_ivols=[float(fitted_ivols[i]) for i in range(1,len(fitted_ivols)-1)]
            fitted_strike=[round(float(fitted_strike[i]),1) for i in range(1,len(fitted_strike)-1)]

            ret=sum([pdf[i]*fitted_strike[i] for i in range(len(fitted_strike))])
            vol=((sum([pdf[i]*((ret-fitted_strike[i])**2)for i in range(len(fitted_strike))]))**0.5)/100
            skew=(sum([pdf[i]*((ret-fitted_strike[i])**3)for i in range(len(fitted_strike))]))

            if skew > 0:
                skew = math.pow(skew, float(1)/3)/100
            elif skew < 0:
                skew = -math.pow(abs(skew), float(1)/3)/100
            else:
                skew = 0

            kurt=((sum([pdf[i]*((ret-fitted_strike[i])**4)for i in range(len(fitted_strike))]))**0.25)/100
            expected_return.append(ret*2.52/T)
            expected_variance.append(vol*((252/T)**0.5))
            expected_skew.append(skew*((252/T)**(1/3)))
            expected_kurt.append(kurt*((252/T)**0.25))

            for i,n in enumerate(np.arange(-5,5.1,1)):
                for j,k in enumerate(fitted_strike):
                    if abs(k-n)<1e-8:
                        vol_surf[i].append(fitted_ivols[j])

            strike_conc_1st_moment.append(sum([s*iv/100 for s,iv in zip(fitted_strike,fitted_ivols)])/sum(fitted_ivols))
            strike_conc_2nd_moment.append((sum([((s/100)**2)*iv for s,iv in zip(fitted_strike,fitted_ivols)])/sum(fitted_ivols))**0.5)
            skew=sum([((s/100)**3)*iv for s,iv in zip(fitted_strike,fitted_ivols)])/sum(fitted_ivols)
            if skew > 0:
                skew = math.pow(skew, float(1)/3)
            elif skew < 0:
                skew = -math.pow(abs(skew), float(1)/3)
            else:
                skew = 0
            strike_conc_3rd_moment.append(skew)
            strike_conc_4th_moment.append(np.power(sum([((s/100)**4)*iv for s,iv in zip(fitted_strike,fitted_ivols)])/sum(fitted_ivols),0.25))

        summary.curv_tan=curv_tan
        summary.expected_return=expected_return
        summary.expected_variance=expected_variance
        summary.expected_skew=expected_skew
        summary.expected_kurt=expected_kurt
        summary.strike_conc_1st_moment=strike_conc_1st_moment
        summary.strike_conc_2nd_moment=strike_conc_2nd_moment
        summary.strike_conc_3rd_moment=strike_conc_3rd_moment
        summary.strike_conc_4th_moment=strike_conc_4th_moment

        summary["vol_neg5"]=vol_surf[0]
        summary["vol_neg4"]=vol_surf[1]
        summary["vol_neg3"]=vol_surf[2]
        summary["vol_neg2"]=vol_surf[3]
        summary["vol_neg1"]=vol_surf[4]
        summary["vol_0"]=vol_surf[5]
        summary["vol_pos1"]=vol_surf[6]
        summary["vol_pos2"]=vol_surf[7]
        summary["vol_pos3"]=vol_surf[8]
        summary["vol_pos4"]=vol_surf[9]
        summary["vol_pos5"]=vol_surf[10]
        summary["CONTRACTS"]=summary.index.map(df[df.SYMBOL.isin(MUST_INCLUDE)].groupby("SYMBOL").sum().CONTRACTS.to_dict())
        summary["VAL_INLAKH"]=summary.index.map(df[df.SYMBOL.isin(MUST_INCLUDE)].groupby("SYMBOL").sum().VAL_INLAKH.to_dict())
        summary["OPEN_INT"]=summary.index.map(df[df.SYMBOL.isin(MUST_INCLUDE)].groupby("SYMBOL").sum().OPEN_INT.to_dict())
        summary["CHG_IN_OI"]=summary.index.map(df[df.SYMBOL.isin(MUST_INCLUDE)].groupby("SYMBOL").sum().CHG_IN_OI.to_dict())
        summary.reset_index(inplace=True)
        summary["SYMBOL"]=summary["index"]
        return summary.drop(columns=['index'])
    except Exception as e:
        print(e,"Error in generating vol curve")
        return pd.DataFrame()


In [2]:
start_date="21FEB2024"
end_date="26FEB2024"
start_date=datetime.datetime.strptime(start_date, '%d%b%Y')
end_date=datetime.datetime.strptime(end_date, '%d%b%Y')
df=pd.DataFrame(client.Strategy.Days_To_Expiry.find())
dates=df.sort_values('date').date.unique()

holidays=pd.DataFrame(client.Strategy.Holidays.find())
holidays=[d.date() for d in pd.to_datetime(holidays.date)]

bhav=pd.DataFrame(client.Strategy.Bhavcopy_Subset.find())
bhav.drop(columns=['_id'],inplace=True)

for date in dates:
    #if date not in ['2019-08-28','2020-02-01','2021-03-30','2021-05-12','2023-02-13']:
    #    continue
    date_=datetime.datetime.strptime(date, '%Y-%m-%d')    
    if date_<start_date or date_>end_date:
        continue
    
    else:
        
        date_today=datetime.datetime.strftime(date_,"%d-%b-%Y").upper()
        df=bhav[bhav.TIMESTAMP==date_today]
        
        if len(df)==0:
            print(f"Bhavcopy DB empty for {date_today}")
            continue
        
        else:
            
            summary=Vol_Surf(df, hdays=holidays)
            if len(summary)==0:
                print(f"Empty summary file for {date_today}")
            elif len(summary[summary.SYMBOL=="NIFTY"])==0 or len(summary[summary.SYMBOL=="BANKNIFTY"])==0:
                print(f"Issue with {date_today}")
            else:
                summary["date"]=date
                print(client.Strategy.Vol_Surface.delete_many({"date":date}).deleted_count,f" documents deleted for {date_today}")
                print(len(client.Strategy.Vol_Surface.insert_many(summary.to_dict('records')).inserted_ids),f" documents entered for {date_today}\n")
                

0  documents deleted for 21-FEB-2024
2  documents entered for 21-FEB-2024

0  documents deleted for 22-FEB-2024
2  documents entered for 22-FEB-2024

0  documents deleted for 23-FEB-2024
2  documents entered for 23-FEB-2024

0  documents deleted for 26-FEB-2024
2  documents entered for 26-FEB-2024



In [3]:
#bhav=pd.DataFrame(client.Strategy.Bhavcopy_Subset.find())
#bhav.drop(columns=['_id'],inplace=True)
#print(len(client.Strategy.Bhavcopy.insert_many(bhav.to_dict('records')).inserted_ids))


In [None]:
df = pd.read_csv('bhav.csv')