# Import

In [1]:
import os
import random
import re
from collections import Counter
from typing import Tuple, List, Dict, Any

import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics import precision_score, recall_score, f1_score
from sklearn.model_selection import train_test_split, KFold
from sklearn.multiclass import OneVsRestClassifier
from sklearn.preprocessing import MultiLabelBinarizer
from sklearn.svm import SVC
from torch.optim import AdamW
from torch.utils.data import DataLoader, TensorDataset
from torch.utils.data import Subset, RandomSampler
from tqdm import tqdm
from transformers import RobertaForSequenceClassification, RobertaTokenizer


def warn(*args, **kwargs):
    pass


import warnings

warnings.warn = warn

# Configuration

In [2]:
DEVICE = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(f"Running on device {DEVICE}")

RANDOM_SEED = 0
torch.manual_seed(RANDOM_SEED)
np.random.seed(RANDOM_SEED)
random.seed(RANDOM_SEED)
torch.backends.cudnn.benchmark = False

LOG_DIR = os.path.join("log")
if not os.path.exists(LOG_DIR):
    os.makedirs(LOG_DIR)

PATH_TO_DATASET = os.path.join("..", "dataset", "cgt")
BERT_MODEL_TYPE = 'microsoft/codebert-base'

MAX_FEATURES = 500
BATCH_SIZE = 2
NUM_FOLDS = 2
NUM_EPOCHS = 1
NUM_LABELS = 20
LR = 0.001
TEST_SIZE = 0.2

FILE_TYPE = "runtime"
FILE_EXT = ".rt.hex"
FILE_ID = "runtime"

Running on device cpu


# Dataset

Create PyTorch dataset feeding either source code, bytecode or runtime to the models.

## Preprocessing

In [3]:
def preprocess_hex(hex_data: str) -> str:
    # Reads a hex file and converts it to a byte string
    byte_data = bytes.fromhex(hex_data.strip())

    # Convert byte data to a readable ASCII string, ignoring non-ASCII characters
    return ' '.join(f'{byte:02x}' for byte in byte_data)


def preprocess_solidity_code(code: str) -> str:
    # Remove single-line comments
    code = re.sub(r'//.*', '', code)

    # Remove multi-line comments
    code = re.sub(r'/\*.*?\*/', '', code, flags=re.DOTALL)

    # Remove blank lines (lines only containing whitespace)
    lines = code.split('\n')
    non_blank_lines = [line for line in lines if line.strip() != '']
    code = '\n'.join(non_blank_lines)

    return code


def preprocess(data: str) -> str:
    return preprocess_solidity_code(data) if FILE_TYPE == "source" else preprocess_hex(data)

## Labels Management

In [4]:
def init_inputs_and_gt(data: pd.DataFrame) -> Tuple:
    """
    Initialize inputs, labels, and groundtruth (gt) from the given data.

    :param data: A pandas DataFrame containing the data to process.
    :return: A tuple containing the list of inputs, labels dictionary, and gt dictionary.
    """
    inputs, labels, gt = {}, {}, {}
    for _, row in tqdm(data.iterrows(), desc="Initializing inputs and groundtruth data"):
        item_id, file_id = row["id"], row["fp_" + FILE_ID]

        # Check if file exists
        path_to_file = os.path.join(PATH_TO_DATASET, FILE_TYPE, str(file_id) + FILE_EXT)
        if os.path.exists(path_to_file):

            # Initialize the documents
            inputs[item_id] = preprocess(open(path_to_file, 'r', encoding="utf8").read())

            # Initialize the label
            labels[item_id] = [0] * NUM_LABELS

            # Initialize the groundtruth
            prop = row["property"].lower()
            if prop not in gt.keys():
                gt[prop] = len(gt.values())

    return list(inputs.values()), labels, gt


def set_labels(data: pd.DataFrame, labels: Dict, gt: Dict) -> List:
    """
    Set up the labels based on the groundtruth (gt) for the given data.

    :param data: A pandas DataFrame containing the data to process.
    :param labels: A dictionary where keys are item IDs and values are lists representing labels.
    :param gt: A dictionary where keys are properties and values are their corresponding indices.
    :return: A list of labels values.
    """
    for _, row in tqdm(data.iterrows(), desc="Setting up the labels"):
        item_id, file_id = row["id"], row["fp_" + FILE_ID]

        # Check if file exists
        path_to_file = os.path.join(PATH_TO_DATASET, FILE_TYPE, str(file_id) + FILE_EXT)
        if os.path.exists(path_to_file):

            # Set label   
            prop = row["property"].lower()
            if row['property_holds'] == 't':
                labels[item_id][gt[prop]] = 1

    return list(labels.values())


## Initialization of the dataset

In [5]:
# Read the dataset from CSV
dataset = pd.read_csv(os.path.join(PATH_TO_DATASET, "consolidated.csv"), sep=";")

# Count the frequency of each item in the column
frequency = dataset['dataset'].value_counts()

# Find the item with the maximum occurrence
most_frequent_item = frequency.idxmax()
most_frequent_count = frequency.max()

print(f"The most frequent item in the column is '{most_frequent_item}' and it appears {most_frequent_count} times.")

# Exclude outliers from the dataset
dataset = dataset[dataset["dataset"] == most_frequent_item]

# Initialize the documents and the groundtruth
INPUTS, LABELS, gt = init_inputs_and_gt(dataset)

# Set the labels for the multilabel classification problem
LABELS = set_labels(dataset, LABELS, gt)

VECTORIZER = TfidfVectorizer(max_features=MAX_FEATURES)

The most frequent item in the column is 'CodeSmells' and it appears 10395 times.


Initializing inputs and groundtruth data: 10395it [00:14, 740.75it/s] 
Setting up the labels: 10395it [00:00, 30013.76it/s]


# Cross Validation

In [6]:
def compute_metrics(true_labels: List[Any], pred_labels: List[Any]) -> Dict[str, float]:
    """
    Compute evaluation metrics for the given true and predicted labels.

    :param true_labels: The ground truth labels.
    :param pred_labels: The predicted labels.
    :return: A dictionary containing precision, recall, and F1 score.
    """
    return {
        "precision": precision_score(true_labels, pred_labels, average='samples', zero_division=0),
        "recall": recall_score(true_labels, pred_labels, average='samples', zero_division=0),
        "f1": f1_score(true_labels, pred_labels, average='samples', zero_division=0)
    }


def save_results(results: List[Dict[str, Any]], filename: str) -> None:
    """
    Save the results to a CSV file.

    :param results: The results to save, typically a list of dictionaries.
    :param filename: The name of the file to save the results to.
    """
    df = pd.DataFrame(results)
    df.to_csv(os.path.join(LOG_DIR, filename), index=False)
    print(f"All fold results saved to '{LOG_DIR}'/'{filename}'")


In [7]:
class Trainer:
    """
    Trainer class for handling the training and evaluation of a model.
    """

    def __init__(self, model: nn.Module):
        """
        Initialize the trainer with model, loss criterion, and optimizer.

        :param model: The neural network model to be trained.
        """
        self._model = model.to(DEVICE)
        self._loss_fn = nn.BCEWithLogitsLoss().to(DEVICE)
        self._optimizer = optim.Adam(model.parameters(), lr=LR)

    def _evaluate_batch(self, batch: Tuple[torch.Tensor, torch.Tensor]) -> Tuple[float, Dict[str, float]]:
        """
        Evaluate a single batch of data.

        :param batch: A tuple containing input data and labels.
        :return: A tuple containing the loss and a dictionary of metrics.
        """
        # Move batch elements to the appropriate device (CPU/GPU)
        batch = tuple(b.to(DEVICE) for b in batch)

        # Prepare the inputs for the model
        inputs, labels = batch

        # Disable gradient computation for evaluation
        with torch.no_grad():
            outputs = self._model(inputs)

            # Compute the loss
            loss = self._loss_fn(outputs, labels)

            # Make predictions and compute batch metrics
            predictions = torch.sigmoid(outputs).round().cpu().numpy()
            batch_metrics = compute_metrics(labels.cpu().numpy(), predictions)

        # Return the loss and metrics
        return loss.item(), batch_metrics

    def _train_batch(self, batch: Tuple[torch.Tensor, torch.Tensor]) -> Tuple[float, Dict[str, float]]:
        """
        Train a single batch of data.

        :param batch: A tuple containing input data and labels.
        :return: A tuple containing the loss and a dictionary of metrics.
        """
        # Prepare inputs for the model
        inputs, labels = batch

        # Zero the parameter gradients
        self._model.zero_grad()

        # Forward pass
        outputs = self._model(inputs)

        # Compute the loss
        loss = self._loss_fn(outputs, labels)

        # Backward pass and optimize
        loss.backward()
        self._optimizer.step()

        # Make predictions and compute metrics
        predictions = torch.sigmoid(outputs).round().detach().cpu().numpy()
        batch_metrics = compute_metrics(labels.detach().cpu().numpy(), predictions)

        return loss.item(), batch_metrics

    def run_epoch(self, dataloader: DataLoader, train_mode: bool = True) -> Tuple[float, Dict[str, float]]:
        """
        Run a single epoch of training or evaluation.

        :param dataloader: DataLoader providing the data for the epoch.
        :param train_mode: Boolean flag indicating whether to train or evaluate.
        :return: A tuple containing the average loss and a dictionary of average metrics.
        """
        # Set the mode for the epoch (Training or Testing)
        phase = 'Training' if train_mode else 'Testing'
        self._model.train() if train_mode else self._model.eval()

        losses, metrics_list = []

        # Iterate over the data loader
        for batch in tqdm(dataloader, desc=phase):
            # Move batch elements to the appropriate device
            batch = tuple(b.to(DEVICE) for b in batch)

            loss, batch_metrics = self._train_batch(batch) if train_mode else self._evaluate_batch(batch)

            # Accumulate the loss and metrics
            losses.append(loss)
            metrics_list.append(batch_metrics)

        # Compute average loss and metrics for the epoch
        avg_loss = np.mean(losses)
        avg_metrics = {metric: np.mean([m[metric] for m in metrics_list]) for metric in metrics_list[0]}

        return avg_loss, avg_metrics


In [8]:
class CrossValidator:
    """
    CrossValidator class for handling k-fold cross-validation of a model.
    """

    def __init__(self, trainer: Trainer, train_data: TensorDataset, test_data: TensorDataset):
        """
        Initialize the CrossValidator with trainer, training data, and test data.

        :param trainer: An instance of the Trainer class.
        :param train_data: The training dataset.
        :param test_data: The test dataset.
        """
        self.__trainer = trainer
        self.__train_data = train_data
        self.__test_data = test_data

    def __train_and_evaluate(self, train_dataloader: DataLoader, test_dataloader: DataLoader) -> None:
        """
        Train and evaluate the model for a specified number of epochs.

        :param train_dataloader: DataLoader for the training data.
        :param test_dataloader: DataLoader for the validation data.
        """
        for epoch in range(NUM_EPOCHS):
            print(f"\n --- Epoch {epoch + 1}/{NUM_EPOCHS} ---")

            # Train the model and print training metrics
            avg_train_loss, avg_train_metrics = self.__trainer.run_epoch(train_dataloader, train_mode=True)
            print(f"\n TRAIN | Loss: {avg_train_loss:.4f} |"
                  f" Precision: {avg_train_metrics['precision']:.4f},"
                  f" Recall: {avg_train_metrics['recall']:.4f},"
                  f" F1: {avg_train_metrics['f1']:.4f}\n")

            # Evaluate the model on the validation set and print validation metrics
            avg_test_loss, avg_test_metrics = self.__trainer.run_epoch(test_dataloader, train_mode=False)
            print(f" VALID | Loss: {avg_test_loss:.4f} |"
                  f" Precision: {avg_test_metrics['precision']:.4f},"
                  f" Recall: {avg_test_metrics['recall']:.4f},"
                  f" F1: {avg_test_metrics['f1']:.4f}\n")

    def __evaluate_on_test_set(self, test_dataloader: DataLoader) -> Dict[str, float]:
        """
        Evaluate the model on the test set.

        :param test_dataloader: DataLoader for the test data.
        :return: A dictionary of test set metrics.
        """
        avg_test_loss, avg_test_metrics = self.__trainer.run_epoch(test_dataloader, train_mode=False)

        # Print test set metrics
        print(f"\nTest Set Evaluation | Loss: {avg_test_loss:.4f} |"
              f" Precision: {avg_test_metrics['precision']:.4f},"
              f" Recall: {avg_test_metrics['recall']:.4f},"
              f" F1: {avg_test_metrics['f1']:.4f}\n")

        return avg_test_metrics

    def k_fold_cv(self, log_id: str = "bert") -> None:
        """
        Perform k-fold cross-validation.

        :param log_id: Identifier for logging purposes, typically the model name.
        """
        kf = KFold(n_splits=NUM_FOLDS, shuffle=True)
        fold_metrics = []

        # Iterate over each fold
        for fold, (train_idx, val_idx) in enumerate(kf.split(self.__train_data)):
            # Create data loaders for training and validation sets
            train_subsampler = Subset(self.__train_data, train_idx)
            val_subsampler = Subset(self.__train_data, val_idx)

            train_loader = DataLoader(
                train_subsampler,
                sampler=RandomSampler(train_subsampler),
                batch_size=BATCH_SIZE
            )
            val_loader = DataLoader(
                val_subsampler,
                batch_size=BATCH_SIZE  # No need for shuffling
            )

            print(f"Starting Fold {fold + 1}/{NUM_FOLDS}")

            # Train and evaluate the model for the current fold
            self.__train_and_evaluate(train_loader, val_loader)

            # Evaluate on the test set after each fold
            metrics = self.__evaluate_on_test_set(DataLoader(self.__test_data, batch_size=BATCH_SIZE, shuffle=False))
            fold_metrics.append(metrics)

        # Calculate average and standard deviation of each metric across all folds
        metric_keys = fold_metrics[0].keys()  # Assuming all metrics dictionaries have the same structure
        average_metrics = {key: np.mean([metric[key] for metric in fold_metrics]) for key in metric_keys}
        std_dev_metrics = {key: np.std([metric[key] for metric in fold_metrics]) for key in metric_keys}

        # Print average metrics and their standard deviations
        print("Average Metrics Over All Folds:")
        for key, value in average_metrics.items():
            print(f"{key}: {value:.4f} (±{std_dev_metrics[key]:.4f})")

        # Save metrics to CSV file
        save_results(fold_metrics, filename=f"{log_id}.csv")


# Models

## BERT

In [9]:
class BERTModelTrainer(Trainer):
    """
    BERTModelTrainer class for handling the training and evaluation of a BERT-based model.
    Inherits from the Trainer class.
    """

    def __init__(self, model: torch.nn.Module):
        """
        Initialize the BERTModelTrainer with model, optimizer, and loss function.

        :param model: The BERT model to be trained.
        """
        super().__init__(model)

        # Initialize the optimizer with model parameters and a learning rate
        self._optimizer = AdamW(self._model.parameters(), lr=LR)

        # Define the loss function for binary classification with logits
        self._loss_fn = nn.BCEWithLogitsLoss()

    def _evaluate_batch(self, batch: Tuple[torch.Tensor, torch.Tensor, torch.Tensor]) -> Tuple[float, Dict[str, float]]:
        """
        Evaluate a single batch of data.

        :param batch: A tuple containing input_ids, attention_mask, and labels.
        :return: A tuple containing the loss and a dictionary of metrics.
        """
        # Prepare the inputs for the model
        inputs = {
            'input_ids': batch[0],
            'attention_mask': batch[1],
            'labels': batch[2]
        }

        # Disable gradient computation for evaluation
        with torch.no_grad():
            outputs = self._model(**inputs)

            # Compute the loss
            loss = self._loss_fn(outputs.logits, inputs['labels'])

            # Make predictions and compute batch metrics
            predictions = torch.sigmoid(outputs.logits).round().cpu().numpy()
            batch_metrics = compute_metrics(batch[2].cpu().numpy(), predictions)

        # Return the loss and metrics
        return loss.item(), batch_metrics

    def _train_batch(self, batch: Tuple[torch.Tensor, torch.Tensor, torch.Tensor]) -> Tuple[float, Dict[str, float]]:
        """
        Train a single batch of data.

        :param batch: A tuple containing input_ids, attention_mask, and labels.
        :return: A tuple containing the loss and a dictionary of metrics.
        """
        # Prepare inputs for the model
        inputs = {'input_ids': batch[0], 'attention_mask': batch[1], 'labels': batch[2]}

        # Zero the parameter gradients
        self._model.zero_grad()

        # Forward pass
        outputs = self._model(**inputs)

        # Compute the loss
        loss = self._loss_fn(outputs.logits, inputs['labels'])

        # Backward pass and optimize
        loss.backward()
        self._optimizer.step()

        # Make predictions and compute metrics
        predictions = torch.sigmoid(outputs.logits).round().detach().cpu().numpy()
        batch_metrics = compute_metrics(batch[2].detach().cpu().numpy(), predictions)

        return loss.item(), batch_metrics


In [10]:
model = RobertaForSequenceClassification.from_pretrained(BERT_MODEL_TYPE, num_labels=20, ignore_mismatched_sizes=True)
model.config.problem_type = "multi_label_classification"
model.to(DEVICE)

tokenizer = RobertaTokenizer.from_pretrained(BERT_MODEL_TYPE, ignore_mismatched_sizes=True)

x, y = tokenizer(
    INPUTS,
    add_special_tokens=True,
    max_length=512,
    return_token_type_ids=False,
    padding="max_length",
    truncation=True,
    return_attention_mask=True,
    return_tensors='pt'
), LABELS

# Split the data into training and test sets
x_train, x_test, y_train, y_test = train_test_split(x['input_ids'], y, test_size=TEST_SIZE)

# Split attention masks for training and test sets
train_masks, test_masks, _, _ = train_test_split(x['attention_mask'], y, test_size=TEST_SIZE)

# Create datasets for training and testing
train_data = TensorDataset(x_train, train_masks, torch.tensor(y_train).float())
test_data = TensorDataset(x_test, test_masks, torch.tensor(y_test).float())
CrossValidator(BERTModelTrainer(model), train_data, test_data).k_fold_cv(log_id="bert")

Some weights of RobertaForSequenceClassification were not initialized from the model checkpoint at microsoft/codebert-base and are newly initialized: ['classifier.dense.bias', 'classifier.dense.weight', 'classifier.out_proj.bias', 'classifier.out_proj.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


Starting Fold 1/2

 --- Epoch 1/1 ---


Training:   0%|          | 0/110 [00:05<?, ?it/s]



 TRAIN | Loss: 0.7066 | Precision: 0.1548, Recall: 0.5000, F1: 0.2361



Testing:   0%|          | 0/110 [00:01<?, ?it/s]


 VALID | Loss: 0.4882 | Precision: 1.0000, Recall: 0.6750, F1: 0.8036



Testing:   0%|          | 0/56 [00:01<?, ?it/s]



Test Set Evaluation | Loss: 0.5146 | Precision: 0.8333, Recall: 0.5500, F1: 0.6607

Starting Fold 2/2

 --- Epoch 1/1 ---


Training:   0%|          | 0/110 [00:06<?, ?it/s]



 TRAIN | Loss: 0.5039 | Precision: 1.0000, Recall: 0.6750, F1: 0.8036



Testing:   0%|          | 0/110 [00:01<?, ?it/s]


 VALID | Loss: 0.3668 | Precision: 0.6667, Recall: 0.5833, F1: 0.6190



Testing:   0%|          | 0/56 [00:01<?, ?it/s]


Test Set Evaluation | Loss: 0.3557 | Precision: 0.8333, Recall: 0.5500, F1: 0.6607

Average Metrics Over All Folds:
precision: 0.8333 (±0.0000)
recall: 0.5500 (±0.0000)
f1: 0.6607 (±0.0000)
All fold results saved to 'bert.csv'





## Feed Forward Neural Network

In [11]:
class FFNNClassifier(nn.Module):
    """
    Simple Neural Network with three fully connected layers.
    """

    def __init__(self):
        """
        Initialize the network layers.
        """
        super(FFNNClassifier, self).__init__()
        self.fc1 = nn.Linear(256, 512)
        self.fc2 = nn.Linear(512, 128)
        self.fc3 = nn.Linear(128, NUM_LABELS)
        self.relu = nn.ReLU()

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """
        Forward pass through the network.

        :param x: Input tensor
        :return: Output tensor
        """
        x = self.relu(self.fc1(x))
        x = self.relu(self.fc2(x))
        x = torch.sigmoid(self.fc3(x))
        return x


In [12]:
model = FFNNClassifier()

x = torch.FloatTensor(VECTORIZER.fit_transform(INPUTS).toarray())
y = torch.FloatTensor(LABELS)

x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=TEST_SIZE, random_state=RANDOM_SEED)
train_data = TensorDataset(x_train, y_train)
test_data = TensorDataset(x_test, y_test)

CrossValidator(Trainer(model), train_data, test_data).k_fold_cv(log_id="ffnn")

Starting Fold 1/2

 --- Epoch 1/1 ---


Training:   0%|          | 0/110 [00:00<?, ?it/s]



 TRAIN | Loss: 0.8886 | Precision: 0.1750, Recall: 1.0000, F1: 0.2971



Testing:   0%|          | 0/110 [00:00<?, ?it/s]


 VALID | Loss: 0.8883 | Precision: 0.1750, Recall: 1.0000, F1: 0.2971



Testing:   0%|          | 0/56 [00:00<?, ?it/s]



Test Set Evaluation | Loss: 0.8629 | Precision: 0.2250, Recall: 1.0000, F1: 0.3667

Starting Fold 2/2

 --- Epoch 1/1 ---


Training:   0%|          | 0/110 [00:00<?, ?it/s]



 TRAIN | Loss: 0.8999 | Precision: 0.1500, Recall: 1.0000, F1: 0.2609



Testing:   0%|          | 0/110 [00:00<?, ?it/s]


 VALID | Loss: 0.8615 | Precision: 0.2250, Recall: 1.0000, F1: 0.3667



Testing:   0%|          | 0/56 [00:00<?, ?it/s]


Test Set Evaluation | Loss: 0.8616 | Precision: 0.2250, Recall: 1.0000, F1: 0.3667

Average Metrics Over All Folds:
precision: 0.2250 (±0.0000)
recall: 1.0000 (±0.0000)
f1: 0.3667 (±0.0000)
All fold results saved to 'ffnn.csv'





## LSTM

In [13]:
class LSTMClassifier(nn.Module):
    """
    LSTM Classifier for text classification.
    """

    def __init__(self, vocab_size: int, embedding_dim: int, hidden_dim: int, pretrained_embeddings: np.ndarray):
        """
        Initialize the LSTM Classifier.

        :param vocab_size: Size of the vocabulary.
        :param embedding_dim: Dimension of the embedding vectors.
        :param hidden_dim: Dimension of the hidden layer.
        :param pretrained_embeddings: Pretrained embeddings to initialize the embedding layer.
        """
        super(LSTMClassifier, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.embedding.weight = nn.Parameter(torch.tensor(pretrained_embeddings, dtype=torch.float32))
        self.embedding.weight.requires_grad = True  # Optionally freeze the embeddings
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, batch_first=True)
        self.fc = nn.Linear(hidden_dim, NUM_LABELS)

    def forward(self, text: torch.Tensor) -> torch.Tensor:
        """
        Forward pass through the LSTM Classifier.

        :param text: Input tensor containing text data.
        :return: Output tensor after passing through the LSTM and fully connected layers.
        """
        embedded = self.embedding(text.long())
        packed_output, (hidden, cell) = self.lstm(embedded)
        hidden = hidden.squeeze(0)
        output = self.fc(hidden)
        return torch.sigmoid(output)


def load_glove_embeddings(glove_file: str) -> Dict[str, np.ndarray]:
    """
    Load GloVe embeddings from a file.

    :param glove_file: Path to the GloVe embeddings file.
    :return: Dictionary mapping words to their corresponding embedding vectors.
    """
    embeddings = {}
    with open(glove_file, 'r', encoding='utf-8') as file:
        for line in tqdm(file, desc="Loading GloVe Embeddings"):
            parts = line.split()
            word = parts[0]
            vector = np.array(parts[1:], dtype=np.float32)
            embeddings[word] = vector
    return embeddings


In [14]:
glove_embeddings = load_glove_embeddings(os.path.join("..", "asset", "glove.6B.100d.txt"))

# Tokenization and vocabulary creation
word_count = Counter(word for sentence in INPUTS for word in sentence.lower().split())
vocabulary = {word: i + 1 for i, word in enumerate(word_count)}  # start indexing from 1
vocabulary['<PAD>'] = 0  # Padding value

# Embedding matrix creation
embedding_dim = 100  # Dimensionality of GloVe embeddings used
embedding_matrix = np.zeros((len(vocabulary), embedding_dim))
for word, i in tqdm(vocabulary.items(), desc='Creating Embedding Matrix'):
    embedding_vector = glove_embeddings.get(word)
    if embedding_vector is not None:
        embedding_matrix[i] = embedding_vector

# Convert text to sequence of integers
sequences = [[vocabulary[word] for word in text.lower().split()] for text in INPUTS]

# Finding the longest sequence
max_seq_len = max(len(seq) for seq in sequences)

# Pad sequences
seq_padded = [seq + [vocabulary['<PAD>']] * (max_seq_len - len(seq)) for seq in sequences]

Loading GloVe Embeddings: 400000it [00:08, 48500.56it/s]
Creating Embedding Matrix: 100%|██████████| 257/257 [00:00<00:00, 115398.37it/s]


In [16]:
model = LSTMClassifier(len(vocabulary), embedding_dim, 64, embedding_matrix)

x_tensor = torch.FloatTensor(seq_padded)
y_tensor = torch.FloatTensor(LABELS)

x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=TEST_SIZE, random_state=RANDOM_SEED)
train_data = TensorDataset(x_train, y_train)
test_data = TensorDataset(x_test, y_test)

CrossValidator(Trainer(model), train_data, test_data).k_fold_cv(log_id="lstm")

Starting Fold 1/2

 --- Epoch 1/1 ---


Training:   0%|          | 0/110 [00:00<?, ?it/s]



 TRAIN | Loss: 0.8965 | Precision: 0.1500, Recall: 1.0000, F1: 0.2609



Testing:   0%|          | 0/110 [00:00<?, ?it/s]


 VALID | Loss: 0.8707 | Precision: 0.2000, Recall: 1.0000, F1: 0.3304



Testing:   0%|          | 0/56 [00:00<?, ?it/s]



Test Set Evaluation | Loss: 0.8594 | Precision: 0.2250, Recall: 1.0000, F1: 0.3667

Starting Fold 2/2

 --- Epoch 1/1 ---


Training:   0%|          | 0/110 [00:00<?, ?it/s]



 TRAIN | Loss: 0.8707 | Precision: 0.2000, Recall: 1.0000, F1: 0.3333



Testing:   0%|          | 0/110 [00:00<?, ?it/s]


 VALID | Loss: 0.8706 | Precision: 0.2000, Recall: 1.0000, F1: 0.3333



Testing:   0%|          | 0/56 [00:00<?, ?it/s]



Test Set Evaluation | Loss: 0.8587 | Precision: 0.2250, Recall: 1.0000, F1: 0.3667

Average Metrics Over All Folds:
precision: 0.2250 (±0.0000)
recall: 1.0000 (±0.0000)
f1: 0.3667 (±0.0000)
All fold results saved to 'lstm.csv'


# SVM, Random Forest, Gradient Boosting

In [17]:
class ClassifiersPoolEvaluator:
    """
    ClassifiersPoolEvaluator class for evaluating a pool of classifiers using TF-IDF features and k-fold cross-validation.
    """

    def __init__(self):
        """
        Initialize the ClassifiersPoolEvaluator with TF-IDF vectorizer and a dictionary of classifiers.
        """
        # Create a TF-IDF vectorizer with a maximum number of features defined by MAX_FEATURES
        self.vectorizer = TfidfVectorizer(max_features=MAX_FEATURES)

        # Define a dictionary of classifiers to evaluate
        self.classifiers = {
            "svm": OneVsRestClassifier(SVC(kernel='linear', probability=True)),
            "random_forest": OneVsRestClassifier(RandomForestClassifier(n_estimators=100, random_state=RANDOM_SEED)),
            "gradient_boosting": OneVsRestClassifier(
                GradientBoostingClassifier(n_estimators=100, learning_rate=0.1, max_depth=3))
        }

        # Transform the documents into TF-IDF features
        self.X = self.vectorizer.fit_transform(INPUTS)

        # Transform the labels using MultiLabelBinarizer
        self.mlb = MultiLabelBinarizer()
        self.y = self.mlb.fit_transform(LABELS)

    def __evaluate_fold(self, classifier: OneVsRestClassifier, train_index: List[int], test_index: List[int]) -> Dict[
        str, float]:
        """
        Evaluate a classifier on a single fold of cross-validation.

        :param classifier: The classifier to be evaluated.
        :param train_index: Indices for the training data.
        :param test_index: Indices for the test data.
        :return: A dictionary of computed metrics.
        """
        X_train, X_test = self.X[train_index], self.X[test_index]
        y_train, y_test = self.y[train_index], self.y[test_index]

        # Train the classifier on the training data
        classifier.fit(X_train, y_train)
        # Make predictions on the test data
        predictions = classifier.predict(X_test)

        # Compute metrics using the provided utility function
        return compute_metrics(y_test, predictions)

    def __k_fold_cv(self, classifier: OneVsRestClassifier) -> pd.DataFrame:
        """
        Perform k-fold cross-validation on a given classifier.

        :param classifier: The classifier to be evaluated.
        :return: A DataFrame containing the results of each fold.
        """
        kf = KFold(n_splits=NUM_FOLDS, shuffle=True, random_state=RANDOM_SEED)
        # Evaluate the classifier on each fold and collect the results
        results = [self.__evaluate_fold(classifier, train_index, test_index) for train_index, test_index in
                   kf.split(self.X)]
        # Return the results as a DataFrame
        return pd.DataFrame(results)

    def pool_evaluation(self) -> None:
        """
        Run the evaluation for each classifier defined in self.classifiers.
        """
        # Run the evaluation for each classifier defined in self.classifiers
        for classifier_name, classifier in self.classifiers.items():
            print(f"\nTesting classifier: {classifier_name}")
            # Evaluate the classifier and get the metrics DataFrame
            metrics_df = self.__k_fold_cv(classifier)
            # Save the results using the provided utility function
            save_results(metrics_df, f"{classifier_name}.csv")
            # Print the results
            print(f"Results for {classifier_name}:\n{metrics_df}\n")


In [18]:
evaluator = ClassifiersPoolEvaluator()
evaluator.pool_evaluation()


Testing classifier: svm
All fold results saved to 'svm.csv'
Results for svm:
   precision  recall        f1
0   0.998188     1.0  0.998792
1   1.000000     1.0  1.000000


Testing classifier: random_forest
All fold results saved to 'random_forest.csv'
Results for random_forest:
   precision  recall        f1
0   0.998188     1.0  0.998792
1   1.000000     1.0  1.000000


Testing classifier: gradient_boosting
All fold results saved to 'gradient_boosting.csv'
Results for gradient_boosting:
   precision  recall        f1
0   0.998188     1.0  0.998792
1   1.000000     1.0  1.000000

