## Imports

In [None]:
import torch.nn as nn
import torch
import full_iri_dataset_generator as iri
from training_loop import train_model

## Constants

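- `SEQUENCE_LENGTH` is the number of historical measurements before the target element that are provided to the model
- `NUM_HEADS` is the number of attention heads used by each attention layer
- `NUM_FEATURES_PER_SAMPLE` is how many values each measurement has. `IRI-only` has 3: left_iri, right_iri, and time_since_first_measurement; this notebook uses 14
- `NUM_LAYERS` is the number of stacked recurrent (LSTM-attention) layers to use
- `EMBEDDING_DIM` is the width of one flattened sample (`SEQUENCE_LENGTH * NUM_FEATURES_PER_SAMPLE`) and is also used as the hidden size of every layer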

In [None]:
# Number of historical measurements handed to the dataset loader and the model.
SEQUENCE_LENGTH = 10
# Number of values stored for each individual measurement.
NUM_FEATURES_PER_SAMPLE = 14
# Width of one flattened sample; reused as the hidden size of every layer.
EMBEDDING_DIM = NUM_FEATURES_PER_SAMPLE * SEQUENCE_LENGTH
# Number of heads given to each multi-head attention module.
NUM_HEADS = 1
# Total number of stacked LSTM-attention layers in the model.
NUM_LAYERS = 3

## Dataset Preparation

Load train and test datasets

In [None]:
# Parquet files consumed by the IRI dataset loader.
DATA_PATH = "../training_data/final_data.parquet"
CONSTRUCTION_PATH = "../training_data/construction_data.parquet"

train, test = iri.load_iri_datasets(
    path=DATA_PATH,
    construction_path=CONSTRUCTION_PATH,
    seq_length=SEQUENCE_LENGTH,
)

# Sanity check: split sizes first, then the shapes of elements 0 and 1 of the
# first training item (used as model input and target further down).
print(len(train), len(test))
first_inputs, first_target = train[0][0], train[0][1]
print(first_inputs.shape, first_target.shape)

## Model Definition

Here a basic stacked LSTM-with-attention regression model is defined.

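1. Data is flattened and passed through a linear embedding
2. LSTM-attention layers process the data and update the hidden state
3. The final layer maps the hidden state to 2 predicted values
4. Outputs are returned as-is (no logsoftmax is applied); training uses a mean-squared-error loss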

In [None]:
class LSTM_Attention_Layer(nn.Module):
    """Linear projection -> self-attention -> single-layer LSTM -> linear projection.

    Input and output rows have the same width, so several of these layers can
    be chained one after another.

    Args:
        embedding_dim: Feature width used by every sub-module. When omitted,
            the notebook-level ``EMBEDDING_DIM`` is used.
        num_heads: Number of attention heads. When omitted, the notebook-level
            ``NUM_HEADS`` is used.
    """

    def __init__(self, embedding_dim=None, num_heads=None):
        super().__init__()
        # Resolve the defaults lazily so the notebook constants are only
        # required when the caller does not pass explicit sizes.
        if embedding_dim is None:
            embedding_dim = EMBEDDING_DIM
        if num_heads is None:
            num_heads = NUM_HEADS
        self.attention = nn.MultiheadAttention(embedding_dim, num_heads)
        self.linear = nn.Linear(embedding_dim, embedding_dim)
        self.final = nn.Linear(embedding_dim, embedding_dim)
        self.lstm = nn.LSTM(input_size=embedding_dim,
                            hidden_size=embedding_dim,
                            num_layers=1,
                            batch_first=True)

    def forward(self, x):
        """Run one layer over ``x`` and return a tensor of shape ``(x.shape[0], embedding_dim)``.

        ``x`` is flattened to two dimensions ``(x.shape[0], -1)`` first.

        NOTE(review): PyTorch interprets 2-D inputs to ``nn.MultiheadAttention``
        and ``nn.LSTM`` as a single unbatched sequence, so the rows of ``x``
        are processed as consecutive sequence steps - confirm this is intended.
        """
        x = x.reshape(x.shape[0], -1)
        # Zero initial states shaped (num_layers, hidden_size), the layout
        # nn.LSTM expects for an unbatched input.
        hidden_size = self.lstm.hidden_size
        hidden = torch.zeros(1, hidden_size, device=x.device, dtype=x.dtype)
        cell = torch.zeros(1, hidden_size, device=x.device, dtype=x.dtype)
        out = self.linear(x)
        out, _ = self.attention(out, out, out)
        # Feed the attention output (not the raw input) into the LSTM so the
        # projection and attention sub-modules contribute to the result and
        # receive gradients.
        out, _ = self.lstm(out, (hidden, cell))
        out = self.final(out)
        return out
class LSTM(nn.Module):
    """Linear embedding, ``NUM_LAYERS`` LSTM-attention layers, and a 2-output linear head."""

    def __init__(self):
        super().__init__()
        flat_features = SEQUENCE_LENGTH * NUM_FEATURES_PER_SAMPLE
        self.embedding = nn.Linear(flat_features, EMBEDDING_DIM)
        # One layer is held separately; the remaining NUM_LAYERS - 1 live in a list.
        self.first = LSTM_Attention_Layer()
        self.layers = nn.ModuleList()
        for _ in range(NUM_LAYERS - 1):
            self.layers.append(LSTM_Attention_Layer())
        self.final = nn.Linear(EMBEDDING_DIM, 2)

    def forward(self, x):
        """Flatten ``x`` to rows of ``SEQUENCE_LENGTH * NUM_FEATURES_PER_SAMPLE`` values and return 2 outputs per row."""
        flat = x.reshape(-1, SEQUENCE_LENGTH * NUM_FEATURES_PER_SAMPLE)
        state = self.first(self.embedding(flat))
        for block in self.layers:
            state = block(state)
        return self.final(state.reshape(-1, EMBEDDING_DIM))

## Training

In [None]:
# Model, mean-squared-error objective and Adam optimizer.
model = LSTM()
loss = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.01)

# Cyclic learning-rate schedule moving between base_lr and max_lr.
lr_scheduler = torch.optim.lr_scheduler.CyclicLR(
    optimizer,
    base_lr=0.001,
    max_lr=0.1,
    step_size_up=10,
    cycle_momentum=False,
)

# Run the project's training loop and keep whatever it returns.
training_info = train_model(
    model,
    train,
    test,
    loss,
    optimizer,
    epochs=200,
    test_every_n=10,
    batch_size=512,
    lr_scheduler=lr_scheduler,
)

## Accuracy Computation

In [None]:
from torcheval.metrics import R2Score
from torch.utils.data import DataLoader

def compute_r2_for(dataset):
    """Return the R^2 score of the notebook-level ``model`` over ``dataset``.

    Args:
        dataset: Dataset whose batches expose the model input at index 0 and
            the regression target at index 1.

    Returns:
        The result of ``R2Score.compute()`` after every batch was accumulated.
    """
    r2 = R2Score()
    loader = DataLoader(dataset, batch_size=256, shuffle=True)
    for batch in loader:
        inputs, goal = batch[0], batch[1]
        outputs = model(inputs)
        # torcheval metrics expect update(input=predictions, target=truth).
        # R^2 is not symmetric in its arguments, so the order matters here.
        r2.update(outputs, goal)
    return r2.compute()

# Score on the CPU with the model in eval mode and gradient tracking disabled.
model.to("cpu").eval()
with torch.no_grad():
    # Training split first, then the held-out split.
    train_r2 = compute_r2_for(train)
    print(f"R^2 for training data: {train_r2}")

    test_r2 = compute_r2_for(test)
    print(f"R^2 for testing data: {test_r2}")

In [None]:
from torcheval.metrics import MeanSquaredError

def compute_mse_for(dataset):
    """Return the mean squared error of the notebook-level ``model`` over ``dataset``.

    Batches are drawn in shuffled order, 256 samples at a time; element 0 of a
    batch is fed to the model and element 1 is used as the target.
    """
    metric = MeanSquaredError()
    loader = DataLoader(dataset, shuffle=True, batch_size=256)
    for batch in loader:
        features, target = batch[0], batch[1]
        predictions = model(features)
        metric.update(predictions, target)
    return metric.compute()

# Score on the CPU with the model in eval mode and gradient tracking disabled.
model.to("cpu").eval()
with torch.no_grad():
    # Training split first, then the held-out split.
    train_mse = compute_mse_for(train)
    print(f"MSE for training data: {train_mse}")

    test_mse = compute_mse_for(test)
    print(f"MSE for testing data: {test_mse}")

In [None]:
import matplotlib.pyplot as plt
import numpy as np

roads = range(0, 500)

# Common-sense test: for every road, sweep one input entry (row -2, last
# column) over the integers 0..49 and record the model prediction at each
# step, rescaled by iri.iri_range and shifted by iri.mean_iri (presumably
# undoing the dataset normalisation - confirm against the dataset generator).
allres = []
with torch.no_grad():  # predictions only, so no autograd graph is needed
    for road in roads:
        # Work on a copy so the sweep cannot overwrite the stored test sample.
        inpt = test[road][0].clone()
        results = []
        for step in range(0, 50):
            inpt[-2, -1] = 1 * step
            prediction = model(inpt.unsqueeze(0)).squeeze() * iri.iri_range + iri.mean_iri
            results.append(prediction.detach().numpy())
        allres.append(results)
allres = np.array(allres)

# Plot output column 0 of the sweep for the first 50 roads only.
for i in range(50):
    plt.plot(allres[i, :, 0], label=f"Road {i}")