In [1]:
!pip install -U torchtext==0.17.0



In [2]:
import torch
import torch.nn as nn
from torchtext.data.utils import get_tokenizer
from torchtext.vocab import build_vocab_from_iterator
import random
import numpy as np

  from .autonotebook import tqdm as notebook_tqdm


In [3]:
# Seed every random number generator used by the notebook
def set_seed(seed_value=42):
    """Seed PyTorch, NumPy and Python's ``random`` and pin cuDNN settings.

    Args:
        seed_value: Integer seed handed unchanged to every generator.

    Returns:
        None. The function only mutates global RNG / backend state.
    """
    seeders = (
        torch.manual_seed,  # PyTorch on the CPU
        torch.cuda.manual_seed,  # PyTorch on the GPU (if there is one)
        np.random.seed,  # NumPy
        random.seed,  # Python's built-in random module
    )
    for seed_rng in seeders:
        seed_rng(seed_value)

    cudnn = torch.backends.cudnn
    cudnn.deterministic = True  # Ensure deterministic behaviour on the GPU
    cudnn.benchmark = False  # Turn benchmarking off to keep runs consistent


set_seed(42)

# Data

In [4]:
# torchtext's built-in "basic_english" tokenizer. It is a module-level global
# shared by yield_tokens and by both vectorize_* helpers defined further down.
# NOTE(review): the same tokenizer is also applied to the Vietnamese corpus —
# confirm it treats that text as intended.
tokenizer = get_tokenizer("basic_english")




# Lazily turn a collection of sentences into token lists
def yield_tokens(examples):
    """Yield the token list of every text in ``examples``, in order.

    Tokenization is delegated to the module-level ``tokenizer``; nothing is
    computed until the returned generator is iterated.

    Args:
        examples: Iterable of raw text strings.

    Yields:
        Whatever ``tokenizer`` returns for each text (one item per text).
    """
    yield from (tokenizer(sentence) for sentence in examples)

## Source Data

In [5]:
# Source corpus: the English side of the toy parallel dataset
corpus_en = ["i love you", "build ai model"]
# Number of source sentences (not referenced again in the visible cells)
data_size_en = len(corpus_en)

# Upper bound on the vocabulary size (passed as max_tokens when the vocab is
# built) and the embedding-table size of the encoder.
# NOTE(review): the corpus has 6 distinct words and the vocabulary reserves 3
# special tokens, i.e. 9 entries would be needed; with a cap of 8 one word
# ("you" in the recorded output) falls back to <unk>. Confirm this is intended.
vocab_size_en = 8
# Fixed length of every vectorized source sentence (3 words + <eos> here)
sequence_length_en = 4

In [6]:
# Create the source (English) vocabulary from the tokenized corpus, capped at
# vocab_size_en entries. The special tokens are listed explicitly; in the
# recorded mapping they occupy ids 0-2.
vocab_en = build_vocab_from_iterator(
    yield_tokens(corpus_en),
    max_tokens=vocab_size_en,
    specials=["<unk>", "<pad>", "<eos>"],
)
# Tokens that are not in the vocabulary are looked up as <unk>
vocab_en.set_default_index(vocab_en["<unk>"])
# Display the token -> id mapping as the cell output
vocab_en.get_stoi()

{'love': 6,
 '<unk>': 0,
 '<eos>': 2,
 '<pad>': 1,
 'ai': 3,
 'build': 4,
 'i': 5,
 'model': 7}

In [7]:
# vectorize (tokenize, numberize, truncate, add special token, padding)
def vectorize_en(text, vocab, sequence_length):
    """Convert a source sentence into a fixed-length tensor of token ids.

    The sentence is tokenized with the module-level ``tokenizer``, mapped to
    ids through ``vocab``, terminated with ``<eos>`` and right-padded with
    ``<pad>`` up to ``sequence_length``.

    Over-long sentences are truncated *before* ``<eos>`` is appended, so the
    end-of-sequence marker is always kept (the previous implementation sliced
    after appending and silently dropped it).

    Args:
        text: Raw sentence.
        vocab: Mapping supporting ``vocab[token] -> int`` that contains the
            ``"<eos>"`` and ``"<pad>"`` entries.
        sequence_length: Exact length of the returned tensor.

    Returns:
        1-D ``torch.long`` tensor with ``sequence_length`` elements.
    """
    word_ids = [vocab[token] for token in tokenizer(text)]
    # Reserve the last slot for <eos> so truncation can never cut it off.
    word_ids = word_ids[: max(sequence_length - 1, 0)]
    token_ids = word_ids + [vocab["<eos>"]]
    # A non-positive repeat count yields an empty list, i.e. no padding.
    token_ids += [vocab["<pad>"]] * (sequence_length - len(token_ids))
    # Final slice only matters for degenerate lengths (sequence_length < 1).
    return torch.tensor(token_ids[:sequence_length], dtype=torch.long)

In [8]:
# Turn every source sentence into a fixed-length tensor of token ids
corpus_ids_en = [
    vectorize_en(sentence, vocab_en, sequence_length_en) for sentence in corpus_en
]

# Show the resulting id tensors, one per line
for ids in corpus_ids_en:
    print(ids)

tensor([5, 6, 0, 2])
tensor([4, 3, 7, 2])


## Target Data

In [9]:
# Target corpus: the Vietnamese side, aligned index-by-index with corpus_en
corpus_vn = ["toi yeu ban", "xây mô hình ai"]
# Number of target sentences (not referenced again in the visible cells)
data_size_vn = len(corpus_vn)

# Upper bound on the vocabulary size (passed as max_tokens when the vocab is
# built) and the number of output classes of the decoder.
# NOTE(review): the recorded vocabulary has only 11 entries (7 words + 4
# specials), so class index 11 never appears as a target — confirm 12 is
# intended.
vocab_size_vn = 12
# Fixed length of every vectorized target sentence
# (<sos> + up to 4 words + <eos> for this corpus)
sequence_length_vn = 6

In [10]:
# Create the target (Vietnamese) vocabulary from the tokenized corpus, capped
# at vocab_size_vn entries. Unlike the source vocabulary it also has a <sos>
# token; in the recorded mapping the specials occupy ids 0-3.
vocab_vn = build_vocab_from_iterator(
    yield_tokens(corpus_vn),
    max_tokens=vocab_size_vn,
    specials=["<unk>", "<pad>", "<sos>", "<eos>"],
)
# Tokens that are not in the vocabulary are looked up as <unk>
vocab_vn.set_default_index(vocab_vn["<unk>"])
# Display the token -> id mapping as the cell output
vocab_vn.get_stoi()

{'<unk>': 0,
 'hình': 6,
 'ban': 5,
 '<eos>': 3,
 '<pad>': 1,
 '<sos>': 2,
 'mô': 7,
 'ai': 4,
 'toi': 8,
 'xây': 9,
 'yeu': 10}

In [11]:
# vectorize (tokenize, numberize, truncate, add special tokens, padding)
def vectorize_vn(text, vocab, sequence_length):
    """Convert a target sentence into a fixed-length tensor of token ids.

    The sentence is tokenized with the module-level ``tokenizer``, mapped to
    ids through ``vocab``, wrapped as ``<sos> ... <eos>`` and right-padded
    with ``<pad>`` up to ``sequence_length``.

    Over-long sentences are truncated *before* the special tokens are added,
    so ``<eos>`` is always kept (the previous implementation sliced after
    appending and silently dropped it).

    Args:
        text: Raw sentence.
        vocab: Mapping supporting ``vocab[token] -> int`` that contains the
            ``"<sos>"``, ``"<eos>"`` and ``"<pad>"`` entries.
        sequence_length: Exact length of the returned tensor.

    Returns:
        1-D ``torch.long`` tensor with ``sequence_length`` elements.
    """
    word_ids = [vocab[token] for token in tokenizer(text)]
    # Reserve two slots (<sos> and <eos>) so truncation never removes them.
    word_ids = word_ids[: max(sequence_length - 2, 0)]
    token_ids = [vocab["<sos>"]] + word_ids + [vocab["<eos>"]]
    # A non-positive repeat count yields an empty list, i.e. no padding.
    token_ids += [vocab["<pad>"]] * (sequence_length - len(token_ids))
    # Final slice only matters for degenerate lengths (sequence_length < 2).
    return torch.tensor(token_ids[:sequence_length], dtype=torch.long)

In [12]:
# Turn every target sentence into a fixed-length tensor of token ids
corpus_ids_vn = [
    vectorize_vn(sentence, vocab_vn, sequence_length_vn) for sentence in corpus_vn
]

# Show the resulting id tensors, one per line
for ids in corpus_ids_vn:
    print(ids)

tensor([ 2,  8, 10,  5,  3,  1])
tensor([2, 9, 7, 6, 4, 3])


# Model

## LSTM

In [13]:
# LSTM Encoder
class LSTM_Encoder(nn.Module):
    """Embed a batch of source token ids and encode it with a stacked LSTM.

    Only the final hidden and cell states are returned; the per-step LSTM
    outputs are discarded.
    """

    def __init__(self, input_dim, emb_dim, hid_dim, n_layers, dropout):
        """
        Args:
            input_dim: Number of rows of the embedding table.
            emb_dim: Size of each embedding vector.
            hid_dim: Hidden size of the LSTM.
            n_layers: Number of stacked LSTM layers.
            dropout: Dropout probability, used both on the embeddings and
                inside the LSTM.
        """
        super().__init__()
        self.embedding = nn.Embedding(num_embeddings=input_dim, embedding_dim=emb_dim)
        self.lstm = nn.LSTM(
            input_size=emb_dim,
            hidden_size=hid_dim,
            num_layers=n_layers,
            dropout=dropout,
            batch_first=True,
        )
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, src):
        """Encode ``src`` ([batch_size, seq_len]) into the final LSTM state.

        Returns:
            ``(hidden, cell)``, each of shape [n_layers, batch_size, hid_dim].
        """
        token_vectors = self.embedding(src)  # [batch_size, seq_len, emb_dim]
        lstm_input = self.dropout(token_vectors)
        # The per-step outputs ([batch_size, seq_len, hid_dim]) are not needed.
        _, final_state = self.lstm(lstm_input)
        hidden, cell = final_state
        return hidden, cell

In [14]:
# LSTM Decoder
class LSTM_Decoder(nn.Module):
    """Single-step LSTM decoder: one input token per sample in, logits out."""

    def __init__(self, output_dim, emb_dim, hid_dim, n_layers, dropout):
        """
        Args:
            output_dim: Number of output classes; also the number of rows of
                the embedding table. Stored as ``self.output_dim``.
            emb_dim: Size of each embedding vector.
            hid_dim: Hidden size of the LSTM.
            n_layers: Number of stacked LSTM layers.
            dropout: Dropout probability, used both on the embeddings and
                inside the LSTM.
        """
        super().__init__()
        self.output_dim = output_dim
        self.embedding = nn.Embedding(num_embeddings=output_dim, embedding_dim=emb_dim)
        self.lstm = nn.LSTM(
            input_size=emb_dim,
            hidden_size=hid_dim,
            num_layers=n_layers,
            dropout=dropout,
            batch_first=True,
        )
        self.fc_out = nn.Linear(in_features=hid_dim, out_features=output_dim)
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, input, hidden, cell):
        """Advance the decoder by one time step.

        Args:
            input: Token ids of the current step, shape [batch_size].
            hidden: Previous LSTM hidden state.
            cell: Previous LSTM cell state.

        Returns:
            ``(prediction, hidden, cell)`` where ``prediction`` has shape
            [batch_size, output_dim] and the states are the updated ones.
        """
        step_tokens = input.unsqueeze(1)  # [batch_size, 1]
        step_vectors = self.dropout(self.embedding(step_tokens))  # [batch_size, 1, emb_dim]
        lstm_out, next_state = self.lstm(step_vectors, (hidden, cell))
        hidden, cell = next_state
        # lstm_out is [batch_size, 1, hid_dim]; drop the length-1 time axis.
        prediction = self.fc_out(lstm_out.squeeze(1))
        return prediction, hidden, cell

In [15]:
# LSTM Seq2Seq Model
class LSTM_Seq2Seq(nn.Module):
    """Encoder-decoder wrapper that decodes step by step with teacher forcing."""

    def __init__(self, encoder, decoder, device):
        """
        Args:
            encoder: Module called as ``encoder(src) -> (hidden, cell)``.
            decoder: Module called as
                ``decoder(input, hidden, cell) -> (prediction, hidden, cell)``
                and exposing ``output_dim``.
            device: Device on which the output buffer is allocated.
        """
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.device = device

    def forward(self, src, trg, teacher_forcing_ratio=0.5):
        """Produce decoder logits for target positions 1 .. trg_len - 1.

        Args:
            src: Source token ids, first dimension is the batch.
            trg: Target token ids, [batch_size, trg_len]; column 0 is fed as
                the first decoder input (the <sos> token).
            teacher_forcing_ratio: Probability, drawn once per step for the
                whole batch, of feeding the ground-truth token instead of the
                decoder's own argmax as the next input.

        Returns:
            Tensor [batch_size, trg_len, decoder.output_dim]; position 0 is
            left as zeros.
        """
        n_samples, n_steps = src.shape[0], trg.shape[1]
        logits = torch.zeros(
            n_samples, n_steps, self.decoder.output_dim, device=self.device
        )

        hidden, cell = self.encoder(src)
        step_input = trg[:, 0]  # <sos> token

        for step in range(1, n_steps):
            step_logits, hidden, cell = self.decoder(step_input, hidden, cell)
            logits[:, step, :] = step_logits
            # One random draw per step, taken after the decoder call.
            use_target = torch.rand(1).item() < teacher_forcing_ratio
            greedy_tokens = step_logits.argmax(1)
            step_input = trg[:, step] if use_target else greedy_tokens

        return logits

# Training

In [16]:
# Training Code
def train_model(model, train_iterator, optimizer, criterion, clip=1):
    """Run one training epoch and return the mean loss per batch.

    Each batch is moved to the device that holds the model's parameters
    before the forward pass. Without this, batches coming from a plain
    DataLoader stay on the CPU and the forward pass fails as soon as the
    model lives on a GPU.

    Args:
        model: Module called as ``model(src, trg)`` returning logits of shape
            [batch_size, trg_len, output_dim].
        train_iterator: Sized iterable of ``(src, trg)`` batches.
        optimizer: Optimizer over ``model``'s parameters.
        criterion: Loss called as ``criterion(logits_2d, targets_1d)``.
        clip: Maximum gradient norm used for gradient clipping.

    Returns:
        Sum of the batch losses divided by ``len(train_iterator)``.

    Raises:
        ZeroDivisionError: If ``train_iterator`` is empty.
    """
    model.train()
    epoch_loss = 0

    # Device of the model, inferred from its first parameter (None when the
    # model has no parameters, in which case batches are left where they are).
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else None

    for src, trg in train_iterator:
        if device is not None:
            src, trg = src.to(device), trg.to(device)

        optimizer.zero_grad()

        output = model(src, trg)
        output_dim = output.shape[-1]

        # Position 0 is the <sos> slot: drop it from both logits and targets,
        # then flatten batch and time into a single axis for the loss.
        output = output[:, 1:].reshape(-1, output_dim)
        trg = trg[:, 1:].reshape(-1)

        loss = criterion(output, trg)
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), clip)
        optimizer.step()

        epoch_loss += loss.item()

    return epoch_loss / len(train_iterator)

# Set Up

In [17]:
# Pick the compute device used for the models: the CUDA GPU when PyTorch
# reports one as available, otherwise the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [None]:
# LSTM model: encoder over the English vocabulary, decoder over the
# Vietnamese vocabulary, both with 256-d embeddings, 512-d hidden state,
# 2 layers and dropout 0.5, wrapped in the seq2seq module on `device`.
enc_lstm = LSTM_Encoder(
    input_dim=vocab_size_en, emb_dim=256, hid_dim=512, n_layers=2, dropout=0.5
)
dec_lstm = LSTM_Decoder(
    output_dim=vocab_size_vn, emb_dim=256, hid_dim=512, n_layers=2, dropout=0.5
)
lstm_model = LSTM_Seq2Seq(encoder=enc_lstm, decoder=dec_lstm, device=device).to(device)

In [19]:
# Training setup
# Adam over all parameters of the seq2seq model, with the optimizer's
# default hyper-parameters (none are overridden here).
optimizer_lstm = torch.optim.Adam(lstm_model.parameters())
# Cross-entropy over the target vocabulary; positions whose target is the
# <pad> id do not contribute to the loss.
criterion = nn.CrossEntropyLoss(ignore_index=vocab_vn["<pad>"])

In [20]:
# Prepare data: pair every source tensor with its target tensor and serve
# the pairs in batches of two (i.e. the whole toy corpus in a single batch)
train_data = [*zip(corpus_ids_en, corpus_ids_vn)]
train_iterator = torch.utils.data.DataLoader(train_data, batch_size=2)
N_EPOCHS = 100

for epoch in range(N_EPOCHS):
    # One pass over the data; returns the mean batch loss of this epoch
    lstm_loss = train_model(
        model=lstm_model,
        train_iterator=train_iterator,
        optimizer=optimizer_lstm,
        criterion=criterion,
    )

    print(f"Epoch: {epoch+1:02}")
    print(f"\tLSTM Loss: {lstm_loss:.3f}")

Epoch: 01
	LSTM Loss: 2.490
Epoch: 02
	LSTM Loss: 2.442
Epoch: 03
	LSTM Loss: 2.360
Epoch: 04
	LSTM Loss: 2.270
Epoch: 05
	LSTM Loss: 2.075
Epoch: 06
	LSTM Loss: 1.847
Epoch: 07
	LSTM Loss: 1.493
Epoch: 08
	LSTM Loss: 1.183
Epoch: 09
	LSTM Loss: 0.905
Epoch: 10
	LSTM Loss: 0.760
Epoch: 11
	LSTM Loss: 0.596
Epoch: 12
	LSTM Loss: 0.486
Epoch: 13
	LSTM Loss: 0.373
Epoch: 14
	LSTM Loss: 0.286
Epoch: 15
	LSTM Loss: 0.246
Epoch: 16
	LSTM Loss: 0.173
Epoch: 17
	LSTM Loss: 0.155
Epoch: 18
	LSTM Loss: 0.109
Epoch: 19
	LSTM Loss: 0.088
Epoch: 20
	LSTM Loss: 0.075
Epoch: 21
	LSTM Loss: 0.053
Epoch: 22
	LSTM Loss: 0.046
Epoch: 23
	LSTM Loss: 0.059
Epoch: 24
	LSTM Loss: 0.031
Epoch: 25
	LSTM Loss: 0.025
Epoch: 26
	LSTM Loss: 0.035
Epoch: 27
	LSTM Loss: 0.025
Epoch: 28
	LSTM Loss: 0.016
Epoch: 29
	LSTM Loss: 0.013
Epoch: 30
	LSTM Loss: 0.012
Epoch: 31
	LSTM Loss: 0.012
Epoch: 32
	LSTM Loss: 0.011
Epoch: 33
	LSTM Loss: 0.010
Epoch: 34
	LSTM Loss: 0.007
Epoch: 35
	LSTM Loss: 0.006
Epoch: 36
	LSTM Loss

# Testing

In [21]:
# Testing/Inference Code
def translate_sentence(
    model, sentence, src_vocab, trg_vocab, max_len, device, is_transformer=False
):
    """Greedily decode a translation of ``sentence`` with a seq2seq model.

    The sentence is vectorized with the module-level ``vectorize_en`` and
    ``sequence_length_en``, encoded once, and then decoded one token at a
    time (always taking the argmax) until ``<eos>`` is produced or
    ``max_len`` steps have been taken.

    Args:
        model: Module exposing ``encoder`` and ``decoder`` (called as in
            ``LSTM_Seq2Seq``); it is switched to eval mode.
        sentence: Raw source sentence.
        src_vocab: Source vocabulary passed on to ``vectorize_en``.
        trg_vocab: Target vocabulary supporting ``["<sos>"]``, ``["<eos>"]``
            and ``get_itos()``.
        max_len: Maximum number of decoding steps.
        device: Device on which the input tensors are placed.
        is_transformer: Unused; kept so existing callers keep working.

    Returns:
        ``(tokens, last_logits, last_state)``:
        * ``tokens`` – generated target tokens without ``<sos>`` and without
          the terminating ``<eos>``. ``<eos>`` is stripped only when it was
          actually produced, so a translation cut off at ``max_len`` no
          longer loses its last real token.
        * ``last_logits`` – decoder output of the final step, or ``None``
          when no step was run.
        * ``last_state`` – ``(hidden, cell)`` after the final step, or
          ``None`` when no step was run.
    """
    model.eval()

    # Batch of one: [1, sequence_length_en]
    src_tensor = (
        vectorize_en(sentence, src_vocab, sequence_length_en).unsqueeze(0).to(device)
    )
    sos_idx = trg_vocab["<sos>"]
    eos_idx = trg_vocab["<eos>"]
    last_logits = None
    last_state = None

    with torch.no_grad():
        # LSTM inference
        hidden, cell = model.encoder(src_tensor)
        trg_indexes = [sos_idx]
        for _ in range(max_len):
            # Feed back the most recent token as a batch of size one.
            step_input = torch.tensor(
                [trg_indexes[-1]], dtype=torch.long, device=device
            )
            output, hidden, cell = model.decoder(step_input, hidden, cell)
            pred_token = output.argmax(1).item()
            trg_indexes.append(pred_token)
            last_logits = output
            last_state = (hidden, cell)
            if pred_token == eos_idx:
                break

    # Drop the leading <sos>; drop the trailing <eos> only if it is there.
    generated = trg_indexes[1:]
    if generated and generated[-1] == eos_idx:
        generated = generated[:-1]

    itos = trg_vocab.get_itos()  # fetched once instead of once per token
    return [itos[i] for i in generated], last_logits, last_state

In [22]:
# Test the model on a sentence that is part of the training corpus
test_sentence = "i love you"
print("Original:", test_sentence)

# Decode at most sequence_length_vn steps. The second return value is the
# decoder output of the last decoding step (kept for the final cell); the
# third one, the last (hidden, cell) state, is discarded.
lstm_translation, output_encoder, _ = translate_sentence(
    lstm_model, test_sentence, vocab_en, vocab_vn, sequence_length_vn, device
)
print("LSTM Translation:", " ".join(lstm_translation))

Original: i love you
LSTM Translation: toi yeu ban


# Final Result

In [23]:
# Sum the decoder output of the last decoding step for the test sentence.
# Despite its name, `output_encoder` holds what translate_sentence returned
# as its second value, i.e. the decoder's output for a batch of one; [0]
# selects that single row and the built-in sum adds up its elements.
sum(output_encoder[0])

tensor(-6.5159)