In [1]:
import os, re, string, time, random
from collections import Counter

import numpy as np
import pandas as pd

from tqdm.auto import tqdm

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader

def set_seed(seed_value=42):
    """Seed every random number generator this notebook relies on.

    Seeds Python's ``random``, NumPy's legacy global generator and PyTorch
    (CPU plus all CUDA devices), then switches cuDNN to deterministic mode
    with auto-tuning disabled.

    Args:
        seed_value: integer seed shared by all generators.
    """
    seeders = (
        random.seed,
        np.random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed_all,
    )
    for seed_fn in seeders:
        seed_fn(seed_value)

    # Disable the cuDNN benchmark auto-tuner and request deterministic kernels.
    torch.backends.cudnn.benchmark = False
    torch.backends.cudnn.deterministic = True

SEED = 42
set_seed(SEED)

# Pick the fastest available backend: CUDA first, then Apple-silicon MPS,
# and finally the CPU. `torch.backends.mps` does not exist on older PyTorch
# builds, so it is looked up defensively.
if torch.cuda.is_available():
    device = torch.device("cuda")
elif getattr(torch.backends, "mps", None) is not None and torch.backends.mps.is_available():
    device = torch.device("mps")
else:
    device = torch.device("cpu")

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
def load_data_from_path(folder_path):
    """Read a folder-per-label text corpus into a DataFrame.

    ``folder_path`` is expected to contain one sub-directory per label, each
    holding one UTF-8 text file per example. The sub-directory named ``"pos"``
    maps to label 1; every other sub-directory maps to label 0.

    Hidden entries (names starting with ".", e.g. macOS ``.DS_Store``) and
    anything that is not a directory / regular file are skipped, and entries
    are visited in sorted order so the row order is reproducible across
    machines (``os.listdir`` order is otherwise unspecified).

    Args:
        folder_path: path of the directory containing the label sub-directories.

    Returns:
        pandas.DataFrame with columns ``sentence`` (the lines of the file
        joined with a single space) and ``label`` (int). The columns are
        present even when no example is found.
    """
    examples = []
    for label_name in sorted(os.listdir(folder_path)):
        full_path = os.path.join(folder_path, label_name)
        # Stray files next to the label folders would make os.listdir() fail.
        if label_name.startswith(".") or not os.path.isdir(full_path):
            continue

        label = 1 if label_name == "pos" else 0
        for file_name in sorted(os.listdir(full_path)):
            file_path = os.path.join(full_path, file_name)
            # Skip hidden/binary bookkeeping files and nested directories.
            if file_name.startswith(".") or not os.path.isfile(file_path):
                continue
            with open(file_path, "r", encoding="utf-8") as f:
                sentence = " ".join(f.readlines())
            examples.append({"sentence": sentence, "label": label})

    return pd.DataFrame(examples, columns=["sentence", "label"])

# Location of each split on disk, relative to the notebook's working directory.
# Note that the "valid" split is read from the `data_train/test` folder, while
# the "test" split comes from `data_test/test`.
folder_paths = {
    "train": "./data/ntc-scv/data_train/train",
    "valid": "./data/ntc-scv/data_train/test",
    "test":  "./data/ntc-scv/data_test/test"
}

# Each frame has the columns produced by load_data_from_path: sentence, label.
train_df = load_data_from_path(folder_paths["train"])
valid_df = load_data_from_path(folder_paths["valid"])
test_df  = load_data_from_path(folder_paths["test"])

# Quick sanity check of the split sizes, then preview the training rows.
print(train_df.shape, valid_df.shape, test_df.shape)
train_df.head()

(30000, 2) (10000, 2) (10000, 2)


Unnamed: 0,sentence,label
0,Qu√°n ƒë·ªì_ƒÉn kh√° ngon . . nh∆∞ng ph·ª•c_v·ª• kh√¥ng t·ªë...,0
1,"H√¥m_nay ƒëi ƒÉn t·∫°i qu√°n , m√≥n ƒÉn ngon v·ª´a_mi·ªáng...",0
2,Qua ÃÅ n c∆∞ Ã£ c ngon . Tu√¢ ÃÄ n na ÃÄ o mi ÃÄ nh c...,0
3,"Ch√°n , ƒë·ªì u·ªëng kh√° nh·∫°t . V·ªõi kh√¥ng_gian v√† ch...",0
4,"M√¨ ƒÉn ok üëç üèª üëç üèª üëç üèª nh∆∞ng ngu·ªôi , view r·ªông ,...",0


In [3]:
def preprocess_text(text):
    """Normalise a raw review into lower-cased, space-separated tokens.

    Steps, in order: strip URLs, strip HTML-like tags, blank out ASCII
    punctuation (except "_", which joins word-segmented compounds) and digits,
    blank out emoji / pictographs, collapse whitespace, lower-case.

    Args:
        text: raw review string.

    Returns:
        The cleaned string.
    """
    # URLs go first so that their punctuation does not leave stray fragments.
    without_urls = re.sub(r"https?://\S+|www\.\S+", " ", text)
    without_tags = re.sub(r"<[^<>]+>", " ", without_urls)

    # Map every unwanted ASCII character to a space in a single pass.
    unwanted = string.punctuation.replace("_", "") + string.digits
    cleaned = without_tags.translate(str.maketrans(unwanted, " " * len(unwanted)))

    emoji_pattern = re.compile("["
        u"\U0001F600-\U0001F64F"
        u"\U0001F300-\U0001F5FF"
        u"\U0001F680-\U0001F6FF"
        u"\U0001F1E0-\U0001F1FF"
        u"\U0001F1F2-\U0001F1F4"
        u"\U0001F1E6-\U0001F1FF"
        u"\U0001F600-\U0001F64F"
        u"\U00002702-\U000027B0"
        u"\U000024C2-\U0001F251"
        u"\U0001f926-\U0001f937"
        u"\U0001F1F2"
        u"\U0001F1F4"
        u"\U0001F620"
        u"\u200d"
        u"\u2640-\u2642"
        "]+", flags=re.UNICODE)
    cleaned = emoji_pattern.sub(r" ", cleaned)

    # Collapse all whitespace runs to single spaces, then lower-case.
    collapsed = " ".join(cleaned.split())
    return collapsed.lower()

# Add a cleaned "text" column to every split in place; the raw "sentence"
# column is left untouched.
for df in [train_df, valid_df, test_df]:
    df["text"] = df["sentence"].apply(preprocess_text)

# Preview the cleaned text next to its label.
train_df[["text", "label"]].head()

Unnamed: 0,text,label
0,qu√°n ƒë·ªì_ƒÉn kh√° ngon nh∆∞ng ph·ª•c_v·ª• kh√¥ng t·ªët ch...,0
1,h√¥m_nay ƒëi ƒÉn t·∫°i qu√°n m√≥n ƒÉn ngon v·ª´a_mi·ªáng k...,0
2,qua ÃÅ n c∆∞ Ã£ c ngon tu√¢ ÃÄ n na ÃÄ o mi ÃÄ nh cu ...,0
3,ch√°n ƒë·ªì u·ªëng kh√° nh·∫°t v·ªõi kh√¥ng_gian v√† ch·∫•t_l...,0
4,m√¨ ƒÉn ok nh∆∞ng ngu·ªôi view r·ªông nh∆∞ng k√™u m√≥n v...,0


In [4]:
def tokenize(text: str):
    """Split ``text`` on runs of whitespace and return the list of tokens.

    An empty or all-whitespace string yields an empty list.
    """
    tokens = text.split()
    return tokens

# Special tokens reserved at the start of the vocabulary.
PAD = "[PAD]"
UNK = "[UNK]"
CLS = "[CLS]"
SEP = "[SEP]"
MASK = "[MASK]"
# The order of this list fixes the ids of the special tokens, because
# build_vocab copies it to the front of `itos`.
SPECIALS = [PAD, UNK, CLS, SEP, MASK]

def build_vocab(texts, vocab_size=30000, min_freq=1):
    """Build a frequency-ranked vocabulary padded to ``vocab_size`` entries.

    Args:
        texts: iterable of strings, tokenized with ``tokenize``.
        vocab_size: target number of vocabulary entries. If the corpus has too
            few words the list is padded with ``[unusedN]`` placeholders.
        min_freq: minimum corpus count a word needs to be kept.

    Returns:
        Tuple ``(stoi, itos, pad_idx, unk_idx, cls_idx, sep_idx, mask_idx)``
        where ``itos`` starts with ``SPECIALS`` and ``stoi`` is its inverse.
    """
    frequencies = Counter()
    for sentence in texts:
        frequencies.update(tokenize(sentence))

    reserved = set(SPECIALS)
    # sorted() is stable, so equally frequent words keep first-seen order.
    ranked = sorted(
        (
            (word, count)
            for word, count in frequencies.items()
            if count >= min_freq and word not in reserved
        ),
        key=lambda pair: pair[1],
        reverse=True,
    )

    itos = list(SPECIALS)
    budget = max(0, vocab_size - len(itos))
    itos.extend(word for word, _ in ranked[:budget])

    # Make sure the vocabulary has exactly `vocab_size` entries.
    while len(itos) < vocab_size:
        itos.append(f"[unused{len(itos)}]")

    stoi = {word: index for index, word in enumerate(itos)}
    return stoi, itos, stoi[PAD], stoi[UNK], stoi[CLS], stoi[SEP], stoi[MASK]

VOCAB_SIZE = 30000
MAX_LEN = 256  # includes [CLS] and [SEP]

# The vocabulary is built from the training split only.
stoi, itos, pad_idx, unk_idx, cls_idx, sep_idx, mask_idx = build_vocab(
    train_df["text"].tolist(),
    vocab_size=VOCAB_SIZE,
    min_freq=1
)

print("Vocab size:", len(itos))
print("Special idx:", {"pad": pad_idx, "unk": unk_idx, "cls": cls_idx, "sep": sep_idx, "mask": mask_idx})

Vocab size: 30000
Special idx: {'pad': 0, 'unk': 1, 'cls': 2, 'sep': 3, 'mask': 4}


In [5]:
class TextDataset(Dataset):
    """Map-style dataset yielding ``{"text": str, "labels": int}`` examples.

    Reads the ``text`` and ``label`` columns of the given DataFrame once, at
    construction time, and stores them as plain Python lists.
    """

    def __init__(self, df: pd.DataFrame):
        text_column = df["text"].astype(str)
        label_column = df["label"].astype(int)
        self.texts = text_column.tolist()
        self.labels = label_column.tolist()

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        # Tokenisation is deferred to the collate function; return raw text.
        sample = {"text": self.texts[idx]}
        sample["labels"] = int(self.labels[idx])
        return sample

# Wrap each split; the datasets read the "text" and "label" columns.
train_ds = TextDataset(train_df)
valid_ds = TextDataset(valid_df)
test_ds  = TextDataset(test_df)

# Sanity check: dataset size and the first example.
print("Train ds size:", len(train_ds))
print(train_ds[0])


Train ds size: 30000
{'text': 'qu√°n ƒë·ªì_ƒÉn kh√° ngon nh∆∞ng ph·ª•c_v·ª• kh√¥ng t·ªët ch·ªâ lo s·ª≠a_so·∫°n kh√¥ng ƒë·ªÉ_√Ω t·ªõi kh√°ch ƒë·ªì_ƒÉn l√†m r·∫•t l√¢u trong khi ch·ªâ c√≥ b√†n g·∫ßn ti·∫øng m√† ch∆∞a ra trong khi ch·ªâ c√≥ rau x√†o v√† g·ªèi khi m√¨nh ƒë·ª£i qu√° l√¢u n√™n t√≠nh ti·ªÅn th√¨ qu·∫£n_l√Ω c√≥ th√°i_ƒë·ªô kh√≥_ch·ªãu kh√¥ng th√≠ch th√°i_ƒë·ªô ph·ª•c_v·ª• ch√∫t n√†o ƒë√¢y l√† l·∫ßn cu·ªëi_c√πng gh√© qu√°n', 'labels': 0}


In [6]:
def encode_text(text: str, stoi, unk_idx, cls_idx, sep_idx, max_len: int):
    """Convert ``text`` to token ids framed by the CLS and SEP ids.

    Args:
        text: string to tokenize with ``tokenize``.
        stoi: mapping from token string to id.
        unk_idx: id used for tokens missing from ``stoi``.
        cls_idx: id placed at the start of the sequence.
        sep_idx: id placed at the end of the sequence.
        max_len: length budget; at most ``max_len - 2`` word tokens are kept
            so that CLS and SEP still fit.

    Returns:
        List of ints ``[cls_idx, <token ids...>, sep_idx]``.
    """
    # Reserve two positions for [CLS] and [SEP].
    keep = max(0, max_len - 2)
    ids = [cls_idx]
    ids.extend(stoi.get(token, unk_idx) for token in tokenize(text)[:keep])
    ids.append(sep_idx)
    return ids

def collate_fn(batch, stoi, pad_idx, unk_idx, cls_idx, sep_idx, max_len=256):
    """Encode a list of examples and right-pad them to a common length.

    Args:
        batch: list of dicts with keys ``"text"`` and ``"labels"``.
        stoi, unk_idx, cls_idx, sep_idx, max_len: forwarded to ``encode_text``.
        pad_idx: id used to fill positions after the end of each sequence.

    Returns:
        Dict with ``input_ids`` (long tensor, batch x longest sequence),
        ``pad_mask`` (long tensor, 1 where ``input_ids != pad_idx``) and
        ``labels`` (long tensor, one entry per example).
    """
    encoded = [
        encode_text(example["text"], stoi, unk_idx, cls_idx, sep_idx, max_len=max_len)
        for example in batch
    ]

    # Pad every sequence on the right up to the longest one (at least 1 wide).
    width = max(max(len(ids) for ids in encoded), 1)
    padded_rows = [ids + [pad_idx] * (width - len(ids)) for ids in encoded]
    input_ids = torch.tensor(padded_rows, dtype=torch.long)

    labels = torch.tensor([example["labels"] for example in batch], dtype=torch.long)

    return {
        "input_ids": input_ids,
        "pad_mask": (input_ids != pad_idx).long(),
        "labels": labels
    }

In [7]:
BATCH_SIZE = 128
# Pinned host memory is only requested when CUDA is available.
pin = torch.cuda.is_available()


def collate(b):
    """Collate a list of examples using the notebook-level vocabulary."""
    return collate_fn(
        b,
        stoi=stoi,
        pad_idx=pad_idx,
        unk_idx=unk_idx,
        cls_idx=cls_idx,
        sep_idx=sep_idx,
        max_len=MAX_LEN,
    )


# Only the training loader shuffles; evaluation order stays fixed.
trainloader = DataLoader(train_ds, batch_size=BATCH_SIZE, shuffle=True, collate_fn=collate, pin_memory=pin)
validloader = DataLoader(valid_ds, batch_size=BATCH_SIZE, shuffle=False, collate_fn=collate, pin_memory=pin)
testloader  = DataLoader(test_ds,  batch_size=BATCH_SIZE, shuffle=False, collate_fn=collate, pin_memory=pin)

# Pull one batch to check the tensor shapes.
b = next(iter(trainloader))
print("Batch shapes:", {k: tuple(v.shape) for k, v in b.items()})

Batch shapes: {'input_ids': (128, 256), 'pad_mask': (128, 256), 'labels': (128,)}


### RNN

In [8]:
# Your code here
class RNNTextClassifier(nn.Module):
    """Sentence classifier: embedding -> single-layer RNN -> MLP head.

    Args:
        vocab_size: number of rows of the embedding table.
        num_classes: number of output logits.
        pad_idx: optional id of the padding token. When given, its embedding
            row is kept at zero (``padding_idx``) and padding positions can be
            detected even if no mask is passed to ``forward``.
    """

    def __init__(self, vocab_size, num_classes=2, pad_idx=None):
        super().__init__()
        self.pad_idx = pad_idx

        # Embedding layer with dim = 128.
        self.embedding = nn.Embedding(vocab_size, 128, padding_idx=pad_idx)

        # RNN layer with hidden = 128.
        # batch_first=True makes the input (batch, seq_len, embed_dim).
        self.rnn = nn.RNN(input_size=128,
                          hidden_size=128,
                          num_layers=1,
                          batch_first=True)

        # MLP head: 128 -> 512 nodes -> num_classes.
        self.classifier = nn.Sequential(
            nn.Linear(128, 512),
            nn.ReLU(),
            nn.Linear(512, num_classes)
        )

    def forward(self, x, pad_mask=None):
        """Return class logits of shape (batch, num_classes).

        Args:
            x: long tensor of token ids, shape (batch, seq_len), padded on the
                right.
            pad_mask: optional tensor of the same shape, non-zero on real
                tokens and zero on padding. If omitted and ``pad_idx`` was
                given to the constructor, the mask is derived from ``x``.
        """
        embedded = self.embedding(x)

        # `outputs` holds the hidden state at every timestep, `h_n` the state
        # after the final (possibly padded) timestep.
        outputs, h_n = self.rnn(embedded)

        if pad_mask is None and self.pad_idx is not None:
            pad_mask = x != self.pad_idx
        if pad_mask is None:
            # No padding information: fall back to the final timestep.
            return self.classifier(h_n[-1])

        # Read the hidden state at the last real token of each sequence, so
        # the prediction does not depend on how much padding the batch added.
        lengths = pad_mask.long().sum(dim=1)
        last_idx = (lengths - 1).clamp(min=0)
        rows = torch.arange(x.size(0), device=x.device)
        return self.classifier(outputs[rows, last_idx])

### LSTM


In [9]:
# Your code here
class LSTMTextClassifier(nn.Module):
    """Sentence classifier: embedding -> single-layer LSTM -> MLP head.

    Args:
        vocab_size: number of rows of the embedding table.
        num_classes: number of output logits.
        pad_idx: optional id of the padding token. When given, its embedding
            row is kept at zero (``padding_idx``) and padding positions can be
            detected even if no mask is passed to ``forward``.
    """

    def __init__(self, vocab_size, num_classes=2, pad_idx=None):
        super().__init__()
        self.pad_idx = pad_idx

        # Embedding layer with dim = 128.
        self.embedding = nn.Embedding(vocab_size, 128, padding_idx=pad_idx)

        # LSTM layer with hidden = 128.
        self.lstm = nn.LSTM(input_size=128,
                            hidden_size=128,
                            num_layers=1,
                            batch_first=True)

        # MLP head: 128 -> 512 nodes -> num_classes.
        self.classifier = nn.Sequential(
            nn.Linear(128, 512),
            nn.ReLU(),
            nn.Linear(512, num_classes)
        )

    def forward(self, x, pad_mask=None):
        """Return class logits of shape (batch, num_classes).

        Args:
            x: long tensor of token ids, shape (batch, seq_len), padded on the
                right.
            pad_mask: optional tensor of the same shape, non-zero on real
                tokens and zero on padding. If omitted and ``pad_idx`` was
                given to the constructor, the mask is derived from ``x``.
        """
        embedded = self.embedding(x)

        # `outputs` holds the hidden state at every timestep; `h_n` is the
        # hidden state after the final (possibly padded) timestep.
        outputs, (h_n, _) = self.lstm(embedded)

        if pad_mask is None and self.pad_idx is not None:
            pad_mask = x != self.pad_idx
        if pad_mask is None:
            # No padding information: fall back to the final timestep.
            return self.classifier(h_n[-1])

        # Read the hidden state at the last real token of each sequence, so
        # the prediction does not depend on how much padding the batch added.
        lengths = pad_mask.long().sum(dim=1)
        last_idx = (lengths - 1).clamp(min=0)
        rows = torch.arange(x.size(0), device=x.device)
        return self.classifier(outputs[rows, last_idx])

### BERT

In [11]:
from transformers import BertConfig, BertForSequenceClassification


A module that was compiled using NumPy 1.x cannot be run in
NumPy 2.2.6 as it may crash. To support both 1.x and 2.x
versions of NumPy, modules must be compiled with NumPy 2.0.
Some module may need to rebuild instead e.g. with 'pybind11>=2.12'.

If you are a user of the module, the easiest solution will be to
downgrade to 'numpy<2' or try to upgrade the affected module.
We expect that some modules will need time to support NumPy 2.

Traceback (most recent call last):  File "/opt/homebrew/Cellar/python@3.10/3.10.18/Frameworks/Python.framework/Versions/3.10/lib/python3.10/runpy.py", line 196, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "/opt/homebrew/Cellar/python@3.10/3.10.18/Frameworks/Python.framework/Versions/3.10/lib/python3.10/runpy.py", line 86, in _run_code
    exec(code, run_globals)
  File "/Users/vychan/Coding/AIO2025/Code/aio2025_study/venv_tf/lib/python3.10/site-packages/ipykernel_launcher.py", line 18, in <module>
    app.launch_new_instanc

AttributeError: _ARRAY_API not found


A module that was compiled using NumPy 1.x cannot be run in
NumPy 2.2.6 as it may crash. To support both 1.x and 2.x
versions of NumPy, modules must be compiled with NumPy 2.0.
Some module may need to rebuild instead e.g. with 'pybind11>=2.12'.

If you are a user of the module, the easiest solution will be to
downgrade to 'numpy<2' or try to upgrade the affected module.
We expect that some modules will need time to support NumPy 2.

Traceback (most recent call last):  File "/opt/homebrew/Cellar/python@3.10/3.10.18/Frameworks/Python.framework/Versions/3.10/lib/python3.10/runpy.py", line 196, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "/opt/homebrew/Cellar/python@3.10/3.10.18/Frameworks/Python.framework/Versions/3.10/lib/python3.10/runpy.py", line 86, in _run_code
    exec(code, run_globals)
  File "/Users/vychan/Coding/AIO2025/Code/aio2025_study/venv_tf/lib/python3.10/site-packages/ipykernel_launcher.py", line 18, in <module>
    app.launch_new_instanc

AttributeError: _ARRAY_API not found


A module that was compiled using NumPy 1.x cannot be run in
NumPy 2.2.6 as it may crash. To support both 1.x and 2.x
versions of NumPy, modules must be compiled with NumPy 2.0.
Some module may need to rebuild instead e.g. with 'pybind11>=2.12'.

If you are a user of the module, the easiest solution will be to
downgrade to 'numpy<2' or try to upgrade the affected module.
We expect that some modules will need time to support NumPy 2.

Traceback (most recent call last):  File "/opt/homebrew/Cellar/python@3.10/3.10.18/Frameworks/Python.framework/Versions/3.10/lib/python3.10/runpy.py", line 196, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "/opt/homebrew/Cellar/python@3.10/3.10.18/Frameworks/Python.framework/Versions/3.10/lib/python3.10/runpy.py", line 86, in _run_code
    exec(code, run_globals)
  File "/Users/vychan/Coding/AIO2025/Code/aio2025_study/venv_tf/lib/python3.10/site-packages/ipykernel_launcher.py", line 18, in <module>
    app.launch_new_instanc

AttributeError: _ARRAY_API not found

ImportError: numpy.core._multiarray_umath failed to import

ImportError: numpy.core.umath failed to import

In [None]:
# A small, randomly initialised BERT sized to match the RNN/LSTM baselines
# (hidden size 128, one layer, 512-wide feed-forward block).
config = BertConfig(
    vocab_size=VOCAB_SIZE,
    # Encoded sequences are at most MAX_LEN ids long including [CLS]/[SEP],
    # so the extra 2 positions are headroom.
    max_position_embeddings=MAX_LEN + 2,
    type_vocab_size=1,
    hidden_size=128,
    num_hidden_layers=1,
    num_attention_heads=8,
    intermediate_size=512,
    hidden_dropout_prob=0.1,
    attention_probs_dropout_prob=0.1,
    num_labels=2,
    pad_token_id=pad_idx,
)
# NOTE: despite the class-like name, this is a model *instance*.
BERTTextClassifier = BertForSequenceClassification(config)
print("BERT type:", type(BERTTextClassifier))

In [None]:
def to_device(batch, device):
    """Return a copy of ``batch`` with every tensor value moved to ``device``.

    Non-tensor values are passed through unchanged; the input dict itself is
    not modified.
    """
    moved = {}
    for key, value in batch.items():
        moved[key] = value.to(device) if torch.is_tensor(value) else value
    return moved

In [None]:
def get_logits(model, batch):
    """Run ``model`` on ``batch`` and return its logits.

    The calling convention is chosen from the signature of ``model.forward``
    rather than by catching ``TypeError``:

    * ``forward`` accepts ``input_ids`` and ``attention_mask`` (or arbitrary
      keyword arguments): called with those keywords, Hugging Face style.
    * ``forward`` takes two or more positional parameters: called as
      ``model(input_ids, pad_mask)``.
    * otherwise: called as ``model(input_ids)``.

    If the signature cannot be inspected, the keyword call is tried first and
    the two-argument positional call is used when it raises ``TypeError``.

    Args:
        model: callable model, typically an ``nn.Module``.
        batch: dict with keys ``"input_ids"`` and ``"pad_mask"``.

    Returns:
        ``output.logits`` when the model output has that attribute, otherwise
        the raw output.
    """
    import inspect

    input_ids = batch["input_ids"]
    pad_mask = batch["pad_mask"]

    forward = getattr(model, "forward", model)
    try:
        params = inspect.signature(forward).parameters
    except (TypeError, ValueError):
        params = None

    if params is None:
        # Signature unavailable: probe the two known conventions.
        try:
            out = model(input_ids=input_ids, attention_mask=pad_mask)
        except TypeError:
            out = model(input_ids, pad_mask)
    else:
        kinds = [p.kind for p in params.values()]
        takes_kwargs = inspect.Parameter.VAR_KEYWORD in kinds
        takes_varargs = inspect.Parameter.VAR_POSITIONAL in kinds
        n_positional = sum(
            kind in (inspect.Parameter.POSITIONAL_ONLY, inspect.Parameter.POSITIONAL_OR_KEYWORD)
            for kind in kinds
        )
        has_hf_names = all(name in params for name in ("input_ids", "attention_mask"))

        if takes_kwargs or has_hf_names:
            out = model(input_ids=input_ids, attention_mask=pad_mask)
        elif takes_varargs or n_positional >= 2:
            out = model(input_ids, pad_mask)
        else:
            # Single-input models (e.g. forward(x)) get the ids only.
            out = model(input_ids)

    return out.logits if hasattr(out, "logits") else out

In [None]:
# Training hyperparameters, shared by all models trained below.
epochs = 5  # number of passes over the training loader
learning_rate = 5e-5  # learning rate passed to the Adam optimizers

In [None]:
@torch.no_grad()
def evaluate(model, testloader, criterion):
    """Compute the mean batch loss and accuracy of ``model`` on a loader.

    Args:
        model: model to evaluate; it is switched to eval mode.
        testloader: iterable of batches, either dicts (with a ``"labels"``
            entry, run through ``get_logits``) or ``(inputs, labels)`` pairs.
        criterion: loss function called as ``criterion(logits, labels)``.

    Returns:
        Tuple ``(loss, accuracy)``: the loss averaged over batches and the
        accuracy in percent.

    Raises:
        ValueError: if the loader yields no examples.
    """
    model.eval()
    loss_sum = 0.0
    running_correct = 0
    total = 0
    num_batches = 0

    for batch in testloader:
        if isinstance(batch, dict):
            batch = to_device(batch, device)
            labels = batch["labels"]
            outputs = get_logits(model, batch)
        else:
            inputs, labels = batch
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)

        loss_sum += criterion(outputs, labels).item()
        num_batches += 1

        predicted = outputs.argmax(dim=1)
        total += labels.size(0)
        running_correct += (predicted == labels).sum().item()

    # Counting batches explicitly avoids relying on the loop variable after
    # the loop, which is undefined when the loader is empty.
    if num_batches == 0 or total == 0:
        raise ValueError("evaluate() received an empty data loader")

    accuracy = 100.0 * running_correct / total
    test_loss = loss_sum / num_batches
    return test_loss, accuracy

In [None]:
def train_model(model, criterion, optimizer, train_loader=None, eval_loader=None, num_epochs=None):
    """Train ``model`` and report training / evaluation metrics every epoch.

    Args:
        model: model to train; it is moved to the notebook-level ``device``.
        criterion: loss function called as ``criterion(logits, labels)``.
        optimizer: optimizer built over ``model.parameters()``.
        train_loader: batches to train on. Defaults to the notebook-level
            ``trainloader``.
        eval_loader: batches evaluated after each epoch. Defaults to the
            notebook-level ``testloader``; pass ``validloader`` to monitor on
            the validation split instead.
        num_epochs: number of epochs. Defaults to the notebook-level ``epochs``.

    Raises:
        ValueError: if the training loader yields no examples.
    """
    if train_loader is None:
        train_loader = trainloader
    if eval_loader is None:
        eval_loader = testloader
    if num_epochs is None:
        num_epochs = epochs

    model.to(device)

    for epoch in range(num_epochs):
        model.train()
        running_loss = 0.0
        running_correct = 0
        total = 0
        num_batches = 0

        for batch in train_loader:
            if isinstance(batch, dict):
                batch = to_device(batch, device)
                labels = batch["labels"]
                outputs = get_logits(model, batch)
            else:
                inputs, labels = batch
                inputs, labels = inputs.to(device), labels.to(device)
                outputs = model(inputs)

            optimizer.zero_grad()

            loss = criterion(outputs, labels)
            running_loss += loss.item()
            num_batches += 1

            loss.backward()
            optimizer.step()

            # Accuracy bookkeeping must not take part in autograd.
            predicted = outputs.detach().argmax(dim=1)
            total += labels.size(0)
            running_correct += (predicted == labels).sum().item()

        if num_batches == 0 or total == 0:
            raise ValueError("train_model() received an empty training loader")

        epoch_accuracy = 100.0 * running_correct / total
        epoch_loss = running_loss / num_batches

        test_loss, test_accuracy = evaluate(model, eval_loader, criterion)

        print(f"Epoch [{(epoch + 1):3}/{num_epochs:3}] \t "
              f"Loss: {epoch_loss:<11.5f} Accuracy: {epoch_accuracy:.2f}% \t "
              f"Test Loss: {test_loss:<11.5f} Test Accuracy: {test_accuracy:.2f}%")

In [None]:
# Registry of the models to compare, keyed by display name.
# The RNN/LSTM constructors are called with a `pad_idx` keyword, so both
# classes must accept that argument. The "BERT" entry reuses the model
# instance created above rather than constructing a new one.
models = {
    "RNN":  RNNTextClassifier(vocab_size=len(itos), pad_idx=pad_idx),
    "LSTM": LSTMTextClassifier(vocab_size=len(itos), pad_idx=pad_idx),
    "BERT": BERTTextClassifier,
}

In [None]:
# Train the RNN baseline. The model is looked up directly by name, so a
# missing or misspelt key fails loudly instead of silently training nothing.
criterion = nn.CrossEntropyLoss()
model_name = "RNN"
model = models[model_name]
print(f"Training {model_name}")
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
train_model(model, criterion, optimizer)

In [None]:
# Train the LSTM baseline. The model is looked up directly by name, so a
# missing or misspelt key fails loudly instead of silently training nothing.
criterion = nn.CrossEntropyLoss()
model_name = "LSTM"
model = models[model_name]
print(f"Training {model_name}")
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
train_model(model, criterion, optimizer)

In [None]:
# Train the small BERT model. The model is looked up directly by name, so a
# missing or misspelt key fails loudly instead of silently training nothing.
criterion = nn.CrossEntropyLoss()
model_name = "BERT"
model = models[model_name]
print(f"Training {model_name}")
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
train_model(model, criterion, optimizer)