In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from matplotlib.dates import DateFormatter
sns.set_style("white")

import scipy 
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import Sequential, layers, callbacks
from tensorflow.keras.layers import Dense, LSTM, Dropout, GRU, Bidirectional, Conv2D,Conv1D, BatchNormalization

from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split

from IPython.core.pylabtools import figsize

from pandas.plotting import register_matplotlib_converters
register_matplotlib_converters()
from tensorflow.keras.callbacks import EarlyStopping, LearningRateScheduler
from tensorflow.keras.callbacks import ModelCheckpoint
from keras.layers import *
from keras.models import *
from keras import backend as K



# Set random seed for reproducibility
tf.random.set_seed(42)

In [None]:
!pip install Historic-Crypto

In [None]:
import warnings
warnings.filterwarnings("ignore")

from Historic_Crypto import HistoricalData

In [None]:
btc_historicals = HistoricalData('BTC-USD',86400,'2021-01-01-00-00', '2022-05-30-00-00').retrieve_data() # cyrpto name, number of seconds, start date, end date


In [None]:
btc_historicals.shape

In [None]:
btc_historicals.tail()

In [None]:
# Check out variables
btc_historicals.dtypes

In [None]:
tst = btc_historicals

In [None]:
print('Null Values:',tst.isnull().values.sum())
print('If any NA values:', tst.isnull().values.any())

In [None]:
# Create variable '‘TOMORROW_CLOSE’' which shifts 'Close' up by 1
tst['TOMORROW_CLOSE'] = tst['close'].shift(-1,fill_value=0)

In [None]:
# drop last row because we shifted value (remember this when you add your forecasting data)
tst.drop(tst.tail(1).index,inplace=True) 

In [None]:
# Drop close column, since we created new ‘TOMORROW_CLOSE’
tst = tst.drop(columns=['close'])

In [None]:
tst.head()

In [None]:
tst.shape

In [None]:
# Split train data (90%) and test data (10%)

train_size = int(len(tst)*0.90)
train_dataset, test_dataset = tst.iloc[:train_size],tst.iloc[train_size:]


In [None]:
train_dataset.head()

In [None]:
# Split train data to X and y
X_train = train_dataset.drop('TOMORROW_CLOSE', axis = 1)
y_train = train_dataset.loc[:,['TOMORROW_CLOSE']]

# Split test data to X and y
X_test = test_dataset.drop('TOMORROW_CLOSE', axis = 1)
y_test = test_dataset.loc[:,['TOMORROW_CLOSE']]

In [None]:
X_train.head()

In [None]:
'''
#Plot train and test data
plt.figure(figsize = (12, 6))
plt.rcParams['figure.dpi'] = 360
plt.plot(train_dataset.TOMORROW_CLOSE)
plt.plot(test_dataset.TOMORROW_CLOSE)
plt.xlabel('Date')
plt.ylabel('Close value (US$)')
plt.legend(['Train set', 'Test set'], loc='upper left')
print('Dimension of train data: ',train_dataset.shape)
print('Dimension of test data: ', test_dataset.shape)
'''

In [None]:
# Different scaler for input and output ----> Normalizate the data
scaler_x = MinMaxScaler(feature_range = (-1,1))
scaler_y = MinMaxScaler(feature_range = (-1,1))

# Fit the scaler using available training data
input_scaler = scaler_x.fit(X_train)
output_scaler = scaler_y.fit(y_train)

# Apply the scaler to training data
train_y_norm = output_scaler.transform(y_train)
train_x_norm = input_scaler.transform(X_train)

# Apply the scaler to test data
test_y_norm = output_scaler.transform(y_test)
test_x_norm = input_scaler.transform(X_test)

In [None]:
# Create 3 dimensional data set 
def threeD_dataset (X, y, time_steps = 1):
    Xs, ys = [], []
    
    for i in range(len(X)-time_steps):
        v = X[i:i+time_steps, :]
        Xs.append(v)
        ys.append(y[i+time_steps])
        
    return np.array(Xs), np.array(ys)


TIME_STEPS = 15 # which means the model will make predictions of ‘TOMORROW_CLOSE’ based on the input from the 10 previous days or future days bi-lstm or bi-gru

X_test, y_test = threeD_dataset(test_x_norm, test_y_norm, TIME_STEPS)
X_train, y_train = threeD_dataset(train_x_norm, train_y_norm, TIME_STEPS)
print('X_train.shape: ', X_train.shape)
print('y_train.shape: ', y_train.shape)
print('X_test.shape: ', X_test.shape) 
print('y_test.shape: ', y_test.shape)

In [None]:
# Create BiLSTM model
def create_model_bilstm():
    model = Sequential()
    # First layer of BiLSTM
    model.add(Bidirectional(LSTM(units = 128, return_sequences=False), input_shape=[X_train.shape[1], X_train.shape[2]])) 
    model.add(Dropout(0.2))
    model.add(Dense(1))
    model.compile(loss='mse', optimizer=tf.keras.optimizers.Adam(learning_rate = 0.001)) # use Adam with default parameters, learning rate, ...
    return model

In [None]:
model_bi_lstm = create_model_bilstm()

In [None]:
# Let's check the summary of our baby model
model_bi_lstm.summary()

In [None]:
'''
# Fit the model !
early_stop = keras.callbacks.EarlyStopping(monitor = 'val_loss',patience = 10) # use early stop technique to stop the training process if the val loss didn't improve for 10 epochs

# decaying lr on every 2nd epoch by 1%
def scheduler(epoch, lr):
    rate =  1 - 0.01 
    if epoch%2 == 0:
        return lr*rate
    return lr

LRscheduler = LearningRateScheduler(scheduler)
callbacks_list = [early_stop, LRscheduler]
'''

#,callbacks=callbacks_list
history_bi_lstm = model_bi_lstm.fit(X_train, y_train, epochs = 80, validation_split = 0.2,batch_size = 32, shuffle = False) # no shuffer , as its time series data

In [None]:
#Plot train and validation loss
def plot_loss (history, model_name):
    plt.figure(figsize = (10, 6))
    plt.rcParams['figure.dpi'] = 360
    plt.plot(history.history['loss'])
    plt.plot(history.history['val_loss'])
    plt.title('Model Train vs Validation Loss for ' + model_name)
    plt.ylabel('Loss')
    plt.xlabel('epoch')
    plt.legend(['Train loss', 'Validation loss'], loc='upper right')
    
plot_loss (history_bi_lstm, 'Bi-LSTM')

**Afte 30 epochs, the validation errors starts to rise, which is a sign of over fitting, therefore, one has to stop the training process at 30 epochs.**

In [None]:
y_test = scaler_y.inverse_transform(y_test)
y_train = scaler_y.inverse_transform(y_train)

In [None]:
def prediction(model):
    prediction = model.predict(X_test)
    prediction = scaler_y.inverse_transform(prediction)
    return prediction

In [None]:
prediction_bi_lstm = prediction(model_bi_lstm)

In [None]:
## Define a function to calculate MAE and RMSE
def evaluate_prediction(predictions, actual, model_name):
    errors = predictions - actual
    mse = np.square(errors).mean()
    rmse = np.sqrt(mse)
    mae = np.abs(errors).mean()

    print(model_name + ':')
    print('Mean Absolute Error: {:.4f}'.format(mae))
    print('Root Mean Square Error: {:.4f}'.format(rmse))
    print('')

In [31]:
evaluate_prediction(prediction_bi_lstm, y_test, 'Bi-LSTM')

Bi-LSTM:
Mean Absolute Error: 1197.1455
Root Mean Square Error: 1668.2546

