# Load packages

In [None]:
import pandas as pd
import numpy as np
import os
import matplotlib as mpl
import yfinance as yf
import seaborn as sns
import warnings

from matplotlib import pyplot as plt
from statsmodels.tsa.stattools import grangercausalitytests
from statsmodels.tsa.api import VAR

# Configuration

In [None]:
# Korean font
from matplotlib import font_manager, rc
try:
    # Best effort: load the font file at this Windows path and make its family
    # the default matplotlib font.
    font_path = "C:/Windows/Fonts/malgun.TTF"
    Kfont = font_manager.FontProperties(fname=font_path).get_name()
    rc('font', family=Kfont)
except Exception:
    # The font file is unavailable or unreadable (e.g. on a non-Windows
    # machine): keep matplotlib's default font. `Exception` rather than a bare
    # `except` so that KeyboardInterrupt / SystemExit are not swallowed.
    pass

# Fix minus presentation: draw negative numbers with an ASCII hyphen instead of
# the Unicode minus sign (presumably because the font selected above may not
# provide that glyph).
mpl.rcParams['axes.unicode_minus'] = False

In [None]:
# Fix random seed
def fix_random_seed(seed=42):
    """Seed the global `random` and NumPy generators and export PYTHONHASHSEED.

    `seed` is passed unchanged to `random.seed` and `np.random.seed`, and its
    string form is written to the PYTHONHASHSEED environment variable.
    NOTE(review): the variable is written while the interpreter is already
    running; confirm whether it is meant to affect this process or only
    child processes.
    """
    import os
    import random

    import numpy as np

    os.environ['PYTHONHASHSEED'] = str(seed)
    for seed_generator in (random.seed, np.random.seed):
        seed_generator(seed)


# Apply the default seed as soon as this cell runs.
fix_random_seed()

# Load preprocessed data

In [None]:
# One preprocessed CSV is read per entry of yr_list; each frame ends up indexed
# by its date column converted to datetime64[ns].
yr_list = [19, 20, 21, 22]
df_list = []

for yr in yr_list:
    df = (
        pd.read_csv(f'data/preprocessed/kau{str(yr)}.csv')
        .rename(columns={'날짜': 'date'})
        .set_index('date')
    )
    df.index = df.index.astype('datetime64[ns]')
    df_list.append(df)

# Unpack in the same order as yr_list.
df_19, df_20, df_21, df_22 = df_list

In [None]:
# Divide kau19 and kau20 by year
def _rows_between(frame, after, before, renames):
    """Return the rows of `frame` whose index is strictly after `after` and
    strictly before `before`, with columns renamed according to `renames`."""
    lower = pd.Timestamp(after)
    upper = pd.Timestamp(before)
    inside = (frame.index > lower) & (frame.index < upper)
    return frame[inside].rename(columns=renames)


# kau19 rows: the 2019 window and the 2020 window
df_19_pre = _rows_between(df_19, '2018-12-31', '2020-01-01', {'kau19':'kau19_19'})
df_19_post = _rows_between(df_19, '2019-12-31', '2021-01-01', {'kau19':'kau19_20'})
# kau20 rows: the 2020 window and the 2021 window
df_20_pre = _rows_between(df_20, '2019-12-31', '2021-01-01', {'kau20':'kau20_20'})
df_20_post = _rows_between(df_20, '2020-12-31', '2022-01-01', {'kau20':'kau20_21'})

In [None]:
# Quick visual check of the kau19 rows from the 2020 window (kau19 column
# renamed to kau19_20). Bare expression: presumably shown as the cell output
# when run in a notebook.
df_19_post

In [None]:
# Quick visual check of the complete kau20 frame (not split by year).
# Bare expression: presumably shown as the cell output when run in a notebook.
df_20

# Analysis with VAR and Granger Causality model

In [None]:
def var_analysis(df:pd.DataFrame, var_max_lag:int=12, print_summary=False, visualize=False):
    """Fit a VAR model on `df` and flag coefficients whose p-value is below 0.05.

    The lag order is the one selected by AIC among orders up to
    `var_max_lag`. If order selection raises, `var_max_lag` is lowered by one
    and selection is retried. If AIC selects order 0, the (possibly lowered)
    `var_max_lag` is used as the order instead.

    Parameters
    ----------
    df : pd.DataFrame
        Data handed to `VAR`.
    var_max_lag : int, default 12
        Largest lag order offered to `select_order`.
    print_summary : bool, default False
        Print the summary of the fitted model.
    visualize : bool, default False
        Draw a heatmap of the boolean significance matrix.

    Returns
    -------
    tuple
        `(rslt_var, var_pval_tf_mat)`: the fitted VAR results and a boolean
        matrix with the layout of `rslt_var.pvalues`, True where the p-value
        is below 0.05.

    Raises
    ------
    ValueError
        If order selection fails for every maximum lag from `var_max_lag`
        down to 0.
    """
    mdl_var = VAR(df)

    # Retry with a smaller maximum lag whenever order selection fails, but stop
    # at 0 so that a persistent failure is reported instead of looping forever.
    best_lag_analysis = None
    last_error = None
    while var_max_lag >= 0:
        try:
            best_lag_analysis = mdl_var.select_order(var_max_lag)
            break
        except Exception as err:
            last_error = err
            var_max_lag -= 1
    if best_lag_analysis is None:
        raise ValueError(
            'VAR lag order selection failed for every candidate maximum lag'
        ) from last_error

    # Fall back to the maximum lag when AIC selects order 0.
    aic_lag = best_lag_analysis.selected_orders['aic']
    best_lag = var_max_lag if aic_lag == 0 else aic_lag

    rslt_var = mdl_var.fit(best_lag)
    if print_summary:
        print(rslt_var.summary())

    # Element-wise significance flag (NaN p-values compare as False).
    var_pval_tf_mat = rslt_var.pvalues < 0.05

    # plot - VAR model coefficients p-value
    if visualize:
        fig_var_pval, ax_var_pval = plt.subplots(1, 1, figsize=(max(4*best_lag, 8), max(6*best_lag, 12)))

        heat_pval = sns.heatmap(
            var_pval_tf_mat,
            square=True,
            ax=ax_var_pval,
            annot=True,
            # Shrink the annotations as the lag grows, but keep the size positive.
            annot_kws={'fontsize': max(15-best_lag, 1)},
            cbar=False,
            linecolor='grey',
            linewidth=0.1,
            )
        heat_pval.set_xticklabels(heat_pval.get_xticklabels(), fontsize=12)
        heat_pval.set_yticklabels(heat_pval.get_yticklabels(), fontsize=12)
        ax_var_pval.set_title(
            'Statistical significances of VAR coefficients by p-values',
            fontsize=15, weight='bold')

        fig_var_pval.tight_layout()

    return rslt_var, var_pval_tf_mat

# Smoke run on the complete kau20 frame with the default settings.
rslt, tf_mat = var_analysis(df_20)

In [None]:
# Analyze causality between two time series variables with Granger Causality Test
def _parse_lag_label(label):
    """Split a coefficient label of the form 'L<lag>.<name>' into `(lag, name)`.

    Returns None when the label does not have that form (for example a label
    without a '.' such as 'const'). Only the first '.' separates the lag from
    the name, so a name that itself contains '.' is kept intact.
    """
    head, sep, name = str(label).partition('.')
    if not sep or not head.startswith('L') or not head[1:].isdecimal():
        return None
    return int(head[1:]), name


def granger_analysis(df: pd.DataFrame, additional_component: list=None, print_analysis_result=False):
    """Run Granger causality tests for the lags flagged significant by the VAR fit.

    The significance matrix from `var_analysis(df)` is read for its first
    column only: every row label 'L<lag>.<other>' that is True there, with
    `<other>` different from that first column, yields the triple
    `(first column, other, lag)`. For each triple both directions are tested
    with `grangercausalitytests` at exactly that lag.

    Parameters
    ----------
    df : pd.DataFrame
        Data passed to `var_analysis` and to `grangercausalitytests`.
    additional_component : list, optional
        Described as a `(feature x, feature y, time_lag)` shape; accepted for
        interface compatibility but not used by this implementation.
    print_analysis_result : bool, default False
        Print a header per test and run the tests with `verbose=1`.

    Returns
    -------
    pd.DataFrame
        Rows are named 'cause', columns 'effect'. Every cell holds an
        object-dtype Series indexed by lag (1 .. largest lag found in the
        significance matrix); the entry for a tested lag is the value returned
        by `grangercausalitytests`, untested lags stay missing.
    """
    # Get p-value significance matrix
    tf_mat = var_analysis(df, print_summary=False)[1]
    main_col = tf_mat.columns[0]

    # Largest lag present among the coefficient rows; rows that are not lag
    # terms are ignored, so a matrix without any lag rows gives 0.
    max_lag = max(
        (parsed[0] for parsed in map(_parse_lag_label, tf_mat.index) if parsed is not None),
        default=0,
    )

    selc_granger_test = []
    significant_labels = tf_mat.loc[tf_mat[main_col].astype(bool)].index
    for sig_col_info in significant_labels:
        parsed = _parse_lag_label(sig_col_info)
        if parsed is None:
            continue  # Not a lagged regressor, nothing to test
        lag, col = parsed
        if col == main_col:
            continue  # Ignore autocorrelation
        selc_granger_test.append((main_col, col, lag))

    verbose = 1 if print_analysis_result else 0

    # Granger Causality Test
    # De-duplicate while keeping first-seen order so the table layout is
    # reproducible from run to run.
    col_nms = list(dict.fromkeys(
        name for factor_x, factor_y, _ in selc_granger_test for name in (factor_x, factor_y)
    ))

    # One independent, initially empty per-lag Series in every (cause, effect) cell.
    lag_index = np.arange(1, max_lag + 1)
    cells = np.empty((len(col_nms), len(col_nms)), dtype=object)
    for position in np.ndindex(cells.shape):
        cells[position] = pd.Series(index=lag_index, dtype='object')
    df_gct = pd.DataFrame(cells, index=col_nms, columns=col_nms)
    df_gct.index.name = 'cause'
    df_gct.columns.name = 'effect'

    for factor_x, factor_y, time_lag in selc_granger_test:
        # First factor x -> factor y, then factor y -> factor x
        for cause, effect in ((factor_x, factor_y), (factor_y, factor_x)):
            if print_analysis_result:
                print(f'\n[{cause}][t-{time_lag}] -> [{effect}][t]', end='')
            # The test takes the effect column first and the candidate cause second.
            outcome = grangercausalitytests(
                df[[effect, cause]],
                maxlag=[time_lag],
                verbose=verbose,
            )
            # The cell object is the Series stored in the table, so writing to
            # it updates the table in place.
            lag_results = df_gct.loc[cause, effect]
            lag_results[time_lag] = outcome

    return df_gct

# Run the Granger analysis on the complete kau20 frame with printing enabled.
# Bare expression: the returned table is presumably shown as the cell output
# when run in a notebook.
granger_analysis(df_20, print_analysis_result=True)

In [None]:
def var_n_granger(df, print_var_summary=False, viz_var_pval=False, print_granger_result=False):
    """Run `var_analysis` and then `granger_analysis` on `df`.

    `print_var_summary` and `viz_var_pval` are forwarded to `var_analysis`
    (as `print_summary` and `visualize`); `print_granger_result` is forwarded
    to `granger_analysis` (as `print_analysis_result`).

    Returns the tuple `(rslt_var, tf_mat, df_gct)`: the two values returned by
    `var_analysis` followed by the table returned by `granger_analysis`.
    """
    fitted_var, significance = var_analysis(
        df,
        print_summary=print_var_summary,
        visualize=viz_var_pval,
    )
    causality_table = granger_analysis(df, print_analysis_result=print_granger_result)
    return fitted_var, significance, causality_table


# Run the combined analysis on each per-year slice, drawing the p-value heatmap
# and printing the Granger test output for every slice.
for df in (df_19_pre, df_19_post, df_20_pre, df_20_post):
    var_n_granger(df, viz_var_pval=True, print_granger_result=True)
