In [46]:
import os
from pathlib import Path
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn import *
from keras.layers import Activation, Dense, Dropout, LSTM
import tensorflow as tf
from tensorflow.keras import Input

In [47]:
bit_df = pd.read_csv("./Resources/BitcoinDataDaily.csv").set_index("Date")
bit_df.head()

Unnamed: 0_level_0,Price,Open,High,Low,Vol.,Change %
Date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
1-Mar-23,23619.0,23130.6,23914.1,23025.3,328.18K,2.11%
28-Feb-23,23130.5,23494.0,23595.0,23033.8,275.10K,-1.55%
27-Feb-23,23494.1,23558.7,23876.2,23166.8,297.65K,-0.27%
26-Feb-23,23558.7,23166.1,23671.8,23066.0,209.12K,1.69%
25-Feb-23,23166.1,23191.3,23215.3,22777.4,198.35K,-0.11%


In [49]:
bit_df2 = bit_df[["Price"]]
bit_df2.head()

Unnamed: 0_level_0,Price
Date,Unnamed: 1_level_1
1-Mar-23,23619.0
28-Feb-23,23130.5
27-Feb-23,23494.1
26-Feb-23,23558.7
25-Feb-23,23166.1


In [50]:
bit_df2.dtypes

Price    object
dtype: object

In [None]:
#bit_df2 = bit_df2.astype("float")

In [32]:
#X = bit_df.copy()
#X = X.drop("Price", axis=1)
#y = bit_df["Price"].values
#X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=78)

<IPython.core.display.Javascript object>

In [None]:
# Create scaler instance
#X_scaler = sklearn.preprocessing.MinMaxScaler()

# Fit the scaler
#X_scaler.fit(X_train)

# Scale the data
#X_train_scaled = X_scaler.transform(X_train)
#X_test_scaled = X_scaler.transform(X_test)

In [None]:
def normalize_zero_size(df):
    return df / df.iloc[0] - 1

In [None]:
def train_test_split(df, test_size):
    split_row = len(df) - int(test_size * len(df))
    train_data = df.iloc[:split_row]
    test_data = df.iloc[split_row:]
    return train_data, test_data

In [None]:
fig, ax = plt.subplots(1, figsize=(13,7))
ax.plot(y_train, label='training', linewidth=2)
ax.plot(y_test, label='test', linewidth=2)
ax.set_ylabel("Price [USD]", fontsize=14)
ax.set_title("Bitcoin Prices [Training & Testing Sets]", fontsize=16)
ax.legend(loc='best', fontsize=16)

In [None]:
def extract_window_data(df, window_len, zero_base=True):
    window_data = []
    for idx in range(len(df) - window_len):
        tmp = df[idx: (idx + window_len)].copy()
        if zero_base:
            tmp = normalize_zero_size(tmp)
        window_data.append(tmp.values)
    return np.array(window_data)

In [None]:
def prepare_data(df, target_col, window_len, zero_base=True, test_size, time_steps):
    train_data, test_data = train_test_split(df, test_size=test_size)
    X_train = extract_window_data(train_data, window_len, zero_base)
    X_test = extract_window_data(test_data, window_len, zero_base)
    y_train = train_data[target_col][window_len:].values
    y_test = test_data[target_col][window_len:].values
    if zero_base:
        y_train = y_train / train_data[target_col][:-window_len].values - 1
        y_test = y_test / test_data[target_col][:-window_len].values - 1
    X_train = np.reshape(X_train, (int(X_train.shape[0]/time_steps), time_steps, X_train.shape[1]))
    X_test = np.reshape(X_test, (int(X_test.shape[0]/time_steps), time_steps, X_test.shape[1]))
    return train_data, test_data, X_train, X_test, y_train, y_test

In [None]:
def build_lstm_model(input_data, output_size, neurons, activ_func, dropout, loss, optimizer, metrics):
    model = tf.keras.models.Sequential()
    model.add(LSTM(neurons, input_shape=(input_data.shape[1], input_data.shape[2]))
    model.add(Dropout(dropout))
    model.add(Dense(units=output_size))
    model.add(Activation(activ_func))
    model.compile(loss=loss, optimizer=optimizer, metrics=metrics)
    return model

In [None]:
# def build_lstm_model2(time_steps, feats_dim, num_classes, optimizer):
#     x_in = Input(shape=(time_steps, feats_dim))
#     h1 = LSTM(128,return_sequences=True)(x_in)
#     h1 = Dropout(0.5)(h1)
#     h2 = LSTM(64,return_sequences=True)(h1)
#     h2 = Dropout(0.5)(h2)
#     h3 = LSTM(32,return_sequences=True)(h2)
#     h4 = LSTM(32)(h3)
#     h4 = Dropout(0.5)(h4)
#     out = Dense(activation='linear')(h4)
#     model = Model(inputs=x_in, outputs=out)
#     model.compile(optimizer=optimizer,
#                   loss='mean_squared_error',
#                   metrics=['accuracy'])
#     return model

In [None]:
window_len = 5
test_size = 0.2
zero_base = True
lstm_neurons = 100
epochs = 20
batch_size = 32
loss = 'mse'
dropout = 0.2
optimizer = 'adam'
target_col = 'Price'
metrics = 'accuracy'

In [None]:
train, test, X_train, X_test, y_train, y_test = prepare_data( \
    bit_df2, target_col, window_len=window_len, zero_base=zero_base, test_size=test_size)

print(f' X (Training): {X_train.shape}')

In [None]:
model = build_lstm_model(X_train, output_size=1, neurons=lstm_neurons, dropout=dropout, \
    loss=loss, optimizer=optimizer, metrics=metrics)


In [None]:
fit_model = model.fit(X_train, y_train, epochs=epochs, batch_size=batch_size, verbose=1, shuffle=True)

plt.plot(fit_model.history['loss'])
plt.title('Model Loss')
plt.ylabel('Loss')
plt.xlabel('Epoch')
plt.legend(['Train', 'Test'], loc='best')
plt.show()

In [None]:
model_loss, model_accuracy = model.evaluate(X_train, y_train, verbose=2)
print(f' Loss: {model_loss}, Accuracy: {model_accuracy}')

In [None]:
model_loss, model_accuracy = model.evaluate(X_test, y_test, verbose=2)
print(f' Loss: {model_loss}, Accuracy: {model_accuracy}')

In [None]:
y_pred = model.predict(X_test).squeeze()  #The squeeze() function is used to remove single-dimensional entries from the shape of an array

In [None]:
mean_absolute_error(y_pred, y_test)

In [None]:
targets = test[target_col][window_len:]
y_pred_inverse = test[target_col].values[:-window_len] * (y_pred + 1) #Inverse Normalization to show actual values

predictions = pd.Series(index=targets.index, data=y_pred_inverse)
plt.plot(predictions, label= "Predicted Price", color="green")
plt.plot(targets, label= "Actual Price", color="red")
plt.title('Bitcoin Price Prediction')
plt.ylabel('Price')
plt.xlabel('Time [days]')
plt.legend(loc='best')
plt.show()
