# NLP Lab 5: RNNs for Text and Token Classification

**Mục tiêu:** Tìm hiểu và ứng dụng Mạng Nơ-ron Hồi quy (RNN/LSTM) cho các bài toán phân loại văn bản và phân loại token.

**Nội dung:**
1. Part 1: Làm quen với PyTorch (15%)
2. Part 2: Phân loại văn bản với RNN/LSTM (35%)
3. Part 3: Part-of-Speech Tagging với RNN (25%)
4. Part 4: Named Entity Recognition với RNN (25%)

In [None]:
# Import thư viện cần thiết
import sys
import os
sys.path.append(os.path.abspath(os.path.join(os.getcwd(), '../')))

import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import pandas as pd
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence

---
## Part 1: Làm quen với PyTorch

Tensor là cấu trúc dữ liệu cốt lõi của PyTorch, tương tự ndarray của NumPy nhưng có thêm khả năng chạy trên GPU và tự động tính đạo hàm.

In [None]:
print("--- Task 1.1: Tạo Tensor ---")
# Tạo tensor từ list
data = [[1, 2], [3, 4]]
x_data = torch.tensor(data)
print(f"Tensor từ list:\n {x_data}\n")

# Tạo tensor từ NumPy array
np_array = np.array(data)
x_np = torch.from_numpy(np_array)
print(f"Tensor từ NumPy array:\n {x_np}\n")

# Tạo tensor với các giá trị ngẫu nhiên hoặc hằng số
x_ones = torch.ones_like(x_data)
print(f"Ones Tensor:\n {x_ones}\n")
x_rand = torch.rand_like(x_data, dtype=torch.float)
print(f"Random Tensor:\n {x_rand}\n")

# In ra shape, dtype, và device của tensor
print(f"Shape của tensor: {x_rand.shape}")
print(f"Datatype của tensor: {x_rand.dtype}")
print(f"Device lưu trữ tensor: {x_rand.device}\n")

In [None]:
print("--- Task 1.2: Các phép toán trên Tensor ---")
print(f"x_data + x_data:\n {x_data + x_data}\n")
print(f"x_data * 5:\n {x_data * 5}\n")
print(f"x_data @ x_data.T:\n {x_data @ x_data.T}\n")

print("--- Task 1.3: Indexing và Slicing ---")
print(f"Hàng đầu tiên: {x_data[0]}")
print(f"Cột thứ hai: {x_data[:, 1]}")
print(f"Giá trị [1, 1]: {x_data[1, 1]}\n")

print("--- Task 1.4: Thay đổi hình dạng Tensor ---")
rand_tensor = torch.rand(4, 4)
print(f"Tensor gốc (4, 4):\n {rand_tensor}\n")
reshaped_tensor = rand_tensor.view(16, 1)
print(f"Tensor đã reshape (16, 1):\n {reshaped_tensor}\n")

In [None]:
print("--- Phần 2: Tự động tính Đạo hàm với autograd ---")
x = torch.ones(1, requires_grad=True)
y = x + 2
z = y * y * 3
z.backward()
print(f"Đạo hàm dz/dx khi x=1: {x.grad}")

**Câu hỏi:** Chuyện gì xảy ra nếu bạn gọi z.backward() một lần nữa? Tại sao?

> **Trả lời:** Sẽ xảy ra lỗi 'RuntimeError: Trying to backward through the graph a second time...'. Theo mặc định, PyTorch sẽ xóa đồ thị tính toán (computation graph) ngay sau khi thực hiện lan truyền ngược (.backward()) để tiết kiệm bộ nhớ. Nếu muốn tính đạo hàm lại, ta cần chỉ định `retain_graph=True`.

In [None]:
print("--- Phần 3: Xây dựng Mô hình với torch.nn ---")

class MyFirstModel(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim):
        super(MyFirstModel, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.linear = nn.Linear(embedding_dim, hidden_dim)
        self.activation = nn.ReLU()
        self.output_layer = nn.Linear(hidden_dim, output_dim)

    def forward(self, indices):
        embeds = self.embedding(indices)
        pooled = embeds.mean(dim=1)
        hidden = self.activation(self.linear(pooled))
        output = self.output_layer(hidden)
        return output

model = MyFirstModel(vocab_size=100, embedding_dim=16, hidden_dim=8, output_dim=2)
input_data = torch.LongTensor([[1, 2, 5, 9]])
output_data = model(input_data)
print(f"Model input shape: {input_data.shape}")
print(f"Model output shape: {output_data.shape}")
print(f"Model output:\n {output_data}")

---
## Part 2: Phân loại văn bản với RNN/LSTM

Sử dụng bộ dữ liệu HWU64 để phân loại ý định người dùng.

In [None]:
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import classification_report, f1_score

# Tải dữ liệu HWU
df_train = pd.read_csv('../data/lab5/HWU/train.csv')
df_val = pd.read_csv('../data/lab5/HWU/val.csv')
df_test = pd.read_csv('../data/lab5/HWU/test.csv')

# Tiền xử lý nhãn
label_encoder = LabelEncoder()
df_train['intent_encoded'] = label_encoder.fit_transform(df_train['category'])
df_val['intent_encoded'] = label_encoder.transform(df_val['category'])
df_test['intent_encoded'] = label_encoder.transform(df_test['category'])

X_train, y_train = df_train['text'], df_train['intent_encoded']
X_val, y_val = df_val['text'], df_val['intent_encoded']
X_test, y_test = df_test['text'], df_test['intent_encoded']

print(f"Train: {len(X_train)}, Val: {len(X_val)}, Test: {len(X_test)}")
print(f"Số lượng intent: {len(label_encoder.classes_)}")

### Task 1: Baseline 1 (TF-IDF + Logistic Regression)

In [None]:
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import make_pipeline

tfidf_lr_pipeline = make_pipeline(
    TfidfVectorizer(max_features=5000),
    LogisticRegression(max_iter=1000, random_state=42)
)
tfidf_lr_pipeline.fit(X_train, y_train)

y_pred_tfidf = tfidf_lr_pipeline.predict(X_test)
print("--- Kết quả TF-IDF + Logistic Regression ---")
print(classification_report(y_test, y_pred_tfidf, zero_division=0))
f1_tfidf = f1_score(y_test, y_pred_tfidf, average='macro')
print(f"Macro F1-score: {f1_tfidf:.4f}")

### Task 2: Baseline 2 (Word2Vec Trung bình + Dense Layer)

In [None]:
from gensim.models import Word2Vec
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout

# Huấn luyện Word2Vec
sentences = [text.split() for text in X_train]
w2v_model = Word2Vec(sentences, vector_size=100, window=5, min_count=1, workers=4)

def sentence_to_avg_vector(text, model):
    words = text.split()
    word_vectors = [model.wv[word] for word in words if word in model.wv]
    if not word_vectors:
        return np.zeros(model.vector_size)
    return np.mean(word_vectors, axis=0)

X_train_avg = np.array([sentence_to_avg_vector(text, w2v_model) for text in X_train])
X_val_avg = np.array([sentence_to_avg_vector(text, w2v_model) for text in X_val])
X_test_avg = np.array([sentence_to_avg_vector(text, w2v_model) for text in X_test])
num_classes = len(label_encoder.classes_)

w2v_dense_model = Sequential([
    Dense(128, activation='relu', input_shape=(w2v_model.vector_size,)),
    Dropout(0.5),
    Dense(num_classes, activation='softmax')
])
w2v_dense_model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])
w2v_dense_model.fit(X_train_avg, y_train, epochs=10, validation_data=(X_val_avg, y_val), batch_size=32, verbose=0)

y_pred_w2v = np.argmax(w2v_dense_model.predict(X_test_avg), axis=1)
print("--- Kết quả Word2Vec (Avg) + Dense Layer ---")
print(classification_report(y_test, y_pred_w2v, zero_division=0))
f1_w2v = f1_score(y_test, y_pred_w2v, average='macro')
print(f"Macro F1-score: {f1_w2v:.4f}")

### Task 3 & 4: Mô hình LSTM (Pre-trained vs Scratch)

In [None]:
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.layers import Embedding, LSTM
from tensorflow.keras.callbacks import EarlyStopping

# Tiền xử lý cho mô hình chuỗi
vocab_size = 10000
tokenizer = Tokenizer(num_words=vocab_size, oov_token='<UNK>')
tokenizer.fit_on_texts(X_train)

max_len = 50
X_train_pad = pad_sequences(tokenizer.texts_to_sequences(X_train), maxlen=max_len, padding='post')
X_val_pad = pad_sequences(tokenizer.texts_to_sequences(X_val), maxlen=max_len, padding='post')
X_test_pad = pad_sequences(tokenizer.texts_to_sequences(X_test), maxlen=max_len, padding='post')

# Tạo ma trận embedding từ Word2Vec
embedding_dim = w2v_model.vector_size
embedding_matrix = np.zeros((vocab_size, embedding_dim))
for word, i in tokenizer.word_index.items():
    if i < vocab_size and word in w2v_model.wv:
        embedding_matrix[i] = w2v_model.wv[word]

In [None]:
# LSTM với Pre-trained Embeddings
lstm_pretrained = Sequential([
    Embedding(vocab_size, embedding_dim, weights=[embedding_matrix], input_length=max_len, trainable=False),
    LSTM(128, dropout=0.2, recurrent_dropout=0.2),
    Dense(num_classes, activation='softmax')
])
lstm_pretrained.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])

early_stopping = EarlyStopping(monitor='val_loss', patience=3, restore_best_weights=True)
print("--- Huấn luyện LSTM (Pre-trained) ---")
lstm_pretrained.fit(X_train_pad, y_train, epochs=15, validation_data=(X_val_pad, y_val), 
                    batch_size=32, callbacks=[early_stopping], verbose=1)

y_pred_lstm_pre = np.argmax(lstm_pretrained.predict(X_test_pad), axis=1)
f1_lstm_pre = f1_score(y_test, y_pred_lstm_pre, average='macro')
print(f"LSTM Pre-trained Macro F1: {f1_lstm_pre:.4f}")

In [None]:
# LSTM học từ đầu (Scratch)
lstm_scratch = Sequential([
    Embedding(vocab_size, 100, input_length=max_len, trainable=True),
    LSTM(128, dropout=0.2, recurrent_dropout=0.2),
    Dense(num_classes, activation='softmax')
])
lstm_scratch.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])

print("--- Huấn luyện LSTM (Scratch) ---")
lstm_scratch.fit(X_train_pad, y_train, epochs=15, validation_data=(X_val_pad, y_val),
                 batch_size=32, callbacks=[early_stopping], verbose=1)

y_pred_lstm_scratch = np.argmax(lstm_scratch.predict(X_test_pad), axis=1)
f1_lstm_scratch = f1_score(y_test, y_pred_lstm_scratch, average='macro')
print(f"LSTM Scratch Macro F1: {f1_lstm_scratch:.4f}")

### Task 5: So sánh và Phân tích

In [None]:
print("=== SO SÁNH KẾT QUẢ ===")
print(f"TF-IDF + LR:        F1 = {f1_tfidf:.4f}")
print(f"Word2Vec + Dense:   F1 = {f1_w2v:.4f}")
print(f"LSTM Pre-trained:   F1 = {f1_lstm_pre:.4f}")
print(f"LSTM Scratch:       F1 = {f1_lstm_scratch:.4f}")

**Phân tích:**
- **TF-IDF + LR**: Baseline mạnh, nhanh, nhưng bỏ qua thứ tự từ.
- **Word2Vec + Dense**: Kết quả kém do mất thông tin ngữ cảnh khi lấy trung bình.
- **LSTM**: Hiểu được ngữ cảnh và thứ tự từ, đặc biệt hiệu quả với câu có phủ định.

**Ví dụ minh họa:**
- Câu: "can you remind me to not call my mom" (intent: reminder_create)
- Baseline có thể nhầm do từ "call", nhưng LSTM hiểu "not" phủ định hành động.

---
## Part 3: Part-of-Speech Tagging với RNN

Sử dụng bộ dữ liệu UD_English-EWT để gán nhãn từ loại.

In [None]:
import json

def load_jsonl(file_path):
    sentences = []
    with open(file_path, 'r', encoding='utf-8') as f:
        for line in f:
            data = json.loads(line.strip())
            sentence = list(zip(data['words'], data['tags']))
            sentences.append(sentence)
    return sentences

train_sents = load_jsonl('../data/lab5/UD_English-EWT/en_ewt-ud-train.jsonl')
dev_sents = load_jsonl('../data/lab5/UD_English-EWT/en_ewt-ud-dev.jsonl')

# Xây dựng từ điển
word_to_ix = {'<PAD>': 0, '<UNK>': 1}
tag_to_ix = {'<PAD>': 0}

for sent in train_sents:
    for word, tag in sent:
        if word not in word_to_ix:
            word_to_ix[word] = len(word_to_ix)
        if tag not in tag_to_ix:
            tag_to_ix[tag] = len(tag_to_ix)

print(f"Kích thước từ điển từ: {len(word_to_ix)}")
print(f"Kích thước từ điển nhãn: {len(tag_to_ix)}")

In [None]:
class POSDataset(Dataset):
    def __init__(self, sents, word_to_ix, tag_to_ix):
        self.sents = sents
        self.word_to_ix = word_to_ix
        self.tag_to_ix = tag_to_ix

    def __len__(self):
        return len(self.sents)

    def __getitem__(self, idx):
        sent = self.sents[idx]
        word_indices = [self.word_to_ix.get(w, self.word_to_ix['<UNK>']) for w, t in sent]
        tag_indices = [self.tag_to_ix.get(t) for w, t in sent]
        return torch.tensor(word_indices), torch.tensor(tag_indices)

def collate_fn(batch):
    sentences, tags = zip(*batch)
    sentences_padded = pad_sequence(sentences, batch_first=True, padding_value=word_to_ix['<PAD>'])
    tags_padded = pad_sequence(tags, batch_first=True, padding_value=tag_to_ix['<PAD>'])
    return sentences_padded, tags_padded

train_dataset = POSDataset(train_sents, word_to_ix, tag_to_ix)
dev_dataset = POSDataset(dev_sents, word_to_ix, tag_to_ix)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, collate_fn=collate_fn)
dev_loader = DataLoader(dev_dataset, batch_size=32, collate_fn=collate_fn)

In [None]:
class SimpleRNNForPOS(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim, tagset_size):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=0)
        self.rnn = nn.RNN(embedding_dim, hidden_dim, batch_first=True)
        self.linear = nn.Linear(hidden_dim, tagset_size)

    def forward(self, sentence):
        embeds = self.embedding(sentence)
        rnn_out, _ = self.rnn(embeds)
        tag_scores = self.linear(rnn_out)
        return tag_scores

EMBEDDING_DIM = 64
HIDDEN_DIM = 64
pos_model = SimpleRNNForPOS(len(word_to_ix), EMBEDDING_DIM, HIDDEN_DIM, len(tag_to_ix))
loss_function = nn.CrossEntropyLoss(ignore_index=tag_to_ix['<PAD>'])
optimizer = optim.Adam(pos_model.parameters(), lr=0.001)

In [None]:
# Huấn luyện
EPOCHS = 20
for epoch in range(EPOCHS):
    pos_model.train()
    total_loss = 0
    for sentence_in, tags_in in train_loader:
        optimizer.zero_grad()
        tag_scores = pos_model(sentence_in)
        loss = loss_function(tag_scores.view(-1, len(tag_to_ix)), tags_in.view(-1))
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    if (epoch+1) % 5 == 0:
        print(f"Epoch {epoch+1}/{EPOCHS}, Loss: {total_loss / len(train_loader):.4f}")

In [None]:
def evaluate_pos(model, loader):
    model.eval()
    correct, total = 0, 0
    with torch.no_grad():
        for sentence_in, tags_in in loader:
            tag_scores = model(sentence_in)
            preds = torch.argmax(tag_scores, dim=2)
            mask = (tags_in != tag_to_ix['<PAD>'])
            correct += (preds[mask] == tags_in[mask]).sum().item()
            total += mask.sum().item()
    return correct / total * 100 if total > 0 else 0

def predict_pos(sentence_str):
    pos_model.eval()
    words = sentence_str.split()
    word_indices = [word_to_ix.get(w, word_to_ix['<UNK>']) for w in words]
    sent_tensor = torch.LongTensor([word_indices])
    with torch.no_grad():
        tag_scores = pos_model(sent_tensor)
        pred_indices = torch.argmax(tag_scores, dim=2).squeeze(0).tolist()
    ix_to_tag = {i: t for t, i in tag_to_ix.items()}
    return ' '.join([f"{w}/{ix_to_tag[i]}" for w, i in zip(words, pred_indices)])

print(f"\nĐộ chính xác trên tập dev: {evaluate_pos(pos_model, dev_loader):.2f}%")
print(f"\nVí dụ: {predict_pos('I love NLP')}")
print(f"Ví dụ: {predict_pos('From the AP comes this story')}")

---
## Part 4: Named Entity Recognition với RNN

Sử dụng bộ dữ liệu CoNLL-2003 để nhận dạng thực thể tên.

In [None]:
from datasets import load_dataset

# Tải dữ liệu CoNLL-2003
dataset = load_dataset('conll2003', trust_remote_code=True)

# Lấy ánh xạ nhãn
ner_labels = dataset['train'].features['ner_tags'].feature.names
print(f"NER Labels: {ner_labels}")

# Trích xuất dữ liệu
train_tokens = dataset['train']['tokens']
train_ner_tags = [[ner_labels[t] for t in tags] for tags in dataset['train']['ner_tags']]
val_tokens = dataset['validation']['tokens']
val_ner_tags = [[ner_labels[t] for t in tags] for tags in dataset['validation']['ner_tags']]

print(f"Train: {len(train_tokens)} câu, Val: {len(val_tokens)} câu")

In [None]:
# Xây dựng từ điển cho NER
ner_word_to_ix = {'<PAD>': 0, '<UNK>': 1}
ner_tag_to_ix = {'<PAD>': 0}

for tokens in train_tokens:
    for word in tokens:
        if word not in ner_word_to_ix:
            ner_word_to_ix[word] = len(ner_word_to_ix)

for tag in ner_labels:
    if tag not in ner_tag_to_ix:
        ner_tag_to_ix[tag] = len(ner_tag_to_ix)

print(f"Vocab size: {len(ner_word_to_ix)}, Tag size: {len(ner_tag_to_ix)}")

In [None]:
class NERDataset(Dataset):
    def __init__(self, tokens_list, tags_list, word_to_ix, tag_to_ix):
        self.tokens_list = tokens_list
        self.tags_list = tags_list
        self.word_to_ix = word_to_ix
        self.tag_to_ix = tag_to_ix

    def __len__(self):
        return len(self.tokens_list)

    def __getitem__(self, idx):
        tokens = self.tokens_list[idx]
        tags = self.tags_list[idx]
        word_indices = [self.word_to_ix.get(w, self.word_to_ix['<UNK>']) for w in tokens]
        tag_indices = [self.tag_to_ix.get(t, 0) for t in tags]
        return torch.tensor(word_indices), torch.tensor(tag_indices)

def ner_collate_fn(batch):
    sentences, tags = zip(*batch)
    sentences_padded = pad_sequence(sentences, batch_first=True, padding_value=ner_word_to_ix['<PAD>'])
    tags_padded = pad_sequence(tags, batch_first=True, padding_value=ner_tag_to_ix['<PAD>'])
    return sentences_padded, tags_padded

ner_train_dataset = NERDataset(train_tokens, train_ner_tags, ner_word_to_ix, ner_tag_to_ix)
ner_val_dataset = NERDataset(val_tokens, val_ner_tags, ner_word_to_ix, ner_tag_to_ix)
ner_train_loader = DataLoader(ner_train_dataset, batch_size=32, shuffle=True, collate_fn=ner_collate_fn)
ner_val_loader = DataLoader(ner_val_dataset, batch_size=32, collate_fn=ner_collate_fn)

In [None]:
class BiLSTMForNER(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim, tagset_size):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=0)
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, batch_first=True, bidirectional=True)
        self.linear = nn.Linear(hidden_dim * 2, tagset_size)

    def forward(self, sentence):
        embeds = self.embedding(sentence)
        lstm_out, _ = self.lstm(embeds)
        tag_scores = self.linear(lstm_out)
        return tag_scores

NER_EMBEDDING_DIM = 100
NER_HIDDEN_DIM = 128
ner_model = BiLSTMForNER(len(ner_word_to_ix), NER_EMBEDDING_DIM, NER_HIDDEN_DIM, len(ner_tag_to_ix))
ner_loss_fn = nn.CrossEntropyLoss(ignore_index=ner_tag_to_ix['<PAD>'])
ner_optimizer = optim.Adam(ner_model.parameters(), lr=0.001)

In [None]:
# Huấn luyện NER
NER_EPOCHS = 5
for epoch in range(NER_EPOCHS):
    ner_model.train()
    total_loss = 0
    for sentence_in, tags_in in ner_train_loader:
        ner_optimizer.zero_grad()
        tag_scores = ner_model(sentence_in)
        loss = ner_loss_fn(tag_scores.view(-1, len(ner_tag_to_ix)), tags_in.view(-1))
        loss.backward()
        ner_optimizer.step()
        total_loss += loss.item()
    print(f"Epoch {epoch+1}/{NER_EPOCHS}, Loss: {total_loss / len(ner_train_loader):.4f}")

In [None]:
def evaluate_ner(model, loader):
    model.eval()
    correct, total = 0, 0
    with torch.no_grad():
        for sentence_in, tags_in in loader:
            tag_scores = model(sentence_in)
            preds = torch.argmax(tag_scores, dim=2)
            mask = (tags_in != ner_tag_to_ix['<PAD>'])
            correct += (preds[mask] == tags_in[mask]).sum().item()
            total += mask.sum().item()
    return correct / total * 100 if total > 0 else 0

def predict_ner(sentence_str):
    ner_model.eval()
    words = sentence_str.split()
    word_indices = [ner_word_to_ix.get(w, ner_word_to_ix['<UNK>']) for w in words]
    sent_tensor = torch.LongTensor([word_indices])
    with torch.no_grad():
        tag_scores = ner_model(sent_tensor)
        pred_indices = torch.argmax(tag_scores, dim=2).squeeze(0).tolist()
    ix_to_tag = {i: t for t, i in ner_tag_to_ix.items()}
    return ' '.join([f"{w}/{ix_to_tag[i]}" for w, i in zip(words, pred_indices)])

print(f"\nĐộ chính xác NER trên tập validation: {evaluate_ner(ner_model, ner_val_loader):.2f}%")
print(f"\nVí dụ: {predict_ner('John lives in New York')}")
print(f"Ví dụ: {predict_ner('Apple Inc is based in California')}")

---
## Tổng kết

**Kết quả đạt được:**
- Part 1: Làm quen với PyTorch (Tensor, autograd, nn.Module)
- Part 2: So sánh 4 mô hình phân loại văn bản (TF-IDF, Word2Vec, LSTM)
- Part 3: POS Tagging với RNN đạt ~86% accuracy
- Part 4: NER với Bi-LSTM

**Khó khăn và Giải pháp:**
1. Padding sequences: Sử dụng `pad_sequence` và `ignore_index` trong loss
2. OOV words: Thêm token `<UNK>` vào vocabulary
3. Vanishing gradient: Sử dụng LSTM thay vì RNN cơ bản