### Imported Packages

In [1]:
import pandas as pd
import matplotlib.pyplot as plt
from arch import arch_model #GARCH Models
from statsmodels.graphics.tsaplots import plot_acf, plot_pacf
import numpy as np
import math
import scipy.stats
from statistics import NormalDist
from scipy.stats import t
from scipy.stats import f
from datetime import datetime
from scipy.stats import shapiro #Shapiro Test
from scipy import stats #t-test
import statsmodels.api as sm #Ljung-Box Test
from scipy.stats import chi2
import statistics as stat

### Data

I will only be using PH data for the initial test.

In [2]:
start_date = '2006-01-01' #yyyy-mm-dd
end_date = '2021-01-01'


#Philippines
PH = pd.read_csv('https://raw.githubusercontent.com/raphaelyt/thesis199.11/main/data/2006-2021/PSEi.csv')
PH['Date'] = pd.to_datetime(PH['Date'])
PH = PH.rename(columns={'Price': 'Close'})
PH = PH.replace(',','', regex=True)
PH['Close'] = PH['Close'].astype(float, errors = 'raise')
mask = (PH['Date'] >= start_date) & (PH['Date'] <= end_date)
PH = PH.loc[mask]
PH = PH.set_index('Date')
PH = PH.sort_index(axis=0, ascending = False)

### Logarithmic Returns

In [3]:
def get_returns(df,d):
    '''
    The function obtains the log returns of the asset shifted d days
    
    PARAMETERS
    ----------
    df : pandas.DataFrame
        The data frame contains data of a chosen stock index 
        (Stock index must be arrange in DESCENDING ORDER by DATE)
    d : int
        The dth day being forecast
        (Assumed to be 1 for most cases)
        
    RETURNS
    -------
    df : pandas.DataFrame
        The data frame returns an updated data frame containing the
        'Returns' column
    '''
    df['Previous'] = df['Close'].shift(-d)
    df['Returns'] = np.log(df['Close']/df['Previous'])*100
    return df.dropna()

PH = get_returns(PH, 1)

# Delta-Normal Approach

### Estimating VaR

In [4]:
def get_VaR_DN(df, alpha):
    '''
    The function returns the d-day VaR of the asset
    
    PARAMETERS
    ----------
    df : pandas.DataFrame
        The data frame contains data of a chosen stock index 
        (Stock index must be arrange in DESCENDING ORDER by DATE)
    alpha : float 
        The level of significance of the VaR
        (Assumes a value in between 0 and 1)
        
    RETURNS
    -------
    VaR : float
        The float returned is the VaR for the d-day with a 
        (1-alpha)% VaR
    '''
    sigma = np.std(df['Returns'])
    VaR = sigma*NormalDist().inv_cdf(1-alpha)
    return round(VaR, 5)

alpha = 0.05
print(f'PH: {get_VaR_DN(PH, alpha)}')

alpha = 0.01
print(f'PH: {get_VaR_DN(PH, alpha)}')

alpha = 0.0001
print(f'PH: {get_VaR_DN(PH, alpha)}')

PH: 2.18134
PH: 3.08511
PH: 4.93201


### Fixed Window

In [5]:
prediction_size = 365
alpha = 0.05
name = 'FW-DN-5%'

In [6]:
def fixed_window(df, test_size, alpha):
    '''
    The function returns the (1-alpha)% d-day VaR of the asset staring at
    time t = test size to the present using a fixed time window of size test_size
    
    PARAMETERS
    ----------
    df : pandas.DataFrame
        The data frame contains data of a chosen stock index 
        (Stock index must be arrange in DESCENDING ORDER by DATE)
    test_size : int
        The test size is used to create a time window to estimate VaR
        using smaller time periods
        (The test size cannot be greater than the total number of entries
        of the data frame)
    alpha : float 
        The level of significance of the VaR
        (Assumes a value in between 0 and 1)

    RETURNS
    -------
    VaR_df : pandas.DataFrame
        The data frame contains the estimated (1-alpha)% d-day VaR from 
        time t = test size to current using the fixed time window 
    '''
    VaR_lst = []
    date_df = df[:test_size].reset_index()
    date_df = date_df.filter(['Date'])
    for i in range(test_size):
        temp_df = df[test_size-i:-(i+1)]
        temp_VaR = get_VaR_DN(temp_df, alpha)
        VaR_lst.append(temp_VaR)
    VaR_lst = VaR_lst[::-1]
    VaR_df = pd.DataFrame(VaR_lst, columns = ['Forecasted VaR'])
    VaR_df = pd.merge(date_df, VaR_df, left_index = True, right_index = True)
    VaR_df = VaR_df.set_index('Date')
    VaR_df.index = pd.to_datetime(VaR_df.index, utc = None)
    return VaR_df

#Standard 365 test subjects


PH_fw_dn = fixed_window(PH, prediction_size,alpha)

### Rolling Window

In [7]:
prediction_size = 365
alpha = 0.05
name = 'RW-DN-5%'

In [8]:
def rolling_window(df, test_size, alpha):
    '''
    The function returns the (1-alpha)% d-day VaR of the asset staring at
    time t = test size to the present using a smaller time window to begin
    and gradually increasing the size
    
    PARAMETERS
    ----------
    df : pandas.DataFrame
        The data frame contains data of a chosen stock index 
        (Stock index must be arrange in DESCENDING ORDER by DATE)
    test_size : int
        The test size is used to create a time window to estimate VaR
        using smaller time periods
        (The test size cannot be greater than the total number of entries
        of the data frame)
    alpha : float 
        The level of significance of the VaR
        (Assumes a value in between 0 and 1)

    RETURNS
    -------
    VaR_df : pandas.DataFrame
        The data frame contains the estimated (1-alpha)% d-day VaR from 
        time t = test size to current using the rolling time window 
    '''
    VaR_lst = []
    date_df = df[:test_size].reset_index()
    date_df = date_df.filter(['Date'])
    for i in range(test_size):
        temp_df = df[test_size-i:]
        temp_VaR = get_VaR_DN(temp_df, alpha)
        VaR_lst.append(temp_VaR)
    VaR_lst = VaR_lst[::-1]
    VaR_df = pd.DataFrame(VaR_lst, columns = ['Forecasted VaR'])
    VaR_df = pd.merge(date_df, VaR_df, left_index = True, right_index = True)
    VaR_df = VaR_df.set_index('Date')
    return VaR_df

#Standard
PH_rw_dn = rolling_window(PH, prediction_size, alpha)

# GARCH Model Approach

### Determining the Order of the GARCH Model (Standard is GARCH(1, 1))

In [9]:
alpha = 0.05
decay = 0.992
vol = 'GARCH'
p = 1
q = 1

In [10]:
def fit_params(df, vol,  p = None, q = None):
    returns = df['Returns'].dropna(axis = 0)
    model = arch_model(returns, vol = vol, p = p, q = q, rescale = False)
    fit = model.fit(disp = 'off')
    params = fit.params
    resid = fit.resid
    condv = fit.conditional_volatility
    return fit, params, resid, condv

PH_fit_g, PH_params_g, PH_resid_g, PH_condv_g = fit_params(PH, vol, p, q)

### Verifying Order Determination (Ljung-Box Test)

In [11]:
#Philippines
print(sm.stats.acorr_ljungbox((PH_resid_g**2).dropna(), lags=[20], return_df=True))

        lb_stat      lb_pvalue
20  1242.925736  4.891077e-251


### Estimating VaR

In [12]:
def get_VaR_GARCH(df, alpha, sigma, dist):
    if dist == 'normal':
        VaR = sigma*NormalDist().inv_cdf(1-alpha) #t.ppf(1-alpha, len(df['Returns'])-1 )
    elif dist == 't':
        VaR = sigma*t.ppf(1-alpha, len(df['Returns'])-1 )
    return VaR

### Fixed Window

In [13]:
test_size = 365
alpha = 0.05
vol = 'GARCH'
d = 1
dist = 'normal'
p = 1
q = 1
name = 'FW-G-5%'

In [14]:
def fixed_window(df, test_size, d, alpha, p, q, vol, dist):
    df = df[::-1]
    returns = df['Returns'].dropna(axis = 0)
    sigma_lst = []
    date_df = df.iloc[:test_size]
    date_df = date_df.reset_index()
    date_df = date_df.filter(['Date'])
    for i in range(test_size):
        temp_df = returns[i:-(test_size-i)]
        model = arch_model(temp_df, p=p, q=q, vol = vol, rescale=None)
        fit = model.fit(disp='off')
        pred = fit.forecast(horizon=d, reindex = False)
        sigma_lst.append(np.sqrt(pred.variance.values[-1,:][0]))
    sigma_df = pd.Series(sigma_lst, index=returns.index[-test_size:])
    VaR_df = get_VaR_GARCH(df, alpha, sigma_df, dist)
    return sigma_df, VaR_df


PH_sigma_df_g, PH_fw_df_g = fixed_window(PH, test_size,d, alpha,p,q, vol, dist)

### Rolling Window

In [15]:
test_size = 365
alpha = 0.05
d = 1
p = 1
q = 1
dist = 'normal'
vol = 'GARCH'
name = 'RW-G-5%'

In [16]:
def rolling_window(df, test_size, d, alpha, p, q, vol, dist):
    df = df[::-1]
    returns = df['Returns'].dropna(axis = 0)
    sigma_lst = []
    date_df = df.iloc[:test_size]
    date_df = date_df.reset_index()
    date_df = date_df.filter(['Date'])
    for i in range(test_size):
        temp_df = returns[:-(test_size-i)]
        model = arch_model(temp_df, p=p, q=q, vol = vol , rescale=None)
        fit = model.fit(disp='off')
        pred = fit.forecast(horizon=d, reindex = False)
        sigma_lst.append(np.sqrt(pred.variance.values[-1,:][0]))
    sigma_df = pd.Series(sigma_lst, index=returns.index[-test_size:])
    VaR_df = get_VaR_GARCH(df, alpha, sigma_df, dist)
    return sigma_df, VaR_df

PH_sigma_df_g, PH_rw_df_g = rolling_window(PH, test_size,d, alpha,p,q, vol, dist)

# Diebold-Mariano Test
1. https://www.real-statistics.com/time-series-analysis/forecasting-accuracy/diebold-mariano-test/

- Suppose that we have two forecasts f1, …, fn and g1, …, gn for a time series if y1, …, yn and we want to see which forecast is better, in the sense of it having better predictive accuracy. The obvious approach is to select the forecast that has the smaller error measurement based on one of the error measurements described in Forecasting Errors. But we need to go one step further and determine whether this difference is significant (for predictive purposes) or simply due to the specific choice of data values in the sample.
    
- The null hypothesis is that the two forecasts have the same accuracy. The alternative hypothesis is that the two forecasts have different levels of accuracy

- Hypotheiss testing under a two tailed distribution. We use a standard normal distribution will add this to the preliminaries


2. https://github.com/johntwk/Diebold-Mariano-Test (Reference for the code, not sure how to import the package from github kasi)

In [17]:
# Author   : John Tsang
# Date     : December 7th, 2017
# Purpose  : Implement the Diebold-Mariano Test (DM test) to compare 
#            forecast accuracy
# Input    : 1) actual_lst: the list of actual values
#            2) pred1_lst : the first list of predicted values
#            3) pred2_lst : the second list of predicted values
#            4) h         : the number of stpes ahead
#            5) crit      : a string specifying the criterion 
#                             i)  MSE : the mean squared error
#                            ii)  MAD : the mean absolute deviation
#                           iii) MAPE : the mean absolute percentage error
#                            iv) poly : use power function to weigh the errors
#            6) poly      : the power for crit power 
#                           (it is only meaningful when crit is "poly")
# Condition: 1) length of actual_lst, pred1_lst and pred2_lst is equal
#            2) h must be an integer and it must be greater than 0 and less than 
#               the length of actual_lst.
#            3) crit must take the 4 values specified in Input
#            4) Each value of actual_lst, pred1_lst and pred2_lst must
#               be numerical values. Missing values will not be accepted.
#            5) power must be a numerical value.
# Return   : a named-tuple of 2 elements
#            1) p_value : the p-value of the DM test
#            2) DM      : the test statistics of the DM test
##########################################################
# References:
#
# Harvey, D., Leybourne, S., & Newbold, P. (1997). Testing the equality of 
#   prediction mean squared errors. International Journal of forecasting, 
#   13(2), 281-291.
#
# Diebold, F. X. and Mariano, R. S. (1995), Comparing predictive accuracy, 
#   Journal of business & economic statistics 13(3), 253-264.
#
##########################################################
def dm_test(actual_lst, pred1_lst, pred2_lst, h = 1, crit="MSE", power = 2):
    # Routine for checking errors
    def error_check():
        rt = 0
        msg = ""
        # Check if h is an integer
        if (not isinstance(h, int)):
            rt = -1
            msg = "The type of the number of steps ahead (h) is not an integer."
            return (rt,msg)
        # Check the range of h
        if (h < 1):
            rt = -1
            msg = "The number of steps ahead (h) is not large enough."
            return (rt,msg)
        len_act = len(actual_lst)
        len_p1  = len(pred1_lst)
        len_p2  = len(pred2_lst)
        # Check if lengths of actual values and predicted values are equal
        if (len_act != len_p1 or len_p1 != len_p2 or len_act != len_p2):
            rt = -1
            msg = "Lengths of actual_lst, pred1_lst and pred2_lst do not match."
            return (rt,msg)
        # Check range of h
        if (h >= len_act):
            rt = -1
            msg = "The number of steps ahead is too large."
            return (rt,msg)
        # Check if criterion supported
        if (crit != "MSE" and crit != "MAPE" and crit != "MAD" and crit != "poly"):
            rt = -1
            msg = "The criterion is not supported."
            return (rt,msg)  
        # Check if every value of the input lists are numerical values
        from re import compile as re_compile
        comp = re_compile("^\d+?\.\d+?$")  
        def compiled_regex(s):
            """ Returns True is string is a number. """
            if comp.match(s) is None:
                return s.isdigit()
            return True
        for actual, pred1, pred2 in zip(actual_lst, pred1_lst, pred2_lst):
            is_actual_ok = compiled_regex(str(abs(actual)))
            is_pred1_ok = compiled_regex(str(abs(pred1)))
            is_pred2_ok = compiled_regex(str(abs(pred2)))
            if (not (is_actual_ok and is_pred1_ok and is_pred2_ok)):  
                msg = "An element in the actual_lst, pred1_lst or pred2_lst is not numeric."
                rt = -1
                return (rt,msg)
        return (rt,msg)
    
    # Error check
    error_code = error_check()
    # Raise error if cannot pass error check
    if (error_code[0] == -1):
        raise SyntaxError(error_code[1])
        return
    # Import libraries
    from scipy.stats import t
    import collections
    import pandas as pd
    import numpy as np
    
    # Initialise lists
    e1_lst = []
    e2_lst = []
    d_lst  = []
    
    # convert every value of the lists into real values
    actual_lst = pd.Series(actual_lst).apply(lambda x: float(x)).tolist()
    pred1_lst = pd.Series(pred1_lst).apply(lambda x: float(x)).tolist()
    pred2_lst = pd.Series(pred2_lst).apply(lambda x: float(x)).tolist()
    
    # Length of lists (as real numbers)
    T = float(len(actual_lst))
    
    # construct d according to crit
    if (crit == "MSE"):
        for actual,p1,p2 in zip(actual_lst,pred1_lst,pred2_lst):
            e1_lst.append((actual - p1)**2)
            e2_lst.append((actual - p2)**2)
        for e1, e2 in zip(e1_lst, e2_lst):
            d_lst.append(e1 - e2)
    elif (crit == "MAD"):
        for actual,p1,p2 in zip(actual_lst,pred1_lst,pred2_lst):
            e1_lst.append(abs(actual - p1))
            e2_lst.append(abs(actual - p2))
        for e1, e2 in zip(e1_lst, e2_lst):
            d_lst.append(e1 - e2)
    elif (crit == "MAPE"):
        for actual,p1,p2 in zip(actual_lst,pred1_lst,pred2_lst):
            e1_lst.append(abs((actual - p1)/actual))
            e2_lst.append(abs((actual - p2)/actual))
        for e1, e2 in zip(e1_lst, e2_lst):
            d_lst.append(e1 - e2)
    elif (crit == "poly"):
        for actual,p1,p2 in zip(actual_lst,pred1_lst,pred2_lst):
            e1_lst.append(((actual - p1))**(power))
            e2_lst.append(((actual - p2))**(power))
        for e1, e2 in zip(e1_lst, e2_lst):
            d_lst.append(e1 - e2)    
    
    # Mean of d        
    mean_d = pd.Series(d_lst).mean()
    
    # Find autocovariance and construct DM test statistics
    def autocovariance(Xi, N, k, Xs):
        autoCov = 0
        T = float(N)
        for i in np.arange(0, N-k):
              autoCov += ((Xi[i+k])-Xs)*(Xi[i]-Xs)
        return (1/(T))*autoCov
    gamma = []
    for lag in range(0,h):
        gamma.append(autocovariance(d_lst,len(d_lst),lag,mean_d)) # 0, 1, 2
    V_d = (gamma[0] + 2*sum(gamma[1:]))/T
    DM_stat=V_d**(-0.5)*mean_d
    harvey_adj=((T+1-2*h+h*(h-1)/T)/T)**(0.5)
    DM_stat = harvey_adj*DM_stat
    # Find p-value
    p_value = 2*t.cdf(-abs(DM_stat), df = T - 1)
    
    # Construct named tuple for return
    dm_return = collections.namedtuple('dm_return', 'DM p_value')
    
    rt = dm_return(DM = DM_stat, p_value = p_value)
    
    return rt


# Testing

In [26]:
def DM_test(df, model_1, model_2, test_size):
    returns = np.array(df['Returns'])[:test_size]
    arr_model_1 = model_1['Forecasted VaR'].to_numpy()
    arr_model_2 = model_2.to_numpy()
    dm = dm_test(returns, arr_model_1, arr_model_2, h = 1, crit = 'MSE')
    return dm

In [27]:
print(DM_test(PH, PH_fw_dn, PH_fw_df_g, 365))
print(DM_test(PH, PH_rw_dn, PH_rw_df_g, 365))


dm_return(DM=-4.288983204452318, p_value=2.3025259640654284e-05)
dm_return(DM=-4.345339701871624, p_value=1.806265876876205e-05)
