In [None]:
# Experiment grid: one entry per training run. Both runs use a single-layer,
# unidirectional LSTM with frozen 100-d embeddings; they differ in the
# embedding source and the learning rate. 'embedding_path' starts as None and
# is filled in for the GloVe run by the download cell that follows.
configs = [
    # Run 1: Word2Vec embeddings
    dict(
        experiment_name='Unidirectional_Word2Vec_LR_0.01',
        no_layers=1,
        hidden_dim=128,
        embedding_dim=100,
        freeze_embeddings=True,
        bidirectional=False,
        epochs=50,
        learning_rate=0.01,
        dropout=0.5,
        clip=5,
        embedding_type='word2vec',
        embedding_path=None,
    ),
    # Run 2: GloVe 100-d embeddings
    dict(
        experiment_name='GloVe_100d',
        no_layers=1,
        hidden_dim=128,
        embedding_dim=100,
        freeze_embeddings=True,
        bidirectional=False,
        epochs=50,
        learning_rate=0.001,
        dropout=0.5,
        clip=5,
        embedding_type='glove',
        embedding_path=None,
    ),
]

In [None]:
import os

# Download TC_provided.tar.gz
!wget http://faculty.cooper.edu/sable2/courses/fall2024/ece467/TC_provided.tar.gz -O TC_provided.tar.gz

# Extract TC_provided.tar.gz
!tar -xzf TC_provided.tar.gz

# Set the path to the extracted 'TC_provided' directory
tc_provided_path = os.path.abspath('TC_provided')

# Download GloVe embeddings
!wget http://nlp.stanford.edu/data/glove.6B.zip -O glove.6B.zip

# Unzip GloVe embeddings
!unzip -q glove.6B.zip

# Set the path to 'glove.6B.100d.txt'
glove_100d_path = os.path.abspath('glove.6B.100d.txt')

# Update the 'embedding_path' in the configs
for config in configs:
    if config['embedding_type'] == 'glove':
        config['embedding_path'] = glove_100d_path

# Now 'tc_provided_path' and 'glove_100d_path' can be used in your code
# For example, set 'tc' to 'tc_provided_path' in your main function
tc = tc_provided_path

--2024-11-18 03:29:35--  http://faculty.cooper.edu/sable2/courses/fall2024/ece467/TC_provided.tar.gz
Resolving faculty.cooper.edu (faculty.cooper.edu)... 199.98.16.192
Connecting to faculty.cooper.edu (faculty.cooper.edu)|199.98.16.192|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 3421989 (3.3M) [application/x-gzip]
Saving to: ‘TC_provided.tar.gz’


2024-11-18 03:29:37 (2.19 MB/s) - ‘TC_provided.tar.gz’ saved [3421989/3421989]

--2024-11-18 03:29:38--  http://nlp.stanford.edu/data/glove.6B.zip
Resolving nlp.stanford.edu (nlp.stanford.edu)... 171.64.67.140
Connecting to nlp.stanford.edu (nlp.stanford.edu)|171.64.67.140|:80... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://nlp.stanford.edu/data/glove.6B.zip [following]
--2024-11-18 03:29:38--  https://nlp.stanford.edu/data/glove.6B.zip
Connecting to nlp.stanford.edu (nlp.stanford.edu)|171.64.67.140|:443... connected.
HTTP request sent, awaiting response... 301 Moved Permanently
L

In [None]:
import os
import re
import sys
import nltk
import torch
import numpy as np
import pandas as pd
from tqdm import tqdm
import torch.nn as nn
from pathlib import Path
from torch.optim import Adam
from collections import Counter
from nltk.corpus import stopwords
from gensim.models import Word2Vec
from nltk.tokenize import word_tokenize
from torch.nn.utils.rnn import pad_sequence
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
from torch.utils.data import Dataset, TensorDataset, DataLoader


# NLTK resources needed below: the English stop-word list and the
# 'punkt_tab' data used by word_tokenize.
for _nltk_resource in ('stopwords', 'punkt_tab'):
    nltk.download(_nltk_resource)

# Seed NumPy and PyTorch with the same fixed value.
SEED = 13
np.random.seed(SEED)
torch.manual_seed(SEED)


class LoadDataset:
    """Load the train and test splits of a labelled text corpus into DataFrames.

    Each labels file under ``base_path`` is expected to hold one document per
    line as ``<relative/path/to/document> <label>``. Every referenced document
    is read as UTF-8 text. The result is stored in ``self.data`` as
    ``{"train": DataFrame, "test": DataFrame}``, each frame having the columns
    ``text`` and ``label``.
    """

    def __init__(self, base_path):
        """Remember the corpus root and load both splits immediately.

        Args:
            base_path: Directory containing the labels files; document paths
                inside the labels files are resolved relative to it.
        """
        self.base_path = base_path
        self.train_label_file = "corpus1_train.labels"
        self.test_label_file = "corpus1_test.labels"
        self.data = self.load_data()

    def load_dataset(self, label_file_name, dataset_type):
        """Read one labels file and the documents it lists.

        Args:
            label_file_name: Name of the labels file inside ``base_path``.
            dataset_type: Human-readable split name, used only in log output.

        Returns:
            A DataFrame with ``text`` and ``label`` columns. The columns are
            present even when no document could be loaded.

        Raises:
            FileNotFoundError: If the labels file itself does not exist.
        """
        label_file = os.path.join(self.base_path, label_file_name)
        print(f"Loading {dataset_type} dataset from: {label_file}")
        records = []
        with open(label_file, "r", encoding="utf-8") as labels:
            for line_no, line in enumerate(labels, start=1):
                parts = line.split()
                if not parts:
                    # Blank lines (e.g. a trailing newline) carry no entry.
                    continue
                if len(parts) < 2:
                    # A path without a label cannot be used; report and go on
                    # instead of aborting the whole load with an IndexError.
                    print(
                        f"Warning: malformed line {line_no} in {label_file} "
                        f"will be skipped: {line.strip()!r}"
                    )
                    continue
                file_path = os.path.join(self.base_path, parts[0])
                label = parts[1]
                try:
                    with open(file_path, "r", encoding="utf-8") as document:
                        text = document.read()
                    records.append({"text": text, "label": label})
                except FileNotFoundError:
                    print(
                        f"Warning: The file {file_path} does not exist and will be skipped."
                    )
                except Exception as e:
                    # Best effort: an unreadable document is reported and skipped.
                    print(f"Error reading {file_path}: {e}")
        # Explicit columns keep the schema stable when `records` is empty.
        return pd.DataFrame(records, columns=["text", "label"])

    def load_data(self):
        """Load both splits and return them as ``{"train": ..., "test": ...}``."""
        train_data = self.load_dataset(self.train_label_file, "Training")
        test_data = self.load_dataset(self.test_label_file, "Testing")
        return {"train": train_data, "test": test_data}


class Processor:
    """Prepare text data for the LSTM: tokens, vocabulary, embeddings, loaders.

    Construction runs the whole pipeline:

    1. tokenise and clean the train/test texts,
    2. build a vocabulary (indices start at 1) and, depending on
       ``embedding_type``, an embeddings matrix,
    3. label-encode the targets,
    4. convert documents to fixed-length index sequences, and
    5. wrap everything in ``DataLoader`` objects.

    Index 0 plays two roles: it is the padding value and also the index given
    to tokens that are not in the vocabulary.
    """

    def __init__(
        self,
        datasets,
        embedding_type="word2vec",  # 'word2vec', 'glove', 'fasttext', 'random'
        embedding_dim=100,  # Embedding dimension
        embedding_path=None,  # Path to embeddings (GloVe or FastText)
        batch_size=32,
        shuffle_train=True,
        verbose=False,
        max_seq_length=100,
    ):
        """Run the full preprocessing pipeline.

        Args:
            datasets: Object exposing ``data["train"]`` and ``data["test"]``
                DataFrames with ``text`` and ``label`` columns.
            embedding_type: One of 'word2vec', 'glove', 'fasttext', 'random'.
            embedding_dim: Width of each embedding vector.
            embedding_path: Text file of pre-trained vectors; required for
                'glove' and 'fasttext'.
            batch_size: Batch size of both data loaders.
            shuffle_train: Whether the training loader shuffles.
            verbose: If True, print a summary table once processing is done.
            max_seq_length: Every document is truncated/padded to this length.

        Raises:
            ValueError: For an unknown ``embedding_type`` or a missing
                ``embedding_path`` when one is required.
        """
        self.train_data = datasets.data["train"]
        self.test_data = datasets.data["test"]
        self.embed_dim = embedding_dim
        self.embedding_type = embedding_type
        self.embedding_path = embedding_path
        self.label_encoder = LabelEncoder()
        self.model = None
        self.vocabulary = {}
        self.stop_words = set(stopwords.words("english"))
        self.batch_size = batch_size
        self.shuffle_train = shuffle_train
        self.verbose = verbose
        self.max_seq_length = max_seq_length

        # Process data
        self.train_sentences, self.test_sentences = self.preprocess_datasets()

        if self.embedding_type == "word2vec":
            self.train_word2vec_model()
            self.get_vocabulary_word2vec()
            self.create_embeddings_matrix_word2vec()
        elif self.embedding_type in ["glove", "fasttext"]:
            self.build_vocabulary_from_data()
            self.load_embeddings()
            self.create_embeddings_matrix()
        elif self.embedding_type == "random":
            self.build_vocabulary_from_data()
            self.embeddings_matrix = None  # No pre-trained embeddings
        else:
            raise ValueError(
                "Invalid embedding_type. Must be 'word2vec', 'glove', 'fasttext', or 'random'."
            )

        self.encode_labels()
        self.create_sequences()
        self.X_train_tensors = self.to_tensors(self.X_train_padded, dtype=torch.long)
        self.X_test_tensors = self.to_tensors(self.X_test_padded, dtype=torch.long)
        self.y_train_tensors = self.to_tensors(self.y_train_encoded, dtype=torch.long)
        self.y_test_tensors = self.to_tensors(self.y_test_encoded, dtype=torch.long)
        self.training_data_loader, self.testing_data_loader = self.get_loaders()

        if self.verbose:
            self.display_info()

    def preprocess_text(self, text):
        """Lower-case, strip digits and punctuation, tokenise, drop stop words.

        Returns:
            The list of remaining tokens, in document order.
        """
        text = text.lower()
        text = re.sub(r"\d+", "", text)
        text = re.sub(r"[^\w\s]", "", text)
        return [
            token
            for token in word_tokenize(text)
            if token not in self.stop_words and token.strip()
        ]

    def preprocess_datasets(self):
        """Tokenise every train and test document.

        Returns:
            ``(train_sentences, test_sentences)``; both are also stored on
            the instance.
        """
        self.train_sentences = [
            self.preprocess_text(text) for text in self.train_data["text"]
        ]
        self.test_sentences = [
            self.preprocess_text(text) for text in self.test_data["text"]
        ]
        return self.train_sentences, self.test_sentences

    def train_word2vec_model(self):
        """Train a Word2Vec model on the training sentences only."""
        self.model = Word2Vec(
            self.train_sentences,
            vector_size=self.embed_dim,
            window=5,
            min_count=1,
            workers=4,
        )

    def get_vocabulary_word2vec(self):
        """Build ``word -> index`` from the Word2Vec model; indices start at 1."""
        self.vocabulary = {
            word: idx + 1 for idx, word in enumerate(self.model.wv.index_to_key)
        }

    def build_vocabulary_from_data(self):
        """Build ``word -> index`` from the training tokens; indices start at 1.

        Tokens are sorted before numbering. Iterating a ``set`` of strings
        directly gives an order that can change between interpreter runs, so
        the index assignment would not be reproducible.
        """
        all_tokens = set()
        for tokens in self.train_sentences:
            all_tokens.update(tokens)

        self.vocabulary = {
            word: idx + 1 for idx, word in enumerate(sorted(all_tokens))
        }

    def load_embeddings(self):
        """Read ``word v1 v2 ...`` lines from ``embedding_path``.

        The parsed vectors are stored in ``self.embeddings_index``.

        Raises:
            ValueError: If ``embedding_path`` was not provided.
        """
        if self.embedding_path is None:
            raise ValueError(
                f"embedding_path must be specified when embedding_type is '{self.embedding_type}'."
            )
        self.embeddings_index = {}
        with open(self.embedding_path, "r", encoding="utf-8") as f:
            for line in f:
                values = line.rstrip().split(" ")
                word = values[0]
                coeffs = np.asarray(values[1:], dtype="float32")
                self.embeddings_index[word] = coeffs

    def create_embeddings_matrix(self):
        """Assemble the embeddings matrix from the loaded pre-trained vectors.

        Row 0 (padding / unknown) stays all zeros. Vocabulary words without a
        pre-trained vector get a random normal vector (scale 0.6).

        Raises:
            ValueError: If a pre-trained vector's length differs from
                ``embed_dim``.
        """
        vocab_size = len(self.vocabulary) + 1
        embeddings_matrix = np.zeros((vocab_size, self.embed_dim))
        for word, idx in self.vocabulary.items():
            embedding_vector = self.embeddings_index.get(word)
            if embedding_vector is not None:
                # Check explicitly: NumPy would silently broadcast a length-1
                # vector across the row and give an opaque error otherwise.
                if embedding_vector.shape != (self.embed_dim,):
                    raise ValueError(
                        f"Embedding for '{word}' has shape {embedding_vector.shape}, "
                        f"expected ({self.embed_dim},). Check embedding_dim against "
                        f"the file at embedding_path."
                    )
                embeddings_matrix[idx] = embedding_vector
            else:
                embeddings_matrix[idx] = np.random.normal(
                    scale=0.6, size=(self.embed_dim,)
                )
        self.embeddings_matrix = torch.tensor(embeddings_matrix, dtype=torch.float32)

    def create_embeddings_matrix_word2vec(self):
        """Assemble the embeddings matrix from the trained Word2Vec model.

        Row 0 (padding / unknown) stays all zeros.
        """
        vocab_size = len(self.vocabulary) + 1
        embeddings_matrix = np.zeros((vocab_size, self.embed_dim))
        for word, idx in self.vocabulary.items():
            embeddings_matrix[idx] = self.model.wv[word]
        self.embeddings_matrix = torch.tensor(embeddings_matrix, dtype=torch.float32)

    def encode_labels(self):
        """Fit the label encoder on the train labels and encode both splits."""
        self.y_train_encoded = self.label_encoder.fit_transform(
            self.train_data["label"]
        )
        self.y_test_encoded = self.label_encoder.transform(self.test_data["label"])

    def tokens_to_indices(self, tokens):
        """Map tokens to vocabulary indices; unknown tokens become 0."""
        return [self.vocabulary.get(word, 0) for word in tokens]

    def pad_sequences(self, sequences, max_len):
        """Return copies of ``sequences`` truncated or right-padded to ``max_len``.

        Padding uses 0. The input lists are left unmodified.
        """
        padded_sequences = []
        for seq in sequences:
            fixed = list(seq[:max_len])
            fixed.extend([0] * (max_len - len(fixed)))
            padded_sequences.append(fixed)
        return padded_sequences

    def create_sequences(self):
        """Convert the tokenised documents to padded index sequences."""
        self.X_train_indices = [
            self.tokens_to_indices(tokens) for tokens in self.train_sentences
        ]
        self.X_test_indices = [
            self.tokens_to_indices(tokens) for tokens in self.test_sentences
        ]

        self.X_train_padded = self.pad_sequences(
            self.X_train_indices, self.max_seq_length
        )
        self.X_test_padded = self.pad_sequences(
            self.X_test_indices, self.max_seq_length
        )

    def to_tensors(self, data, dtype):
        """Wrap ``data`` in a ``torch.Tensor`` of the given dtype."""
        return torch.tensor(data, dtype=dtype)

    def get_loaders(self):
        """Build the train and test loaders; the test loader never shuffles.

        Returns:
            ``(training_data_loader, testing_data_loader)``.
        """
        train_dataset = TensorDataset(self.X_train_tensors, self.y_train_tensors)
        test_dataset = TensorDataset(self.X_test_tensors, self.y_test_tensors)
        training_data_loader = DataLoader(
            train_dataset,
            batch_size=self.batch_size,
            shuffle=self.shuffle_train,
        )
        testing_data_loader = DataLoader(
            test_dataset,
            batch_size=self.batch_size,
            shuffle=False,
        )
        return training_data_loader, testing_data_loader

    def display_info(self):
        """Print a table of tensor shapes, vocabulary size and embeddings shape."""
        from tabulate import tabulate

        # With embedding_type='random' there is no pre-trained matrix.
        if self.embeddings_matrix is not None:
            embeddings_shape = self.embeddings_matrix.shape
        else:
            embeddings_shape = "None (randomly initialised)"

        data = [
            ["X_train", self.X_train_tensors.shape],
            ["X_test", self.X_test_tensors.shape],
            ["y_train", self.y_train_tensors.shape],
            ["y_test", self.y_test_tensors.shape],
            ["Vocabulary Size", len(self.vocabulary)],
            ["Embeddings Matrix", embeddings_shape],
        ]
        print(tabulate(data, headers=["Dataset", "Shape"], tablefmt="grid"))


class LSTM(nn.Module):
    """Embedding -> (bi)LSTM -> dropout -> linear classifier.

    The classifier reads the final hidden state of the last LSTM layer. For a
    bidirectional LSTM that is the forward direction's state after the last
    time step concatenated with the backward direction's state after the
    first time step, so both directions have seen the whole sequence.
    """

    def __init__(
        self,
        no_layers,
        hidden_dim,
        vocab_size,
        embedding_dim,
        embeddings=None,
        output_dim=1,
        freeze_embeddings=True,
        bidirectional=False,
        dropout=0.5,
    ):
        """Build the network.

        Args:
            no_layers: Number of stacked LSTM layers.
            hidden_dim: Hidden size per direction.
            vocab_size: Number of vocabulary words, not counting index 0;
                used only when ``embeddings`` is None.
            embedding_dim: Embedding width; used only when ``embeddings`` is
                None, otherwise the width of ``embeddings`` is used.
            embeddings: Optional 2-D float tensor of pre-trained vectors.
            output_dim: Number of output logits.
            freeze_embeddings: If True the embedding weights are not trained.
            bidirectional: Whether the LSTM is bidirectional.
            dropout: Dropout probability applied before the linear layer.
        """
        super().__init__()
        self.no_layers = no_layers
        self.hidden_dim = hidden_dim
        self.bidirectional = bidirectional
        self.num_directions = 2 if bidirectional else 1

        if embeddings is not None:
            # padding_idx=0 matches the randomly initialised branch below: the
            # row for index 0 receives no gradient when embeddings are trained.
            self.embedding = nn.Embedding.from_pretrained(
                embeddings, freeze=freeze_embeddings, padding_idx=0
            )
        else:
            self.embedding = nn.Embedding(
                num_embeddings=vocab_size + 1,  # Adding 1 if padding_idx=0
                embedding_dim=embedding_dim,
                padding_idx=0,  # Assuming index 0 is for padding
            )
            if freeze_embeddings:
                self.embedding.weight.requires_grad = False

        self.lstm = nn.LSTM(
            # Follow the actual embedding width so a pre-trained matrix whose
            # width differs from `embedding_dim` still fits.
            input_size=self.embedding.embedding_dim,
            hidden_size=hidden_dim,
            num_layers=no_layers,
            batch_first=True,
            bidirectional=bidirectional,
        )

        self.dropout = nn.Dropout(dropout)
        self.fc = nn.Linear(hidden_dim * self.num_directions, output_dim)

    def forward(self, x, hidden=None):
        """Classify a batch of index sequences.

        Args:
            x: Integer tensor of token indices, batch first.
            hidden: Initial ``(h0, c0)`` state, e.g. from ``init_hidden``;
                None lets ``nn.LSTM`` start from zeros.

        Returns:
            ``(logits, hidden)`` where ``hidden`` is the LSTM's final
            ``(h_n, c_n)`` state.
        """
        embedded = self.embedding(x)
        _, hidden = self.lstm(embedded, hidden)
        h_n = hidden[0]
        if self.bidirectional:
            # Last layer: h_n[-2] is the forward direction, h_n[-1] the backward.
            # output[:, -1, :] would pair the forward state with a backward
            # state that has processed only one token.
            features = torch.cat((h_n[-2], h_n[-1]), dim=1)
        else:
            # Identical to the LSTM output at the last time step.
            features = h_n[-1]
        logits = self.fc(self.dropout(features))
        return logits, hidden

    def init_hidden(self, batch_size, device):
        """Return zero-initialised ``(h0, c0)`` for ``batch_size`` on ``device``."""
        state_shape = (
            self.no_layers * self.num_directions,
            batch_size,
            self.hidden_dim,
        )
        h0 = torch.zeros(state_shape, device=device)
        c0 = torch.zeros(state_shape, device=device)
        return (h0, c0)


def get_model(vocab_size, embeddings, config, device="cpu", output_dim=5):
    """Create the LSTM classifier together with its loss and optimizer.

    Args:
        vocab_size: Vocabulary size forwarded to ``LSTM``.
        embeddings: Pre-trained embeddings tensor, or None.
        config: Mapping of hyper-parameters. Missing keys fall back to
            no_layers=2, hidden_dim=128, embedding_dim=100,
            freeze_embeddings=True, bidirectional=False, dropout=0.5 and
            learning_rate=0.001.
        device: Device the network is moved to.
        output_dim: Number of output classes.

    Returns:
        ``(net, loss_fun, optimizer)``: the model on ``device``, a
        cross-entropy loss, and an Adam optimizer over the model parameters.
    """
    model_kwargs = {
        "no_layers": config.get("no_layers", 2),
        "hidden_dim": config.get("hidden_dim", 128),
        "vocab_size": vocab_size,
        "embedding_dim": config.get("embedding_dim", 100),
        "embeddings": embeddings,
        "output_dim": output_dim,
        "freeze_embeddings": config.get("freeze_embeddings", True),
        "bidirectional": config.get("bidirectional", False),
        "dropout": config.get("dropout", 0.5),
    }
    net = LSTM(**model_kwargs)
    net = net.to(device)

    loss_fun = nn.CrossEntropyLoss()

    learning_rate = config.get("learning_rate", 0.001)
    optimizer = torch.optim.Adam(net.parameters(), lr=learning_rate)

    return net, loss_fun, optimizer


def _run_epoch(net, loader, loss_fun, device, optimizer=None, clip=None):
    """Run one pass over ``loader``; train when ``optimizer`` is given, else evaluate.

    Returns:
        ``(accuracy, mean_loss)`` over all samples seen. Both are weighted by
        batch size, so a smaller final batch does not skew them. Both are NaN
        when the loader yields no samples.
    """
    training = optimizer is not None
    net.train(training)

    loss_sum, correct, seen = 0.0, 0, 0
    with torch.set_grad_enabled(training):
        for X_batch, y_batch in loader:
            current_batch_size = X_batch.size(0)
            h = net.init_hidden(current_batch_size, device)
            X_batch, y_batch = X_batch.to(device), y_batch.to(device)

            if training:
                optimizer.zero_grad()
            y_hat, _ = net(X_batch, h)
            loss = loss_fun(y_hat, y_batch)
            if training:
                loss.backward()
                nn.utils.clip_grad_norm_(net.parameters(), clip)
                optimizer.step()

            # The loss is a per-batch mean; scale it back to a sum so the
            # epoch value is a true per-sample mean.
            loss_sum += loss.item() * current_batch_size
            correct += (torch.argmax(y_hat, dim=1) == y_batch).sum().item()
            seen += current_batch_size

    if seen == 0:
        return float("nan"), float("nan")
    return correct / seen, loss_sum / seen


def train_model(processor, config, device="cpu"):
    """Train an LSTM classifier and evaluate it on the test loader every epoch.

    Args:
        processor: Object providing ``training_data_loader``,
            ``testing_data_loader``, ``embeddings_matrix``, ``vocabulary``
            and ``label_encoder``.
        config: Hyper-parameter mapping. ``clip`` (default 5) and ``epochs``
            (default 50) are read here; the rest is passed to ``get_model``.
        device: Device used for the model and the batches.

    Returns:
        ``(train_acc_list, test_acc_list, train_loss_list, test_loss_list, net)``
        with one entry per epoch in each list, plus the trained model.
    """
    # Get data loaders from the processor
    training_data_loader = processor.training_data_loader
    testing_data_loader = processor.testing_data_loader

    # get_model already moves the network to `device`.
    net, loss_fun, optimizer = get_model(
        vocab_size=len(processor.vocabulary),
        embeddings=processor.embeddings_matrix,
        config=config,
        device=device,
        output_dim=len(processor.label_encoder.classes_),  # number of classes
    )

    # Training parameters
    clip = config.get("clip", 5)
    epochs = config.get("epochs", 50)

    train_acc_list, train_loss_list = [], []
    test_acc_list, test_loss_list = [], []

    # The context manager closes the bar even if an epoch raises.
    with tqdm(
        total=epochs, desc="Training", leave=True, file=sys.stdout
    ) as progress_bar:
        for epoch in range(epochs):
            train_acc_epoch, train_loss_epoch = _run_epoch(
                net, training_data_loader, loss_fun, device,
                optimizer=optimizer, clip=clip,
            )
            train_acc_list.append(train_acc_epoch)
            train_loss_list.append(train_loss_epoch)

            test_acc_epoch, test_loss_epoch = _run_epoch(
                net, testing_data_loader, loss_fun, device
            )
            test_acc_list.append(test_acc_epoch)
            test_loss_list.append(test_loss_epoch)

            # Format the metrics in a readable way
            metrics_str = (
                f"Epoch {epoch + 1}/{epochs} | "
                f"Train Acc: {train_acc_epoch:.4f} | "
                f"Train Loss: {train_loss_epoch:.4f} | "
                f"Test Acc: {test_acc_epoch:.4f} | "
                f"Test Loss: {test_loss_epoch:.4f}"
            )

            # Show the metrics next to the bar and advance it by one epoch
            progress_bar.set_postfix_str(metrics_str)
            progress_bar.update(1)

    return train_acc_list, test_acc_list, train_loss_list, test_loss_list, net


# Experiment driver: load the corpus once, then preprocess and train once per
# entry in `configs`.
if __name__ == "__main__":

    # `tc` (corpus directory) and `configs` (experiment list) are not defined
    # in this cell; they come from the earlier notebook cells.
    dataset = LoadDataset(tc)

    # Run experiments
    for config in configs:
        print(f"Running Experiment: {config['experiment_name']}")

        # A new Processor is built per experiment because the vocabulary and
        # the embeddings matrix depend on the embedding type.
        processor = Processor(
            datasets=dataset,
            embedding_type=config.get("embedding_type", "word2vec"),
            embedding_dim=config.get("embedding_dim", 100),
            embedding_path=config.get("embedding_path", None),
            batch_size=32,
            shuffle_train=True,
            verbose=False,
            max_seq_length=100,
        )

        # Get device (CPU or GPU)
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

        # Train the model
        # NOTE(review): these results are rebound on every iteration; within
        # the visible code they are not stored per experiment.
        train_acc, test_acc, train_loss, test_loss, net = train_model(
            processor, config, device=device
        )