# Test Phase

<h2>Load data</h2>

In [None]:
import pandas as pd
from google.colab import drive
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
import torch
import torch.nn as nn
import logging
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from tqdm import tqdm
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, classification_report
import json
import torch.nn.functional as F
import os
from torch.utils.data import Dataset
# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')


In [None]:
# Mount Google Drive at /content/drive so the files stored there can be read by the cells below.
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# WARNING: always change this to the run directory whose checkpoints should be evaluated.
directory = '2024-07-03_10-03-02'

# Root folder of the project logs on Google Drive.
path = '/content/drive/MyDrive/Progetto-Vascon/DataLog'
# Folder of the selected run; the best-model checkpoints are read from here.
save_dir_base = os.path.join(path, directory)
# Folder of the trained BERT sequence-classification model (not used by the cells visible here).
save_dir_bert = os.path.join(path, "bert_sequence_classification_trained")

<h2>Get the BERT embeddings from Drive</h2>

In [None]:
import pickle
import os
import numpy as np

# File names of the pickled test-set embeddings, both stored under <path>/embeddings.
# Presumably the first holds the per-word embeddings and the second the CLS
# embeddings (inferred from the names) -- TODO confirm.
file_words_test = "embeddings_words_test.pkl"
file_cls_test = "embedding_cls_test.pkl"

# Load both embedding files from the "embeddings" sub-folder of `path`.
# NOTE(review): pickle.load can execute arbitrary code contained in the file;
# only load pickles that come from a trusted source.
with open(os.path.join(path, 'embeddings', file_words_test), "rb") as f:
    embeddings_words_test = pickle.load(f)

with open(os.path.join(path, 'embeddings', file_cls_test), "rb") as f:
    embedding_cls_test = pickle.load(f)

# Test labels, stored as a NumPy array next to the embeddings.
y_test = np.load(os.path.join(path, 'embeddings', 'y_test.npy'))

print("Embeddings loaded successfully.")

Embeddings loaded successfully.


# Model class: ALWAYS UPDATE!
Adjust the model according to the criterion (loss function) being used.

In [None]:
class CustomDatasetForCLSToken(Dataset):
    """Map-style dataset yielding ``(sample, target, cls_token)`` triples.

    The three containers are read at the same position, and the dataset
    length is taken from ``data`` alone.
    """

    def __init__(self, data, targets, cls_tokens):
        """
        Store the three index-aligned containers.
        Args:
            data: Indexable collection of samples; it defines the dataset length.
            targets: Indexable collection of labels, read at the same index as ``data``.
            cls_tokens: Indexable collection of CLS-token entries, read at the same index as ``data``.
        """
        self.data, self.targets, self.cls_tokens = data, targets, cls_tokens

    def __len__(self):
        """Return the number of samples, i.e. ``len(self.data)``."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return the ``(sample, target, cls_token)`` triple stored at position ``idx``."""
        return self.data[idx], self.targets[idx], self.cls_tokens[idx]


# Wrap the test embeddings, the labels and the CLS embeddings so that every item
# is a (sample, target, cls_token) triple.
custom_test_dataset = CustomDatasetForCLSToken(embeddings_words_test, y_test, embedding_cls_test)

In [None]:
import torch
import torch.nn as nn


class Attention(nn.Module):
    """Scoring layer that assigns one softmax-normalised weight to every position along dimension 1."""

    def __init__(self, hidden_dim):
        """
        Build the single-output scoring projection and initialise it.
        Args:
            hidden_dim (int): Size of the last dimension of the tensors given to ``forward``.
        """
        super().__init__()
        self.hidden_dim = hidden_dim
        self.attn = nn.Linear(hidden_dim, 1)
        self.init_weights()

    def init_weights(self):
        """
        Xavier-uniform initialise the projection weight and zero its bias, if it has one.
        """
        scorer = self.attn
        nn.init.xavier_uniform_(scorer.weight)
        if scorer.bias is None:
            return
        nn.init.constant_(scorer.bias, 0)

    def forward(self, rnn_output):
        """
        Score every position and normalise the scores with a softmax over dimension 1.
        Args:
            rnn_output (torch.Tensor): RNN outputs whose last dimension is ``hidden_dim``
                (the caller in this file passes a batch-first 3D tensor).
        Returns:
            torch.Tensor: Attention weights; the trailing feature dimension is removed
            and the values sum to 1 along dimension 1.
        """
        # tanh bounds each raw score to (-1, 1) before the softmax.
        scores = self.attn(rnn_output).tanh().squeeze(-1)
        return F.softmax(scores, dim=1)

class SentimentClassifierWithSoftAttention(nn.Module):
    """RNN encoder with soft-attention pooling and two sigmoid classification heads.

    The input embeddings go through an LSTM or GRU, an ``Attention`` layer
    weights every timestep, and the attention-weighted sum of the RNN outputs
    is classified either on its own (``forward`` uses ``fc``) or concatenated
    with a CLS embedding (``forward_with_cls`` uses ``fc_cls``). Both entry
    points apply a sigmoid, so they return per-class scores in [0, 1] rather
    than raw logits.
    """

    def __init__(
        self,
        embedding_dim=768,
        hidden_dim=256,
        output_dim=6,
        n_layers=1,
        bidirectional=True,
        dropout=0.0,
        rnn_type='LSTM',
    ):
        """
        Initialize the SentimentClassifierWithSoftAttention model.
        Args:
            embedding_dim (int): Dimension of the input embeddings (and of the CLS embedding).
            hidden_dim (int): Hidden size of the RNN and of each head's hidden layer.
            output_dim (int): Number of output scores per sample.
            n_layers (int): Number of recurrent layers.
            bidirectional (bool): If True, use a bidirectional RNN.
            dropout (float): Dropout probability between RNN layers; only passed to the
                RNN when ``n_layers > 1``.
            rnn_type (str): Type of RNN to use ('LSTM' or 'GRU').
        Raises:
            ValueError: If ``rnn_type`` is neither 'LSTM' nor 'GRU'.
        """
        super().__init__()
        self.embedding_dim = embedding_dim
        self.hidden_dim = hidden_dim
        self.output_dim = output_dim
        self.n_layers = n_layers
        self.bidirectional = bidirectional
        self.dropout = dropout
        self.rnn_type = rnn_type

        # A bidirectional RNN concatenates both directions, doubling the feature size.
        rnn_out_dim = hidden_dim * 2 if bidirectional else hidden_dim

        self.attention = Attention(rnn_out_dim)

        rnn_classes = {'LSTM': nn.LSTM, 'GRU': nn.GRU}
        if rnn_type not in rnn_classes:
            raise ValueError("Choose a valid RNN type: LSTM or GRU")
        self.rnn = rnn_classes[rnn_type](
            embedding_dim,
            hidden_dim,
            num_layers=n_layers,
            bidirectional=bidirectional,
            # Inter-layer dropout is meaningless (and warned about) with a single layer.
            dropout=dropout if n_layers > 1 else 0,
            batch_first=True,
        )

        # Head used by ``forward``: classifies the attention-pooled RNN output alone.
        self.fc = nn.Sequential(
            nn.Linear(rnn_out_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, output_dim)
        )

        # Head used by ``forward_with_cls``: pooled RNN output concatenated with the CLS embedding.
        self.fc_cls = nn.Sequential(
            nn.Linear(rnn_out_dim + embedding_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, output_dim)
        )

        self.init_weights()

    def init_weights(self):
        """
        Initialize weights for the RNN and for both fully connected heads.

        Weights are Xavier-uniform initialised and biases set to zero. Both
        ``fc`` and ``fc_cls`` are covered, so the two heads start from the
        same initialisation scheme.
        """
        for name, param in self.rnn.named_parameters():
            if 'weight' in name:
                nn.init.xavier_uniform_(param.data)
            elif 'bias' in name:
                nn.init.constant_(param.data, 0)

        for head in (self.fc, self.fc_cls):
            for layer in head:
                if isinstance(layer, nn.Linear):
                    nn.init.xavier_uniform_(layer.weight)
                    if layer.bias is not None:
                        nn.init.constant_(layer.bias, 0)

    def forward(self, embedded):
        """
        Forward pass for the model without the CLS embedding.
        Args:
            embedded (torch.Tensor): Input embeddings, (batch_size, seq_len, embedding_dim).
        Returns:
            torch.Tensor: Sigmoid outputs in [0, 1], (batch_size, output_dim).
            torch.Tensor: Attention weights, (batch_size, seq_len).
        """
        weighted_sum, attention_weights = self.prepare_data(embedded)
        dense_outputs = self.fc(weighted_sum)
        outputs = torch.sigmoid(dense_outputs)
        return outputs, attention_weights

    def forward_with_cls(self, embedded, cls_token):
        """
        Forward pass for the model with the CLS embedding.
        Args:
            embedded (torch.Tensor): Input embeddings, (batch_size, seq_len, embedding_dim).
            cls_token (torch.Tensor): CLS embedding; a singleton dimension 1 is squeezed
                away, so (batch_size, 1, embedding_dim) is accepted.
        Returns:
            torch.Tensor: Sigmoid outputs in [0, 1], (batch_size, output_dim).
            torch.Tensor: Attention weights, (batch_size, seq_len).
        """
        weighted_sum, attention_weights = self.prepare_data(embedded)
        cls_token = cls_token.squeeze(1)
        # Concatenate the attention-pooled RNN output with the CLS embedding along the feature axis.
        weighted_sum_with_cls = torch.cat((weighted_sum, cls_token), dim=1)
        dense_outputs = self.fc_cls(weighted_sum_with_cls)
        outputs = torch.sigmoid(dense_outputs)
        return outputs, attention_weights

    def prepare_data(self, embedded):
        """
        Prepare data by applying the RNN and the attention mechanism.
        Args:
            embedded (torch.Tensor): Input embeddings, (batch_size, seq_len, embedding_dim).
        Returns:
            torch.Tensor: Attention-weighted sum of the RNN outputs, one vector per sample.
            torch.Tensor: Attention weights, (batch_size, seq_len).
        Raises:
            ValueError: If ``embedded`` is not a 3D tensor.
        """
        if len(embedded.shape) != 3:
            raise ValueError("Input shape must be 3D: (batch_size, seq_len, embedding_dim)")
        rnn_output, _ = self.rnn(embedded)
        attention_weights = self.attention(rnn_output)
        # (batch, 1, seq_len) x (batch, seq_len, features) -> (batch, 1, features)
        attention_weights = attention_weights.unsqueeze(1)
        weighted = torch.bmm(attention_weights, rnn_output)
        weighted_sum = weighted.squeeze(1)
        return weighted_sum, attention_weights.squeeze(1)

    def get(self):
        """
        Get the constructor hyper-parameters as a JSON string.
        Returns:
            str: JSON object with the keys embedding_dim, hidden_dim, output_dim,
            n_layers, bidirectional, dropout and rnn_type.
        """
        params = {
            'embedding_dim': self.embedding_dim,
            'hidden_dim': self.hidden_dim,
            'output_dim': self.output_dim,
            'n_layers': self.n_layers,
            'bidirectional': self.bidirectional,
            'dropout': self.dropout,
            'rnn_type': self.rnn_type
        }
        return json.dumps(params)


In [None]:
def calculate_metrics(predictions, targets):
    """Count per-class confusion-matrix entries for binary predictions.

    Only entries that are exactly 0 or 1 are counted; any other value falls
    into none of the four categories.

    Args:
        predictions: 2D array-like of shape (num_samples, num_classes) holding 0/1 predictions.
        targets: 2D array-like of the same shape holding the 0/1 ground truth.

    Returns:
        pd.DataFrame: One row per class (index 0..num_classes-1) with the columns
        ``true_positives``, ``true_negatives``, ``false_positives`` and
        ``false_negatives``.

    Raises:
        ValueError: If ``predictions`` and ``targets`` do not have the same shape.
    """
    predictions = np.asarray(predictions)
    targets = np.asarray(targets)
    if predictions.shape != targets.shape:
        raise ValueError("Predictions and targets must have the same shape.")

    num_classes = predictions.shape[1]

    # Boolean masks let NumPy count every class at once instead of looping
    # over each element in Python.
    predicted_pos, predicted_neg = predictions == 1, predictions == 0
    actual_pos, actual_neg = targets == 1, targets == 0

    counts = {
        'true_positives': (predicted_pos & actual_pos).sum(axis=0),
        'true_negatives': (predicted_neg & actual_neg).sum(axis=0),
        'false_positives': (predicted_pos & actual_neg).sum(axis=0),
        'false_negatives': (predicted_neg & actual_pos).sum(axis=0),
    }
    return pd.DataFrame(counts, index=range(num_classes))

def calculate_accuracy(metrics):
    """Return the pooled accuracy, (TP + TN) / all counted entries.

    Args:
        metrics (pd.DataFrame): Confusion counts with at least the columns
            ``true_positives`` and ``true_negatives``; the denominator is the
            sum of every value in the frame.

    Returns:
        The accuracy, or 0 when the frame holds no counts (same convention as
        the precision, recall and F1 helpers, instead of a NaN from 0/0).
    """
    total = metrics.sum().sum()
    if total > 0:
        correct = metrics['true_positives'].sum() + metrics['true_negatives'].sum()
        return correct / total
    return 0

def calculate_precision(metrics):
    """Return the pooled precision TP / (TP + FP) over all rows of ``metrics``.

    Returns 0 when the denominator is not positive.
    """
    tp_total = metrics['true_positives'].sum()
    predicted_positive = tp_total + metrics['false_positives'].sum()
    if not predicted_positive > 0:
        return 0
    return tp_total / predicted_positive

def calculate_recall(metrics):
    """Return the pooled recall TP / (TP + FN) over all rows of ``metrics``.

    Returns 0 when the denominator is not positive.
    """
    tp_total = metrics['true_positives'].sum()
    actual_positive = tp_total + metrics['false_negatives'].sum()
    if not actual_positive > 0:
        return 0
    return tp_total / actual_positive

In [None]:
def calculate_hamming_distance(predictions, targets):
    """Count the positions at which ``predictions`` and ``targets`` differ.

    Inputs that are not already NumPy arrays are converted with ``np.array``.
    The result is the raw number of mismatching entries (not normalised).
    An ``AssertionError`` is raised when the two shapes differ.
    """
    import numpy as np

    predictions = predictions if isinstance(predictions, np.ndarray) else np.array(predictions)
    targets = targets if isinstance(targets, np.ndarray) else np.array(targets)

    # Element-wise comparison is only meaningful for identically shaped inputs.
    assert predictions.shape == targets.shape, "Predictions and targets must have the same shape."

    mismatches = predictions != targets
    return np.sum(mismatches)

In [None]:
def calculate_f1_score(precision, recall):
    """Return the harmonic mean of ``precision`` and ``recall``.

    Returns 0 when ``precision + recall`` is zero, avoiding a division by zero.
    """
    denominator = precision + recall
    if denominator == 0:
        return 0
    return 2 * (precision * recall) / denominator

<h2>Parameters</h2>

In [None]:
batch_size = 264
# Decision threshold applied to the model's sigmoid outputs.
threshold = 0.65
# Misspelled alias kept on purpose: the evaluation cells below reference `thershold`.
thershold = threshold

# The metrics are aggregated over the whole test set, so sample order is
# irrelevant; not shuffling keeps the evaluation order reproducible.
test_loader = DataLoader(custom_test_dataset, batch_size=batch_size, shuffle=False)
# Same device selection as at the top of the notebook, repeated in this cell.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

<h2>Testing Phase without CLS</h2>

In [None]:
import time

# After training, load the best model for test prediction
#best_model = SentimentClassifierWithSoftAttentionWithCLS().to(device)
#best_model.load_state_dict(torch.load('best_model_path'))

def test_without_cls(model_rnn, test_loader, t=0.5):
    """Evaluate ``model_rnn`` through its plain ``forward`` and print the test metrics.

    Args:
        model_rnn: Model whose ``forward(data)`` returns ``(outputs, attention_weights)``.
            ``outputs`` are compared with ``t`` directly, so they must already be
            probabilities (the model class in this notebook applies the sigmoid itself).
        test_loader: Iterable of ``(data, targets, cls_token)`` batches; the CLS
            item is ignored here.
        t (float): Decision threshold; outputs strictly above it are predicted as 1.

    Raises:
        ValueError: If ``test_loader`` yields no batches.
    """
    model_rnn.eval()

    # Feed the inputs to the device the model actually lives on; the notebook-level
    # `device` is only a fallback for a model without parameters.
    first_param = next(model_rnn.parameters(), None)
    run_device = first_param.device if first_param is not None else device

    test_predictions = []
    target_embeddings = []

    start_time = time.perf_counter()
    with torch.no_grad():
        for data, targets, _ in test_loader:
            data = data.to(device=run_device, dtype=torch.float32)

            outputs, _ = model_rnn.forward(data)

            # Values above the threshold become 1.0, all others 0.0.
            predictions = (outputs > t).float()

            test_predictions.append(predictions.cpu())
            target_embeddings.append(targets.cpu())
    elapsed_time = time.perf_counter() - start_time

    if not test_predictions:
        raise ValueError("test_loader yielded no batches; there is nothing to evaluate.")

    predicted_embeddings_np = torch.cat(test_predictions, dim=0).numpy()
    target_embeddings_np = torch.cat(target_embeddings, dim=0).numpy()

    metrics = calculate_metrics(predicted_embeddings_np, target_embeddings_np)
    overall_test_accuracy = calculate_accuracy(metrics)
    overall_test_precision = calculate_precision(metrics)
    overall_test_recall = calculate_recall(metrics)
    overall_test_f1_score = calculate_f1_score(overall_test_precision, overall_test_recall)
    overall_test_hamming_distance = calculate_hamming_distance(predicted_embeddings_np, target_embeddings_np)

    # Average over the samples really evaluated; len(loader) * batch_size
    # over-counts whenever the last batch is only partially filled.
    num_samples = predicted_embeddings_np.shape[0]

    print("----")
    print(f"Test Accuracy: {overall_test_accuracy:.4f}")
    print(f"Test Precision: {overall_test_precision:.4f}")
    print(f"Test Recall: {overall_test_recall:.4f}")
    print(f"Test F1-score: {overall_test_f1_score:.4f}")
    print(f"Test Hamming Distance: {overall_test_hamming_distance}")
    print(f"Elapsed time: {elapsed_time:.3f} seconds")
    print(f"Elapsed average time: {(elapsed_time*1000)/num_samples:.2f} ms")
    print("")

In [None]:
# Checkpoint of the LSTM model, relative to the selected run directory.
filename = 'LSTM/best_model_LSTM.pth'
best_model_path_lstm = os.path.join(save_dir_base, filename)

best_model_lstm = SentimentClassifierWithSoftAttention(rnn_type='LSTM').to(device)
# map_location remaps the stored tensors onto `device`, so a checkpoint saved
# on a GPU still loads when the notebook falls back to the CPU.
best_model_lstm.load_state_dict(torch.load(best_model_path_lstm, map_location=device))
test_without_cls(best_model_lstm, test_loader, t=thershold)

----
Test Accuracy: 0.8849
Test Precision: 0.8610
Test Recall: 0.7345
Test F1-score: 0.7927
Test Hamming Distance: 5494
Elapsed time: 24.751 seconds
Elapsed average time: 3.02 ms



In [None]:
# Checkpoint of the GRU model, relative to the selected run directory.
filename = 'GRU/best_model_GRU.pth'
best_model_path_gru = os.path.join(save_dir_base, filename)

best_model_gru = SentimentClassifierWithSoftAttention(rnn_type='GRU').to(device)
# map_location remaps the stored tensors onto `device`, so a checkpoint saved
# on a GPU still loads when the notebook falls back to the CPU.
best_model_gru.load_state_dict(torch.load(best_model_path_gru, map_location=device))
test_without_cls(best_model_gru, test_loader, t=thershold)

----
Test Accuracy: 0.8703
Test Precision: 0.9172
Test Recall: 0.6236
Test F1-score: 0.7424
Test Hamming Distance: 6189
Elapsed time: 21.618 seconds
Elapsed average time: 2.64 ms



<h2>Testing Phase with CLS</h2>

In [None]:
def test_with_cls(model_rnn, test_loader, t=0.5):
    """Evaluate ``model_rnn`` through ``forward_with_cls`` and print the test metrics.

    Args:
        model_rnn: Model whose ``forward_with_cls(data, cls_tokens)`` returns
            ``(outputs, attention_weights)``. ``outputs`` are compared with ``t``
            directly, so they must already be probabilities (the model class in
            this notebook applies the sigmoid itself).
        test_loader: Iterable of ``(data, targets, cls_tokens)`` batches, where
            ``cls_tokens`` is 2D (one CLS vector per sample).
        t (float): Decision threshold; outputs strictly above it are predicted as 1.

    Raises:
        ValueError: If ``test_loader`` yields no batches.
    """
    model_rnn.eval()

    # Feed the inputs to the device the model actually lives on; the notebook-level
    # `device` is only a fallback for a model without parameters.
    first_param = next(model_rnn.parameters(), None)
    run_device = first_param.device if first_param is not None else device

    test_predictions = []
    target_embeddings = []

    start_time = time.perf_counter()
    with torch.no_grad():
        for data, targets, cls_tokens in test_loader:
            data = data.to(device=run_device, dtype=torch.float32)
            # Insert a singleton sequence axis: (batch, dim) -> (batch, 1, dim).
            cls_tokens = cls_tokens.to(device=run_device, dtype=torch.float32).unsqueeze(1)

            outputs, _ = model_rnn.forward_with_cls(data, cls_tokens)

            # Values above the threshold become 1.0, all others 0.0.
            predictions = (outputs > t).float()

            test_predictions.append(predictions.cpu())
            target_embeddings.append(targets.cpu())
    elapsed_time = time.perf_counter() - start_time

    if not test_predictions:
        raise ValueError("test_loader yielded no batches; there is nothing to evaluate.")

    predicted_embeddings_np = torch.cat(test_predictions, dim=0).numpy()
    target_embeddings_np = torch.cat(target_embeddings, dim=0).numpy()

    metrics = calculate_metrics(predicted_embeddings_np, target_embeddings_np)
    overall_test_accuracy = calculate_accuracy(metrics)
    overall_test_precision = calculate_precision(metrics)
    overall_test_recall = calculate_recall(metrics)
    overall_test_f1_score = calculate_f1_score(overall_test_precision, overall_test_recall)
    overall_test_hamming_distance = calculate_hamming_distance(predicted_embeddings_np, target_embeddings_np)

    # Average over the samples really evaluated; len(loader) * batch_size
    # over-counts whenever the last batch is only partially filled.
    num_samples = predicted_embeddings_np.shape[0]

    print("----")
    print(f"Test Accuracy: {overall_test_accuracy:.4f}")
    print(f"Test Precision: {overall_test_precision:.4f}")
    print(f"Test Recall: {overall_test_recall:.4f}")
    print(f"Test F1-score: {overall_test_f1_score:.4f}")
    print(f"Test Hamming Distance: {overall_test_hamming_distance}")
    print(f"Elapsed time: {elapsed_time:.3f} seconds")
    print(f"Elapsed average time: {(elapsed_time*1000)/num_samples:.2f} ms")
    print("")

In [None]:
# Checkpoint of the LSTM model evaluated with the CLS embedding, relative to the selected run directory.
filename = 'LSTM_CLS/best_model_LSTM_CLS.pth'
best_model_path_lstm_cls = os.path.join(save_dir_base, filename)


best_model_lstm_cls = SentimentClassifierWithSoftAttention(rnn_type='LSTM').to(device)
# map_location remaps the stored tensors onto `device`, so a checkpoint saved
# on a GPU still loads when the notebook falls back to the CPU.
best_model_lstm_cls.load_state_dict(torch.load(best_model_path_lstm_cls, map_location=device))
test_with_cls(best_model_lstm_cls, test_loader, t=thershold)

----
Test Accuracy: 0.8855
Test Precision: 0.8984
Test Recall: 0.6968
Test F1-score: 0.7849
Test Hamming Distance: 5463
Elapsed time: 25.370 seconds
Elapsed average time: 3.10 ms



In [None]:
# Checkpoint of the GRU model evaluated with the CLS embedding, relative to the selected run directory.
filename = 'GRU_CLS/best_model_GRU_CLS.pth'
best_model_path_gru_cls = os.path.join(save_dir_base, filename)

best_model_gru_cls = SentimentClassifierWithSoftAttention(rnn_type='GRU').to(device)
# map_location remaps the stored tensors onto `device`, so a checkpoint saved
# on a GPU still loads when the notebook falls back to the CPU.
best_model_gru_cls.load_state_dict(torch.load(best_model_path_gru_cls, map_location=device))
test_with_cls(best_model_gru_cls, test_loader, t=thershold)

----
Test Accuracy: 0.8876
Test Precision: 0.8866
Test Recall: 0.7168
Test F1-score: 0.7927
Test Hamming Distance: 5362
Elapsed time: 21.241 seconds
Elapsed average time: 2.60 ms

