In [17]:
def load_conllu(file_path):
    sentences = []
    sentence = []

    with open(file_path, "r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()

            if line == "":
                if sentence:
                    sentences.append(sentence)
                    sentence = []
                continue

            if line.startswith("#"):
                continue

            parts = line.split("\t")
            if len(parts) != 10:
                continue

            word = parts[1]
            upos = parts[3]

            sentence.append((word, upos))

    if sentence:
        sentences.append(sentence)

    return sentences

train_data = load_conllu("C:\\Users\\FPTSHOP\\Courses\\NLP\\Lab_05\\data\\UD_English-EWT\\en_ewt-ud-train.conllu")
dev_data = load_conllu("C:\\Users\\FPTSHOP\\Courses\\NLP\\Lab_05\\data\\UD_English-EWT\\en_ewt-ud-dev.conllu")

# --- Build word_to_ix ---
word_to_ix = {"<UNK>": 0}
for sentence in train_data:
    for word, tag in sentence:
        if word not in word_to_ix:
            word_to_ix[word] = len(word_to_ix)

# --- Build tag_to_ix ---
tag_to_ix = {}
for sentence in train_data:
    for word, tag in sentence:
        if tag not in tag_to_ix:
            tag_to_ix[tag] = len(tag_to_ix)

print("Vocabulary size (word_to_ix):", len(word_to_ix))
print("Tag set size (tag_to_ix):", len(tag_to_ix))



Vocabulary size (word_to_ix): 20201
Tag set size (tag_to_ix): 18


In [2]:
import torch
from torch.utils.data import Dataset

class POSDataset(Dataset):
    def __init__(self, sentences, word_to_ix, tag_to_ix):
        self.sentences = sentences
        self.word_to_ix = word_to_ix
        self.tag_to_ix = tag_to_ix

    def __len__(self):
        return len(self.sentences)

    def __getitem__(self, idx):
        sentence = self.sentences[idx]

        word_indices = []
        tag_indices = []

        for word, tag in sentence:
            word_idx = self.word_to_ix.get(word, self.word_to_ix["<UNK>"])
            tag_idx = self.tag_to_ix[tag]

            word_indices.append(word_idx)
            tag_indices.append(tag_idx)

        return (
            torch.tensor(word_indices, dtype=torch.long),
            torch.tensor(tag_indices, dtype=torch.long)
        )


In [None]:
from torch.nn.utils.rnn import pad_sequence

def collate_fn(batch):
    """
    batch = [(sentence_tensor, tag_tensor), (...)]
    """
    sentences = [item[0] for item in batch]
    tags = [item[1] for item in batch]


    padded_sentences = pad_sequence(sentences, batch_first=True, padding_value=0)
    padded_tags = pad_sequence(tags, batch_first=True, padding_value=0)

    return padded_sentences, padded_tags


In [4]:
from torch.utils.data import DataLoader

train_dataset = POSDataset(train_data, word_to_ix, tag_to_ix)
dev_dataset   = POSDataset(dev_data, word_to_ix, tag_to_ix)

train_loader = DataLoader(
    train_dataset,
    batch_size=32,
    shuffle=True,
    collate_fn=collate_fn
)

dev_loader = DataLoader(
    dev_dataset,
    batch_size=32,
    shuffle=False,
    collate_fn=collate_fn
)

for x, y in train_loader:
    print("Sentences batch shape:", x.shape)
    print("Tags batch shape:", y.shape)
    break


Sentences batch shape: torch.Size([32, 97])
Tags batch shape: torch.Size([32, 97])


In [18]:
import torch
import torch.nn as nn

class SimpleRNNForTokenClassification(nn.Module):
    def __init__(self, vocab_size, tagset_size, embedding_dim=128, hidden_dim=256):
        super().__init__()

        # 1. Embedding layer
        self.embedding = nn.Embedding(vocab_size, embedding_dim)

        # 2. RNN layer
        self.rnn = nn.RNN(
            input_size=embedding_dim,
            hidden_size=hidden_dim,
            batch_first=True
        )

        # 3. Linear layer
        self.fc = nn.Linear(hidden_dim, tagset_size)

    def forward(self, x):
        """
        x: tensor kích thước (batch_size, seq_len)
        """

        # Step 1: embedding
        embedded = self.embedding(x)

        # Step 2: RNN
        rnn_out, hidden = self.rnn(embedded)

        # Step 3: Ánh xạ sang tagset
        tag_scores = self.fc(rnn_out)

        return tag_scores



In [19]:
def evaluate(model, dataloader):
    model.eval()
    total_correct = 0
    total_tokens = 0

    with torch.no_grad():
        for sentences, gold_tags in dataloader:

            logits = model(sentences)
            # logits -> (B, L, num_tags)

            # Dự đoán nhãn
            pred_tags = torch.argmax(logits, dim=-1)  # (B, L)

            # Tính số token không phải padding
            mask = (gold_tags != 0)

            # So sánh dự đoán với nhãn thật
            correct = (pred_tags == gold_tags) & mask

            total_correct += correct.sum().item()
            total_tokens += mask.sum().item()

    accuracy = total_correct / total_tokens if total_tokens > 0 else 0
    return accuracy


In [20]:
import torch.optim as optim

best_dev_acc = 0
best_model_state = None

model = SimpleRNNForTokenClassification(
    vocab_size=len(word_to_ix),
    tagset_size=len(tag_to_ix),
    embedding_dim=128,
    hidden_dim=256
)

optimizer = optim.Adam(model.parameters(), lr=0.001)

loss_fn = nn.CrossEntropyLoss(ignore_index=0)

num_epochs = 5

for epoch in range(num_epochs):
    model.train()
    total_loss = 0

    for sentences, tags in train_loader:

        optimizer.zero_grad()
        logits = model(sentences)

        B, L, C = logits.shape
        loss = loss_fn(
            logits.reshape(B*L, C),
            tags.reshape(B*L)
        )

        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    # --- Evaluate ---
    train_acc = evaluate(model, train_loader)
    dev_acc = evaluate(model, dev_loader)

    print(f"Epoch {epoch+1}/{num_epochs} | Loss={total_loss/len(train_loader):.4f} "
          f"| Train Acc={train_acc:.4f} | Dev Acc={dev_acc:.4f}")

    # Save best model (early stopping style)
    if dev_acc > best_dev_acc:
        best_dev_acc = dev_acc
        best_model_state = model.state_dict()


Epoch 1/5 | Loss=0.8888 | Train Acc=0.8250 | Dev Acc=0.8032
Epoch 2/5 | Loss=0.4931 | Train Acc=0.8793 | Dev Acc=0.8471
Epoch 3/5 | Loss=0.3640 | Train Acc=0.9067 | Dev Acc=0.8633
Epoch 4/5 | Loss=0.2829 | Train Acc=0.9290 | Dev Acc=0.8709
Epoch 5/5 | Loss=0.2252 | Train Acc=0.9439 | Dev Acc=0.8763


In [21]:
model.load_state_dict(best_model_state)
print(f"Best Dev Accuracy = {best_dev_acc:.4f}")


Best Dev Accuracy = 0.8763


In [22]:
def predict_sentence(model, sentence, word_to_ix, ix_to_tag):
    model.eval()
    words = sentence.split()

    # Convert word -> index
    indices = [
        word_to_ix.get(w, word_to_ix["<UNK>"])
        for w in words
    ]

    tensor = torch.tensor(indices, dtype=torch.long).unsqueeze(0)
    # shape: (1, seq_len)

    with torch.no_grad():
        logits = model(tensor)
        pred = torch.argmax(logits, dim=-1).squeeze(0)

    pred_tags = [ix_to_tag[i.item()] for i in pred]

    return list(zip(words, pred_tags))

ix_to_tag = {v: k for k, v in tag_to_ix.items()}

print(predict_sentence(model,
       "I love NLP",
       word_to_ix, ix_to_tag))


[('I', 'PRON'), ('love', 'VERB'), ('NLP', 'NUM')]
