In [33]:
import pandas as pd
import tushare as ts
import matplotlib.pyplot as plt
from tqdm import tqdm
from scipy.interpolate import interp1d
import numpy as np
from datetime import datetime

# 计算IVIX

In [34]:
token = 'f558cbc6b24ed78c2104e209a8a8986b33ec66b7c55bcfa2f46bc108'
pro = ts.pro_api(token)
# 所有期权合约信息
opt_basic = pro.opt_basic(exchange="SSE")

## 各种计算

In [35]:
# 获取无风险利率，用SHIBOR
def get_rf(trade_date):
    shibor = pro.shibor(start_date=trade_date, end_date=trade_date)
    rf = pd.DataFrame()
    rf['tau'] = [1,7,14,30,180,270,365]
    rf['rate'] = shibor.iloc[0,1:].reset_index(drop=True)
    interp = interp1d(rf['tau'],rf['rate'],kind="linear",fill_value="extrapolate")
    return interp

In [10]:
# 获取期权数据，并分成近月和次近月两份
def opt_data(trade_date, opt_code):
    opt_price = pro.opt_daily(trade_date=trade_date,exchange="SSE")
    opt_price_temp = opt_price[opt_price['exchange'] == 'SSE'].reset_index(drop=True)
    # OP510300.SH是50ETF期权，OP510050.SH是50ETF期权
    opt_basic_temp = opt_basic[(opt_basic['ts_code'].isin(opt_price_temp['ts_code']))&(opt_basic['opt_code']==opt_code)][
        ['ts_code','call_put','maturity_date','exercise_price']
    ].reset_index(drop=True)
    
    opt_price = pd.merge(opt_price_temp,opt_basic_temp,how='inner')
    opt_price['tau'] = (pd.to_datetime(opt_price['maturity_date']) - pd.to_datetime(opt_price['trade_date'])).apply(lambda x:x.days)
    opt_price = opt_price.sort_values(by=['tau','exercise_price']).reset_index(drop=True)
    opt_price = opt_price[opt_price['tau']>7].reset_index(drop=True)
    opt_near = opt_price[opt_price['tau'] == opt_price['tau'].drop_duplicates().nsmallest(2).tolist()[0] ].reset_index(drop=True)
    opt_2near = opt_price[opt_price['tau'] == opt_price['tau'].drop_duplicates().nsmallest(2).tolist()[1] ].reset_index(drop=True)
    return opt_near,opt_2near

In [11]:
# 根据CBOE公式计算sigma
def cal_sigma(opt_df):
    opt_df_call = opt_df[opt_df['call_put'] == "C"].sort_values(by=['exercise_price']).reset_index(drop=True)
    opt_df_put = opt_df[opt_df['call_put'] == "P"].sort_values(by=['exercise_price']).reset_index(drop=True)
    
    opt_df_call = opt_df_call[['trade_date','call_put','tau','exercise_price','close']].drop_duplicates(['tau','exercise_price'],keep='first').rename(columns={'close':'call_close'})
    opt_df_put = opt_df_put[['trade_date','call_put','tau','exercise_price','close']].drop_duplicates(['tau','exercise_price'],keep='first').rename(columns={'close':'put_close'})
    
    opt_df_merge = pd.merge(opt_df_call,opt_df_put,on=['tau','exercise_price'])
    opt_df_merge['diff_close'] = abs(opt_df_merge['call_close'] - opt_df_merge['put_close'])
    S = opt_df_merge[opt_df_merge['diff_close'] == opt_df_merge['diff_close'].min()]['exercise_price'].iloc[0]
    
    tau = opt_df_merge['tau'].unique().tolist()[0]
    opt_df_merge['R'] = interp(tau)/100
    opt_df_merge['F'] = S + np.exp( opt_df_merge['R'] * (tau/365) )*(opt_df_merge[opt_df_merge['exercise_price'] == S]['call_close'] - opt_df_merge[opt_df_merge['exercise_price'] == S]['put_close']).iloc[0]#.mean()
    opt_df_merge['F - Ki'] = opt_df_merge['F'] - opt_df_merge['exercise_price']
    
    try:
        opt_df_merge['K0'] = opt_df_merge[ opt_df_merge['F - Ki'] ==  opt_df_merge['F - Ki'][opt_df_merge['F - Ki']>0].nsmallest(1).iloc[0]]['exercise_price'].iloc[0]
    except Exception:
        opt_df_merge['K0'] = opt_df_merge[ opt_df_merge['F - Ki'] ==  opt_df_merge['F - Ki'].nsmallest(1).iloc[0]]['exercise_price'].iloc[0]

    opt_df_merge['PK'] = opt_df_merge.apply(lambda row: row['call_close'] if row['exercise_price']>row['K0'] else (row['put_close'] if row['exercise_price']<row['K0'] else (row['call_close']+row['put_close'])/2 ),axis=1)
    opt_df_merge['K_i+1'] = opt_df_merge['exercise_price'].shift(-1).fillna(opt_df_merge['exercise_price'].iloc[-1])
    opt_df_merge['K_i-1'] = opt_df_merge['exercise_price'].shift(1).fillna(opt_df_merge['exercise_price'].iloc[0])
    opt_df_merge['delta_K'] = (opt_df_merge['K_i+1'] - opt_df_merge['K_i-1'])/2
    opt_df_merge = opt_df_merge.dropna().reset_index(drop=True)
    
    opt_df_merge['sigma'] = 2/(opt_df_merge['tau']/365)*(opt_df_merge['delta_K']/opt_df_merge['exercise_price']**2)*np.exp(opt_df_merge['R']*opt_df_merge['tau']/365)*opt_df_merge['PK']
    sigma = opt_df_merge['sigma'].sum() - (1/(opt_df_merge['tau']/365)*((opt_df_merge['F']/opt_df_merge['K0']-1)**2)).iloc[0]
    return sigma

In [12]:
# 对近月波动率和次近月波动率插值并年化得到IVIX
def cal_ivix(opt_df1,opt_df2):
    tau1 = opt_df1['tau'].unique().tolist()[0]/365
    tau2 = opt_df2['tau'].unique().tolist()[0]/365
    if tau1 >=30:
        ivix = cal_sigma(opt_df1)*100
    else:
        ivix = np.sqrt((tau1*cal_sigma(opt_df1)*((tau2 - 30/365)/(tau2 - tau1)) + \
                tau2*cal_sigma(opt_df2)*((30/365 - tau1)/(tau2 - tau1))) * 365/30)
    return ivix

In [13]:
# 封装循环计算
def gen_date_list(first_date, last_date):
    date = pro.query('trade_cal', start_date='19991230', end_date=datetime.today().strftime('%Y%m%d'))
    date_list = date[date['is_open']==1].reset_index(drop=True)
    date_list = date_list[(date_list['cal_date']>first_date)&(date_list['cal_date']<last_date)].reset_index(drop=True)
    return date_list

In [14]:
# 封装成函数
def gen_ivix_list(first_date, last_date, opt_code):
    date_list = gen_date_list(first_date, last_date)
    ivix_list = []
    for trade_date in tqdm(date_list['cal_date'].tolist()):
        interp = get_rf(trade_date)
        opt_near,opt_2near = opt_data(trade_date, opt_code)   
        ivix = cal_ivix(opt_near,opt_2near)
        dic = {
            'trade_date':trade_date,
            'ivix':ivix
        }
        ivix_list.append(dic)
    return ivix_list

In [20]:
ivix_list = gen_ivix_list('20231030', '20241031', 'OP510300.SH')

100%|██████████| 242/242 [02:12<00:00,  1.83it/s]


In [45]:
# 通过pickle保存ivix_list
import pickle
with open('ivix_list.pkl','wb') as f:
    pickle.dump(ivix_list,f)

In [2]:
# 读取pickle文件
import pickle
with open('ivix_list.pkl','rb') as f:
    ivix_list = pickle.load(f)

## 与真实IVIX进行对比

In [None]:
my_ivix = pd.DataFrame(ivix_list)
my_ivix['date'] = pd.to_datetime(my_ivix['trade_date'])
my_ivix['ivix'] = my_ivix['ivix']*100
# save_dir = 'saved_data'
# if not os.path.exists(save_dir):
#     os.makedirs(save_dir)
# my_ivix.to_csv("./saved_data/my_ivix_new3.csv",index=False,encoding="utf_8_sig")

In [15]:
def get_bar_data(first_date, last_date, ts_code):
    daily_bar_data = pro.fund_daily(
        start_date=first_date,
        end_date=last_date,
        ts_code=ts_code, 
        fields='ts_code,trade_date,close,vol'
    )
    return daily_bar_data

In [36]:
last_date = datetime.today().strftime('%Y%m%d')
first_date = (datetime.today() - pd.DateOffset(years=1)).strftime('%Y%m%d')
tscode = '510300.SH'

bar_list = get_bar_data(first_date, last_date, tscode)
ivix_list = gen_ivix_list(first_date, last_date, "OP"+tscode)

  0%|          | 0/241 [00:06<?, ?it/s]


NameError: name 'interp' is not defined

In [28]:
my_ivix = pd.DataFrame(ivix_list)
my_ivix['date'] = pd.to_datetime(my_ivix['trade_date'])
my_ivix['ivix'] = my_ivix['ivix']*100
my_bar = bar_list[['trade_date','close']]
my_bar['date'] = pd.to_datetime(my_bar['trade_date'])
# save_dir = 'saved_data'
# if not os.path.exists(save_dir):
#     os.makedirs(save_dir)
# my_ivix.to_csv("./saved_data/my_ivix_new3.csv",index=False,encoding="utf_8_sig")



A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy



In [29]:
# 通过pickle保存ivix_list
import pickle
with open('my_ivix.pkl','wb') as f:
    pickle.dump(my_ivix,f)

with open('my_bar.pkl','wb') as f:
    pickle.dump(my_bar,f)

In [30]:
# 读取pickle文件
import pickle
with open('my_ivix.pkl','rb') as f:
    my_ivix = pickle.load(f)

with open('my_bar.pkl','rb') as f:
    my_bar = pickle.load(f)

In [32]:
import plotly.express as px
import plotly.graph_objects as go

my_ivix['year'] = my_ivix['date'].apply(lambda x:x.year)
fig = px.line(
    my_ivix, 
    x='date', 
    y='ivix', 
    title='iVIX Interactive Plot'
)
fig.add_trace(
    go.Scatter(
        x=my_bar['date'],
        y=my_bar['close'],
        mode='lines',
        name='Close',
        yaxis='y2'
    )
)

fig.update_layout(
    yaxis2=dict(
        title='Close',
        overlaying='y',
        side='right'
    )
)

# 添加 value=30 的横线
fig.add_shape(
    type='line',
    x0=my_ivix['date'].min(),
    y0=30,
    x1=my_ivix['date'].max(),
    y1=30,
    line=dict(color='Red', dash='dash'),
)

# 添加 value=20 的横线
fig.add_shape(
    type='line',
    x0=my_ivix['date'].min(),
    y0=20,
    x1=my_ivix['date'].max(),
    y1=20,
    line=dict(color='Blue', dash='dash'),
)

# 显示图表
fig.show()

----------------------
- 测算ivix历史数据，获得25，50，75分位数