In [None]:
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
from scipy import stats
from tqdm import tqdm

import statsmodels.api as sm

from statsmodels.tsa.api import SVAR, VAR
from statsmodels.tsa.stattools import adfuller, grangercausalitytests

import sys
import os
from dotenv import load_dotenv

# Load variables from a .env file into the process environment before reading REPO_PATH.
load_dotenv()
# os.getenv returns None when REPO_PATH is unset; the f-string below would then produce "Nonesrc_HF".
# REPO_PATH is concatenated directly with 'src_HF', so it must end with a path separator.
REPO_PATH = os.getenv("REPO_PATH")

# Import main utility functions
# The src_HF directory is put first on sys.path so that `utils` is looked up there before anywhere else.
sys.path.insert(0, rf'{REPO_PATH}src_HF')
# NOTE(review): the wildcard import hides which names come from utils; prefer explicit imports.
from utils import *

### Load data

In [None]:
INSTRUMENT = 'LCOc1'
TOPIC = 'CWP'

# Build the input paths with os.path.join so they resolve on every OS and regardless of
# whether REPO_PATH ends with a path separator.
price_path = os.path.join(REPO_PATH, 'data', 'time_series', f'{INSTRUMENT}_5min_processed.csv')
news_path = os.path.join(REPO_PATH, 'data', 'sentiment_data', f'{TOPIC}_ARTICLE_SENTIMENT.csv')

# Both CSVs carry their timestamps in the first column; use it as a DatetimeIndex.
price_df = pd.read_csv(price_path, index_col=0)
price_df.index = pd.to_datetime(price_df.index)

news_df = pd.read_csv(news_path, index_col=0)
news_df.index = pd.to_datetime(news_df.index)

# Rescale the 5-minute series: REALIZED_VOL by 12 * 24 * 252 and LOGRET by 12 * 24
# (12 five-minute bars per hour, 24 hours, 252 days).
# NOTE(review): presumably this annualises the realised variance and expresses returns
# per day - confirm the intended units.
price_df['REALIZED_VOL'] = price_df['REALIZED_VOL'] * 12 * 252 * 24
price_df['LOGRET'] = price_df['LOGRET'] * 12 * 24

RESAMPLE_WINDOW: str = '5min'

SENTIMENT_COLUMNS = ['TextBlob_headline', 'VADER_headline', 'TextBlob_fullStory', 'VADER_fullStory']
ABS_SENTIMENT_COLUMNS = [col + '_abs' for col in SENTIMENT_COLUMNS]

# Mean article sentiment per window; windows without any article get a sentiment of 0.
df_sent_5min = pd.DataFrame(
    {col: news_df[col].resample(RESAMPLE_WINDOW).mean() for col in SENTIMENT_COLUMNS}
).fillna(0)

# '<col>_abs' is the absolute value of the window mean (not the mean of absolute article scores).
# It is derived from the frame above instead of resampling every column a second time.
df_sent_abs_5min = df_sent_5min.abs().add_suffix('_abs')
assert list(df_sent_abs_5min.columns) == ABS_SENTIMENT_COLUMNS

df_sent_5min = pd.concat([df_sent_5min, df_sent_abs_5min], axis=1)

display(price_df.head(2))

### Combine data

In [None]:
# Left-join the price columns onto the sentiment index, then drop every row with a missing value.
df_combined = df_sent_5min.join(price_df).dropna()

display(df_combined.head(2))

# 12 five-minute bars per hour * 24 hours = one day of observations per rolling window.
ROLLING_WINDOW = 12 * 24

fig, ax = plt.subplots(1, 1, figsize=(15, 5))
ax2 = ax.twinx()

ax.plot(df_combined['VADER_headline'].rolling(ROLLING_WINDOW).mean(), label='VADER_headline')
# The right-hand axis shows realised volatility; it was previously mislabelled 'VADER_fullStory'.
ax2.plot(df_combined['REALIZED_VOL'].rolling(ROLLING_WINDOW).mean(), label='REALIZED_VOL', color='red')

ax.set_xlabel('Time')
ax.set_ylabel('VADER_headline (rolling mean)')
ax2.set_ylabel('REALIZED_VOL (rolling mean)')
ax.set_title('Rolling mean of headline sentiment and realised volatility')

# The two series live on different axes, so collect their handles into a single legend.
handles = ax.get_lines() + ax2.get_lines()
ax.legend(handles, [handle.get_label() for handle in handles], loc='upper left', frameon=False);

### Stationarity of time series

In [None]:
# Augmented Dickey-Fuller test for every sentiment series and the market series.
# The raw adfuller output of each column is kept in `results`, in loop order.
results = []

for col in SENTIMENT_COLUMNS + ['CLOSE', 'VOLUME', 'COUNT', 'REALIZED_VOL']:
    print(col)
    result = adfuller(df_combined[col])
    results.append(result)

    # Positions read from the adfuller result: 0 = test statistic, 1 = p-value, 4 = critical values.
    adf_stat, p_value, critical_values = result[0], result[1], result[4]

    # show p-value
    print(f'ADF Statistic: {adf_stat:f}')
    print(f'p-value: {p_value:f}')
    # The header used to be '%'-formatted with the dict although it has no placeholder;
    # the operand was silently ignored, so it is printed as a plain string now.
    print('Critical Values:')
    for key, value in critical_values.items():
        print(f'\t{key}: {value:.3f}')


### VAR optimal lag order

In [None]:
# find optimal lag order
RESPONSE: list[str] = ['VOLUME', 'REALIZED_VOL']
SENTIMENT_COLUMNS: list[str] = ['TextBlob_headline', 'VADER_headline', 'TextBlob_fullStory', 'VADER_fullStory']

model = VAR(df_combined[[*RESPONSE, SENTIMENT_COLUMNS[2]]])
lag_order = model.select_order(50, trend='c')

print(lag_order.summary())

# plot AIC vs lag order

aic_list = lag_order.ics['bic']
lag_list = range(len(aic_list))


fig, ax = plt.subplots(1, 1, figsize=(15, 5))
ax.plot(lag_list, aic_list)

### Granger causality

In [None]:
# Keep only the series entering the pairwise Granger tests: sentiment first, then market variables.
df = df_combined[SENTIMENT_COLUMNS + RESPONSE]

def grangers_causation_matrix(data, variables, test='ssr_chi2test', verbose=False, maxlag=12):
    """Build a matrix of minimum Granger-causality p-values for every pair of variables.

    For each ordered pair, `grangercausalitytests` is run on the two-column frame
    `data[[row_variable, column_variable]]` for lags 1..maxlag. The p-value of the
    chosen `test` is rounded to 4 decimals per lag, and the smallest one is stored.

    Parameters
    ----------
    data : pandas.DataFrame
        Frame containing one column per entry of `variables`.
    variables : sequence of str
        Column names to test against each other (the diagonal is tested as well).
    test : str
        Key of the statistic to read from each lag's result dictionary.
    verbose : bool
        When True, print the per-lag p-values of every pair.
    maxlag : int
        Highest lag passed to `grangercausalitytests`.

    Returns
    -------
    pandas.DataFrame
        Square frame of minimum p-values. Rows are suffixed with '_y' (the series
        passed first) and columns with '_x' (the series passed second).
    """
    n_vars = len(variables)
    p_matrix = pd.DataFrame(np.zeros((n_vars, n_vars)), columns=variables, index=variables)

    for x_name in p_matrix.columns:
        for y_name in p_matrix.index:
            outcome = grangercausalitytests(data[[y_name, x_name]], maxlag=maxlag, verbose=False)

            # The result is keyed by lag (starting at 1); element 0 of each entry maps
            # test names to tuples whose second item is the p-value.
            lag_p_values = []
            for lag in range(1, maxlag + 1):
                lag_p_values.append(round(outcome[lag][0][test][1], 4))

            if verbose:
                print(f'Y = {y_name}, X = {x_name}, P Values = {lag_p_values}')

            p_matrix.loc[y_name, x_name] = np.min(lag_p_values)

    p_matrix.columns = [name + '_x' for name in variables]
    p_matrix.index = [name + '_y' for name in variables]
    return p_matrix

# Minimum p-value per (row '_y', column '_x') pair over the function's default lag range.
grangers_causation_matrix(data=df, variables=df.columns)


### VAR

In [None]:
# Market variables used as responses and the sentiment series used as impulses in the IRF plots.
# NOTE(review): both lists repeat the values assigned in the lag-order cell; keep the two in sync.
RESPONSE: list[str] = ['VOLUME', 'REALIZED_VOL']
SENTIMENT_COLUMNS: list[str] = ['TextBlob_headline', 'VADER_headline', 'TextBlob_fullStory', 'VADER_fullStory']

def plot_response(
    df: pd.DataFrame,
    response: list[str],
    sentiment: list[str],
    response_names: list[str],
    sentiment_names: list[str],
    lags: int = 5,
    periods: int = 12,
    minutes_per_step: int = 5,
) -> plt.Figure:
    """Plot impulse responses of the market variables to positive and negative sentiment.

    For every sentiment column two VAR models are fitted on `[sentiment column, *response]`:
    one with the sentiment clipped to its positive part, one with the absolute value of its
    negative part. The non-orthogonalised impulse responses to the sentiment column are
    copied into a grid with one row per response variable and one column per sentiment
    column.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame containing every column named in `response` and `sentiment`.
    response : list of str
        Columns whose responses are plotted (one grid row each).
    sentiment : list of str
        Sentiment columns used as impulses (one grid column each).
    response_names : list of str
        Display names, aligned with `response`.
    sentiment_names : list of str
        Display names, aligned with `sentiment`.
    lags : int
        Lag order of each fitted VAR.
    periods : int
        Number of steps of the impulse response.
    minutes_per_step : int
        Length of one step in minutes; only used to label the x-axis.

    Returns
    -------
    matplotlib.figure.Figure
        The combined grid figure.
    """
    colors_pos = ['tab:blue', 'gray', 'gray']
    colors_neg = ['tab:red', 'gray', 'gray']
    linestyles = ['-', '--', '--']

    response = list(response)
    sentiment = list(sentiment)

    # squeeze=False keeps `axs` two-dimensional even with a single response or sentiment column.
    figure, axs = plt.subplots(
        len(response),
        len(sentiment),
        figsize=(4 * len(sentiment), 4 * len(response)),
        squeeze=False,
    )

    # Split the sentiment columns that were passed in (not a notebook-level global) into their
    # positive part and the magnitude of their negative part.
    df_pos = df.copy()
    df_pos[sentiment] = df_pos[sentiment].clip(lower=0)
    df_neg = df.copy()
    df_neg[sentiment] = df_neg[sentiment].clip(upper=0).abs()

    tick_positions = np.arange(0, periods + 1, 2)

    for j, sent_col in enumerate(sentiment):
        model_columns = [sent_col, *response]
        results_pos = VAR(df_pos[model_columns]).fit(lags)
        results_neg = VAR(df_neg[model_columns]).fit(lags)

        # impulse response
        fig_pos = results_pos.irf(periods).plot(orth=False, impulse=sent_col)
        fig_neg = results_neg.irf(periods).plot(orth=False, impulse=sent_col)

        # The first axis is skipped because the sentiment column is the first model variable,
        # i.e. it is presumably the sentiment -> sentiment panel (verify against statsmodels).
        for i, (ax_pos, ax_neg) in enumerate(zip(fig_pos.axes[1:], fig_neg.axes[1:])):
            target = axs[i, j]

            # The first three lines of each panel are assumed to be the IRF estimate followed
            # by its two error-band lines - TODO confirm for the statsmodels version in use.
            for k in range(3):
                target.plot(
                    ax_pos.lines[k].get_xdata(), ax_pos.lines[k].get_ydata(),
                    linestyle=linestyles[k],
                    color=colors_pos[k],
                    label='Positive sentiment' if k == 0 else None
                )
                target.plot(
                    ax_neg.lines[k].get_xdata(), ax_neg.lines[k].get_ydata(),
                    linestyle=linestyles[k],
                    color=colors_neg[k],
                    label='Negative sentiment' if k == 0 else None
                )
            target.set_title(fr'{sentiment_names[j]} $\rightarrow$ {response_names[i]}', fontsize=15)
            target.axhline(0, color='black', lw=0.5)
            target.set_xlabel('Minutes', fontsize=14)
            target.set_ylabel(response_names[i] + ' response', fontsize=14)
            target.legend(fontsize=11, frameon=False, loc='upper right')

            # Steps are labelled in minutes: step index * minutes_per_step.
            target.set_xticks(tick_positions)
            target.set_xticklabels(tick_positions * minutes_per_step)

        # The statsmodels figures were only a source of line data; close them so they are
        # neither rendered below the cell nor kept alive in memory.
        plt.close(fig_pos)
        plt.close(fig_neg)

    figure.tight_layout(pad=1.0)

    return figure


# Draw the IRF grid for all sentiment measures; raw column names double as panel titles.
fig = plot_response(
    df=df_combined,
    response=RESPONSE,
    sentiment=SENTIMENT_COLUMNS,
    response_names=['Volume', 'RV'],
    sentiment_names=SENTIMENT_COLUMNS,
)

# The file name is relative, so the image is written to the current working directory.
fig.savefig('VAR_IRF.png', dpi=200)

### Impulse response function

In [None]:
# FIXME(review): on a fresh Restart & Run All, `results` is the list of adfuller outputs built in
# the stationarity cell; a list has no `impulse_responses` method, so this line raises AttributeError.
# NOTE(review): presumably this relied on a fitted model object (with two endogenous variables,
# given impulse=[1, 0]) from a cell that is no longer in the notebook - confirm, then fit that
# model in this cell under a distinct name.
results.impulse_responses(10, orthogonalized=True, impulse=[1, 0]).plot(figsize=(13,3))