## 1. Load Necessary Libraries

In [1701]:
import numpy as np 
import pandas as pd
import matplotlib.pyplot as plt
from pathlib import Path
import seaborn as sns
import os
import torch
import torch.nn as nn
from torch.utils.data import TensorDataset, DataLoader
import datetime as dt
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.metrics import mean_absolute_error, r2_score, root_mean_squared_error


## 2. Set seed for reproducibility

In [1702]:
# -----------------------------
# Reproducibility
# -----------------------------
RANDOM_SEED = 42

np.random.seed(RANDOM_SEED)
torch.manual_seed(RANDOM_SEED)

if torch.cuda.is_available():
    print("CUDA is available. Setting seed for all GPUs.")
    torch.cuda.manual_seed_all(RANDOM_SEED)

sns.set_style("darkgrid")


CUDA is available. Setting seed for all GPUs.


## 3. Set paths

In [1703]:
# -----------------------------
# Project Root Resolution
# -----------------------------
PROJECT_ROOT = Path.cwd().resolve().parents[0]

DATA_RAW_DIR = PROJECT_ROOT / "data" / "raw"
DATA_INTERIM_DIR = PROJECT_ROOT / "data" / "interim"
DATA_PROCESSED_DIR = PROJECT_ROOT / "data" / "processed"
FIGURES_DIR = PROJECT_ROOT / "reports" / "figures"

DATA_INTERIM_DIR.mkdir(parents=True, exist_ok=True)
FIGURES_DIR.mkdir(parents=True, exist_ok=True)

DATA_RAW_DIR, DATA_INTERIM_DIR, FIGURES_DIR


(WindowsPath('C:/Users/Kinjal Mitra/Documents/stock-price-prediction-ff/data/raw'),
 WindowsPath('C:/Users/Kinjal Mitra/Documents/stock-price-prediction-ff/data/interim'),
 WindowsPath('C:/Users/Kinjal Mitra/Documents/stock-price-prediction-ff/reports/figures'))

## 4. Load Data from data/processed/

In [1704]:
path = DATA_PROCESSED_DIR /"processed_dataset.csv"
df = pd.read_csv(path)
df = df.drop(columns=["Unnamed: 0"])

In [1705]:
df

Unnamed: 0,Date,Data_Value,StockPrice,daily_return,log_return,price_change,volatility_7d,MA_7,MA_30,MA_50,...,bollinger_lower,momentum_5d,momentum_20d,price_lag_1,price_lag_2,price_lag_3,price_lag_4,price_lag_5,rolling_max_20d,rolling_min_20d
0,2010-01-04,0.700,1178.00,-0.002962,-0.002967,-3.50,0.004291,1184.892857,1152.920000,1160.837,...,1115.877196,-0.011538,0.027475,1181.50,1182.25,1186.75,1190.75,1191.75,1195.0,1119.75
1,2010-01-05,0.699,1181.50,-0.000634,-0.000635,-0.75,0.004301,1186.714286,1151.945000,1161.577,...,1113.758332,-0.001479,0.031202,1182.25,1186.75,1190.75,1191.75,1183.25,1195.0,1119.75
2,2010-01-06,0.694,1182.25,-0.003792,-0.003799,-4.50,0.006435,1188.571429,1151.053333,1162.252,...,1111.799951,-0.007138,0.064131,1186.75,1190.75,1191.75,1183.25,1190.75,1195.0,1119.75
3,2010-01-07,0.692,1186.75,-0.003359,-0.003365,-4.00,0.007695,1188.464286,1150.161667,1162.812,...,1103.950485,-0.006488,0.070108,1190.75,1191.75,1183.25,1190.75,1194.50,1195.0,1111.00
4,2010-01-08,0.691,1190.75,-0.000839,-0.000839,-1.00,0.008593,1189.642857,1149.161667,1163.397,...,1097.085188,0.007829,0.077358,1191.75,1183.25,1190.75,1194.50,1181.50,1195.0,1109.00
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
3748,2025-01-08,2.782,5959.25,0.002439,0.002436,14.50,0.009858,5927.357143,6056.241667,5950.645,...,5854.670147,-0.004967,-0.024034,5944.75,5866.25,5874.50,5882.25,5989.00,6152.0,5866.25
3749,2025-01-09,2.789,5944.75,0.013382,0.013293,78.50,0.009941,5937.964286,6058.566667,5945.825,...,5861.877525,-0.005146,-0.017315,5866.25,5874.50,5882.25,5989.00,5975.50,6152.0,5866.25
3750,2025-01-10,2.766,5866.25,-0.001404,-0.001405,-8.25,0.006703,5957.892857,6060.433333,5943.240,...,5872.408570,-0.027720,-0.036543,5874.50,5882.25,5989.00,5975.50,6033.50,6152.0,5866.25
3751,2025-01-13,2.785,5874.50,-0.001318,-0.001318,-7.75,0.006418,5994.214286,6063.891667,5942.445,...,5903.923835,-0.034474,-0.035742,5882.25,5989.00,5975.50,6033.50,6084.25,6152.0,5874.50


In [1706]:
X_scaled = np.load(DATA_PROCESSED_DIR / "X_features.npy")
y = np.load(DATA_PROCESSED_DIR / "y_target.npy")

## 5. Create time-series sequences

#### Sequence creation function

In [1707]:
def create_sequences(X, y, lookback):
    """
    Create rolling window sequences for time series forecasting.

    Parameters
    ----------
    X : np.ndarray
        Scaled feature matrix of shape (n_samples, n_features)
    y : np.ndarray
        Target array of shape (n_samples,)
    lookback : int
        Number of past timesteps to use

    Returns
    -------
    X_seq : np.ndarray
        Shape: (n_sequences, lookback, n_features)
    y_seq : np.ndarray
        Shape: (n_sequences,)
    """
    
    X_seq, y_seq = [], []

    for i in range(lookback, len(X)):
        X_seq.append(X[i - lookback:i])
        y_seq.append(y[i])

    return np.array(X_seq), np.array(y_seq)


#### Choose lookback window and generate Sequences

In [1708]:
LOOKBACK = 25  # Number of days to look back for features
X_seq, y_seq = create_sequences(X_scaled, y, LOOKBACK)

#### Verify Shapes of sequences

In [1709]:
print("X_seq shape:", X_seq.shape)
print("y_seq shape:", y_seq.shape)


X_seq shape: (3728, 25, 26)
y_seq shape: (3728,)


In [1710]:
# This should correspond to y at time t
print(y[LOOKBACK])
print(y_seq[0])

0.0026929998321919
0.0026929998321919


## 6. Train / Validation / Test split (time-aware)



70% → Training

15% → Validation

15% → Test

#### Split the data into training, validation, and test sets


In [1711]:

n_samples = X_seq.shape[0]

train_size = int(0.7 * n_samples)
val_size = int(0.15 * n_samples)

X_train = X_seq[:train_size]
y_train = y_seq[:train_size]

X_val = X_seq[train_size:train_size + val_size]
y_val = y_seq[train_size:train_size + val_size]

X_test = X_seq[train_size + val_size:]
y_test = y_seq[train_size + val_size:]


#### Verify shapes

In [1712]:

print("Train:", X_train.shape, y_train.shape)
print("Val  :", X_val.shape, y_val.shape)
print("Test :", X_test.shape, y_test.shape)


Train: (2609, 25, 26) (2609,)
Val  : (559, 25, 26) (559,)
Test : (560, 25, 26) (560,)


#### Save Splits

In [1713]:
SPLITS_DIR = DATA_PROCESSED_DIR/"splits"
np.save(SPLITS_DIR/"X_train.npy", X_train)
np.save(SPLITS_DIR/"y_train.npy", y_train)
np.save(SPLITS_DIR/"X_val.npy", X_val)
np.save(SPLITS_DIR/"y_val.npy", y_val)
np.save(SPLITS_DIR/"X_test.npy", X_test)
np.save(SPLITS_DIR/"y_test.npy", y_test)

## 7. GRU Architecture

In [1714]:
class GRUModel(nn.Module):
    def __init__(
        self,
        input_size,
        hidden_size=64,
        num_layers=2,
        dropout=0.2
    ):
        super(GRUModel, self).__init__()

        self.hidden_size = hidden_size
        self.num_layers = num_layers

        self.gru = nn.GRU(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            dropout=dropout if num_layers > 1 else 0.0
        )

        self.fc = nn.Linear(hidden_size, 1)

    def forward(self, x):
        # x: (batch_size, seq_len, input_size)
        batch_size = x.size(0)

        # Initialize hidden state
        h0 = torch.zeros(
            self.num_layers,
            batch_size,
            self.hidden_size,
            device=x.device
        )

        # GRU forward
        out, _ = self.gru(x, h0)

        # Take last time step
        out = out[:, -1, :]

        # Final regression output
        out = self.fc(out)

        return out.squeeze()


#### Model Instantiation

In [1715]:
input_size = X_train.shape[2]

model = GRUModel(
    input_size=input_size,
    hidden_size=64,
    num_layers=2,
    dropout=0.1
)

print(model)


GRUModel(
  (gru): GRU(26, 64, num_layers=2, batch_first=True, dropout=0.1)
  (fc): Linear(in_features=64, out_features=1, bias=True)
)


## 8. Training Loop

#### Convert NumPy arrays to PyTorch tensors

In [1716]:
# Convert to tensors
X_train_t = torch.tensor(X_train, dtype=torch.float32)
y_train_t = torch.tensor(y_train, dtype=torch.float32)

X_val_t = torch.tensor(X_val, dtype=torch.float32)
y_val_t = torch.tensor(y_val, dtype=torch.float32)


#### Create DataLoaders


In [1717]:
BATCH_SIZE = 64

train_dataset = TensorDataset(X_train_t, y_train_t)
val_dataset = TensorDataset(X_val_t, y_val_t)

train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=False)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False)

#### Device Setup

In [1718]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print ("Using device:", device)
model = model.to(device)


Using device: cuda


#### Loss function & optimizer

In [1719]:
criterion = nn.SmoothL1Loss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)


#### Training Loop

In [1720]:
EPOCHS = 50
PATIENCE = 5   # stop if val loss doesn't improve for 5 epochs
MIN_DELTA = 1e-6  # minimum improvement threshold

In [1721]:
best_val_loss = float("inf")
epochs_no_improve = 0

for epoch in range(EPOCHS):
    # -----------------
    # Training
    # -----------------
    model.train()
    train_loss = 0.0

    for X_batch, y_batch in train_loader:
        X_batch = X_batch.to(device)
        y_batch = y_batch.to(device)

        optimizer.zero_grad()

        preds = model(X_batch)
        loss = criterion(preds, y_batch)

        loss.backward()
        optimizer.step()

        train_loss += loss.item() * X_batch.size(0)

    train_loss /= len(train_loader.dataset)

    # -----------------
    # Validation
    # -----------------
    model.eval()
    val_loss = 0.0

    with torch.no_grad():
        for X_batch, y_batch in val_loader:
            X_batch = X_batch.to(device)
            y_batch = y_batch.to(device)

            preds = model(X_batch)
            loss = criterion(preds, y_batch)

            val_loss += loss.item() * X_batch.size(0)

    val_loss /= len(val_loader.dataset)

    # -----------------
    # Early stopping check
    # -----------------
    if val_loss < best_val_loss - MIN_DELTA:
        best_val_loss = val_loss
        epochs_no_improve = 0

        # Save best model
        torch.save(model.state_dict(), "best_gru_model.pt")
    else:
        epochs_no_improve += 1

    # -----------------
    # Logging every 5 epochs
    # -----------------
    if (epoch + 1) % 5 == 0:
        print(
            f"Epoch [{epoch+1}/{EPOCHS}] | "
            f"Train Loss: {train_loss:.6f} | "
            f"Val Loss: {val_loss:.6f}"
        )

    # -----------------
    # Stop condition
    # -----------------
    if epochs_no_improve >= PATIENCE:
        print(
            f"\nEarly stopping triggered at epoch {epoch+1}. "
            f"Best Val Loss: {best_val_loss:.6f}"
        )
        break


Epoch [5/50] | Train Loss: 0.000059 | Val Loss: 0.000049
Epoch [10/50] | Train Loss: 0.000046 | Val Loss: 0.000040

Early stopping triggered at epoch 14. Best Val Loss: 0.000041


#### Load best model after training

In [1722]:
model.load_state_dict(torch.load("best_gru_model.pt"))
model.eval()

GRUModel(
  (gru): GRU(26, 64, num_layers=2, batch_first=True, dropout=0.1)
  (fc): Linear(in_features=64, out_features=1, bias=True)
)

## 9. Test Set Evaluation

In [1723]:
# Prepare test tensors
X_test_t = torch.tensor(X_test, dtype=torch.float32).to(device)
y_test_t = torch.tensor(y_test, dtype=torch.float32).to(device)

# Generate predictions on test set
with torch.no_grad():
    y_pred = model(X_test_t)

# Move to CPU and convert to numpy for metric calculations    
y_pred = y_pred.cpu().numpy()
y_true = y_test_t.cpu().numpy()

In [1724]:
# Mean Absolute Error (MAE)
mae = mean_absolute_error(y_true, y_pred)
print(f"Mean Absolute Error (MAE): {mae:.4f}")

# Root Mean Squared Error (RMSE)
rmse = root_mean_squared_error(y_true, y_pred)
print(f"Root Mean Squared Error (RMSE): {rmse:.4f}")

# R-squared (R2) Score
r2 = r2_score(y_true, y_pred)
print(f"R-squared (R2) Score: {r2:.4f}")

# Directional accuracy
direction_true = np.sign(y_true)
direction_pred = np.sign(y_pred)

directional_accuracy = (direction_true == direction_pred).mean()
print(f"Directional Accuracy: {directional_accuracy:.2%}")


Mean Absolute Error (MAE): 0.0109
Root Mean Squared Error (RMSE): 0.0126
R-squared (R2) Score: -1.1228
Directional Accuracy: 55.00%


## 10. LSTM Model

In [None]:
class LSTMModel(nn.Module):
    def __init__(
        self,
        input_size,
        hidden_size=64,
        num_layers=2,
        dropout=0.2
    ):
        super(LSTMModel, self).__init__()

        self.hidden_size = hidden_size
        self.num_layers = num_layers

        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            dropout=dropout if num_layers > 1 else 0.0
        )

        # Same regression head as GRU
        self.fc = nn.Linear(hidden_size, 1)

    def forward(self, x):
        # x: (batch_size, seq_len, input_size)
        batch_size = x.size(0)

        # Initialize hidden state and cell state
        h0 = torch.zeros(
            self.num_layers,
            batch_size,
            self.hidden_size,
            device=x.device
        )
        c0 = torch.zeros(
            self.num_layers,
            batch_size,
            self.hidden_size,
            device=x.device
        )

        # LSTM forward
        out, _ = self.lstm(x, (h0, c0))

        # Take last time step
        out = out[:, -1, :]

        # Final regression output
        out = self.fc(out)

        return out.squeeze()

# Model Initialization
model = LSTMModel(
    input_size=X_train.shape[2],
    hidden_size=96,
    num_layers=2,
    dropout=0.1
).to(device)



# Training 
EPOCHS = 50
PATIENCE = 5   # stop if val loss doesn't improve for 5 epochs
MIN_DELTA = 1e-6  # minimum improvement threshold

best_val_loss = float("inf")
epochs_no_improve = 0

for epoch in range(EPOCHS):
    # -----------------
    # Training
    # -----------------
    model.train()
    train_loss = 0.0

    for X_batch, y_batch in train_loader:
        X_batch = X_batch.to(device)
        y_batch = y_batch.to(device)

        optimizer.zero_grad()

        preds = model(X_batch)
        loss = criterion(preds, y_batch)

        loss.backward()
        optimizer.step()

        train_loss += loss.item() * X_batch.size(0)

    train_loss /= len(train_loader.dataset)

    # -----------------
    # Validation
    # -----------------
    model.eval()
    val_loss = 0.0

    with torch.no_grad():
        for X_batch, y_batch in val_loader:
            X_batch = X_batch.to(device)
            y_batch = y_batch.to(device)

            preds = model(X_batch)
            loss = criterion(preds, y_batch)

            val_loss += loss.item() * X_batch.size(0)

    val_loss /= len(val_loader.dataset)

    # -----------------
    # Early stopping check
    # -----------------
    if val_loss < best_val_loss - MIN_DELTA:
        best_val_loss = val_loss
        epochs_no_improve = 0

        # Save best model
        torch.save(model.state_dict(), "best_lstm_model.pt")
    else:
        epochs_no_improve += 1

    # -----------------
    # Logging every 5 epochs
    # -----------------
    if (epoch + 1) % 5 == 0:
        print(
            f"Epoch [{epoch+1}/{EPOCHS}] | "
            f"Train Loss: {train_loss:.6f} | "
            f"Val Loss: {val_loss:.6f}"
        )

    # -----------------
    # Stop condition
    # -----------------
    if epochs_no_improve >= PATIENCE:
        print(
            f"\nEarly stopping triggered at epoch {epoch+1}. "
            f"Best Val Loss: {best_val_loss:.6f}"
        )
        break


# Load best model
model.load_state_dict(torch.load("best_lstm_model.pt"))
model.eval()

Epoch [5/50] | Train Loss: 0.016224 | Val Loss: 0.036622

Early stopping triggered at epoch 6. Best Val Loss: 0.036622


LSTMModel(
  (lstm): LSTM(26, 128, num_layers=2, batch_first=True, dropout=0.1)
  (layer_norm): LayerNorm((128,), eps=1e-05, elementwise_affine=True)
  (proj): Linear(in_features=128, out_features=64, bias=True)
  (fc): Linear(in_features=64, out_features=1, bias=True)
)

In [1726]:
# Testing
# Prepare test tensors
X_test_t = torch.tensor(X_test, dtype=torch.float32).to(device)
y_test_t = torch.tensor(y_test, dtype=torch.float32).to(device)

# Generate predictions on test set
with torch.no_grad():
    y_pred = model(X_test_t)

# Move to CPU and convert to numpy for metric calculations    
y_pred = y_pred.cpu().numpy()
y_true = y_test_t.cpu().numpy()

# Mean Absolute Error (MAE)
mae = mean_absolute_error(y_true, y_pred)
print(f"Mean Absolute Error (MAE): {mae:.4f}")

# Root Mean Squared Error (RMSE)
rmse = root_mean_squared_error(y_true, y_pred)
print(f"Root Mean Squared Error (RMSE): {rmse:.4f}")

# R-squared (R2) Score
r2 = r2_score(y_true, y_pred)
print(f"R-squared (R2) Score: {r2:.4f}")

# Directional accuracy
direction_true = np.sign(y_true)
direction_pred = np.sign(y_pred)

directional_accuracy = (direction_true == direction_pred).mean()
print(f"Directional Accuracy: {directional_accuracy:.2%}")


Mean Absolute Error (MAE): 0.3397
Root Mean Squared Error (RMSE): 0.3420
R-squared (R2) Score: -1569.1715
Directional Accuracy: 54.46%
