# Loading Required Libraries

In [None]:
# Importing required libraries
import numpy as np
import datetime
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader
import torch.optim as optim
from torch.autograd import Variable
from sklearn.metrics import classification_report
torch.manual_seed(123)

# Loading the datasets

In [None]:
# Loading training, validation and test datasets
validation= torch.load('Validation/Dataset/validation_dataset.t')
test = torch.load('Test/Dataset/test_dataset.t')

# Loading labels
training_labels = torch.load('Training/Labels/training_labels.t')
validation_labels = torch.load('Validation/Labels/validation_labels.t')
test_labels = torch.load('Test/Labels/test_labels.t')

In [None]:
# Printing out shapes for validation and test
print('Validation shape: ', validation.shape)
print('Test shape: ', test.shape)

# Weight balance

Since the dataset is not balanced, weights are computed and passed to the loss function so the training is not biased for the majority class, in this case, the stationary class.

In [None]:
# Counting the number of occurrences per class
number_of_labels=[]
type_of_label = [0, 1, 2]

# Selecting the labels
labels = training_labels.clone().detach()

for label in type_of_label:
    total_labels = labels[labels == label].shape[0]
    number_of_labels.append(total_labels)

In [None]:
# Computing the weights to be assigned so every class gets the same attention during training
weights=torch.tensor(number_of_labels)
weights= weights / weights.sum()
print(weights)
weights = 1/weights
print(weights)
weights = weights / weights.sum()
print(weights)

# Dataloader

We create different dataloaders for the training split and for the evaluation or test splits. Examples for the training were saved individually since the whole dataset did not fit on memory.

In [None]:
# First we create a Dataset object for the training and test datasets and labels
class Dataset_no_train(torch.utils.data.Dataset):
    
    def __init__(self, dataset, labels):
        self.dataset = dataset
        self.labels = labels
        
    def __len__(self):
        return len(self.dataset)
    
    def __getitem__(self, idx):
        item=self.dataset[idx]
        label=self.labels[idx].long()
        
        return item, label

In [None]:
class Dataset_train(torch.utils.data.Dataset):
    
    # Characterizes a dataset for PyTorch
    def __init__(self, labels):
        # Initialization
        self.labels = labels
        self.list_IDs = range(len(labels))


    def __len__(self):
        # Denotes the total number of samples
        return len(self.list_IDs)

    def __getitem__(self, index):
        # Generates one sample of data
        
        # Load data and get label
        item = torch.load('Training/Dataset/' + str(index) + '.t')
        label = self.labels[index].long()

        return item, label

In [None]:
# We define then the training, validation and test dataset objects
training_dataset = Dataset_train(training_labels)
validation_dataset = Dataset_no_train(validation, validation_labels)
test_dataset = Dataset_no_train(test, test_labels)

In [None]:
# Dataloader
batch_size = 512
num_workers = 32
pin_memory = True
training_dataloader = torch.utils.data.DataLoader(training_dataset, 
                                                  batch_size=batch_size, 
                                                  shuffle=True, 
                                                  num_workers=num_workers)

validation_dataloader = torch.utils.data.DataLoader(validation_dataset, 
                                                    batch_size=batch_size, 
                                                    shuffle=False, 
                                                    num_workers=num_workers)

test_dataloader = torch.utils.data.DataLoader(test_dataset, 
                                              batch_size=batch_size, 
                                              shuffle=False, 
                                              num_workers=num_workers)

In [None]:
print('Training dataloader length: ', len(training_dataloader))
print('Validation dataloader length: ', len(validation_dataloader))
print('Test dataloader length: ', len(test_dataloader))

# LSTM Model

In [None]:
class LSTM_Model(nn.Module):

    def __init__(self, input_size, hidden_size, num_layers, num_classes, dropout_p=0.5):
        super(LSTM_Model, self).__init__()
        
        """
        Args:
            input_size: The number of expected features in the input x.
            hidden_size: The number of features in the hidden state h.
            num_layers: Number of recurrent layers. Would mean stacking two LSTMs together to form a stacked LSTM.
            num_classes: Total number of classes to classify to: Up, Down or Stationary.
            dropout_p: Probability of an element to be zeroed. Default: 0.5
        """
        
        self.model_type = 'Long Short-Term Memory'
        self.hidden_size = hidden_size
        
        # LSTM layer
        self.lstm = nn.LSTM(input_size=input_size, hidden_size=hidden_size, num_layers=num_layers, batch_first=True)
        
        # Dropout layer
        self.dropout = nn.Dropout(dropout_p)
        
        # Fully connected layer
        self.fc = nn.Linear(hidden_size, num_classes)        

    def forward(self, src):
        """
        Args:
            src: Tensor, shape[batch_size, seq_len, input_size]
        
        """
        
        # Propagate input through model
        output, (h_n, c_n) = self.lstm(src)
        h_n = h_n[-1].view(-1, self.hidden_size) # -1 was added to select the last hidden_states when num_layers > 1
        h_n = self.dropout(h_n)
        out = self.fc(h_n)
        
        return out

In [None]:
# Defining parameters
input_size = 40
hidden_size = 64
num_layers = 1
num_classes = 3
dropout_p = 0.5

# Creating the model
model = LSTM_Model(input_size, hidden_size, num_layers, num_classes, dropout_p)

# Defining some other training parameters, optimizer and loss function
learning_rate = 0.001
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
loss_fn = nn.CrossEntropyLoss(weight=weights)
n_epochs = 10

# The Training Loop

In [None]:
# Training loop
for epoch in range(1, n_epochs + 1):
    loss_train=0.0
    loss_validation=0.0
    
    for batch, labels in training_dataloader:    
        # Forward pass
        outputs=model(batch)
        train_loss=loss_fn(outputs, labels)
        
        # Backward pass
        optimizer.zero_grad()
        train_loss.backward()
        optimizer.step()
        
        # We accumulate the loss
        loss_train += train_loss.item()

    for batch, labels in validation_dataloader: 
        
        # We don't need to create a computation graph nor a backwards step for the validation set
        with torch.no_grad():
            outputs_validation= model(batch)
            validation_loss=loss_fn(outputs_validation, labels)
            assert validation_loss.requires_grad == False
            
        # We accumulate the test loss
        loss_validation += validation_loss.item()
            
    if epoch == 1 or epoch % 1 == 0:
        print('{} Epoch {}, Training loss {}, Validation loss {}'.format(datetime.datetime.now(), 
                                                                         epoch,
                                                                         loss_train / len(dataloader_train),
                                                                         loss_validation / len(dataloader_validation)))

# Model Evaluation on Test Dataset

In [None]:
# Function for evaluating the 
def evaluate(model: nn.Module, dataloader: DataLoader, print_report: bool):
    model.eval()  # Turn on evaluation mode
    all_outputs = torch.empty(0)
    all_labels = torch.empty(0)
    total_loss = 0.0
    total_correct = 0
    
    with torch.no_grad(): # Context-manager that disabled gradient calculation
        for batch, labels in dataloader:
            
            # Computing model output and loss
            outputs= model(batch)
            loss=loss_fn(outputs, labels)    
            assert loss.requires_grad == False
            
            # Concatenating the outputs and labels for the f1-score
            all_outputs = torch.cat((all_outputs, torch.argmax(outputs, axis=1)))
            all_labels = torch.cat((all_labels, labels))
            
            # Adding the loss for each batch
            total_loss += loss.item()
            
    total_loss_avg = total_loss / len(validation_dataloader) # Might be slightly different incorrect if last batch is considerably smaller than the batch_size
    report = classification_report(all_labels.cpu(), all_outputs.cpu(), output_dict=not(print_report), digits=4)
            
    return total_loss_avg, report

In [None]:
# Evaluating model on test
loss_test, report_test = evaluate(model, test_dataloader, print_report=True)
print(report_test)