In [None]:
import os
import pandas as pd
import numpy as np
import time
from datetime import datetime

os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2' #tensorflow warning hide

import matplotlib.pyplot as plt
from pathlib import Path
import tensorflow as tf
import tensorflow.keras.backend as K
import tensorflow.keras.layers as L
import tensorflow.keras.models as M

###################GPU CONFIG###################
#gpus = tf.config.experimental.list_physical_devices(device_type='GPU')
#tf.config.experimental.set_visible_devices(devices=gpus[0], device_type='GPU')
#tf.config.experimental.set_memory_growth(device=gpus[0], enable=True)
###################GPU CONFIG###################

from tensorflow.python.client import device_lib
from sklearn.preprocessing import StandardScaler
from sklearn.preprocessing import MinMaxScaler
from tensorflow.keras.layers import Activation
from tensorflow.keras.activations import elu
from sklearn.model_selection import train_test_split
from keras import backend as K
from keras.callbacks import EarlyStopping
import os
from scipy.stats import norm
from scipy.stats import chi2
import arch
from arch import arch_model
from sklearn.metrics import mean_squared_error, mean_absolute_error
import ipynbname
import warnings
from pathlib import Path
import vartests
import csv
from keras.models import load_model
from tensorflow.keras import regularizers
from scipy import optimize
from typing import List, Union, Dict

##########R Packages##########
from rpy2.robjects import pandas2ri, r
from rpy2.robjects.packages import importr
from rpy2.robjects.vectors import IntVector
import rpy2.robjects as robjects
import rpy2.robjects.packages as rpackages
from rpy2.robjects import ListVector
from rpy2.robjects.vectors import FloatVector
from rpy2.robjects import numpy2ri, r
import rpy2.robjects as ro
import rpy2.robjects.numpy2ri
##########R Packages##########


# Ignore all warnings
warnings.filterwarnings('ignore')

# If you specifically want to ignore warnings from pandas, you can do so as follows
warnings.filterwarnings('ignore', category=UserWarning, module='pandas')

# Set options to display all columns
pd.set_option('display.max_columns', None)

# Optionally, set the display width to ensure that pandas does not wrap text
pd.set_option('display.width', None)

# Set the column display length to maximum
pd.set_option('display.max_colwidth', None)

os.environ['CUDA_VISIBLE_DEVICES'] = '-1'

In [None]:
###################GPU CONFIG###################
'''
print("Num GPUs Available: ", len(tf.config.list_physical_devices('GPU')))
gpu_number = 0 #### GPU number 
gpus = tf.config.list_physical_devices('GPU')
if gpus:
    tf.config.experimental.set_visible_devices(gpus[gpu_number], 'GPU') 
    logical_gpus = tf.config.experimental.list_logical_devices('GPU')
    print(len(gpus), "Physical GPUs,", len(logical_gpus), "Logical GPU")
'''
###################GPU CONFIG###################

In [None]:
##########PARAMS##########
garch_train_size=21*3 # in days
train_size=0.7
scaleing=True
look_back=21
confidence_level = 0.975
patience=30
validation_split=0.3
num_iterations=3
absolute=True #predict volatility instead of variance
learning_rates=[0.001]
batch_size=32
epochs=200

cwd = Path.cwd()
news_lib='News_Daily_5Y'
twitter_lib='Twitter_Daily_5Y'
path_news=cwd/news_lib
path_twitter=cwd/twitter_lib

sp500_data = pd.read_excel("S&P500.xlsx")
sp500 = sp500_data["Ticker"]
sp500 = sp500.str.replace("/", "_")
##########PARAMS##########

In [None]:
def realized_volatility_daily(series_log_return):
    n = len(series_log_return)
    return np.sqrt(np.sum(series_log_return**2)/(n - 1))

In [None]:
def garch_data_generate(ticker,path_news,path_twitter):
    def data_merge(ticker,path_news,path_twitter):
        #News
        files = os.listdir(path_news)
        # Filter files that start with the specified prefix
        matching_files = [file for file in files if file.startswith(ticker)]
        #print(matching_files)
        if len(matching_files) > 1:
            print("There are more than", threshold, "files starting with the prefix", prefix, "in the directory.")
            return None
        news_df = pd.read_csv(os.path.join(path_news, matching_files[0]), encoding='utf-8', index_col='Date', parse_dates=True)

        #Twitter
        files = os.listdir(path_twitter)
        # Filter files that start with the specified prefix
        matching_files = [file for file in files if file.startswith(ticker)]

        if len(matching_files) > 1:
            print("There are more than", threshold, "files starting with the prefix", prefix, "in the directory.")
            return None
        twitter_df = pd.read_csv(os.path.join(path_twitter, matching_files[0]), encoding='utf-8', index_col='Date', parse_dates=True)

        df = pd.merge(news_df,twitter_df, on="Date",how='inner')
        return df
    def data_transformer(data):
        data = data.sort_values(by='Date')
        data["Date"]=data.index
        data = data.reset_index(drop=True)
        
        columns_to_drop = ["(R1) Open_y","(R1) High_y","(R1) Low_y","(R1) Close_y"] 
        data = data.drop(columns=columns_to_drop)
        columns_to_rename = {"(R1) Open_x": "(R1) Open","(R1) High_x": "(R1) High","(R1) Low_x":"(R1) Low","(R1) Close_x":"(R1) Close"}  # Dictionary mapping old column names to new names
        data = data.rename(columns=columns_to_rename)
        
        
        new_columns = [col.replace(" - Realtime", "") for col in data.columns]

        data.columns = new_columns
        
        for column in data.select_dtypes(include=['object']).columns:
        # Replace "," with "." in the entire column and convert the column to float
            if column != "Date":
                data[column] = data[column].str.replace(",", ".").astype(float)

        #Make the modifications for the analysis
        
        data = data.dropna(subset=["(R1) Close", "News Positive Sentiment Count", "News Negative Sentiment Count","News Publication Count (L1)","Twitter Positive Sentiment Count", "Twitter Negative Sentiment Count","Twitter Publication Count (L1)"])

        data['Log Returns'] = np.log(data['(R1) Close'] / data['(R1) Close'].shift(1))

        data["News Positive Sentiment"] = data["News Positive Sentiment Count"]   / data["News Publication Count (L1)"]
        data["News Negative Sentiment"] = -data["News Negative Sentiment Count"]   / data["News Publication Count (L1)"]
        data["Twitter Positive Sentiment"] = data["Twitter Positive Sentiment Count"]   / data["Twitter Publication Count (L1)"]
        data["Twitter Negative Sentiment"] = -data["Twitter Negative Sentiment Count"]   / data["Twitter Publication Count (L1)"]
        data = data.dropna(subset=["Log Returns"])
        data = data.replace(np.inf, 0)
        data = data.replace(np.nan, 0)
        data=data[["Date","Log Returns","News Positive Sentiment","News Negative Sentiment","Twitter Positive Sentiment","Twitter Negative Sentiment"]]
        data['Date'] = pd.to_datetime(data['Date'])  # Convert the date column to datetime format
        data.set_index('Date', inplace=True)
        return data
    df=data_merge(ticker,path_news,path_twitter)
    data=data_transformer(df)
    return data
    
def generate_garch_forecast(data, window_size, forecast_horizon=1):
    refit_every=1
    model_fit = None
    forecasts = []
    dates = []

    p = 1
    q = 1


    for i in range(window_size, len(data)):
        window_data = data.iloc[:i]
        
        # Refit the model every 'refit_every' runs
        if i % refit_every == 0 or model_fit is None:
            j=0
            model = arch_model(window_data, vol='GARCH', mean='Constant', p=p, q=q)
            model_fit = model.fit(disp='off')
            forecast = model_fit.forecast(horizon=refit_every)
            forecasts.append(forecast.variance.values[-1][-1])
            dates.append(data.index[i])

        forecasts.append(forecast.variance.values[-1][0])
        j+=1
        dates.append(data.index[i])
        
    df = pd.DataFrame({'GARCH_Forecast': forecasts})    
    dates_list = [pd.to_datetime(date_str) for date_str in dates]
    df.set_index(pd.Index(dates_list), inplace=True)
    return df

In [None]:
def generate_garch_x_forecast(data, window_size):

    file_name='R_data.csv'
    garch_data.to_csv(file_name, index=True)
    file=str(cwd/file_name)

    # Load the R script
    with open("R_GARCH_X.R", "r") as f:
        r_code = f.read()
    
    # Evaluate the R code
    robjects.r(r_code)
    
    # Call the R function with parameters
    robjects.r['garch_x'](file, window_size)

    df = pd.read_csv('Python_Input.csv', index_col=0)
    df=df[["Sigma"]]**2
    df = df.rename(columns={'Sigma': 'GARCH_X_Forecast'})
    return df


In [None]:
def data_generator(path_news,path_twitter,ticker,look_back,garch_train_size,train_size,scaleing,absolute,simple,garch_prediction):
    
    def data_merge(ticker,path_news,path_twitter):
        #News
        files = os.listdir(path_news)
        # Filter files that start with the specified prefix
        matching_files = [file for file in files if file.startswith(ticker)]
        if len(matching_files) > 1:
            print("There are more than", threshold, "files starting with the prefix", prefix, "in the directory.")
            return None
        news_df = pd.read_csv(os.path.join(path_news, matching_files[0]), encoding='utf-8', index_col='Date', parse_dates=True)

        #Twitter
        files = os.listdir(path_twitter)
        # Filter files that start with the specified prefix
        matching_files = [file for file in files if file.startswith(ticker)]

        if len(matching_files) > 1:
            print("There are more than", threshold, "files starting with the prefix", prefix, "in the directory.")
            return None
        twitter_df = pd.read_csv(os.path.join(path_twitter, matching_files[0]), encoding='utf-8', index_col='Date', parse_dates=True)

        df = pd.merge(news_df,twitter_df, on="Date",how='inner')
        return df
        
    def data_transformer(data,absolute):
        data = data.sort_values(by='Date')
        data["Date"]=data.index
        data = data.reset_index(drop=True)
        
        columns_to_drop = ["(R1) Open_y","(R1) High_y","(R1) Low_y","(R1) Close_y"] 
        data = data.drop(columns=columns_to_drop)
        columns_to_rename = {"(R1) Open_x": "(R1) Open","(R1) High_x": "(R1) High","(R1) Low_x":"(R1) Low","(R1) Close_x":"(R1) Close"}  # Dictionary mapping old column names to new names
        data = data.rename(columns=columns_to_rename)
        
        
        new_columns = [col.replace(" - Realtime", "") for col in data.columns]

        data.columns = new_columns
        
        for column in data.select_dtypes(include=['object']).columns:
        # Replace "," with "." in the entire column and convert the column to float
            if column != "Date":
                data[column] = data[column].str.replace(",", ".").astype(float)

        #Make the modifications for the analysis
        data = data.dropna(subset=["(R1) Close", "News Positive Sentiment Count", "News Negative Sentiment Count","News Publication Count (L1)","Twitter Positive Sentiment Count", "Twitter Negative Sentiment Count","Twitter Publication Count (L1)"])

        data['Log Returns'] = np.log(data['(R1) Close'] / data['(R1) Close'].shift(1))
        if absolute==True:
            data['Log Returns Variance']  = data['Log Returns'].rolling(window=10).apply(realized_volatility_daily)
        else:
            data['Log Returns Variance']  = data['Log Returns'].rolling(window=10).apply(realized_variance_daily)

        data["News Positive Sentiment"] = data["News Positive Sentiment Count"]   / data["News Publication Count (L1)"]
        data["News Negative Sentiment"] = -data["News Negative Sentiment Count"]   / data["News Publication Count (L1)"]
        data["Twitter Positive Sentiment"] = data["Twitter Positive Sentiment Count"]   / data["Twitter Publication Count (L1)"]
        data["Twitter Negative Sentiment"] = -data["Twitter Negative Sentiment Count"]   / data["Twitter Publication Count (L1)"]
        data = data.replace(np.inf, 0)
        data = data.dropna(subset=["News Positive Sentiment","News Negative Sentiment","Twitter Positive Sentiment","Twitter Negative Sentiment", "Log Returns Variance"])
        return data
        
    def data_prepare(data,time_steps,train_size,scaleing):
        columns=["Log Returns Variance","GARCH_Forecast","News Positive Sentiment","News Negative Sentiment","Twitter Positive Sentiment",
                    "Twitter Negative Sentiment"]
        data=data[columns]
        cut=round(train_size*data.shape[0])
        
        if scaleing==True:

            columns=["Log Returns Variance"]
            data_y=data[columns]
            scaler = StandardScaler()
            data_y= scaler.fit_transform(data_y.values.reshape(-1, 1))

            columns=["Log Returns Variance","GARCH_Forecast","News Positive Sentiment","News Negative Sentiment","Twitter Positive Sentiment",
                     "Twitter Negative Sentiment"]
            data_x=data[columns]
            scaler1 = StandardScaler()
            data_x= scaler1.fit_transform(data_x.values)

            data_scaled= np.concatenate((data_y, data_x), axis=1)
        else:
            scaler=None
            columns=["Log Returns Variance","GARCH_Forecast","News Positive Sentiment","News Negative Sentiment","Twitter Positive Sentiment",
                     "Twitter Negative Sentiment"]
            data=data[columns]
            data_scaled=data.values

        cut=round(train_size*data.shape[0])
        training_set_scaled = data_scaled[:cut]
        test_set_scaled = data_scaled[cut:]

        X_train = []
        y_train = []
        for i in range(time_steps, len(training_set_scaled)-time_steps-1):
            X_train.append(training_set_scaled[i-time_steps:i, :])
            y_train.append(training_set_scaled[i, 0])
        X_train, y_train = np.array(X_train), np.array(y_train)
        X_train = np.reshape(X_train, (X_train.shape[0], X_train.shape[1], len(training_set_scaled[0])))

        inputs = data_scaled[len(data_scaled) - len(test_set_scaled) - time_steps:]

        X_test = []
        y_test = []
        for i in range(time_steps,time_steps+len(test_set_scaled)):
            X_test.append(inputs[i-time_steps:i,:])
            y_test.append(inputs[i-time_steps:i,0])
        X_test,y_test= np.array(X_test),np.array(y_test)
        X_test = np.reshape(X_test, (X_test.shape[0], X_test.shape[1], len(test_set_scaled[0])))

        return X_train, X_test, y_train, y_test,scaler
        
    def data_prepare_simple(data,time_steps,train_size,scaleing):

        columns=["Log Returns Variance","GARCH_Forecast"]
        data=data[columns]
        cut=round(train_size*data.shape[0])
        if scaleing==True:

            columns=["Log Returns Variance"]
            data_y=data[columns]
            scaler = StandardScaler()
            data_y= scaler.fit_transform(data_y.values.reshape(-1, 1))

            columns=["Log Returns Variance","GARCH_Forecast"]
            data_x=data[columns]
            scaler1 = StandardScaler()
            data_x= scaler1.fit_transform(data_x.values)

            data_scaled= np.concatenate((data_y, data_x), axis=1)
        else:
            scaler=None
            columns=["Log Returns Variance","GARCH_Forecast"]
            data=data[columns]
            data_scaled=data.values

        cut=round(train_size*data.shape[0])
        training_set_scaled = data_scaled[:cut]
        test_set_scaled = data_scaled[cut:]


        X_train = []
        y_train = []
        for i in range(time_steps, len(training_set_scaled)-time_steps-1):
            X_train.append(training_set_scaled[i-time_steps:i, :])
            y_train.append(training_set_scaled[i, 0])
        X_train, y_train = np.array(X_train), np.array(y_train)
        X_train = np.reshape(X_train, (X_train.shape[0], X_train.shape[1], len(training_set_scaled[0])))


        inputs = data_scaled[len(data_scaled) - len(test_set_scaled) - time_steps:]

        X_test = []
        y_test = []
        for i in range(time_steps,time_steps+len(test_set_scaled)):
            X_test.append(inputs[i-time_steps:i,:])
            y_test.append(inputs[i-time_steps:i,0])
        X_test,y_test= np.array(X_test),np.array(y_test)
        X_test = np.reshape(X_test, (X_test.shape[0], X_test.shape[1], len(test_set_scaled[0])))

        return X_train, X_test, y_train, y_test,scaler

    df=data_merge(ticker,path_news,path_twitter)
    data=data_transformer(df,absolute)

    ##################MERGE DATA AND GARCH PREDICTIONS##############################
    data["Date"] = pd.to_datetime(data["Date"])
    data.set_index(data["Date"], inplace=True)
    garch_prediction['GARCH_Forecast'] = garch_prediction['GARCH_Forecast'].shift(-1)
    garch_prediction['GARCH_Forecast'].fillna(method='ffill', inplace=True)
    data = pd.merge(data,garch_prediction,left_index=True, right_index=True, how='inner')
    data.reset_index(drop=True, inplace=True)
    ##################MERGE DATA AND GARCH PREDICTIONS##############################
    
    cut = round(len(data) * train_size)
    train_data = data[:cut]
    test_data = data[cut:]
    if simple==True:
        X_train, X_test, y_train, y_test,scaler=data_prepare_simple(data,look_back,train_size,scaleing)
    else:
        X_train, X_test, y_train, y_test,scaler=data_prepare(data,look_back,train_size,scaleing)
    return X_train, y_train, X_test, y_test,train_data,test_data,data,scaler

In [None]:
#Source: https://github.com/BayerSe/VaR-Backtesting
def duration_test(
    violations: Union[List[int], np.ndarray, pd.Series, pd.DataFrame],
    conf_level: float = 0.95,
) -> Dict:

    """Perform the Christoffersen and Pelletier Test (2004) called Duration Test.
        The main objective is to know if the VaR model responds quickly to market movements
         in order to do not form volatility clusters.
        Duration is time betwenn violations of VaR.
        This test verifies if violations has no memory i.e. should be independent.

        Parameters:
            violations (series): series of violations of VaR
            conf_level (float):  test confidence level
        Returns:
            answer (dict):       statistics and decision of the test
    """
    typeok = False
    if isinstance(violations, pd.core.series.Series) or isinstance(
        violations, pd.core.frame.DataFrame
    ):
        violations = violations.values.flatten()
        typeok = True
    elif isinstance(violations, np.ndarray):
        violations = violations.flatten()
        typeok = True
    elif isinstance(violations, list):
        typeok = True
    if not typeok:
        raise ValueError("Input must be list, array, series or dataframe.")

    N = int(sum(violations))
    first_hit = violations[0]
    last_hit = violations[-1]

    duration = [i + 1 for i, x in enumerate(violations) if x == 1]

    D = np.diff(duration)

    TN = len(violations)
    C = np.zeros(len(D))

    if not duration or (D.shape[0] == 0 and len(duration) == 0):
        duration = [0]
        D = [0]
        N = 1

    if first_hit == 0:
        C = np.append(1, C)
        D = np.append(duration[0], D)  # days until first violation

    if last_hit == 0:
        C = np.append(C, 1)
        D = np.append(D, TN - duration[-1])

    else:
        N = len(D)

    def likDurationW(x, D, C, N):
        b = x
        a = ((N - C[0] - C[N - 1]) / (sum(D ** b))) ** (1 / b)
        lik = (
            C[0] * np.log(pweibull(D[0], a, b, survival=True))
            + (1 - C[0]) * dweibull(D[0], a, b, log=True)
            + sum(dweibull(D[1 : (N - 1)], a, b, log=True))
            + C[N - 1] * np.log(pweibull(D[N - 1], a, b, survival=True))
            + (1 - C[N - 1]) * dweibull(D[N - 1], a, b, log=True)
        )

        if np.isnan(lik) or np.isinf(lik):
            lik = np.float64(1e10)
        else:
            lik = -lik
        return lik

    # When b=1 we get the exponential
    def dweibull(D, a, b, log=False):
        # density of Weibull
        pdf = b * np.log(a) + np.log(b) + (b - 1) * np.log(D) - (a * D) ** b
        if not log:
            pdf = np.exp(pdf)
        return pdf

    def pweibull(D, a, b, survival=False):
        # distribution of Weibull
        cdf = 1 - np.exp(-((a * D) ** b))
        if survival:
            cdf = 1 - cdf
        return cdf

    optimizedBetas = optimize.minimize(
        likDurationW, x0=[2], args=(D, C, N), method="L-BFGS-B", bounds=[(0.001, 10)]
    )

    print(optimizedBetas.message)

    b = optimizedBetas.x
    uLL = -likDurationW(b, D, C, N)
    rLL = -likDurationW(np.array([1]), D, C, N)
    LR = 2 * (uLL - rLL)
    LRp = 1 - chi2.cdf(LR, 1)

    H0 = "Duration Between Exceedances have no memory (Weibull b=1 = Exponential)"
    # i.e. whether we fail to reject the alternative in the LR test that b=1 (hence correct model)
    if LRp < (1 - conf_level):
        decision = "Reject H0"
    else:
        decision = "Fail to Reject H0"

    answer = {
        "weibull exponential": b,
        "unrestricted log-likelihood": uLL,
        "restricted log-likelihood": rLL,
        "log-likelihood": LR,
        "log-likelihood ratio test statistic": LRp,
        "null hypothesis": H0,
        "decision": decision,
    }

    return answer

def smooth_loss(actual,forecast,alpha,delta=25, return_mean=True):
    """Gonzalez-Rivera, Lee and Mishra (2004)"""
    loss = ((alpha - (1 + np.exp(delta*(actual - forecast)))**-1) * (actual - forecast))
    if return_mean:
        return loss.mean()
    else:
        return loss

def quadratic_loss(actual,forecast,hit_series,return_mean=True):
    """Lopez (1999); Martens et al. (2009)"""
    loss = (hit_series* (1 + (actual - forecast)**2))
    if return_mean:
        return loss.mean()
    else:
        return loss
def firm_loss(actual,forecast,hit_series, c=1, return_mean=True):
    """Sarma et al. (2003)"""
    loss = (hit_series * (1 + (actual - forecast)**2) - c*(1-hit_series) * forecast)
    if return_mean:
        return loss.mean()
    else:
        return loss

In [None]:
def rmspe(y_true, y_pred):
    loss = K.sqrt(K.mean(K.square((y_true - y_pred) / y_true)))
    return loss

def nn_model_predict(X_train,y_train,X_test,y_test,scaler,scaling,patience,validation_split,num_iterations,y_true,learning_rates,batch_size,epochs):

    initializer = tf.keras.initializers.RandomNormal(mean=0., stddev=1.)
    z = L.Input(shape=(X_train.shape[1],X_train.shape[2]), name="Input")
    y = L.Bidirectional(L.LSTM(128,activation="tanh",return_sequences=True,name="LSTM1",kernel_initializer=initializer))(z) #,kernel_regularizer=regularizers.l1(0.01)
    y = L.Dropout(0.1)(y)
    y = (L.LSTM(64,activation="tanh",return_sequences=True,name="LSTM2",kernel_initializer=initializer))(y)
    y = L.Dropout(0.1)(y)
    y = (L.LSTM(32,activation="tanh",return_sequences=True,name="LSTM3",kernel_initializer=initializer))(y)
    y = L.Dropout(0.1)(y)
    y = (L.LSTM(16,activation="tanh",return_sequences=False,name="LSTM5",kernel_initializer=initializer))(y)
    y = L.Dropout(0.1)(y)
    x = L.Dense(1, activation="linear", name="o1",kernel_initializer=initializer)(y)

    model = M.Model(z, x, name="DNN")
    
    best_accuracy = 0
    best_mse = float('inf')
    
    for learning_rate in learning_rates:
        for i in range(num_iterations):            
            for layer in model.layers:
                if isinstance(layer, tf.keras.layers.LSTM):
                    layer.set_weights([tf.random.normal(w.shape) for w in layer.get_weights()])

            adam_optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate) #
            model.compile(optimizer=adam_optimizer, loss='mse', metrics=[rmspe]) #mse
            earlyStop=EarlyStopping(monitor='val_rmspe',verbose=2,mode='min',patience=patience,restore_best_weights=True)
            history=model.fit(X_train, y_train, batch_size=batch_size, epochs=epochs, verbose=0,validation_split=validation_split,callbacks=[earlyStop])
            mse=earlyStop.best
            if mse < best_mse:
                best_mse = mse
                best_model = model
    predict_nn= best_model.predict(X_test)
    predict_nn=predict_nn[:,0]
    if scaleing==True:
        predict_nn = scaler.inverse_transform(predict_nn.reshape(-1,1))
        
    return predict_nn
def nn_model_predict_simple(X_train,y_train,X_test,y_test,scaler,scaling,patience,validation_split,num_iterations,y_true,learning_rates,batch_size,epochs):

    initializer = tf.keras.initializers.RandomNormal(mean=0., stddev=1.)
    z = L.Input(shape=(X_train.shape[1],X_train.shape[2]), name="Input")
    y = L.Bidirectional(L.LSTM(128,activation="tanh",return_sequences=True,name="LSTM1",kernel_initializer=initializer))(z) #,kernel_regularizer=regularizers.l1(0.01)
    y = L.Dropout(0.1)(y)
    y = (L.LSTM(64,activation="tanh",return_sequences=True,name="LSTM2",kernel_initializer=initializer))(y)
    y = L.Dropout(0.1)(y)
    y = (L.LSTM(32,activation="tanh",return_sequences=True,name="LSTM3",kernel_initializer=initializer))(y)
    y = L.Dropout(0.1)(y)
    y = (L.LSTM(16,activation="tanh",return_sequences=False,name="LSTM5",kernel_initializer=initializer))(y)
    y = L.Dropout(0.1)(y)
    x = L.Dense(1, activation="linear", name="o1",kernel_initializer=initializer)(y)
    
    model = M.Model(z, x, name="DNN")
    
    best_accuracy = 0
    best_mse = float('inf')
    
    for learning_rate in learning_rates:
        for i in range(num_iterations):            
            for layer in model.layers:
                if isinstance(layer, tf.keras.layers.LSTM):
                    layer.set_weights([tf.random.normal(w.shape) for w in layer.get_weights()])

            adam_optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate) #
            model.compile(optimizer=adam_optimizer, loss='mse', metrics=[rmspe])
            earlyStop=EarlyStopping(monitor="val_rmspe",verbose=2,mode='min',patience=patience,restore_best_weights=True)
            history=model.fit(X_train, y_train, batch_size=batch_size, epochs=epochs, verbose=0,validation_split=validation_split,callbacks=[earlyStop])
            mse=earlyStop.best
            if mse < best_mse:
                best_mse = mse
                best_model = model
    predict_nn= best_model.predict(X_test)
    predict_nn=predict_nn[:,0]
    if scaleing==True:
        predict_nn = scaler.inverse_transform(predict_nn.reshape(-1,1))
        
    return predict_nn
def nn_model_predict_saved(X_train,y_train,X_test,y_test,scaler,scaling,patience,validation_split,num_iterations,y_true,learning_rates,batch_size,epochs):
    
    initializer = tf.keras.initializers.RandomNormal(mean=0., stddev=1.)
    saved_model = tf.keras.models.load_model('best_best_7.keras')

    model=tf.keras.Sequential()
    model.add(tf.keras.Input(batch_size=batch_size,shape=(X_train.shape[1],X_train.shape[2]), name="Input"))
    for layer in saved_model.layers[1:]:
        model.add(layer)

    
    best_accuracy = 0
    best_mse = float('inf')
    
    for learning_rate in learning_rates:
        for i in range(num_iterations):            
            for layer in model.layers:
                if isinstance(layer, tf.keras.layers.LSTM):
                    layer.set_weights([tf.random.normal(w.shape) for w in layer.get_weights()])

            adam_optimizer = tf.keras.optimizers.Adam() #learning_rate=learning_rate
            model.compile(optimizer=adam_optimizer, loss='mse', metrics=['mse'])
            earlyStop=EarlyStopping(monitor="val_loss",verbose=2,mode='min',patience=patience,restore_best_weights=True)
            history=model.fit(X_train, y_train, batch_size=batch_size, epochs=epochs, verbose=0,validation_split=validation_split,callbacks=[earlyStop])
            stopped_epoch = earlyStop.stopped_epoch
            mse = min(history.history['val_loss'][:stopped_epoch])
            if mse < best_mse:
                best_mse = mse
                best_model = model
    predict_nn= best_model.predict(X_test)
    predict_nn=predict_nn[:,0]
    if scaleing==True:
        predict_nn = scaler.inverse_transform(predict_nn.reshape(-1,1))
    return predict_nn   

def calculate_var(daily_variance,daily_mean, confidence_level=0.95):
    standard_deviation = np.sqrt(daily_variance)
    z_score = -1.0 * np.abs(norm.ppf((1 - confidence_level) / 2))
    var = daily_mean + z_score * standard_deviation 
    return var
def rmspe_metric(y_true, y_pred):
    output = np.sqrt(np.mean(np.square((y_true - y_pred) / y_true)))
    return output
    
def Losses(y_true,y_pred):
    mse=mean_squared_error(y_true, y_pred)
    mae=mean_absolute_error(y_true, y_pred)
    rmse=rmspe_metric(y_true, y_pred)
    return mse,mae,rmse
def Violation_ratio(returns,predictions,confidence_level):
    p=1-confidence_level
    VR = sum(VR < predictions)/(p*(len(predictions)))
    return VR

def Joint_test(violations,var_conf_level,conf_level):
    pof=vartests.kupiec_test(violations, var_conf_level=var_conf_level, conf_level=conf_level).get("log-likelihood")
    if np.isnan(pof) or np.isinf(pof):
        pof = np.float(0.0)
    ch=duration_test(violations, conf_level=0.95).get('log-likelihood')
    if not isinstance(ch, np.ndarray):
        ch=ch
    else:
        ch=ch[0]
    joint=pof+ch
    critical_chi_square = chi2.ppf(conf_level, 2)  # two degree of freedom
    if joint> critical_chi_square:
        result = "Reject H0"
    else:
        result = "Fail to reject H0"
    return result

def Violation_test(returns,predictions,confidence_level,plot=True):
    p=1-confidence_level
    results=[]
    for col in range(predictions.shape[0]):
        VR = sum(returns['Log Returns'].values < predictions[col])/(p*(len(predictions[col])))
        print (col, "\n",
           "Violation ratio:", round(VR, 3))
        results.append(VR)
    if plot==True:
        plt.plot(returns['Date'],returns['Log Returns'], color = 'red', label = "Log_returns")
        plt.plot(returns['Date'],predictions[0], color = 'blue', label = "NN_Social_Media_Prediction")
        plt.plot(returns['Date'],predictions[1], color = 'yellow', label = "NN_Prediction")
        plt.plot(returns['Date'],predictions[2], color = 'green', label = "GARCH_Prediction")
        plt.plot(returns['Date'],predictions[3], color = 'pink', label = "GARCH_X_Prediction")
        plt.legend()
        plt.show()
    return results
def write_to_csv_last_line(csv_file_path, new_data):
    # Open the CSV file in append mode with newline='' to handle new line characters correctly
    with open(csv_file_path, 'a', newline='') as csv_file:
        # Create a CSV writer object
        csv_writer = csv.writer(csv_file)
        
        # Write data into the last new line
        csv_writer.writerow(new_data)
    
    return None

In [None]:
#%%capture --no-display
start_time = time.time()

nb_fname = ipynbname.name()
csv_name=nb_fname+"_results.csv"

for ticker in sp500:
    try:
        print(f"{ticker} loaded successfully.")

        #simple GARCH(1,1)

        garch_data=garch_data_generate(ticker,path_news,path_twitter)
        garch_prediction=generate_garch_forecast(garch_data[["Log Returns"]], garch_train_size)

        #GARCH_X
        garch_x_prediction=generate_garch_x_forecast(garch_data, garch_train_size)


        if absolute==True:
            garch_prediction=garch_prediction**(1/2)
            garch_x_prediction=garch_x_prediction**(1/2)
            
        #social media neural network
        X_train, y_train, X_test, y_test,train_data,test_data,data,scaler=data_generator(path_news,path_twitter,ticker,look_back,garch_train_size,train_size,scaleing,absolute,False,garch_prediction)
        
        #GET EXPECTED VALUE, LOG RETURNS AND REAL VARIANCE
        daily_mean=test_data['Log Returns'].mean()
        returns=test_data[['Date','Log Returns']]
        y_true=test_data['Log Returns Variance'].values
    
        nn_prediction_social_media=nn_model_predict(X_train, y_train, X_test, y_test,scaler,scaleing,patience,validation_split,num_iterations,y_true,learning_rates,batch_size,epochs)
        nn_prediction_social_media=nn_prediction_social_media
    
        #simple GARCH-net
        X_train, y_train, X_test, y_test,train_data,test_data,full_data,scaler=data_generator(path_news,path_twitter,ticker,look_back,garch_train_size,train_size,scaleing,absolute,True,garch_prediction)
        #GET EXPECTED VALUE, LOG RETURNS AND REAL VARIANCE
        daily_mean=test_data['Log Returns'].mean()
        returns=test_data[['Date','Log Returns']]
        y_true=test_data['Log Returns Variance']
        nn_prediction=nn_model_predict_simple(X_train, y_train, X_test, y_test,scaler,scaleing,patience,validation_split,num_iterations,y_true,learning_rates,batch_size,epochs)
        nn_prediction=nn_prediction
    
        #TRANSFORM BACK PREDICTIONS
        if absolute==True:
            garch_prediction=garch_prediction**2
            garch_x_prediction=garch_x_prediction**2
            nn_prediction_social_media=nn_prediction_social_media**2
            nn_prediction=nn_prediction**2

        #CALCULATE LOSSES
        GARCH_prediction=garch_prediction[-len(nn_prediction):]
        GARCH_prediction=GARCH_prediction['GARCH_Forecast'].values
        mse_GARCH,mae_GARCH,rmspe_GARCH=Losses(y_true,GARCH_prediction)
        GARCH_X_prediction=garch_x_prediction[-len(nn_prediction):]
        GARCH_X_prediction=GARCH_X_prediction['GARCH_X_Forecast'].values
        mse_GARCH_X,mae_GARCH_X,rmspe_GARCH_X=Losses(y_true,GARCH_X_prediction)
        mse_nn_social_media,mae_nn_social_media,rmspe_nn_social_media=Losses(y_true,np.ravel(nn_prediction_social_media))
        mse_nn,mae_nn,rmspe_nn=Losses(y_true,np.ravel(nn_prediction))
    
        del  X_train, y_train, X_test, y_test,train_data,test_data,full_data,scaler

        #CALCUALTE VaR
        nn_social_media_VAR=calculate_var(nn_prediction_social_media,daily_mean, confidence_level=confidence_level)
        nn_VAR=calculate_var(nn_prediction,daily_mean, confidence_level=confidence_level)
        GARCH_VAR=calculate_var(GARCH_prediction,daily_mean, confidence_level=confidence_level)
        GARCH_X_VAR=calculate_var(GARCH_X_prediction,daily_mean, confidence_level=confidence_level)
    
        #CALCULATE LOSSES FROM VAR
        ret=returns['Log Returns'].values
        mse_GARCH_1,mae_GARCH_1,rmspe_GARCH_1=Losses(ret,GARCH_VAR)
        mse_GARCH_X_1,mae_GARCH_X_1,rmspe_GARCH_X_1=Losses(ret,GARCH_X_VAR)
        mse_nn_social_media_1,mae_nn_social_media_1,rmspe_nn_social_media_1=Losses(ret,nn_social_media_VAR)
        mse_nn_1,mae_nn_1,rmspe_nn_1=Losses(ret,nn_VAR)
    
        #calculate violation ratios
        predictions = np.vstack((nn_social_media_VAR.T, nn_VAR.T,GARCH_VAR.T,GARCH_X_VAR.T))
        test=Violation_test(returns,predictions,confidence_level,True)
        
        violations=pd.DataFrame(nn_social_media_VAR,columns=['VAR'])
        violations['Log Returns']=returns['Log Returns'].values
        violations['violations'] = (violations['Log Returns'] <violations['VAR']).astype(int)
        nn_social_media_Kupiec=vartests.kupiec_test(violations['violations'], var_conf_level=confidence_level, conf_level=0.95).get("result")
        nn_social_media_Christoffersen=duration_test(violations['violations'], conf_level=0.95).get('log-likelihood ratio test statistic')
        if not isinstance(nn_social_media_Christoffersen, np.ndarray):
            nn_social_media_Christoffersen="Reject H0"
        elif nn_social_media_Christoffersen[0]>=0.05:
            nn_social_media_Christoffersen="Fail to reject H0"
        else:
            nn_social_media_Christoffersen="Reject H0"
        nn_social_media_Joint=Joint_test(violations['violations'],confidence_level,0.95)
        nn_social_media_QL=quadratic_loss(violations['Log Returns'],violations['VAR'],violations['violations'])
        nn_social_media_FL=firm_loss(violations['Log Returns'],violations['VAR'],violations['violations'])
        nn_social_media_SL=smooth_loss(violations['Log Returns'],violations['VAR'],1-confidence_level)
            
        violations=pd.DataFrame(nn_VAR,columns=['VAR'])
        violations['Log Returns']=returns['Log Returns'].values
        violations['violations'] = (violations['Log Returns'] <violations['VAR']).astype(int)
        nn_Kupiec=vartests.kupiec_test(violations['violations'], var_conf_level=confidence_level, conf_level=0.95).get("result")
        nn_Christoffersen=duration_test(violations['violations'], conf_level=0.95).get('log-likelihood ratio test statistic')
        if not isinstance(nn_Christoffersen, np.ndarray):
            nn_Christoffersen="Reject H0"
        elif nn_Christoffersen[0]>=0.05:
            nn_Christoffersen="Fail to reject H0"
        else:
            nn_Christoffersen="Reject H0"
        nn_Joint=Joint_test(violations['violations'],confidence_level,0.95)
        nn_QL=quadratic_loss(violations['Log Returns'],violations['VAR'],violations['violations'])
        nn_FL=firm_loss(violations['Log Returns'],violations['VAR'],violations['violations'])
        nn_SL=smooth_loss(violations['Log Returns'],violations['VAR'],1-confidence_level)
        
        violations=pd.DataFrame(GARCH_VAR,columns=['VAR'])
        violations['Log Returns']=returns['Log Returns'].values
        violations['violations'] = (violations['Log Returns'] < violations['VAR']).astype(int)
        GARCH_Kupiec=vartests.kupiec_test(violations['violations'], var_conf_level=confidence_level, conf_level=0.95).get("result")
        GARCH_Christoffersen=duration_test(violations['violations'], conf_level=0.95).get('log-likelihood ratio test statistic')
        if not isinstance(GARCH_Christoffersen, np.ndarray):
            GARCH_Christoffersen="Reject H0"
        elif GARCH_Christoffersen[0]>=0.05:
            GARCH_Christoffersen="Fail to reject H0"
        else:
            GARCH_Christoffersen="Reject H0"
        GARCH_Joint=Joint_test(violations['violations'],confidence_level,0.95)
        GARCH_QL=quadratic_loss(violations['Log Returns'],violations['VAR'],violations['violations'])
        GARCH_FL=firm_loss(violations['Log Returns'],violations['VAR'],violations['violations'])
        GARCH_SL=smooth_loss(violations['Log Returns'],violations['VAR'],1-confidence_level)
            
        violations=pd.DataFrame(GARCH_X_VAR,columns=['VAR'])
        violations['Log Returns']=returns['Log Returns'].values
        violations['violations'] = (violations['Log Returns'] < violations['VAR']).astype(int)
        GARCH_X_Kupiec=vartests.kupiec_test(violations['violations'], var_conf_level=confidence_level, conf_level=0.95).get("result")
        GARCH_X_Christoffersen=duration_test(violations['violations'], conf_level=0.95).get('log-likelihood ratio test statistic')
        if not isinstance(GARCH_X_Christoffersen, np.ndarray):
            GARCH_X_Christoffersen="Reject H0"
        elif GARCH_X_Christoffersen[0]>=0.05:
            GARCH_X_Christoffersen="Fail to reject H0"
        else:
            GARCH_X_Christoffersen="Reject H0"
        GARCH_X_Joint=Joint_test(violations['violations'],confidence_level,0.95)
        GARCH_X_QL=quadratic_loss(violations['Log Returns'],violations['VAR'],violations['violations'])
        GARCH_X_FL=firm_loss(violations['Log Returns'],violations['VAR'],violations['violations'])
        GARCH_X_SL=smooth_loss(violations['Log Returns'],violations['VAR'],1-confidence_level)
        
        row=[ticker,mse_nn_social_media,rmspe_nn_social_media,mae_nn_social_media,mse_nn_social_media_1,rmspe_nn_social_media_1,
             mae_nn_social_media_1,test[0],nn_social_media_Kupiec,nn_social_media_Christoffersen,nn_social_media_Joint,
             nn_social_media_QL,nn_social_media_FL,nn_social_media_SL,mse_nn,rmspe_nn,mae_nn,mse_nn_1,rmspe_nn_1,
             mae_nn_1,test[1],nn_Kupiec,nn_Christoffersen,nn_Joint,nn_QL,nn_FL,nn_SL,mse_GARCH,rmspe_GARCH,mae_GARCH,mse_GARCH_1,
             rmspe_GARCH_1,mae_GARCH_1,test[2],GARCH_Kupiec,GARCH_Christoffersen,GARCH_Joint,GARCH_QL,GARCH_FL,GARCH_SL,
             mse_GARCH_X,rmspe_GARCH_X,mae_GARCH_X,mse_GARCH_X_1,rmspe_GARCH_X_1,mae_GARCH_X_1,test[3],GARCH_X_Kupiec,
             GARCH_X_Christoffersen,GARCH_X_Joint,GARCH_X_QL,GARCH_X_FL,GARCH_X_SL]

        print(row)
        write_to_csv_last_line(csv_name,row)
    except Exception as e:
        row=[ticker,e]
        write_to_csv_last_line(csv_name,row)

end_time = time.time()
elapsed_time = end_time - start_time
row=[round(elapsed_time, 3)]
write_to_csv_last_line(csv_name,row)    