In [19]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import random
import numpy as np
import plotly.graph_objects as go

device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Using '{device}' device")

torch.manual_seed(42)


class SinDataset(Dataset):
    def __init__(self, seq_length=25, num_samples=1000, step=0.1):
        self.seq_length = seq_length
        self.num_samples = num_samples
        self.step = step
        self.X = self.generate_data()

    def generate_data(self):
        seq = []
        for _ in range(self.num_samples):
            start = random.uniform(0, 2 * np.pi)
            end = start + self.seq_length * self.step
            sin_vals = [torch.sin(x) for x in torch.arange(start, end, self.step)]
            # Truncate or pad the sequence to ensure it has the correct length
            if len(sin_vals) > self.seq_length:
                sin_vals = sin_vals[: self.seq_length]
            elif len(sin_vals) < self.seq_length:
                sin_vals.extend([torch.tensor(0.0)] * (self.seq_length - len(sin_vals)))
            seq.append(torch.stack(sin_vals))
        # Convert list of tensors to a single tensor
        return torch.stack(seq)

    def __len__(self):
        return self.num_samples

    def __getitem__(self, idx):
        return self.X[idx, :-1], self.X[idx, 1:]


class RNN(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super(RNN, self).__init__()
        self.hidden_size = hidden_size
        self.rnn = nn.RNN(input_size, hidden_size, batch_first=True, num_layers=1)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        x = x.unsqueeze(-1)
        out, _ = self.rnn(x)
        out = self.fc(out)
        return out


def train(model, data_loader, epochs, optimizer, loss_fn):
    model.to(device)
    iteration_losses = []
    print("=> Starting training")
    for epoch in range(epochs):
        epoch_loss = 0
        for X, Y in data_loader:
            X, Y = X.to(device), Y.to(device)
            optimizer.zero_grad()
            output = model(X)
            # Adjust dimensions of the target tensor to match the output tensor
            Y = Y.unsqueeze(-1)
            # Ensure both output and target have the same size
            if Y.size() != output.size():
                Y = Y.expand_as(output)
            # Calculate the loss
            loss = loss_fn(output, Y)
            loss.backward()
            optimizer.step()
            epoch_loss += loss.item()
        iteration_losses.append(epoch_loss)
        print(
            f"=> Epoch: {epoch + 1}, Loss: {epoch_loss / len(data_loader):.3e}",
            end="\r",
        )
    print("\n=> Training finished")
    return iteration_losses


def plot_loss_over_iterations(losses):
    fig = go.Figure()
    fig.add_trace(
        go.Scatter(
            x=np.arange(len(losses)),
            y=losses,
            mode="lines",
            name="Loss",
            line=dict(width=3),
        ),
    )
    fig.update_yaxes(type="log")
    fig.update_layout(
        title="Loss Over Iterations",
        xaxis_title="Iteration",
        yaxis_title="Loss (log scale)",
        template="plotly_dark",
    )
    fig.show()


def plot_predictions(real_values, predicted_values):
    fig = go.Figure()
    fig.add_trace(
        go.Scatter(
            x=np.arange(len(real_values)),
            y=real_values,
            mode="lines",
            name="Real",
            line=dict(width=3),
        )
    )
    fig.add_trace(
        go.Scatter(
            x=np.arange(len(predicted_values)),
            y=predicted_values,
            mode="markers+lines",
            name="Predicted",
            line=dict(width=3),
        )
    )
    fig.update_layout(
        title="Real vs Predicted Values",
        xaxis_title="Time Step",
        yaxis_title="Value",
        template="plotly_dark",
    )
    fig.show()

Using 'cuda' device


In [20]:
seq_length = 25
hidden_size = 128
input_size = 1
output_size = 1
num_samples = 1000
batch_size = 64
epochs = 100
learning_rate = 0.001

sin_dataset = SinDataset(seq_length=seq_length, num_samples=num_samples)
sin_dataloader = DataLoader(sin_dataset, batch_size=batch_size, shuffle=True)

In [21]:
rnn_model = RNN(input_size, hidden_size, output_size)
loss_function = nn.MSELoss()
optimizer = optim.Adam(rnn_model.parameters(), lr=learning_rate)

losses = train(rnn_model, sin_dataloader, epochs, optimizer, loss_function)

=> Starting training
=> Epoch: 100, Loss: 1.110e-03
=> Training finished


In [22]:
# Generate predictions for the first sequence in the dataset
with torch.no_grad():
    input_seq, target_seq = sin_dataset[0]
    input_seq = input_seq.unsqueeze(0).to(device)
    predicted_seq = rnn_model(input_seq)

plot_predictions(
    target_seq.cpu().numpy()[:100], predicted_seq.cpu().numpy().flatten()[:100]
)
plot_loss_over_iterations(losses)