In [None]:
import yfinance as yf
import pandas as pd
import numpy as np
from datetime import date
import seaborn as sns
import random

import matplotlib.pyplot as plt

from sklearn.linear_model import Ridge
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score, mean_absolute_percentage_error
from sklearn.preprocessing import MinMaxScaler
from sklearn.ensemble import RandomForestRegressor
from sklearn.mixture import GaussianMixture


from statsmodels.tsa.statespace.sarimax import SARIMAX
from statsmodels.stats.diagnostic import acorr_ljungbox
from statsmodels.regression.linear_model import OLS
from statsmodels.tools import add_constant

from arch import arch_model

import scipy.stats as stats
from scipy.stats import probplot, laplace, norm, t
from scipy.optimize import minimize

from numdifftools import Hessian

import statsmodels.api as sm
from statsmodels.nonparametric.kde import KDEUnivariate
from statsmodels.tsa.stattools import adfuller, kpss
from statsmodels.graphics.tsaplots import plot_acf, plot_pacf
from statsmodels.tsa.statespace.sarimax import SARIMAX
from statsmodels.tsa.arima_process import ArmaProcess

import pymc as pm
import pytensor.tensor as pt
import arviz as az

import tensorflow as tf
from tensorflow import keras


#from tensorflow.keras.utils import plot_model


######################################
#from pmdarima import auto_arima
#from diptest import diptest

In [None]:
class StationarityCheck:
    """Log-difference a price series and run visual and statistical stationarity checks."""

    def __init__(self, df):
        # Price data; ``log_difference`` calls ``resample('1D')`` on it, so it
        # needs a datetime-like index.
        self.df = df
        # Series analysed by ``check_stationarity``; set by ``log_difference``.
        self.series = None

    def log_difference(self, ndiff):
        """Build the log-difference of the daily closing values and test it.

        The data is reduced to the last observation of each calendar day,
        log-transformed and differenced, then stored on ``self.series`` and
        passed through ``check_stationarity``.

        Parameters
        ----------
        ndiff : int
            Lag handed to ``diff`` as its ``periods`` argument, i.e. the result
            is ``log(x_t) - log(x_{t-ndiff})``. For ``ndiff <= 0`` the
            undifferenced log series is used.

        Returns
        -------
        The transformed data (same object as ``self.series``).
        """
        daily_close = self.df.resample('1D').last().dropna()
        log_series = np.log(daily_close).dropna()
        diff = log_series.diff(ndiff).dropna() if ndiff > 0 else log_series
        self.series = diff
        self.check_stationarity()
        return diff

    def check_stationarity(self, window=7, lags=7):
        """Plot rolling statistics of ``self.series`` and print ADF and KPSS results.

        Parameters
        ----------
        window : int
            Window length of the rolling mean and rolling standard deviation.
        lags : int
            Currently unused; kept so existing callers keep working.

        Raises
        ------
        RuntimeError
            If ``self.series`` has not been set yet.
        """
        if self.series is None:
            raise RuntimeError(
                "No series to test: call log_difference() first or assign `series`."
            )

        series = self.series
        clean = series.dropna()  # both tests below run on the NaN-free data

        # Rolling mean and standard deviation
        rolling_mean = series.rolling(window=window).mean()
        rolling_std = series.rolling(window=window).std()

        fig, ax1 = plt.subplots(figsize=(10, 5))
        ax1.plot(series, label="Return", color="gray", alpha=0.5)
        ax1.plot(rolling_mean, label="Rolling Mean of Return", color="blue")
        ax1.axhline(y=0, color='black', linestyle='dashed', linewidth=0.8)
        ax1.legend(loc="upper left")

        # The standard deviation gets its own y-axis so its scale does not
        # flatten the return curve.
        ax2 = ax1.twinx()
        ax2.plot(rolling_std, label="Rolling Std Dev of Return", color="red", linestyle="dashed", alpha=0.7)
        ax2.legend(loc="upper right")

        plt.title("Rolling Mean & Std Deviation")
        plt.show()

        # Augmented Dickey-Fuller test: reported as stationary when p < 0.05.
        adf_result = adfuller(clean)
        print("ADF Test")
        print(f"ADF Statistic: {adf_result[0]:.4f}, p-value: {adf_result[1]:.4f}")
        print("Stationary" if adf_result[1] < 0.05 else "Not Stationary")

        # KPSS test (regression='c'): reported as stationary when p > 0.05.
        kpss_stat, kpss_p, _, _ = kpss(clean, regression='c')
        print("KPSS Test")
        print(f"KPSS Statistic: {kpss_stat:.4f}, p-value: {kpss_p:.4f}")
        print("Stationary" if kpss_p > 0.05 else "Not Stationary")

        # ACF/PACF plots (plot_acf / plot_pacf with `lags`) are currently disabled.

In [None]:
class EWMAVolatility:
    """Daily realized volatility with an EWMA-based hold-out forecast check.

    ``realized_volatility`` derives one realized-volatility value per calendar
    day from ``df['Close']`` plus its exponentially weighted moving average;
    ``forecast_volatility`` forecasts the last ``nfuture`` days from the data
    before them and compares the forecast with what was realized.
    """

    def __init__(self, df, log_return_series, nfuture=5, window=None, ema_smooth=21):
        self.df = df                      # price data with a 'Close' column and a datetime index
        self.series = log_return_series   # stored for callers; not used by this class
        self.trading_days = 252           # annualisation factor used in the plots
        self.window = window              # stored for callers; not used by this class
        self.ema_smooth = ema_smooth      # EWMA span
        self.nfuture = nfuture            # number of trailing days treated as the hold-out horizon
        self.rolling_rv = None            # daily realized volatility
        self.rolling_rv_exp = None        # EWMA of the realized volatility
        self.rolling_rv_sq = None         # rolling_rv ** 2
        self.rolling_rv_exp_sq = None     # rolling_rv_exp ** 2

    def realized_volatility(self):
        """Compute daily realized volatility and its EWMA, then plot both annualised.

        Side effect: writes a ``'log_return'`` column into ``self.df``.
        """
        # Log returns between consecutive rows of df (the first row of each day
        # is therefore measured against the last row of the previous day).
        self.df['log_return'] = np.log(self.df['Close'] / self.df['Close'].shift(1))

        # Per calendar day: square root of the summed squared deviations of the
        # day's log returns from their daily mean.
        self.rolling_rv = self.df['log_return'].groupby(self.df.index.date).apply(lambda x: np.sqrt(np.sum((x - x.mean())**2)))
        self.rolling_rv.dropna(inplace=True)

        # Exponentially weighted moving average (EWMA)
        self.rolling_rv_exp = self.rolling_rv.ewm(span=self.ema_smooth, adjust=False).mean()
        self.rolling_rv_exp.dropna(inplace=True)

        self.rolling_rv_sq = self.rolling_rv ** 2
        self.rolling_rv_exp_sq = self.rolling_rv_exp ** 2

        # Plot
        plt.figure(figsize=(12, 6))
        plt.plot(self.rolling_rv * np.sqrt(self.trading_days), color='black', label='Realized Vol (Annualized)')
        plt.plot(self.rolling_rv_exp * np.sqrt(self.trading_days), color='red', label='Exp Realized Vol (Annualized)')
        plt.title('Realized Volatility vs. Exponential Realized Volatility')
        plt.xlabel('Date')
        plt.ylabel('Annualized Volatility')
        plt.legend()
        plt.grid(True)
        plt.show()

    def _ewma_forecast(self):
        """Return ``(forecast_variance, relative_error)`` for the hold-out horizon.

        The recursion ``f = alpha * rv2 + (1 - alpha) * f`` is started from the
        last observation *before* the hold-out window (position
        ``-nfuture - 1``), so no information from the days being forecast leaks
        into the forecast. ``relative_error`` is
        ``(sqrt(forecast) - realized) / realized``.

        Raises
        ------
        RuntimeError
            If ``realized_volatility`` has not been run yet.
        ValueError
            If ``nfuture < 1`` or there is no observation before the hold-out window.
        """
        if self.rolling_rv is None or self.rolling_rv_sq is None or self.rolling_rv_exp_sq is None:
            raise RuntimeError("realized_volatility() must be called before forecast_volatility()")

        horizon = self.nfuture
        if horizon < 1:
            raise ValueError("nfuture must be >= 1")
        if len(self.rolling_rv_sq) <= horizon:
            raise ValueError("need more than nfuture days of realized volatility to forecast")

        alpha = 2 / (self.ema_smooth + 1)
        anchor = -horizon - 1  # last in-sample day
        constant_rv = self.rolling_rv_sq.iloc[anchor]
        level = self.rolling_rv_exp_sq.iloc[anchor]

        forecast_values = []
        for _ in range(horizon):
            # The realized-variance input is held constant over the horizon.
            level = alpha * constant_rv + (1 - alpha) * level
            forecast_values.append(level)

        forecast_index = self.rolling_rv_sq.index[-horizon:]
        forecast_series = pd.Series(forecast_values, index=forecast_index, name='Forecasted ExpRV')

        actual_future_rv = self.rolling_rv.iloc[-horizon:]
        forecast_error = (np.sqrt(forecast_series) - actual_future_rv) / actual_future_rv
        return forecast_series, forecast_error

    def forecast_volatility(self):
        """Forecast the last ``nfuture`` days, print the error metrics and plot the result.

        The printed "MAE" and "RMSE" are computed on the *relative* error
        ``(forecast - realized) / realized`` and rounded to four decimals.

        Raises
        ------
        RuntimeError, ValueError
            See ``_ewma_forecast``.
        """
        forecast_series, forecast_error = self._ewma_forecast()
        forecast_index = forecast_series.index
        annualize = np.sqrt(self.trading_days)

        mae = round(np.mean(np.abs(forecast_error)), 4)
        rmse = round(np.sqrt(np.mean(forecast_error**2)), 4)
        print(f"MAE: {mae}, RMSE: {rmse}")

        # Plotting: show the horizon plus up to 21 preceding days. The start is
        # clamped so short histories do not raise an IndexError.
        fig, axs = plt.subplots(2, 1, figsize=(14, 10), sharex=True)
        start_index = max(-len(self.rolling_rv), -self.nfuture - 21)
        end_index = -1
        x_limits = [self.rolling_rv.index[start_index], self.rolling_rv.index[end_index]]

        # Historical + Forecast
        axs[0].plot(self.rolling_rv * annualize, color='black', label='Realized Vol')
        axs[0].plot(self.rolling_rv_exp.iloc[:-self.nfuture] * annualize, color='red', label='EWMA Vol')
        axs[0].plot(np.sqrt(forecast_series) * annualize, color='blue', linestyle='--', label='Forecasted EWMA')
        axs[0].axvline(x=forecast_index[0], color='gray', linestyle='--', linewidth=1.5)
        axs[0].set_title(f'Historical RV and Forecast (EWMA)\nMAE={mae}, RMSE={rmse}')
        axs[0].set_ylabel('Annualized Volatility')
        axs[0].legend()
        axs[0].grid(True)
        axs[0].set_xlim(x_limits)
        axs[0].set_ylim(0, np.amax(np.sqrt(forecast_series) * annualize) * 1.5)

        # Forecast error (title states the sign convention actually computed)
        axs[1].bar(forecast_error.index, forecast_error.values * 100, color='purple')
        axs[1].axvline(x=forecast_index[0], color='gray', linestyle='--', linewidth=1.5)
        axs[1].axhline(y=0, color='gray', linestyle='--', linewidth=1.5)
        axs[1].set_title('Relative Forecast Error: (Forecasted EWMA − Actual RV) / Actual RV')
        axs[1].set_xlabel('Date')
        axs[1].set_ylabel('Relative Volatility Difference (%)')
        axs[1].grid(True)
        axs[1].set_xlim(x_limits)

        plt.tight_layout()
        plt.show()


In [None]:
class ARCHVolatility:
    def __init__(self, df, log_return_series, model_type, nfuture=10, ar_order=1, ma_order=1, sym_order=1, window=1, ema_smooth=21):
        self.df = df
        self.series = log_return_series
        self.trading_days = 252
        self.model_type = model_type
        self.nfuture = nfuture
        self.window = window
        self.rolling_rv = None
        self.arch_rv = None
        self.rolling_rv_sq = None
        self.result = None

        # Fitted parameters
        self.alpha0 = None
        self.alpha_coeffs = None
        self.beta_coeffs = None
        self.gamma_coeffs = None
        self.forecast_df = None

        #
        if model_type == "ARCH":
            self.q, self.p, self.o = ar_order, 0, 0
        elif model_type  == "GARCH":
            self.q, self.p, self.o = ar_order, ma_order, 0
        elif model_type == "TGARCH":
            self.q, self.p, self.o = ar_order, ma_order, sym_order

    def realized_volatility(self):
        """Compute daily realized volatility, fit the model and plot both annualised.

        Side effects: writes a ``'log_return'`` column into ``self.df``, sets
        ``self.rolling_rv`` / ``self.rolling_rv_sq`` and calls ``self.fit()``.
        """
        # Log returns between consecutive rows of df
        self.df['log_return'] = np.log(self.df['Close'] / self.df['Close'].shift(1))

        # Per calendar day: square root of the summed squared deviations of the
        # day's log returns from their daily mean.
        self.rolling_rv = self.df['log_return'].groupby(self.df.index.date).apply(lambda x: np.sqrt(np.sum((x - x.mean())**2)))
        self.rolling_rv.dropna(inplace=True)
        self.rolling_rv_sq = self.rolling_rv ** 2
        self.fit()

        # Plot
        annualize = np.sqrt(self.trading_days)
        plt.figure(figsize=(12, 6))
        plt.plot(self.rolling_rv * annualize, color='black', label='Realized Vol (Annualized)')
        plt.plot(self.arch_rv * annualize, color='red', label='Fitted Vol (Annualized)')
        # The red curve is the model's fitted volatility, not an EWMA.
        plt.title(f'Realized Volatility vs. {self.model_type} Fitted Volatility')
        plt.xlabel('Date')
        plt.ylabel('Annualized Volatility')
        plt.legend()
        plt.grid(True)
        plt.show()

    def fit(self):
        sr = self.series.iloc[: -self.nfuture]
        if self.o > 0:
            model = arch_model(sr, vol='GARCH', p=self.q, o=self.o, q=self.p, dist='normal', rescale=False)
        else:
            model = arch_model(sr, vol=self.model_type, p=self.q, o=self.o, q=self.p, dist='normal', rescale=False)

        result = model.fit(update_freq=0, disp='off')

        # Store parameters
        self.alpha0 = result.params.get('omega', 0)
        self.alpha_coeffs = [result.params.get(f'alpha[{i+1}]', 0) for i in range(self.q)]
        self.beta_coeffs = [result.params.get(f'beta[{i+1}]', 0) for i in range(self.p)] if self.p > 0 else []
        self.gamma_coeffs = [result.params.get(f'gamma[{i+1}]', 0) for i in range(self.o)] if self.o > 0 else []

        fitted_vol = result.conditional_volatility
        self.arch_rv = pd.Series(fitted_vol, index=sr.index, name=f'{self.model_type}({self.q}, {self.p}, {self.o}) Vol')
        self.result = result

    def forecast_volatility(self):
        forecast = self.result.forecast(horizon=self.nfuture, reindex=False)
        forecast_var = forecast.variance.values[-1]
        forecast_vol = forecast_var.flatten()

        forecast_index = self.series.index[-self.nfuture:]

        forecast_series = pd.Series(
            forecast_vol,
            index=forecast_index,
            name=f'Forecasted ARCH({self.q}) Vol'
        )

        actual_future_rv = self.rolling_rv.iloc[-self.nfuture:]

        # Forecast error
        forecast_error = (np.sqrt(forecast_series) - actual_future_rv)/ (actual_future_rv + 1e-8)

        mae = round(np.mean(np.abs(forecast_error)), 4)
        rmse = round(np.sqrt(np.mean(forecast_error**2)), 4)
        print(f"MAE: {mae}, RMSE: {rmse}")

        # Plotting
        fig, axs = plt.subplots(2, 1, figsize=(14, 10), sharex=True)
        start_index = -self.nfuture - 21
        end_index = -1

        # Historical + Forecast
        axs[0].plot(self.rolling_rv * np.sqrt(self.trading_days), color='black', alpha=0.5, label='Realized Vol')
        axs[0].plot(self.arch_rv * np.sqrt(self.trading_days), color='red', label=f'ARCH({self.q}) Vol (Fitted)')
        axs[0].plot(np.sqrt(forecast_series) * np.sqrt(self.trading_days), color='blue', linestyle='--', label='Forecasted Vol')
        axs[0].axvline(x=forecast_series.index[0], color='gray', linestyle='--', linewidth=1.5)
        axs[0].set_title(f'Historical ARCH Volatility and Forecast\nMAE={mae}, RMSE={rmse}')
        axs[0].set_ylabel('Annualized Volatility')
        axs[0].legend()
        axs[0].grid(True)
        axs[0].set_xlim([self.rolling_rv.index[start_index], self.rolling_rv.index[end_index]])
        axs[0].set_ylim(0, np.amax (np.sqrt(forecast_series) * np.sqrt(self.trading_days)) * 1.5)

        # Forecast Error
        axs[1].bar(forecast_error.index, forecast_error.values * 100, color='purple')
        axs[1].axhline(0, color='gray', linestyle='--')
        axs[1].axvline(x=forecast_series.index[0], color='gray', linestyle='--', linewidth=1.5)
        axs[1].set_title('Forecast Error: Realized − Forecasted')
        axs[1].set_xlabel('Date')
        axs[1].set_ylabel('Relative Volatility Difference (%)')
        axs[1].grid(True)
        axs[1].set_xlim([self.rolling_rv.index[start_index], self.rolling_rv.index[end_index]])

        plt.tight_layout()
        plt.show()


    def forecast_volatility_uncertain(self, n_sim=100, alpha=0.05):
        """
        Forecast future volatility using parameter uncertainty only.
        """
        #----------------------------------------------------------------------------------------------------------------------
        # Negative log-likelihood function
        def nll(theta, r, q, p, o):
            T = len(r)
            alpha0 = theta[0]
            alpha = theta[1:1+q]
            beta = theta[1+q:1+q+p]
            gamma = theta[1+q+p:] if o > 0 else []

            sigma2 = np.zeros(T)
            sigma2[:max(q,p,o)] = np.var(r)  # initialization

            for t in range(max(q,p,o), T):
                arch_term = np.sum([alpha[i] * r[t-i-1]**2 for i in range(q)]) if q>0 else 0
                garch_term = np.sum([beta[i] * sigma2[t-i-1] for i in range(p)]) if p>0 else 0
                asym_term = np.sum([gamma[i] * r[t-i-1]**2 * (r[t-i-1] < 0) for i in range(o)]) if o > 0 else 0
                sigma2[t] = alpha0 + arch_term + garch_term + asym_term
                sigma2[t] = max(sigma2[t], 1e-8)

            ll = 0.5 * np.sum(np.log(2*np.pi*sigma2) + r**2 / sigma2)
            return ll

        theta_hat = self.result.params.values
        eps_series = self.series.iloc[: -self.nfuture]
        H = Hessian(lambda th: nll(th, r=eps_series, q=self.q, p=self.p, o=self.o))(theta_hat)

        H = 0.5 * (H + H.T)   # force symmetry
        try:
            cov_theta = np.linalg.inv(H)
        except np.linalg.LinAlgError:
            cov_theta = np.linalg.pinv(H)

        #----------------------------------------------------------------------------------------------------------------------


        #----------------------------------------------------------------------------------------------------------------------
        # --- Helper: draw valid parameters ---
        def draw_valid_params(theta_hat, cov_theta, q, p, o, n_draws, oversample=100):
            d = len(theta_hat)
            valid = []

            while len(valid) < n_draws:
                candidates = np.random.multivariate_normal(theta_hat, cov_theta, size=oversample)

                for theta_sim in candidates:
                    omega = theta_sim[0]
                    alpha = theta_sim[1:1+q]
                    beta  = theta_sim[1+q:1+q+p] if p > 0 else np.array([])
                    gamma = theta_sim[1+q+p:1+q+p+o] if o > 0 else np.array([])

                    # constraints
                    if omega <= 0:
                        continue
                    if np.any(alpha < 0) or np.any(beta < 0) or np.any(gamma < 0):
                        continue
                    if alpha.sum() + beta.sum() + 0.5 * gamma.sum() >= 1:
                        continue

                    # unconditional variance filter
                    try:
                        sigma2_inf = omega / (1 - alpha.sum() - beta.sum() - 0.5 * gamma.sum())
                    except ZeroDivisionError:
                        continue

                    sigma2_inf_hat = self.alpha0 / (
                        1 - np.sum(self.alpha_coeffs) - np.sum(self.beta_coeffs) - 0.5 * np.sum(self.gamma_coeffs)
                    )
                    if sigma2_inf > 3 * sigma2_inf_hat:
                        continue

                    valid.append(theta_sim)

                    if len(valid) >= n_draws:
                        break

            # If we have fewer than n_draws, resample with replacement
            valid = np.array(valid)
            if len(valid) < n_draws:
                idx = np.random.choice(len(valid), size=n_draws, replace=True)
                valid = valid[idx]

            return valid[:n_draws].reshape(n_draws, d)

        valid_draws = draw_valid_params(theta_hat, cov_theta, self.q, self.p, self.o, n_sim)
        #----------------------------------------------------------------------------------------------------------------------

        # Use last historical residuals for ARCH terms
        eps_hist = self.series.iloc[: -self.nfuture].values
        sigma_hist = self.arch_rv.values

        forecasts = []
        for sim in range(n_sim):
            # Sample new parameters from multivariate normal
            theta_sim = valid_draws[sim]
            idx = 0
            alpha0_sim = theta_sim[idx]; idx += 1
            alpha_coeffs_sim = theta_sim[idx:idx+self.q]; idx += self.q
            beta_coeffs_sim = theta_sim[idx:idx+self.p] if self.p > 0 else np.array([]); idx += self.p
            gamma_coeffs_sim = theta_sim[idx:idx+self.o] if self.o > 0 else np.array([])

            # Reject non-stationary draws
            if alpha_coeffs_sim.sum() + beta_coeffs_sim.sum() + 0.5 * gamma_coeffs_sim.sum() >= 1:
                continue

            # Initialize sigma2 array with last historical variances (for GARCH term)
            sigma2 = list(sigma_hist[-self.p:] if self.p > 0 else [sigma_hist[-1]])
            # Initialize past returns (for ARCH/TGARCH term)
            eps_sim = list(eps_hist[-self.q:] if self.q > 0 else [0])

            sim_sigma2 = []

            # Simulate forward nfuture steps
            for t in range(self.nfuture):
                # ARCH term
                arch_term = sum([alpha_coeffs_sim[j] * eps_sim[-j-1]**2 for j in range(len(alpha_coeffs_sim))]) if len(alpha_coeffs_sim) > 0 else 0

                # GARCH term
                garch_term = sum([beta_coeffs_sim[j] * sigma2[-j-1] for j in range(len(beta_coeffs_sim))]) if len(beta_coeffs_sim) > 0 else 0

                # Asymmetric TGARCH term
                asym_term = 0
                if len(gamma_coeffs_sim) > 0:
                    for j in range(len(gamma_coeffs_sim)):
                        val = -eps_sim[-j-1] if eps_sim[-j-1] < 0 else 0
                        asym_term += gamma_coeffs_sim[j] * val**2

                # New variance
                new_sigma2 = alpha0_sim + arch_term + garch_term + asym_term
                new_sigma2 = max(new_sigma2, 1e-8)
                sigma2.append(new_sigma2)

                # Simulate new return from N(0, sigma2)
                new_eps = np.random.normal(0, np.sqrt(new_sigma2))
                eps_sim.append(new_eps)

                sim_sigma2.append(new_sigma2)

            forecasts.append(sim_sigma2)

        forecasts = np.array(forecasts)
        forecast_mean = forecasts.mean(axis=0)
        lower = np.quantile(forecasts, alpha / 2, axis=0)
        upper = np.quantile(forecasts, (1 - alpha / 2), axis=0)
        forecast_index = self.series.index[-self.nfuture:]

        self.forecast_df = pd.DataFrame({
            'Forecast': forecast_mean,
            'Lower': lower,
            'Upper': upper
        }, index=forecast_index)

        # Plot
        plt.figure(figsize=(12, 6))
        plt.plot(self.rolling_rv * np.sqrt(self.trading_days), color='black', label='Realized Vol')
        plt.plot(self.arch_rv * np.sqrt(self.trading_days), label=f'{self.model_type} Fitted', color='red')
        plt.plot(np.sqrt(self.forecast_df['Forecast']) * np.sqrt(self.trading_days), '--', label='Forecast', color='blue')
        plt.fill_between(
            self.forecast_df.index,
            np.sqrt(self.forecast_df['Lower']) * np.sqrt(self.trading_days),
            np.sqrt(self.forecast_df['Upper']) * np.sqrt(self.trading_days),
            color='blue', alpha=0.2, label=f'{int((1-alpha)*100)}% CI'
        )
        plt.title(f'{self.model_type} Forecast Volatility (Parameter Uncertainty Only)')
        plt.xlabel('Date')
        plt.ylabel('Annualized Volatility')
        plt.legend()
        plt.grid(True)
        plt.show()
