In [123]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
from torch.utils.data import DataLoader, TensorDataset


In [124]:
df = pd.read_csv(fr'C:\Users\ivane\Desktop\Dissertation\data\merged_data.csv', parse_dates=['Date'])
df.head()

Unnamed: 0,Date,Open,Close,Change,sentiment,sentiment_score
0,2025-01-10,3761.549,3736.886,-0.66,0,0.700804
1,2024-12-19,3780.106,3708.531,-1.93,0,0.950467
2,2024-12-18,3774.744,3781.551,0.18,1,0.46088
3,2024-12-16,3748.22,3765.163,0.45,1,0.771217
4,2024-12-12,3744.519,3748.267,0.1,1,0.927686


In [125]:
df.columns

Index(['Date', 'Open', 'Close', 'Change', 'sentiment', 'sentiment_score'], dtype='object')

In [126]:
features = ['Open', 'Close', 'Change', 'sentiment', 'sentiment_score']
target = 'Close'

In [127]:
# 1. Data Preprocessing
# Scale the data
scaler = MinMaxScaler()
scaled_data = scaler.fit_transform(df[features])
scaled_data = scaled_data.astype(np.float32)  # Convert to float32 for PyTorch

In [128]:
# Create sequences (assuming we want to predict next day's Close price)
def create_sequences(data, seq_length=10):
    X, y = [], []
    for i in range(len(data) - seq_length):
        X.append(data[i:(i + seq_length)])
        y.append(data[i + seq_length, 1])  # Index 1 for Close price
    return np.array(X), np.array(y)

seq_length = 10
X, y = create_sequences(scaled_data)

In [129]:
# 5. Split into train/test
X_train, X_test = X_tensor[:split], X_tensor[split:]
split = int(0.8 * len(X))  # 80% train, 20% test
X_train, X_test = X[:split], X[split:]
y_train, y_test = y[:split], y[split:]

print("Split data into train/test sets:")
print(f"X_train: {len(X_train)}")
print(f"y_train: {len(y_train)}")
print(f"X_test: {len(X_test)}")
print(f"y_test: {len(y_test)}")

Split data into train/test sets:
X_train: 336
y_train: 336
X_test: 84
y_test: 84


In [131]:
batch_size = 16
X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
y_train_tensor = torch.tensor(y_train, dtype=torch.float32)
X_test_tensor = torch.tensor(X_test, dtype=torch.float32)
y_test_tensor = torch.tensor(y_test, dtype=torch.float32)

train_loader = DataLoader(TensorDataset(X_train_tensor, y_train_tensor), batch_size=batch_size, shuffle=True)
test_loader = DataLoader(TensorDataset(X_test_tensor, y_test_tensor), batch_size=batch_size, shuffle=False)

In [132]:
# 7. Define CNN-LSTM Model
class CNNLSTM(nn.Module):
    def __init__(self, input_size, hidden_size=64, num_layers=2):
        super(CNNLSTM, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers

        # CNN Layer
        self.conv1 = nn.Conv1d(in_channels=input_size, out_channels=32, kernel_size=3, padding=1)
        self.conv2 = nn.Conv1d(in_channels=32, out_channels=64, kernel_size=3, padding=1)
        self.relu = nn.ReLU()
        self.pool = nn.MaxPool1d(kernel_size=2)

        # LSTM Layer
        self.lstm = nn.LSTM(input_size=64, hidden_size=hidden_size, num_layers=num_layers, batch_first=True)

        # Fully Connected Layer
        self.fc = nn.Linear(hidden_size, 1)  # Output layer (predicts closing price)

    def forward(self, x):
        x = x.permute(0, 2, 1)  # Reshape for Conv1D (batch, features, timesteps)
        x = self.relu(self.conv1(x))
        x = self.pool(self.relu(self.conv2(x)))

        x = x.permute(0, 2, 1)  # Reshape for LSTM (batch, timesteps, features)
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
        c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
        
        out, _ = self.lstm(x, (h0, c0))
        out = self.fc(out[:, -1, :])  # Take output from last time step
        return out

In [133]:
# 8. Initialize Model, Loss Function, Optimizer
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
input_size = len(features)  # Number of features per time step

model = CNNLSTM(input_size=input_size).to(device)
criterion = nn.MSELoss()  # Mean Squared Error for regression
optimizer = optim.Adam(model.parameters(), lr=0.001)

In [134]:
# 9. Train Model
epochs = 50
for epoch in range(epochs):
    model.train()
    total_loss = 0
    for X_batch, y_batch in train_loader:
        X_batch, y_batch = X_batch.to(device), y_batch.to(device)

        optimizer.zero_grad()
        predictions = model(X_batch)
        loss = criterion(predictions, y_batch)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()

    print(f"Epoch {epoch+1}/{epochs}, Loss: {total_loss / len(train_loader):.6f}")

  return F.mse_loss(input, target, reduction=self.reduction)


Epoch 1/50, Loss: 0.072037
Epoch 2/50, Loss: 0.036881
Epoch 3/50, Loss: 0.034636
Epoch 4/50, Loss: 0.036352
Epoch 5/50, Loss: 0.035749
Epoch 6/50, Loss: 0.035815
Epoch 7/50, Loss: 0.034952
Epoch 8/50, Loss: 0.034843
Epoch 9/50, Loss: 0.034568
Epoch 10/50, Loss: 0.034697
Epoch 11/50, Loss: 0.035129
Epoch 12/50, Loss: 0.035281
Epoch 13/50, Loss: 0.035506
Epoch 14/50, Loss: 0.034775
Epoch 15/50, Loss: 0.035235
Epoch 16/50, Loss: 0.035696
Epoch 17/50, Loss: 0.035605
Epoch 18/50, Loss: 0.035121
Epoch 19/50, Loss: 0.036359
Epoch 20/50, Loss: 0.035289
Epoch 21/50, Loss: 0.035015
Epoch 22/50, Loss: 0.035426
Epoch 23/50, Loss: 0.035203
Epoch 24/50, Loss: 0.034813
Epoch 25/50, Loss: 0.034932
Epoch 26/50, Loss: 0.034751
Epoch 27/50, Loss: 0.035006
Epoch 28/50, Loss: 0.034824
Epoch 29/50, Loss: 0.034768
Epoch 30/50, Loss: 0.034682
Epoch 31/50, Loss: 0.034656
Epoch 32/50, Loss: 0.036443
Epoch 33/50, Loss: 0.034887
Epoch 34/50, Loss: 0.034415
Epoch 35/50, Loss: 0.034889
Epoch 36/50, Loss: 0.035104
E

In [137]:
# 10. Evaluate Model
import matplotlib.pyplot as plt
from sklearn.metrics import mean_absolute_error

model.eval()
test_losses = []
predictions_list = []

with torch.no_grad():
    for X_batch, y_batch in test_loader:
        X_batch, y_batch = X_batch.to(device), y_batch.to(device)
        predictions = model(X_batch)
        loss = criterion(predictions, y_batch)
        test_losses.append(loss.item())
        predictions_list.extend(predictions.cpu().numpy())

# 2. Compute MSE & MAE
test_mse = np.mean(test_losses)

print(f"Test MSE: {test_mse:.6f}")

# 3. Convert Predictions Back to Original Scale
predictions = scaler.inverse_transform(np.array(predictions_list).reshape(-1, 1))
y_test_original = scaler.inverse_transform(y_test_tensor.cpu().numpy().reshape(-1, 1))

# 4. Plot Results
plt.figure(figsize=(12, 6))
plt.plot(df["Date"].values[split+seq_length:], y_test_original, label="Actual", color='blue')
plt.plot(df["Date"].values[split+seq_length:], predictions, label="Predicted", color='red', linestyle='dashed')
plt.legend()
plt.xlabel("Date")
plt.ylabel("Stock Price")
plt.title("Stock Price Prediction with CNN-LSTM")
plt.grid()
plt.show()

Test MSE: 0.091326
