In [None]:
import pandas as pd
import numpy as np
import scipy as sci
import arch
import pyvinecopulib as pv
import matplotlib.pyplot as plt
import plotly.express as px
from SourceCodes.invariance_analysis import quick_invariance_analysis
from SourceCodes.garch_aux_methods import zero_mean_garch_1_1_scenario


columns_mappings = {'ES1 Index': 'SP500',
                    'NQ1 Index': 'Nasdaq100',
                    'VG1 Index': 'Euro Stox50',
                    'BZ1 Index': 'Ibovespa',
                    'TY1 Comdty':'10-Year Treasury',
                    'RX1 Comdty': 'Euro Bund',
                    'EC1 Curncy': 'EUR/USD',
                    'BP1 Curncy': 'GBP/USD',
                    'UC1 Curncy': 'USD/BRL',
                    'CL1 Comdty': 'WTI',
                    'CO1 Comdty': 'Brent'}

selected_securities = ['SP500', 'Nasdaq100', 'Ibovespa', '10-Year Treasury','USD/BRL']


In [None]:
# Reading data
df = pd.read_csv("Data/data.csv", index_col="Dates", date_parser = pd.to_datetime).rename(columns=columns_mappings)[selected_securities]
df.sort_index(inplace=True)
returns_scale_factor = 100
df_log_ret = np.log(df).diff() * returns_scale_factor
n = df.shape[0]
n_os = 252
n_scenarios = 1000
seeds = [int(x) for x in np.ones(len(selected_securities)).tolist()]

In [None]:
# Quick Invariance Analysis
for item in df_log_ret.columns:
    fig, axs = quick_invariance_analysis(df_log_ret[item].iloc[:-n_os], n_chunks=3, nbins=30)
    fig.set_size_inches(20, 20)

In [None]:
# Fitting a GARCH(1,1) Model and
model_list = []

for k, item in enumerate(df_log_ret):
    mdl = list()
    mdl.append(arch.arch_model(df_log_ret.iloc[1:-n_os, k] , mean="Zero", vol="GARCH", p=1, q=1, dist="normal"))
    mdl[0].constraints()
    mdl.append(mdl[0].fit())
    model_list.append(mdl)

In [None]:
# Example - Univariate Fitting
fig = model_list[0][1].plot(scale=252)
fig.set_size_inches(10, 10)

In [None]:
# Creating the matrix of residuals and scatter plotting
for k, item in enumerate(model_list):
    if k == 0:
        df_log_resid = pd.DataFrame(item[1].std_resid)
        df_log_resid.rename(columns={df_log_resid.columns[-1]:item[0].y.name}, inplace=True)
    else:
        df_log_resid = pd.concat([df_log_resid, pd.DataFrame(item[1].std_resid)], axis=1)
        df_log_resid.rename(columns={df_log_resid.columns[-1]:item[0].y.name}, inplace=True)

In [None]:
# Quick Invariance Analysis - Of residuals
for item in df_log_resid.columns:
    fig, axs = quick_invariance_analysis(df_log_resid[item], n_chunks=3, nbins=30)
    fig.set_size_inches(20, 20)

In [None]:
# Calculating the pseudo-observations - Estimated Probability Integral Transform
for k, item in enumerate(model_list):
    if k == 0:
        df_log_resid_pseudo_observations = pd.DataFrame(sci.stats.norm.cdf(df_log_resid.iloc[:, k]))
    else:
        df_log_resid_pseudo_observations = pd.concat([df_log_resid_pseudo_observations, pd.Series(sci.stats.norm.cdf(df_log_resid.iloc[:, k]))], axis=1, ignore_index=True)

df_log_resid_pseudo_observations.columns = df_log_resid.columns
df_log_resid_pseudo_observations.columns = df_log_resid.columns
df_log_resid_pseudo_observations.index=df_log_resid.index

In [None]:
# Empirical Copula Plotting
fig1 =  px.scatter_matrix(data_frame=df_log_resid_pseudo_observations,
                          dimensions=df_log_resid_pseudo_observations.columns,
                          height=2000, width=2000)
fig1.update_traces(marker=dict(size=4, line=dict(width=1)), opacity=0.6, showlegend=False)
#fig1.update_traces(diagonal_visible=False)
fig1.update_layout(plot_bgcolor = "#ffebe3", colorway = ["#ff774a"], title = "Empirical Copula")
fig1.update_layout({"yaxis"+str(i+1): dict(range = [0, 1]) for i in range(1, len(df.columns))})
fig1.update_layout({"xaxis"+str(i+1): dict(range = [0, 1]) for i in range(1, len(df.columns))})
fig1.update_xaxes(visible=True, showgrid=True)
fig1.update_yaxes(visible=True, showgrid=True)
fig1.write_html("HTML/file.html")
#fig1.show()

In [None]:
# Meta Distribution - Distribution which is constructed by an arbitrary copula and arbitrary marginal distributions

In [None]:
# Creating Vine - Structure
copVine = pv.Vinecop(d = df_log_resid_pseudo_observations.columns.shape[0])

# Selecting Most Appropriate Model Given pseudo-observations
copVine.select(data=df_log_resid_pseudo_observations)

In [None]:
# Simulating U - Given Vine - Empirical X Simulated Copulas
n_sim = 10000
u_sim = pd.DataFrame(copVine.simulate(n_sim, seeds=seeds), columns=df_log_resid_pseudo_observations.columns)

In [None]:
# Plotting simulation results given Vine
fig2 =  px.scatter_matrix(data_frame=u_sim,
                          dimensions=u_sim.columns,
                          height=2000, width=2000)
fig2.update_traces(marker=dict(size=4, line=dict(width=1)), opacity=0.6, showlegend=False)
#fig1.update_traces(diagonal_visible=False)
fig2.update_layout(plot_bgcolor = "#ffebe3", colorway = ["#ff774a"], title = "Simulated Data - Given Vine Structure")
fig2.update_layout({"yaxis"+str(i+1): dict(range = [0, 1]) for i in range(1, len(df.columns))})
fig2.update_layout({"xaxis"+str(i+1): dict(range = [0, 1]) for i in range(1, len(df.columns))})
fig2.update_xaxes(visible=True, showgrid=True)
fig2.update_yaxes(visible=True, showgrid=True)
fig2.write_html("HTML/simulated.html")

In [None]:
# Scenario Simulation - Vine Copula
scenarios_copula = list()

for k in range(n_scenarios):
    scenario_copula = dict()
    scenario_copula['u_sim'] = pd.DataFrame(copVine.simulate(n_os), columns=df_log_resid_pseudo_observations.columns)

    sim_given_models_copula = []
    for d, model in enumerate(model_list):
        # Filterting data - Given Models + Simulated Residuals
        sim_given_models_copula.append(zero_mean_garch_1_1_scenario(sci.stats.norm.ppf(scenario_copula['u_sim'].iloc[:, d]), model[1].conditional_volatility,
                                                             model[1].resid, w=model[1].params.omega, alpha=model[1].params['alpha[1]'],
                                                             beta=model[1].params['beta[1]']))
        # Return projection for the proposed horizon
        if d == 0:
            projected_returns_simulation_copula = pd.DataFrame(pd.Series(np.exp(np.cumsum(sim_given_models_copula[d][0] / returns_scale_factor)) - 1))
        else:
            projected_returns_simulation_copula = pd.concat([projected_returns_simulation_copula, pd.Series(np.exp(np.cumsum(sim_given_models_copula[d][0] / returns_scale_factor)) - 1)], axis=1, ignore_index=True)

    projected_returns_simulation_copula.index = df_log_ret.index[-n_os:]
    projected_returns_simulation_copula.columns = selected_securities

    scenario_copula["sim_given_models"] = sim_given_models_copula
    scenario_copula["projections"] = projected_returns_simulation_copula

    scenarios_copula.append(scenario_copula)

# Scenarios Projections
scenario_projections_copula = np.zeros((n_scenarios, len(selected_securities)))

for k, scenario in enumerate(scenarios_copula):
    scenario_projections_copula[k, :] = scenario["projections"].iloc[-1, :].to_numpy()

scenario_projections_copula = pd.DataFrame(scenario_projections_copula, columns = selected_securities)

In [None]:
# Ledoit and Wolf - Shrinked Normal Covariance Estimate
from sklearn.model_selection import GridSearchCV
from sklearn.covariance import LedoitWolf, OAS
from sklearn.covariance import ShrunkCovariance, empirical_covariance, log_likelihood
from scipy import linalg

n_split = np.floor(df_log_resid.shape[0] / 2).astype(int)
X_train = df_log_resid.iloc[0:n_split, :]
X_test = df_log_resid.iloc[n_split:, :]

shrinkages = np.logspace(-2, 0, 30)

# GridSearch for an optimal shrinkage coefficient
tuned_parameters = [{"shrinkage": shrinkages}]
cv = GridSearchCV(ShrunkCovariance(), tuned_parameters)
cv.fit(X_train)

# Ledoit-Wolf optimal shrinkage coefficient estimate
lw = LedoitWolf()
lw_fit = lw.fit(X_train)

In [None]:
# Scenario Simulation - Ledoit and Wolf
scenarios_ld = list()

for k in range(n_scenarios):
    scenario_ld = dict()
    scenario_ld['std_resid_sim'] = pd.DataFrame(np.random.multivariate_normal(mean=np.zeros(shape=(len(selected_securities))), cov=lw_fit.covariance_, size=n_os), columns=selected_securities)

    sim_given_models_ld = []
    for d, model in enumerate(model_list):
        # Filterting data - Given Models + Simulated Residuals
        sim_given_models_ld.append(zero_mean_garch_1_1_scenario(scenario_ld['std_resid_sim'].iloc[:, d], model[1].conditional_volatility,
                                                             model[1].resid, w=model[1].params.omega, alpha=model[1].params['alpha[1]'],
                                                             beta=model[1].params['beta[1]']))
        # Return projection for the proposed horizon
        if d == 0:
            projected_returns_simulation_ld = pd.DataFrame(pd.Series(np.exp(np.cumsum(sim_given_models_ld[d][0] / returns_scale_factor)) - 1))
        else:
            projected_returns_simulation_ld = pd.concat([projected_returns_simulation_ld, pd.Series(np.exp(np.cumsum(sim_given_models_ld[d][0] / returns_scale_factor)) - 1)], axis=1, ignore_index=True)

    projected_returns_simulation_ld.index = df_log_ret.index[-n_os:]
    projected_returns_simulation_ld.columns = selected_securities

    scenario_ld["sim_given_models"] = sim_given_models_ld
    scenario_ld["projections"] = projected_returns_simulation_ld

    scenarios_ld.append(scenario_ld)

# Scenarios Projections
scenario_projections_ld = np.zeros((n_scenarios, len(selected_securities)))

for k, scenario in enumerate(scenarios_ld):
    scenario_projections_ld[k, :] = scenario["projections"].iloc[-1, :].to_numpy()

scenario_projections_ld = pd.DataFrame(scenario_projections_ld, columns = selected_securities)

In [None]:
# Volatilities - Vine Copula
print(scenario_projections_copula.std())

# Correlation - Vine Copula
print(scenario_projections_copula.corr())

In [None]:
# Volatilities - Ledoit and Wolf
print(scenario_projections_ld.std())

# Correlation Ledoit and Wolf
print(scenario_projections_ld.corr())

In [None]:
plt.plot( sci.stats.t.ppf(scenario['u_sim'].iloc[:, 0], df=model.params.nu, loc=0, scale=1))

https://www.oreilly.com/library/view/bayesian-statistics-an/9781118359778/OEBPS/c1-sec1-0007.htm

In [None]:
# Invariance Analysis
fig1 =  px.scatter_matrix(data_frame=df_log_ret.iloc[1:, :],
                          dimensions=df_log_ret.columns[1:],
                          height=4000, width=4000,
                          symbol="sample", color="sample")
fig1.update_traces(marker=dict(size=4, line=dict(width=1)), opacity=0.6, showlegend=False)
#fig1.update_traces(diagonal_visible=False)
fig1.update_layout(plot_bgcolor = "#ffebe3", colorway = ["#ff774a"], title = "Scatter plots")
fig1.update_layout({"yaxis"+str(i+1): dict(range = [-0.15, 0.15]) for i in range(1, len(df.columns))})
fig1.update_layout({"xaxis"+str(i+1): dict(range = [-0.15, 0.15]) for i in range(1, len(df.columns))})
fig1.update_xaxes(visible=True, showgrid=True)
fig1.update_yaxes(visible=True, showgrid=True)
fig1.write_html("HTML/file.html")

fig1.show()