In [None]:
%pip install pandas scikit-learn
%pip install matplotlib tensorflow ta

In [None]:
import os

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from ta.momentum import RSIIndicator
from keras.models import Sequential
from keras.layers import Dense, LSTM, GRU
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_squared_error
from sklearn.linear_model import SGDRegressor
from keras.callbacks import EarlyStopping
from keras.utils import Sequence
from keras.optimizers import Adam
from keras.regularizers import l2

# Fix the random seed for reproducibility.
# NOTE(review): this seeds only NumPy's global RNG (the one used by the
# np.random.shuffle call in DataGenerator). Keras/TensorFlow weight
# initialisation is presumably seeded separately -- confirm whether the
# backend needs its own seed for fully repeatable runs.
np.random.seed(7)

In [None]:
# --- Configuration -----------------------------------------------------------
data_path = './data/xrdusd.csv'
max_rows = 30000       # number of one-minute bars kept, to bound training time
days_to_look_back = 1
look_back = days_to_look_back * 24 * 60  # window length in one-minute bars

# --- Load and regularise the series -------------------------------------------
df = pd.read_csv(data_path, index_col='time')
df.index = pd.to_datetime(df.index, unit='ms')  # 'time' is parsed as epoch milliseconds
# 'Unnamed: 0' is presumably a leftover row-number column from an earlier
# to_csv() call -- TODO confirm. errors='ignore' keeps the cell working when
# the CSV does not contain it.
df = df.drop(columns=['Unnamed: 0'], errors='ignore')
df = df[~df.index.duplicated(keep='first')]
# Put the series on a regular one-minute grid; gaps repeat the last known row.
df = df.resample('min').ffill()
# .copy() makes the truncated frame independent of the resampled one, so the
# column assignments below never write into a slice of another frame.
df = df.iloc[:max_rows].copy()

# --- Technical indicators computed over a `look_back`-bar window -------------
df['rsi'] = RSIIndicator(df.close, window=look_back).rsi()
df['sma'] = df.close.rolling(window=look_back).mean()
df['ema'] = df.close.ewm(span=look_back, adjust=False).mean()

# Ensure there are no NaN values (the rolling mean is undefined until a full
# window of `look_back` rows is available, so the warm-up rows are dropped).
df = df.dropna()

# NOTE(review): the generator and invert_scaling treat column 0 of df.values
# as the prediction target -- confirm that the first column shown below is the
# intended price series.
df.head(5)

In [None]:
# Draw the close-price series against its datetime index.
fig, ax = plt.subplots(figsize=(15, 10))
ax.plot(df.index, df.close)
plt.show()

In [None]:
# Using generators to handle large data
class DataGenerator(Sequence):
    """Keras ``Sequence`` that cuts sliding windows out of a 2-D array on demand.

    Sample ``i`` consists of the input ``data[i:i + look_back]`` (all columns)
    and the target ``data[i + look_back, 0]``, i.e. column 0 of the row that
    immediately follows the window. Windows are built per batch, so the full
    ``(n_windows, look_back, n_features)`` tensor is never materialised.

    Args:
        data: 2-D NumPy array; rows are consecutive time steps.
        look_back: Number of rows in each input window.
        batch_size: Maximum number of samples per batch.
        shuffle: When True (the default) the sample order is re-shuffled with
            ``np.random.shuffle`` at construction and after every epoch. Pass
            False to obtain batches in chronological order, e.g. so that
            predictions line up with ``data[look_back:, 0]``.

    Raises:
        ValueError: If ``data`` has too few rows to form a single window plus
            its target.
    """

    def __init__(self, data, look_back, batch_size, shuffle=True):
        # Newer Keras versions expect Sequence subclasses to initialise the base class.
        super().__init__()
        if len(data) <= look_back:
            raise ValueError(
                f"data has {len(data)} rows but look_back={look_back}; "
                "at least look_back + 1 rows are required"
            )
        self.data = data
        self.look_back = look_back
        self.batch_size = batch_size
        self.shuffle = shuffle
        self.on_epoch_end()

    def on_epoch_end(self):
        """Rebuild the list of window start positions (shuffled if requested)."""
        # The last valid start is len(data) - look_back - 1, because the target
        # sits at start + look_back; that gives len(data) - look_back windows.
        self.indices = np.arange(len(self.data) - self.look_back)
        if self.shuffle:
            np.random.shuffle(self.indices)

    def __len__(self):
        """Return the number of batches per epoch (the last one may be short)."""
        return int(np.ceil(len(self.indices) / self.batch_size))

    def __getitem__(self, index):
        """Return batch ``index`` as ``(inputs, targets)``.

        ``inputs`` has shape ``(batch, look_back, n_features)`` and ``targets``
        has shape ``(batch,)``.
        """
        batch_indices = self.indices[index * self.batch_size:(index + 1) * self.batch_size]
        # Broadcasting (batch, 1) + (look_back,) yields every row index of
        # every window, so one fancy-indexing call gathers the whole batch.
        window_rows = batch_indices[:, None] + np.arange(self.look_back)
        batchX = self.data[window_rows]
        batchY = self.data[batch_indices + self.look_back, 0]
        return batchX, batchY


In [None]:
raw_values = df.values.astype('float32')

# Split chronologically into training, validation, and test sets
train_size = int(len(raw_values) * 0.7)
val_size = int(len(raw_values) * 0.15)
test_size = len(raw_values) - train_size - val_size

# Every split must hold more than one full window plus its target row,
# otherwise the corresponding generator would have nothing to yield.
assert min(train_size, val_size, test_size) > look_back + 1, (
    f"each split needs more than {look_back + 1} rows; "
    f"got train={train_size}, val={val_size}, test={test_size}"
)

# Fit the scaler on the training rows only: fitting on the full series would
# leak the mean/variance of the validation and test periods into training.
scaler = StandardScaler()
scaler.fit(raw_values[:train_size])
xrpdata = scaler.transform(raw_values)

train_data = xrpdata[:train_size]
val_data = xrpdata[train_size:train_size + val_size]
test_data = xrpdata[train_size + val_size:]

batch_size = 32
train_generator = DataGenerator(train_data, look_back, batch_size)
val_generator = DataGenerator(val_data, look_back, batch_size)
test_generator = DataGenerator(test_data, look_back, batch_size)

In [None]:
def invert_scaling(predictions, scaler, n_features):
    """Map scaled column-0 values back to their original units.

    The scaler works on rows of ``n_features`` columns, so the values are
    placed in column 0 of a zero-filled array, run through
    ``scaler.inverse_transform``, and column 0 of the result is returned.

    Args:
        predictions: Array of scaled values, either 1-D or 2-D; for a 2-D
            array only its first column is used.
        scaler: Object exposing ``inverse_transform`` for ``n_features``-wide
            arrays.
        n_features: Number of columns the scaler expects.

    Returns:
        1-D array with one inverse-scaled value per entry of ``predictions``.
    """
    # Reduce a 2-D input to its first column; a 1-D input is used as is.
    first_column = predictions[:, 0] if predictions.ndim == 2 else predictions
    padded = np.zeros((len(predictions), n_features))
    padded[:, 0] = first_column
    return scaler.inverse_transform(padded)[:, 0]

# Define the models with L2 regularization
def _build_recurrent_model(layer_cls, input_shape, units, l2_strength, learning_rate):
    """Build and compile a single-recurrent-layer regressor.

    The network is ``layer_cls(units)`` followed by ``Dense(1)``, trained with
    mean squared error and the Adam optimizer.

    Args:
        layer_cls: Recurrent layer class to instantiate (``LSTM`` or ``GRU``).
        input_shape: ``(time_steps, n_features)`` of one input window.
        units: Number of units in the recurrent layer.
        l2_strength: L2 penalty applied to the recurrent layer's kernel weights.
        learning_rate: Learning rate passed to Adam.

    Returns:
        The compiled ``Sequential`` model.
    """
    model = Sequential()
    model.add(layer_cls(
        units,
        return_sequences=False,  # only the final step's output feeds the Dense head
        input_shape=input_shape,
        kernel_regularizer=l2(l2_strength),
    ))
    model.add(Dense(1))
    model.compile(loss='mean_squared_error', optimizer=Adam(learning_rate=learning_rate))
    return model


def create_lstm_model(input_shape, units=4, l2_strength=0.01, learning_rate=0.001):
    """Return a compiled LSTM regressor for windows of shape ``input_shape``.

    The defaults reproduce the original fixed configuration (4 units,
    L2 = 0.01, learning rate = 0.001).
    """
    return _build_recurrent_model(LSTM, input_shape, units, l2_strength, learning_rate)


def create_gru_model(input_shape, units=4, l2_strength=0.01, learning_rate=0.001):
    """Return a compiled GRU regressor for windows of shape ``input_shape``.

    The defaults reproduce the original fixed configuration (4 units,
    L2 = 0.01, learning rate = 0.001).
    """
    return _build_recurrent_model(GRU, input_shape, units, l2_strength, learning_rate)

# Both networks consume windows of `look_back` steps, each holding every feature column.
window_shape = (look_back, train_data.shape[1])
models = {}
models['LSTM'] = create_lstm_model(window_shape)
models['GRU'] = create_gru_model(window_shape)


In [None]:
# Train and evaluate the models
class _OrderedDataGenerator(DataGenerator):
    """DataGenerator whose windows always stay in chronological order.

    The training generators shuffle their samples, so predictions made from
    them come back in random order and cannot be compared with the series.
    Predicting from this variant yields prediction ``k`` for the target
    ``data[look_back + k, 0]``.
    """

    def on_epoch_end(self):
        # Same window starts as the base class uses, but never shuffled.
        self.indices = np.arange(len(self.data) - self.look_back - 1)


def evaluate_split(model, data):
    """Predict one split in time order and score it in original units.

    Args:
        model: Trained Keras model.
        data: Scaled 2-D array for the split (train, validation or test).

    Returns:
        Tuple ``(predictions, actuals, rmse)``; both arrays are 1-D, have the
        same length and are expressed in the unscaled units of column 0.
    """
    ordered_generator = _OrderedDataGenerator(data, look_back, batch_size)
    predictions = invert_scaling(model.predict(ordered_generator), scaler, data.shape[1])
    # The target of window k is the row right after it, i.e. row look_back + k.
    scaled_targets = data[look_back:look_back + len(predictions), 0]
    actuals = invert_scaling(scaled_targets, scaler, data.shape[1])
    rmse = np.sqrt(mean_squared_error(actuals, predictions))
    return predictions, actuals, rmse


results = {}
train_predictions = {}
test_predictions = {}
for name, model in models.items():
    print(f"Training {name}...")
    early_stopping = EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True)
    model.fit(train_generator, epochs=20, validation_data=val_generator, verbose=2, callbacks=[early_stopping])

    # Score every split on time-ordered windows with correctly aligned targets.
    trainPredict_flat, trainY_flat, train_rmse = evaluate_split(model, train_data)
    val_rmse = evaluate_split(model, val_data)[2]
    testPredict_flat, testY_flat, test_rmse = evaluate_split(model, test_data)

    # Values are kept in one-element lists because the plotting cell reads
    # them with [0].
    results[name] = {
        'train_rmse': [train_rmse],
        'val_rmse': [val_rmse],
        'test_rmse': [test_rmse],
    }
    train_predictions[name] = [trainPredict_flat]
    test_predictions[name] = [testPredict_flat]

# Print the results
for name, metrics in results.items():
    print(f"{name}: Train RMSE = {metrics['train_rmse'][0]}, Test RMSE = {metrics['test_rmse'][0]}")
    if 'val_rmse' in metrics:
        print(f"{name}: Val RMSE = {metrics['val_rmse'][0]}")

In [None]:
# savefig does not create missing directories, so make sure the target exists.
os.makedirs('./output', exist_ok=True)

# RMSE Comparison Bar Chart
model_names = list(results.keys())
train_rmse = [results[name]['train_rmse'][0] for name in model_names]
test_rmse = [results[name]['test_rmse'][0] for name in model_names]

bar_width = 0.4
x = np.arange(len(model_names))
plt.figure(figsize=(12, 6))
# Offset the two series by half a bar each so they sit side by side around
# every tick instead of overlapping.
plt.bar(x - bar_width / 2, train_rmse, width=bar_width, label='Train RMSE')
plt.bar(x + bar_width / 2, test_rmse, width=bar_width, label='Test RMSE')
plt.xlabel('Models')
plt.ylabel('RMSE')
plt.xticks(x, model_names, rotation=45)
plt.title('RMSE Comparison of Different Models')
plt.legend()
plt.savefig('./output/rmse_comparison.png')
plt.show()

# Prediction vs Actual Plot
# testY_flat is the actual test series left behind by the evaluation loop; it
# is the same for every model because all models are scored on the same split.
plt.figure(figsize=(15, 10))
plt.plot(testY_flat, label='Actual', color='black')
for name in model_names:
    plt.plot(test_predictions[name][0], label=f'{name} Predictions')
plt.title('Actual vs Predictions')
plt.xlabel('Time')
plt.ylabel('Price')
plt.legend()
plt.savefig('./output/test_predict_plot.png')
plt.show()