# Import Necessary Libraries
Import libraries such as torch, torch.nn, torch.optim, spacy, and pyvi.

In [3]:
# Import necessary libraries
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset

import spacy
from pyvi import ViTokenizer
import re
import random
import math
import time

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(device)

cuda


# Load and Prepare the Dataset
Download and load the IWSLT 2015 TED Talks dataset for Vietnamese-English translation.

In [5]:
# Load and Prepare the Dataset

# Download the dataset from IWSLT 2015 TED Talks
!wget https://wit3.fbk.eu/archive/2015-01/texts/vi/en/vi-en.tgz
!tar -xvzf vi-en.tgz

# Load the dataset
vi_en_data = []
with open('iwslt-vi-en/train.vi', 'r', encoding='utf-8') as vi_file, open('iwslt-vi-en/train.en', 'r', encoding='utf-8') as en_file:
    for vi_line, en_line in zip(vi_file, en_file):
        vi_en_data.append((vi_line.strip(), en_line.strip()))

print(f'Total number of sentence pairs: {len(vi_en_data)}')

'wget' is not recognized as an internal or external command,
operable program or batch file.
tar: Error opening archive: Failed to open 'vi-en.tgz'


FileNotFoundError: [Errno 2] No such file or directory: 'iwslt-vi-en/train.vi'

# Text Normalization and Tokenization
Normalize and tokenize the text using spacy for English and pyvi for Vietnamese.

In [None]:
# Load language models
spacy_en = spacy.load('en_core_web_sm')

def tokenize_en(text):
    return [tok.text for tok in spacy_en.tokenizer(text)]

def tokenize_vi(text):
    return ViTokenizer.tokenize(text).split()

# Example of tokenization
print('English:', tokenize_en('This is a test sentence.'))
print('Vietnamese:', tokenize_vi('Đây là một câu kiểm tra.'))

# Dataset Preparation
Split the dataset into training, validation, and test sets.

In [None]:
# Split the dataset
random.shuffle(vi_en_data)

train_data = vi_en_data[:int(0.8*len(vi_en_data))]
valid_data = vi_en_data[int(0.8*len(vi_en_data)):int(0.9*len(vi_en_data))]
test_data = vi_en_data[int(0.9*len(vi_en_data)):]

# Vocabulary
Create vocabulary for both languages.

In [None]:
from collections import Counter

def build_vocab(tokenizer, data, min_freq):
    counter = Counter()
    for sentence in data:
        tokens = tokenizer(sentence)
        counter.update(tokens)
    return {word: idx for idx, (word, freq) in enumerate(counter.items()) if freq >= min_freq}

SRC_VOCAB = build_vocab(tokenize_vi, [sentence[0] for sentence in train_data], min_freq=2)
TRG_VOCAB = build_vocab(tokenize_en, [sentence[1] for sentence in train_data], min_freq=2)

print(f'Size of Vietnamese vocabulary: {len(SRC_VOCAB)}')
print(f'Size of English vocabulary: {len(TRG_VOCAB)}')

# Dataloader
Create DataLoader objects for the training, validation, and test sets.

In [None]:
class TranslationDataset(Dataset):
    def __init__(self, data, src_vocab, trg_vocab, src_tokenizer, trg_tokenizer):
        self.data = data
        self.src_vocab = src_vocab
        self.trg_vocab = trg_vocab
        self.src_tokenizer = src_tokenizer
        self.trg_tokenizer = trg_tokenizer
        
    def __len__(self):
        return len(self.data)
    
    def __getitem__(self, idx):
        src_sentence, trg_sentence = self.data[idx]
        src_tokens = [self.src_vocab.get(token, 0) for token in self.src_tokenizer(src_sentence)]
        trg_tokens = [self.trg_vocab.get(token, 0) for token in self.trg_tokenizer(trg_sentence)]
        return torch.tensor(src_tokens), torch.tensor(trg_tokens)

train_dataset = TranslationDataset(train_data, SRC_VOCAB, TRG_VOCAB, tokenize_vi, tokenize_en)
valid_dataset = TranslationDataset(valid_data, SRC_VOCAB, TRG_VOCAB, tokenize_vi, tokenize_en)
test_dataset = TranslationDataset(test_data, SRC_VOCAB, TRG_VOCAB, tokenize_vi, tokenize_en)

train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True, collate_fn=lambda x: x)
valid_loader = DataLoader(valid_dataset, batch_size=64, shuffle=False, collate_fn=lambda x: x)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False, collate_fn=lambda x: x)

# Define the LSTM Model
Define the LSTM model for sequence-to-sequence translation.

In [None]:
class Encoder(nn.Module):
    def __init__(self, input_dim, emb_dim, hid_dim, n_layers, dropout):
        super().__init__()
        
        self.embedding = nn.Embedding(input_dim, emb_dim)
        self.rnn = nn.LSTM(emb_dim, hid_dim, n_layers, dropout=dropout)
        self.dropout = nn.Dropout(dropout)
        
    def forward(self, src):
        
        embedded = self.dropout(self.embedding(src))
        outputs, (hidden, cell) = self.rnn(embedded)
        
        return hidden, cell

class Decoder(nn.Module):
    def __init__(self, output_dim, emb_dim, hid_dim, n_layers, dropout):
        super().__init__()
        
        self.embedding = nn.Embedding(output_dim, emb_dim)
        self.rnn = nn.LSTM(emb_dim, hid_dim, n_layers, dropout=dropout)
        self.fc_out = nn.Linear(hid_dim, output_dim)
        self.dropout = nn.Dropout(dropout)
        
    def forward(self, input, hidden, cell):
        
        input = input.unsqueeze(0)
        embedded = self.dropout(self.embedding(input))
        output, (hidden, cell) = self.rnn(embedded, (hidden, cell))
        prediction = self.fc_out(output.squeeze(0))
        
        return prediction, hidden, cell

class Seq2Seq(nn.Module):
    def __init__(self, encoder, decoder, device):
        super().__init__()
        
        self.encoder = encoder
        self.decoder = decoder
        self.device = device
        
    def forward(self, src, trg, teacher_forcing_ratio = 0.5):
        
        trg_len = trg.shape[0]
        batch_size = trg.shape[1]
        trg_vocab_size = self.decoder.output_dim
        outputs = torch.zeros(trg_len, batch_size, trg_vocab_size).to(self.device)
        hidden, cell = self.encoder(src)
        input = trg[0,:]
        for t in range(1, trg_len):
            output, hidden, cell = self.decoder(input, hidden, cell)
            outputs[t] = output
            top1 = output.argmax(1) 
            input = trg[t] if random.random() < teacher_forcing_ratio else top1
        
        return outputs

# Training the Model
Train the Seq2Seq model using the prepared dataset.

In [None]:
# Training the Model

INPUT_DIM = len(SRC_VOCAB)
OUTPUT_DIM = len(TRG_VOCAB)
ENC_EMB_DIM = 256
DEC_EMB_DIM = 256
HID_DIM = 512
N_LAYERS = 2
ENC_DROPOUT = 0.5
DEC_DROPOUT = 0.5

enc = Encoder(INPUT_DIM, ENC_EMB_DIM, HID_DIM, N_LAYERS, ENC_DROPOUT)
dec = Decoder(OUTPUT_DIM, DEC_EMB_DIM, HID_DIM, N_LAYERS, DEC_DROPOUT)

model = Seq2Seq(enc, dec, device).to(device)

def init_weights(m):
    for name, param in m.named_parameters():
        nn.init.normal_(param.data, mean=0, std=0.01)

model.apply(init_weights)

optimizer = optim.Adam(model.parameters())
TRG_PAD_IDX = TRG_VOCAB['<pad>']
criterion = nn.CrossEntropyLoss(ignore_index = TRG_PAD_IDX)

def train(model, iterator, optimizer, criterion, clip):
    model.train()
    epoch_loss = 0
    for i, batch in enumerate(iterator):
        src, trg = zip(*batch)
        src = nn.utils.rnn.pad_sequence(src, padding_value=SRC_VOCAB['<pad>']).to(device)
        trg = nn.utils.rnn.pad_sequence(trg, padding_value=TRG_VOCAB['<pad>']).to(device)
        optimizer.zero_grad()
        output = model(src, trg)
        output_dim = output.shape[-1]
        output = output[1:].view(-1, output_dim)
        trg = trg[1:].view(-1)
        loss = criterion(output, trg)
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), clip)
        optimizer.step()
        epoch_loss += loss.item()
    return epoch_loss / len(iterator)

def evaluate(model, iterator, criterion):
    model.eval()
    epoch_loss = 0
    with torch.no_grad():
        for i, batch in enumerate(iterator):
            src, trg = zip(*batch)
            src = nn.utils.rnn.pad_sequence(src, padding_value=SRC_VOCAB['<pad>']).to(device)
            trg = nn.utils.rnn.pad_sequence(trg, padding_value=TRG_VOCAB['<pad>']).to(device)
            output = model(src, trg, 0)
            output_dim = output.shape[-1]
            output = output[1:].view(-1, output_dim)
            trg = trg[1:].view(-1)
            loss = criterion(output, trg)
            epoch_loss += loss.item()
    return epoch_loss / len(iterator)

N_EPOCHS = 10
CLIP = 1

for epoch in range(N_EPOCHS):
    train_loss = train(model, train_loader, optimizer, criterion, CLIP)
    valid_loss = evaluate(model, valid_loader, criterion)
    print(f'Epoch: {epoch+1:02} | Train Loss: {train_loss:.3f} | Val. Loss: {valid_loss:.3f}')

# Testing the Model
Test the model on the test set and evaluate its performance.

In [None]:
# Testing the Model

test_loss = evaluate(model, test_loader, criterion)
print(f'Test Loss: {test_loss:.3f}')