In [1]:
# imports
import pandas as pd
import numpy as np

import matplotlib.pyplot as plt
import seaborn as sns

import torch
import torch.nn as nn

from torch.utils.data import TensorDataset, DataLoader
import torch.optim as optim
from torch.optim.lr_scheduler import ReduceLROnPlateau
from torchsummary import summary
from tqdm import  tqdm

import os, json, gc, io, joblib
from contextlib import redirect_stdout
import math

from sklearn.model_selection import train_test_split
from sklearn.metrics import precision_score, recall_score, f1_score, confusion_matrix


### LSTM1

In [10]:
# Custom LSTM model
class LSTMModel(nn.Module):
    """Stacked-LSTM sequence classifier with a three-stage MLP head.

    The LSTM output at the final time step is fed through three
    Linear -> BatchNorm1d -> GELU stages (widths 64, 32, 16; the first two are
    followed by Dropout(0.2)) and a final linear layer that returns raw,
    un-normalised scores.

    Submodule attribute names are kept stable because saved state dicts are
    keyed by them.

    Args:
        input_dim: Number of features per time step.
        hidden_dim: Size of the LSTM hidden state.
        output_dim: Number of scores produced per sample.
        num_layers: Number of stacked LSTM layers. Defaults to 3.
        dropout: Dropout probability applied between stacked LSTM layers.
            Defaults to 0.2. It is forced to 0 when ``num_layers`` is 1,
            because nn.LSTM only applies dropout between layers and warns
            otherwise. The dropout in the MLP head is not affected.
    """

    def __init__(self, input_dim, hidden_dim, output_dim, num_layers=3, dropout=0.2):
        super().__init__()

        # Multi-layer LSTM (batch-first input)
        inter_layer_dropout = dropout if num_layers > 1 else 0.0
        self.lstm = nn.LSTM(
            input_dim,
            hidden_dim,
            num_layers=num_layers,
            batch_first=True,
            dropout=inter_layer_dropout,
        )

        # Linear layers with Batch Normalization, GELU, and Dropout
        self.linear1 = nn.Linear(hidden_dim, 64)
        self.batch_norm1 = nn.BatchNorm1d(64)
        self.gelu1 = nn.GELU()
        self.dropout1 = nn.Dropout(0.2)

        self.linear2 = nn.Linear(64, 32)
        self.batch_norm2 = nn.BatchNorm1d(32)
        self.gelu2 = nn.GELU()
        self.dropout2 = nn.Dropout(0.2)

        self.linear3 = nn.Linear(32, 16)
        self.batch_norm3 = nn.BatchNorm1d(16)
        self.gelu3 = nn.GELU()

        # Output layer
        self.output_layer = nn.Linear(16, output_dim)

    def forward(self, x):
        """Return scores of shape (batch, output_dim) for x of shape (batch, seq, input_dim)."""
        lstm_out, _ = self.lstm(x)

        # Only the last time step feeds the classification head.
        x = lstm_out[:, -1, :]

        x = self.linear1(x)
        x = self.batch_norm1(x)
        x = self.gelu1(x)
        x = self.dropout1(x)

        x = self.linear2(x)
        x = self.batch_norm2(x)
        x = self.gelu2(x)
        x = self.dropout2(x)

        x = self.linear3(x)
        x = self.batch_norm3(x)
        x = self.gelu3(x)

        return self.output_layer(x)


### GRUs (Gated Recurrent Units)

In [22]:
# Custom GRU model
class GRUModel(nn.Module):
    """GRU sequence classifier with a three-stage MLP head.

    The GRU output at the last time step goes through three
    Linear -> BatchNorm1d -> GELU stages (widths 64, 32, 16; the first two
    followed by Dropout(0.2)) and a final linear layer producing raw scores.

    Args:
        input_dim: Number of features per time step.
        hidden_dim: Size of the GRU hidden state.
        output_dim: Number of scores produced per sample.
        num_layers: Number of stacked GRU layers.
    """

    def __init__(self, input_dim, hidden_dim, output_dim, num_layers):
        super().__init__()
        self.hidden_dim = hidden_dim
        self.num_layers = num_layers

        # Recurrent encoder (batch-first input)
        self.gru = nn.GRU(
            input_size=input_dim,
            hidden_size=hidden_dim,
            num_layers=num_layers,
            batch_first=True,
        )

        # Stage 1: hidden_dim -> 64
        self.linear1 = nn.Linear(hidden_dim, 64)
        self.batch_norm1 = nn.BatchNorm1d(64)
        self.gelu1 = nn.GELU()
        self.dropout1 = nn.Dropout(0.2)

        # Stage 2: 64 -> 32
        self.linear2 = nn.Linear(64, 32)
        self.batch_norm2 = nn.BatchNorm1d(32)
        self.gelu2 = nn.GELU()
        self.dropout2 = nn.Dropout(0.2)

        # Stage 3: 32 -> 16 (no dropout before the output layer)
        self.linear3 = nn.Linear(32, 16)
        self.batch_norm3 = nn.BatchNorm1d(16)
        self.gelu3 = nn.GELU()

        # Final projection to the requested number of outputs
        self.output_layer = nn.Linear(16, output_dim)

    def forward(self, x):
        """Return scores of shape (batch, output_dim) for x of shape (batch, seq, input_dim)."""
        sequence_out, _last_hidden = self.gru(x)

        # Keep only the representation of the final time step.
        features = sequence_out[:, -1, :]

        features = self.dropout1(self.gelu1(self.batch_norm1(self.linear1(features))))
        features = self.dropout2(self.gelu2(self.batch_norm2(self.linear2(features))))
        features = self.gelu3(self.batch_norm3(self.linear3(features)))

        return self.output_layer(features)


### 1D Convolutional NN

In [33]:
class Conv1DModel(nn.Module):
    """1D-convolutional sequence classifier with a three-stage MLP head.

    Three Conv1d -> BatchNorm1d -> GELU -> AvgPool1d(2) -> Dropout(0.2) stages
    (32, 64 and 128 channels) halve the sequence length each time. The result
    is flattened and passed through three Linear -> BatchNorm1d -> GELU stages
    (widths 64, 32, 16) and a final linear layer producing raw scores.

    Submodule attribute names are kept stable because saved state dicts are
    keyed by them.

    Args:
        num_features: Number of input features per time step (Conv1d channels).
        num_classes: Number of output scores. Kept for backward compatibility;
            equivalent to ``output_dim``.
        output_dim: Number of output scores. At least one of ``num_classes``
            and ``output_dim`` must be given; if both are given they must agree.
        seq_length: Length of the input sequences. Defaults to 20, the value
            that used to be hard-coded.

    Raises:
        TypeError: If neither ``num_classes`` nor ``output_dim`` is given.
        ValueError: If ``num_classes`` and ``output_dim`` disagree, or if
            ``seq_length`` is too short to survive three halvings.
    """

    def __init__(self, num_features, num_classes=None, output_dim=None, seq_length=20):
        super().__init__()

        # Accept either spelling of the output size.
        if output_dim is None:
            output_dim = num_classes
        elif num_classes is not None and num_classes != output_dim:
            raise ValueError(
                f"num_classes ({num_classes}) and output_dim ({output_dim}) disagree"
            )
        if output_dim is None:
            raise TypeError("Conv1DModel requires num_classes or output_dim")

        # Conv1D Layers
        self.conv1 = nn.Conv1d(in_channels=num_features, out_channels=32, kernel_size=3, stride=1, padding=1)
        self.batch_norm1 = nn.BatchNorm1d(32)
        self.gelu1 = nn.GELU()
        self.pool1 = nn.AvgPool1d(kernel_size=2, stride=2, padding=0)
        self.dropout1 = nn.Dropout(0.2)

        self.conv2 = nn.Conv1d(in_channels=32, out_channels=64, kernel_size=3, stride=1, padding=1)
        self.batch_norm2 = nn.BatchNorm1d(64)
        self.gelu2 = nn.GELU()
        self.pool2 = nn.AvgPool1d(kernel_size=2, stride=2, padding=0)
        self.dropout2 = nn.Dropout(0.2)

        self.conv3 = nn.Conv1d(in_channels=64, out_channels=128, kernel_size=3, stride=1, padding=1)
        self.batch_norm3 = nn.BatchNorm1d(128)
        self.gelu3 = nn.GELU()
        self.pool3 = nn.AvgPool1d(kernel_size=2, stride=2, padding=0)
        self.dropout3 = nn.Dropout(0.2)

        # The convolutions preserve length (kernel 3, padding 1); each pooling
        # stage floor-halves it, so three stages leave seq_length // 8 steps.
        self.seq_length_after_conv_and_pool = seq_length // 2 // 2 // 2
        if self.seq_length_after_conv_and_pool < 1:
            raise ValueError(
                f"seq_length must be at least 8 to survive three poolings, got {seq_length}"
            )

        # Linear layers with Batch Normalization, GELU, and Dropout
        self.linear1 = nn.Linear(128 * self.seq_length_after_conv_and_pool, 64)
        self.batch_norm_lin1 = nn.BatchNorm1d(64)
        self.gelu_lin1 = nn.GELU()
        self.dropout_lin1 = nn.Dropout(0.2)

        self.linear2 = nn.Linear(64, 32)
        self.batch_norm_lin2 = nn.BatchNorm1d(32)
        self.gelu_lin2 = nn.GELU()
        self.dropout_lin2 = nn.Dropout(0.2)

        self.linear3 = nn.Linear(32, 16)
        self.batch_norm_lin3 = nn.BatchNorm1d(16)
        self.gelu_lin3 = nn.GELU()

        # Output layer
        self.output_layer = nn.Linear(16, output_dim)

    def forward(self, x):
        """Return scores of shape (batch, output_dim) for x of shape (batch, seq_length, num_features)."""
        # Conv1d expects (batch, channels, length), so swap the last two axes.
        x = x.transpose(1, 2)

        x = self.conv1(x)
        x = self.batch_norm1(x)
        x = self.gelu1(x)
        x = self.pool1(x)
        x = self.dropout1(x)

        x = self.conv2(x)
        x = self.batch_norm2(x)
        x = self.gelu2(x)
        x = self.pool2(x)
        x = self.dropout2(x)

        x = self.conv3(x)
        x = self.batch_norm3(x)
        x = self.gelu3(x)
        x = self.pool3(x)
        x = self.dropout3(x)

        # Flatten per sample. Keeping the batch axis fixed means an input of
        # the wrong length fails in linear1 instead of being silently folded
        # into extra batch rows.
        x = x.flatten(start_dim=1)

        x = self.linear1(x)
        x = self.batch_norm_lin1(x)
        x = self.gelu_lin1(x)
        x = self.dropout_lin1(x)

        x = self.linear2(x)
        x = self.batch_norm_lin2(x)
        x = self.gelu_lin2(x)
        x = self.dropout_lin2(x)

        x = self.linear3(x)
        x = self.batch_norm_lin3(x)
        x = self.gelu_lin3(x)

        return self.output_layer(x)


### Transformer Models

In [10]:
class TransformerModel(nn.Module):
    """Transformer-encoder sequence classifier with a three-stage MLP head.

    Each time step is linearly embedded to ``d_model``, positional encodings
    are added, and the sequence is passed through a stack of
    nn.TransformerEncoderLayer. The encoder output for all positions is
    flattened and fed through three Linear -> BatchNorm1d -> GELU stages
    (widths 64, 32, 16) and a final linear layer producing raw scores.

    Args:
        input_dim: Number of features per time step.
        output_dim: Number of scores produced per sample.
        seq_length: Length of the input sequences; fixes the size of the first
            linear layer and of the positional-encoding table.
        num_classes: Accepted but not used by this class.
        d_model: Embedding width used inside the encoder.
        nhead: Number of attention heads.
        num_encoder_layers: Number of stacked encoder layers.
        num_decoder_layers: Accepted but not used by this class.
        dim_feedforward: Width of the encoder's feed-forward sublayer.
        dropout: Dropout used by the positional encoding and encoder layers.
    """

    def __init__(
        self,
        input_dim,
        output_dim,
        seq_length,
        num_classes,
        d_model=64,
        nhead=4,
        num_encoder_layers=2,
        num_decoder_layers=2,
        dim_feedforward=256,
        dropout=0.1,
    ):
        super().__init__()
        self.d_model = d_model
        self.seq_length = seq_length

        # Per-time-step projection into the model dimension
        self.embedding = nn.Linear(input_dim, d_model)

        # Fixed sinusoidal position information
        self.positional_encoding = PositionalEncoding(d_model, dropout, seq_length)

        # Encoder stack (sequence-first layout)
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=d_model,
            nhead=nhead,
            dim_feedforward=dim_feedforward,
            dropout=dropout,
        )
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers=num_encoder_layers)

        # Stage 1: flattened encoder output -> 64
        self.linear1 = nn.Linear(seq_length * d_model, 64)
        self.batch_norm_lin1 = nn.BatchNorm1d(64)
        self.gelu_lin1 = nn.GELU()
        self.dropout_lin1 = nn.Dropout(0.2)

        # Stage 2: 64 -> 32
        self.linear2 = nn.Linear(64, 32)
        self.batch_norm_lin2 = nn.BatchNorm1d(32)
        self.gelu_lin2 = nn.GELU()
        self.dropout_lin2 = nn.Dropout(0.2)

        # Stage 3: 32 -> 16 (no dropout before the output layer)
        self.linear3 = nn.Linear(32, 16)
        self.batch_norm_lin3 = nn.BatchNorm1d(16)
        self.gelu_lin3 = nn.GELU()

        # Final projection to the requested number of outputs
        self.output_layer = nn.Linear(16, output_dim)

    def forward(self, src):
        """Return scores of shape (batch, output_dim) for src of shape (batch, seq_length, input_dim)."""
        # The encoder is sequence-first, so move the batch axis to position 1.
        seq_first = src.transpose(0, 1)

        embedded = self.positional_encoding(self.embedding(seq_first))
        encoded = self.transformer(embedded)

        # Back to batch-first, then merge every position into one feature vector.
        features = encoded.transpose(0, 1).flatten(start_dim=1)

        features = self.dropout_lin1(self.gelu_lin1(self.batch_norm_lin1(self.linear1(features))))
        features = self.dropout_lin2(self.gelu_lin2(self.batch_norm_lin2(self.linear2(features))))
        features = self.gelu_lin3(self.batch_norm_lin3(self.linear3(features)))

        return self.output_layer(features)

class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal positional encodings to a sequence-first tensor.

    A table ``pe`` of shape (max_len, 1, d_model) is precomputed and registered
    as a buffer: even feature indices hold sin(position * div_term) and odd
    indices hold cos(position * div_term).

    Args:
        d_model: Feature width of the tensors to encode. Odd values are
            supported; the last (even-indexed) feature then only has a sine
            component.
        dropout: Dropout probability applied after adding the encoding.
        max_len: Number of positions in the precomputed table.
    """

    def __init__(self, d_model, dropout=0.1, max_len=5000):
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)

        position = torch.arange(max_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2) * -(math.log(10000.0) / d_model))
        angles = position * div_term  # (max_len, ceil(d_model / 2))

        pe = torch.zeros(max_len, 1, d_model)
        pe[:, 0, 0::2] = torch.sin(angles)
        # For odd d_model there is one fewer odd index than even index, so
        # trim the angle table to match; for even d_model this is a no-op.
        pe[:, 0, 1::2] = torch.cos(angles[:, : d_model // 2])
        self.register_buffer('pe', pe)

    def forward(self, x):
        """Return dropout(x + pe[:seq_len]) for x of shape (seq_len, batch, d_model)."""
        x = x + self.pe[:x.size(0)]
        return self.dropout(x)


### Linear Models

In [21]:
class LinearModel(nn.Module):
    """Fully-connected baseline that classifies a flattened sequence window.

    The (seq_length, input_dim) window of each sample is flattened and passed
    through three Linear -> BatchNorm1d -> GELU stages (widths 128, 48, 16; the
    first two followed by Dropout(0.2)) and a final linear layer producing raw
    scores.

    Args:
        seq_length: Number of time steps per sample.
        input_dim: Number of features per time step.
        output_dim: Number of scores produced per sample.
    """

    def __init__(self, seq_length, input_dim, output_dim):
        super().__init__()
        self.seq_length = seq_length
        self.input_dim = input_dim

        # Width of one sample once its window is flattened
        self.flattened_size = seq_length * input_dim

        # Stage 1: flattened window -> 128
        self.linear1 = nn.Linear(self.flattened_size, 128)
        self.batch_norm_lin1 = nn.BatchNorm1d(128)
        self.gelu_lin1 = nn.GELU()
        self.dropout_lin1 = nn.Dropout(0.2)

        # Stage 2: 128 -> 48
        self.linear2 = nn.Linear(128, 48)
        self.batch_norm_lin2 = nn.BatchNorm1d(48)
        self.gelu_lin2 = nn.GELU()
        self.dropout_lin2 = nn.Dropout(0.2)

        # Stage 3: 48 -> 16 (no dropout before the output layer)
        self.linear3 = nn.Linear(48, 16)
        self.batch_norm_lin3 = nn.BatchNorm1d(16)
        self.gelu_lin3 = nn.GELU()

        # Final projection to the requested number of outputs
        self.output_layer = nn.Linear(16, output_dim)

    def forward(self, x):
        """Return scores of shape (rows, output_dim), where rows = x.numel() // flattened_size."""
        # Collapse each sample to a single row of seq_length * input_dim values.
        flat = x.view(-1, self.flattened_size)

        hidden = self.dropout_lin1(self.gelu_lin1(self.batch_norm_lin1(self.linear1(flat))))
        hidden = self.dropout_lin2(self.gelu_lin2(self.batch_norm_lin2(self.linear2(hidden))))
        hidden = self.gelu_lin3(self.batch_norm_lin3(self.linear3(hidden)))

        return self.output_layer(hidden)


### Import data

In [None]:
def import_data(seq_length, batch_size=256, data_dir='Data'):
    """Load the arrays for one window length and return train/valid/test DataLoaders.

    Reads ``{data_dir}/matrix_array_{seq_length}_normalized.npy`` (features) and
    ``{data_dir}/answer_array_{seq_length}.npy`` (per-sample label vectors),
    converts the label vectors to integer class ids, and makes a stratified
    80/10/10 split with a fixed random state.

    Class ids are derived from the argmax column of the label vector:
    column 0 -> 1, column 1 -> 2, any other column -> 0.
    (Per the original author's note the columns are
    ['plus_6', 'minus_6', 'zero_6'], i.e. 1 = up, 2 = down, 0 = zero.)

    Args:
        seq_length: Window length used in the data file names.
        batch_size: Batch size for all three loaders. Defaults to 256.
        data_dir: Directory containing the ``.npy`` files. Defaults to 'Data'.

    Returns:
        Tuple ``(train_loader, valid_loader, test_loader)``. Only the training
        loader shuffles.
    """
    matrix_array = np.load(f'{data_dir}/matrix_array_{seq_length}_normalized.npy')
    answer_array = np.load(f'{data_dir}/answer_array_{seq_length}.npy')

    # Map the argmax column to the class ids used during training.
    indices = np.argmax(answer_array, axis=1)
    mapped_labels = np.select([indices == 0, indices == 1], [1, 2], default=0).astype(np.int64)

    X = matrix_array
    y = mapped_labels

    # 80% train, then split the remaining 20% evenly into validation and test.
    X_train, X_temp, y_train, y_temp = train_test_split(
        X, y, test_size=0.2, random_state=1, stratify=y
    )
    X_valid, X_test, y_valid, y_test = train_test_split(
        X_temp, y_temp, test_size=0.5, random_state=1, stratify=y_temp
    )

    def _to_dataset(features, labels):
        # float32 inputs and int64 targets (the latter as expected by CrossEntropyLoss)
        return TensorDataset(
            torch.tensor(features, dtype=torch.float32),
            torch.tensor(labels, dtype=torch.long),
        )

    train_dataset = _to_dataset(X_train, y_train)
    valid_dataset = _to_dataset(X_valid, y_valid)
    test_dataset = _to_dataset(X_test, y_test)

    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    valid_loader = DataLoader(valid_dataset, batch_size=batch_size, shuffle=False)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

    return train_loader, valid_loader, test_loader


### Test models

In [None]:
def _collect_predictions(model, loader, device):
    """Run ``model`` over ``loader`` without gradients.

    Returns ``(y_true, y_pred, correct, total)`` where the first two are lists
    of per-sample class ids and the last two are the counts used for accuracy.
    """
    y_true, y_pred = [], []
    correct = 0
    total = 0

    with torch.no_grad():
        for inputs, labels in tqdm(loader, total=len(loader), desc="Epoch 1/1"):
            # Move tensors to the same device as the model
            inputs, labels = inputs.to(device), labels.to(device)

            outputs = model(inputs)

            # Predicted class = index of the largest score
            _, predicted = torch.max(outputs, 1)
            y_pred.extend(predicted.cpu().numpy())
            y_true.extend(labels.cpu().numpy())

            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    return y_true, y_pred, correct, total


def _plot_confusion_matrix(model_name, conf_matrix):
    """Show the row-normalised confusion matrix as a heatmap."""
    row_totals = conf_matrix.sum(axis=1, keepdims=True)
    # Rows with no true samples stay at 0 instead of becoming NaN.
    cm_normalized = np.divide(
        conf_matrix.astype('float'),
        row_totals,
        out=np.zeros(conf_matrix.shape, dtype=float),
        where=row_totals != 0,
    )

    sns.heatmap(cm_normalized, annot=True, fmt='.2%', cmap='Blues')
    plt.title(f'{model_name} Confusion Matrix')
    plt.xlabel('Predicted Label')
    plt.ylabel('True Label')
    plt.show()


def _plot_history(history, patience):
    """Plot training/validation loss and accuracy curves side by side."""
    train_loss = history['train_loss']
    val_loss = history['val_loss']
    train_accuracy = history['train_accuracy']
    val_accuracy = history['val_accuracy']
    epochs = range(1, len(train_loss) + 1)  # 1, 2, ... , num_epochs

    # NOTE(review): assumes training stopped `patience` epochs after the last
    # improvement (early stopping) - confirm against the training code.
    last_improvement_epoch = len(train_loss) - patience

    fig, ax = plt.subplots(1, 2, figsize=(12, 6))
    panels = (
        (ax[0], train_loss, val_loss, 'loss', 'Loss'),
        (ax[1], train_accuracy, val_accuracy, 'accuracy', 'Accuracy'),
    )
    for axis, train_values, val_values, label_suffix, title_word in panels:
        axis.plot(epochs, train_values, 'r', label=f'Training {label_suffix}')
        axis.plot(epochs, val_values, 'b', label=f'Validation {label_suffix}')
        # Only mark the epoch when it actually exists in the history.
        if last_improvement_epoch >= 1:
            axis.axvline(x=last_improvement_epoch, color='g', linestyle='--', label='Last Improvement')
        axis.set_title(f'Training and Validation {title_word}')
        axis.set_xlabel('Epochs')
        axis.set_ylabel(title_word)
        axis.legend()

    plt.tight_layout()
    plt.show()


def test_model(model_name, model_instance, seq_length, patience=5):
    """Evaluate a saved model on the test split and plot its results.

    Loads ``Models/{model_name}_model_state_dict.pth`` into ``model_instance``,
    runs it over the test loader returned by ``import_data(seq_length)``,
    prints accuracy, weighted precision/recall/F1 and the confusion matrix,
    shows a normalised confusion-matrix heatmap, and finally plots the
    training curves stored in ``Models/{model_name}_history.json``.

    Args:
        model_name: Name used in the checkpoint/history file names and titles.
        model_instance: Model whose architecture matches the saved state dict.
        seq_length: Window length passed to ``import_data``.
        patience: Number of trailing epochs after the assumed last
            improvement; positions the 'Last Improvement' marker. Defaults to 5.
    """
    _, _, test_loader = import_data(seq_length)

    # Setting device to GPU if available
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print("Using device:", device)

    model = model_instance
    # map_location lets checkpoints saved on a GPU load on CPU-only machines.
    state_dict = torch.load(f'Models/{model_name}_model_state_dict.pth', map_location=device)
    model.load_state_dict(state_dict)
    model = model.to(device)
    model.eval()  # disable dropout and use BatchNorm running statistics

    y_true, y_pred, correct, total = _collect_predictions(model, test_loader, device)

    accuracy = 100 * correct / total
    print(f'Accuracy of the model on the test images: {accuracy}%')

    # Precision, Recall, F1 Score (class-frequency weighted) and Confusion Matrix
    precision = precision_score(y_true, y_pred, average='weighted')
    recall = recall_score(y_true, y_pred, average='weighted')
    f1 = f1_score(y_true, y_pred, average='weighted')
    conf_matrix = confusion_matrix(y_true, y_pred)

    print(f'Precision: {precision}')
    print(f'Recall: {recall}')
    print(f'F1 Score: {f1}')
    print("Confusion Matrix:\n", conf_matrix)

    _plot_confusion_matrix(model_name, conf_matrix)

    with open(f'Models/{model_name}_history.json', 'r') as f:
        history = json.load(f)

    _plot_history(history, patience)

### Run tests

In [None]:
# Window lengths to evaluate; one full pass over every architecture per length.
seq_lengths = [20, 40, 60, 80]

for seq_length in seq_lengths:

    # Fresh, untrained instances sized for this window length; test_model
    # overwrites their weights with the saved state dict before evaluating.
    # NOTE(review): the checkpoint path built in test_model depends only on the
    # model name, not on seq_length - confirm a matching checkpoint exists for
    # each length.
    models = dict(
        LSTM=LSTMModel(input_dim=19, hidden_dim=128, output_dim=3),
        GRU=GRUModel(input_dim=19, hidden_dim=128, output_dim=3, num_layers=3),
        Conv1D=Conv1DModel(num_features=19, output_dim=3, seq_length=seq_length),
        Transformer=TransformerModel(
            input_dim=19,
            output_dim=3,
            seq_length=seq_length,
            num_classes=3,
            d_model=64,
            nhead=4,
            num_encoder_layers=2,
            dim_feedforward=256,
            dropout=0.1,
        ),
        Linear=LinearModel(seq_length=seq_length, input_dim=19, output_dim=3),
    )

    for model_name, model_instance in models.items():
        test_model(model_name, model_instance, seq_length)