## Programming Assignment 3
Matriculation Nr: 01/1152810

Other group members: 1151277, 919755, 1151248

In [1]:
import numpy as np
import pandas as pd
import scipy.stats as st

from statsmodels.tsa.api import VAR

Read Appendix D in Lutkepohl (2005). Write a function that implements a residual bootstrap for a VAR(p) with intercept and returns the bootstrap standard errors of the VAR coefficients in B.†

The function should take
- a T + p × K matrix of observations on yt,
- the lag length p,
- and the number of bootstrap replications R as input.

In [2]:
def Z_matrix(y: np.array, p: int, c: int):
    """Calculate the Z-matrix for a given input

    Args:
        y (np.array): input with all the data of shape (T + p) × K
        p (int): lags
        c (int): intercept yes=1, no=0

    Returns:
        (np.array): Z-matrix for given input
    """

    y = y.T

    #determine matrix dimensions:
    T = y.shape[1] - p
    K = y.shape[0]

    # build Z-matrix
    if c==1:
        Z = np.ones((1, T+p), dtype=float)

    # 1b stacked lagged data
    for i in range(p):
        #add i columns of leading zeros (EDIT: empty, comp cost lower) to ktpmat
        zeros = np.zeros((K, i), dtype=float)
        zerostack = np.hstack((zeros, y[:,:(T+p-i)]))
        # vertically stack this to Z
        Z = np.vstack((Z, zerostack))

    # cutting of leading p columns and retrieving Z
    Z = Z[:, p-1:-1]

    return Z

In [3]:
def B_matrix(y: np.array, p: int, c: int):
    """Calculates the B matrix with the estimated coefficients

    Args:
        y (np.array): input with all the data of shape (T + p) × K
        p (int): lags
        c (int): intercept yes=1, no=0

    Returns:
        _type_: B = matrix with estimated coefficients; Z=Z-matrix; resids=residual-matrix, sigma_u=covariance matrix
    """

    # get Z-matrix from function above
    Z = Z_matrix(y, p, c)

    y = y.T # transpose y
    y = y[:,p:] # first p observations are lost as we need prior lags for estimation
    K = y.shape[0] # number of variables
    T = y.shape[1] # number of observations

    # calculate B
    B = y @ Z.T @ np.linalg.inv((Z@Z.T))

    resids = y-(B@Z)

    # calculate sigma_u (covariance matrix)
    sigma_u = (1/(T-K*p-1))*resids@resids.T

    return B, Z, resids, sigma_u

In [75]:
def resid_bootstrap(Tpkmat, p, R):
    '''
    :param Tpkmat: a T + p × K matrix of observations on yt,
    :param p: the lag length p,
    :param R: and the number of bootstrap replications R as input.
    :return: returns the bootstrap standard errors of the VAR coefficients in B
    '''

    y = Tpkmat.T # transpose input matrix to K x (T+p)
    T = y.shape[1] - p # get T (number of observations)
    K = y.shape[0]

    '''
    Description from Lütkepohl, appendix D, page 709
    (1) The parameters of the model under consideration are estimated. Let uthat, t = 1, . . . , T, be the estimation residuals.
    '''

    B, Z, resids, sigma_u = B_matrix(Tpkmat, p, c=1)

    '''
    (2) Centered residuals are computed (usual average).

    Bootstrap residuals u∗1, . . . , u∗T are then obtained by randomly drawing with replacement from the centered residuals.
    '''
    uthatbar = np.sum(resids, axis=1)/T
    uthatcenterded = resids - uthatbar.T.reshape(K, 1)

    '''
    (3) Bootstrap time series are computed recursively [...]where the same initial values may be used for each generated series, (y∗ −p+1, . . . , y∗0) = (y−p+1, . . . , y0).
    '''
    # I assume this part is wrong. We probably have to implement methods from appendix D1 (page 707) here instead of just adding the draws to our yhat

    yhat = B@Z

    # assuming we are sampling as many t as before (i.e. 189)? We will nevertheless not have any presample values anymore due to construction of Z. (i.e. new t will be t_old-p (because presample values are assumed in our B_matrix function)).
    B_bs_list = np.empty((K, K*p+1))

    for i in range(R):
        draws = np.random.randint(0, T, T)
        yhatbs = yhat + uthatcenterded[:,draws]

        '''
        (4) Based on the bootstrap time series, the parameters A1, . . . ,Ap are reestimated.
        '''
        B_bs, _, resids, sigma_u = B_matrix(yhatbs.T, p, c=1)
        # stacking samples along axis 2
        B_bs_list = np.dstack((B_bs_list, B_bs))

        #re-calculating yhat with new estimates of B based on sample
        yhat = B_bs@Z

    '''
    From assignment: †Use the standard deviation over the R bootstrap VAR estimates as a bootstrap standard error.
    '''

    Bbar_bs_list = np.mean(B_bs_list, axis = 2)
    deviation = B_bs_list - Bbar_bs_list[:, :, None]
    deviation_squared = deviation**2
    sd = np.sqrt(np.sum(deviation_squared, axis=2)/(R-1))
    se = sd/np.sqrt(R)

    return se

In [81]:
# read in data
awm = pd.read_csv("awm19up18.csv")
awm.rename(columns={awm.columns[0]: "Q" }, inplace = True)

of_interest = ["Q", "YER", "ITR", "LTN", "STN"]
awm = awm[awm.columns.intersection(of_interest)]
awm.set_index('Q', inplace=True)

# calculate logs and first differences and assign names accordingly
awm["YER_log"] = np.log(awm['YER'])
awm["ITR_log"] = np.log(awm['ITR'])

awm["d_lgdp"] = awm["YER_log"].diff()
awm["d_invest"] = awm["ITR_log"].diff()

awm["d_lgdp"] = awm["d_lgdp"] * 400
awm["d_invest"] = awm["d_invest"] * 400

awm["d_R"] = awm["LTN"].diff()
awm["d_r"] = awm["STN"].diff()

awm.dropna(inplace=True)

# get the input for our function
y_t = np.array(awm[["d_lgdp", "d_invest", "d_R", "d_r"]])

In [82]:
B, Z, resids, sigma_u = B_matrix(y_t, p=2, c=1)

In [83]:
se = resid_bootstrap(y_t, 2, R=499)

In [87]:
## does our B_matrix function return SDs?
print(B, se)

[[ 8.19991990e-01  4.41893745e-01 -3.09516135e-02  3.53840313e-01
   2.80812932e-01  1.40348551e-01  2.89658568e-02 -6.61782716e-01
  -6.99814626e-01]
 [-1.00607134e+00  1.20312057e+00 -2.28829643e-01 -1.26631209e+00
   6.99606104e-01 -5.17309344e-02  2.41250166e-01 -1.21968589e+00
  -1.57970603e+00]
 [-6.57473325e-02  1.20603459e-02  1.78784492e-04  5.15582681e-01
   3.78561218e-02  1.32232293e-02 -5.88137520e-03 -2.20154123e-01
   7.06111408e-02]
 [-2.43278598e-01  7.28410851e-02 -7.47895590e-03  4.22382718e-01
   2.78528354e-01  3.87124972e-02 -2.40842053e-03 -2.89226900e-01
  -7.25841414e-02]] [[0.83667181 2.72854121 3.01719155 0.02614192 0.09290543 2.42179127
  2.91557032 0.05330722 0.04738715]
 [0.90550665 3.36761351 3.01048254 0.07428419 0.13897989 2.63615086
  5.41161893 0.11883483 0.11323821]
 [0.08156234 0.04978427 0.12829088 0.02807889 0.03305658 0.04137488
  0.07066681 0.00895616 0.02202074]
 [0.13829864 0.22517133 0.4063585  0.03499647 0.07121358 0.18387271
  0.3450008  0.

Write a Python function that simulates time series data from a K-dimensional VAR(2) process yt = A1yt−1 + A2yt−2 + ut, where the innovations ut are drawn from a multivariate normal distribution with mean zero and covariance matrix Σu. Use y−1 = y0 = 0 as starting values, where 0 is a K × 1 vector of zeros, generate time series of length T + 50 and discard the first 50 observations, such that you have available time series of total length equal to T.

Your function should take A1, A2, Σu and T as an input and should return a T × K matrix
of observations on yt.

In [126]:
def var2sim(A1, A2, sigma_u, T):
    '''
    :param A1:
    :param A2:
    :param sigma_u:
    :param T:
    :return:
    '''
    p= 2
    K = int(sigma_u.shape[0]/p)

    # method from page 708
    A = np.hstack((np.identity((K*p-1)), np.zeros((K*p-1, 1), dtype=float)))
    A = np.vstack((np.hstack((A1, A2)), A))


    # generate time-series of length T+50

    # discard first 50 observations


    return A

In [128]:
A1 = np.random.rand(3, 3)
A2 = np.random.rand(3, 3)
sigma_u = np.random.rand(6, 6)
T = 100
var2sim(A1, A2, sigma_u, T)


array([[0.12155007, 0.37876129, 0.7010412 , 0.16732411, 0.56435442,
        0.12681757],
       [0.79998586, 0.45464893, 0.4459351 , 0.20893036, 0.63115058,
        0.14632153],
       [0.31541566, 0.86208548, 0.75614209, 0.48558215, 0.07879846,
        0.86614755],
       [1.        , 0.        , 0.        , 0.        , 0.        ,
        0.        ],
       [0.        , 1.        , 0.        , 0.        , 0.        ,
        0.        ],
       [0.        , 0.        , 1.        , 0.        , 0.        ,
        0.        ],
       [0.        , 0.        , 0.        , 1.        , 0.        ,
        0.        ],
       [0.        , 0.        , 0.        , 0.        , 1.        ,
        0.        ]])

In [115]:
K = 3
p=2
np.identity(K*p-1)
np.zeros((K*p-1, 1))
np.zeros(((K*p-1), 1)).shape

(5, 1)

# remaining code from assignment 2

In [None]:
def granger(y: np.array, p: int, dummy_vec: list, c=1):
    """Performs the Granger Causality Test on a given set of variables

    Args:
        y (np.array): input with all the data of shape (T + p) × K
        p (int): lags
        dummy_vec (list): list with causing (1) and caused (0) signs
        c (int): intercept yes=1, no=0 

    Returns:
        _type_: Wald- and F-statistic together with implied p-values
    """
    
    y = y.T # transpose y
    n_cause = sum(dummy_vec) # number of causing variables = 1's in the list
    n_caused = len(dummy_vec) - n_cause # number of caused variables
       
    # arrange in right order (first cause variables, then caused variables)
    cause = []
    caused = []
    for i, n in enumerate(dummy_vec):
        if n == 1:
            cause.append(y[i])
        else:
            caused.append(y[i])
    
    cause = np.column_stack(cause)
    caused = np.column_stack(caused)
    y = np.concatenate((cause, caused), axis=1)
    
    
    
    # get B matrix, Z, and covariance matrix from above function
    K = y.shape[1]  # number of variables
    T = np.size(y, 0) - p
    B, Z, sigma_u = B_matrix(y, p, c) # return all three
    
    # get indices for positions that should be checked
    relevant_parts = []
    for p_ in range(p):
        for a_v in range(n_cause):
            for p_v in range(n_caused):
                relevant_parts.append(K*c + n_cause + p_v + a_v*K + p_*(K**2))
   
    # vectorize B matrix (F=column-wise)
    vec_B = B.flatten(order="F").T
    
    # initialize C with zeros only
    C = np.zeros([len(relevant_parts), len(vec_B)])
    
    
    # add 1 at relevant parts
    for i, num in enumerate(relevant_parts):
        C[i, num] = 1
    
    # calculate lambdas
    lambda_w = (C@vec_B).T @ np.linalg.inv(C @ np.kron(np.linalg.inv(Z@Z.T), sigma_u) @ C.T) @ C@vec_B
    lambda_f = lambda_w/len(relevant_parts)
    
    # degrees of freedom
    df_chi2 = len(relevant_parts)
    df_fn = len(relevant_parts)
    df_fd = T*K-((K**2)*p)-K
    
    # p_values
    p_val_chi2 = round(abs(1-st.chi2.cdf(lambda_w, df_chi2)), 4)
    p_val_f = round(abs(1-st.f.cdf(lambda_f, df_fn, df_fd)), 4)
    
    # ftest degrees freedom
    df_fd = (len(relevant_parts),T*K-((K**2)*p)-K)

    return lambda_w, p_val_chi2, df_fn, lambda_f, p_val_f, df_fd

In [None]:
# calculate Granger Causality
ts_w, p_w, df_w, ts_f, p_f, df_f = granger(y_t, 2, [0, 0, 1, 1], 1)
print(f'ts_w: {ts_w}, \np_w: {p_w}, \ndf_w: {df_w}, \nts_f: {ts_f}, \np_f: {p_f}, \ndf_f: {df_f}')

### Check with Built-In Functions

In [None]:
# check result with statsmodels VAR module
model = VAR(awm[["d_lgdp", "d_invest", "d_R", "d_r"]])
results = model.fit(2)

In [None]:
# Wald test
granger_stat_wald = results.test_causality(["d_lgdp", "d_invest"],['d_R', "d_r"], kind='wald')
granger_stat_wald.summary()

In [None]:
# F-test
granger_stat_f = results.test_causality(["d_lgdp", "d_invest"],['d_R', "d_r"], kind='f')
granger_stat_f.summary()