In [None]:
import numpy as np
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.optim import SGD, Adam
from torch.utils.data import DataLoader, TensorDataset

In [None]:
class BasicNN(nn.Module):
    def __init__(self):
        super().__init__()
        self.w00 = nn.Parameter(torch.tensor(1.7), requires_grad=False)
        self.b00 = nn.Parameter(torch.tensor(-0.85), requires_grad=False)
        self.w01 = nn.Parameter(torch.tensor(-40.8), requires_grad=False)

        self.w10 = nn.Parameter(torch.tensor(12.6), requires_grad=False)
        self.b10 = nn.Parameter(torch.tensor(0.0), requires_grad=False)
        self.w11 = nn.Parameter(torch.tensor(2.7), requires_grad=False)

        self.final_bias = nn.Parameter(torch.tensor(-16.0), requires_grad=True)

    def forward(self, input):
        input_to_top_relu = input * self.w00 + self.b00
        top_relu_output = F.relu(input_to_top_relu)
        scaled_top_relu_output = top_relu_output * self.w01

        input_to_bottom_relu = input * self.w10 + self.b10
        bottom_relu_output = F.relu(input_to_bottom_relu)
        scaled_bottom_relu_output = bottom_relu_output * self.w11

        input_to_final_relu = scaled_top_relu_output + scaled_bottom_relu_output + self.final_bias

        return F.relu(input_to_final_relu)

In [None]:
model = BasicNN()

In [None]:
inputs = torch.tensor([0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0])
labels = torch.tensor([0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0])

optimizer = SGD(model.parameters(), lr=0.1)
total_loss = 0.0
for epoch in range(100):
    total_loss = 0
    for i in range(len(inputs)):
        input_i = inputs[i]
        label_i = labels[i]
        output_i = model(input_i)
        loss = (label_i - output_i) ** 2
        loss.backward()
        total_loss += float(loss)
    
    if total_loss < 0.0001:
        print("Num steps: ", epoch)
        print("Total loss: ", total_loss, "\n======")
        break
    
    optimizer.step()
    optimizer.zero_grad()
    print("Step: ", epoch)
    print("Total loss: ", total_loss)
    print("Bias value: ", model.final_bias.data, "\n======") 

In [None]:
final_outputs = model(inputs)
plt.plot(inputs, final_outputs.detach())

## LSTM

In [None]:
x_start, x_end = 0, 100
n_samples = 1000
noise = 0.0

x = np.linspace(x_start, x_end, num=n_samples)
y = np.sin(x) ** 2 + 0.5 * np.cos(x) + np.random.normal(loc=0, scale=noise, size=n_samples)
plt.plot(x, y)

In [None]:
class LSTMModel(nn.Module):
    def __init__(self, input_size=1, hidden_size=50, output_size=1, num_layers=1):
        super(LSTMModel, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        out, _ = self.lstm(x)
        out = self.fc(out)
        return out

In [None]:
lstm = LSTMModel(input_size=1, hidden_size=50, output_size=1, num_layers=1)

In [None]:
# Prepare the data
xs = []
ys = []

for i in range(len(y) - 1):
    xs.append(y[i])
    ys.append(y[i + 1])

xs = np.array(xs).reshape(-1, 1)
xs = torch.tensor(xs, dtype=torch.float32)

ys = np.array(ys).reshape(-1, 1)
ys = torch.tensor(ys, dtype=torch.float32)

In [None]:
# Train the model
criterion = nn.MSELoss()
optimizer = Adam(lstm.parameters(), lr=0.01)
num_epochs = 100
for epoch in range(num_epochs):
    lstm.train()
    optimizer.zero_grad()
    
    # Forward pass
    y_hat = lstm(xs)
    loss = criterion(y_hat, ys)

    # Backward pass and optimization
    loss.backward()
    optimizer.step()

    if (epoch + 1) % 10 == 0:
        print(f'Epoch [{epoch + 1}/{num_epochs}], Loss: {loss.item():.4f}')

In [None]:
# Fitted values
lstm.eval()
with torch.no_grad():
    y_hat = lstm(xs)
y_hat = y_hat.numpy()

plt.plot(np.arange(len(y)), y, label='Original values', color='blue')
plt.plot(np.arange(len(y_hat)), y_hat, label='Fitted values', color='red')

In [None]:
# https://www.youtube.com/watch?v=RHGiXPuo_pI
# article on LSTMs, RNNs, and GRUs

### Going further with RNNs

In [None]:
# Useful resources:
# https://medium.com/@imjeremyhi/understanding-recurrent-networks-part-1-simple-rnn-lstm-cc53e7475980
# https://github.com/fastai/fastbook/blob/master/12_nlp_dive.ipynb

In [None]:
# Predict next value based on the last 10 values
x_start, x_end = 0, 100
n_samples = 1000
noise = 0.0

x = np.linspace(x_start, x_end, num=n_samples)
y = np.sin(x) + np.cos(0.5 * x) + 0.4 * np.cos(0.1 * x)
plt.plot(x, y)

In [None]:
seq_len = 3
xs, ys = [], []
for i in range(len(y) - seq_len):
    xs.append(y[i:i + seq_len])
    ys.append(y[i + seq_len])

xs = np.array(xs).reshape(-1, seq_len)
xs = torch.tensor(xs, dtype=torch.float32)

ys = np.array(ys).reshape(-1, 1)
ys = torch.tensor(ys, dtype=torch.float32)

In [None]:
class RNN1(nn.Module):
    def __init__(self, input_size=1, hidden_size=25):
        super().__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.ih = nn.Linear(input_size, hidden_size)
        self.hh = nn.Linear(hidden_size, hidden_size)
        self.ho = nn.Linear(hidden_size, 1)
    
    def forward(self, x: torch.Tensor):
        x = x.view(-1, 3)  # Reshape to (batch_size, seq_len)
        h = torch.zeros(x.shape[0], self.hidden_size)
        h = F.relu(self.ih(x[:, 0].view(-1, 1)) + self.hh(h))  # .view(-1, 1) to make it 2D
        h = F.relu(self.ih(x[:, 1].view(-1, 1)) + self.hh(h))
        h = F.relu(self.ih(x[:, 2].view(-1, 1)) + self.hh(h))
        return self.ho(h)

In [None]:
# Define model
rnn_model = RNN1(input_size=1, hidden_size=25)

# Define data loaders
dataset = TensorDataset(xs, ys)
dataloader = DataLoader(dataset, batch_size=32)

criterion = nn.MSELoss()
optimizer = Adam(rnn_model.parameters(), lr=0.01)
num_epochs = 100

rnn_model.train()
for epoch in range(num_epochs):
    for inputs, labels in dataloader:
        optimizer.zero_grad()
        outputs = rnn_model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

    if (epoch + 1) % 10 == 0:
        print(f'Epoch [{epoch + 1}/{num_epochs}], Loss: {loss.item():.4f}')

In [None]:
# Fitted values
rnn_model.eval()
with torch.no_grad():
    fitted_values = rnn_model(xs).view(-1)
fitted_values = fitted_values.numpy()

plt.plot(x, y, label='Original values', color='blue')
plt.plot(x[seq_len:], fitted_values, label='Fitted values', color='red')

## Forecasting several timestamps ahead

In [None]:
# Predict next value based on the last 10 values
x_start, x_end = 0, 100
n_samples = 1000
noise = 0.2

x = np.linspace(x_start, x_end, num=n_samples)
y = np.sin(x) + np.cos(0.5 * x) + 0.4 * np.cos(0.1 * x) + np.random.normal(loc=0, scale=noise, size=n_samples)
plt.plot(x, y)

In [None]:
in_seq_len = 60
out_seq_len = 21

xs, ys = [], []
for i in range(len(y) - in_seq_len - out_seq_len):
    xs.append(y[i:i + in_seq_len])
    for j in range(in_seq_len):
        ys.append(y[i + j + 1: i + j + out_seq_len + 1])

xs = np.array(xs).reshape(-1, in_seq_len)
ys = np.array(ys).reshape(-1, in_seq_len, out_seq_len)

In [None]:
sample_idx = 4
fig, axes = plt.subplots(in_seq_len, 1, figsize=(10, in_seq_len * 1.5), sharex=True)
axes = axes.flatten()

for i in range(in_seq_len):
    axes[i].scatter(
        x[sample_idx:sample_idx + in_seq_len + out_seq_len],
        y[sample_idx:sample_idx + in_seq_len + out_seq_len],
        label='Original sequence',
        color='blue'
    )

    axes[i].plot(
        x[sample_idx:sample_idx + in_seq_len],
        xs[sample_idx],
        label='Input sequence',
        color='green',
        lw=2.5,
    )

    axes[i].plot(
        x[sample_idx + i + 1:sample_idx + i + out_seq_len + 1],
        ys[sample_idx, i],
        label='Output sequence',
        color='orange',
    )

fig.tight_layout();

In [None]:
class RNN2(nn.Module):
    def __init__(
        self,
        in_seq_length: int,
        out_seq_len: int,
        input_size: int = 1,
        hidden_size: int = 25,
        batch_size: int = 32,
    ):
        super().__init__()
        self.in_seq_length = in_seq_length
        self.out_seq_len = out_seq_len
        self.hidden_size = hidden_size
        self.input_size = input_size
        self.batch_size = batch_size
        self.ih = nn.Linear(input_size, hidden_size)
        self.hh = nn.Linear(hidden_size, hidden_size)
        self.ho = nn.Linear(hidden_size, out_seq_len)
    
    def forward(self, x: torch.Tensor, h: torch.Tensor | None = None):
        if h is None:
            h = torch.zeros(self.batch_size, self.hidden_size, dtype=torch.float32)
        
        outs = []
        for i in range(self.in_seq_length):
            h = F.relu(self.ih(x[:, i].view(-1, 1)) + self.hh(h))
            outs.append(self.ho(h))
        outs = torch.stack(outs, dim=1)
        return outs

In [None]:
batch_size = 32

xs = torch.tensor(xs, dtype=torch.float32)
ys = torch.tensor(ys, dtype=torch.float32)

ds = TensorDataset(xs, ys)
dl = DataLoader(ds, batch_size=32, drop_last=True)

In [None]:
rnn_model = RNN2(in_seq_length=in_seq_len, out_seq_len=out_seq_len, batch_size=batch_size)
criterion = nn.MSELoss()
optimizer = Adam(rnn_model.parameters(), lr=1e-03)

# Training loop
n_epochs = 250
rnn_model.train()
for epoch in range(n_epochs):
    for inputs, labels in dl:
        optimizer.zero_grad()
        outputs = rnn_model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

    if (epoch + 1) % 1 == 0:
        print(f'Epoch [{epoch + 1}/{n_epochs}], Loss: {loss.item():.4f}')

In [None]:
# Plot the fitted values for each batch
plt.plot(x, y)

for batch_idx, (inputs, _) in enumerate(dl):
    outputs = rnn_model(inputs).detach().numpy()
    x_start = batch_idx * batch_size + in_seq_len + 1
    x_end = x_start + out_seq_len
    plt.plot(x[x_start: x_end], outputs[0, -1, :], color="red")

## Multilayer RNN

In [None]:
class RNN3(nn.Module):
    def __init__(
        self,
        in_seq_length: int,
        out_seq_len: int,
        input_size: int = 1,
        hidden_size: int = 25,
        batch_size: int = 32,
        num_layers: int = 1
    ):
        super().__init__()
        self.in_seq_length = in_seq_length
        self.out_seq_len = out_seq_len
        self.hidden_size = hidden_size
        self.input_size = input_size
        self.batch_size = batch_size
        self.rnn = nn.RNN(input_size=input_size, hidden_size=hidden_size, num_layers=num_layers, batch_first=True)
        self.ho = nn.Linear(in_features=hidden_size, out_features=out_seq_len)
    
    def forward(self, x: torch.Tensor) -> torch.Tensor:
        out, _ = self.rnn(x)
        return self.ho(out)

In [None]:
# Predict next value based on the last 10 values
x_start, x_end = 0, 100
n_samples = 1000
noise = 0.3

x = np.linspace(x_start, x_end, num=n_samples)
y = np.sin(x) + np.cos(0.5 * x) + 0.4 * np.cos(0.1 * x) + np.random.normal(loc=0, scale=noise, size=n_samples)
plt.plot(x, y)

In [None]:
# Prepare datasets
in_seq_len = 60
out_seq_len = 21

xs, ys = [], []
for i in range(len(y) - in_seq_len - out_seq_len):
    xs.append(y[i:i + in_seq_len])
    for j in range(in_seq_len):
        ys.append(y[i + j + 1: i + j + out_seq_len + 1])

xs = np.array(xs).reshape(-1, in_seq_len, 1)
xs = torch.tensor(xs, dtype=torch.float32)

ys = np.array(ys).reshape(-1, in_seq_len, out_seq_len)
ys = torch.tensor(ys, dtype=torch.float32)

In [None]:
batch_size = 32
ds = TensorDataset(xs, ys)
dl = DataLoader(ds, batch_size=32)

In [None]:
rnn_model = RNN3(
    in_seq_length=in_seq_len,
    out_seq_len=out_seq_len,
    input_size=1,
    hidden_size=25,
    num_layers=3,
    batch_size=batch_size
)
criterion = nn.MSELoss()
optimizer = Adam(rnn_model.parameters(), lr=1e-03)

# Training loop
n_epochs = 200
rnn_model.train()
for epoch in range(n_epochs):
    for inputs, labels in dl:
        optimizer.zero_grad()
        outputs = rnn_model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

    if (epoch + 1) % 10 == 0:
        print(f'Epoch [{epoch + 1}/{n_epochs}], Loss: {loss.item():.4f}')

## Multilayer LSTM, Long Horizon