In [20]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import time as tm
import datetime

def windowed_dataset(data_series, num=5):
    """
    windowed the data
    args:
      data_series: (ndarray)
      num: (int, default:5)
    return: 
      windowed_data (ndarray)
    """
    windowed_data = []
    target_data = []
    for i in range(len(data_series) - num):
        windowed_data.append(data_series[i : i + num])
        target_data.append(data_series[i + num])
    return np.array(windowed_data), np.array(target_data)

def import_dataset(path):
    """
    Import the dataset contains data of time series
    The data of oldest time should be on top and the recent should be on bottom
    args:
      path : directory of the data file (str)
    return:
      dataframe (DataFrame)
    """
    df = pd.read_csv(path)
    return df

def split_series_and_date(df):
    """
    splitting the series and date into two different variable
    args:
      df : dataset of time series (DataFrame)
    return:
      series (DataFrame)
      time (ndarray)
    """
    time = df[df.columns[0]].to_numpy()
    df.drop(df.columns[0], axis=1, inplace=True)
    return df, time

def normalize(data_series):
    """
    Normalized the data based on each columns
    args:
      data_series: (ndarray)
    return: 
      normalized_data (ndarray), maximum (ndarray), minimum (ndarray)
    """
    maximum = np.array([])
    minimum = np.array([])
    normalized_data = []
    for col in data_series.T:
        max_col = np.max(col)
        min_col = np.min(col)
        normalized_data_col = (col - min_col) / (max_col - min_col)
        normalized_data.append(normalized_data_col)
        maximum = np.concatenate([maximum, np.array([max_col])])
        minimum = np.concatenate([minimum, np.array([min_col])])
    return np.array(normalized_data).T, maximum, minimum

def denormalize(normalized_data, maximum, minimum):
    """
    Denormalized the data based on maximum and minimum value of each column before
    args:
      normalized_data: (ndarray)
      maximum: (ndarray)
      minimum: (ndarray)
    return: 
      denormalized_data (ndarray)
    """
    normalized_data = np.array(normalized_data).T
    denormalized_data = []
    for max_col, min_col, col in zip(maximum, minimum, normalized_data):
        denormalized_data_col = (col * (max_col - min_col)) + min_col
        denormalized_data.append(denormalized_data_col)
    return np.array(denormalized_data).T
    
def initial_weights_and_biases(num_data, hidden_size, zero = False):
    """
    initial weights and biases for Vanilla LSTM method
    args:
      num_data : the size of data in one time (int)
      hidden_size : the size of hidden state (int)
    return:
      params (tuple) contains:
        W_g : weight from x_t to g
        W_i : weight from x_t to i
        W_f : weight from x_t to f
        W_o : weight from x_t to o
        W_y : weight from h_t to y
        R_g : weight from h_prev to g
        R_i : weight from h_prev to i
        R_f : weight from h_prev to f
        R_o : weight from h_prev to o
        b_g : bias of g
        b_i : bias of i
        b_f : bias of f
        b_o : bias of o
        b_y : bias of y
    """
    W_g = np.random.randn(hidden_size, num_data)
    W_i = np.random.randn(hidden_size, num_data)
    W_f = np.random.randn(hidden_size, num_data)
    W_o = np.random.randn(hidden_size, num_data)
    W_y = np.random.randn(num_data, hidden_size)
    R_g = np.random.randn(hidden_size, hidden_size)
    R_i = np.random.randn(hidden_size, hidden_size)
    R_f = np.random.randn(hidden_size, hidden_size)
    R_o = np.random.randn(hidden_size, hidden_size)
    b_g = np.random.randn(1, hidden_size)
    b_i = np.random.randn(1, hidden_size)
    b_f = np.random.randn(1, hidden_size)
    b_o = np.random.randn(1, hidden_size)
    b_y = np.random.randn(1, num_data)
    params = W_g, W_i, W_f, W_o, W_y, R_g, R_i, R_f, R_o, b_g, b_i, b_f, b_o, b_y
    if zero:
        W_g -= W_g
        W_i -= W_i
        W_f -= W_f
        W_o -= W_o
        W_y -= W_y
        R_g -= R_g
        R_i -= R_i
        R_f -= R_f
        R_o -= R_o
        b_g -= b_g
        b_i -= b_i
        b_f -= b_f
        b_o -= b_o
        b_y -= b_y
    return params

def tanh(x, derivative=False):
    """
    tanh function
    """
    f = np.tanh(x)
    if derivative:
        return 1 - (f ** 2)
    else:
        return f

def sigmoid(x, derivative=False):
    """
    sigmoid function
    """
    f = 1 / (1 + np.exp(-x))
    if derivative:
        return f * (1 - f)
    else:
        return f

def get_mean_squared_error(output, target):
    """
    get mean squared error loss value from prediction
    input must be in same dimension
    args:
      output : output from forward prediction (ndarray)
      target : target from real data (ndarray)
    return: 
      loss (float)
    """
    return (1/len(output)) * np.sum((target - output) ** 2)

def get_loss(n, output, target, loss_type = "MSE", derivative=False):
    """
    get loss value from prediction
    input must be in same dimension
    args:
      n : total amount of train data set (int)
      output : output from forward prediction (ndarray)
      target : target from real data (ndarray)
      derivative : (boolean) (default = False)
    return: 
      loss (float)
    """
    if loss_type.lower() == "mse": 
        if derivative:
            return 2 * (target - output) / n
        else:
            return np.power(target - output, 2) / n
    elif loss_type.lower() == 'mape':
        E = (target - output) / target
        if derivative:
            return (E / np.abs(E)) / n
        else:
            return np.abs(E) / n
    elif loss_type.lower() == 'mae':
        E = target - output
        if derivative:
            return (E / np.abs(E)) / n
        else:
            return np.abs(E) / n

def clip_gradient_norm(grads, max_norm = 0.5):
    """
    Clips gradients to have a maximum norm of `max_norm`.
    This is to prevent the exploding gradients problem.
    args:
      grads: contains (tuple) 
      max_norm: (float) (default = 0.5)
    return: 
      grads (ndarray) contain:
        dWg : gradient of weight from x_t to g
        dWi : gradient of weight from x_t to i
        dWf : gradient of weight from x_t to f
        dWo : gradient of weight from x_t to o
        dRg : gradient of weight from h_prev to g
        dRi : gradient of weight from h_prev to i
        dRf : gradient of weight from h_prev to f
        dRo : gradient of weight from h_prev to o
        dbg : gradient of bias of g
        dbi : gradient of bias of i
        dbf : gradient of bias of f
        dbo : gradient of bias of o
    """ 
    total_norm = 0
    
    for grad in grads:
        grad_norm = np.sum(np.power(grad, 2))
        total_norm += grad_norm
    
    total_norm = np.sqrt(total_norm)
    
    if total_norm != 0:
        clip_coef = max_norm / (total_norm)
    else:
        clip_coef = max_norm / (total_norm + 1e-8)
    
    if clip_coef < 1:
        for grad in grads:
            grad *= clip_coef
    
    return grads

def LSTM(weights, input, h_prev, c_prev):
    W_g, W_i, W_f, W_o, W_y, R_g, R_i, R_f, R_o, b_g, b_i, b_f, b_o, b_y = weights
    f_t = sigmoid(np.double(W_f @ input.T + R_f @ h_prev.T + b_f.T)).T
    i_t = sigmoid(np.double(W_i @ input.T + R_i @ h_prev.T + b_i.T)).T
    g_t = tanh(np.double(W_g @ input.T + R_g @ h_prev.T + b_g.T)).T
    c_t = i_t * g_t + f_t * c_prev
    o_t = sigmoid(np.double(W_o @ input.T + R_o @ h_prev.T + b_o.T)).T
    h_t = o_t * tanh(np.double(c_t))
    return f_t, i_t, g_t, c_t, o_t, h_t

def dense(W_y, b_y, h_t):
    W_g, W_i, W_f, W_o, W_y, R_g, R_i, R_f, R_o, b_g, b_i, b_f, b_o, b_y = weights
    y = sigmoid(np.double(W_y @ h_t.T + b_y.T)).T
    return y

def forward(weights, inputs, h_prev, c_prev):
    W_g, W_i, W_f, W_o, W_y, R_g, R_i, R_f, R_o, b_g, b_i, b_f, b_o, b_y = weights

    for t, input in enumerate(inputs):
        input = np.reshape(input, (1, len(input)))
        f_t, i_t, g_t, c_t, o_t, h_t = LSTM(weights, input, h_prev, c_prev)
        if t < len(inputs):
            c_prev = np.copy(c_t)
            h_prev = np.copy(h_t)

    y = dense(W_y, b_y, h_t)
    outputs_params = (input, c_prev, h_prev, g_t, i_t, f_t, c_t, o_t, h_t, y)

    return outputs_params

def backward(n, weights, t, outputs_params):
    W_g, W_i, W_f, W_o, W_y, R_g, R_i, R_f, R_o, b_g, b_i, b_f, b_o, b_y = weights
    
    dWg = np.zeros_like(W_g)
    dRg = np.zeros_like(R_g)
    dbg = np.zeros_like(b_g)

    dWi = np.zeros_like(W_i)
    dRi = np.zeros_like(R_i)
    dbi = np.zeros_like(b_i)
    
    dWf = np.zeros_like(W_f)
    dRf = np.zeros_like(R_f)
    dbf = np.zeros_like(b_f)

    dWo = np.zeros_like(W_o)
    dRo = np.zeros_like(R_o)
    dbo = np.zeros_like(b_o)

    dWy = np.zeros_like(W_y)
    dby = np.zeros_like(b_y)

    input, c_prev, h_prev, g_t, i_t, f_t, c_t, o_t, h_t, y = outputs_params

    loss_mse = get_loss(n, np.copy(t), np.copy(y), loss_type = "mse")
    loss_mae = get_loss(n, np.copy(t), np.copy(y), loss_type = "mae")
    loss_mape = get_loss(n, np.copy(t), np.copy(y), loss_type = "mape")

    dE = get_loss(n, np.copy(t), np.copy(y), loss_type = "mse", derivative=True)

    dy = np.reshape(dE, (len(y[0]), 1)) * sigmoid(np.double(W_y @ h_t.T + b_y.T), derivative = True)
    dWy += dy @ h_t
    dby += dy.T

    dh = (dy.T @ W_y).T

    do = sigmoid(np.double(W_o @ input.T + R_o @ h_prev.T + b_o.T), derivative = True)
    dWo += dh * (do @ input) * tanh(c_t).T
    dRo += dh * (do @ h_prev) * tanh(c_t).T
    dbo += (dh * do * tanh(c_t).T).T

    di = sigmoid(np.double(W_i @ input.T + R_i @ h_prev.T + b_i.T), derivative = True)
    dWi += dh * o_t.T * tanh(c_t).T * (di @ input)
    dRi += dh * o_t.T * tanh(c_t).T * (di @ h_prev)
    dbi += (dh * o_t.T * tanh(c_t).T * di).T

    df = sigmoid(np.double(W_f @ input.T + R_f @ h_prev.T + b_f.T), derivative = True)
    dWf += dh * o_t.T * tanh(c_t).T * (df @ input) * c_prev.T
    dRf += dh * o_t.T * tanh(c_t).T * (df @ h_prev) * c_prev.T
    dbf += (dh * o_t.T * tanh(c_t).T * df).T

    dg = tanh(np.double(W_g @ input.T + R_g @ h_prev.T) ,derivative=True)
    dWg += dh * (o_t * tanh(np.double(c_t), derivative=True)).T * i_t.T * (dg @ input)
    dRg += dh * (o_t * tanh(np.double(c_t), derivative=True)).T * i_t.T * (dg @ h_prev)
    dbg += (dh * (o_t * tanh(np.double(c_t), derivative=True)).T * i_t.T).T

    grads = dWg, dWi, dWf, dWo, dWy, dRg, dRi, dRf, dRo, dbg, dbi, dbf, dbo, dby

    loss = (loss_mse, loss_mae, loss_mape)

    return grads, loss

def RMSprop(weights, v_weights, grads, learning_rate=1e-3, beta = 0.9):
    """
    RMSprop as optimizers for update the weights
    args:
      weights : contains weight and bias (tuple) 
      v_weights : moment of weight and bias (tuple)
      grads : contains gradient of weight and bias (tuple)
      learning_rate : (float) (default = 1e-3)
      beta : (float) (default = 0.9)
    return: 
      weights : contains weight and bias (tuple) 
      v_weights : moment of weight and bias (tuple)
    """
    for weight, v_weight, grad in zip(weights, v_weights, grads):
        v_weight *= beta
        v_weight += (1 - beta) * np.power(grad, 2)
        weight -= learning_rate * (grad / (np.sqrt(v_weight) + 1e-6)) 

    return weights, v_weights
    

In [21]:
from google.colab import drive
drive.mount('/content/drive')

df = pd.read_csv("drive/MyDrive/Dataset/AAPL.csv")
time = df[df.columns[0]]
df.drop(df.columns[0], axis=1, inplace=True)
time = time.to_numpy()
series = df.to_numpy()

series, maximum, minimum = normalize(series)
length_time = 21
series_set, target_set = windowed_dataset(series, num = length_time)
num_data = len(series_set)

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [22]:
split_time = int(0.8 * num_data)

train_set = series_set[:split_time]
train_label = target_set[:split_time]
test_set = series_set[split_time:]
test_label = target_set[split_time:]

num_train = len(train_set)
num_test = len(test_set)

hidden_size = 5

v_weights = initial_weights_and_biases(len(series[0]), hidden_size, zero = True)
weights = initial_weights_and_biases(len(series[0]), hidden_size)

epochs = 30
learning_rate = 0.001

for i in range(epochs):
    
    train_loss_mse = 0
    train_loss_mae = 0
    train_loss_mape = 0
    test_loss_mse = 0
    test_loss_mae = 0
    test_loss_mape = 0

    st = tm.time_ns()

    for train, label in zip(train_set, train_label):
        h_prev = np.zeros([1, hidden_size])
        c_prev = np.zeros([1, hidden_size])

        outputs_params = forward(weights, train, h_prev, c_prev)
        grads, loss = backward(num_train, weights, label, outputs_params)

        weights, v_weights = RMSprop(weights, v_weights, grads, learning_rate = learning_rate)

        train_loss_mse += loss[0]
        train_loss_mae += loss[1]
        train_loss_mape += loss[2]

    et = tm.time_ns()

    for test, label in zip(test_set, test_label):
        h_prev = np.zeros([1, hidden_size])
        c_prev = np.zeros([1, hidden_size])

        outputs_params = forward(weights, test, h_prev, c_prev)

        _, loss = backward(num_test, weights, label, outputs_params)

        test_loss_mse += loss[0]
        test_loss_mae += loss[1]
        test_loss_mape += loss[2]

    print(f"epoch {i+1} : time elapse {round((et-st) * 1e-9, 3)}")
    print(f"train loss (mse, mae, mape) : ({round(np.mean(train_loss_mse),8)}, {round(np.mean(train_loss_mae),8)}, {round(np.mean(train_loss_mape),8)})")
    print(f"test loss (mse, mae, mape) : ({round(np.mean(test_loss_mse),8)}, {round(np.mean(test_loss_mae),8)}, {round(np.mean(test_loss_mape),8)})\n")

epoch 1 : time elapse 1.26
train loss (mse, mae, mape) : (0.07345979, 0.20873018, 0.46124661)
test loss (mse, mae, mape) : (0.03809242, 0.17617743, 0.32343338)

epoch 2 : time elapse 1.257
train loss (mse, mae, mape) : (0.06730802, 0.19706578, 0.45870711)
test loss (mse, mae, mape) : (0.03267832, 0.16107542, 0.29991874)

epoch 3 : time elapse 1.244
train loss (mse, mae, mape) : (0.06335255, 0.18726787, 0.45445891)
test loss (mse, mae, mape) : (0.02892073, 0.15059148, 0.2814816)

epoch 4 : time elapse 1.26
train loss (mse, mae, mape) : (0.06040074, 0.1808344, 0.45150078)
test loss (mse, mae, mape) : (0.02625511, 0.14290852, 0.26778695)

epoch 5 : time elapse 1.273
train loss (mse, mae, mape) : (0.05852611, 0.17690933, 0.44867232)
test loss (mse, mae, mape) : (0.02442407, 0.13747583, 0.25812045)

epoch 6 : time elapse 1.259
train loss (mse, mae, mape) : (0.05613959, 0.17105267, 0.43794451)
test loss (mse, mae, mape) : (0.02243394, 0.13128646, 0.24715442)

epoch 7 : time elapse 1.265
trai

In [54]:
import json

split_time = int(0.8 * num_data)

train_set = series_set[:split_time]
train_label = target_set[:split_time]
test_set = series_set[split_time:]
test_label = target_set[split_time:]

num_train = len(train_set)
num_test = len(test_set)

hidden_size = 25
epochs = 30
learning_rate = 0.001
lib_weights = {}

for trial in range(30):
    print(f"Trial Number - {trial+1}")

    v_weights = initial_weights_and_biases(len(series[0]), hidden_size, zero = True)
    weights = initial_weights_and_biases(len(series[0]), hidden_size)

    total_time = 0

    for i in range(epochs):
        
        train_loss_mse = 0
        train_loss_mae = 0
        train_loss_mape = 0
        test_loss_mse = 0
        test_loss_mae = 0
        test_loss_mape = 0

        st = tm.time_ns()

        for train, label in zip(train_set, train_label):
            h_prev = np.zeros([1, hidden_size])
            c_prev = np.zeros([1, hidden_size])

            outputs_params = forward(weights, train, h_prev, c_prev)
            grads, loss = backward(num_train, weights, label, outputs_params)

            weights, v_weights = RMSprop(weights, v_weights, grads, learning_rate = learning_rate)

            train_loss_mse += loss[0]
            train_loss_mae += loss[1]
            train_loss_mape += loss[2]

        et = tm.time_ns()

        total_time += (et - st) * 1e-9

        for test, label in zip(test_set, test_label):
            h_prev = np.zeros([1, hidden_size])
            c_prev = np.zeros([1, hidden_size])

            outputs_params = forward(weights, test, h_prev, c_prev)

            _, loss = backward(num_test, weights, label, outputs_params)

            test_loss_mse += loss[0]
            test_loss_mae += loss[1]
            test_loss_mape += loss[2]

        print(f"epoch {i+1} : time elapse {round((et-st) * 1e-9, 3)}")
        print(f"train loss (mse, mae, mape) : ({round(np.mean(train_loss_mse),8)}, {round(np.mean(train_loss_mae),8)}, {round(np.mean(train_loss_mape),8)})")
        print(f"test loss (mse, mae, mape) : ({round(np.mean(test_loss_mse),8)}, {round(np.mean(test_loss_mae),8)}, {round(np.mean(test_loss_mape),8)})\n")

    result = {
        "weights" : weights,
        "train_mse" : train_loss_mse,
        "train_mae" : train_loss_mae,
        "train_mape" : train_loss_mape,
        "test_mse" : test_loss_mse,
        "test_mae" : test_loss_mae,
        "test_mape" : test_loss_mape,
        "total_time" : total_time
    }
    
    lib_weights[trial+1] = result


Trial Number - 1
epoch 1 : time elapse 1.324
train loss (mse, mae, mape) : (0.03522157, 0.11205764, 0.33456605)
test loss (mse, mae, mape) : (0.01055667, 0.08199239, 0.20505758)

epoch 2 : time elapse 1.339
train loss (mse, mae, mape) : (0.0292063, 0.09038482, 0.37502148)
test loss (mse, mae, mape) : (0.00743575, 0.07124282, 0.19538576)

epoch 3 : time elapse 1.361
train loss (mse, mae, mape) : (0.01324122, 0.06467075, 0.28872654)
test loss (mse, mae, mape) : (0.00750163, 0.07244641, 0.18876155)

epoch 4 : time elapse 1.336
train loss (mse, mae, mape) : (0.01041093, 0.05876212, 0.27978794)
test loss (mse, mae, mape) : (0.00741704, 0.0723282, 0.18024588)

epoch 5 : time elapse 1.362
train loss (mse, mae, mape) : (0.00898185, 0.05627217, 0.28082293)
test loss (mse, mae, mape) : (0.00743597, 0.0723615, 0.17494655)

epoch 6 : time elapse 1.347
train loss (mse, mae, mape) : (0.00644425, 0.04908675, 0.24865522)
test loss (mse, mae, mape) : (0.00666124, 0.06843587, 0.16753401)

epoch 7 : time

In [55]:
!pip install deepdish

import deepdish as dd
dd.io.save(f'weights_vanilla_lstm_{hidden_size}.h5', lib_weights)

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [56]:
load = dd.io.load(f'weights_vanilla_lstm_{hidden_size}.h5')

In [57]:
train_mse = []
train_mae = []
train_mape = []
test_mse = []
test_mae = []
test_mape = []
total_time = []

for key in load.keys():
    train_mse.append(load[key]['train_mse'])
    train_mae.append(load[key]['train_mae'])
    train_mape.append(load[key]['train_mape'])
    test_mse.append(load[key]['test_mse'])
    test_mae.append(load[key]['test_mae'])
    test_mape.append(load[key]['test_mape'])
    total_time.append(load[key]['total_time'])

print(f"train MSE \t: mean = {round(np.mean(train_mse), 5)},\t std = {round(np.std(train_mse), 5)}")
print(f"train MAE \t: mean = {round(np.mean(train_mae), 5)},\t std = {round(np.std(train_mae), 5)}")
print(f"train MAPE \t: mean = {round(np.mean(train_mape), 5)},\t std = {round(np.std(train_mape), 5)}")
print(f"test MSE \t: mean = {round(np.mean(test_mse), 5)},\t std = {round(np.std(test_mse), 5)}")
print(f"test MAE \t: mean = {round(np.mean(test_mae), 5)},\t std = {round(np.std(test_mae), 5)}")
print(f"test MAPE \t: mean = {round(np.mean(test_mape), 5)},\t std = {round(np.std(test_mape), 5)}")
print(f"total time \t: mean = {round(np.mean(total_time), 5)},\t std = {round(np.std(total_time), 5)}")

train MSE 	: mean = 0.00301,	 std = 0.00448
train MAE 	: mean = 0.03064,	 std = 0.02253
train MAPE 	: mean = 0.1651,	 std = 0.11433
test MSE 	: mean = 0.00664,	 std = 0.01043
test MAE 	: mean = 0.06137,	 std = 0.0278
test MAPE 	: mean = 0.18711,	 std = 0.27158
total time 	: mean = 40.46484,	 std = 0.51088
