In [5]:
import warnings
warnings.simplefilter(action='ignore', category=UserWarning)

import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import plotly.graph_objs as go
import plotly.express as px
from plotly.subplots import make_subplots
import gzip
from datetime import datetime, timedelta
from statistics import mean, median
from sklearn.neural_network import MLPClassifier
from sklearn.ensemble import RandomForestClassifier
import seaborn as sns
import matplotlib.pyplot as plt

import tensorflow
import tensorflow.keras as tf
from tensorflow.keras.models import Model, Sequential
from tensorflow.keras.layers import Input, Dense, LeakyReLU, BatchNormalization, ReLU, LSTM, Conv1D, Conv2D
from tensorflow.keras.activations import sigmoid, tanh
from tensorflow.keras.utils import to_categorical

from sklearn.preprocessing import MinMaxScaler

from tqdm import tqdm
import csv
import random

from sklearn.metrics import accuracy_score as accuracy
from sklearn.metrics import precision_score as precision
from sklearn.metrics import recall_score as recall
from sklearn.metrics import f1_score as f1

In [6]:
def retrieve_data(varname, filename):
    df = pd.read_csv(filename, index_col=0)
    df["Date"] = pd.to_datetime(df["Date"])
    return df

def create_classification_data(df, lookback):
    rows = []
    columns = ['Date', 'SP500_relative_change_perc_1'] # Date and SP500_relative_change_perc_1 from t-0 are added first as target variables 
    
    # create column names based on original with the addition of t-i where i is lookback
    for i in range(1, lookback + 1): # starts at 1 since we do not want t-0 variables apart from 'Date' and 'SP500_relative_change_perc_1'
        new_columns = df.columns.tolist()[1:] # starts at 1 to exclude 'Date' column
        for x in range(len(new_columns)):
            new_columns[x] = new_columns[x] + "_t-" + str(i)
        columns = columns + new_columns
    
    # create lookback data
    for i, row in enumerate(df.iterrows()):
        if i > lookback: # lookback cannot be determined for earlier rows
            new_row = [row[1][0], row[1][1]] # add target 'Date' and 'SP500_relative_change_perc_1 '
            for x in range(1, lookback + 1): # starts at 1 since we do not want t-0 variables apart from 'Date' and 'SP500_relative_change_perc_1'
                add_row = df.iloc[i - x].tolist()[1:] # starts at 1 to exclude 'Date' column
                new_row = new_row + add_row
            rows.append(new_row)
    df2 = pd.DataFrame(rows)
    df2.columns = columns
    return df2

def create_train_val_test(df, year_val, year_test, perc_train=None):
    if perc_train == None:
        # assumes years_train < year_val < year_test
        df["Date"] = pd.to_datetime(df["Date"])
        
        val = df[df['Date'].dt.year == year_val]
        test = df[df['Date'].dt.year == year_test]
        train = df[df['Date'].dt.year < year_val]
    else:
        train = df.head(round(len(df) * perc_train))
        val = df.tail(len(df) - len(train))
        test = val.tail(round(0.5 * len(val)))
        val = df.head(len(val) - len(test))
    y_train = train['SP500_relative_change_perc_1']
    x_train = train.drop(['SP500_relative_change_perc_1'], axis=1)
    
    y_val = val['SP500_relative_change_perc_1']
    x_val = val.drop(['SP500_relative_change_perc_1'], axis=1)
    
    y_test = test['SP500_relative_change_perc_1']
    x_test = test.drop(['SP500_relative_change_perc_1'], axis=1)
    
    return x_train, y_train, x_val, y_val, x_test, y_test

def scale_data(x):
    standard_scaler = MinMaxScaler()
    x = x.drop(["Date"], axis=1)
    x_scaled = pd.DataFrame(standard_scaler.fit_transform(x), columns=x.columns)
    return x_scaled

In [7]:
lookback = 5
val_year = 2018
test_year = 2019

files = {
    # varname: filename
    "S&P500": "Dataset v3/SP500_reduced_data_20220425.csv",
}

for file in files:
    df = retrieve_data(file, files[file])

df = create_classification_data(df, lookback)

x_train, y_train, x_val, y_val, x_test, y_test = create_train_val_test(df, val_year, test_year)

In [8]:
def label_data(y):
    positives = []
    negatives = []
    y = list(y)
    for dev in y:
        if dev >= 0:
            positives.append(dev)
        else:
            negatives.append(dev)
    med_pos = median(positives)
    med_neg = median(negatives)
    
    labels = []
    for dev in y:
        if dev >= 0:
            if dev >= med_pos:
                labels.append(2)
            else:
                labels.append(1)
        else:
            if dev <= med_neg:
                labels.append(-2)
            else:
                labels.append(-1)
    return labels

y_train = label_data(y_train)
y_val = label_data(y_val)
y_test = label_data(y_test)

In [9]:
def random_baseline(y):
    counts = [0, 0, 0, 0]
    for i in y:
        if i == -2:
            counts[0] = counts[0] + 1
        elif i == -1:
            counts[1] = counts[1] + 1
        elif i == 1:
            counts[2] = counts[2] + 1
        elif i == 2:
            counts[3] = counts[3] + 1
    print(f"\tDistribution: {counts}")
    print(f"\tRandom baseline accuracy (majority class): {counts[np.argmax(np.asarray(counts))]/ len(y)}")
    
print("Random baseline training set")
random_baseline(y_train)
print("Random baseline validation set")
random_baseline(y_val)
print("Random baseline test set")
random_baseline(y_val)

Random baseline training set
	Distribution: [483, 483, 584, 585]
	Random baseline accuracy (majority class): 0.27400468384074944
Random baseline validation set
	Distribution: [62, 62, 63, 64]
	Random baseline accuracy (majority class): 0.2549800796812749
Random baseline test set
	Distribution: [62, 62, 63, 64]
	Random baseline accuracy (majority class): 0.2549800796812749


In [10]:
train_date = x_train[['Date']]
x_train = x_train.drop(['Date'], axis=1)

val_date = x_val[['Date']]
x_val = x_val.drop(['Date'], axis=1)

test_date = x_test[['Date']]
x_test = x_test.drop(['Date'], axis=1)

In [11]:
x_train = np.asarray(x_train)
x_val = np.asarray(x_val)
x_test = np.asarray(x_test)

y_train = np.asarray(y_train)
y_val = np.asarray(y_val)
y_test = np.asarray(y_test)

print(x_train.shape, y_train.shape)

(2135, 90) (2135,)


In [12]:
x_train = x_train.reshape((x_train.shape[0], 1, x_train.shape[1]))
x_val = x_val.reshape((x_val.shape[0], 1, x_val.shape[1]))
x_test = x_test.reshape((x_test.shape[0], 1, x_test.shape[1]))

y_train = to_categorical(y_train, 4)
y_val = to_categorical(y_val, 4)
y_test = to_categorical(y_test, 4)

y_train = y_train.reshape((y_train.shape[0], 1, y_train.shape[1]))
y_val = y_val.reshape((y_val.shape[0], 1, y_val.shape[1]))
y_test = y_test.reshape((y_test.shape[0], 1, y_test.shape[1]))

In [13]:
print(x_train.shape, y_train.shape)
print(x_val.shape, y_val.shape)
print(x_test.shape, y_test.shape)


(2135, 1, 90) (2135, 1, 4)
(251, 1, 90) (251, 1, 4)
(252, 1, 90) (252, 1, 4)


In [19]:
def evaluate(x, y):
    originals = []
    predictions = []
    
    y_pred = []

    for i in range(len(y)):
        pred = random.choice([0, 1, 2, 3])
        pred_hot = [0,0,0,0]
        pred_hot[pred] = 1
        y_pred.append(np.asarray(pred_hot))
    y_pred = np.asarray(y_pred)

    for i in range(len(y_pred)):
        originals.append(np.argmax(y[i]))
        predictions.append(np.argmax(y_pred[i]))
    
    acc = accuracy(originals, predictions)
    print(f"\tAccuracy: {acc}")
    
    mac_precision = precision(originals, predictions, average="macro")
    print(f"\tMacro-averaged precision: {mac_precision}")
    mic_precision = precision(originals, predictions, average="micro")
    print(f"\tMicro-averaged precision: {mic_precision}")
    
    mac_recall = recall(originals, predictions, average="macro")
    print(f"\tMacro-averaged recall: {mac_recall}")
    mic_recall = recall(originals, predictions, average="micro")
    print(f"\tMicro-averaged recall: {mic_recall}")
    
    mac_f1 = f1(originals, predictions, average="macro")
    print(f"\tMacro-averaged F1-score: {mac_f1}")
    mic_f1 = f1(originals, predictions, average="micro")
    print(f"\tMicro-averaged F1-score: {mic_f1}")

print("Training set evaluation")
evaluate(x_train, y_train)
print("Validation set evaluation")
evaluate(x_val, y_val)
# print("Test set evaluation")
# evaluate(model, x_test, y_test)

Training set evaluation
	Accuracy: 0.24824355971896955
	Macro-averaged precision: 0.2476732145849793
	Micro-averaged precision: 0.24824355971896955
	Macro-averaged recall: 0.18569313604208473
	Micro-averaged recall: 0.24824355971896955
	Macro-averaged F1-score: 0.2064381246512717
	Micro-averaged F1-score: 0.24824355971896955
Validation set evaluation
	Accuracy: 0.24302788844621515
	Macro-averaged precision: 0.23257291206363234
	Micro-averaged precision: 0.24302788844621515
	Macro-averaged recall: 0.17953149001536098
	Micro-averaged recall: 0.24302788844621515
	Macro-averaged F1-score: 0.1997094104297579
	Micro-averaged F1-score: 0.24302788844621515


In [176]:
y_pred = model.predict(x_val)
originals = []
predictions = []
for i in range(len(y_pred)):
    originals.append(np.argmax(y_val[i]))
    predictions.append(np.argmax(y_pred[i]))

wrong = 0
right_right = 0
right_wrong = 0

for i in range(len(y_pred)):
    if originals[i] == predictions[i]:
        print("right")
        right_right += 1
    elif (originals[i] == 0 or originals[i] == 1) and (predictions[i] == 2 or predictions[i] == 3):
        print("wrong direction")
        wrong += 1
    elif (originals[i] == 0 and predictions[i] == 1) or (originals[i] == 1 and predictions[i] == 0) or (originals[i] == 2 and predictions[i] == 3) or (originals[i] == 3 and predictions[i] == 2):
        print("right direction\twrong magnitude")
        right_wrong += 1
    
#     print(originals[i], predictions[i])

print("Total: ", len(originals))
print("Correct: ", right_right, right_right / len(originals))
print("Half Correct: ", right_wrong, right_wrong / len(originals))
print("Incorrect: ", wrong, wrong / len(originals))
        

right
wrong direction
right
wrong direction
wrong direction
wrong direction
right direction	wrong magnitude
wrong direction
right
wrong direction
right direction	wrong magnitude
right direction	wrong magnitude
right direction	wrong magnitude
right
right
right
right
right direction	wrong magnitude
right
right
right
right
right
right
wrong direction
right
right direction	wrong magnitude
right direction	wrong magnitude
right
right
right direction	wrong magnitude
right
wrong direction
right direction	wrong magnitude
right
right
right direction	wrong magnitude
right
right direction	wrong magnitude
wrong direction
right direction	wrong magnitude
right
right
right
right
right direction	wrong magnitude
right
right
right
right
wrong direction
right
right
right direction	wrong magnitude
wrong direction
right
right
right direction	wrong magnitude
right direction	wrong magnitude
wrong direction
right
right
right
wrong direction
right
wrong direction
wrong direction
right
right
right
right
right
wr

In [16]:
def retrieve_candles(filename, val_year, test_year):
    df = pd.read_csv(filename)
    df["Date"] = pd.to_datetime(df["Date"])
    df.columns = ["Date", "Close", "Open", "High", "Low", "Vol.", "Change%"]
    df = df.sort_values('Date', ascending=True)
    
    candles_val = df[df['Date'].dt.year == val_year]
    candles_train = df[df['Date'].dt.year < val_year]
    candles_train = candles_train[candles_train['Date'] > datetime(2009,7,10)]
    
    candles_train = candles_train.replace(',','', regex=True)
    candles_val = candles_val.replace(',','', regex=True)

    return candles_train, candles_val

def plot_predictions_candles(x, y, df, dataset):
    # PREDICTIONS LIJKEN NOG NIET GOED UITGELIJND MET CANDLES???
    originals = []
    predictions = []
    y_pred = model.predict(x)
    for i in range(len(y_pred)):
        originals.append(np.argmax(y[i]))
        predictions.append(np.argmax(y_pred[i]))
    
    for i in range(len(originals)):
        if originals[i] == 0:
            originals[i] = -2
        elif originals[i] == 1:
            originals[i] = -1
        elif originals[i] == 2:
            originals[i] = 1
        elif originals[i] == 3:
            originals[i] = 2
        
        if predictions[i] == 0:
            predictions[i] = -2
        elif predictions[i] == 1:
            predictions[i] = -1
        elif predictions[i] == 2:
            predictions[i] = 1
        elif predictions[i] == 3:
            predictions[i] = 2

    fig = go.Figure()
    fig.add_trace(go.Candlestick(x=df['Date'], open=df['Open'], high=df['High'], low=df['Low'], close=df['Close'], name="Price"))
    standard_factor = 0.5 * mean([float(df.iloc[0].tolist()[3]), float(df.iloc[0].tolist()[4])])
    
    green_legend = False
    orange_legend = False
    red_legend = False
    
    for i, row in enumerate(df.iterrows()):
        pred_date = row[1][0].to_pydatetime()
        pred_high = float(row[1][3])
        pred_low = float(row[1][4])
#         real_label = row[1][7]
#         pred_label = row[1][8]
        real_label = originals[i]
        pred_label = predictions[i]
        
        start_shape = pred_date - timedelta(days=0.1)
        end_shape = pred_date + timedelta(days=0.1)
        
        if pred_label == 1:
            extreme = pred_high + 0.1 * standard_factor
            standard = pred_high + 0.05 * standard_factor
        elif pred_label == 2:
            extreme = pred_high + 0.2 * standard_factor
            standard = pred_high + 0.05 * standard_factor
        elif pred_label == -1:
            extreme = pred_low - 0.1 * standard_factor
            standard = pred_low -  0.05 * standard_factor
        elif pred_label == -2:
            extreme = pred_low - 0.2 * standard_factor
            standard = pred_low - 0.05 * standard_factor
            
        if real_label == pred_label:
            color = "green"
        elif np.sign(real_label) == np.sign(pred_label):
            color = "orange"
        else:
            color = "red"
        
        if real_label == pred_label and not green_legend:
            green_legend = True
            fig.add_trace(go.Scatter(x=[start_shape,pred_date,end_shape,start_shape], 
                                 y=[standard,extreme,standard,standard], 
                                 fill="toself", name="Correct Direction / Correct Magnitude", mode="lines", line=dict(color=color)))
        elif np.sign(real_label) == np.sign(pred_label) and not orange_legend:
            orange_legend = True
            fig.add_trace(go.Scatter(x=[start_shape,pred_date,end_shape,start_shape], 
                                 y=[standard,extreme,standard,standard], 
                                 fill="toself", name="Correct Direction / Inorrect Magnitude", mode="lines", line=dict(color=color)))
        elif np.sign(real_label) != np.sign(pred_label) and not red_legend:
            red_legend = True
            fig.add_trace(go.Scatter(x=[start_shape,pred_date,end_shape,start_shape], 
                                 y=[standard,extreme,standard,standard], 
                                 fill="toself", name="Incorrect Direction", mode="lines", line=dict(color=color)))
        else:
            fig.add_trace(go.Scatter(x=[start_shape,pred_date,end_shape,start_shape], 
                                 y=[standard,extreme,standard,standard], 
                                 fill="toself", showlegend=False, mode="lines", line=dict(color=color)))
    
    title = f"Visualization of {dataset} Predictions"
    fig.update_xaxes(title_text="Date")
    fig.update_yaxes(title_text="Price")
    layout = dict(title=title, height=800, width=1500)
    fig.update_layout(layout)
#     config = dict({'scrollZoom': True})
#     fig.show(config=config)
    fig.show()

filename = "Dataset v3/Indices/S&P 500 Historical Data.csv"
candles_train, candles_val = retrieve_candles(filename, val_year, test_year) 
plot_predictions_candles(x_train, y_train, candles_train, "Training")
plot_predictions_candles(x_val, y_val, candles_val, "Validation")