In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np




In [None]:
# Encoder LSTM
class Encoder(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers):
        super(Encoder, self).__init__()
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)

    def forward(self, x):
        outputs, (h, c) = self.lstm(x)  
        return outputs, h, c  

#Attention Mechanism
class Attention(nn.Module):
    def __init__(self, hidden_size):
        super(Attention, self).__init__()
        self.attn = nn.Linear(hidden_size * 2, hidden_size)  
        self.v = nn.Linear(hidden_size, 1, bias=False)  

    def forward(self, hidden, encoder_outputs):
        seq_len = encoder_outputs.shape[1]
        hidden = hidden.repeat(1, seq_len, 1)  
        energy = torch.tanh(self.attn(torch.cat((hidden, encoder_outputs), dim=2)))  
        attn_weights = torch.softmax(self.v(energy), dim=1)  
        context = torch.sum(attn_weights * encoder_outputs, dim=1)  
        return context, attn_weights  

#Decoder LSTM with Attention
class Decoder(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, output_size):
        super(Decoder, self).__init__()
        self.lstm = nn.LSTM(hidden_size + input_size, hidden_size, num_layers, batch_first=True)
        self.attention = Attention(hidden_size)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x, hidden, cell, encoder_outputs):
        context, attn_weights = self.attention(hidden[-1].unsqueeze(1), encoder_outputs)  
        lstm_input = torch.cat((x, context.unsqueeze(1)), dim=2)  
        output, (h, c) = self.lstm(lstm_input, (hidden, cell))  
        output = self.fc(output.squeeze(1))  
        return output, h, c, attn_weights  

#Seq2Seq Model
class Seq2Seq(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, output_size):
        super(Seq2Seq, self).__init__()
        self.encoder = Encoder(input_size, hidden_size, num_layers)
        self.decoder = Decoder(input_size, hidden_size, num_layers, output_size)

    def forward(self, x, target_seq_len, teacher_forcing_ratio=0.5):
        batch_size, seq_len, _ = x.shape
        encoder_outputs, h, c = self.encoder(x)  
        decoder_input = x[:, -1, :].unsqueeze(1)  

        outputs = []
        for _ in range(target_seq_len):
            output, h, c, _ = self.decoder(decoder_input, h, c, encoder_outputs)
            outputs.append(output)
            if np.random.rand() < teacher_forcing_ratio:
                decoder_input = output.unsqueeze(1)  
            else:
                decoder_input = decoder_input  
        return torch.stack(outputs, dim=1)  


In [3]:
# Data Preparation Function
def create_sequences(data, seq_length, target_length):
    X, y = [], []
    for i in range(len(data) - seq_length - target_length + 1):
        X.append(data[i:i + seq_length])
        y.append(data[i + seq_length:i + seq_length + target_length])  
    return np.array(X), np.array(y)

# Example Data (Stock Prices)
stock_prices = np.array([100, 102, 105, 107, 110, 115, 120, 125, 130, 128, 132, 135], dtype=np.float32)

# Create Dataset
seq_length = 4  
target_length = 2  
X, y = create_sequences(stock_prices, seq_length, target_length)
X_train = torch.tensor(X).unsqueeze(-1)  
y_train = torch.tensor(y).unsqueeze(-1)  

# Model Setup
input_size = 1  
hidden_size = 64  
num_layers = 2  
output_size = 1  

model = Seq2Seq(input_size, hidden_size, num_layers, output_size)
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.01)

# Training Function
def train(model, X_train, y_train, epochs=100):
    model.train()  
    for epoch in range(epochs):
        optimizer.zero_grad()
        outputs = model(X_train, target_seq_len=target_length, teacher_forcing_ratio=0.5)
        loss = criterion(outputs, y_train)
        loss.backward()
        optimizer.step()
        
        if (epoch + 1) % 10 == 0:
            print(f'Epoch [{epoch+1}/{epochs}], Loss: {loss.item():.4f}')


In [4]:
# Evaluation Function
def evaluate(model, X_input):
    model.eval()  
    with torch.no_grad():
        pred = model(X_input, target_seq_len=target_length, teacher_forcing_ratio=0.0)
    return pred


In [7]:
# Training the Model
train(model, X_train, y_train, epochs=5000)

Epoch [10/5000], Loss: 50.5362
Epoch [20/5000], Loss: 50.5357
Epoch [30/5000], Loss: 50.5354
Epoch [40/5000], Loss: 50.5350
Epoch [50/5000], Loss: 50.5346
Epoch [60/5000], Loss: 50.5342
Epoch [70/5000], Loss: 50.5338
Epoch [80/5000], Loss: 50.5334
Epoch [90/5000], Loss: 50.5331
Epoch [100/5000], Loss: 50.5326
Epoch [110/5000], Loss: 50.5322
Epoch [120/5000], Loss: 50.5318
Epoch [130/5000], Loss: 50.5314
Epoch [140/5000], Loss: 50.5309
Epoch [150/5000], Loss: 50.5305
Epoch [160/5000], Loss: 50.5300
Epoch [170/5000], Loss: 50.5297
Epoch [180/5000], Loss: 50.5292
Epoch [190/5000], Loss: 50.5288
Epoch [200/5000], Loss: 50.5284
Epoch [210/5000], Loss: 50.5279
Epoch [220/5000], Loss: 50.5275
Epoch [230/5000], Loss: 50.5270
Epoch [240/5000], Loss: 50.5266
Epoch [250/5000], Loss: 50.5262
Epoch [260/5000], Loss: 50.5257
Epoch [270/5000], Loss: 50.5253
Epoch [280/5000], Loss: 50.5247
Epoch [290/5000], Loss: 50.5243
Epoch [300/5000], Loss: 50.5238
Epoch [310/5000], Loss: 50.5234
Epoch [320/5000],

In [8]:
# Testing the Model on New Data
X_test = torch.tensor([[125, 130, 128, 132]], dtype=torch.float32).unsqueeze(-1)  
predicted = evaluate(model, X_test)

print("\nPredicted Stock Prices:", predicted.squeeze().tolist())  



Predicted Stock Prices: [132.19366455078125, 134.63034057617188]
