<a href="https://colab.research.google.com/github/once-upon-an-april/Thuc-Hanh-Deep-Learning-trong-Khoa-Hoc-Du-Lieu-DS201.Q11.1/blob/main/Bai2/22520975_Lab3_Bai3.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# ==============================================================================
# BÀI 3: NHẬN DIỆN THỰC THỂ (NER) TRÊN PHO_NER_COVID19 SỬ DỤNG BILSTM
# (5 Lớp BiLSTM, Hidden 256, F1-Score)
# ==============================================================================

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import numpy as np
import pandas as pd
import requests
import json
from collections import Counter
from sklearn.metrics import classification_report, f1_score

# Kiểm tra thiết bị
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

In [None]:
# ==============================================================================
# PHẦN 1: TẢI DỮ LIỆU TỪ GITHUB (JSONL FORMAT)
# ==============================================================================

print("\n>>> 1. Đang tải dữ liệu PhoNER_COVID19 từ GitHub...")

def load_data(url):
    response = requests.get(url)
    if response.status_code == 200:
        # File dạng JSON Lines (mỗi dòng là 1 json)
        return [json.loads(line) for line in response.text.strip().split('\n')]
    else:
        raise Exception(f"Không tải được file từ {url}")

urls = {
    "train": "https://raw.githubusercontent.com/VinAIResearch/PhoNER_COVID19/master/data/word/train_word.json",
    "dev": "https://raw.githubusercontent.com/VinAIResearch/PhoNER_COVID19/master/data/word/dev_word.json",
    "test": "https://raw.githubusercontent.com/VinAIResearch/PhoNER_COVID19/master/data/word/test_word.json"
}

train_data = load_data(urls['train'])
dev_data = load_data(urls['dev'])
test_data = load_data(urls['test'])

print(f"Đã tải xong! Train: {len(train_data)}, Dev: {len(dev_data)}, Test: {len(test_data)} câu.")

# Xem mẫu dữ liệu
print("Ví dụ mẫu đầu tiên:")
print(f"Words: {train_data[0]['words'][:5]}...")
print(f"Tags:  {train_data[0]['tags'][:5]}...")

In [None]:
# ==============================================================================
# PHẦN 2: XÂY DỰNG TỪ ĐIỂN (VOCAB & TAGS)
# ==============================================================================

print("\n>>> 2. Xây dựng Vocabulary và Tag Map...")

# 2.1 Xây dựng Word Vocab
all_words = []
for sample in train_data:
    all_words.extend(sample['words'])

word_counts = Counter(all_words)
# Giữ từ xuất hiện >= 2 lần
vocab = sorted([w for w, c in word_counts.items() if c >= 2])

word2idx = {w: i+2 for i, w in enumerate(vocab)}
word2idx['<PAD>'] = 0
word2idx['<UNK>'] = 1

print(f"Kích thước Word Vocab: {len(word2idx)}")

# 2.2 Xây dựng Tag Vocab (Nhãn thực thể)
all_tags = []
for sample in train_data:
    all_tags.extend(sample['tags'])

unique_tags = sorted(list(set(all_tags)))
tag2idx = {t: i+1 for i, t in enumerate(unique_tags)}
tag2idx['<PAD>'] = 0 # Quan trọng: Padding tag phải là 0 để ignore loss

idx2tag = {i: t for t, i in tag2idx.items()}

print(f"Số lượng nhãn (Tags): {len(tag2idx)}")
print(f"Danh sách nhãn: {list(tag2idx.keys())}")

In [None]:
# ==============================================================================
# PHẦN 3: DATASET & DATALOADER
# ==============================================================================

MAX_LEN = 100 # Độ dài cắt/pad cho câu
BATCH_SIZE = 32

class PhoNERDataset(Dataset):
    def __init__(self, data, word2idx, tag2idx, max_len):
        self.data = data
        self.word2idx = word2idx
        self.tag2idx = tag2idx
        self.max_len = max_len

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        words = self.data[idx]['words']
        tags = self.data[idx]['tags']

        # Vectorize Words
        w_idxs = [self.word2idx.get(w, self.word2idx['<UNK>']) for w in words]

        # Vectorize Tags
        t_idxs = [self.tag2idx.get(t, 0) for t in tags] # Fallback 0 is PAD? Should ensure tags exist.

        # Padding / Truncate
        if len(w_idxs) < self.max_len:
            # Pad
            w_idxs = w_idxs + [self.word2idx['<PAD>']] * (self.max_len - len(w_idxs))
            t_idxs = t_idxs + [self.tag2idx['<PAD>']] * (self.max_len - len(t_idxs))
        else:
            # Truncate
            w_idxs = w_idxs[:self.max_len]
            t_idxs = t_idxs[:self.max_len]

        return torch.tensor(w_idxs, dtype=torch.long), torch.tensor(t_idxs, dtype=torch.long)

train_loader = DataLoader(PhoNERDataset(train_data, word2idx, tag2idx, MAX_LEN), batch_size=BATCH_SIZE, shuffle=True)
dev_loader = DataLoader(PhoNERDataset(dev_data, word2idx, tag2idx, MAX_LEN), batch_size=BATCH_SIZE)
test_loader = DataLoader(PhoNERDataset(test_data, word2idx, tag2idx, MAX_LEN), batch_size=BATCH_SIZE)

In [None]:
# ==============================================================================
# PHẦN 4: MÔ HÌNH BiLSTM CHO NER
# ==============================================================================

class BiLSTM_NER(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_size, output_dim, n_layers):
        super(BiLSTM_NER, self).__init__()

        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=0)

        # Bi-LSTM: bidirectional=True
        self.lstm = nn.LSTM(
            input_size=embedding_dim,
            hidden_size=hidden_size,
            num_layers=n_layers,
            batch_first=True,
            bidirectional=True, # QUAN TRỌNG
            dropout=0.3
        )

        # Output layer
        # Vì là BiLSTM nên hidden state đầu ra sẽ gấp đôi (hidden_size * 2)
        self.fc = nn.Linear(hidden_size * 2, output_dim)

    def forward(self, text):
        # text: [batch, seq_len]
        embedded = self.embedding(text)

        # output: [batch, seq_len, hidden_size * 2]
        output, _ = self.lstm(embedded)

        # Đưa qua lớp Linear để phân loại từng token
        # predictions: [batch, seq_len, output_dim]
        predictions = self.fc(output)

        return predictions

# Khởi tạo mô hình
EMBEDDING_DIM = 100
HIDDEN_SIZE = 256 # Theo đề bài
NUM_LAYERS = 5    # Theo đề bài
OUTPUT_DIM = len(tag2idx) # Số lượng nhãn BIO

model = BiLSTM_NER(len(word2idx), EMBEDDING_DIM, HIDDEN_SIZE, OUTPUT_DIM, NUM_LAYERS)
model = model.to(device)

# Loss function: Cần ignore index 0 (PAD) để không tính loss cho phần đệm
criterion = nn.CrossEntropyLoss(ignore_index=tag2idx['<PAD>'])
optimizer = optim.Adam(model.parameters(), lr=0.001)

print("\n>>> 3. Kiến trúc mô hình BiLSTM-NER:")
print(model)

In [None]:
# ==============================================================================
# PHẦN 5: HUẤN LUYỆN
# ==============================================================================

# Hàm tính F1 Token-level (Bỏ qua PAD)
def evaluate(model, loader):
    model.eval()
    all_preds = []
    all_labels = []

    with torch.no_grad():
        for text, tags in loader:
            text, tags = text.to(device), tags.to(device)

            # Forward
            predictions = model(text) # [batch, seq_len, num_tags]

            # Lấy argmax để ra nhãn dự đoán
            preds = torch.argmax(predictions, dim=2) # [batch, seq_len]

            # Flatten để tính metrics, nhưng cần lọc bỏ PAD
            # Chuyển về CPU numpy
            preds = preds.cpu().numpy().flatten()
            tags = tags.cpu().numpy().flatten()

            # Lọc mask: Chỉ lấy những vị trí tag != PAD
            mask = (tags != tag2idx['<PAD>'])

            valid_preds = preds[mask]
            valid_tags = tags[mask]

            all_preds.extend(valid_preds)
            all_labels.extend(valid_tags)

    return f1_score(all_labels, all_preds, average='weighted')

print("\n>>> 4. Bắt đầu huấn luyện...")
EPOCHS = 5 # Demo chạy 5 epochs

for epoch in range(EPOCHS):
    model.train()
    epoch_loss = 0

    for text, tags in train_loader:
        text, tags = text.to(device), tags.to(device)

        optimizer.zero_grad()
        predictions = model(text)

        # Reshape để tính Loss
        # predictions: [batch * seq_len, num_tags]
        # tags: [batch * seq_len]
        loss = criterion(predictions.view(-1, OUTPUT_DIM), tags.view(-1))

        loss.backward()
        optimizer.step()

        epoch_loss += loss.item()

    val_f1 = evaluate(model, dev_loader)
    print(f"Epoch {epoch+1}/{EPOCHS} | Loss: {epoch_loss/len(train_loader):.4f} | Val F1: {val_f1:.4f}")

In [None]:
# ==============================================================================
# PHẦN 6: ĐÁNH GIÁ TRÊN TEST
# ==============================================================================
print("\n>>> 5. Kết quả chi tiết trên tập TEST:")

model.eval()
all_preds = []
all_labels = []

with torch.no_grad():
    for text, tags in test_loader:
        text, tags = text.to(device), tags.to(device)
        predictions = model(text)
        preds = torch.argmax(predictions, dim=2)

        preds = preds.cpu().numpy().flatten()
        tags = tags.cpu().numpy().flatten()

        # Lọc bỏ PAD (Chỉ đánh giá trên các token thực)
        mask = (tags != tag2idx['<PAD>'])
        all_preds.extend(preds[mask])
        all_labels.extend(tags[mask])

# Map từ index về tên nhãn để in report cho dễ hiểu
# Lấy tập hợp tất cả các nhãn xuất hiện trong dữ liệu thực tế VÀ dự đoán
present_labels = sorted(list(set(all_labels) | set(all_preds)))
target_names = [idx2tag.get(i, f"UNK_{i}") for i in present_labels]

# zero_division=0: Gán điểm 0 cho các nhãn model không dự đoán được thay vì báo lỗi/warning
print(classification_report(all_labels, all_preds, labels=present_labels, target_names=target_names, digits=4, zero_division=0))