In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
import numpy as np
import random
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.preprocessing import RobustScaler


In [None]:
# Seed every random number generator this notebook uses (PyTorch, NumPy, and
# Python's built-in `random`) from a single constant so the three can never
# drift apart when the seed is changed.
SEED = 140

torch.manual_seed(SEED)
np.random.seed(SEED)
random.seed(SEED)

In [None]:
# Toy dataset: three samples obeying y = 10 * x.
# Each tensor is laid out as (3, 1, 1), i.e. three samples of one value each.
x_values = torch.tensor([[[1.0]], [[2.0]], [[3.0]]], dtype=torch.float32)
y_values = 10 * x_values


In [None]:

class LSTM(nn.Module):
    """LSTM regressor that also owns its data scaling, batching and training loop.

    On construction the training data are scaled with two ``RobustScaler``
    instances (one for inputs, one for targets), wrapped in a ``DataLoader``,
    and the network (``nn.LSTM`` followed by ``nn.Linear``) is created.

    Args:
        x_values: Training inputs as a tensor or array. A trailing axis of
            size 1 on inputs with more than two dimensions is dropped before
            scaling, because the scalers are fed 2-D arrays.
        y_values: Training targets, handled the same way as ``x_values``.
        input_size: Number of features per timestep fed to the LSTM.
        hidden_size: Size of the LSTM hidden state.
        num_layers: Number of stacked LSTM layers.
        output_size: Number of values produced by the final linear layer.
        batch_size: Batch size used by the training ``DataLoader``.
        num_epochs: Number of passes over the training data in ``train_model``.
        learning_rate: Learning rate handed to the Adam optimizer.

    Attributes:
        loss_history: Mean training loss of every epoch of the most recent
            ``train_model`` call (empty before training).
    """

    def __init__(self, x_values, y_values, input_size, hidden_size, num_layers, output_size, batch_size=3, num_epochs=50, learning_rate=0.05):
        super(LSTM, self).__init__()

        # Separate scalers so predictions can be mapped back to target units.
        self.sc_x = RobustScaler()
        self.sc_y = RobustScaler()

        # Fit the scalers and keep the scaled training tensors.
        self.x_scaled, self.y_scaled = self.fit_transform(x_values, y_values)

        # Batched access to the scaled training data.
        self.train_loader = self.create_dataloader(self.x_scaled, self.y_scaled, batch_size)

        # Model / training configuration.
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.output_size = output_size
        self.num_epochs = num_epochs
        self.learning_rate = learning_rate
        self.loss_history = []

        # batch_first=True: the LSTM consumes (batch, seq_len, input_size).
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)

    @staticmethod
    def _to_numpy(values):
        """Return ``values`` as a float32 NumPy array.

        Tensors are detached first: calling ``.numpy()`` on a tensor that
        requires grad raises a ``RuntimeError``.
        """
        if isinstance(values, torch.Tensor):
            values = values.detach().cpu().numpy()
        return np.asarray(values, dtype=np.float32)

    @classmethod
    def _to_2d_array(cls, values):
        """Convert ``values`` to NumPy and drop a trailing singleton axis.

        Only arrays with more than two dimensions are squeezed, so an input
        that is already 2-D is passed to the scaler unchanged.
        """
        arr = cls._to_numpy(values)
        if arr.ndim > 2 and arr.shape[-1] == 1:
            arr = arr.squeeze(-1)
        return arr

    @staticmethod
    def _to_model_tensor(scaled):
        """Turn a scaled array into a float32 tensor with a trailing axis added."""
        # float32 matches the default dtype of the nn.LSTM / nn.Linear weights.
        return torch.tensor(scaled, dtype=torch.float32).unsqueeze(-1)

    def fit_transform(self, x_values, y_values):
        """Fit both scalers on the training data and return the scaled tensors.

        Args:
            x_values: Training inputs (tensor or array).
            y_values: Training targets (tensor or array).

        Returns:
            Tuple ``(x_scaled, y_scaled)`` of float32 tensors, each with a
            trailing axis of size 1 appended after scaling.
        """
        x_scaled = self.sc_x.fit_transform(self._to_2d_array(x_values))
        y_scaled = self.sc_y.fit_transform(self._to_2d_array(y_values))
        return self._to_model_tensor(x_scaled), self._to_model_tensor(y_scaled)

    def transform(self, x_values):
        """Scale new inputs with the already-fitted input scaler.

        Args:
            x_values: Inputs (tensor or array) in original units.

        Returns:
            Float32 tensor of scaled inputs with a trailing axis of size 1.
        """
        x_scaled = self.sc_x.transform(self._to_2d_array(x_values))
        return self._to_model_tensor(x_scaled)

    def create_dataloader(self, x_scaled, y_scaled, batch_size):
        """Wrap paired input/target tensors in a non-shuffling ``DataLoader``.

        Args:
            x_scaled: Inputs, indexed along the first axis.
            y_scaled: Targets, indexed along the first axis.
            batch_size: Number of samples per batch.

        Returns:
            ``DataLoader`` yielding ``(x_batch, y_batch)`` pairs in order.
        """
        class CustomDataset(Dataset):
            """Minimal dataset pairing ``x_data[i]`` with ``y_data[i]``."""

            def __init__(self, x_data, y_data):
                self.x_data = x_data
                self.y_data = y_data

            def __len__(self):
                return len(self.x_data)

            def __getitem__(self, idx):
                return self.x_data[idx], self.y_data[idx]

        dataset = CustomDataset(x_scaled, y_scaled)
        return DataLoader(dataset=dataset, batch_size=batch_size, shuffle=False)

    def forward(self, x):
        """Run the LSTM and map the last timestep's output through the linear layer.

        Args:
            x: Tensor of shape (batch, seq_len, input_size).

        Returns:
            Tensor of shape (batch, output_size).
        """
        x, _ = self.lstm(x)
        x = self.fc(x[:, -1, :])  # Only the last timestep is used for prediction
        return x

    def inverse_transform_y(self, y_pred):
        """Map scaled predictions back to the original target scale.

        Args:
            y_pred: Scaled predictions (tensor or array). A tensor that still
                requires grad is accepted; it is detached before conversion.

        Returns:
            Float32 tensor of predictions in original target units.
        """
        y_pred_np = self._to_2d_array(y_pred)
        y_pred_orig = self.sc_y.inverse_transform(y_pred_np)
        return torch.tensor(y_pred_orig, dtype=torch.float32)

    def train_model(self):
        """Train the network with Adam and MSE loss for ``num_epochs`` epochs.

        The mean of the per-batch losses is stored in ``loss_history`` for
        every epoch and printed every fifth epoch.

        Raises:
            ValueError: If the training ``DataLoader`` yields no batches.
        """
        num_batches = len(self.train_loader)
        if num_batches == 0:
            raise ValueError("train_loader is empty; there is nothing to train on")

        optimizer = optim.Adam(self.parameters(), lr=self.learning_rate)
        loss_fn = nn.MSELoss()  # Mean Squared Error Loss
        self.loss_history = []

        self.train()  # Set the model to training mode
        for epoch in range(self.num_epochs):
            running_loss = 0.0
            for x, y in self.train_loader:
                optimizer.zero_grad()  # Clear gradients left from the previous step
                output = self(x)  # Forward pass
                # Targets carry an extra trailing axis; align them with the output.
                loss = loss_fn(output, y.reshape(-1, self.output_size))
                loss.backward()  # Backward pass
                optimizer.step()  # Update the weights
                running_loss += loss.item()

            epoch_loss = running_loss / num_batches
            self.loss_history.append(epoch_loss)

            if epoch % 5 == 0:  # Print the loss every 5 epochs
                print(f'Epoch [{epoch}/{self.num_epochs}], Loss: {epoch_loss}')

In [None]:
# Build the LSTM regressor on the toy data, then fit it.
model = LSTM(
    x_values,
    y_values,
    input_size=1,
    hidden_size=10,
    num_layers=1,
    output_size=1,
    batch_size=3,
    num_epochs=50,
    learning_rate=0.01,
)
model.train_model()  # Runs the training loop and prints the loss every 5 epochs


In [None]:
# Predict the target for an unseen input (x = 4).
x_test = torch.tensor([4], dtype=torch.float32).view(-1, 1, 1)
x_test = model.transform(x_test)  # Scale with the scaler fitted on the training inputs

model.eval()  # Switch the module to evaluation mode for inference
with torch.no_grad():  # No autograd graph is needed for a prediction
    y_pred = model(x_test)
y_pred_orig = model.inverse_transform_y(y_pred)  # Back to original target units

# The training data follow y = 10 * x, so 40 is the ideal answer here; the
# printed value is whatever the trained model outputs and need not equal it.
print(f'Prediction: {y_pred_orig.item()}')