In [None]:
import pandas as pd
import numpy as np
import tensorflow as tf
import plotly.graph_objects as go
from sklearn.preprocessing import MinMaxScaler


# Load the BTC price-history CSV and discard the columns that are not used
# anywhere further down in this notebook.
df = (
    pd.read_csv('./training_datasets/Bitcoin [BTC]_15mprice_history.csv')
    .drop(columns=['DateTime', 'Closing Price', 'High', 'Low'])
)

In [None]:
# Quick look at the first rows of the columns that survived the drop above.
df.head(n=5)

In [None]:
# Preprocess the data
# Drop incomplete rows *before* scaling so that the rows the scaler is fitted
# on are exactly the rows the train/test split below operates on.
df.dropna(inplace=True)

# Fit the scaler on the training rows only: everything except the final
# n_holdout rows, which the split cell reserves for testing. Fitting on the
# whole series would leak the hold-out min/max into the training data.
# Consequence: scaled hold-out values may fall slightly outside [0, 1].
n_holdout = 60
scaler = MinMaxScaler()
scaler.fit(df[['Open']].iloc[:-n_holdout].values)

# transform() returns an (n, 1) array; flatten it so a plain 1-D column is assigned.
df['Open'] = scaler.transform(df[['Open']].values).ravel()

df['Open'].head()

# NOTE: the experiment below is disabled. It references 'High' and 'Low',
# which are dropped when the CSV is loaded, so it cannot be re-enabled as-is.
# Define the number of previous price points to include as additional features
# n_prev_points = 5

# Generate the additional input features using previous price points
# for i in range(1, n_prev_points + 1):
#     for col in ['Open', 'High', 'Low']:
#         df[f'{col}_Prev_{i}'] = df[col].shift(i)

In [None]:
# Split the data into training and testing sets
n_test_rows = 60
train_data = df.iloc[:-n_test_rows]  # everything except the final 60 rows
test_data = df.iloc[-n_test_rows:]   # the final 60 rows

# Define the window size for input sequences
window_size = 50


def _make_windows(frame, size):
    """Slice `frame` into consecutive windows of `size` rows.

    Returns a pair of arrays: the windows themselves and, for each window,
    the single row that immediately follows it (the prediction target).
    """
    starts = range(len(frame) - size)
    inputs = np.array([frame.iloc[s:s + size].values for s in starts])
    targets = np.array([frame.iloc[s + size].values for s in starts])
    return inputs, targets


# Same windowing for both splits; windows never cross the train/test boundary.
X_train, y_train = _make_windows(train_data, window_size)
X_test, y_test = _make_windows(test_data, window_size)

# Sanity check: rows per input window and values per target.
print(len(X_test[0]), len(y_test[0]))

In [None]:
# Build the TensorFlow model
# One LSTM layer reading windows of `window_size` steps with a single feature,
# then dropout for regularization and a small dense head emitting one value.
model = tf.keras.models.Sequential()
model.add(tf.keras.layers.LSTM(64, activation='relu', input_shape=(window_size, 1)))
model.add(tf.keras.layers.Dropout(0.2))
model.add(tf.keras.layers.Dense(32, activation='relu'))
model.add(tf.keras.layers.Dense(1))

# Compile the model: Adam optimizer, mean squared error as the loss
model.compile(optimizer='adam', loss='mean_squared_error')

# Train the model on the windowed training set
model.fit(X_train, y_train, epochs=500, batch_size=16)

# Evaluate the model on the windowed test set and report the loss
loss = model.evaluate(X_test, y_test)
print("Test Loss:", loss)

In [None]:
# Test Prediction
# Give the most recent test window a leading batch axis so predict() sees a
# batch containing exactly one sequence.
last_window = X_test[-1][np.newaxis, ...]
print(last_window)

forecast = model.predict(last_window)
print(forecast)

In [None]:
# Make predictions for multiple time steps into the future
num_steps = 4  # Number of future time steps to predict - 1 hour

predictions = []
# Walk the final num_steps test windows oldest-first: indices -4, -3, -2, -1.
for test_index in range(-num_steps, 0):
    last_x = X_test[test_index].copy()

    # Overwrite the newest entries of the window with the forecasts made so
    # far, so each step is conditioned on earlier predictions rather than on
    # the true values.
    for back in range(1, len(predictions) + 1):
        last_x[-back] = predictions[-back]

    # Add the batch axis expected by predict().
    last_window = last_x[np.newaxis, ...]
    forecast = model.predict(last_window)
    print('Predicted:', forecast, '- Actual:', y_test[test_index])
    predictions.append(forecast)

# Plot actual and predicted prices
# The x positions are row offsets into test_data so that the actual series and
# the forecasts share one axis. Previously the axis was built from len(X_test),
# which is shorter than the actual series plotted against it and placed the
# forecasts next to the wrong rows.
test_data_range = list(range(len(test_data)))
# The forecasts target the final num_steps rows of test_data (y_test[-num_steps:]).
future_time_steps = list(range(len(test_data) - num_steps, len(test_data)))

# Reverse Transform: map scaled values back to price units
predictions = scaler.inverse_transform(np.asarray(predictions).reshape(-1, 1)).reshape(-1)
actual_y = scaler.inverse_transform(test_data['Open'].values.reshape(-1, 1)).reshape(-1)

fig = go.Figure()
fig.add_trace(go.Scatter(x=test_data_range, y=actual_y, mode='lines', name='Actual Open Price'))
fig.add_trace(go.Scatter(x=future_time_steps, y=predictions, mode='lines', name='Predicted Open Price'))
fig.update_layout(title='Bitcoin Open Price Forecast',
                  xaxis_title='Time',
                  yaxis_title='Open Price')
fig.show()