In [None]:
!pip install yfinance

In [None]:
import math
import os, datetime
import shutil
from functools import reduce

import numpy as np
import pandas as pd
from pandas.core.common import flatten
import matplotlib.pyplot as plt

from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import SimpleRNN, Dense
import tensorflow as tf
from tensorflow.keras.models import *
from tensorflow.keras.layers import *

import yfinance as yf

In [None]:
# --- Run configuration --------------------------------------------------------
# OUTPUT_FILE_NAME is a placeholder (Ellipsis): replace it with the base name of
# the checkpoint file. The training cell appends ".keras" to it.
OUTPUT_FILE_NAME = ... # OUTPUT FILENAME TO SAVE
MOVING_AVERAGE_STEPS = 14 # default rolling-mean window used by get()
SEQUENCE_LEN = 128 # look-back window length (split_data default, model input length)
EPOCHS = 100 # 35, 50, or 100 is good
BATCH_SIZE = 32 # passed to model.fit

# Put the ticker of the stock and its name in this list
# Each entry is (ticker passed to get(), display name used in plot titles).
LIST_OF_OBJECT = [
    ("^GSPC", "S&P500"),
    ("^IXIC", "NASDAQ"),
]

# Transformer hyper-parameters (read as globals by create_model)
d_k = 256 # units of each attention head's query/key Dense projection
d_v = 256 # units of each attention head's value Dense projection
n_heads = 12 # number of SingleAttention heads per MultiAttention layer
ff_dim = 256 # filters of the first feed-forward Conv1D in each encoder

In [None]:
def get(code: str, moving_average_steps = MOVING_AVERAGE_STEPS):
    """Download a ticker's full history and smooth it with a moving average.

    Args:
        code: Ticker symbol passed to ``yf.Ticker``.
        moving_average_steps: Window length of the rolling mean applied to
            Open/High/Low/Close/Volume.

    Returns:
        A tuple ``(df, plain_close_price, mva_close_price)``:
        ``df`` is the smoothed frame with a ``Date`` column (rows whose rolling
        window is incomplete are dropped), ``plain_close_price`` is the Close
        column before smoothing and ``mva_close_price`` the Close column after.

    Raises:
        ValueError: If the download returned no rows.
    """
    # Fetch history from yahoo finance
    df = yf.Ticker(code).history(period = 'max')
    if df.empty:
        raise ValueError(f"No price history returned for ticker {code!r}")
    # errors='ignore': the corporate-action columns may be absent.
    df = df.drop(columns = ['Dividends', 'Stock Splits'], errors = 'ignore')

    # Insert a row for every calendar day between the first and last quote.
    all_dates = pd.date_range(start = df.index.min(), end = df.index.max())
    df = df.reindex(all_dates)

    # Zeros and the newly created gaps are both treated as missing and filled
    # from the NEXT later row that has a value; cells with nothing later stay 0.
    # This reproduces the original sort-descending / replace(0, method='ffill')
    # / sort-ascending sequence without the deprecated ``method`` argument.
    # NOTE(review): this copies future values into earlier rows (look-ahead);
    # confirm that is acceptable for the forecasting task.
    df = df.mask(df == 0).bfill().fillna(0)
    df = df.reset_index(names = ['Date'])

    # Get the close prices for later use
    plain_close_price = df['Close'].to_numpy(copy = True)

    # Apply Moving Average and Clear NaN
    smoothed_columns = ['Open', 'High', 'Low', 'Close', 'Volume']
    df[smoothed_columns] = df[smoothed_columns].rolling(moving_average_steps).mean()
    # In-place so the returned frame is not a derived slice (normalize() later
    # assigns columns on it).
    df.dropna(how = 'any', axis = 0, inplace = True)

    # Get the moving average values for later use
    mva_close_price = df['Close'].to_numpy(copy = True)

    return df, plain_close_price, mva_close_price

def draw_close_and_volume(df, df_name, tick_step = 1464):
    """Plot the Close and Volume columns of ``df`` in two stacked panels.

    Args:
        df: Frame with 'Date' (datetime-like), 'Close' and 'Volume' columns.
        df_name: Name used in the figure title and legend labels.
        tick_step: Number of rows between x-axis ticks; each tick is labelled
            with the year of that row's Date. 1464 equals 4 * 366 rows.
    """
    tick_positions = range(0, df.shape[0], tick_step)
    tick_labels = [stamp.strftime("%Y") for stamp in df['Date']][::tick_step]

    fig = plt.figure(figsize=(15,10))
    st = fig.suptitle(f"{df_name} Close Price and Volume", fontsize=20)
    st.set_y(0.92)

    panels = ((211, 'Close', 'Close Price'), (212, 'Volume', 'Volume'))
    for position, column, caption in panels:
        ax = fig.add_subplot(position)
        # Plot by row position: the ticks are positional, so plotting against
        # the index labels would shift the year labels whenever the index does
        # not start at 0 (e.g. after rows were dropped).
        ax.plot(df[column].to_numpy(), label=f'{df_name} {caption}')
        ax.set_xticks(tick_positions)
        ax.set_xticklabels(tick_labels)
        ax.set_ylabel(caption, fontsize=18)
        ax.legend(loc="upper left", fontsize=12)
    
def normalize(df):
    """Convert OHLCV levels to min-max scaled percentage changes, in place.

    ``df`` is modified in place (callers rely on this): Open/High/Low/Close/
    Volume become row-to-row percentage changes, rows containing NaN are
    dropped, the four price columns are min-max scaled with one shared
    min/max, Volume is scaled with its own min/max, and finally every exact 0
    in the frame is replaced by the previous row's value.

    Args:
        df: Frame with 'Open', 'High', 'Low', 'Close' and 'Volume' columns.

    Returns:
        ``(min_return, max_return, pct_close_price, normalize_close_price)``:
        the shared min/max of the price percentage changes, the Close
        percentage changes, and the scaled Close values (captured before the
        final zero replacement).
    """
    price_columns = ['Open', 'High', 'Low', 'Close']
    feature_columns = price_columns + ['Volume']

    # Percentage change makes the series comparable across price levels.
    df[feature_columns] = df[feature_columns].pct_change()
    df.dropna(how='any', axis=0, inplace=True) # Drop all rows with NaN values

    # Get percentage changed values for later use
    pct_close_price = df['Close'].to_numpy(copy=True)

    # One min/max shared by all price columns keeps them on a common scale.
    min_return = df[price_columns].min().min()
    max_return = df[price_columns].max().max()
    df[price_columns] = (df[price_columns] - min_return) / (max_return - min_return)

    # Get normalized values for later use
    normalize_close_price = df['Close'].to_numpy(copy=True)

    # Volume is scaled independently of the prices.
    min_volume = df['Volume'].min()
    max_volume = df['Volume'].max()
    df['Volume'] = (df['Volume'] - min_volume) / (max_volume - min_volume)

    # Exact zeros are treated as missing and taken from the previous row.
    # DataFrame.ffill replaces the deprecated fillna(method='ffill').
    df.replace(0, np.nan, inplace = True)
    df.ffill(inplace = True)

    return min_return, max_return, pct_close_price, normalize_close_price

""" MEAN NOT NAN """

def create_new_column(df, regex_name, method):
    """Row-wise mean over the columns matching ``regex_name``, ignoring NaN.

    Args:
        df: Source frame.
        regex_name: Regular expression selecting the columns to combine
            (passed to ``DataFrame.filter(regex=...)``).
        method: ``"am"`` for the arithmetic mean; any other value selects the
            geometric mean.

    Returns:
        ``(values, counts)``: ``values`` is a Series with a fresh 0..n-1 index
        holding one mean per row, and ``counts`` is a list with the number of
        non-NaN inputs per row. A row with no non-NaN input yields NaN (it
        previously raised ZeroDivisionError).
    """
    cols = df.filter(regex=regex_name)
    counts = cols.notna().sum(axis=1)
    has_values = counts > 0
    # NaN instead of 0 so rows without inputs never divide by zero.
    safe_counts = counts.where(has_values)

    if method == "am":
        values = cols.sum(axis=1, skipna=True) / safe_counts
    else:
        product = cols.prod(axis=1, skipna=True)
        # np.power(1, nan) is 1, so rows without inputs are masked explicitly.
        values = np.power(product, 1.0 / safe_counts).where(has_values)

    return pd.Series(values.to_numpy()), counts.tolist()

def _mean_not_nan(dfs, method):
    """Outer-merge ``dfs`` on 'Date' and combine each OHLCV feature row-wise.

    Shared implementation of ``arithmetic_mean_not_nan`` and
    ``geometry_mean_not_nan``; ``method`` is forwarded to ``create_new_column``.
    """
    if len(dfs) == 0:
        raise ValueError("dfs must contain at least one DataFrame")

    merged = dfs[0]
    for i in range(1, len(dfs)):
        # Overlapping columns of the i-th frame get the suffix "_<i>".
        merged = pd.merge(merged, dfs[i], on=['Date'], how='outer', suffixes=('', f'_{i}'))
    if merged is dfs[0]:
        # Single input: work on a copy so the caller's frame is not modified.
        merged = merged.copy()

    mnn_close_price = None
    for column in ('Open', 'High', 'Low', 'Close', 'Volume'):
        values, counts = create_new_column(merged, f"^{column}", method)
        # Assign by position: the returned Series has a fresh 0..n-1 index that
        # need not match the index labels of ``merged``.
        merged[column] = values.to_numpy()
        if column == 'Close':
            mnn_close_price = counts

    suffixed = [name for name in merged.columns if '_' in name]
    merged.drop(columns = suffixed, inplace = True)
    # NOTE(review): axis=1 drops whole COLUMNS that contain any NaN; confirm
    # that dropping rows (axis=0) was not intended.
    merged.dropna(axis = 1, how='any', inplace = True)

    return merged, mnn_close_price

def arithmetic_mean_not_nan(dfs):
    """Merge ``dfs`` on 'Date' and take the NaN-ignoring arithmetic mean.

    Returns:
        ``(df, mnn_close_price)``: the merged frame with one column per feature
        and, per row, the number of frames that contributed a Close value.
    """
    return _mean_not_nan(dfs, "am")

def geometry_mean_not_nan(dfs):
    """Merge ``dfs`` on 'Date' and take the NaN-ignoring geometric mean.

    Returns:
        ``(df, mnn_close_price)``: the merged frame with one column per feature
        and, per row, the number of frames that contributed a Close value.
    """
    return _mean_not_nan(dfs, "gm")

def split(df):
    """Split ``df`` by index value into train / validation / test frames.

    The rows with the largest ``int(0.1 * n)`` index values form the test set,
    the next ``int(0.2 * n) - int(0.1 * n)`` form the validation set, and the
    rest is training data. When a share rounds down to zero rows that subset
    is empty (previously the whole frame ended up in the test set).

    Args:
        df: Frame with a 'Date' column and an orderable index.

    Returns:
        ``(df_train, df_val, df_test)`` without the 'Date' column. ``df``
        itself is left untouched.
    """
    times = np.sort(df.index.values)
    n_rows = len(times)
    n_test = int(0.1 * n_rows)      # Last 10% of series
    n_holdout = int(0.2 * n_rows)   # Last 20% of series (validation + test)

    nothing = np.zeros(n_rows, dtype=bool)
    is_test = (df.index >= times[-n_test]) if n_test else nothing
    is_holdout = (df.index >= times[-n_holdout]) if n_holdout else nothing

    # drop() returns new frames, avoiding in-place edits on slices of ``df``.
    df_train = df[~is_holdout].drop(columns=['Date'])
    df_val = df[is_holdout & ~is_test].drop(columns=['Date'])
    df_test = df[is_test].drop(columns=['Date'])

    return df_train, df_val, df_test

def draw_date_seperation(df_train, df_val, df_test, train_data, val_data, test_data):
    """Plot Close and Volume of the three splits end to end on a shared x-axis.

    The ``*_data`` arrays only supply the row counts that position each split
    on the x-axis; the plotted values come from the ``df_*`` frames.
    """
    n_train = train_data.shape[0]
    n_val = val_data.shape[0]
    n_test = test_data.shape[0]

    # (x positions, frame, legend label) for each consecutive segment
    segments = (
        (np.arange(n_train), df_train, 'Training data'),
        (np.arange(n_train, n_train+n_val), df_val, 'Validation data'),
        (np.arange(n_train+n_val, n_train+n_val+n_test), df_test, 'Test data'),
    )
    # (subplot position, column, y-axis label, panel title)
    panels = (
        (211, 'Close', 'Normalized Closing Returns', "Close Price"),
        (212, 'Volume', 'Normalized Volume Changes', "Volume"),
    )

    fig = plt.figure(figsize=(15,12))
    st = fig.suptitle("Data Separation", fontsize=20)
    st.set_y(0.95)

    for position, column, y_label, title in panels:
        ax = fig.add_subplot(position)
        for positions, frame, label in segments:
            ax.plot(positions, frame[column], label=label)
        ax.set_xlabel('Date')
        ax.set_ylabel(y_label)
        ax.set_title(title, fontsize=18)
        ax.legend(loc="best", fontsize=12)
    
def split_data(train_data, val_data, test_data, seq_len = SEQUENCE_LEN):
    """Turn each split into sliding windows and next-step Close targets.

    For every position ``i >= seq_len`` the sample is the ``seq_len`` rows
    before ``i`` with ALL feature columns, and the target is column 3 (Close)
    of row ``i``. Keeping every column yields
    ``(samples, seq_len, n_features)``, which is what the model input
    ``(SEQUENCE_LEN, 5)`` requires; the previous code kept only column 3.

    Args:
        train_data, val_data, test_data: 2-D arrays of shape (rows, features).
        seq_len: Window length.

    Returns:
        ``(X_train, y_train, X_val, y_val, X_test, y_test)`` as numpy arrays.
    """
    def _windows(data):
        # One (window, target) pair per row that has seq_len predecessors.
        positions = range(seq_len, len(data))
        features = np.array([data[i - seq_len:i] for i in positions])
        targets = np.array([data[i, 3] for i in positions])
        return features, targets

    X_train, y_train = _windows(train_data)
    X_val, y_val = _windows(val_data)
    X_test, y_test = _windows(test_data)

    print('Training set shape', X_train.shape, y_train.shape)
    print('Validation set shape', X_val.shape, y_val.shape)
    print('Testing set shape' ,X_test.shape, y_test.shape)

    return X_train, y_train, X_val, y_val, X_test, y_test

class Time2Vector(Layer):
  """Layer producing one linear and two periodic (sine, cosine) time features.

  The input is indexed as ``x[:, :, :4]`` and averaged over the last axis, so
  it must be 3-D with at least four features. The output has shape
  (batch, seq_len, 3).
  """

  def __init__(self, seq_len, **kwargs):
    # Forward name/dtype/trainable etc. to Layer so they survive
    # get_config() / from_config() round trips instead of being dropped.
    super(Time2Vector, self).__init__(**kwargs)
    self.seq_len = seq_len

  def build(self, input_shape):
    '''Create six weight vectors of shape (seq_len,): a weight and a bias for
    each of the linear, sine and cosine features.'''
    def _vector(name):
      return self.add_weight(name=name,
                             shape=(int(self.seq_len),),
                             initializer='uniform',
                             trainable=True)

    # Every weight gets its own name; the sine and cosine weights previously
    # shared 'weight_periodic' / 'bias_periodic'.
    self.weights_linear = _vector('weight_linear')
    self.bias_linear = _vector('bias_linear')
    self.weights_periodic_sine = _vector('weight_periodic_sine')
    self.bias_periodic_sine = _vector('bias_periodic_sine')
    self.weights_periodic_cosine = _vector('weight_periodic_cosine')
    self.bias_periodic_cosine = _vector('bias_periodic_cosine')

  def call(self, x):
    '''Calculate linear and periodic time features'''
    # Average the first four features (the fifth, Volume, is excluded),
    # giving shape (batch, seq_len).
    x = tf.math.reduce_mean(x[:,:,:4], axis=-1)

    # Non-periodic (linear) feature.
    time_linear = self.weights_linear * x + self.bias_linear
    # Periodic features.
    time_periodic_sine = tf.math.sin(tf.multiply(x, self.weights_periodic_sine) + self.bias_periodic_sine)
    time_periodic_cosine = tf.math.cos(tf.multiply(x, self.weights_periodic_cosine) + self.bias_periodic_cosine)

    # Give each feature a trailing axis and stack them: (batch, seq_len, 3).
    features = [tf.expand_dims(feature, axis=-1)
                for feature in (time_linear, time_periodic_sine, time_periodic_cosine)]
    return tf.concat(features, axis=-1)

  def get_config(self): # Needed for saving and loading model with custom layer
    config = super().get_config().copy()
    config.update({'seq_len': self.seq_len})
    return config

class SingleAttention(Layer):
  """One scaled dot-product attention head.

  ``call`` takes a 3-sequence ``inputs`` used as (query source, key source,
  value source) and returns ``softmax(q @ k^T / sqrt(d_k)) @ v``.
  """

  def __init__(self, d_k, d_v, **kwargs):
    super(SingleAttention, self).__init__(**kwargs)
    self.d_k = d_k
    self.d_v = d_v

  def build(self, input_shape):
    # The sub-layers infer their input size on first call; the former
    # input_shape= keyword (a tuple of three shapes) carried no usable
    # information for a Dense layer and was removed.
    def _projection(units):
      return Dense(units,
                   kernel_initializer='glorot_uniform',
                   bias_initializer='glorot_uniform')

    self.query = _projection(self.d_k)
    self.key = _projection(self.d_k)
    self.value = _projection(self.d_v)

  def call(self, inputs): # inputs = (in_seq, in_seq, in_seq)
    q = self.query(inputs[0])
    k = self.key(inputs[1])

    attn_weights = tf.matmul(q, k, transpose_b=True)
    # Plain broadcast division replaces the per-sample tf.map_fn, which
    # computed the same values one batch element at a time.
    attn_weights = attn_weights / np.sqrt(self.d_k)
    attn_weights = tf.nn.softmax(attn_weights, axis=-1)

    v = self.value(inputs[2])
    attn_out = tf.matmul(attn_weights, v)
    return attn_out

  def get_config(self): # Needed for saving and loading model with custom layer
    config = super().get_config().copy()
    config.update({'d_k': self.d_k, 'd_v': self.d_v})
    return config

class MultiAttention(Layer):
  """Runs ``n_heads`` SingleAttention heads and projects their concatenation.

  The final Dense layer maps the concatenated head outputs back to the feature
  size of the first input (``input_shape[0][-1]``).
  """

  def __init__(self, d_k, d_v, n_heads, **kwargs):
    super(MultiAttention, self).__init__(**kwargs)
    self.d_k = d_k
    self.d_v = d_v
    self.n_heads = n_heads
    self.attn_heads = list()

  def build(self, input_shape):
    # Rebuild the list rather than append, so a repeated build() cannot leave
    # more than n_heads heads behind.
    self.attn_heads = [SingleAttention(self.d_k, self.d_v)
                       for _ in range(self.n_heads)]

    # input_shape is a sequence of three shapes; [0][-1] is the feature size
    # of the first input.
    self.linear = Dense(input_shape[0][-1],
                        kernel_initializer='glorot_uniform',
                        bias_initializer='glorot_uniform')

  def call(self, inputs):
    attn = [head(inputs) for head in self.attn_heads]
    concat_attn = tf.concat(attn, axis=-1)
    multi_linear = self.linear(concat_attn)
    return multi_linear

  def get_config(self): # Needed for saving and loading model with custom layer
    config = super().get_config().copy()
    config.update({'d_k': self.d_k,
                   'd_v': self.d_v,
                   'n_heads': self.n_heads})
    return config

class TransformerEncoder(Layer):
  """Encoder block: multi-head attention followed by a two-layer Conv1D
  feed-forward stage, each with dropout, a residual add and layer norm.

  ``call`` takes a 3-sequence ``inputs`` = (query source, key source, value
  source) and returns a tensor with the feature size of ``inputs[0]``.
  """

  def __init__(self, d_k, d_v, n_heads, ff_dim, dropout=0.1, **kwargs):
    # Configs written by the previous get_config() stored the rate under
    # 'dropout_rate' and also carried an 'attn_heads' entry. Accept both so
    # existing checkpoints still load, and so the stored rate is honoured
    # instead of silently falling back to the default.
    legacy_rate = kwargs.pop('dropout_rate', None)
    kwargs.pop('attn_heads', None)
    super(TransformerEncoder, self).__init__(**kwargs)
    self.d_k = d_k
    self.d_v = d_v
    self.n_heads = n_heads
    self.ff_dim = ff_dim
    self.attn_heads = list() # unused here; kept so the attribute still exists
    self.dropout_rate = dropout if legacy_rate is None else legacy_rate

  def build(self, input_shape):
    self.attn_multi = MultiAttention(self.d_k, self.d_v, self.n_heads)
    self.attn_dropout = Dropout(self.dropout_rate)
    self.attn_normalize = LayerNormalization(epsilon=1e-6)

    self.ff_conv1D_1 = Conv1D(filters=self.ff_dim, kernel_size=1, activation='relu')
    # Project back to the feature size of the first input so the residual
    # addition below is shape-compatible.
    self.ff_conv1D_2 = Conv1D(filters=input_shape[0][-1], kernel_size=1)
    self.ff_dropout = Dropout(self.dropout_rate)
    self.ff_normalize = LayerNormalization(epsilon=1e-6)

  def call(self, inputs): # inputs = (in_seq, in_seq, in_seq)
    attn_layer = self.attn_multi(inputs)
    attn_layer = self.attn_dropout(attn_layer)
    attn_layer = self.attn_normalize(inputs[0] + attn_layer)

    ff_layer = self.ff_conv1D_1(attn_layer)
    ff_layer = self.ff_conv1D_2(ff_layer)
    ff_layer = self.ff_dropout(ff_layer)
    # NOTE(review): the second residual adds the block input (inputs[0]), not
    # the attention output; left unchanged to keep trained weights valid.
    ff_layer = self.ff_normalize(inputs[0] + ff_layer)
    return ff_layer

  def get_config(self): # Needed for saving and loading model with custom layer
    config = super().get_config().copy()
    # Keys match the constructor parameters so from_config() restores them.
    config.update({'d_k': self.d_k,
                   'd_v': self.d_v,
                   'n_heads': self.n_heads,
                   'ff_dim': self.ff_dim,
                   'dropout': self.dropout_rate})
    return config

def create_model():
  '''Build and compile the Time2Vector + 5-encoder transformer regressor.

  Reads SEQUENCE_LEN, d_k, d_v, n_heads and ff_dim from the notebook globals.
  Returns a compiled Model (MSE loss, Adam optimizer, MAE and MAPE metrics).
  '''
  # Layers are instantiated before the graph is wired.
  time_embedding = Time2Vector(SEQUENCE_LEN)
  encoders = [TransformerEncoder(d_k, d_v, n_heads, ff_dim) for _ in range(5)]

  in_seq = Input(shape=(SEQUENCE_LEN, 5))
  time_features = time_embedding(in_seq)
  x = Concatenate(axis=-1)([in_seq, time_features])
  for encoder in encoders:
    # Self-attention: the same tensor serves as query, key and value source.
    x = encoder((x, x, x))

  x = GlobalAveragePooling1D(data_format='channels_first')(x)
  x = Dropout(0.1)(x)
  x = Dense(64, activation='relu')(x)
  x = Dropout(0.1)(x)
  out = Dense(1, activation='linear')(x)

  model = Model(inputs=in_seq, outputs=out)
  model.compile(loss='mse', optimizer='adam', metrics=['mae', 'mape'])
  return model

def evaluate(model, X_train, y_train, X_val, y_val, X_test, y_test):
    """Predict on all three splits and print loss / MAE / MAPE for each.

    Returns:
        ``(train_pred, val_pred, test_pred)`` as returned by ``model.predict``.
    """
    splits = ((X_train, y_train), (X_val, y_val), (X_test, y_test))

    # All predictions first, then the metric evaluations, in split order.
    predictions = tuple(model.predict(features) for features, _ in splits)
    train_eval, val_eval, test_eval = [
        model.evaluate(features, targets, verbose=0) for features, targets in splits
    ]

    # Each eval result holds the loss followed by the compiled metrics.
    print(' ')
    print('Evaluation metrics')
    print('Training Data - Loss: {:.4f}, MAE: {:.4f}, MAPE: {:.4f}'.format(*train_eval[:3]))
    print('Validation Data - Loss: {:.4f}, MAE: {:.4f}, MAPE: {:.4f}'.format(*val_eval[:3]))
    print('Test Data - Loss: {:.4f}, MAE: {:.4f}, MAPE: {:.4f}'.format(*test_eval[:3]))

    return predictions

def display(evaluation_result, name):
    """Plot actual vs. predicted closing returns for the three splits.

    Reads the notebook globals ``train_data``, ``val_data``, ``test_data`` and
    ``SEQUENCE_LEN``; they must exist before this is called.

    Args:
        evaluation_result: ``(train_pred, val_pred, test_pred)`` from evaluate().
        name: Label used in legends and y-axis captions.
    """
    train_pred, val_pred, test_pred = evaluation_result

    fig = plt.figure(figsize=(15,20))
    st = fig.suptitle("Transformer + TimeEmbedding Model", fontsize=22)
    st.set_y(0.92)

    # (subplot position, panel title, actual data, predictions)
    panels = (
        (311, "Training Data", train_data, train_pred),
        (312, "Validation Data", val_data, val_pred),
        (313, "Test Data", test_data, test_pred),
    )
    for position, title, actual, predicted in panels:
        ax = fig.add_subplot(position)
        ax.plot(actual[:, 3], label=f'{name} Closing Returns')
        # Predictions start SEQUENCE_LEN rows in: the first window has no target.
        prediction_positions = np.arange(SEQUENCE_LEN, predicted.shape[0]+SEQUENCE_LEN)
        ax.plot(prediction_positions, predicted, linewidth=3, label=f'Predicted {name} Closing Returns')
        ax.set_title(title, fontsize=18)
        ax.set_xlabel('Date')
        ax.set_ylabel(f'{name} Closing Returns')
        ax.legend(loc="best", fontsize=12)

In [None]:
def adjust_list_lengths(target_list, *lists):
    """Return ``lists`` trimmed or padded at the front to ``len(target_list)``.

    Longer lists lose their leading items; shorter lists get the placeholder
    value 1 prepended; lists of the right length are returned as-is (same
    object).
    """
    wanted = len(target_list)
    filler = 1

    def _fit(sequence):
        surplus = len(sequence) - wanted
        if surplus == 0:
            return sequence
        if surplus > 0:
            # Too long: keep only the last `wanted` items.
            return sequence[surplus:]
        # Too short: pad at the front.
        return [filler] * abs(surplus) + sequence

    return [_fit(sequence) for sequence in lists]

from sklearn.metrics import mean_squared_error, mean_absolute_error, accuracy_score

def calculate_rmse(real_prices, predicted_prices):
    """Root mean squared error between the real and predicted prices."""
    return np.sqrt(mean_squared_error(real_prices, predicted_prices))

def calculate_mse(real_prices, predicted_prices):
    """Mean squared error, delegated to sklearn's mean_squared_error."""
    return mean_squared_error(real_prices, predicted_prices)

def calculate_mape(real_prices, predicted_prices):
    """Mean absolute percentage error, in percent, relative to the real prices.

    A real price of 0 makes the corresponding term a division by zero.
    """
    actual = np.array(real_prices)
    forecast = np.array(predicted_prices)
    relative_error = np.abs((actual - forecast) / actual)
    return np.mean(relative_error) * 100

def calculate_mae(real_prices, predicted_prices):
    """Mean absolute error, delegated to sklearn's mean_absolute_error."""
    return mean_absolute_error(real_prices, predicted_prices)

def calculate_r2(real_prices, predicted_prices):
    """Coefficient of determination: 1 - RSS / TSS."""
    actual = np.array(real_prices)
    forecast = np.array(predicted_prices)

    # TSS: total sum of squares around the mean of the real prices
    total_sum_squares = np.sum((actual - np.mean(actual)) ** 2)
    # RSS: residual sum of squares of the predictions
    residual_sum_squares = np.sum((actual - forecast) ** 2)

    return 1 - (residual_sum_squares / total_sum_squares)

def stat(real_prices, predicted_prices):
    """Print RMSE, MSE, MAPE, MAE and R2 for the given real/predicted series."""
    report = (
        ("RMSE:", calculate_rmse),
        ("MSE:", calculate_mse),
        ("MAPE:", calculate_mape),
        ("MAE:", calculate_mae),
        ("R2:", calculate_r2),
    )
    for label, metric in report:
        print(label, metric(real_prices, predicted_prices))

***TRAINING PROCESS STARTS FROM HERE***

In [None]:
# Builds `df`, the normalized frame the model is trained on.
# Single feature: use the one configured ticker as-is.
if len(LIST_OF_OBJECT) == 1:
    # Take the ticker from the config list instead of a hard-coded symbol.
    df = get(LIST_OF_OBJECT[0][0])[0]
    normalize(df)
    
# Multiple features: normalize every ticker, then merge them on Date and
# combine each feature with the NaN-ignoring geometric mean.
else:
    dfs = [get(code)[0] for code, _ in LIST_OF_OBJECT]

    for df, (_, name) in zip(dfs, LIST_OF_OBJECT):
        draw_close_and_volume(df, name)

    for df in dfs:
        normalize(df)  # modifies each frame in place

    df, mnn_params = geometry_mean_not_nan(dfs)

In [None]:
# Split the data sequentially into 80% train, 10% valid and 10% test

###############################################################################
'''Create training, validation and test split'''
# split() returns frames without the Date column; .values gives the arrays
# that the windowing step consumes.
df_train, df_val, df_test = split(df)
train_data, val_data, test_data = df_train.values, df_val.values, df_test.values

print('Training data shape: {}'.format(train_data.shape))
print('Validation data shape: {}'.format(val_data.shape))
print('Test data shape: {}'.format(test_data.shape))

In [None]:
# Visualize the three splits, then cut each one into SEQUENCE_LEN-long windows.
draw_date_seperation(df_train, df_val, df_test, train_data, val_data, test_data)
X_train, y_train, X_val, y_val, X_test, y_test = split_data(train_data, val_data, test_data)

In [None]:
# `model` is only created in the next cell, which also prints its summary.
# Calling model.summary() here raised NameError after Restart & Run All,
# so the call was removed from this cell.

In [None]:
# Build the compiled model and show its layer summary.
model = create_model()
model.summary()

In [None]:
# Save a checkpoint to '<OUTPUT_FILE_NAME>.keras' whenever val_loss improves.
# NOTE(review): `model` itself keeps the weights of the last epoch; the later
# cells use `model` directly rather than reloading the saved checkpoint.
callback = tf.keras.callbacks.ModelCheckpoint(f'{OUTPUT_FILE_NAME}.keras',
                                              monitor='val_loss',
                                              save_best_only=True, verbose=1)

# `history` is read by the metrics-plot cell below.
history = model.fit(X_train, y_train,
                    batch_size=BATCH_SIZE,
                    epochs=EPOCHS,
                    callbacks=[callback],
                    validation_data=(X_val, y_val))

In [None]:
# Predictions for the three splits; also prints loss / MAE / MAPE per split.
evaluate_result = evaluate(model, X_train, y_train, X_val, y_val, X_test, y_test)

In [None]:
# Plot actual vs. predicted closing returns for each split.
display(evaluate_result, "Mix Data")

In [None]:
# Training curves: one panel each for loss, MAE and MAPE, showing the training
# and validation series recorded per epoch in `history`.
fig = plt.figure(figsize=(15,20))
st = fig.suptitle("Transformer + TimeEmbedding Model Metrics", fontsize=22)
st.set_y(0.92)

#Plot model loss
ax1 = fig.add_subplot(311)
ax1.plot(history.history['loss'], label='Training loss (MSE)')
ax1.plot(history.history['val_loss'], label='Validation loss (MSE)')
ax1.set_title("Model loss", fontsize=18)
ax1.set_xlabel('Epoch')
ax1.set_ylabel('Loss (MSE)')
ax1.legend(loc="best", fontsize=12)

#Plot MAE
ax2 = fig.add_subplot(312)
ax2.plot(history.history['mae'], label='Training MAE')
ax2.plot(history.history['val_mae'], label='Validation MAE')
ax2.set_title("Model metric - Mean average error (MAE)", fontsize=18)
ax2.set_xlabel('Epoch')
ax2.set_ylabel('Mean average error (MAE)')
ax2.legend(loc="best", fontsize=12)

#Plot MAPE
ax3 = fig.add_subplot(313)
ax3.plot(history.history['mape'], label='Training MAPE')
ax3.plot(history.history['val_mape'], label='Validation MAPE')
ax3.set_title("Model metric - Mean average percentage error (MAPE)", fontsize=18)
ax3.set_xlabel('Epoch')
ax3.set_ylabel('Mean average percentage error (MAPE)')
ax3.legend(loc="best", fontsize=12)

***FURTHER EVALUATION***

In [None]:
# Load the evaluation ticker and keep every intermediate Close series
# (raw, moving-averaged, percentage-change, min-max scaled) so the predictions
# can be mapped back step by step further down. '...' is a placeholder.
target, plain_close_price, mva_close_price = get('...') # Place the target stock's ticker to evaluate
# m / M are the min / max used to scale the price columns; normalize() also
# modifies `target` in place.
m, M, pct_close_price, normalize_close_price = normalize(target)
# Plain lists, because later cells concatenate them with `+` and append.
plain_close_price = plain_close_price.tolist()
mva_close_price = mva_close_price.tolist()
pct_close_price = pct_close_price.tolist()
normalize_close_price = normalize_close_price.tolist()

In [None]:
""" Predict real target """

# Build full-row windows of SEQUENCE_LEN rows over the target frame.
data = target.drop(columns = ['Date'])
data = data.values
x = []
y = []
for i in range(SEQUENCE_LEN, len(data)):
    x.append(data[i - SEQUENCE_LEN:i])
    y.append(data[:, 3][i]) #Value of 4th column (Close Price) of df-row 128+1
x, y = np.array(x), np.array(y)

# NOTE: from here on `x` holds the model's predictions (a flat list), not the
# input windows; `y` holds the true scaled Close values.
x = model.predict(x)
x = x.tolist()
x = list(flatten(x))

In [None]:
# True scaled Close values (y) against the model's predictions (x).
plt.figure(figsize=(15,20))
plt.plot(y)
plt.plot(x)

In [None]:
# Display statistics (RMSE, MSE, MAPE, MAE, R2) on the scaled values:
# first argument is the true series, second the predictions.
stat(y, x)

In [None]:
# The model may predict values greater than 1 (rare, but it has to be handled):
# such a value is replaced by its reciprocal, which brings it back below 1.
# Values <= 1, including negative ones, are kept unchanged.
normalize_predict_close_price = []
for i in range(len(x)):
    if x[i] > 1:
        normalize_predict_close_price.append(1 / x[i])
    else:
        normalize_predict_close_price.append(x[i])

In [None]:
# Prepend the first SEQUENCE_LEN true values: those rows have no prediction
# because each prediction needs a full window of preceding rows.
# The length check makes the cell safe to re-run; without it every re-run
# prepended the prefix again.
if len(normalize_predict_close_price) == len(normalize_close_price) - SEQUENCE_LEN:
    normalize_predict_close_price = normalize_close_price[:SEQUENCE_LEN] + normalize_predict_close_price
plt.plot(normalize_predict_close_price)
plt.plot(normalize_close_price)

***Reverse the transformations to recover the original values***

In [None]:
# "Find back" predicted percentage change values
# Inverts the min-max scaling of the price columns: value * (M - m) + m.
pct_predict_close_price = []
for val in normalize_predict_close_price:
    pct_predict_close_price.append(val * (M - m) + m)

# And display it with the real one
plt.plot(pct_predict_close_price)
plt.plot(pct_close_price)

In [None]:
# "Find back" moving average values
# Inverts the percentage change: each predicted value is the REAL previous
# moving-average value times (1 + predicted change), so errors do not
# accumulate from one step to the next. The first value is taken as-is.
mva_predict_close_price = [mva_close_price[0]]
for i, val in enumerate(pct_predict_close_price):
    mva_predict_close_price.append(mva_close_price[i] * (1 + val))
    
# And display it with the real one
plt.figure(figsize=(15,20))
plt.plot(mva_predict_close_price)
plt.plot(mva_close_price)

In [None]:
# Checking the statistics of the reconstructed moving-average prices
# (real series first, reconstructed predictions second).
stat(mva_close_price, mva_predict_close_price)

In [None]:
# "Find back" real values
# The first MOVING_AVERAGE_STEPS - 1 raw prices are copied over as the seed.
plain_predict_close_price = plain_close_price[:MOVING_AVERAGE_STEPS - 1]

# Inverts the moving average: window mean * window length is the window sum,
# and subtracting the MOVING_AVERAGE_STEPS - 1 real prices before the window's
# last element leaves that last price. abs() keeps the result non-negative.
for i, val in enumerate(mva_predict_close_price):
    plain_predict_close_price.append(
        abs(val * MOVING_AVERAGE_STEPS - sum(plain_close_price[i :MOVING_AVERAGE_STEPS - 1 + i]))
    )

# And display it with the real one
plt.figure(figsize=(20,20))
plt.plot(plain_predict_close_price)
plt.plot(plain_close_price)

In [None]:
# Checking the statistics of the reconstructed raw close prices
# (real series first, reconstructed predictions second).
stat(plain_close_price, plain_predict_close_price)

In [None]:
def convert_to_trend(prices):
    """Encode each step of ``prices`` as 1 (not lower than the previous value)
    or 0 (lower than the previous value).

    The result has ``len(prices) - 1`` entries; it is empty for fewer than two
    prices.
    """
    return [
        1 if prices[step] >= prices[step - 1] else 0
        for step in range(1, len(prices))
    ]

In [None]:
# Up/down labels for the real and the reconstructed moving-average series.
# Note that `plain_trend` is built from the moving-average prices, not from
# the raw close prices.
plain_trend = convert_to_trend(mva_close_price)
predict_trend = convert_to_trend(mva_predict_close_price)

In [None]:
from sklearn.metrics import accuracy_score, precision_recall_fscore_support

def evaluate_trend_predictions(real_trends, predicted_trends):
    """Score predicted 0/1 trend labels against the real ones.

    Prints accuracy plus per-class precision, recall and F1 (class order 0, 1)
    and returns the same values in a dict with the keys 'accuracy',
    'precision', 'recall' and 'f1_score'.
    """
    # Per-class scores for labels 0 and 1; zero_division=0 yields 0 where a
    # score is undefined.
    precision, recall, f1_score, _ = precision_recall_fscore_support(
        real_trends, predicted_trends, labels=[0, 1], zero_division=0
    )
    metrics = {
        'accuracy': accuracy_score(real_trends, predicted_trends),
        'precision': precision,
        'recall': recall,
        'f1_score': f1_score
    }

    print(f"Accuracy: {metrics['accuracy']:.4f}")
    print(f"Precision for 0, 1: {metrics['precision']}")
    print(f"Recall for 0, 1: {metrics['recall']}")
    print(f"F1-score for 0, 1: {metrics['f1_score']}")

    return metrics

In [None]:
# `c` holds the returned metrics dict (accuracy, precision, recall, f1_score).
c = evaluate_trend_predictions(plain_trend, predict_trend)