In [None]:
import yfinance as yf
import pandas as pd
import numpy as np
from datetime import date
import seaborn as sns
import random

import matplotlib.pyplot as plt

from sklearn.linear_model import Ridge
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score, mean_absolute_percentage_error
from sklearn.preprocessing import MinMaxScaler
from sklearn.ensemble import RandomForestRegressor
from sklearn.mixture import GaussianMixture


from statsmodels.tsa.statespace.sarimax import SARIMAX
from statsmodels.stats.diagnostic import acorr_ljungbox

import scipy.stats as stats
from scipy.stats import probplot, laplace, norm, t, chi2


import statsmodels.api as sm
from statsmodels.nonparametric.kde import KDEUnivariate
from statsmodels.tsa.stattools import adfuller, kpss
from statsmodels.graphics.tsaplots import plot_acf, plot_pacf
from statsmodels.tsa.statespace.sarimax import SARIMAX
from statsmodels.tsa.arima_process import ArmaProcess

import pymc as pm
import pytensor.tensor as pt
import arviz as az

import tensorflow as tf
from tensorflow import keras


#from tensorflow.keras.utils import plot_model


######################################
#from pmdarima import auto_arima
#from diptest import diptest

In [None]:
class HistoricalVaRBacktester:
    def __init__(self, data, alpha=0.05, window=21, horizon=10, var_type='var'):
        self.data = data.copy()
        self.alpha = alpha
        self.window = window
        self.horizon = horizon
        self.var_type = var_type

    def compute_future_log_return(self):
        self.data[f'Ret {self.horizon}D'] = np.log(self.data['Close'] / self.data['Close'].shift(self.horizon))
        self.data[f'Ret {self.horizon}D'] = self.data[f'Ret {self.horizon}D'].shift(-self.horizon)

    def compute_var(self):
        var_series = []
        modvar_series = []
        cvar_series = []
        shapiro_p_series = []
        KS_stat_series = []

        for t in range(self.window, len(self.data)):
            sample = self.data['LogReturn'].iloc[t-self.window : t].dropna()

            if len(sample) < self.window:
                var_series.append(np.nan)
                modvar_series.append(np.nan)
                cvar_series.append(np.nan)
                shapiro_p_series.append(np.nan)
                KS_stat_series.append(np.nan)
                continue

            mu = sample.mean()
            sigma = sample.std(ddof=1)
            skewness = sample.skew()
            kurt = sample.kurtosis()

            shapiro_stat, shapiro_p = shapiro(sample)
            shapiro_p_series.append(shapiro_p)

            ks_stat, ks_p_val = kstest(sample, 'norm', args=(mu, sigma))
            KS_stat_series.append(ks_p_val)

            # Empirical VaR
            empirical_var = np.percentile(sample, 100 * self.alpha)

            # Empirical Cornish-Fisher ModVaR
            z_score = norm.ppf(self.alpha)
            z_cf = (
                z_score
                + (z_score**2 - 1) * skewness / 6
                + (z_score**3 - 3 * z_score) * (kurt - 3) / 24
                - (2 * z_score**3 - 5 * z_score) * (skewness**2) / 36
            )
            empirical_modvar = mu + z_cf * sigma

            # Empirical CVaR
            losses_beyond_var = sample[sample <= empirical_var]
            empirical_cvar = losses_beyond_var.mean() if len(losses_beyond_var) > 0 else empirical_var

            var_series.append(empirical_var * np.sqrt(self.horizon))
            modvar_series.append(empirical_modvar * np.sqrt(self.horizon))
            cvar_series.append(empirical_cvar * np.sqrt(self.horizon))

        # Store results in DataFrame
        self.data[f'VaR {self.horizon}D'] = pd.Series([np.nan]*self.window + var_series, index=self.data.index)
        self.data[f'ModVaR {self.horizon}D'] = pd.Series([np.nan]*self.window + modvar_series, index=self.data.index)
        self.data[f'CVaR {self.horizon}D'] = pd.Series([np.nan]*self.window + cvar_series, index=self.data.index)
        self.data['Shapiro_p'] = pd.Series([np.nan]*self.window + shapiro_p_series, index=self.data.index)
        self.data['KS_p'] = pd.Series([np.nan]*self.window + KS_stat_series, index=self.data.index)

    def backtest(self):
        ret_col = f'Ret {self.horizon}D'
        var_col = f'VaR {self.horizon}D' if self.var_type == 'var' else f'CVaR {self.horizon}D'

        self.data['Breach'] = (self.data[ret_col] < self.data[var_col]).where(
            self.data[[ret_col, var_col]].notna().all(axis=1)
        )

        num_breaches = self.data['Breach'].sum()
        total_obs = self.data['Breach'].notna().sum()
        breach_ratio = num_breaches / total_obs
        expected_breaches = total_obs * self.alpha

        # Continuous breaches
        breaches_series = self.data['Breach'].fillna(False).astype(int)
        max_cont_breaches = (breaches_series.groupby((breaches_series != breaches_series.shift()).cumsum()).cumsum()).max()

        # Conditional probability of breaches (lag-1)
        cond_prob_breaches = 0
        if num_breaches > 1:
            consecutive = ((breaches_series.shift(1) == 1) & (breaches_series == 1)).sum()
            cond_prob_breaches = consecutive / (breaches_series.shift(1) == 1).sum()

        # Kupiec's POF test
        p_hat = breach_ratio
        p0 = self.alpha
        if p_hat in [0, 1]:
            LR_pof = np.nan
            p_value = np.nan
        else:
            LR_pof = -2 * (
                np.log((1 - p0)**(total_obs - num_breaches) * p0**num_breaches)
                - np.log((1 - p_hat)**(total_obs - num_breaches) * p_hat**num_breaches)
            )
            p_value = 1 - chi2.cdf(LR_pof, df=1)

        # Binomial test
        binom_p_value = binomtest(num_breaches, total_obs, p0, alternative='two-sided').pvalue

        results = pd.DataFrame({
            "Actual Breaches": [num_breaches],
            "Expected Breaches": [expected_breaches],
            "Breach Ratio": [breach_ratio],
            "Number of Continuous Breaches": [max_cont_breaches],
            "Conditional Probability of Breaches": [cond_prob_breaches],
            "kupiec_LR": [LR_pof],
            "kupiec_p_value": [p_value],
            "binomial_test_p_value": [binom_p_value]
        })

        print(results)
        print('---------------------------------------------------------------------------------')
        print(self.data.head(35))

    def plot_var(self):
        ret_col = f'Ret {self.horizon}D'
        var_col = f'VaR {self.horizon}D' if self.var_type == 'var' else f'CVaR {self.horizon}D'

        sns.set_style("whitegrid")
        fig, axs = plt.subplots(1, 1, figsize=(12, 6))

        axs.plot(self.data.index, self.data[var_col], color='orange', linewidth=1,
                 label=f'{int((1-self.alpha)*100)}%/{self.horizon}D Empirical VaR')
        axs.plot(self.data.index, self.data[ret_col], color='blue', linewidth=2, alpha=0.5,
                 label=f'Actual {self.horizon}D Return')

        breaches = self.data['Breach'] == True
        axs.scatter(
            self.data.index[breaches],
            self.data.loc[breaches, ret_col],
            color='red', marker='X', s=30, label='Breaches', zorder=3
        )

        axs.axhline(y=0, color='black', linestyle='--')
        axs.set_title(f'Backtesting of {int((1-self.alpha)*100)}%/{self.horizon}D Empirical VaR')
        axs.set_ylabel(f'{self.horizon}D log return')
        axs.legend()
        plt.show()

