<a href="https://colab.research.google.com/github/nicolai5965/SimpleRNN_Stoco_Price_Predictions/blob/main/SimpleRNN_Stoco_Price_Predictions.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import datetime
import numpy as np
import pandas as pd
import yfinance as yf
import plotly.express as px
import plotly.graph_objects as go
import plotly.offline as pyo
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error, mean_absolute_error
import tensorflow as tf
from tensorflow.keras.layers import Layer
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras import regularizers
from tensorflow.keras.callbacks import EarlyStopping


In [2]:
ticker = "NVDA"
# Convert the dates to datetime objects
start_date = datetime.datetime.strptime("2021-07-20", '%Y-%m-%d')
end_date = datetime.datetime.strptime("2022-12-31", '%Y-%m-%d')

# Initialize an empty DataFrame
data = pd.DataFrame()

# Fetch hourly data for each 7-day interval
interval = datetime.timedelta(days=7)
current_date = start_date

while current_date < end_date:
    next_date = current_date + interval
    temp_data = yf.download(ticker, start=current_date, end=next_date, interval='1h', progress=False)
    data = pd.concat([data, temp_data])
    current_date = next_date

# Remove duplicate entries (if any)
data = data.loc[~data.index.duplicated(keep='first')]

# Reset the index to have consecutive integers
data = data.reset_index()
# Drop Adj Close column, since it is a future information
data = data.drop('Adj Close', axis=1)

display(data.head())
display(data.describe())

Unnamed: 0,Datetime,Open,High,Low,Close,Volume
0,2021-07-20 09:30:00-04:00,187.300003,187.720001,181.759995,184.419998,13302542
1,2021-07-20 10:30:00-04:00,184.441193,184.619904,181.649994,183.050003,6424054
2,2021-07-20 11:30:00-04:00,183.036804,184.990005,183.005005,184.875,3701542
3,2021-07-20 12:30:00-04:00,184.899994,185.899994,184.330002,184.429993,3524187
4,2021-07-20 13:30:00-04:00,184.411407,185.459396,183.050003,185.369995,3633746


Unnamed: 0,Open,High,Low,Close,Volume
count,2561.0,2561.0,2561.0,2561.0,2561.0
mean,204.456086,206.074217,202.786189,204.438132,6463281.0
std,53.844019,54.265309,53.31494,53.782142,4093565.0
min,109.709999,113.365196,108.129997,111.499901,949754.0
25%,162.543503,163.970001,160.919998,162.589996,3875606.0
50%,196.080002,197.479996,194.520004,195.990005,5495412.0
75%,239.759903,241.899994,237.309998,240.005997,7748876.0
max,342.450012,346.470001,333.5,342.403687,34220750.0


In [3]:
# Plot the data
fig = px.line(data, x='Datetime', y='Close', title='NVDA Stock Price')
# Show the plot
fig.show()

In [4]:
# Select relevant columns
features = ['Open', 'High', 'Low', 'Close', 'Volume']
data_selected = data[features]

# Normalize the data
scaler = MinMaxScaler()
data_normalized = pd.DataFrame(scaler.fit_transform(data_selected), columns=features)

def create_sequences_multifeature(data, sequence_length, target_column_name):
    X, y = [], []
    target_index = data.columns.get_loc(target_column_name)
    data_input = data.drop(columns=[target_column_name])  # Exclude the 'Close' column from the input features

    for i in range(len(data) - sequence_length):
        X.append(data_input.iloc[i:i + sequence_length].values)
        y.append(data.iloc[i + sequence_length - 1, target_index])
    return np.array(X), np.array(y)


sequence_length = 60  # Length of the input sequences
X, y = create_sequences_multifeature(data_normalized, sequence_length, target_column_name='Close')

# Split the data into training and test sets
split_index = int(len(X) * 0.8)
train_end_index = int(split_index / sequence_length) * sequence_length

X_train, y_train = X[:train_end_index], y[:train_end_index]
X_test, y_test = X[train_end_index:], y[train_end_index:]

print(X.shape)
print(y.shape)

(2501, 60, 4)
(2501,)


In [5]:
# Check the indices of the last sequences in the training and test sets
train_last_seq_index = len(X_train) - 1
test_first_seq_index = len(X_train)

print("Last sequence in the training set ends at index:", train_last_seq_index)
print("First sequence in the test set starts at index:", test_first_seq_index)

Last sequence in the training set ends at index: 1979
First sequence in the test set starts at index: 1980


In [6]:
# Custom RNN cell
class SimpleRNNCell(Layer):
    def __init__(self, units, activation='tanh', kernel_regularizer=None, recurrent_regularizer=None, **kwargs):
        super(SimpleRNNCell, self).__init__(**kwargs)
        self.units = units
        self.activation = tf.keras.activations.get(activation)
        self.kernel_regularizer = regularizers.get(kernel_regularizer)
        self.recurrent_regularizer = regularizers.get(recurrent_regularizer)


    def build(self, input_shape):
        self.kernel = self.add_weight(shape=(input_shape[-1], self.units),
                                      initializer='glorot_uniform',
                                      name='kernel',
                                      regularizer=self.kernel_regularizer)
        self.recurrent_kernel = self.add_weight(shape=(self.units, self.units),
                                                initializer='orthogonal',
                                                name='recurrent_kernel',
                                                regularizer=self.recurrent_regularizer)
        self.bias = self.add_weight(shape=(self.units,), initializer='zeros', name='bias')
        self.built = True


    def call(self, inputs, states):
        prev_output = states[0]
        h = tf.matmul(inputs, self.kernel) + tf.matmul(prev_output, self.recurrent_kernel) + self.bias
        output = self.activation(h)
        return output, [output]


    def get_initial_state(self, inputs):
        batch_size = tf.shape(inputs)[0]
        return [tf.zeros([batch_size, self.units], dtype=inputs.dtype)]


# Custom RNN layer
class SimpleRNN(Layer):
    def __init__(self, cell, return_sequences=False, **kwargs):
        super(SimpleRNN, self).__init__(**kwargs)
        self.cell = cell
        self.return_sequences = return_sequences


    def call(self, inputs):
        # Initialize the state
        initial_state = self.cell.get_initial_state(inputs)
        states = initial_state

        # Process the input sequence
        outputs = []
        for t in range(inputs.shape[1]):
            input_t = inputs[:, t]
            output, states = self.cell(input_t, states)
            outputs.append(output)

        # Stack the outputs
        outputs = tf.stack(outputs, axis=1)

        if self.return_sequences:
            return outputs
        else:
            return outputs[:, -1]


# Synthetic dataset
timesteps = 60
input_dim = 4
output_dim = 1
hidden_units = 16
l2_reg = 1e-4

# Create the RNN model
cell = SimpleRNNCell(units=hidden_units, kernel_regularizer=regularizers.l2(l2_reg), recurrent_regularizer=regularizers.l2(l2_reg))
model = Sequential([
    SimpleRNN(cell, return_sequences=False, input_shape=(timesteps, input_dim)),
    Dense(output_dim)
])

# Compile and fit the model
model.compile(optimizer='adam', loss='mean_squared_error', metrics=['mse'])
# Define the early stopping callback with the desired parameters
early_stopping = EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True)

# Add the callback to the 'fit' method
history = model.fit(X_train, y_train, epochs=100, batch_size=32, validation_split=0.2, callbacks=[early_stopping], verbose=0)

In [7]:
def plot_loss(history):
    fig = go.Figure()
    fig.add_trace(go.Scatter(x=list(range(1, len(history.history['loss'])+1)),
                             y=history.history['loss'],
                             mode='lines',
                             name='Training loss',
                             line=dict(color='blue', width=2)))
    fig.add_trace(go.Scatter(x=list(range(1, len(history.history['val_loss'])+1)),
                             y=history.history['val_loss'],
                             mode='lines',
                             name='Validation loss',
                             line=dict(color='orange', width=2)))
    fig.update_layout(title='Training and Validation Loss Over Epochs',
                      xaxis_title='Epoch',
                      yaxis_title='Loss (MSE)',
                      legend=dict(x=0, y=1, bgcolor='rgba(0,0,0,0)'))
    pyo.iplot(fig)

plot_loss(history)


In [8]:
# Evaluate the model on the test set and print the results
loss, metric = model.evaluate(X_test, y_test, verbose=2)
print("Test Loss:", loss)
print("Test Metric:", metric)

17/17 - 0s - loss: 9.6085e-04 - mse: 2.4304e-05 - 69ms/epoch - 4ms/step
Test Loss: 0.0009608500986360013
Test Metric: 2.4304496037075296e-05


In [9]:
# Make predictions
y_train_pred = model.predict(X_train, verbose=0)
y_test_pred = model.predict(X_test, verbose=0)

# Directional Accuracy (DA)
def directional_accuracy(y_true, y_pred):
    direction_pred = np.sign(y_pred[1:] - y_pred[:-1]).ravel()
    direction_actual = np.sign(y_true[1:] - y_true[:-1])
    da = np.sum(direction_pred == direction_actual) / (len(y_true) - 1)
    return da

# Calculate the Mean Absolute Error (MAE)
train_mae = mean_absolute_error(y_train, y_train_pred)
test_mae = mean_absolute_error(y_test, y_test_pred)

# Calculate the Mean Squared Error (MSE)
train_mse = mean_squared_error(y_train, y_train_pred)
test_mse = mean_squared_error(y_test, y_test_pred)

# Calculate the Root Mean Squared Error (RMSE)
train_rmse = np.sqrt(train_mse)
test_rmse = np.sqrt(test_mse)

# Calculate Directional Accuracy (DA)
train_da = directional_accuracy(y_train, y_train_pred)
test_da = directional_accuracy(y_test, y_test_pred)

# Print the results
print("Training MAE:", train_mae)
print("Test MAE:", test_mae)

print("Training RMSE:", train_rmse)
print("Test RMSE:", test_rmse)

print("Training DA:", train_da)
print("Test DA:", test_da)

Training MAE: 0.005182738349277963
Test MAE: 0.003745356374483151
Training RMSE: 0.007290569626493906
Test RMSE: 0.004929959575223217
Training DA: 0.6664982314300152
Test DA: 0.6461538461538462


In [10]:
# Make predictions using the trained model
y_pred = model.predict(X_test) 

# Normalize the target column separately
target_scaler = MinMaxScaler()
target_data_normalized = pd.DataFrame(target_scaler.fit_transform(data_selected[['Close']]), columns=['Close'])

# Replace the normalized target column in the data_normalized DataFrame
data_normalized['Close'] = target_data_normalized['Close']

# Reverse scaling of the predicted and actual values
y_pred_actual = target_scaler.inverse_transform(y_pred)
y_test_actual = target_scaler.inverse_transform(y_test.reshape(-1, 1))

# Extract datetime values from the original dataset for the test set
datetime_values = data["Datetime"][train_end_index + sequence_length - 1:train_end_index + len(y_test_actual)]



In [11]:
def plot_actual_vs_predicted(datetime_values, actual_values, predicted_values, title='Actual vs. Predicted Stock Prices', yaxis_title='Stock Price'):
    # Create a Plotly Figure
    fig = go.Figure()

    # Add actual values trace
    fig.add_trace(go.Scatter(x=datetime_values,
                             y=actual_values,
                             mode='lines',
                             name='Actual',
                             line=dict(color='blue')))

    # Add predicted values trace
    fig.add_trace(go.Scatter(x=datetime_values,
                             y=predicted_values,
                             mode='lines',
                             name='Predicted',
                             line=dict(color='red', dash='dot')))

    # Set the layout
    fig.update_layout(title=title,
                      xaxis_title='Date',
                      yaxis_title=yaxis_title)

    # Show the figure
    fig.show()

In [12]:
plot_actual_vs_predicted(datetime_values, y_test, y_pred.squeeze(), title='Actual vs. Predicted Stock Prices (Normalized)', yaxis_title='Normalized Stock Price')
plot_actual_vs_predicted(datetime_values, y_test_actual.squeeze(), y_pred_actual.squeeze(), title='Actual vs. Predicted Stock Prices (Unnormalized)', yaxis_title='Stock Price')

In [13]:
def fetch_stock_data(ticker, start_date, end_date):
    # Convert the dates to datetime objects
    start_date = datetime.datetime.strptime(start_date, '%Y-%m-%d')
    end_date = datetime.datetime.strptime(end_date, '%Y-%m-%d')

    # Initialize an empty DataFrame
    data = pd.DataFrame()

    # Fetch hourly data for each 7-day interval
    interval = datetime.timedelta(days=7)
    current_date = start_date

    while current_date < end_date:
        next_date = current_date + interval
        temp_data = yf.download(ticker, start=current_date, end=next_date, interval='1h', progress=False)
        data = pd.concat([data, temp_data])
        current_date = next_date

    # Remove duplicate entries (if any)
    data = data.loc[~data.index.duplicated(keep='first')]

    # Reset the index to have consecutive integers
    data = data.reset_index()
    # Drop Adj Close column, since it is a future information
    data = data.drop('Adj Close', axis=1)

    # Select relevant columns
    features = ['Open', 'High', 'Low', 'Close', 'Volume']
    data_selected = data[features]

    # Normalize the data
    scaler = MinMaxScaler()
    data_normalized = pd.DataFrame(scaler.fit_transform(data_selected), columns=features)

    return data, data_normalized

# Fetch and preprocess new stock data
new_ticker = "NVDA"
new_start_date = "2023-01-01"
new_end_date = "2023-04-01"

new_data, new_data_normalized = fetch_stock_data(new_ticker, new_start_date, new_end_date)

# Prepare the new data for testing
new_X, new_y = create_sequences_multifeature(new_data_normalized, sequence_length, target_column_name='Close')

# Evaluate the model on the new data
new_loss, new_metric = model.evaluate(new_X, new_y, verbose=2)
print("New Test Loss:", new_loss)
print("New Test Metric:", new_metric)


12/12 - 0s - loss: 0.0011 - mse: 1.4890e-04 - 54ms/epoch - 4ms/step
New Test Loss: 0.0010854466818273067
New Test Metric: 0.00014890091551933438


In [14]:
def plot_stock_predictions(datetime_values, actual_values, predicted_values, ticker, title='Actual vs. Predicted Stock Prices', yaxis_title='Stock Price'):
    # Create a Plotly Figure
    fig = go.Figure()

    # Add actual values trace
    fig.add_trace(go.Scatter(x=datetime_values,
                             y=actual_values,
                             mode='lines',
                             name='Actual',
                             line=dict(color='blue')))

    # Add predicted values trace
    fig.add_trace(go.Scatter(x=datetime_values,
                             y=predicted_values,
                             mode='lines',
                             name='Predicted',
                             line=dict(color='red', dash='dot')))

    # Set the layout
    fig.update_layout(title=f'{ticker}: {title}',
                      xaxis_title='Date',
                      yaxis_title=yaxis_title)

    # Show the figure
    fig.show()


In [15]:
# Run the model on the new data
new_y_pred = model.predict(new_X)

# Reverse scaling of the predicted and actual values
new_y_pred_actual = target_scaler.inverse_transform(new_y_pred)
new_y_actual = target_scaler.inverse_transform(new_y.reshape(-1, 1))

# Extract datetime values from the new dataset for the test set
new_datetime_values = new_data["Datetime"][sequence_length - 1:len(new_y_actual)]

# Call the plot_stock_predictions function with the datetime_values, actual and predicted values for NVDA
plot_stock_predictions(new_datetime_values, new_y_actual.squeeze(), new_y_pred_actual.squeeze(), new_ticker)

