In [1]:
import pandas as pd
import numpy as np
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error, r2_score, mean_absolute_error
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from keras.models import Sequential
from keras.layers import LSTM, Dense

In [2]:
# Load the dataset
data = pd.read_csv('/Users/priyakundu/Downloads/updated_dataset.csv')

# Convert 'date' column to datetime format and sort the data by date
data['date'] = pd.to_datetime(data['date'])
data.sort_values('date', inplace=True)

# Generate 'Price_Up_Down' labels
data['Price_Up_Down'] = (data['c'].shift(-1) > data['c']).astype(int)

# Normalize the 'c' column using MinMaxScaler
scaler = MinMaxScaler()
data['c_scaled'] = scaler.fit_transform(data[['c']])

# Split the data into train and test sets (80-20 split)
split_index = int(0.8 * len(data))
train_data = data.iloc[:split_index]
test_data = data.iloc[split_index:]

# Set the sequence length and forecast length
sequence_length = 10
forecast_length = 10

# Function to create sequences
def create_sequences(data, sequence_length, forecast_length):
    X, y = [], []
    for i in range(len(data) - sequence_length - forecast_length + 1):
        X.append(data[i:(i + sequence_length)])
        y.append(data[(i + sequence_length):(i + sequence_length + forecast_length)])
    return np.array(X), np.array(y)

# Prepare training sequences
X_train, y_train = create_sequences(train_data['c_scaled'].values, sequence_length, forecast_length)
X_train = X_train.reshape((X_train.shape[0], X_train.shape[1], 1))

# Define the LSTM model
model = Sequential([
    LSTM(50, activation='relu', input_shape=(sequence_length, 1), return_sequences=True),
    LSTM(30, activation='relu'),
    Dense(forecast_length)
])
model.compile(optimizer='adam', loss='mean_squared_error')

# Train the model
model.fit(X_train, y_train, epochs=50, verbose=1)

Epoch 1/50


  super().__init__(**kwargs)


[1m89/89[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 4ms/step - loss: 0.1994
Epoch 2/50
[1m89/89[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 4ms/step - loss: 0.0023
Epoch 3/50
[1m89/89[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 4ms/step - loss: 0.0019
Epoch 4/50
[1m89/89[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 4ms/step - loss: 0.0018
Epoch 5/50
[1m89/89[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 5ms/step - loss: 0.0016
Epoch 6/50
[1m89/89[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 6ms/step - loss: 0.0017
Epoch 7/50
[1m89/89[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 5ms/step - loss: 0.0016
Epoch 8/50
[1m89/89[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 5ms/step - loss: 0.0017
Epoch 9/50
[1m89/89[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 4ms/step - loss: 0.0016
Epoch 10/50
[1m89/89[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 4ms/step - loss: 0.0016
Epoch 11/50
[1m89/8

<keras.src.callbacks.history.History at 0x291709eb0>

In [5]:
# Prepare test sequences
X_test, y_test = create_sequences(test_data['c_scaled'].values, sequence_length, forecast_length)
X_test = X_test.reshape((X_test.shape[0], X_test.shape[1], 1))

# Generate predictions for the test set
predictions_scaled = model.predict(X_test)

# Inverse transform predictions
predictions = scaler.inverse_transform(predictions_scaled)

# Compute the first column of predictions for RMSE (if predicting multiple steps)
y_true = y_test[:, 0]  # Assuming you want to compare against the first step prediction

# Calculate RMSE and other regression metrics
rmse = np.sqrt(mean_squared_error(y_true, predictions[:, 0]))
mae = mean_absolute_error(y_true, predictions[:, 0])
r2 = r2_score(y_true, predictions[:, 0])

# Derive predicted 'Price_Up_Down' from the predictions
# Compare the last predicted value in each sequence to the first predicted value
predicted_price_movement = [predictions[i, -1] > predictions[i, 0] for i in range(predictions.shape[0])]
predicted_labels = np.array(predicted_price_movement).astype(int)

# Adjust the slice of the test data labels to exactly match the number of predictions
# Note: We need to consider that the final `forecast_length - 1` sequences do not have full forecasts
test_labels_adjusted = test_data['Price_Up_Down'].values[sequence_length:len(predicted_labels) + sequence_length]

# Calculate classification metrics
accuracy = accuracy_score(test_labels_adjusted, predicted_labels)
precision = precision_score(test_labels_adjusted, predicted_labels)
recall = recall_score(test_labels_adjusted, predicted_labels)
f1 = f1_score(test_labels_adjusted, predicted_labels)

# Print the metrics
print(f"RMSE: {rmse}, MAE: {mae}, R²: {r2}")
print(f"Accuracy: {accuracy}, Precision: {precision}, Recall: {recall}, F1 Score: {f1}")

[1m22/22[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 2ms/step 
RMSE: 0.4647270944565891, MAE: 0.46267695162397776, R²: -41.859149817782935
Accuracy: 0.5100574712643678, Precision: 0.5342465753424658, Recall: 0.43213296398891965, F1 Score: 0.4777947932618683


In [None]:
# # Create test sequences and targets manually
# X_test, y_test = create_sequences(test_data['c_scaled'].values, sequence_length, forecast_length)

# # Reshape X_test for LSTM input (batch_size, sequence_length, features)
# X_test = X_test.reshape((X_test.shape[0], X_test.shape[1], 1))

# # Generate predictions for the test set
# predictions_scaled = model.predict(X_test)

# # Inverse transform predictions
# predictions = scaler.inverse_transform(predictions_scaled)

# # Compute the first column of predictions for RMSE (if predicting multiple steps)
# y_true = y_test[:, 0]  # Assuming you want to compare against the first step prediction

# # Calculate RMSE and other regression metrics
# rmse = np.sqrt(mean_squared_error(y_true, predictions[:, 0]))
# mae = mean_absolute_error(y_true, predictions[:, 0])
# r2 = r2_score(y_true, predictions[:, 0])

# # Derive predicted 'Price_Up_Down' from the predictions
# predicted_price_movement = np.diff(predictions, axis=1)[:, 0] > 0  # Comparing subsequent predictions
# predicted_labels = np.insert((predicted_price_movement).astype(int), 0, 0)  # First value has no previous value to compare

# # Calculate classification metrics
# accuracy = accuracy_score(test_data['Price_Up_Down'].values[sequence_length:], predicted_labels)
# precision = precision_score(test_data['Price_Up_Down'].values[sequence_length:], predicted_labels)
# recall = recall_score(test_data['Price_Up_Down'].values[sequence_length:], predicted_labels)
# f1 = f1_score(test_data['Price_Up_Down'].values[sequence_length:], predicted_labels)

# # Print the metrics
# print(f"RMSE: {rmse}, MAE: {mae}, R²: {r2}")
# print(f"Accuracy: {accuracy}, Precision: {precision}, Recall: {recall}, F1 Score: {f1}")