In [None]:
# Generic/Built-in
from typing import *

# Libs
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import Dataset, ConcatDataset, Subset, DataLoader
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split

# Custom
from src.data_preparation import *

In [None]:
# Build the train / validation / test datasets (plus `norm_stats`) with the
# project helper `prepare_datasets` (presumably from src.data_preparation).
# NOTE(review): the parameter meanings noted below are inferred from their
# names only — confirm against the helper's implementation.
train_dataset, val_dataset, test_dataset, norm_stats = prepare_datasets(
    sequence_size=100,  # matches the length-100 axis of the batch shape printed further down
    stride=50,  # presumably the step between consecutive windows — TODO confirm
    train_ratio=0.8,  # the three ratios sum to 1.0
    val_ratio=0.1,
    test_ratio=0.1,
    load_if_exists=True  # presumably reuses previously prepared data — TODO confirm
)

In [None]:
# Sanity check: number of items in the training dataset.
len(train_dataset)

In [33]:
# Mini-batches of 64 training samples, reshuffled at the start of every epoch.
train_dataloader = DataLoader(
    dataset=train_dataset,
    batch_size=64,
    shuffle=True,
)


In [34]:
# Peek at the first batch only, to confirm the layout of the input tensor.
# NOTE(review): the printed shape (64, 100, 6) looks like
# (batch, sequence_size, features) — confirm against src.data_preparation.
for X, y in train_dataloader:
    print(X.shape)
    break  # one batch is enough; `y` is unpacked but not inspected here

torch.Size([64, 100, 6])


In [35]:
class EncoderRNN(nn.Module):
    """LSTM encoder that classifies a whole sequence from its final hidden state.

    Args:
        input_size: Number of features per time step.
        hidden_size: Size of the LSTM hidden state.
        num_layers: Number of stacked LSTM layers.
        dropout: Dropout probability between stacked LSTM layers. It is
            replaced by 0.0 unless ``num_layers`` is greater than 1.
        num_classes: Number of output classes, i.e. the size of the logits.
    """

    def __init__(self, input_size, hidden_size,
                 num_layers=1, dropout=0.0, num_classes=7):
        super().__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers

        # nn.LSTM applies dropout only between stacked layers, so a non-zero
        # value is only passed through when there is more than one layer.
        inter_layer_dropout = dropout if num_layers > 1 else 0.0
        self.rnn = nn.LSTM(
            input_size,
            hidden_size,
            num_layers=num_layers,
            dropout=inter_layer_dropout,
            batch_first=True,
        )
        self.fc = nn.Linear(hidden_size, num_classes)

    def forward(self, input_seq):
        """Return class logits for ``input_seq``.

        Args:
            input_seq: Input for the LSTM; because the LSTM is built with
                ``batch_first=True`` a batched input is laid out as
                (batch, sequence, features).

        Returns:
            The output of the linear head applied to the final hidden state
            of the last LSTM layer.
        """
        # The per-step outputs and the final cell state are discarded; only
        # the final hidden state feeds the classifier head.
        # TODO: add a function that extracts the actual hidden state, so the
        # output can be expressed purely as a function of the hidden state.
        _, (final_hidden, _) = self.rnn(input_seq)
        last_layer_state = final_hidden[-1]
        return self.fc(last_layer_state)

In [36]:
def train_encoder(model, dataloader, num_epochs, learning_rate):
    """Train ``model`` as a classifier using Adam and cross-entropy loss.

    After every epoch one line is printed with the mean per-batch loss and
    the accuracy (in percent) measured on the batches seen during that epoch.

    Args:
        model: Module mapping an input batch to per-class logits.
        dataloader: Iterable yielding ``(input_seq, target_seq)`` batches,
            where ``target_seq`` holds class indices. It does not need to
            implement ``__len__``.
        num_epochs: Number of passes over ``dataloader``.
        learning_rate: Learning rate handed to ``torch.optim.Adam``.

    Returns:
        None. The model is updated in place and left in training mode.
    """
    model.train()  # Set the model to training mode
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

    for epoch in range(num_epochs):
        total_loss = 0.0
        correct = 0
        total = 0
        # Count batches while iterating instead of calling len(dataloader):
        # this also works for loaders without __len__ and lets the empty
        # case be detected before dividing.
        num_batches = 0
        for input_seq, target_seq in dataloader:
            optimizer.zero_grad()
            output_seq = model(input_seq)
            loss = criterion(output_seq, target_seq)
            total_loss += loss.item()
            num_batches += 1

            _, predicted = torch.max(output_seq, dim=1)  # Get class with highest logit
            correct += (predicted == target_seq).sum().item()
            total += target_seq.size(0)

            loss.backward()
            optimizer.step()

        # An epoch without data has no meaningful metrics; report nan rather
        # than failing with ZeroDivisionError.
        mean_loss = total_loss / num_batches if num_batches else float("nan")
        accuracy = (correct / total) * 100 if total else float("nan")
        print(f'Epoch {epoch+1}/{num_epochs}, Loss: {mean_loss}, Accuracy: {accuracy}')


In [37]:
# Hyperparameters
seed = 42
input_size = 6  # size of the last axis of the batch shape printed above
hidden_size = 30
learning_rate = 0.001
num_epochs = 5
num_layers = 1
dropout_rate = 0.1  # NOTE: has no effect while num_layers == 1 (EncoderRNN replaces it with 0.0)

# Seed the global torch RNG before the model is built so that weight
# initialisation and the DataLoader shuffle order are repeatable whenever
# this cell is re-run.
torch.manual_seed(seed)

encoder = EncoderRNN(
    input_size,
    hidden_size,
    num_layers=num_layers,
    dropout=dropout_rate,
)
train_encoder(encoder, train_dataloader, num_epochs, learning_rate)

Epoch 1/5, Loss: 0.7039836268872023, Accuracy: 74.53119767831231
Epoch 2/5, Loss: 0.38282975779314127, Accuracy: 87.19723183391004
Epoch 3/5, Loss: 0.27637421659060885, Accuracy: 91.5978345797522
Epoch 4/5, Loss: 0.25691091979720765, Accuracy: 92.04431298135952
Epoch 5/5, Loss: 0.24727368843076483, Accuracy: 92.21174238196227


In [38]:
# Evaluation metrics do not depend on sample order, so the test loader is
# left unshuffled to keep its iteration order deterministic.
test_dataloader = DataLoader(test_dataset, batch_size=128, shuffle=False)

In [42]:
def evaluate(model, dataloader):
    """Print the classification accuracy of ``model`` over ``dataloader``.

    Args:
        model: Module mapping an input batch to per-class logits.
        dataloader: Iterable yielding ``(input_seq, target_seq)`` batches,
            where ``target_seq`` holds class indices.

    Returns:
        None. The accuracy (in percent) is printed; ``nan`` is printed when
        the dataloader yields no samples. The model is left in evaluation
        mode afterwards.
    """
    model.eval()  # Set the model to evaluation mode

    correct = 0
    total = 0
    # Gradients are not needed for scoring, so skip building the autograd graph.
    with torch.no_grad():
        for input_seq, target_seq in dataloader:
            output_seq = model(input_seq)

            _, predicted = torch.max(output_seq, dim=1)  # Get class with highest logit
            correct += (predicted == target_seq).sum().item()
            total += target_seq.size(0)

    # Guard the empty case instead of failing with ZeroDivisionError.
    accuracy = (correct / total) * 100 if total else float("nan")
    print(f"Test accuracy:{accuracy}")

In [44]:
# Report the accuracy of the trained encoder on the test dataloader.
evaluate(encoder, test_dataloader)

Test accuracy:93.10267857142858
