In [3]:
import numpy as np
import torch
from torch import nn, optim
from torch.utils.data import DataLoader
from torch.nn.utils.rnn import pad_sequence
from sklearn.metrics import precision_recall_fscore_support
from collections import defaultdict
import random
import numpy as np
from sklearn.metrics import f1_score, precision_score, recall_score
from prepro import *

In [11]:
from Model.biGRU import *
from Model.biLSTM import *
from Model.uniGRU import *
from Model.uniLSTM import *
from Model.CNN_biGRU import * 
from Model.CNN_biLSTM import *

done


## Load data

In [7]:
# Read the train / dev / test splits from disk (in that order).
trainSentences, devSentences, testSentences = (
    readfile("input/train.txt"),
    readfile("input/dev.txt"),
    readfile("input/test.txt"),
)

# Add character information to every split, again train -> dev -> test.
trainSentences, devSentences, testSentences = map(
    addCharInformation, (trainSentences, devSentences, testSentences)
)

## Build label, casing, character, and word vocabularies and embeddings

In [None]:
# Unique labels and unique (lower-cased) words across all three splits.
uni_labels, uni_words = set(), {}
for corpus in (trainSentences, devSentences, testSentences):
    for sent in corpus:
        # Each entry unpacks to (token, characters, label); only the token
        # and the label are needed here.
        for tok, _chars, tag in sent:
            uni_labels.add(tag)
            uni_words[tok.lower()] = True  # dict used purely for membership tests

# Labels are numbered in sorted order so the mapping is deterministic.
label2Idx = dict(zip(sorted(uni_labels), range(len(uni_labels))))
idx2Label = {position: tag for tag, position in label2Idx.items()}

# case2Idx and caseEmbeddings: every casing category gets a fixed integer id,
# assigned in the order the names are listed below.
case2Idx = {
    name: idx
    for idx, name in enumerate((
        'numeric',
        'allLower',
        'allUpper',
        'initialUpper',
        'other',
        'mainly_numeric',
        'contains_digit',
        'PADDING_TOKEN',
    ))
}
# One-hot table: row i is the vector for the casing category with id i.
caseEmbeddings = np.eye(len(case2Idx), dtype='float32')

# char2Idx: ids 0 and 1 are reserved for padding / unknown characters; the
# remaining ids follow the order of the characters in the literal below.
char2Idx = {"PADDING":0, "UNKNOWN":1}
char2Idx.update(
    (symbol, position)
    for position, symbol in enumerate(
        " 0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ.,-_()[]{}!?:;#'\"/\\%$`&=*+@^~|",
        start=len(char2Idx),
    )
)

# word2Idx and wordEmbeddings: two special rows (padding, unknown) followed by
# the pre-trained vectors of every word whose lower-cased form occurs in the data.
word2Idx = {}
wordEmbeddings = []
# The context manager guarantees the file is closed even when a line fails
# to parse (the bare open()/close() pair leaked the handle in that case).
with open("/kaggle/input/ner-data/glove.6B.100d.txt", encoding="utf-8") as fEmbeddings:
    for line in fEmbeddings:
        split = line.strip().split(" ")
        word = split[0]

        if len(word2Idx) == 0:  # first line: add padding + unknown rows
            word2Idx["PADDING_TOKEN"] = len(word2Idx)
            vector = np.zeros(len(split)-1)  # zero vector for the padding word
            wordEmbeddings.append(vector)

            word2Idx["UNKNOWN_TOKEN"] = len(word2Idx)
            vector = np.random.uniform(-0.25, 0.25, len(split)-1)
            wordEmbeddings.append(vector)

        # Skip a word that was already stored: appending a second vector
        # without growing word2Idx would shift every later row index.
        if word.lower() in uni_words and word not in word2Idx:
            vector = np.array([float(num) for num in split[1:]])
            wordEmbeddings.append(vector)
            word2Idx[word] = len(word2Idx)

wordEmbeddings = np.array(wordEmbeddings)

# Build the data matrices for each split (train, dev, test order).
train_set, dev_set, test_set = (
    createMatrices(split_sentences, word2Idx, label2Idx, case2Idx, char2Idx)
    for split_sentences in (trainSentences, devSentences, testSentences)
)

# Build the batches; createBatches returns a (batch, batch_len) pair per split.
(train_batch, train_batch_len), (dev_batch, dev_batch_len), (test_batch, test_batch_len) = map(
    createBatches, (train_set, dev_set, test_set)
)

## Mini-batch splitting

In [None]:
def collate_fn(batch):
    """Stack a list of samples into four ``torch.long`` tensors.

    Args:
        batch: sequence of ``(tokens, casing, chars, labels)`` samples. The
            matching fields of all samples must have identical shapes,
            because they are combined with ``torch.stack``.

    Returns:
        ``(tokens, casing, chars, labels)`` where each tensor has a new
        leading dimension of size ``len(batch)``.
    """
    def _stack(column):
        # Convert each sample's field to a long tensor, then stack along dim 0.
        return torch.stack(
            [torch.tensor(item, dtype=torch.long) for item in column], dim=0
        )

    token_col, casing_col, char_col, label_col = zip(*batch)
    return _stack(token_col), _stack(casing_col), _stack(char_col), _stack(label_col)


def create_dataloader(batch, batch_len, batch_size=32, shuffle=True, collate=None):
    """Build one ``DataLoader`` per bucket of ``batch``.

    ``batch_len`` holds the cumulative end offset of each bucket, so bucket
    ``i`` is ``batch[batch_len[i-1]:batch_len[i]]`` (starting from 0).

    Args:
        batch: sliceable sequence of ``(tokens, casing, chars, labels)`` samples.
        batch_len: iterable of increasing end offsets into ``batch``.
        batch_size: mini-batch size used inside each bucket.
        shuffle: when true, every loader reshuffles its bucket each time it
            is iterated (i.e. every epoch). Previously the bucket was
            shuffled only once at construction time, so all epochs saw the
            same mini-batches. Ordering now follows torch's RNG
            (``torch.manual_seed``) rather than the ``random`` module.
        collate: optional collate function; defaults to the notebook's
            ``collate_fn``.

    Returns:
        List of ``DataLoader`` objects, one per non-empty bucket, in bucket
        order. ``batch`` itself is never modified.
    """
    if collate is None:
        collate = collate_fn

    loaders = []
    start = 0
    for end in batch_len:
        bucket = batch[start:end]
        start = end
        # An empty bucket used to crash on tuple unpacking; skip it instead.
        if len(bucket) == 0:
            continue
        loaders.append(
            DataLoader(bucket, batch_size=batch_size, shuffle=shuffle, collate_fn=collate)
        )
    return loaders


batch_size = 32

# Shuffling is requested for the training split only; dev and test are not shuffled.
train_loaders, dev_loaders, test_loaders = (
    create_dataloader(samples, ends, batch_size=batch_size, shuffle=is_train)
    for samples, ends, is_train in (
        (train_batch, train_batch_len, True),
        (dev_batch, dev_batch_len, False),
        (test_batch, test_batch_len, False),
    )
)

## Train and Evaluation

In [None]:
def evaluate_model(model, data_loaders, using_char):
    """Compute token-level macro precision, recall and F1 for ``model``.

    Args:
        model: network exposing a ``device`` attribute. It is called as
            ``model(tokens, cases, chars)`` when ``using_char`` is true and
            as ``model(tokens)`` otherwise; the last output dimension is
            arg-maxed to obtain the predicted label id.
        data_loaders: iterable of loaders, each yielding
            ``(tokens, cases, chars, labels)`` batches.
        using_char: whether the model also consumes casing and character inputs.

    Returns:
        ``(precision, recall, f1)``, macro-averaged.

    Notes:
        Every position of every sequence is scored; nothing is masked out.
        The model is left in eval mode when the function returns.
    """
    model.eval()
    y_pred = []
    y_true = []

    with torch.no_grad():
        for loader in data_loaders:
            for tokens, cases, chars, labels in loader:
                tokens = tokens.to(model.device)

                if using_char:
                    # Casing / character tensors are only transferred when used.
                    outputs = model(tokens, cases.to(model.device), chars.to(model.device))
                else:
                    outputs = model(tokens)

                preds = torch.argmax(outputs, dim=-1)
                # Labels are only compared on the host, so they are never
                # sent to the device and back.
                y_pred.extend(preds.reshape(-1).tolist())
                y_true.extend(labels.reshape(-1).tolist())

    # One call yields all three macro scores instead of three separate passes
    # that each recompute the same per-class counts.
    precision, recall, f1, _ = precision_recall_fscore_support(
        y_true, y_pred, average='macro'
    )
    return precision, recall, f1

In [None]:
def train_model(model, train_loaders, dev_loaders, optimizer, criterion,epochs, using_char):
    """Train ``model`` for ``epochs`` epochs and report metrics after each one.

    Args:
        model: network exposing a ``device`` attribute; called with
            ``(tokens, cases, chars)`` when ``using_char`` is true, else
            with ``tokens`` only.
        train_loaders: iterable of loaders yielding
            ``(tokens, cases, chars, labels)`` training batches.
        dev_loaders: loaders passed to ``evaluate_model`` for the dev metrics.
        optimizer: optimizer stepped once per batch.
        criterion: loss called on the flattened outputs and labels.
        epochs: number of passes over ``train_loaders``.
        using_char: forwarded to the model call and to ``evaluate_model``.

    Prints the mean batch loss, then train and dev precision / recall / F1
    for every epoch. Returns nothing.

    Raises:
        ZeroDivisionError: if an epoch yields no training batch.
    """
    def _forward(tokens, cases, chars):
        # Character-aware models take three inputs, the others only tokens.
        return model(tokens, cases, chars) if using_char else model(tokens)

    for epoch in range(epochs):
        model.train()
        step_losses = []

        for loader in train_loaders:
            for tokens, cases, chars, labels in loader:
                tokens, cases, chars, labels = (
                    tensor.to(model.device) for tensor in (tokens, cases, chars, labels)
                )

                optimizer.zero_grad()
                logits = _forward(tokens, cases, chars)
                # Flatten to [batch*seq, num_labels] vs. [batch*seq] for the loss.
                loss = criterion(logits.view(-1, logits.shape[-1]), labels.view(-1))
                loss.backward()
                optimizer.step()

                step_losses.append(loss.item())

        avg_loss = sum(step_losses) / len(step_losses)
        print(f"Epoch {epoch+1}/{epochs}, Loss: {avg_loss:.4f}")

        # Metrics on the training set.
        train_prec, train_rec, train_f1 = evaluate_model(model, train_loaders, using_char)
        print(f"Train Precision: {train_prec:.4f}, Recall: {train_rec:.4f}, F1: {train_f1:.4f}")

        # Metrics on the dev set.
        dev_prec, dev_rec, dev_f1 = evaluate_model(model, dev_loaders, using_char)
        print(f"Dev Precision: {dev_prec:.4f}, Recall: {dev_rec:.4f}, F1: {dev_f1:.4f}\n")

    print("Training finished.")

## Define model

In [None]:
# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Model hyper-parameters; sizes are derived from the vocabularies built above.
vocab_size=wordEmbeddings.shape[0]
word_emb_dim=wordEmbeddings.shape[1]
word_embeddings=wordEmbeddings
case_emb_dim=len(case2Idx)
case_embeddings=caseEmbeddings
char_size=len(char2Idx)
char_emb_dim=30
conv_out_channels=30
conv_kernel_size=3
lstm_hidden_size=200
num_labels=len(label2Idx)
dropout=0.5
dropout_recurrent=0.25

# Build the model from the pre-computed embedding tables.
model = CNN_biLSTM(
    vocab_size=vocab_size,
    word_emb_dim=word_emb_dim,
    word_embeddings=word_embeddings,
    case_emb_dim=case_emb_dim,
    case_embeddings=case_embeddings,
    char_size=char_size,
    char_emb_dim=char_emb_dim,
    conv_out_channels=conv_out_channels,
    conv_kernel_size=conv_kernel_size,
    lstm_hidden_size=lstm_hidden_size,
    num_labels=num_labels,
    # Pass the variables declared above (previously duplicated as literals,
    # so editing `dropout` / `dropout_recurrent` had no effect on the model).
    dropout=dropout,
    dropout_recurrent=dropout_recurrent,
    device=device
)
model.to(device)

# Optimizer and loss function.
optimizer = optim.Adam(model.parameters(), lr=0.001)
criterion = nn.CrossEntropyLoss()  # classification loss

# Train the model.
train_model(model, train_loaders, dev_loaders, optimizer, criterion, epochs=10, using_char=True)
