In [1]:
import datetime
import random
from collections import Counter

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import statsmodels.api as sm
from scipy.stats import expon, kurtosis, skew, uniform, lognorm

plt.style.use('seaborn')


def simulate_log_prices(n, s0, T, m, u, vol, seed=1):
    np.random.seed(seed)
    time_periods = m * T
    delta_t = T / time_periods
    zs = np.random.normal(size=(n, time_periods))
    exp = (u - vol ** 2 / 2) * delta_t + vol * np.sqrt(delta_t) * zs
    prices = np.log(s0) + np.cumsum(exp, axis=1)
    return prices


def simulate_log_prices_shock(n, s0, T, m, u, vol, shock_func, shock_freq, vol_shock, seed=1):
    log_prices = simulate_log_prices(n, s0, T, m, u, vol)
    log_prices_shocked = simulate_log_prices(n, s0, T, m, u, vol_shock)
    time_periods = m * T
    delta_t = T / time_periods

    price_diff = log_prices_shocked - log_prices
    for i in range(n):
        start = int(expon.rvs(scale=shock_freq) / delta_t)
        while start < time_periods:
            shock = shock_func()
            end_index = min(time_periods, start + len(shock))
            log_prices[i, start:end_index] += price_diff[i, start:end_index] + shock[:end_index - start]
            start += int(expon.rvs(scale=shock_freq) / delta_t)

    return log_prices


def constant_shock_symmetric(shock_size, shock_length, m):
    a = -4 * shock_size / shock_length ** 2
    b = shock_size
    x = np.linspace(0, shock_length, int(m * shock_length))
    shock_values = a * np.square(x - shock_length / 2) + shock_size

    def shock():
        return shock_values

    return shock


def plot_prices(prices, title, T, stats, n_paths=100):
    prices_plotted = prices[:n_paths]
    left, width = 0.1, 0.65
    bottom, height = 0.1, 0.65
    spacing = 0.005

    rect_scatter = [left, bottom, width, height]
    rect_histx = [left, bottom + height + spacing, width, 0.2]
    rect_histy = [left + width + spacing, bottom, 0.2, height]

    plt.figure(figsize=(12 / 1.1, 5 / 1.1))

    ax_price_paths = plt.axes(rect_scatter)
    ax_price_paths.tick_params(direction='in', top=True, right=True)
    ax_price_paths.set_ylim(prices_plotted.min(),
                            prices_plotted.max())
    ax_price_paths.set_xlabel('Years', fontsize=13)
    ax_price_paths.set_ylabel('Log Price (S0 = 1)', fontsize=13)
    ax_price_paths.set_xlim(0, T)
    plt.title(title, fontsize=16)

    ax_histy = plt.axes(rect_histy)
    ax_histy.tick_params(direction='in', labelleft=False)
    ax_histy.set_ylim(prices_plotted.min(), prices_plotted.max())
    ax_histy.set_xticks([])

    ax_price_paths.plot(np.linspace(0, T, prices.shape[1]),
                        prices_plotted.T)

    terminal_prices = prices[:, -1]
    ax_histy.hist(terminal_prices, bins=100, orientation='horizontal', rwidth=1.3, edgecolor='C0')

    plt.rc('text', usetex=True)
    text = '\n'.join([
        r'\underline{Terminal Log Prices}', '',
        r'$\mathrm{ann}(\mu)=%.4f$' % (stats[0] / T),
        r'$\mathrm{ann}(\sigma)=%.4f$' % (stats[1] / np.sqrt(T)),
        r'$\mathrm{skew}=%.4f$' % stats[2],
        r'$\mathrm{kurtosis}=%.4f$' % stats[3],
        r'$\mathrm{median}=%.4f$' % stats[4],
        r'$\mathrm{max}=%.4f$' % stats[5],
        r'$\mathrm{min}=%.4f$' % stats[6],
    ])

    props = dict(boxstyle='round', facecolor='wheat', alpha=0.5)
    plt.text(0.985, 0.015, text, fontsize=12,
             transform=ax_histy.transAxes,
             verticalalignment='bottom', bbox=props,
             horizontalalignment='right')
    plt.show()
    plt.clf()


def get_stats(terminal_prices):
    return (
        terminal_prices.mean(),
        terminal_prices.std(),
        skew(terminal_prices),
        kurtosis(terminal_prices),
        np.median(terminal_prices),
        terminal_prices.max(),
        terminal_prices.min()
    )


def find_jumps(price_path, cutoff):
    n = len(price_path)
    running_max = [(price_path[0], 0)]
    running_min = [(price_path[-1], n - 1)]

    for i in range(1, n):
        if price_path[i] > running_max[-1][0]:
            running_max.append((price_path[i], i))
    for i in range(n - 2, -1, -1):
        if price_path[i] < running_min[-1][0]:
            running_min.append((price_path[i], i))
    running_min = running_min[::-1]
    running_min_n = len(running_min)

    drops = []
    min_idx = 0
    for price1, max_i in running_max:
        price2, min_i = running_min[min_idx]
        if max_i == n - 1:
            break
        while max_i >= min_i:
            min_idx += 1
            price2, min_i = running_min[min_idx]

        drop = price1 - price2
        if drop > cutoff:
            drops.append((drop, (max_i, min_i)))

    drops.sort()

    existing_drops = []
    while len(drops) > 0:
        drop = drops.pop()
        start, end = drop[1]
        add = True
        for _, (existing_start, existing_end) in existing_drops:
            if existing_start <= start <= existing_end or existing_start <= end <= existing_end:
                add = False
                break
        if add:
            existing_drops.append(drop)
    return existing_drops


def plot_jumps(price_path, jumps):
    index = pd.date_range(start='01/01/1970', end='01/01/2020', periods=len(price_path))
    plt.plot(index, price_path, color='black')
    for jump, (start, end) in jumps:
        plt.plot(index[start:end], price_path[start:end], color='red')


In [None]:
prices = simulate_log_prices(10 ** 4, 1, 252, 100, 0, 0.15432016021070638)
jumps = [find_jumps(price_path, -np.log(0.65)) for price_path in prices]
long_jumps = [(jump, i) for i, jump in enumerate(jumps) if len(jump) > 2]

i = 1
fig = plt.figure(figsize=(10, 7))
for jumps_, idx in long_jumps[:4]:
    plt.subplot(2, 2, i)
    plot_jumps(prices[idx], jumps_)
    i += 1
fig.suptitle('Log Price with Significant Drops')
fig.supxlabel('Date')
fig.supylabel('Log Price')
plt.show()

n_jumps = [len(x) for x in jumps]
n_jumps_table = pd.Series(Counter(n_jumps)).rename('Percentage').sort_index()
n_jumps_table /= n_jumps_table.sum()
n_jumps_table.index.name = 'Number of Drops'
print(pd.DataFrame(n_jumps_table).reset_index().to_latex(index=False))

In [None]:
n = 10 ** 5

vol = 0.15
T = 5
m = 12
u = r = 0.07
S0 = 1
bins = 200

shock_length = 1
shock = -1
shock_parabola = constant_shock_symmetric(shock, shock_length, m)()
price_path = simulate_log_prices(1, S0, T, m, u, vol)[0]

shocked_path = np.copy(price_path)
start_idx = 23
shocked_path[start_idx:start_idx + len(shock_parabola)] += shock_parabola

x = np.linspace(0, T, m * T)

fig, axs = plt.subplots(1, 3, figsize=(9, 3), sharey='all')
axs[0].plot(np.linspace(0, shock_length, m), shock_parabola)
axs[0].set_title('Shock Parabola')
axs[1].plot(x, price_path)
axs[1].set_title('Log Price Path')
axs[2].plot(x, shocked_path)
axs[2].set_title('Price Path with Shock Parabola')

In [None]:
shocks = np.array([0.3, 0.5, 0.7])
log_shocks = -np.log(1 - shocks)
percents = [f'{shock * 100}%' for shock in shocks]
print(pd.DataFrame({'Price Shock': shocks, 'Price Drop (%)': percents, 'Log Price Shock': log_shocks}).to_latex(
    index=False))

In [None]:
n = 10 ** 5

vol = 0.15
T = 5
m = 12
u = r = 0.07
S0 = 1
bins = 200

vanilla_prices = simulate_log_prices(n, S0, T, m, u, vol)
terminal_prices = vanilla_prices[:, -1]
stats = get_stats(terminal_prices)
plot_prices(vanilla_prices, 'Vanilla Log Prices', T, stats, bins)

In [None]:
n = 10 ** 5

vol = 0.15
T = 5
m = 12
u = r = 0.07
S0 = 1
bins = 200

shocks = np.log(1 - np.array([0.3, 0.5, 0.7]))
shock_freq = 3
shock_length = 1

for shock in shocks:
    shock_func = constant_shock_symmetric(shock, shock_length, m)
    prices = simulate_log_prices_shock(n, S0, T, m, u, vol, shock_func, shock_freq, vol)
    terminal_prices = prices[:, -1]
    stats = get_stats(terminal_prices)
    shock_ = np.round(1 - np.exp(shock), 2)
    plot_prices(prices, f'Log Prices (shock={shock_}, length={shock_length}, frequency={shock_freq})', T, stats, bins)

In [None]:
sp500 = pd.read_csv('volatility_SP500_index_110_day_S0237_series.csv', index_col='Date', parse_dates=True)[['S&P 500']]


def fix_date(x):
    year = x.year
    if x.year > 2022:
        year = x.year - 100
    return datetime.date(year, x.month, x.day)


sp500.index = pd.to_datetime(pd.Series(sp500.index).apply(fix_date))
sp500.sort_index(inplace=True)
sp500['log_returns'] = np.log(sp500['S&P 500']) - np.log(sp500['S&P 500'].shift(1))
sp500['log_return_annual'] = 252 * sp500['log_returns']
sp500['log_vol'] = sp500['log_returns'].rolling('100D').std(ddof=0) * np.sqrt(252)

sp500 = sp500.dropna()

cutoff_quantile = 0.6

cutoff = sp500['log_vol'].quantile(cutoff_quantile)
non_shock_mean = sp500['log_vol'][sp500['log_vol'] < cutoff].mean()
shock_mean = sp500['log_vol'].mean()
plt.rc('text', usetex=False)
plt.figure(figsize=(6.4, 2.4))
plt.plot(sp500['S&P 500'])
plt.title('S&P 500 Price')
plt.show()

In [None]:
shocks = np.log(1 - np.linspace(1, 99, 50) / 100)
shock_freqs = np.linspace(1, 10, 41)
shock_lengths = np.linspace(0.5, 2, 40)
Ts = np.linspace(1, 20, 40).astype(int)
n = 10 ** 5

T = constant_T = 5
shock = constant_shock = np.log(0.65)
shock_freq = constant_freq = 2
shock_length = constant_length = 1

res = []

vol_shock = 0.181
vol = shock_mean

for T in Ts:
    shock_func = constant_shock_symmetric(shock, shock_length, m)
    prices = simulate_log_prices_shock(n, S0, T, m, u, vol, shock_func, shock_freq, vol_shock)
    terminal_prices = prices[:, -1]
    stats = get_stats(terminal_prices)
    res.append(f'{T},{shock_freq},{shock_length},{shock},{str(stats)[1:-1]}')
f = open("simulation_data_T.csv", "w")
f.write('\n'.join(res))
f.close()
res = []
T = constant_T

for shock_freq in shock_freqs:
    shock_func = constant_shock_symmetric(shock, shock_length, m)
    prices = simulate_log_prices_shock(n, S0, T, m, u, vol, shock_func, shock_freq, vol_shock)
    terminal_prices = prices[:, -1]
    stats = get_stats(terminal_prices)
    res.append(f'{T},{shock_freq},{shock_length},{shock},{str(stats)[1:-1]}')
f = open("simulation_data_shock_freq.csv", "w")
f.write('\n'.join(res))
f.close()
res = []
shock_freq = constant_freq

for shock_length in shock_lengths:
    shock_func = constant_shock_symmetric(shock, shock_length, m)
    prices = simulate_log_prices_shock(n, S0, T, m, u, vol, shock_func, shock_freq, vol_shock)
    terminal_prices = prices[:, -1]
    stats = get_stats(terminal_prices)
    res.append(f'{T},{shock_freq},{shock_length},{shock},{str(stats)[1:-1]}')
f = open("simulation_data_shock_length.csv", "w")
f.write('\n'.join(res))
f.close()
res = []
shock_length = constant_length

for shock in shocks:
    shock_func = constant_shock_symmetric(shock, shock_length, m)
    prices = simulate_log_prices_shock(n, S0, T, m, u, vol, shock_func, shock_freq, vol_shock)
    terminal_prices = prices[:, -1]
    stats = get_stats(terminal_prices)
    res.append(f'{T},{shock_freq},{shock_length},{shock},{str(stats)[1:-1]}')

f = open("simulation_data_shock.csv", "w")
f.write('\n'.join(res))
f.close()

In [None]:
header = ['T', 'shock_freq', 'shock_length', 'shock', 'mean', 'sd', 'skew', 'kurtosis', 'median', 'max', 'min']

fields = ['mean', 'sd']
data_T = pd.read_csv('simulation_data_T.csv', names=header).set_index('T')[fields]
data_shock_freq = pd.read_csv('simulation_data_shock_freq.csv', names=header).set_index('shock_freq')[fields]
data_shock_length = pd.read_csv('simulation_data_shock_length.csv', names=header).set_index('shock_length')[fields]
data_shock = pd.read_csv('simulation_data_shock.csv', names=header).set_index('shock')[fields]
fig, axs = plt.subplots(2, 2, figsize=(7, 7))
axs[0, 0].plot(data_T.index, data_T, label=fields)
axs[0, 0].set_xlabel('Time')
axs[0, 0].set_title('Time to Maturity')
axs[0, 0].legend()

axs[0, 1].plot(1 - np.exp(data_shock.index), data_shock, label=fields)
axs[0, 1].set_xlabel('Drop in Price')
axs[0, 1].set_title('Shock')
axs[0, 1].legend()

axs[1, 0].plot(data_shock_freq.index, data_shock_freq, label=fields)
axs[1, 0].set_xlabel('Frequency')
axs[1, 0].set_title('Shock Frequency')
axs[1, 0].legend()

axs[1, 1].plot(data_shock_length.index, data_shock_length, label=fields)
axs[1, 1].set_xlabel('Length')
axs[1, 1].set_title('Shock Length')
axs[1, 1].legend()
plt.tight_layout()
plt.show()

fields = ['skew', 'kurtosis']
data_T = pd.read_csv('simulation_data_T.csv', names=header).set_index('T')[fields]
data_shock_freq = pd.read_csv('simulation_data_shock_freq.csv', names=header).set_index('shock_freq')[fields]
data_shock_length = pd.read_csv('simulation_data_shock_length.csv', names=header).set_index('shock_length')[fields]
data_shock = pd.read_csv('simulation_data_shock.csv', names=header).set_index('shock')[fields]

fig, axs = plt.subplots(2, 2, figsize=(7, 7))
axs[0, 0].plot(data_T.index, data_T, label=fields)
axs[0, 0].set_xlabel('Time')
axs[0, 0].set_title('Time to Maturity')
axs[0, 0].legend()

axs[0, 1].plot(1 - np.exp(data_shock.index), data_shock, label=fields)
axs[0, 1].set_xlabel('Drop in Price')
axs[0, 1].set_title('Shock')
axs[0, 1].legend()

axs[1, 0].plot(data_shock_freq.index, data_shock_freq, label=fields)
axs[1, 0].set_xlabel('Frequency')
axs[1, 0].set_title('Shock Frequency')
axs[1, 0].legend()

axs[1, 1].plot(data_shock_length.index, data_shock_length, label=fields)
axs[1, 1].set_xlabel('Length')
axs[1, 1].set_title('Shock Length')
axs[1, 1].legend()
plt.tight_layout()
plt.show()

In [None]:


sp500['100 Day Rolling Log Return Volatility'] = sp500['log_vol']
sp500['100 Day Rolling Log Return Volatility'].plot()
plt.title('100 Day Rolling Log Return Volatility')
plt.xlabel('')
plt.show()

In [None]:
print(sp500['100 Day Rolling Log Return Volatility'].describe().to_latex())

In [None]:
cutoff_quantile = 0.6

cutoff = sp500['log_vol'].quantile(cutoff_quantile)
vol_non_shock_mean = sp500['log_vol'][sp500['log_vol'] < cutoff].mean()

print(
    f'We can see from above that the annualized volatility hovers around 15\% (including shocks). We can compute the volatility without shocks by computing the mean volatility using the bottom {cutoff_quantile * 100}\\% of values. We find this number to be {np.round(vol_non_shock_mean, 5) * 100}\\%\\\\')

In [None]:
past_events = pd.read_csv("SP 500 events for response curve 6.csv", parse_dates=True)

past_events = past_events.rename(lambda col: col.replace('Catalyst: ', ''), axis=1)
past_events['End Date'] = pd.to_datetime(past_events['End Date'])
past_events['Start Date'] = pd.to_datetime(past_events['Start Date'])
past_events['Minimum Date'] = pd.to_datetime(past_events['Minimum Date'])
past_events['Shock Length'] = past_events['End Date'] - past_events['Start Date']

past_events = past_events.sort_values("Start Date")

past_events['Time_Between_Shocks'] = past_events['Start Date'].diff()


def get_return(df):
    i = sp500.index.searchsorted(df['Start Date'])
    beg_price = sp500['S&P 500'].iloc[i]

    i = sp500.index.searchsorted(df['Minimum Date'])
    bot_price = sp500['S&P 500'].iloc[i]

    annualization_factor = np.sqrt(252 / np.busday_count(df['Start Date'].date(), df['Minimum Date'].date()))
    return np.exp(annualization_factor * np.log(bot_price / beg_price)) - 1


# past_events = past_events.loc[past_events.index[np.log(past_events.Shock+1) <- shock_mean]]
past_events['time_between_shocks'] = past_events['Time_Between_Shocks'].dt.days
past_events['shock_length'] = past_events['Shock Length'].dt.days
past_events['Shock Length '] = past_events['shock_length']

past_events['Shock'] = past_events.apply(get_return, axis=1)

past_events['Shock Frequency'] = past_events['Time_Between_Shocks']
past_events['Shock Frequency '] = past_events['time_between_shocks']

past_events[['Shock', 'Shock Length ', 'Shock Frequency ']].hist(bins=15)

plt.show()

In [None]:
print(r'\begin{center}')
print(past_events[['Shock', 'Shock Length', 'Shock Frequency']].describe().to_latex())
print(r'\end{center}')


In [None]:
n = 10 ** 5

T = 5
m = 12
u = r = 0.07
S0 = 1
bins = 200


def random_shock_symmetric(m):
    shock_params = lognorm.fit(-np.log(past_events['Shock'] + 1))
    shock_length_params = lognorm.fit(past_events['shock_length'] / 365)
    shock_size_rv = lognorm(shock_params[0], 0, shock_params[2])
    shock_length_rv = lognorm(shock_length_params[0], 0, shock_length_params[2])

    def shock():
        shock_size = -shock_size_rv.rvs()
        shock_length = shock_length_rv.rvs()
        return constant_shock_symmetric(shock_size, shock_length, m)()

    return shock


shock_freq = past_events['time_between_shocks'].mean() / 365
#
shock_func = random_shock_symmetric(m)
prices = simulate_log_prices_shock(n, S0, T, m, u, non_shock_mean, shock_func, shock_freq, 0.125)
terminal_prices = prices[:, -1]
stats = get_stats(terminal_prices)
plot_prices(prices, f'Log Stock Prices (frequency={np.round(shock_freq, 3)} years)', T, stats, bins)

In [None]:
Ts = np.arange(1, 31)
res = []
for T in Ts:
    prices = simulate_log_prices_shock(n, S0, T, m, u, non_shock_mean, shock_func, shock_freq, 0.125)
    terminal_prices = prices[:, -1]
    stats = get_stats(terminal_prices)
    res.append(f'{T},{shock_freq},{shock_length},{shock},{str(stats)[1:-1]}')
f = open("simulation_data_T_real.csv", "w")
f.write('\n'.join(res))
f.close()

In [None]:
header = ['T', 'shock_freq', 'shock_length', 'shock', 'mean', 'sd', 'skew', 'kurtosis', 'median', 'max', 'min']
fields = ['mean', 'sd']
data_T = pd.read_csv('simulation_data_T_real.csv', names=header).set_index('T')[fields]
fig, axs = plt.subplots(1, 2, figsize=(8, 4))
axs[0].plot(data_T.index, data_T, label=fields)
axs[0].set_xlabel('Time')
axs[0].set_title('Time to Maturity vs Mean/Volatility')
axs[0].legend()

fields = ['skew', 'kurtosis']
data_T = pd.read_csv('simulation_data_T_real.csv', names=header).set_index('T')[fields]
axs[1].plot(data_T.index, data_T, label=fields)
axs[1].set_xlabel('Time')
axs[1].set_title('Time to Maturity vs Skew/Kurtosis')
axs[1].legend()

plt.show()

In [None]:
data_T = pd.read_csv('simulation_data_T_real.csv', names=header)
data_T['Log_T'] = np.log(data_T['T'])

skew_fit = sm.OLS.from_formula('skew ~ Log_T', data_T).fit()
kurt_fit = sm.OLS.from_formula('kurtosis ~ Log_T', data_T).fit()

fig, axs = plt.subplots(1, 2, figsize=(8, 4))
axs[0].plot(data_T.index, data_T['kurtosis'], label='Simulated Kurtosis')
axs[0].plot(data_T.index, kurt_fit.predict(), label='OLS Fit Kurtosis')
axs[0].set_xlabel('Time')
axs[0].set_title('Kurtosis')
axs[0].legend()

axs[1].plot(data_T.index, data_T['skew'], label='Simulated Skew')
axs[1].plot(data_T.index, skew_fit.predict(), label='OLS Fit Skew')
axs[1].set_xlabel('Time')
axs[1].set_title('Skew')
axs[1].legend()
plt.show()
print('\\\\\\\\')
print(kurt_fit.summary().as_latex())
print('\\pagebreak')
print(skew_fit.summary().as_latex())
print('\\\\\\\\')

In [None]:
shock_type_freq = pd.DataFrame.from_dict({
    'Shock Type': [list(past_events)[i] for i in range(4, 8)],
    'Frequency': [(past_events.iloc[:, i] == 1).sum() / len(past_events) for i in range(4, 8)]
})
print(shock_type_freq.to_latex(index=False))

In [None]:
fig, axs = plt.subplots(2, 4)
fig.suptitle("Shock Parameters By Shock Type", fontsize='x-large')

past_events.loc[past_events.index[(past_events.iloc[:, 4] == 1)]][['Shock', 'Shock Length ']].hist(ax=axs[0, :2])
plt.figtext(0.28, 0.87, list(past_events)[4], va="center", ha="center", )
past_events.loc[past_events.index[(past_events.iloc[:, 5] == 1)]][['Shock', 'Shock Length ']].hist(ax=axs[0, 2:])
plt.figtext(0.75, 0.87, list(past_events)[5], va="center", ha="center", )
past_events.loc[past_events.index[(past_events.iloc[:, 6] == 1)]][['Shock', 'Shock Length ']].hist(ax=axs[1, :2])
plt.figtext(0.28, 0.43, list(past_events)[6], va="center", ha="center", )
past_events.loc[past_events.index[(past_events.iloc[:, 7] == 1)]][['Shock', 'Shock Length ']].hist(ax=axs[1, 2:])
plt.figtext(0.75, 0.43, list(past_events)[7], va="center", ha="center", )
fig.tight_layout(h_pad=4)
fig.subplots_adjust(top=0.78)
plt.show()

In [None]:
fig, axs = plt.subplots(2, 4)
fig.suptitle("Shock Parameters By Shock Type", fontsize='x-large')

past_events.loc[past_events.index[(past_events.iloc[:, 4] == 1)]][['Shock', 'Shock Length ']].hist(ax=axs[0, :2])
plt.figtext(0.28, 0.87, list(past_events)[4], va="center", ha="center", )
past_events.loc[past_events.index[(past_events.iloc[:, 5] == 1)]][['Shock', 'Shock Length ']].hist(ax=axs[0, 2:])
plt.figtext(0.75, 0.87, list(past_events)[5], va="center", ha="center", )
past_events.loc[past_events.index[(past_events.iloc[:, 6] == 1)]][['Shock', 'Shock Length ']].hist(ax=axs[1, :2])
plt.figtext(0.28, 0.43, list(past_events)[6], va="center", ha="center", )
past_events.loc[past_events.index[(past_events.iloc[:, 7] == 1)]][['Shock', 'Shock Length ']].hist(ax=axs[1, 2:])
plt.figtext(0.75, 0.43, list(past_events)[7], va="center", ha="center", )
fig.tight_layout(h_pad=4)
fig.subplots_adjust(top=0.78)
plt.show()

In [None]:
n = 10 ** 5

T = 5
m = 12
u = r = 0.07
S0 = 1
bins = 200
get_event_rows = lambda i: past_events.loc[past_events.index[(past_events.iloc[:, i] == 1)]]

shock_types = [
    {
        'shock_size_rv': uniform(*(uniform.fit(get_event_rows(4)['Shock']))),
        'shock_length_rv': uniform(*(uniform.fit(get_event_rows(4)['shock_length'] / 365)))
    },
    {
        'shock_size_rv': lognorm(*(lognorm.fit(-get_event_rows(5)['Shock']))),
        'shock_length_rv': lognorm(*(lognorm.fit(get_event_rows(5)['shock_length'] / 365)))
    },
    {
        'shock_size_rv': lognorm(*(lognorm.fit(-get_event_rows(6)['Shock']))),
        'shock_length_rv': uniform(*(uniform.fit(get_event_rows(6)['shock_length'] / 365)))
    },
    {
        'shock_size_rv': lognorm(*(lognorm.fit(-get_event_rows(7)['Shock']))),
        'shock_length_rv': lognorm(*(np.where([0, 1, 0], 0, lognorm.fit(get_event_rows(7)['shock_length'] / 365))))
    },
]

shock_probs = shock_type_freq['Frequency']


def generate_numbers(n, rng):
    choices = rng.choice(4, p=shock_probs, size=n)


def weighted_random(w, n):
    cumsum = np.cumsum(w)
    rdm_unif = np.random.rand(n)
    return np.searchsorted(cumsum, rdm_unif)


def real_shock_symmetric(m):
    numbers = [{'shock_size_rv': [], 'shock_length_rv': []} for _ in range(4)]
    choices = []

    def shock():
        nonlocal choices
        if len(choices) == 0:
            choices = random.choices(range(4), weights=shock_probs, k=1000)
        shock_rvs_idx = choices.pop()
        if len(numbers[shock_rvs_idx]['shock_size_rv']) == 0:
            numbers[shock_rvs_idx]['shock_size_rv'] = shock_types[shock_rvs_idx]['shock_size_rv'].rvs(
                size=1000).tolist()
            numbers[shock_rvs_idx]['shock_length_rv'] = shock_types[shock_rvs_idx]['shock_length_rv'].rvs(
                size=1000).tolist()

        shock_length = numbers[shock_rvs_idx]['shock_length_rv'].pop()
        shock_size = numbers[shock_rvs_idx]['shock_size_rv'].pop()
        return constant_shock_symmetric(-shock_size, shock_length, m)()

    return shock


shock_freq = past_events['time_between_shocks'].mean() / 365
shock_func = real_shock_symmetric(m)
prices = simulate_log_prices_shock(n, S0, T, m, u, non_shock_mean, shock_func, shock_freq, shock_mean * 1.07)
terminal_prices = prices[:, -1]
stats = get_stats(terminal_prices)
plot_prices(prices, f'Log Stock Prices (frequency={np.round(shock_freq, 3)} years)', T, stats, bins)

In [None]:
Ts = np.arange(1, 31)
res = []
for T in Ts:
    prices = simulate_log_prices_shock(n, S0, T, m, u, non_shock_mean, shock_func, shock_freq, shock_mean * 1.07)
    terminal_prices = prices[:, -1]
    stats = get_stats(terminal_prices)
    res.append(f'{T},{str(stats)[1:-1]}')
f = open("simulation_data_T_real2.csv", "w")
f.write('\n'.join(res))
f.close()

In [None]:
header = ['T', 'mean', 'sd', 'skew', 'kurtosis', 'median', 'max', 'min']
fields = ['mean', 'sd']
data_T = pd.read_csv('simulation_data_T_real2.csv', names=header).set_index('T')[fields]
fig, axs = plt.subplots(1, 2, figsize=(8, 4))
axs[0].plot(data_T.index, data_T, label=fields)
axs[0].set_xlabel('Time')
axs[0].set_title('Time to Maturity vs Mean/Volatility')
axs[0].legend()

fields = ['skew', 'kurtosis']
data_T = pd.read_csv('simulation_data_T_real2.csv', names=header).set_index('T')[fields]
axs[1].plot(data_T.index, data_T, label=fields)
axs[1].set_xlabel('Time')
axs[1].set_title('Time to Maturity vs Skew/Kurtosis')
axs[1].legend()

plt.show()

In [None]:
header = ['T', 'mean', 'sd', 'skew', 'kurtosis', 'median', 'max', 'min']
data_T = pd.read_csv('simulation_data_T_real2.csv', names=header)
data_T['Log_T'] = np.log(data_T['T'])

skew_fit = sm.OLS.from_formula('skew ~ Log_T', data_T).fit()
kurt_fit = sm.OLS.from_formula('kurtosis ~ Log_T', data_T).fit()

fig, axs = plt.subplots(1, 2, figsize=(8, 4))
axs[0].plot(data_T.index, data_T['kurtosis'], label='Simulated Kurtosis')
axs[0].plot(data_T.index, kurt_fit.predict(), label='OLS Fit Kurtosis')
axs[0].set_xlabel('Time')
axs[0].set_title('Kurtosis')
axs[0].legend()

axs[1].plot(data_T.index, data_T['skew'], label='Simulated Skew')
axs[1].plot(data_T.index, skew_fit.predict(), label='OLS Fit Skew')
axs[1].set_xlabel('Time')
axs[1].set_title('Skew')
axs[1].legend()
plt.show()

print('\\\\\\\\')
print(kurt_fit.summary().as_latex())
# print('\\pagebreak')
print(skew_fit.summary().as_latex())
print('\\\\\\\\')

In [None]:
n = 10 ** 5

vol = 0.15
T = 5
m = 12
u = r = 0.07
S0 = 1
bins = 200

Ts = [5, 30]
shock_freqs = [3, 7]
shock_lengths = [1, 2]
shocks = np.log(1 - np.array([0.3, 0.5, 0.7]))
for T in Ts:
    vanilla_prices = simulate_log_prices(n, S0, T, m, u, vol)
    exp_vanilla_prices = np.exp(vanilla_prices)
    for shock_freq in shock_freqs:
        for shock_length in shock_lengths:
            for shock in shocks:
                shock_func = constant_shock_symmetric(shock, shock_length, m)
                prices = simulate_log_prices_shock(n, S0, T, m, u, vol, shock_func, shock_freq, vol)
                terminal_prices = prices[:, -1]
                stats = get_stats(terminal_prices)
                shock_ = np.round(1 - np.exp(shock), 2)
                plot_prices(prices, f'Log Prices (shock={shock_}, length={shock_length}, frequency={shock_freq})', T,
                            stats, bins)


In [None]:
def get_old_return(days_back):
    def from_date(date):
        i = sp500.index.searchsorted(date - datetime.timedelta(days_back))
        j = sp500.index.searchsorted(date)
        return sp500['S&P 500'][j] / sp500['S&P 500'][i]

    return from_date


def get_old_vol(days_back):
    def from_date(date):
        i = sp500.index.searchsorted(date - datetime.timedelta(days_back))
        j = sp500.index.searchsorted(date)
        return sp500['log_returns'][i:j].std(ddof=0) * np.sqrt(252)

    return from_date


def get_old_skew(days_back):
    def from_date(date):
        i = sp500.index.searchsorted(date - datetime.timedelta(days_back))
        j = sp500.index.searchsorted(date)
        return skew(sp500['log_returns'][i:j])

    return from_date


formula = []
days_list = [30, 90, 180, 360, 540, 720]

plt.rc('text', usetex=False)
for days in days_list:
    past_events[f'log_return_{days}'] = past_events['Start Date'].apply(get_old_return(days))
    past_events[f'log_vol_{days}'] = past_events['Start Date'].apply(get_old_vol(days))
    past_events[f'log_skew_{days}'] = past_events['Start Date'].apply(get_old_skew(days))
    formula.append(f'log_return_{days}')
    formula.append(f'log_vol_{days}')
    formula.append(f'log_skew_{days}')


def plot_run_up(days_list, shock_param):
    fig, axs = plt.subplots(len(days_list), 3, figsize=(10, len(days_list) * 2))
    stats = ['return', 'vol', 'skew']
    for i, days in enumerate(days_list):
        for j, stat in enumerate(stats):
            axs[i, j].scatter(past_events[f'log_{stat}_{days}'], past_events[shock_param])
            axs[i, j].set_title(f'log {stat} {days} vs \n{shock_param}', fontsize=14)
            axs[i, j].set_xlabel(f'log {stat} {days}')
            axs[i, j].set_ylabel(shock_param.replace('_', ' '))
            props = dict(boxstyle='round', facecolor='wheat', alpha=0.5)

            x = np.ma.masked_invalid(past_events[f'log_{stat}_{days}'])
            y = np.ma.masked_invalid(past_events[shock_param])
            msk = (~x.mask & ~y.mask)
            corr = np.ma.corrcoef(x[msk], y[msk])[1, 0]

            t = f"Corr: {np.round(corr, 3)}"
            axs[i, j].text(0.99, 0.01, t, fontsize=12,
                           transform=axs[i, j].transAxes,
                           verticalalignment='bottom', bbox=props,
                           horizontalalignment='right')
    plt.tight_layout()
    plt.show(block=False)
    print(f'\\subsubsection{{Run Up vs {shock_param.replace("_", " ")}}}')
    f = f'{shock_param} ~ {"+".join(formula)}'
    print(
        sm.OLS.from_formula(f, past_events).fit().get_robustcov_results(cov_type='HAC', maxlags=1).summary().as_latex())


plot_run_up(days_list, 'Shock')
plot_run_up(days_list, 'shock_length')
plot_run_up(days_list, 'time_between_shocks')