## Data

In [1]:
import torch
import torch.nn as nn
from tokenizers import Tokenizer
from tokenizers.models import WordLevel
from tokenizers.trainers import WordLevelTrainer
from tokenizers.pre_tokenizers import Whitespace

In [2]:
# Toy English (source) corpus: one sentence per training sample.
corpus_en = ["good morning", "ai books"]

# Vocabulary budget handed to the tokenizer trainer.
vocab_size_en = 7
# Fixed length every encoded English sentence is padded/truncated to.
sequence_length_en = 3

In [3]:
# Word-level tokenizer for the English (source) side.
# The unknown token is named explicitly so that out-of-vocabulary words map to
# the "<unk>" special token registered below instead of relying on a library default.
tokenizer_en = Tokenizer(WordLevel(unk_token="<unk>"))
tokenizer_en.pre_tokenizer = Whitespace()

# Train the tokenizer on the corpus; the special tokens are part of the vocabulary budget.
trainer_generation = WordLevelTrainer(vocab_size=vocab_size_en,
                                      special_tokens=["<unk>", "<pad>", "<eos>"])
tokenizer_en.train_from_iterator(corpus_en, trainer_generation)

# Pad/truncate every encoding to a fixed length. The pad id is looked up in the
# trained vocabulary rather than hard-coded, so it stays correct if the
# special-token list above is reordered.
tokenizer_en.enable_padding(pad_id=tokenizer_en.token_to_id("<pad>"),
                            pad_token="<pad>",
                            length=sequence_length_en)
tokenizer_en.enable_truncation(max_length=sequence_length_en)

In [4]:
# Encode each English sentence to its id sequence and stack the rows into one
# LongTensor (one row per sentence).
topics_ids = [tokenizer_en.encode(sentence).ids for sentence in corpus_en]
en_data = torch.tensor(topics_ids, dtype=torch.long)

print(en_data)
print(tokenizer_en.get_vocab())

tensor([[5, 6, 1],
        [3, 4, 1]])
{'<unk>': 0, 'good': 5, 'morning': 6, '<eos>': 2, 'books': 4, 'ai': 3, '<pad>': 1}


In [5]:
# Toy Vietnamese (target) corpus, aligned sentence-by-sentence with the English one.
corpus_vn = ["chào buổi sáng", "sách ai"]

# Vocabulary budget handed to the tokenizer trainer.
vocab_size_vn = 9
# Fixed length every encoded Vietnamese sequence is padded/truncated to.
sequence_length_vn = 4

In [6]:
# Word-level tokenizer for the Vietnamese (target) side.
# The unknown token is named explicitly so that out-of-vocabulary words map to
# the "<unk>" special token registered below instead of relying on a library default.
tokenizer_vn = Tokenizer(WordLevel(unk_token="<unk>"))
tokenizer_vn.pre_tokenizer = Whitespace()

# Train the tokenizer on the corpus; "<sos>"/"<eos>" mark the start and end of a target sentence.
trainer_vn = WordLevelTrainer(vocab_size=vocab_size_vn,
                              special_tokens=["<unk>", "<pad>", "<sos>", "<eos>"])
tokenizer_vn.train_from_iterator(corpus_vn, trainer_vn)

# Pad/truncate every encoding to a fixed length. The pad id is looked up in the
# trained vocabulary rather than hard-coded, so it stays correct if the
# special-token list above is reordered.
tokenizer_vn.enable_padding(pad_id=tokenizer_vn.token_to_id("<pad>"),
                            pad_token="<pad>",
                            length=sequence_length_vn)
tokenizer_vn.enable_truncation(max_length=sequence_length_vn)

In [7]:
# Build teacher-forcing pairs from each target sentence:
#   decoder input = "<sos> w1 ... wn"
#   decoder label = "w1 ... wn <eos>"   (the input shifted left by one token)
data_x, data_y = [], []
for sentence in corpus_vn:
    tokens = ['<sos>', *sentence.split(), '<eos>']
    data_x.append(' '.join(tokens[:-1]))
    data_y.append(' '.join(tokens[1:]))

print(data_x)
print(data_y)

['<sos> chào buổi sáng', '<sos> sách ai']
['chào buổi sáng <eos>', 'sách ai <eos>']


In [8]:
# Tokenize and numericalize your samples
def vectorize_generation(x, y, tokenizer_vn):
    """Encode one (decoder input, decoder label) text pair into token ids.

    Args:
        x: Decoder-input text.
        y: Decoder-label text.
        tokenizer_vn: Object whose ``encode(text)`` result exposes an ``ids`` attribute.

    Returns:
        Tuple ``(x_ids, y_ids)`` with the ids of ``x`` and ``y``.
        Both id sequences are also printed as a side effect.
    """
    # Encode x first, then y, with the same tokenizer.
    x_ids, y_ids = (tokenizer_vn.encode(text).ids for text in (x, y))
    print(x_ids, y_ids)
    return x_ids, y_ids

# Vectorize the samples: encode every (input, label) text pair, then stack the
# id lists into two LongTensors with one row per sentence.
encoded_pairs = [vectorize_generation(x, y, tokenizer_vn)
                 for x, y in zip(data_x, data_y)]

input_vn_data = torch.tensor([pair[0] for pair in encoded_pairs], dtype=torch.long)
label_vn_data = torch.tensor([pair[1] for pair in encoded_pairs], dtype=torch.long)

[2, 6, 5, 8] [6, 5, 8, 3]
[2, 7, 4, 1] [7, 4, 3, 1]


## Model

In [9]:
class Encoder(nn.Module):
    """Embed source token ids and contextualise them with one Transformer encoder layer.

    Args:
        vocab_size_en: Number of rows in the source embedding table.
        embedding_dim: Size of each embedding vector. The embeddings are fed
            straight into the encoder layer, so this must equal ``model_dim``.
        model_dim: ``d_model`` of the encoder layer.
        nhead: Number of attention heads.
        dim_feedforward: Hidden size of the layer's feed-forward sub-block.
        dropout: Dropout probability used inside the layer.

    Raises:
        ValueError: If ``embedding_dim`` and ``model_dim`` differ.

    Note:
        No positional information is added to the embeddings before the layer.
    """

    def __init__(self, vocab_size_en, embedding_dim, model_dim, nhead,
                 dim_feedforward=6, dropout=0.0):
        super().__init__()
        # Fail early with a clear message instead of a shape error inside attention.
        if embedding_dim != model_dim:
            raise ValueError(
                f"embedding_dim ({embedding_dim}) must equal model_dim ({model_dim}): "
                "embeddings are passed directly to the encoder layer"
            )
        self.embedding = nn.Embedding(vocab_size_en, embedding_dim)
        self.transformer_encoder = nn.TransformerEncoderLayer(d_model=model_dim,
                                                              nhead=nhead,
                                                              dim_feedforward=dim_feedforward,
                                                              dropout=dropout,
                                                              batch_first=True)

    # src = [batch_size, seq_length]
    def forward(self, src, src_key_padding_mask=None):
        """Encode a batch of source sequences.

        Args:
            src: Token ids, ``[batch_size, seq_length]``.
            src_key_padding_mask: Optional bool tensor ``[batch_size, seq_length]``;
                ``True`` marks padding positions that attention should ignore.
                ``None`` (default) attends to every position.

        Returns:
            Context tensor ``[batch_size, seq_length, model_dim]``.
        """
        embedded = self.embedding(src)                # [batch_size, seq_length, d]
        context = self.transformer_encoder(
            embedded, src_key_padding_mask=src_key_padding_mask
        )                                             # [batch_size, seq_length, d]
        return context

In [10]:
# Shape of the encoded source batch: one row per English sentence, one column per token slot.
en_data.shape

torch.Size([2, 3])

In [11]:
# Seed the RNG before the first weights are created so that parameter
# initialisation (and therefore training) is reproducible on Restart & Run All.
torch.manual_seed(0)

# The embedding size must match the Transformer model size because the
# embeddings are passed straight into the encoder/decoder layers.
embedding_dim, model_dim, nhead = 6, 6, 2
encoder = Encoder(vocab_size_en, embedding_dim, model_dim, nhead)

# Smoke test: one context vector per source token.
context_sample = encoder(en_data)
print(context_sample.shape)

torch.Size([2, 3, 6])


In [12]:
class Decoder(nn.Module):
    """Embed target token ids, run one causal Transformer decoder layer, project to vocabulary logits.

    Args:
        vocab_size_vn: Target vocabulary size (embedding rows and output logits).
        embedding_dim: Size of each embedding vector. The embeddings are fed
            straight into the decoder layer, so this must equal ``model_dim``.
        model_dim: ``d_model`` of the decoder layer.
        nhead: Number of attention heads.
        sequence_length_vn: Maximum target length; size of the pre-built causal mask.
        dim_feedforward: Hidden size of the layer's feed-forward sub-block.
        dropout: Dropout probability used inside the layer.

    Raises:
        ValueError: If ``embedding_dim`` and ``model_dim`` differ.

    Note:
        No positional information is added to the embeddings before the layer.
    """

    def __init__(self, vocab_size_vn, embedding_dim, model_dim, nhead, sequence_length_vn,
                 dim_feedforward=6, dropout=0.0):
        super().__init__()
        # Fail early with a clear message instead of a shape error inside attention.
        if embedding_dim != model_dim:
            raise ValueError(
                f"embedding_dim ({embedding_dim}) must equal model_dim ({model_dim}): "
                "embeddings are passed directly to the decoder layer"
            )
        self.embedding = nn.Embedding(vocab_size_vn, embedding_dim)
        # Upper-triangular bool mask: True above the diagonal blocks attention to future positions.
        # Registered as a buffer so it follows the module across devices;
        # persistent=False keeps it out of state_dict, so saved checkpoints are unaffected.
        causal_mask = torch.triu(torch.ones(sequence_length_vn, sequence_length_vn), diagonal=1).bool()
        self.register_buffer("mask", causal_mask, persistent=False)
        self.transformer_decoder = nn.TransformerDecoderLayer(d_model=model_dim,
                                                              nhead=nhead,
                                                              dim_feedforward=dim_feedforward,
                                                              dropout=dropout,
                                                              batch_first=True)
        self.fc_out = nn.Linear(model_dim, vocab_size_vn)

    # input: [batch_size, seq_length_vn]
    # context: [batch_size, seq_length_en, d]
    def forward(self, input, context, tgt_key_padding_mask=None, memory_key_padding_mask=None):
        """Compute next-token logits for every target position.

        Args:
            input: Target token ids, ``[batch_size, tgt_length]`` with
                ``tgt_length <= sequence_length_vn``.
            context: Encoder output, ``[batch_size, seq_length_en, model_dim]``.
            tgt_key_padding_mask: Optional bool tensor ``[batch_size, tgt_length]``;
                ``True`` marks target padding positions to ignore as attention keys.
            memory_key_padding_mask: Optional bool tensor ``[batch_size, seq_length_en]``;
                ``True`` marks source padding positions to ignore in cross-attention.

        Returns:
            Logits ``[batch_size, vocab_size_vn, tgt_length]`` (class dimension second).

        Raises:
            ValueError: If the target is longer than the pre-built causal mask.
        """
        tgt_length = input.size(1)
        max_length = self.mask.size(0)
        if tgt_length > max_length:
            raise ValueError(
                f"target length {tgt_length} exceeds the maximum length {max_length} "
                "the decoder was built for"
            )
        # Slice the mask so targets shorter than the maximum length are also accepted.
        tgt_mask = self.mask[:tgt_length, :tgt_length]

        embedded = self.embedding(input)                                           # [batch_size, tgt_length, d]
        output = self.transformer_decoder(embedded, context,
                                          tgt_mask=tgt_mask,
                                          tgt_key_padding_mask=tgt_key_padding_mask,
                                          memory_key_padding_mask=memory_key_padding_mask)
                                                                                   # [batch_size, tgt_length, d]
        prediction = self.fc_out(output)                                           # [batch_size, tgt_length, vocab_size_vn]

        return prediction.permute(0, 2, 1)                                         # [batch_size, vocab_size_vn, tgt_length]

In [13]:
# Build the decoder and smoke-test it on the encoder's sample context.
decoder = Decoder(
    vocab_size_vn=vocab_size_vn,
    embedding_dim=embedding_dim,
    model_dim=model_dim,
    nhead=nhead,
    sequence_length_vn=sequence_length_vn,
)
outputs = decoder(input_vn_data, context_sample)
print(outputs.shape)

torch.Size([2, 9, 4])


In [14]:
class Seq2Seq_Model(nn.Module):
    """Thin wrapper that chains an encoder module and a decoder module."""

    def __init__(self, encoder, decoder):
        super().__init__()
        # Registered in this order: encoder first, then decoder.
        self.encoder, self.decoder = encoder, decoder

    def forward(self, sequence_en, sequence_vn):
        """Encode ``sequence_en``, then decode ``sequence_vn`` against that context.

        Returns whatever the decoder returns for ``(sequence_vn, context)``.
        """
        return self.decoder(sequence_vn, self.encoder(sequence_en))

In [15]:
# Wrap the encoder and decoder instances created above into a single module.
model = Seq2Seq_Model(encoder=encoder, decoder=decoder)

# test: full forward pass on the toy batch
outputs = model(en_data, input_vn_data)
print(outputs.shape)

torch.Size([2, 9, 4])


## Train

In [17]:
# Token-level cross-entropy over the class dimension of the decoder logits.
# Note: no ignore_index is set, so positions labelled with the <pad> id
# contribute to the loss as well.
learning_rate = 0.05
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

In [18]:
# Full-batch training on the toy dataset with teacher forcing.
num_steps = 50       # number of optimisation steps
loss_history = []    # loss per step, kept so convergence can be inspected afterwards

model.train()
for step in range(num_steps):
    optimizer.zero_grad()
    outputs = model(en_data, input_vn_data)     # logits: [batch, vocab_size_vn, seq_length_vn]
    loss = criterion(outputs, label_vn_data)    # labels: [batch, seq_length_vn]
    loss.backward()
    optimizer.step()
    loss_history.append(loss.item())

In [19]:
# Teacher-forced predictions after training. No gradients are needed here, so
# the forward pass runs under no_grad to avoid building an autograd graph.
with torch.no_grad():
    outputs = model(en_data, input_vn_data)

# The class (vocabulary) dimension is dim 1, so argmax over it gives one token id per position.
print(torch.argmax(outputs, dim=1))

tensor([[6, 5, 8, 3],
        [7, 4, 3, 1]])


In [20]:
# Ground-truth target ids, shown for comparison with the predicted ids printed by the previous cell.
label_vn_data

tensor([[6, 5, 8, 3],
        [7, 4, 3, 1]])