In [None]:
import torch
import torch.nn as nn
import math
import numpy as np
from transformers import AutoTokenizer

In [None]:
# Special token IDs:
#   PAD_TKN - filler used to pad sequences to a common length
#   CLS_TKN - marks the start of a sequence
#   SEP_TKN - marks the end of a sequence
PAD_TKN, CLS_TKN, SEP_TKN = 0, 101, 102

In [None]:

# Load the pretrained "bert-base-uncased" tokenizer and round-trip a short sentence through it.
tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")

# Encode to a PyTorch tensor of token IDs (special tokens included) ...
result = tokenizer.encode(
    "Hello world!",
    return_tensors="pt",
    add_special_tokens=True,
)
# ... then turn the first (and only) row of IDs back into text.
decoded_result = tokenizer.decode(result[0])

print(f"Encoded sentence: {result}")
print(f"Decoded sentence: {decoded_result}")

In [None]:
# Decode a hand-written list of token IDs; the first and last IDs equal CLS_TKN and SEP_TKN.
hello_world_ids = [101, 7592, 2088, 999, 102]
tokenizer.decode(hello_world_ids)

If we have a much more limited vocabulary, we can simply define a dictionary that creates one token per word and maps from words to token IDs:

In [None]:
def _build_token_mapping(words):
    """Map the special tokens followed by ``words`` to consecutive IDs starting at 0."""
    vocabulary = ["<PAD>", "<BOS>", "<EOS>", *words]
    return {token: token_id for token_id, token in enumerate(vocabulary)}


# English (source) vocabulary: IDs 0-2 are the special tokens, IDs 3-10 are words.
ENGLISH_TOKEN_MAPPING = _build_token_mapping(
    ["the", "quick", "brown", "fox", "jumps", "over", "lazy", "dog"]
)

# French (target) vocabulary, laid out the same way.
FRENCH_TOKEN_MAPPING = _build_token_mapping(
    ["le", "renard", "brun", "rapide", "saute", "par-dessus", "chien", "paresseux"]
)
    

# From token IDs to token embeddings
Adapted from this [PyTorch tutorial](https://pytorch.org/tutorials/beginner/translation_transformer.html)

In [None]:
class TokenEmbedding(nn.Module):
    """Lookup table turning token IDs from a fixed-size vocabulary into fixed-length vectors."""

    def __init__(self, vocab_size: int, emb_dim: int, padding_idx: int = 0):
        super().__init__()
        self._vocab_size = vocab_size
        self._embed_dim = emb_dim
        # Registering the padding index stops gradients from flowing into the
        # embedding row that belongs to padding tokens.
        self.embedding = nn.Embedding(
            num_embeddings=vocab_size,
            embedding_dim=emb_dim,
            padding_idx=padding_idx,
        )

    def forward(self, token_ids: torch.Tensor) -> torch.Tensor:
        """Return the embedding vector for every ID in ``token_ids``."""
        vectors = self.embedding(token_ids)
        return vectors

In [None]:
# Embed a single English word to check the embedding layer end to end.
embedding_dim = 128
vocab_size = len(ENGLISH_TOKEN_MAPPING)
token_embedding = TokenEmbedding(vocab_size, embedding_dim)

the_token_id = ENGLISH_TOKEN_MAPPING["the"]
tokens = torch.tensor([the_token_id], dtype=torch.long)
embedding = token_embedding(tokens)
print(f"Embedding for 'the':\n{embedding}\nShape: {embedding.shape}")

# Positional Encoding
We want to add information to the word embeddings that encodes each word's position in the sequence.

Below is taken from the official [PyTorch transformers tutorial](https://pytorch.org/tutorials/beginner/transformer_tutorial.html)


In [None]:
class PositionalEncoding(nn.Module):
    """Adds fixed sinusoidal position information to a sequence of embeddings.

    Even embedding columns hold ``sin`` values and odd columns hold ``cos``
    values of the position scaled by a per-column frequency.

    Arguments:
        d_model: size of each embedding vector (may be even or odd).
        dropout: dropout probability applied after the encoding is added.
        max_len: number of positions for which encodings are precomputed.
    """

    def __init__(self, d_model: int, dropout: float = 0.1, max_len: int = 5000):
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)

        position = torch.arange(max_len).unsqueeze(1)  # [max_len, 1]
        div_term = torch.exp(torch.arange(0, d_model, 2) * (-math.log(10000.0) / d_model))
        angles = position * div_term  # [max_len, ceil(d_model / 2)]

        pe = torch.zeros(max_len, 1, d_model)
        pe[:, 0, 0::2] = torch.sin(angles)
        # When d_model is odd there is one fewer odd column than even columns,
        # so drop the surplus frequency instead of failing on a shape mismatch.
        pe[:, 0, 1::2] = torch.cos(angles[:, : d_model // 2])
        # A buffer is saved and moved with the module but is not a trainable parameter.
        self.register_buffer('pe', pe)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """
        Arguments:
            x: Tensor, shape ``[seq_len, batch_size, embedding_dim]``

        Adds positional encoding to a given sequential input tensor.

        Only ``max_len`` positions are precomputed, so ``seq_len`` must not
        exceed ``max_len``.
        """
        # The size-1 batch axis of ``pe`` broadcasts over the batch dimension of ``x``.
        x = x + self.pe[:x.size(0)]
        return self.dropout(x)

In [None]:
# Encode a full sentence: tokens -> embeddings -> embeddings with position information.
test_sentence = "the quick brown fox jumps over the lazy dog"
positional_encoder = PositionalEncoding(embedding_dim)

test_tokens = torch.tensor(
    [ENGLISH_TOKEN_MAPPING[word] for word in test_sentence.split()],
    dtype=torch.long,
)
# Insert a batch axis of size 1 after the sequence axis, as the encoder expects
# ``[seq_len, batch_size, embedding_dim]``.
test_embedding = token_embedding(test_tokens).unsqueeze(1)
positional_encoder(test_embedding)

# Preparing the input for the translation task
As discussed in this [blog post](https://kikaben.com/transformers-encoder-decoder/), we typically use teacher forcing to prepare the sequence of inputs in parallel.

For example, the first input to the decoder block would be the encoded `the quick brown fox jumps over the lazy dog` along with the `<BOS>` tag. The decoder would then hopefully output the first word of the translation, `le`. The next input to the decoder would be the same encoded sentence, this time along with `<BOS> le`, and so on.

Teacher forcing means we give the decoder the ground-truth tokens at each step of the translation, rather than auto-regressively waiting for the decoder to generate each required token, which means that we can parallelise the input.

This parallelisation is done using **masked attention** which sets the attention value to 0 for any token that the decoder is not "allowed" to see yet.

Our full target sequence is `<BOS> le renard brun rapide saute par-dessus le chien paresseux <EOS>` (11 tokens). The decoder input is this sequence without the final `<EOS>` (10 tokens), and the expected output is the same sequence shifted left by one, i.e. without the leading `<BOS>`.

The first attention mask is thus `[1, 0, 0, 0, 0, 0, 0, 0, 0, 0]` as the decoder is only allowed to see the `<BOS>` when predicting the first output token.

The second attention mask is `[1, 1, 0, 0, 0, 0, 0, 0, 0, 0]` etc.

We can group these together in a square matrix, with one row per decoder input position:

```
[1, 0, 0, 0, 0, 0, 0, 0, 0, 0]
[1, 1, 0, 0, 0, 0, 0, 0, 0, 0]
[1, 1, 1, 0, 0, 0, 0, 0, 0, 0]
[1, 1, 1, 1, 0, 0, 0, 0, 0, 0]
[1, 1, 1, 1, 1, 0, 0, 0, 0, 0]
[1, 1, 1, 1, 1, 1, 0, 0, 0, 0]
[1, 1, 1, 1, 1, 1, 1, 0, 0, 0]
[1, 1, 1, 1, 1, 1, 1, 1, 0, 0]
[1, 1, 1, 1, 1, 1, 1, 1, 1, 0]
[1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
```

This can be quickly done using an existing PyTorch function (which uses zeros for un-masked values and -infs for masked values):

In [None]:
# Causal mask for a 10-token decoder input.
nn.Transformer.generate_square_subsequent_mask(sz=10)

# Constructing the Transformer Model

In [None]:
class TransformerEncoderDecoder(nn.Module):
    """Sequence-to-sequence model built around ``nn.Transformer``.

    Token IDs are embedded, positional encodings are added, the result is fed
    through the encoder/decoder stack, and a linear layer projects every
    decoder output onto the target vocabulary.

    Arguments:
        d_model: embedding size used throughout the model.
        n_head: number of attention heads.
        num_encoder_layers: number of stacked encoder layers.
        num_decoder_layers: number of stacked decoder layers.
        dim_feedforward: hidden size of the feed-forward sub-layers.
        dropout: dropout probability (transformer and positional encoding).
        activation: activation function name passed to ``nn.Transformer``.
        source_vocab_size: number of distinct source token IDs.
        target_vocab_size: number of distinct target token IDs.
        max_seq_length: stored on the instance; not otherwise used by this class.
        device: stored on the instance for backward compatibility. The forward
            pass places the causal mask on the device of its input instead, so
            this argument is optional. The module itself is NOT moved; call
            ``.to(device)`` on the instance for that.
    """

    def __init__(self, 
                 d_model: int, 
                 n_head: int, 
                 num_encoder_layers: int, 
                 num_decoder_layers: int, 
                 dim_feedforward: int = 512, 
                 dropout: float = 0.1, 
                 activation: str = "relu", 
                 source_vocab_size: int = 100,
                 target_vocab_size: int = 100, 
                 max_seq_length: int = 100, 
                 device: torch.device = None) -> None:
        super().__init__()
        self.transformer = nn.Transformer(d_model=d_model, 
                                          nhead=n_head, 
                                          num_encoder_layers=num_encoder_layers, 
                                          num_decoder_layers=num_decoder_layers, 
                                          dim_feedforward=dim_feedforward, 
                                          dropout=dropout, 
                                          activation=activation)
        self._target_vocab_size = target_vocab_size
        self._max_seq_length = max_seq_length
        self._device = device
        self.final_layer = nn.Linear(d_model, target_vocab_size)
        self.pe = PositionalEncoding(d_model, dropout)
        self.src_embedding = TokenEmbedding(source_vocab_size, d_model)
        self.tgt_embedding = TokenEmbedding(target_vocab_size, d_model)
    
    def forward(self, 
                src: torch.Tensor, 
                tgt: torch.Tensor) -> torch.Tensor:
        """Return unnormalised scores over the target vocabulary.

        Arguments:
            src: source token IDs; the first axis is the sequence position
                (the positional encoding expects ``[seq_len, batch_size, ...]``).
            tgt: target (decoder input) token IDs, laid out like ``src``.

        Returns:
            Tensor with ``target_vocab_size`` scores along the last axis for
            every target position. No softmax is applied.
        """
        src = self.pe(self.src_embedding(src))
        tgt = self.pe(self.tgt_embedding(tgt))
        # Causal mask: each target position may only attend to itself and earlier
        # positions. It is created on the input's device so that it always matches
        # the tensors it is combined with, wherever the model currently lives.
        tgt_mask = nn.Transformer.generate_square_subsequent_mask(tgt.size(0)).to(tgt.device)
        out = self.transformer(src, tgt, tgt_mask=tgt_mask)
        return self.final_layer(out)

## Example usage


In [None]:
# Tokenise the sentence pair by whitespace and look each word up in its vocabulary.
source_sentence = "the quick brown fox jumps over the lazy dog"
target_sentence = "<BOS> le renard brun rapide saute par-dessus le chien paresseux <EOS>"

source_tokens = list(map(ENGLISH_TOKEN_MAPPING.__getitem__, source_sentence.split()))
target_tokens = list(map(FRENCH_TOKEN_MAPPING.__getitem__, target_sentence.split()))

print(f"Source tokens: {source_tokens}")
print(f"Target tokens: {target_tokens}")

In [None]:
# Run an untrained model once to check that shapes line up end to end.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
transformer = TransformerEncoderDecoder(d_model=512,
                                        n_head=2,
                                        num_encoder_layers=6,
                                        num_decoder_layers=6,
                                        source_vocab_size=len(ENGLISH_TOKEN_MAPPING),
                                        target_vocab_size=len(FRENCH_TOKEN_MAPPING),
                                        device=device).to(device)  # model and inputs must share a device
transformer.eval()  # switch dropout off so the forward pass is deterministic

# Shape [seq_len, 1]: sequence first, then a batch axis of size one.
source_tokens_tensor = torch.tensor(source_tokens, dtype=torch.long, device=device).unsqueeze(1)
target_tokens_tensor = torch.tensor(target_tokens, dtype=torch.long, device=device).unsqueeze(1)
print(f"Source tokens shape: {source_tokens_tensor.shape}")
print(f"Target tokens shape: {target_tokens_tensor.shape}")

# Teacher forcing: the decoder sees everything except the final <EOS> and
# predicts the next token at each position.
decoder_input = target_tokens_tensor[:-1]
with torch.no_grad():  # inference only, no gradients needed
    result = transformer(source_tokens_tensor, decoder_input).squeeze(1) # remove batch dim
print(f"Output shape: {result.shape}")

# Run through softmax to get probabilities
result = nn.functional.softmax(result, dim=-1)
predicted_token_ids = torch.argmax(result, dim=-1)
print(f"Predicted token IDs: {predicted_token_ids}")
# Invert the vocabulary once instead of scanning it for every predicted ID.
french_id_to_token = {token_id: token for token, token_id in FRENCH_TOKEN_MAPPING.items()}
predicted_words = [french_id_to_token[token_id] for token_id in predicted_token_ids.tolist()]
print(f"Predicted Translation: {' '.join(predicted_words)}")

# Training Loop

In [None]:
# Overfit the model on the single sentence pair as a sanity check.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
loss_function = nn.CrossEntropyLoss()
transformer = TransformerEncoderDecoder(d_model=512,
                                        n_head=2,
                                        num_encoder_layers=6,
                                        num_decoder_layers=6,
                                        source_vocab_size=len(ENGLISH_TOKEN_MAPPING),
                                        target_vocab_size=len(FRENCH_TOKEN_MAPPING),
                                        device=device).to(device)  # model and inputs must share a device

source_tokens_tensor = torch.tensor(source_tokens, dtype=torch.long, device=device).unsqueeze(1)
target_tokens_tensor = torch.tensor(target_tokens, dtype=torch.long, device=device).unsqueeze(1)

# Teacher forcing with a one-token shift: the decoder is fed "<BOS> ... paresseux"
# and must predict "le ... <EOS>". Feeding and scoring the same unshifted sequence
# would only teach the model to copy its current input token.
decoder_input = target_tokens_tensor[:-1]
expected_output = target_tokens_tensor[1:].reshape(-1)

# Invert the vocabulary once instead of scanning it for every predicted ID.
french_id_to_token = {token_id: token for token, token_id in FRENCH_TOKEN_MAPPING.items()}

optimizer = torch.optim.Adam(transformer.parameters(), lr=0.0001)
print_every = 100
for train_iter in range(10000):
    optimizer.zero_grad()
    result = transformer(source_tokens_tensor, decoder_input).squeeze(1)
    # CrossEntropyLoss expects raw logits (it applies log-softmax itself), so no
    # softmax is applied to the model output before computing the loss.
    loss = loss_function(result.reshape(-1, result.size(-1)), expected_output)
    loss.backward()
    optimizer.step()
    if train_iter % print_every == 0:
        print(f"Iteration {train_iter}, Loss: {loss.item()}")
        # argmax over logits selects the same token as argmax over probabilities.
        predicted_token_ids = torch.argmax(result, dim=-1)
        predicted_words = [french_id_to_token[token_id] for token_id in predicted_token_ids.tolist()]
        print(f"Predicted Translation: {' '.join(predicted_words)}")
        
    
    