In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from Parameter.VaR_limit import var_limits
from Parameter.models import model
from Parameter.color_map import color_map
from Parameter.weights import weights as portfolio_weights
#
from tqdm import tqdm
from arch import arch_model
from scipy.stats import norm
import warnings
warnings.filterwarnings('ignore')
import matplotlib.colors as mcolors

In [None]:
class FinancialMetrics:
    def __init__(self, data):
        self.data = data
        self.returns = None

    def calculate_returns(self):
        self.returns = np.log(self.data/self.data.shift(1).dropna()
        return self.returns

    def calculate_portfolio_volatility(self, weights, method='std', models=none, rolling_window=60, lambda_value=0.94):
        if self.returns is None:
            raise ValueError("Returns have not been calculated")
        vloatilities = []

        if method == 'std':
            for i in range(rolling_window, len(self.returns)):
                window_data = self.returns.iloc[i - rolling_window:i]
                cov_matrix = window_data.cov()
                vol = np.sqrt(np.dot(weights.T, np.dot(cov_matrix.values, weights)))
                volatilities.append(vol)

        elif method == 'ewma':
            for i in range(rolling_window, len(self.returns)):
                window_data = self.returns.iloc[i - rolling_window:i]
                weights_ewma = lambda_value ** np.arrange(window_data.shape[0])[::-1]
                weights_ewma /= weights_ewma.sum()

                weighted_mean1 = np.sum(window_data.iloc[:, 0] ** 2 * weights_ewma) / (np.sum(weights_ewma))
                weighted_mean2 = np.sum(window_data.iloc[:, 1] ** 2 * weights_ewma) / (np.sum(weights_ewma))

                correlation = window_data.iloc[:, 0].corr(window_data.iloc[:, 1])
                portfolio_variance = (weights[0] ** 2 * weighted_mean1 + weights[1] ** 2 * weighted_mean2 +
                                      2 * weights[0] * weights[1] * np.sqrt(weighted_mean1 * weighted_mean2) * correlation)

                volatilities.append(np.sqrt(portfolio_variance))

        elif method == 'garch':
            if models is None:
                raise ValueError("Models must be provided with GARCH method.")
            for model_spec in tqdm(models, desc='Models'):
                vol = model_spec['vol']
                dist = model_spec['dist']
                o = model_spec['o']
                print(vol, dist, f'o={o}')

                portfolio_vol = []

            for i in tqdm(range(rolling_window, len(self.returns)), desc=f'{vol} {dist} o={o}', leave=FALSE):
                window_returns1 = self.returns[self.returns.columns[0]][i - rolling_window:i]
                window_returns2 = self.returns[self.returns.columns[1]][i - rolling_window:i]

                model1 = arch_model(window_returns1, vol=vol, p=1, o=o, q=1, dist=dist)
                fit1 = model1.fit(disp='off')
                cond_vol1= fit1.conditional_volatility.iloc[-1] ** 2

                model2 = arch_model(window_returns2, vol=vol, p=1, o=o, q=1, dist=dist)
                fit1 = model2.fit(disp='off')
                cond_vol2= fit2.conditional_volatility.iloc[-1] ** 2

                correlation = window_returns1.corr(window_returns2)
                portfolio_variance = (weights[0] ** 2 * cond_vol1 + weights[1] ** 2 * cond_vol2 +
                                      2 * weights[0] * weights[1] * (np.sqrt(cond_vol1 * cond_vol2) * correlation))
                portfolio_vol.append(np.sqrt(portfolio_variance))

            volatilities.append(pd.series(portfolio_vol, index=self.return.index[rolling_window:]))

        portfolio_volatility_garch_df = pd.DataFrame({f"{model['vol']} {model['dist']} o={model['o']}":
                                                    vol for model, vol in zip(models, volatilities)}, index=self.returns.index[rolling_window:])

        return portfolio_volatility_garch_df

    else:
        raise ValueError(f"Method {method} not recognized.")

    dates = self.returns.index[rolling_window:]
    return pd.Series(volatilities, index=dates)


class VaRCalculator: 
    def __init__(self, data, rolling_window=60, confidence_level = 0.99):
        self.data = data
        self.rolling_window = rolling_window
        self.confidence_level = confidence_level
        self.weights = portfolio_weights
        self.initial_investment = (self.weights[0] * data.iloc[0, 0]) + ((self.weights[1] * data.iloc[0, 1]))
        self.portfolio_prices = (data * self_weights).sum(axis=1)
        self.portfolio.pnl = self.portfolio_prices.diff().dropna()

    def parametric_var(self, vol_data):
        z_score = norm.ppf(1 - self.confidence_level)
        var_results = {}

        for col in vol_data.columns:
            portfolio_std = vol_data[col]
            var = abs(z_score * portfolio_std * self.portfolio_prices.shift(1))
            var_results[col] = var
        return pd.DataFrame(var_results)

    def historical_var(self):
        var_series = []
        for i in range (self.rolling_window, len(self.portfolio_pnl)):
            window_data = self.portfolio_pnl.iloc[i - self.rolling_window:i]
            var = abs(np.percentile(window.data, (1 - self.confidence_level) *100))
            var_series.append(var)

        return pd.DataFrame(var.series, index=pd.to_datetime(self.portfolio_pnl.index[self.rolling_window:]), 
                            columns=['Historical_VaR'])

    def Monte_Carlo_VaR(self, num_simulations=1000):
        daily_returns = self.data.pct_change().dropna()
        data = self.data
        var_series = []

        for i in range(self.rolling_window, len(daily_returns)):
            today_investment = sum(data.iloc[i] * 50)
            today_weights = data.iloc[i] * 50 / today_investment
            window_data = daily_returns.iloc[i - self.rolling_window:i]
            window_mean = window_data.mean()
            window_cov = window_data.cov()

            portfolio_mean = np.dot(today_weights, window_mean)
            portfolio_std = np.sqrt(np.dot(today_weights.T, np.dot(window_cov, today_weights)))

            simulations = np.random.normal(portfolio_mean, portfolio_std, (num_simulations, 1))
            simulated_end_values = today_investment * np.exp(simulations)
            simulated_pnl = simulated_end_values - today_investment

            var = np.percentile(simulated_pnl, (1 - self.confidence_level) * 100)
            var_series.append(abs(var))

        return pd.DataFrame(var_series, index=pd.to_datetime( daily_returns.index[self.rolling_window:]), columns=['Monte_Carlo_VaR'])

    def backtest_var(self, var_df):
        exceedances = (var_df.sub(self.portfolio_pnl, axis=0) < 0).astype(int)
        exceedances_df = exceedances[-252:]
        print(exceedances_df.sum())
        return exceedances_df


class VaRStatisticsCalculator:
    def __init__(self, data_path, model, rolling_window=60, confidence_level=0.99):
        self.data = pd.read_excel(data_path, sheet_name='Data', index_col='Date')
        self.rolling_window = rolling_window
        self.confidence_level = confidence_level
        self.weights = np.array([50 * self.data.iloc[0]['AAPL'],
                                50 * self.data.iloc[0]['MSFT']])
        self.investment = sum(self.weights)
        self.weights /= self.weights.sum()
        self.metrics = FinancialMetrics(self.data)
        self.returns = self.metrics.calculate_returns()
        self.model = model
        self.var_calcualtor = VaRCalculator(self.data, rolling_window, confidence_level)
        self.var_limit = var_limits
        self.color_map = color_map

    def check_data(self):
        head, tail = self.data.head(), self.data.tail()
        return head, tail

    def calculate_var(self):
        portfolio_volatility_std = (self.metrics.calculate_portfolio_volatility(self.weights, method='std'))
        portfolio_volatility_ewma = (self.metrics.calculate_portfolio_volatility(self.weights, method='ewma'))
        
                                                                                