In [None]:
import os

import torch
import json
import numpy as np
from torch import nn
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pack_padded_sequence
from torch.optim import Adam
from pyvi import ViTokenizer
from tqdm import tqdm
from sklearn.metrics import f1_score

In [2]:
# Define vocabulary
class Vocab():
    """Word and label vocabulary built from a JSON dataset file.

    Each record of the JSON file is read through ``sentence_key`` and
    ``label_key``; sentences are tokenized with ``ViTokenizer`` and split on
    whitespace. Index 0 is reserved for padding and index 1 for unknown
    tokens, so real words are numbered from 2.

    Words and labels are sorted before ids are assigned. Iterating a plain
    ``set`` of strings gives an order that can change between interpreter
    runs (and between rebuilds), which would silently change every id and
    invalidate saved checkpoints or a pretrained embedding matrix that was
    built from an earlier mapping.
    """

    def __init__(self, path, sentence_key: str, label_key:str, max_len: int = 100):
        """Store the configuration and build the lookup tables.

        Args:
            path: Path of the JSON file; ``json.load`` must yield an iterable
                of records indexable by ``sentence_key`` and ``label_key``.
            sentence_key: Record key holding the raw sentence.
            label_key: Record key holding the label string.
            max_len: Fixed length every encoded sentence is cut or padded to.
        """
        self.path = path
        self.sentence_key = sentence_key
        self.label_key = label_key
        self.max_len = max_len

        self.vocab = []
        self.labels = set()

        self.pad_id = 0
        self.unk_id = 1

        # word <-> id
        self.w2i = {}
        self.i2w = {}

        # label <-> id
        self.i2l = {}
        self.l2i = {}

        self.make_vocab()

    def make_vocab(self):
        """Read ``self.path`` and (re)build ``w2i``/``i2w``/``l2i``/``i2l``.

        Calling this again on the same file yields exactly the same mappings.

        Raises:
            Exception: ``'Wrong input type'`` when a record cannot be
                processed (for example a missing key); the underlying error
                is chained as ``__cause__``.
        """
        # Context manager: the handle is closed even if parsing fails.
        with open(self.path, 'r', encoding='utf-8') as f:
            data = json.load(f)

        # Start from what is already known so repeated calls only ever add.
        words = set(self.vocab)

        for item in data:
            try:
                sentence = item[self.sentence_key]
                label = item[self.label_key]

                if isinstance(sentence, str):
                    words.update(ViTokenizer.tokenize(sentence).split())

                if isinstance(label, str):
                    self.labels.add(label)

            except Exception as exc:
                # `Exception` (not a bare except) so Ctrl-C is not swallowed.
                raise Exception('Wrong input type') from exc

        # Sorted => deterministic ids across runs and across rebuilds.
        self.vocab = sorted(words)

        self.w2i = {word : idx for idx, word in enumerate(self.vocab, 2)}
        self.i2w = {idx : word for word, idx in self.w2i.items()}

        self.l2i = {label : idx for idx, label in enumerate(sorted(self.labels))}
        self.i2l = {idx: label for label, idx in self.l2i.items()}


    def encode_sentence(self, input: str):
        """Turn a raw sentence into a ``LongTensor`` of exactly ``max_len`` ids.

        Tokens missing from ``w2i`` map to ``unk_id``; longer sentences are
        truncated and shorter ones are right-padded with ``pad_id``.

        Raises:
            TypeError: ``'Wrong input type'`` if ``input`` is not a string.
        """
        # Checked up front: previously a non-string skipped tokenization and
        # crashed later on an unbound local instead of reporting the problem.
        if not isinstance(input, str):
            raise TypeError('Wrong input type')

        tokens = ViTokenizer.tokenize(input).split()

        # Truncation
        input_ids = [self.w2i.get(token, self.unk_id) for token in tokens[:self.max_len]]
        # Padding (adds nothing when the sentence already fills max_len)
        input_ids.extend([self.pad_id] * (self.max_len - len(input_ids)))

        return torch.tensor(input_ids, dtype=torch.long)


    def encode_label(self, labels: str):
        """Return the id of a label string as a ``LongTensor`` of shape ``(1,)``.

        Raises:
            TypeError: ``'Wrong input type'`` if ``labels`` is not a string.
            KeyError: if the label is not in ``l2i``.
        """
        if not isinstance(labels, str):
            raise TypeError('Wrong input type')
        return torch.tensor([self.l2i[labels]], dtype=torch.long)


    def decode_label(self, label_vec: torch.Tensor):
        """Map each id in ``label_vec`` back to its label string.

        Entries equal to -100 (the ignore index used for cross entropy loss)
        are skipped.
        """
        labels = []
        for label_id in label_vec:
            idx = label_id.item()
            if idx == -100: # ignore index for cross entropy loss
                continue
            labels.append(self.i2l[idx])
        return labels

    @property
    def vocab_size(self):
        """Number of embedding rows needed: known words plus pad and unk."""
        return len(self.vocab) + 2 # unk, pad id

    @property
    def num_labels(self):
        """Number of distinct labels collected."""
        return len(self.labels)

In [4]:
# Define dataset
class UIT_VSFC(Dataset):
    """Map-style dataset over a JSON file of sentence/label records.

    Each item is encoded on access through the supplied vocabulary.
    """

    def __init__(self, path:str, label_key: str, vocab: Vocab) -> None:
        """Load every record of ``path`` into memory.

        Args:
            path: JSON file; ``json.load`` must yield an indexable sequence
                of records.
            label_key: Record key holding the label to predict.
            vocab: Vocabulary used to encode sentences and labels.
        """
        super().__init__()

        self.path = path
        self.label_key = label_key
        self.vocab = vocab

        # Context manager so the file handle is released right after loading.
        with open(path, 'r', encoding='utf-8') as f:
            self.data = json.load(f)


    def __len__(self) -> int:
        """Number of records loaded from the file."""
        return len(self.data)


    def __getitem__(self, index: int):
        """Return the encoded record at ``index``.

        Returns:
            dict with ``"input_ids"`` (result of ``vocab.encode_sentence``)
            and ``"label"`` (result of ``vocab.encode_label``).
        """
        item = self.data[index]
        # Read the sentence through the same key the vocabulary was built
        # with; fall back to 'sentence' for vocab objects without that attribute.
        sentence_key = getattr(self.vocab, 'sentence_key', 'sentence')
        sentence = item[sentence_key]
        label = item[self.label_key]

        encoded_sentence = self.vocab.encode_sentence(sentence)
        encoded_label = self.vocab.encode_label(label)

        return {
            "input_ids" : encoded_sentence,
            "label" : encoded_label
        }
 
    
def collate_fn(samples: list[dict]) -> dict[str, torch.Tensor]:
    """Stack per-sample tensors into one batch.

    Args:
        samples: Dicts that each carry an ``'input_ids'`` and a ``'label'``
            tensor; tensors under the same key must share a shape.

    Returns:
        dict with ``'input_ids'`` and ``'label'``, each stacked along a new
        leading (batch) dimension.
    """
    # Build a fresh dict instead of rebinding the `samples` parameter.
    return {
        "input_ids": torch.stack([sample['input_ids'] for sample in samples], dim=0),
        'label': torch.stack([sample['label'] for sample in samples], dim=0),
    }

In [26]:
# Define a model
class Model(nn.Module):
    """Embedding -> multi-layer LSTM -> linear classifier over the final hidden state."""

    def __init__(self, vocab: Vocab, embed_dim=300, hidden_dim=256, num_layers = 5):
        """Build the layers.

        Args:
            vocab: Supplies ``vocab_size``, ``pad_id`` and ``num_labels``.
            embed_dim: Size of each token embedding.
            hidden_dim: LSTM hidden size (also the classifier input size).
            num_layers: Number of stacked LSTM layers.
        """
        super().__init__()
        self.vocab = vocab

        self.embedding = nn.Embedding(
            num_embeddings=vocab.vocab_size,
            embedding_dim=embed_dim,
            padding_idx=vocab.pad_id
        )

        self.model = nn.LSTM(
            input_size=self.embedding.embedding_dim,
            hidden_size=hidden_dim,
            num_layers=num_layers,
            batch_first=True
        )

        self.classifier = nn.Linear(
            in_features=hidden_dim,
            out_features=vocab.num_labels
        )

        self.loss_fn = nn.CrossEntropyLoss(ignore_index=-100)

    def forward(self, input_ids: torch.Tensor, labels=None):
        """Classify a batch of padded id sequences.

        Args:
            input_ids: Batch-first tensor of token ids, padded with
                ``vocab.pad_id``.
            labels: Optional targets. Integer targets of any shape are
                flattened to one id per row, so both ``(batch,)`` and
                ``(batch, 1)`` work.

        Returns:
            ``logits`` when ``labels`` is None, otherwise ``(loss, logits)``.
        """
        # Sequence length = number of non-pad ids in each row.
        lengths = (input_ids != self.vocab.pad_id).sum(dim=1)
        # A row made only of padding (e.g. an empty sentence) would have
        # length 0, which pack_padded_sequence rejects; treat it as length 1
        # so the LSTM reads the single pad embedding instead of crashing.
        lengths = lengths.clamp(min=1)

        embed = self.embedding(input_ids)

        packed = pack_padded_sequence(
            input=embed,
            lengths=lengths.cpu(),  # lengths are passed as a CPU tensor
            batch_first=True,
            enforce_sorted=False
        )

        _, (h_n, _) = self.model(packed)

        # Final hidden state of the top LSTM layer.
        last_hidden = h_n[-1]

        logits = self.classifier(last_hidden)

        if labels is not None:
            if not torch.is_floating_point(labels):
                labels = labels.reshape(-1)
            loss = self.loss_fn(logits, labels)
            return loss, logits

        return logits

In [None]:
class TextClassification:
    """Training / evaluation driver for a sentence classifier.

    Builds the three datasets and their loaders, trains with Adam and
    early stopping on validation macro-F1, and keeps the best weights in
    ``<checkpoint_path>/best_model.pt``.
    """

    def __init__(self, vocab: Vocab, model, checkpoint_path, train_path, val_path, test_path, lr,
                 batch_size: int = 32):
        """Set up data, model, device and optimizer.

        Args:
            vocab: Already-built vocabulary shared by all datasets.
            model: Module called as ``model(input_ids, labels)`` and
                returning ``(loss, logits)``.
            checkpoint_path: Directory for checkpoints (created if missing).
            train_path, val_path, test_path: Dataset files for each split.
            lr: Learning rate for Adam.
            batch_size: Batch size used by all three loaders.
        """
        self.checkpoint_path = checkpoint_path
        os.makedirs(self.checkpoint_path, exist_ok=True)

        self.train_path = train_path
        self.val_path = val_path
        self.test_path = test_path

        # The vocabulary is used exactly as passed in. It is deliberately NOT
        # rebuilt here: Vocab builds itself on construction, and re-indexing
        # it after an embedding matrix was derived from its ids could
        # misalign the pretrained rows with the words.
        self.vocab = vocab
        self.batch_size = batch_size

        self.load_datasets()
        self.create_dataloaders()

        self.model = model

        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        self.model.to(self.device)

        self.lr = lr
        self.optimizer = Adam(self.model.parameters(), lr=self.lr)

    def load_datasets(self):
        """Create the train/val/test datasets from the stored paths."""
        self.train_dataset = UIT_VSFC(self.train_path, self.vocab.label_key, self.vocab)
        self.val_dataset = UIT_VSFC(self.val_path, self.vocab.label_key, self.vocab)
        self.test_dataset = UIT_VSFC(self.test_path, self.vocab.label_key, self.vocab)

    def create_dataloaders(self):
        """Wrap the datasets in loaders; only the training loader shuffles."""
        self.train_loader = DataLoader(self.train_dataset, batch_size=self.batch_size, shuffle=True,
                                       collate_fn=collate_fn)
        self.val_loader = DataLoader(self.val_dataset, batch_size=self.batch_size,
                                     collate_fn=collate_fn)
        self.test_loader = DataLoader(self.test_dataset, batch_size=self.batch_size,
                                      collate_fn=collate_fn)

    def forward_batch(self, batch):
        """Run the model on one batch.

        Returns:
            ``(loss, true, preds)`` where ``true`` and ``preds`` are plain
            Python lists of class ids.
        """
        input_ids = batch["input_ids"]
        labels = batch["label"].view(-1)  # flatten to one id per sample

        loss, logits = self.model(input_ids, labels)

        preds = logits.argmax(dim=-1).tolist()
        true = labels.tolist()

        return loss, true, preds


    def evaluate_metrics(self, dataloader, desc="Evaluating"):
        """Evaluate the model on ``dataloader`` without gradient tracking.

        Returns:
            ``(avg_loss, macro_f1)``: mean batch loss and macro-averaged F1
            over all samples.
        """
        self.model.eval()
        total_loss = 0
        y_true_all = []
        y_pred_all = []

        pbar = tqdm(dataloader, desc=desc, ncols=90)

        with torch.no_grad():
            # Count steps ourselves: tqdm updates `pbar.n` lazily (only when
            # the bar is redrawn), so it can lag behind the real batch count
            # and distort the displayed running average.
            for step, batch in enumerate(pbar, start=1):
                batch = {k: v.to(self.device) for k, v in batch.items()}

                loss, y_true, y_pred = self.forward_batch(batch)
                total_loss += loss.item()

                y_true_all.extend(y_true)
                y_pred_all.extend(y_pred)

                pbar.set_postfix(loss=f"{total_loss / step:.4f}")

        avg_loss = total_loss / len(dataloader)
        macro_f1 = f1_score(y_true_all, y_pred_all, average="macro")

        return avg_loss, macro_f1


    def train(self, epochs=20, patience=5):
        """Train with early stopping on validation macro-F1.

        After training, the weights of the best epoch are restored from the
        checkpoint written during this call.

        Args:
            epochs: Maximum number of epochs.
            patience: Stop after this many consecutive epochs without a new
                best validation macro-F1.
        """
        save_model_path = os.path.join(self.checkpoint_path, 'best_model.pt')

        # Start below any reachable F1 so the first epoch always produces a
        # checkpoint; starting at 0 saved nothing when F1 stayed at 0.
        best_f1 = float("-inf")
        patience_counter = 0
        checkpoint_saved = False

        for epoch in range(1, epochs + 1):

            self.model.train()
            running_loss = 0

            pbar = tqdm(self.train_loader, desc=f"Epoch {epoch}/{epochs}", ncols=90)

            # Explicit step counter instead of the lazily updated `pbar.n`.
            for step, batch in enumerate(pbar, start=1):
                batch = {k: v.to(self.device) for k, v in batch.items()}

                loss, _, _ = self.forward_batch(batch)

                self.optimizer.zero_grad()
                loss.backward()
                self.optimizer.step()

                running_loss += loss.item()

                pbar.set_postfix(loss=f"{running_loss / step:.4f}")

            val_loss, val_f1 = self.evaluate_metrics(self.val_loader, desc="Validating")

            print(
                f"[Epoch {epoch}] Val_loss={val_loss:.4f} | Val_Macro-F1={val_f1:.4f}"
            )

            if val_f1 > best_f1:
                best_f1 = val_f1
                patience_counter = 0
                torch.save(self.model.state_dict(), save_model_path)
                checkpoint_saved = True
                print(f"New BEST model saved (F1={best_f1:.4f})")
            else:
                patience_counter += 1
                if patience_counter >= patience:
                    print("Early stopping triggered.")
                    break

        # Load best model -- only one written by this run, never a stale file
        # left behind by an earlier run (e.g. when epochs == 0).
        if checkpoint_saved:
            self.model.load_state_dict(torch.load(save_model_path, map_location=self.device))


    def test(self):
        """Evaluate on the test split; returns ``(avg_loss, macro_f1)``."""
        return self.evaluate_metrics(self.test_loader, desc="Testing")

In [None]:
# Dataset splits (JSON files).
train_path = "UIT-VSFC/UIT-VSFC-train.json"
val_path = "UIT-VSFC/UIT-VSFC-dev.json"
test_path = "UIT-VSFC/UIT-VSFC-test.json"

# The vocabulary (words and labels) is built from the training split only.
vocab = Vocab(path=train_path, sentence_key="sentence", label_key="sentiment")

In [50]:
# Load pretrained embedding
embedding_dim = 300

word_dict = []          # first field of every line read, in file order
embeddings_index = {}   # word -> float32 vector of length embedding_dim
skipped_lines = 0       # lines that did not yield a usable vector

# Explicit UTF-8 (the dataset JSON is read as UTF-8 too, so words must decode
# the same way to match); `with` closes the file even if parsing fails.
with open('W2V_ner.vec', encoding='utf-8') as f:
    for line in f:
        # Strip the trailing newline/space first: otherwise the last field
        # may be unparsable and the whole word would be dropped silently.
        values = line.rstrip().split(' ')
        word = values[0]
        word_dict.append(word)
        try:
            coefs = np.asarray(values[1:], dtype='float32')
        except (ValueError, TypeError):
            skipped_lines += 1
            continue
        # Reject header or truncated lines: a vector of the wrong length would
        # either fail or broadcast across a whole row of the matrix below.
        if coefs.shape != (embedding_dim,):
            skipped_lines += 1
            continue
        embeddings_index[word] = coefs

# Get pretrained embedding weights
max_feature = len(embeddings_index) + 1   # unused below; kept in case another cell reads it
vocab_size = vocab.vocab_size
# Rows that are never written (the pad and unk ids, which are not in w2i)
# stay all-zero.
embedding_matrix = np.zeros((vocab_size, embedding_dim))

oov_count = 0

# Every vocabulary word gets a row. The former `i > max_feature` guard
# compared a vocabulary index with the number of pretrained words, which are
# unrelated quantities, and would have left such rows zeroed.
for word, i in vocab.w2i.items():
    embedding_vector = embeddings_index.get(word)

    if embedding_vector is not None:
        # we found the word - add that word's vector to the matrix
        embedding_matrix[i] = embedding_vector
    else:
        # doesn't exist, assign a random vector
        oov_count += 1
        embedding_matrix[i] = np.random.randn(embedding_dim)

embedding_tensor = torch.tensor(embedding_matrix, dtype=torch.float32)
if skipped_lines:
    print('Skipped non-vector lines (header or malformed):', skipped_lines)
print('Out of Vocab:', oov_count)

Out of Vocab: 1162


In [None]:
# Build the classifier with its default sizes.
model = Model(vocab)

# Replace the freshly initialised embedding layer with the pretrained table;
# freeze=False keeps the embeddings trainable.
model.embedding = nn.Embedding.from_pretrained(
    embedding_tensor,
    freeze=False,
    padding_idx=vocab.pad_id,
)

# Wire data, model and optimizer settings together.
task = TextClassification(
    vocab=vocab,
    model=model,
    checkpoint_path='checkpoint/assignment_1',
    train_path=train_path,
    val_path=val_path,
    test_path=test_path,
    lr=1e-3,
)

# Train with early stopping, then report metrics on the held-out test split.
task.train(epochs=20, patience=5)

test_loss, test_f1 = task.test()
print("TEST LOSS:", test_loss)
print("TEST MACRO-F1:", test_f1)

Epoch 1/20:   2%|▌                           | 7/358 [00:03<02:45,  2.12it/s, loss=1.2176]


KeyboardInterrupt: 