
# **NLP-Various Implementations | Text Classification with RNNs**

**Overview:** In this part of the project, I trained several neural network models, including RNNs and LSTMs, with different architectures and hyperparameters to evaluate their performance on simple classification tasks. For this purpose, I used the AG News Topic Classification and the IMDB movie review datasets. Overall, this work gave me hands-on experience in training neural networks and insight into the factors that affect their performance in text classification tasks. By experimenting with different architectures and hyperparameters, I gained a deeper understanding of how these models operate and how they can be tuned for higher accuracy.

## **1. Import all the necessary modules**

**Briefly:** The ```time``` library provides functions for time-related tasks, ```torch``` provides tensor-based deep learning operations, ```random``` provides tools for generating random numbers and ```pandas``` provides data manipulation and analysis tools. Additionally, the ```nn``` module provides building blocks for neural networks, ```tqdm``` provides a progress bar for tracking loops, the ```defaultdict``` class provides a dictionary with default values for missing keys, ```PrettyTable``` displays data in a table format, the ```functional``` module provides stateless function versions of neural network operations such as activations, the ```DataLoader``` class loads data in batches for training neural networks and the ```get_tokenizer``` function returns a tokenizer for text. Finally, the ```accuracy_score``` function calculates the accuracy of a model and the ```build_vocab_from_iterator``` function builds a vocabulary from an iterator of tokenized text.

In [None]:
import time
import torch
import random
import pandas as pd
from torch import nn
from tqdm import tqdm
from collections import defaultdict
from prettytable import PrettyTable
from torch.nn import functional as F
from torch.utils.data import DataLoader
from torchtext.data import get_tokenizer
from sklearn.metrics import accuracy_score
from torchtext.vocab import build_vocab_from_iterator

## **2. Define and initialize the models' parameters**

**Initialize the variables and hyperparameters of the classification models:** The set_device function checks if a CUDA-enabled GPU is available and sets the device accordingly. The tokenizer variable is set to tokenize the text data using the "basic_english" tokenizer. Both models and classes are lists that contain the different models and classes used for the classification process, whereas accuracies, parameters, and time_costs are empty lists that will be used to store evaluation metrics during the training process. Finally, the remaining variables are hyperparameters used for this training process.

* The `models` list names the neural network variants used for classification: simple recurrent neural networks (RNNs) and long short-term memory (LSTM) networks, each with one or two layers and either unidirectional or bidirectional. For example, `2Bi-LSTM` is a two-layer bidirectional LSTM.

* The `classes` list specifies the categories that the classification model is trained to predict. In this case, the four classes are *World*, *Sports*, *Business*, and *Sci/Tech*, the topic categories of the AG News dataset.

* `MAX_WORDS = 25` sets a maximum limit for the number of words allowed in a text sample. This means that if a text sample contains more than 25 words, it will be truncated to 25 words before being fed into the classification model.
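
The naming scheme of the `models` list can be made explicit with a small helper. This is only an illustrative sketch: `parse_model_name` is a hypothetical function that is not part of the notebook, and it assumes every name follows the `<layers><Uni|Bi>-<RNN|LSTM>` pattern used above.

```python
import re

def parse_model_name(name):
    """Split a name such as '2Bi-LSTM' into (cell_type, num_layers, bidirectional)."""
    match = re.fullmatch(r"(\d+)(Uni|Bi)-(RNN|LSTM)", name)
    if match is None:
        raise ValueError(f"unrecognized model name: {name}")
    layers, direction, cell = match.groups()
    return cell, int(layers), direction == "Bi"

# same names as the notebook's `models` list
model_names = ["1Uni-RNN", "1Bi-RNN", "2Bi-RNN", "1Uni-LSTM", "1Bi-LSTM", "2Bi-LSTM"]
for name in model_names:
    print(name, parse_model_name(name))
```

Each tuple corresponds to the `num_layers` and `bidirectional` arguments passed to `setup_model` in the cases further down.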

In [None]:
def set_device(primary, secondary):
    return torch.device(primary if torch.cuda.is_available() else secondary) # device used to perform the computations for the machine learning model

device = set_device("cuda","cpu")
tokenizer = get_tokenizer("basic_english")
models = ["1Uni-RNN", "1Bi-RNN", "2Bi-RNN", "1Uni-LSTM", "1Bi-LSTM", "2Bi-LSTM"]; classes = ["World", "Sports", "Business", "Sci/Tech"]; accuracies = []; parameters = []; time_costs = []
MIN_FREQ = 10 ; MAX_WORDS = 25; EPOCHS = 15; LEARNING_RATE = 1e-3; BATCH_SIZE = 1024; EMBEDDING_DIM = 100; HIDDEN_DIM = 64; PADDED = "<PAD>"; UNKNOWN = "<UNK>"

## **3. Load and preprocess the training and testing datasets**

**Create the training and testing datasets:** The load_dataset() function is used to load and preprocess a CSV file containing text data. It reads the CSV file using pandas, shuffles the rows (except the first one), and selects a subset of the data based on the given percent and mode arguments. It then combines the selected features into a single text column and returns a list of tuples, where each tuple contains the label and text data for each row of the dataset. The function is called twice to create train_dataset and test_dataset, which are used for training and testing a machine learning model.

* `data.iloc[:1]` selects only the first row of the DataFrame and keeps it in place, while `data.iloc[1:].sample(frac=1)` shuffles the remaining rows. Note that `pd.read_csv` reads the header line into the column names by default, so this first row is a regular data sample rather than the column names.

* If `mode` is set to *start*, the first percent % of rows are selected, whereas if `mode` is set to *end*, the last percent % of rows are selected. The code calculates the starting and ending indexes by scaling the total length of the dataset by the percent value and truncating the result to an integer with `int()`.
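
The index arithmetic behind the two modes can be checked on a plain list. The sketch below is a simplified stand-in for the slicing in `load_dataset()`: `select_rows` is a hypothetical helper, and it works on a Python list instead of a pandas DataFrame.

```python
def select_rows(rows, percent, mode):
    """Keep the first or last `percent` % of `rows`, mirroring load_dataset()'s slicing."""
    n = len(rows)
    if mode == "start":
        end_index = int(n * (percent / 100))
        return rows[:end_index]
    if mode == "end":
        start_index = int(n * ((100 - percent) / 100))
        return rows[start_index:]
    return rows  # any other mode leaves the data untouched, as in the notebook

rows = list(range(8))
print(select_rows(rows, 25, "start"))  # [0, 1]
print(select_rows(rows, 25, "end"))    # [6, 7]
```

With `percent=100` and `mode="start"`, as in the calls below, the whole dataset is kept.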

In [None]:
def load_dataset(path, features, label, percent, mode):
    data = pd.read_csv(path)
    data = pd.concat([data.iloc[:1], data.iloc[1:].sample(frac=1)], ignore_index=True)  # shuffle all rows except the first one
    if mode == 'start':
        end_index = int(len(data) * (percent / 100))
        data = data.iloc[:end_index]
    elif mode == 'end':
        start_index = int(len(data) * ((100 - percent) / 100))
        data = pd.concat([data.iloc[0:0], data.iloc[start_index:]], ignore_index=True)
    text = data[features].astype(str).agg(' '.join, axis=1)
    return [(data[label][i], text[i]) for i in range(len(data))]

train_dataset, test_dataset = load_dataset("train.csv", ["Title","Description"], "Class Index", 100, "start"), load_dataset("test.csv", ["Title","Description"], "Class Index", 100, "start")

## **4. Build PyTorch DataLoaders for efficient model training and testing**

**Generate PyTorch DataLoaders for the classification process:** The generate_loader() function takes in a dataset, a maximum number of words, a batch size, and a shuffle flag, and returns a PyTorch DataLoader object with the specified parameters. The collate_batch() function is used as a custom collate function for the DataLoader, and it preprocesses the input data by tokenizing the text, padding the sequences with `<PAD>` tokens or truncating them to a maximum length of max_words, and converting the data into PyTorch tensors. Two loaders are then created: train_loader, which is shuffled for better model training, and test_loader, which is left unshuffled for model evaluation.
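
The padding and truncation step can be illustrated in isolation. This is a minimal sketch rather than the notebook's code: `pad_or_truncate` is a hypothetical helper, and `pad_id=0` is an assumption that relies on `<PAD>` being the first special token in the vocabulary.

```python
def pad_or_truncate(token_ids, max_words, pad_id=0):
    """Return a list of exactly `max_words` ids: pad short inputs, cut long ones."""
    if len(token_ids) < max_words:
        return token_ids + [pad_id] * (max_words - len(token_ids))
    return token_ids[:max_words]

print(pad_or_truncate([7, 8, 9], 5))           # [7, 8, 9, 0, 0]
print(pad_or_truncate([1, 2, 3, 4, 5, 6], 5))  # [1, 2, 3, 4, 5]
```

Because every sample ends up with the same length, the batch can be stacked into a single tensor of shape `(batch_size, max_words)`.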

In [None]:
def generate_loader(dataset, max_words, batch_size, shuffle):
    return DataLoader(dataset, batch_size=batch_size, shuffle=shuffle, collate_fn=lambda b: collate_batch(b, max_words))

def collate_batch(batch, max_words):
    Y, X = list(zip(*batch))
    Y = torch.tensor(Y) - 1  # target names in range [0,1,2,3] instead of [1,2,3,4]
    X = [vocab(tokenizer(text)) for text in X] # type(X): list of lists
    X = [tokens+([vocab['<PAD>']]* (max_words-len(tokens))) if len(tokens)<max_words else tokens[:max_words] for tokens in X]  # brings all samples to MAX_WORDS length - shorter texts are padded with <PAD> sequences, longer texts are truncated
    return torch.tensor(X, dtype=torch.int32).to(device), Y.to(device)

train_loader, test_loader = generate_loader(train_dataset, MAX_WORDS, BATCH_SIZE, True), generate_loader(test_dataset, MAX_WORDS, BATCH_SIZE, False)

In [None]:
def build_vocab(datasets, min_freq, padded, unknown):
    vocab = build_vocab_from_iterator(tokenize(datasets), min_freq=min_freq, specials=[padded, unknown])
    vocab.set_default_index(vocab[unknown])
    return vocab

def tokenize(datasets):
    for dataset in datasets:
        for _, text in dataset:
            yield tokenizer(text)

vocab = build_vocab([train_dataset, test_dataset], MIN_FREQ, PADDED, UNKNOWN)
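
To show what `min_freq` and the default index do, here is a pure-Python approximation of the vocabulary. It is a simplified sketch, not torchtext's implementation: `build_simple_vocab` and `lookup` are hypothetical helpers, and the alphabetical ordering of the kept tokens is an arbitrary choice made for this example.

```python
from collections import Counter

def build_simple_vocab(token_lists, min_freq, specials):
    """Map special tokens first, then every token seen at least `min_freq` times."""
    counts = Counter(tok for tokens in token_lists for tok in tokens)
    kept = sorted(tok for tok, count in counts.items() if count >= min_freq)
    return {tok: idx for idx, tok in enumerate(list(specials) + kept)}

def lookup(vocab, tokens, unknown="<UNK>"):
    """Convert tokens to ids, falling back to the unknown id for missing tokens."""
    return [vocab.get(tok, vocab[unknown]) for tok in tokens]

corpus = [["stocks", "rise"], ["stocks", "fall"], ["rare", "stocks"]]
toy_vocab = build_simple_vocab(corpus, min_freq=2, specials=["<PAD>", "<UNK>"])
print(toy_vocab)                              # {'<PAD>': 0, '<UNK>': 1, 'stocks': 2}
print(lookup(toy_vocab, ["stocks", "rare"]))  # [2, 1]
```

Tokens that occur fewer than `min_freq` times never enter the vocabulary, so they share the `<UNK>` id at lookup time.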

In [None]:
def setup_model(device, model, classes, vocab, embedding_dim, hidden_dim, num_layers, bidirectional, learning_rate, embeddings, freeze):
    classifier = model(len(vocab), embedding_dim, hidden_dim, num_layers, bidirectional, len(classes), embeddings, freeze).to(device)
    loss_fn = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam([param for param in classifier.parameters() if param.requires_grad == True],lr=learning_rate)
    return classifier, loss_fn, optimizer
  
class RNN_model(nn.Module):
    def __init__(self, input_dim, embedding_dim, hidden_dim, num_layers, bidirectional, output_dim, none, freeze):
        super(RNN_model, self).__init__()
        self.embedding_layer = nn.Embedding(num_embeddings=input_dim, embedding_dim=embedding_dim)
        self.rnn = nn.RNN(input_size=embedding_dim, hidden_size=hidden_dim, num_layers=num_layers, bidirectional=bidirectional, batch_first=True)
        self.hidden_size = hidden_dim * get_directions(bidirectional)
        self.linear = nn.Linear(hidden_dim * get_directions(bidirectional), output_dim)
    def forward(self, X_batch):
        embeddings = self.embedding_layer(X_batch)
        output, hidden = self.rnn(embeddings)
        # `output` already concatenates both directions along its last dimension, so no extra torch.cat is needed
        logits = self.linear(output[:, -1, :]) # the output at the last time step is used for sequence classification
        return logits # raw logits are returned because nn.CrossEntropyLoss applies log-softmax internally

def get_directions(bidirectional):
    return 2 if bidirectional else 1

# CASE.A) [model: RNN, num_layers: 1, bidirectional: False]
classifier, loss_fn, optimizer = setup_model(device, RNN_model, classes, vocab, EMBEDDING_DIM, HIDDEN_DIM, 1, False, LEARNING_RATE, None, None)

In [None]:
def train_model(classifier, loss_fn, optimizer, train_loader, epochs):
    times = []
    for i in range(1, epochs+1):
        classifier.train()
        print('Epoch:',i)
        losses = []
        start_time = time.time()
        for X, Y in tqdm(train_loader):
            Y_preds = classifier(X)
            loss = loss_fn(Y_preds, Y)
            losses.append(loss.item())
            optimizer.zero_grad(); loss.backward(); optimizer.step()
        epoch_time = time.time() - start_time
        times.append(epoch_time)
        print("Train Loss : {:.3f}".format(torch.tensor(losses).mean()))
    return sum(times)/len(times)

time_cost = train_model(classifier, loss_fn, optimizer, train_loader, EPOCHS)

In [None]:
def evaluate_model(classifier, loss_fn, test_loader, test_data):
    classifier.eval()
    with torch.no_grad():  # during evaluation we don't update the model's parameters
        Y_actual, Y_preds, losses = [],[],[]
        for X, Y in test_loader:
            preds = classifier(X)
            loss = loss_fn(preds, Y)
            losses.append(loss.item())
            Y_actual.append(Y)
            Y_preds.append(preds.argmax(dim=-1))
        Y_actual, Y_preds = torch.cat(Y_actual), torch.cat(Y_preds)
        misclass_data = detect_misclassification(test_data,Y_preds.detach().cpu().numpy())
    return torch.tensor(losses).mean(), Y_actual.detach().cpu().numpy(), Y_preds.detach().cpu().numpy(), misclass_data  # returns mean loss, actual labels and predicted labels

def detect_misclassification(test_data, y_pred):
    misclass_data = defaultdict(list)
    for i in range(len(test_data["labels"])):
        true_label = test_data["labels"][i] - 1  # shift to the 0-based range used for the targets in collate_batch
        predicted_label = y_pred[i]
        if true_label != predicted_label:
            text = test_data["features"][i]
            misclass_data[true_label].append((text, predicted_label))
    return misclass_data

def to_dict(tuples_list):
    return {'features': [d[1] for d in tuples_list], 'labels': [d[0] for d in tuples_list]}

_, Y_actual, Y_preds, misclass_data_1UniRNN = evaluate_model(classifier, loss_fn, test_loader, to_dict(test_dataset))

In [None]:
def count_parameters(model):
    return sum(p.numel() for p in model.parameters() if p.requires_grad)

accuracies.append(accuracy_score(Y_actual, Y_preds))
parameters.append(count_parameters(classifier))
time_costs.append(time_cost)
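
The value returned by `count_parameters` can be cross-checked by hand. The sketch below uses the standard weight shapes of recurrent layers (an input-to-hidden matrix, a hidden-to-hidden matrix and two bias vectors per gate and direction). The helper names are hypothetical, and the vocabulary size of 10000 is an assumed placeholder, since the real value depends on the data.

```python
def count_recurrent_params(input_size, hidden, num_layers, bidirectional, gates):
    """Parameters of a stacked RNN (gates=1) or LSTM (gates=4)."""
    directions = 2 if bidirectional else 1
    total = 0
    for layer in range(num_layers):
        # deeper layers receive the concatenated outputs of both directions
        layer_input = input_size if layer == 0 else hidden * directions
        per_direction = gates * (hidden * layer_input + hidden * hidden + 2 * hidden)
        total += directions * per_direction
    return total

def count_classifier_params(vocab_size, embedding_dim, hidden, num_layers,
                            bidirectional, num_classes, gates):
    """Embedding + recurrent stack + final linear layer."""
    directions = 2 if bidirectional else 1
    embedding = vocab_size * embedding_dim
    recurrent = count_recurrent_params(embedding_dim, hidden, num_layers, bidirectional, gates)
    linear = hidden * directions * num_classes + num_classes
    return embedding + recurrent + linear

ASSUMED_VOCAB_SIZE = 10000  # placeholder, not the notebook's actual len(vocab)
print(count_classifier_params(ASSUMED_VOCAB_SIZE, 100, 64, 1, False, 4, gates=1))  # 1010884
print(count_classifier_params(ASSUMED_VOCAB_SIZE, 100, 64, 1, False, 4, gates=4))  # 1042756
```

Under this assumption the embedding layer accounts for the vast majority of trainable parameters, which is worth keeping in mind when comparing the models in the results table.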

In [None]:
# CASE.B) [model: RNN, num_layers: 1, bidirectional: True]
classifier, loss_fn, optimizer = setup_model(device, RNN_model, classes, vocab, EMBEDDING_DIM, HIDDEN_DIM, 1, True, LEARNING_RATE, None, None)
time_cost = train_model(classifier, loss_fn, optimizer, train_loader, EPOCHS)
_, Y_actual, Y_preds, misclass_data_1BiRNN = evaluate_model(classifier, loss_fn, test_loader, to_dict(test_dataset))
accuracies.append(accuracy_score(Y_actual, Y_preds))
parameters.append(count_parameters(classifier))
time_costs.append(time_cost)

In [None]:
# CASE.C) [model: RNN, num_layers: 2, bidirectional: True]
classifier, loss_fn, optimizer = setup_model(device, RNN_model, classes, vocab, EMBEDDING_DIM, HIDDEN_DIM, 2, True, LEARNING_RATE, None, None)
time_cost = train_model(classifier, loss_fn, optimizer, train_loader, EPOCHS)
_, Y_actual, Y_preds, misclass_data_2BiRNN = evaluate_model(classifier, loss_fn, test_loader, to_dict(test_dataset))
accuracies.append(accuracy_score(Y_actual, Y_preds))
parameters.append(count_parameters(classifier))
time_costs.append(time_cost)

In [None]:
class LSTM_model(nn.Module):
    def __init__(self, input_dim, embedding_dim, hidden_dim, num_layers, bidirectional, output_dim, none, freeze):
        super(LSTM_model, self).__init__()
        self.embedding_layer = nn.Embedding(num_embeddings=input_dim, embedding_dim=embedding_dim)
        self.lstm = nn.LSTM(input_size=embedding_dim, hidden_size=hidden_dim, num_layers=num_layers, bidirectional=bidirectional, batch_first=True)
        self.hidden_size = hidden_dim * get_directions(bidirectional)
        self.linear = nn.Linear(hidden_dim * get_directions(bidirectional), output_dim)
    def forward(self, X_batch):
        embeddings = self.embedding_layer(X_batch)
        output, (hidden, cell) = self.lstm(embeddings)
        # `output` already concatenates both directions along its last dimension, so no extra torch.cat is needed
        logits = self.linear(output[:, -1, :]) # the output at the last time step is used for sequence classification
        return logits # raw logits are returned because nn.CrossEntropyLoss applies log-softmax internally

# CASE.D) [model: LSTM, num_layers: 1, bidirectional: False]
classifier, loss_fn, optimizer = setup_model(device, LSTM_model, classes, vocab, EMBEDDING_DIM, HIDDEN_DIM, 1, False, LEARNING_RATE, None, None)
time_cost = train_model(classifier, loss_fn, optimizer, train_loader, EPOCHS)
_, Y_actual, Y_preds, misclass_data_1UniLSTM = evaluate_model(classifier, loss_fn, test_loader, to_dict(test_dataset))
accuracies.append(accuracy_score(Y_actual, Y_preds))
parameters.append(count_parameters(classifier))
time_costs.append(time_cost)

In [None]:
# CASE.E) [model: LSTM, num_layers: 1, bidirectional: True]
classifier, loss_fn, optimizer = setup_model(device, LSTM_model, classes, vocab, EMBEDDING_DIM, HIDDEN_DIM, 1, True, LEARNING_RATE, None, None)
time_cost = train_model(classifier, loss_fn, optimizer, train_loader, EPOCHS)
_, Y_actual, Y_preds, misclass_data_1BiLSTM = evaluate_model(classifier, loss_fn, test_loader, to_dict(test_dataset))
accuracies.append(accuracy_score(Y_actual, Y_preds))
parameters.append(count_parameters(classifier))
time_costs.append(time_cost)

In [None]:
# CASE.F) [model: LSTM, num_layers: 2, bidirectional: True]
classifier, loss_fn, optimizer = setup_model(device, LSTM_model, classes, vocab, EMBEDDING_DIM, HIDDEN_DIM, 2, True, LEARNING_RATE, None, None)
time_cost = train_model(classifier, loss_fn, optimizer, train_loader, EPOCHS)
_, Y_actual, Y_preds, misclass_data_2BiLSTM = evaluate_model(classifier, loss_fn, test_loader, to_dict(test_dataset))
accuracies.append(accuracy_score(Y_actual, Y_preds))
parameters.append(count_parameters(classifier))
time_costs.append(time_cost)

In [None]:
def visualize(models, accuracies, parameters, time_costs):
    pt = PrettyTable(field_names=[f"\033[1m{field}\033[0m" for field in ["Model", "Accuracy", "Parameters", "Time Cost"]])
    [pt.add_row([model, accuracies[i], parameters[i], time_costs[i]]) for i, model in enumerate(models)]
    print(pt)

visualize(models, accuracies, parameters, time_costs)
#########################################################################################################################################################

In [None]:
def analyze_results(models, misclassified):
    common_misclass_data = defaultdict(list)
    for true_label in misclassified[0].keys():
        for text, label in misclassified[0][true_label]:
            labels = [label] + [next((p for t, p in model[true_label] if t == text), None) for model in misclassified[1:]]
            if all(l is not None for l in labels):  # keep texts misclassified by every model (a predicted label of 0 is valid)
                common_misclass_data[true_label].append((text, labels))
    count_times(common_misclass_data)
    get_top_pair(common_misclass_data)
    get_random_text(models, common_misclass_data)

def count_times(common_misclass_data):
    misclass_counts = {true_label: len(misclass_tuple) for true_label, misclass_tuple in common_misclass_data.items()}
    pt = PrettyTable(field_names=[f"\033[1m{field}\033[0m" for field in ["True Label", "Misclassification Times"]])
    [pt.add_row([true_label, count]) for true_label, count in misclass_counts.items()]
    print(pt)

def get_top_pair(common_misclass_data):
    misclass_freqs = defaultdict(int)
    for true_label, values in common_misclass_data.items():
        for text, pred_labels in values:
            for pl in pred_labels:
                misclass_freqs[(true_label, pl)] += 1
    max_tuple, max_count = max(misclass_freqs.items(), key=lambda x: x[1])
    print("\033[1m" + "Most common Misclassification Pair:" + "\033[0m")
    pt = PrettyTable(field_names=[f"\033[1m{field}\033[0m" for field in ["True Label", "Predicted Label", "Frequency"]])
    pt.add_row([max_tuple[0], max_tuple[1], max_count])
    print(pt)

def get_random_text(models, common_misclass_data):
    rand_true_label = random.choice(list(common_misclass_data.keys()))
    rand_misclass_tuple = random.choice(common_misclass_data[rand_true_label])
    print("\033[1m" + "Random Text: " + "\033[0m" + rand_misclass_tuple[0] + "\033[1m" + "\nTrue Label: " + "\033[0m" + str(rand_true_label))
    pt = PrettyTable(field_names=[f"\033[1m{field}\033[0m" for field in ["Model", "Prediction"]])
    [pt.add_row([model, rand_misclass_tuple[1][idx]]) for idx, model in enumerate(models)]
    print(pt)

analyze_results(models, [misclass_data_1UniRNN, misclass_data_1BiRNN, misclass_data_2BiRNN, misclass_data_1UniLSTM, misclass_data_1BiLSTM, misclass_data_2BiLSTM])
#########################################################################################################################################################

In [None]:
MAX_WORDS = 50
train_loader, test_loader = generate_loader(train_dataset, MAX_WORDS, BATCH_SIZE, True), generate_loader(test_dataset, MAX_WORDS, BATCH_SIZE, False)

In [None]:
# CASE.A) [model: RNN, num_layers: 1, bidirectional: False]
classifier, loss_fn, optimizer = setup_model(device, RNN_model, classes, vocab, EMBEDDING_DIM, HIDDEN_DIM, 1, False, LEARNING_RATE, None, None)
time_cost = train_model(classifier, loss_fn, optimizer, train_loader, EPOCHS)
_, Y_actual, Y_preds, misclass_data_1UniRNN = evaluate_model(classifier, loss_fn, test_loader, to_dict(test_dataset))
accuracies.append(accuracy_score(Y_actual, Y_preds))
parameters.append(count_parameters(classifier))
time_costs.append(time_cost)

In [None]:
# CASE.B) [model: RNN, num_layers: 1, bidirectional: True]
classifier, loss_fn, optimizer = setup_model(device, RNN_model, classes, vocab, EMBEDDING_DIM, HIDDEN_DIM, 1, True, LEARNING_RATE, None, None)
time_cost = train_model(classifier, loss_fn, optimizer, train_loader, EPOCHS)
_, Y_actual, Y_preds, misclass_data_1BiRNN = evaluate_model(classifier, loss_fn, test_loader, to_dict(test_dataset))
accuracies.append(accuracy_score(Y_actual, Y_preds))
parameters.append(count_parameters(classifier))
time_costs.append(time_cost)

In [None]:
# CASE.C) [model: RNN, num_layers: 2, bidirectional: True]
classifier, loss_fn, optimizer = setup_model(device, RNN_model, classes, vocab, EMBEDDING_DIM, HIDDEN_DIM, 2, True, LEARNING_RATE, None, None)
time_cost = train_model(classifier, loss_fn, optimizer, train_loader, EPOCHS)
_, Y_actual, Y_preds, misclass_data_2BiRNN = evaluate_model(classifier, loss_fn, test_loader, to_dict(test_dataset))
accuracies.append(accuracy_score(Y_actual, Y_preds))
parameters.append(count_parameters(classifier))
time_costs.append(time_cost)

In [None]:
# CASE.D) [model: LSTM, num_layers: 1, bidirectional: False]
classifier, loss_fn, optimizer = setup_model(device, LSTM_model, classes, vocab, EMBEDDING_DIM, HIDDEN_DIM, 1, False, LEARNING_RATE, None, None)
time_cost = train_model(classifier, loss_fn, optimizer, train_loader, EPOCHS)
_, Y_actual, Y_preds, misclass_data_1UniLSTM = evaluate_model(classifier, loss_fn, test_loader, to_dict(test_dataset))
accuracies.append(accuracy_score(Y_actual, Y_preds))
parameters.append(count_parameters(classifier))
time_costs.append(time_cost)

In [None]:
# CASE.E) [model: LSTM, num_layers: 1, bidirectional: True]
classifier, loss_fn, optimizer = setup_model(device, LSTM_model, classes, vocab, EMBEDDING_DIM, HIDDEN_DIM, 1, True, LEARNING_RATE, None, None)
time_cost = train_model(classifier, loss_fn, optimizer, train_loader, EPOCHS)
_, Y_actual, Y_preds, misclass_data_1BiLSTM = evaluate_model(classifier, loss_fn, test_loader, to_dict(test_dataset))
accuracies.append(accuracy_score(Y_actual, Y_preds))
parameters.append(count_parameters(classifier))
time_costs.append(time_cost)

In [None]:
# CASE.F) [model: LSTM, num_layers: 2, bidirectional: True]
classifier, loss_fn, optimizer = setup_model(device, LSTM_model, classes, vocab, EMBEDDING_DIM, HIDDEN_DIM, 2, True, LEARNING_RATE, None, None)
time_cost = train_model(classifier, loss_fn, optimizer, train_loader, EPOCHS)
_, Y_actual, Y_preds, misclass_data_2BiLSTM = evaluate_model(classifier, loss_fn, test_loader, to_dict(test_dataset))
accuracies.append(accuracy_score(Y_actual, Y_preds))
parameters.append(count_parameters(classifier))
time_costs.append(time_cost)

In [None]:
visualize(models, accuracies, parameters, time_costs)
#########################################################################################################################################################

In [None]:
MAX_WORDS = 25
train_loader, test_loader = generate_loader(train_dataset, MAX_WORDS, BATCH_SIZE, True), generate_loader(test_dataset, MAX_WORDS, BATCH_SIZE, False)
models = ["1Uni-preRNN", "1Bi-preRNN", "2Bi-preRNN", "1Uni-preLSTM", "1Bi-preLSTM", "2Bi-preLSTM"]; accuracies = []; parameters = []; time_costs = []

In [None]:
def load_embeddings(path, vocab, dimension):
    with open(path, 'r', encoding='utf-8') as f:
        lines = f.readlines()
    embeddings = torch.zeros(len(vocab), dimension)
    for line in lines:
        word, vec = line.strip().split(' ', 1)
        if word in vocab:
            embeddings[vocab[word]] = torch.tensor([float(x) for x in vec.split()])
    return embeddings

embeddings = load_embeddings("glove.6B.100d.txt", vocab, EMBEDDING_DIM)

In [None]:
class pretrained_RNN_model(nn.Module):
    def __init__(self, input_dim, embedding_dim, hidden_dim, num_layers, bidirectional, output_dim, embeddings, freeze):
        super(pretrained_RNN_model, self).__init__()
        self.embedding_layer = nn.Embedding(num_embeddings=input_dim, embedding_dim=embedding_dim)
        self.embedding_layer.weight.data.copy_(embeddings)
        self.embedding_layer.weight.requires_grad = freeze  # note: despite its name, `freeze` is assigned to requires_grad, so False freezes the embeddings and True fine-tunes them
        self.rnn = nn.RNN(input_size=embedding_dim, hidden_size=hidden_dim, num_layers=num_layers, bidirectional=bidirectional, batch_first=True)
        self.hidden_size = hidden_dim * get_directions(bidirectional)
        self.linear = nn.Linear(hidden_dim * get_directions(bidirectional), output_dim)
    def forward(self, X_batch):
        embeddings = self.embedding_layer(X_batch)
        output, hidden = self.rnn(embeddings)
        # `output` already concatenates both directions along its last dimension, so no extra torch.cat is needed
        logits = self.linear(output[:, -1, :]) # the output at the last time step is used for sequence classification
        return logits # raw logits are returned because nn.CrossEntropyLoss applies log-softmax internally

# CASE.A) [model: RNN, num_layers: 1, bidirectional: False]
classifier, loss_fn, optimizer = setup_model(device, pretrained_RNN_model, classes, vocab, EMBEDDING_DIM, HIDDEN_DIM, 1, False, LEARNING_RATE, embeddings, False)
time_cost = train_model(classifier, loss_fn, optimizer, train_loader, EPOCHS)
_, Y_actual, Y_preds, misclass_data_1UniRNN = evaluate_model(classifier, loss_fn, test_loader, to_dict(test_dataset))
accuracies.append(accuracy_score(Y_actual, Y_preds))
parameters.append(count_parameters(classifier))
time_costs.append(time_cost)

In [None]:
# CASE.B) [model: RNN, num_layers: 1, bidirectional: True]
classifier, loss_fn, optimizer = setup_model(device, pretrained_RNN_model, classes, vocab, EMBEDDING_DIM, HIDDEN_DIM, 1, True, LEARNING_RATE, embeddings, False)
time_cost = train_model(classifier, loss_fn, optimizer, train_loader, EPOCHS)
_, Y_actual, Y_preds, misclass_data_1BiRNN = evaluate_model(classifier, loss_fn, test_loader, to_dict(test_dataset))
accuracies.append(accuracy_score(Y_actual, Y_preds))
parameters.append(count_parameters(classifier))
time_costs.append(time_cost)

In [None]:
# CASE.C) [model: RNN, num_layers: 2, bidirectional: True]
classifier, loss_fn, optimizer = setup_model(device, pretrained_RNN_model, classes, vocab, EMBEDDING_DIM, HIDDEN_DIM, 2, True, LEARNING_RATE, embeddings, False)
time_cost = train_model(classifier, loss_fn, optimizer, train_loader, EPOCHS)
_, Y_actual, Y_preds, misclass_data_2BiRNN = evaluate_model(classifier, loss_fn, test_loader, to_dict(test_dataset))
accuracies.append(accuracy_score(Y_actual, Y_preds))
parameters.append(count_parameters(classifier))
time_costs.append(time_cost)

In [None]:
class pretrained_LSTM_model(nn.Module):
    def __init__(self, input_dim, embedding_dim, hidden_dim, num_layers, bidirectional, output_dim, embeddings, freeze):
        super(pretrained_LSTM_model, self).__init__()
        self.embedding_layer = nn.Embedding(num_embeddings=input_dim, embedding_dim=embedding_dim)
        self.embedding_layer.weight.data.copy_(embeddings)
        self.embedding_layer.weight.requires_grad = freeze  # note: despite its name, `freeze` is assigned to requires_grad, so False freezes the embeddings and True fine-tunes them
        self.lstm = nn.LSTM(input_size=embedding_dim, hidden_size=hidden_dim, num_layers=num_layers, bidirectional=bidirectional, batch_first=True)
        self.hidden_size = hidden_dim * get_directions(bidirectional)
        self.linear = nn.Linear(hidden_dim * get_directions(bidirectional), output_dim)
    def forward(self, X_batch):
        embeddings = self.embedding_layer(X_batch)
        output, (hidden, cell) = self.lstm(embeddings)
        # `output` already concatenates both directions along its last dimension, so no extra torch.cat is needed
        logits = self.linear(output[:, -1, :]) # the output at the last time step is used for sequence classification
        return logits # raw logits are returned because nn.CrossEntropyLoss applies log-softmax internally

# CASE.D) [model: LSTM, num_layers: 1, bidirectional: False]
classifier, loss_fn, optimizer = setup_model(device, pretrained_LSTM_model, classes, vocab, EMBEDDING_DIM, HIDDEN_DIM, 1, False, LEARNING_RATE, embeddings, False)
time_cost = train_model(classifier, loss_fn, optimizer, train_loader, EPOCHS)
_, Y_actual, Y_preds, misclass_data_1UniLSTM = evaluate_model(classifier, loss_fn, test_loader, to_dict(test_dataset))
accuracies.append(accuracy_score(Y_actual, Y_preds))
parameters.append(count_parameters(classifier))
time_costs.append(time_cost)

In [None]:
# CASE.E) [model: LSTM, num_layers: 1, bidirectional: True]
classifier, loss_fn, optimizer = setup_model(device, pretrained_LSTM_model, classes, vocab, EMBEDDING_DIM, HIDDEN_DIM, 1, True, LEARNING_RATE, embeddings, False)
time_cost = train_model(classifier, loss_fn, optimizer, train_loader, EPOCHS)
_, Y_actual, Y_preds, misclass_data_1BiLSTM = evaluate_model(classifier, loss_fn, test_loader, to_dict(test_dataset))
accuracies.append(accuracy_score(Y_actual, Y_preds))
parameters.append(count_parameters(classifier))
time_costs.append(time_cost)

In [None]:
# CASE.F) [model: LSTM, num_layers: 2, bidirectional: True]
classifier, loss_fn, optimizer = setup_model(device, pretrained_LSTM_model, classes, vocab, EMBEDDING_DIM, HIDDEN_DIM, 2, True, LEARNING_RATE, embeddings, False)
time_cost = train_model(classifier, loss_fn, optimizer, train_loader, EPOCHS)
_, Y_actual, Y_preds, misclass_data_2BiLSTM = evaluate_model(classifier, loss_fn, test_loader, to_dict(test_dataset))
accuracies.append(accuracy_score(Y_actual, Y_preds))
parameters.append(count_parameters(classifier))
time_costs.append(time_cost)

In [None]:
visualize(models, accuracies, parameters, time_costs)
#########################################################################################################################################################

In [None]:
accuracies = []; parameters = []; time_costs = []

In [None]:
# CASE.A) [model: RNN, num_layers: 1, bidirectional: False]
classifier, loss_fn, optimizer = setup_model(device, pretrained_RNN_model, classes, vocab, EMBEDDING_DIM, HIDDEN_DIM, 1, False, LEARNING_RATE, embeddings, True)
time_cost = train_model(classifier, loss_fn, optimizer, train_loader, EPOCHS)
_, Y_actual, Y_preds, misclass_data_1UniRNN = evaluate_model(classifier, loss_fn, test_loader, to_dict(test_dataset))
accuracies.append(accuracy_score(Y_actual, Y_preds))
parameters.append(count_parameters(classifier))
time_costs.append(time_cost)

In [None]:
# CASE.B) [model: RNN, num_layers: 1, bidirectional: True]
classifier, loss_fn, optimizer = setup_model(device, pretrained_RNN_model, classes, vocab, EMBEDDING_DIM, HIDDEN_DIM, 1, True, LEARNING_RATE, embeddings, True)
time_cost = train_model(classifier, loss_fn, optimizer, train_loader, EPOCHS)
_, Y_actual, Y_preds, misclass_data_1BiRNN = evaluate_model(classifier, loss_fn, test_loader, to_dict(test_dataset))
accuracies.append(accuracy_score(Y_actual, Y_preds))
parameters.append(count_parameters(classifier))
time_costs.append(time_cost)

In [None]:
# CASE.C) [model: RNN, num_layers: 2, bidirectional: True]
classifier, loss_fn, optimizer = setup_model(device, pretrained_RNN_model, classes, vocab, EMBEDDING_DIM, HIDDEN_DIM, 2, True, LEARNING_RATE, embeddings, True)
time_cost = train_model(classifier, loss_fn, optimizer, train_loader, EPOCHS)
_, Y_actual, Y_preds, misclass_data_2BiRNN = evaluate_model(classifier, loss_fn, test_loader, to_dict(test_dataset))
accuracies.append(accuracy_score(Y_actual, Y_preds))
parameters.append(count_parameters(classifier))
time_costs.append(time_cost)

In [None]:
# CASE.D) [model: LSTM, num_layers: 1, bidirectional: False]
classifier, loss_fn, optimizer = setup_model(device, pretrained_LSTM_model, classes, vocab, EMBEDDING_DIM, HIDDEN_DIM, 1, False, LEARNING_RATE, embeddings, True)
time_cost = train_model(classifier, loss_fn, optimizer, train_loader, EPOCHS)
_, Y_actual, Y_preds, misclass_data_1UniLSTM = evaluate_model(classifier, loss_fn, test_loader, to_dict(test_dataset))
accuracies.append(accuracy_score(Y_actual, Y_preds))
parameters.append(count_parameters(classifier))
time_costs.append(time_cost)

In [None]:
# CASE.E) [model: LSTM, num_layers: 1, bidirectional: True]
classifier, loss_fn, optimizer = setup_model(device, pretrained_LSTM_model, classes, vocab, EMBEDDING_DIM, HIDDEN_DIM, 1, True, LEARNING_RATE, embeddings, True)
time_cost = train_model(classifier, loss_fn, optimizer, train_loader, EPOCHS)
_, Y_actual, Y_preds, misclass_data_1BiLSTM = evaluate_model(classifier, loss_fn, test_loader, to_dict(test_dataset))
accuracies.append(accuracy_score(Y_actual, Y_preds))
parameters.append(count_parameters(classifier))
time_costs.append(time_cost)

In [None]:
# CASE.F) [model: LSTM, num_layers: 2, bidirectional: True]
classifier, loss_fn, optimizer = setup_model(device, pretrained_LSTM_model, classes, vocab, EMBEDDING_DIM, HIDDEN_DIM, 2, True, LEARNING_RATE, embeddings, True)
time_cost = train_model(classifier, loss_fn, optimizer, train_loader, EPOCHS)
_, Y_actual, Y_preds, misclass_data_2BiLSTM = evaluate_model(classifier, loss_fn, test_loader, to_dict(test_dataset))
accuracies.append(accuracy_score(Y_actual, Y_preds))
parameters.append(count_parameters(classifier))
time_costs.append(time_cost)

In [None]:
visualize(models, accuracies, parameters, time_costs)
#########################################################################################################################################################

In [None]:
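models = ["1Uni-RNN", "1Bi-RNN", "2Bi-RNN", "1Uni-LSTM", "1Bi-LSTM", "2Bi-LSTM"]
# Class names are listed in label order, matching the mapping applied below (negative -> 1, positive -> 2)
classes = ["Negative", "Positive"]
accuracies, parameters, time_costs = [], [], []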
train_dataset = load_dataset("IMDB Dataset.csv", ["review"], "sentiment", 80, "start")
test_dataset = load_dataset("IMDB Dataset.csv", ["review"], "sentiment", 20, "end")

In [None]:
def replace_labels(dataset, categorical, numerical):
    """Map each string label in `dataset` to the numeric label at the same position."""
    mapping = dict(zip(categorical, numerical))
    return [(mapping[label], text) for label, text in dataset]

# Encode the IMDB sentiment labels as 1 (negative) and 2 (positive)
train_dataset = replace_labels(train_dataset, ["negative", "positive"], [1, 2])
test_dataset = replace_labels(test_dataset, ["negative", "positive"], [1, 2])
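
As a quick check of what the relabeling step does, here is a minimal sketch on a toy dataset. It assumes, as the helper above does, that each dataset entry is a `(label, text)` tuple. The two reviews are invented for illustration, and the helper is repeated so the snippet runs on its own.

```python
def replace_labels(dataset, categorical, numerical):
    # Pair each string label with the numeric code at the same position
    mapping = dict(zip(categorical, numerical))
    return [(mapping[label], text) for label, text in dataset]

# Hypothetical two-review dataset, used only for illustration
toy_dataset = [
    ("positive", "A wonderful film."),
    ("negative", "Two hours I will never get back."),
]

toy_encoded = replace_labels(toy_dataset, ["negative", "positive"], [1, 2])
print(toy_encoded)
# [(2, 'A wonderful film.'), (1, 'Two hours I will never get back.')]
```

The texts pass through unchanged; only the label in each tuple is swapped for its numeric code.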

In [None]:
train_loader = generate_loader(train_dataset, MAX_WORDS, BATCH_SIZE, True)
test_loader = generate_loader(test_dataset, MAX_WORDS, BATCH_SIZE, False)
vocab = build_vocab([train_dataset, test_dataset], MIN_FREQ, PADDED, UNKNOWN)

In [None]:
# CASE.A) [model: RNN, num_layers: 1, bidirectional: False]
classifier, loss_fn, optimizer = setup_model(device, RNN_model, classes, vocab, EMBEDDING_DIM, HIDDEN_DIM, 1, False, LEARNING_RATE, None, None)
time_cost = train_model(classifier, loss_fn, optimizer, train_loader, EPOCHS)
_, Y_actual, Y_preds, misclass_data_1UniRNN = evaluate_model(classifier, loss_fn, test_loader, to_dict(test_dataset))
accuracies.append(accuracy_score(Y_actual, Y_preds))
parameters.append(count_parameters(classifier))
time_costs.append(time_cost)

In [None]:
# CASE.B) [model: RNN, num_layers: 1, bidirectional: True]
classifier, loss_fn, optimizer = setup_model(device, RNN_model, classes, vocab, EMBEDDING_DIM, HIDDEN_DIM, 1, True, LEARNING_RATE, None, None)
time_cost = train_model(classifier, loss_fn, optimizer, train_loader, EPOCHS)
_, Y_actual, Y_preds, misclass_data_1BiRNN = evaluate_model(classifier, loss_fn, test_loader, to_dict(test_dataset))
accuracies.append(accuracy_score(Y_actual, Y_preds))
parameters.append(count_parameters(classifier))
time_costs.append(time_cost)

In [None]:
# CASE.C) [model: RNN, num_layers: 2, bidirectional: True]
classifier, loss_fn, optimizer = setup_model(device, RNN_model, classes, vocab, EMBEDDING_DIM, HIDDEN_DIM, 2, True, LEARNING_RATE, None, None)
time_cost = train_model(classifier, loss_fn, optimizer, train_loader, EPOCHS)
_, Y_actual, Y_preds, misclass_data_2BiRNN = evaluate_model(classifier, loss_fn, test_loader, to_dict(test_dataset))
accuracies.append(accuracy_score(Y_actual, Y_preds))
parameters.append(count_parameters(classifier))
time_costs.append(time_cost)

In [None]:
# CASE.D) [model: LSTM, num_layers: 1, bidirectional: False]
classifier, loss_fn, optimizer = setup_model(device, LSTM_model, classes, vocab, EMBEDDING_DIM, HIDDEN_DIM, 1, False, LEARNING_RATE, None, None)
time_cost = train_model(classifier, loss_fn, optimizer, train_loader, EPOCHS)
_, Y_actual, Y_preds, misclass_data_1UniLSTM = evaluate_model(classifier, loss_fn, test_loader, to_dict(test_dataset))
accuracies.append(accuracy_score(Y_actual, Y_preds))
parameters.append(count_parameters(classifier))
time_costs.append(time_cost)

In [None]:
# CASE.E) [model: LSTM, num_layers: 1, bidirectional: True]
classifier, loss_fn, optimizer = setup_model(device, LSTM_model, classes, vocab, EMBEDDING_DIM, HIDDEN_DIM, 1, True, LEARNING_RATE, None, None)
time_cost = train_model(classifier, loss_fn, optimizer, train_loader, EPOCHS)
_, Y_actual, Y_preds, misclass_data_1BiLSTM = evaluate_model(classifier, loss_fn, test_loader, to_dict(test_dataset))
accuracies.append(accuracy_score(Y_actual, Y_preds))
parameters.append(count_parameters(classifier))
time_costs.append(time_cost)

In [None]:
# CASE.F) [model: LSTM, num_layers: 2, bidirectional: True]
classifier, loss_fn, optimizer = setup_model(device, LSTM_model, classes, vocab, EMBEDDING_DIM, HIDDEN_DIM, 2, True, LEARNING_RATE, None, None)
time_cost = train_model(classifier, loss_fn, optimizer, train_loader, EPOCHS)
_, Y_actual, Y_preds, misclass_data_2BiLSTM = evaluate_model(classifier, loss_fn, test_loader, to_dict(test_dataset))
accuracies.append(accuracy_score(Y_actual, Y_preds))
parameters.append(count_parameters(classifier))
time_costs.append(time_cost)

In [None]:
visualize(models, accuracies, parameters, time_costs)
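
The six cases above differ only in the model type, the number of layers and the direction flag, so they could also be driven from one table of configurations. The sketch below shows that pattern under stated assumptions: `run_case` is a hypothetical callable standing in for the setup, train and evaluate steps, and `stub_run_case` returns placeholder numbers (not measured results) so the snippet runs on its own.

```python
# (label, model kind, num_layers, bidirectional), in the same order as the `models` list
CONFIGS = [
    ("1Uni-RNN", "RNN", 1, False),
    ("1Bi-RNN", "RNN", 1, True),
    ("2Bi-RNN", "RNN", 2, True),
    ("1Uni-LSTM", "LSTM", 1, False),
    ("1Bi-LSTM", "LSTM", 1, True),
    ("2Bi-LSTM", "LSTM", 2, True),
]

def run_all(configs, run_case):
    """Call `run_case` once per configuration and collect the three metrics in order."""
    accs, params, costs = [], [], []
    for _, kind, num_layers, bidirectional in configs:
        acc, n_params, cost = run_case(kind, num_layers, bidirectional)
        accs.append(acc)
        params.append(n_params)
        costs.append(cost)
    return accs, params, costs

def stub_run_case(kind, num_layers, bidirectional):
    # Hypothetical stand-in: placeholder values only, NOT real accuracies, sizes or timings
    return 0.0, num_layers * (2 if bidirectional else 1), 0.0

demo_accuracies, demo_parameters, demo_time_costs = run_all(CONFIGS, stub_run_case)
print(demo_parameters)
# [1, 2, 4, 1, 2, 4]
```

In the notebook, the stub would be replaced by a function that calls `setup_model`, `train_model`, `evaluate_model`, `accuracy_score` and `count_parameters` exactly as the individual cells do.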