### **Tutorial 11: Transformer (Custom)**

In this tutorial, we build a custom Transformer model from scratch in PyTorch. The Transformer has reshaped natural language processing (NLP) and time series prediction by replacing the recurrence of traditional recurrent neural networks (RNNs) with an attention mechanism that processes all positions of a sequence in parallel. The code builds a small character-level, GPT-style language model; the same components underlie Transformers for sequence-to-sequence tasks such as translation, text summarization, or time series forecasting.

### **What You Will Learn**
- Introduction to the Transformer architecture
- Implementing the multi-head self-attention mechanism
- Building the Encoder and Decoder components of the Transformer
- Customizing the Transformer for specific tasks
- Training the model on sample data
- Evaluating and fine-tuning the model

---

### **1. Understanding the Transformer Architecture**

The Transformer architecture is composed of two main parts:
- **Encoder**: This part processes the input sequence and generates feature representations that the decoder can use.
- **Decoder**: This part generates the output sequence from the feature representations provided by the encoder.

The key component that distinguishes Transformers from other architectures is the **self-attention mechanism**, which allows the model to focus on different parts of the input sequence as it generates an output.

Here’s a simplified overview of the Transformer architecture:

- **Multi-Head Attention**: A mechanism that allows the model to focus on different parts of the sequence simultaneously.
- **Positional Encoding**: Since Transformers do not inherently process data in order (like RNNs), positional encodings are added to the input data to give it information about the position of each element in the sequence.
- **Feed-Forward Networks**: After the attention layers, the output is passed through fully connected layers.
- **Residual Connections and Layer Normalization**: Used to prevent vanishing gradients and to improve training stability.
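
To make the positional encoding idea concrete, here is a minimal sketch of the fixed sinusoidal scheme, one common way to build position information. The function name `sinusoidal_positions` and the small sizes are assumptions chosen for illustration; the model later in this tutorial uses a learned `nn.Embedding` for positions instead.

```python
import math
import torch

def sinusoidal_positions(context_size, embedding_dim):
    # Hypothetical helper for illustration; assumes embedding_dim is even.
    positions = torch.arange(context_size, dtype=torch.float32).unsqueeze(1)  # (C, 1)
    # One frequency per pair of embedding dimensions
    div_term = torch.exp(
        torch.arange(0, embedding_dim, 2, dtype=torch.float32)
        * (-math.log(10000.0) / embedding_dim)
    )  # (embedding_dim / 2,)
    encoding = torch.zeros(context_size, embedding_dim)
    encoding[:, 0::2] = torch.sin(positions * div_term)  # even dimensions
    encoding[:, 1::2] = torch.cos(positions * div_term)  # odd dimensions
    return encoding

pe = sinusoidal_positions(context_size=8, embedding_dim=16)
token_embeddings = torch.rand(2, 8, 16)   # (B, C, embedding_dim)
x = token_embeddings + pe                 # broadcast over the batch dimension
print(pe.shape, x.shape)
```

Because the encoding is added to the token embeddings, two identical tokens at different positions end up with different input vectors, which is what lets attention distinguish their order.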

---

### **2. Implementing the Transformer Components**

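Let’s now dive into the implementation of a custom Transformer model. We start with a character-level tokenizer and a dataset of token IDs, then implement the essential components: the attention head, multi-head attention, the feed-forward network, and the Transformer block that combines them.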

In [1]:
from torch.utils.data import Dataset
import torch
import torch.nn as nn
import torch.nn.functional as F
import math
import numpy as np

class CharTokenizer:
  def __init__(self, vocabulary):
    self.token_id_for_char = {char: token_id for token_id, char in enumerate(vocabulary)}
    self.char_for_token_id = {token_id: char for token_id, char in enumerate(vocabulary)}

  @staticmethod
  def train_from_text(text):
    vocabulary = set(text) # remove duplicates
    return CharTokenizer(sorted(list(vocabulary)))

  def encode(self, text):
    token_ids = []
    for char in text:
      token_ids.append(self.token_id_for_char[char])
    return torch.tensor(token_ids, dtype=torch.long)

  def decode(self, token_ids):
    chars = []
    for token_id in token_ids.tolist():
      chars.append(self.char_for_token_id[token_id])
    return ''.join(chars)


  def vocabulary_size(self):
    return len(self.token_id_for_char)

class TokenIdsDataset(Dataset):
  def __init__(self, data, block_size):
    self.data = data
    self.block_size = block_size

  def __len__(self):
    return len(self.data) - self.block_size

  def __getitem__(self, pos):
    assert pos < len(self.data) - self.block_size

    x = self.data[pos:pos + self.block_size]
    y = self.data[pos + 1:pos + 1 + self.block_size]
    return x, y
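
The slicing in `__getitem__` can be illustrated on a toy tensor. This is a small sketch with made-up values (`torch.arange(10)` stands in for encoded token IDs): the target `y` is the input `x` shifted one position to the right, so each position is trained to predict the next token.

```python
import torch

data = torch.arange(10)   # stand-in for encoded token IDs (assumption)
block_size = 4
pos = 2

x = data[pos:pos + block_size]          # input window
y = data[pos + 1:pos + 1 + block_size]  # same window shifted by one token

print(x)  # tensor([2, 3, 4, 5])
print(y)  # tensor([3, 4, 5, 6])

# Number of valid start positions, matching __len__ above
num_samples = len(data) - block_size
print(num_samples)  # 6
```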

In [None]:
text_paragraph = """
You are a manufacturer of hip implants. The doctor who will use your implants in surgeries has a requirement: he is willing to accept implants that are 1 mm bigger or smaller than the specified target size. This means the implant sizes must fall within a 2 mm range of the target size, i.e., ±1 mm from the target.
Additionally, your financial officer has stated that in order to maintain profitability, you can afford to discard **1 out of every 1000 implants**. This means that the size distribution of your implants must be such that only 0.1% of implants fall outside the acceptable ±1 mm range.
Given a recent sample of 1000 implants from the factory, the task is to evaluate whether the factory is meeting the specified quality and profitability requirements. If more than 0.1% of the implants fall outside the ±1 mm range, the factory will incur a loss due to excess waste.
"""

tokenizer = CharTokenizer.train_from_text(text_paragraph)
print(f"Vocabulary size: {tokenizer.vocabulary_size()}")
print(tokenizer.encode("Given that"))
print(tokenizer.decode(tokenizer.encode("Given That")))

#### **Multi-Head Attention Layer**
The multi-head attention mechanism allows the model to simultaneously focus on different parts of the sequence. Here's the code to implement it:

In [None]:


class AttentionHead(nn.Module):
  def __init__(self, config):
    super().__init__()
    self.Q_weights = nn.Linear(config["embedding_dim"], config["head_size"], config["use_bias"])
    self.K_weights = nn.Linear(config["embedding_dim"], config["head_size"], config["use_bias"])
    self.V_weights = nn.Linear(config["embedding_dim"], config["head_size"], config["use_bias"])

    self.dropout = nn.Dropout(config["dropout_rate"])

    # Lower-triangular mask: position i may attend only to positions <= i
    causal_attention_mask = torch.tril(torch.ones(config["context_size"], config["context_size"]))
    self.register_buffer('causal_attention_mask', causal_attention_mask)


  def forward(self, input): # (B, C, embedding_dim)
    batch_size, tokens_num, embedding_dim = input.shape
    Q = self.Q_weights(input) # (B, C, head_size)
    K = self.K_weights(input) # (B, C, head_size)
    V = self.V_weights(input) # (B, C, head_size)

    # Scaled dot-product scores
    attention_scores = Q @ K.transpose(1, 2) / (K.shape[-1] ** 0.5)  # (B, C, C)
    # Hide future positions so that softmax assigns them zero weight
    attention_scores = attention_scores.masked_fill(
        self.causal_attention_mask[:tokens_num,:tokens_num] == 0,
        -torch.inf
    )
    attention_scores = torch.softmax(attention_scores, dim=-1)
    attention_scores = self.dropout(attention_scores)

    return attention_scores @ V # (B, C, head_size)
  

config = {
  "vocabulary_size": tokenizer.vocabulary_size(),
  "context_size": 256,
  "embedding_dim": 768,
  "heads_num": 12,
  "layers_num": 10,
  "dropout_rate": 0.1,
  "use_bias": False,
}

config["head_size"] = config["embedding_dim"] // config["heads_num"]
ah = AttentionHead(config)
input = torch.rand(8, config["context_size"], config["embedding_dim"])
output = ah(input)
output.shape
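
The effect of the attention mask is easier to see on a tiny example. The sketch below repeats the scoring steps of `AttentionHead.forward` with made-up sizes (4 tokens, head size 8) and random `Q` and `K`: after masking and softmax, each row sums to 1 and every weight above the diagonal is exactly zero, so a token never attends to later tokens.

```python
import torch

torch.manual_seed(0)
tokens_num, head_size = 4, 8   # small sizes chosen for illustration
Q = torch.rand(1, tokens_num, head_size)
K = torch.rand(1, tokens_num, head_size)

# Scaled dot-product scores, shape (1, tokens_num, tokens_num)
scores = Q @ K.transpose(1, 2) / head_size ** 0.5

# Lower-triangular mask: row i keeps columns 0..i
mask = torch.tril(torch.ones(tokens_num, tokens_num))
scores = scores.masked_fill(mask == 0, float("-inf"))

weights = torch.softmax(scores, dim=-1)
print(weights[0])
```

The first row has a single non-masked entry, so the first token attends only to itself with weight 1.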


In [None]:
class MultiHeadAttention(nn.Module):
  def __init__(self, config):
    super().__init__()

    heads_list = [AttentionHead(config) for _ in range(config["heads_num"])]
    self.heads = nn.ModuleList(heads_list)

    self.linear = nn.Linear(config["embedding_dim"], config["embedding_dim"])
    self.dropout = nn.Dropout(config["dropout_rate"])

  def forward(self, input):
    heads_outputs = [head(input) for head in self.heads]

    scores_change = torch.cat(heads_outputs, dim=-1)

    scores_change = self.linear(scores_change)
    return self.dropout(scores_change)
  
config["head_size"] = config["embedding_dim"] // config["heads_num"]
mha = MultiHeadAttention(config)
input = torch.rand(8, config["context_size"], config["embedding_dim"])
output = mha(input)
output.shape
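
The shape arithmetic behind multi-head attention can be checked in isolation. In this sketch, random tensors stand in for the per-head outputs (an assumption made to keep the example self-contained): with an embedding dimension of 768 and 12 heads, each head produces 64 features, and concatenating the heads along the last dimension restores the embedding dimension.

```python
import torch

embedding_dim, heads_num = 768, 12
head_size = embedding_dim // heads_num   # 64 features per head

# Random stand-ins for the outputs of each attention head: (B, C, head_size)
heads_outputs = [torch.rand(8, 16, head_size) for _ in range(heads_num)]

# Concatenate along the feature dimension: (B, C, heads_num * head_size)
combined = torch.cat(heads_outputs, dim=-1)
print(head_size, combined.shape)
```

This is why `embedding_dim` should be divisible by `heads_num`: otherwise the concatenated width no longer matches the input size expected by the final linear layer.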

In [None]:
class TransformerEncoder(nn.Module):
    # Despite its name, this module is the position-wise feed-forward
    # network that TransformerBlock applies after attention.
    def __init__(self, config):
        super().__init__()

        self.linear_layers = nn.Sequential(
            nn.Linear(config["embedding_dim"], config["embedding_dim"] * 4),
            nn.GELU(),
            nn.Linear(config["embedding_dim"] * 4, config["embedding_dim"]),
            nn.Dropout(config["dropout_rate"])
        )

    def forward(self, input):
        return self.linear_layers(input)

encoder = TransformerEncoder(config)
input = torch.rand(8, config["context_size"], config["embedding_dim"])
output = encoder(input)
output.shape

In [None]:
class TransformerBlock(nn.Module):

  def __init__(self, config):
    super().__init__()

    self.multi_head = MultiHeadAttention(config)
    self.layer_norm_1 = nn.LayerNorm(config["embedding_dim"])

    self.feed_forward = TransformerEncoder(config)
    self.layer_norm_2 = nn.LayerNorm(config["embedding_dim"])

  def forward(self, input):
    residual = input
    x = self.multi_head(self.layer_norm_1(input))
    x = x + residual

    residual = x
    x = self.feed_forward(self.layer_norm_2(x))
    return x + residual
  
b = TransformerBlock(config)
input = torch.rand(8, config["context_size"], config["embedding_dim"])
output = b(input)
output.shape

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

In [None]:
class DemoGPT(nn.Module):
  def __init__(self, config):
    super().__init__()

    self.token_embedding_layer = nn.Embedding(config["vocabulary_size"], config["embedding_dim"])
    self.positional_embedding_layer = nn.Embedding(config["context_size"], config["embedding_dim"])

    blocks = [TransformerBlock(config) for _ in range(config["layers_num"])]
    self.layers = nn.Sequential(*blocks)

    self.layer_norm = nn.LayerNorm(config["embedding_dim"])
    self.unembedding = nn.Linear(config["embedding_dim"], config["vocabulary_size"], bias=False)

  def forward(self, token_ids):
    batch_size, tokens_num = token_ids.shape

    x = self.token_embedding_layer(token_ids)
    # Build position indices on the same device as the input tokens
    sequence = torch.arange(tokens_num, device=token_ids.device)
    x = x + self.positional_embedding_layer(sequence)

    x = self.layers(x)
    x = self.layer_norm(x)
    x = self.unembedding(x)

    return x
  
model = DemoGPT(config).to(device)
output = model(tokenizer.encode("Given That").unsqueeze(dim=0).to(device))
output.shape


In [10]:
def generate(model, prompt_ids, max_tokens):
    output_ids = prompt_ids
    for _ in range(max_tokens):
      if output_ids.shape[1] >= config["context_size"]:
        break
      with torch.no_grad():
        logits = model(output_ids)

      logits = logits[:, -1, :]
      probs = F.softmax(logits, dim=-1)
      # Sample a random token given the softmax distribution
      next_token_id = torch.multinomial(probs, num_samples=1)
      # Add new token to the output, and repeat the process
      output_ids = torch.cat([output_ids, next_token_id], dim=-1)
    return output_ids

The next cell samples from the untrained model, so this is the result before training:

In [None]:
def generate_with_prompt(model, tokenizer, prompt, max_tokens=100):
  model.eval()

  prompt = tokenizer.encode(prompt).unsqueeze(dim=0).to(device)

  return tokenizer.decode(generate(model, prompt, max_tokens=max_tokens)[0])

generate_with_prompt(model, tokenizer, "Given that:\n")

In [None]:
batch_size = 16

train_iterations = 500
evaluation_interval = 100
learning_rate = 4e-4
train_data = tokenizer.encode(text_paragraph).to(device)
train_dataset = TokenIdsDataset(train_data, config["context_size"])

from torch.utils.data import DataLoader, RandomSampler

train_sampler = RandomSampler(train_dataset, num_samples=batch_size * train_iterations, replacement=True)
train_dataloader = DataLoader(train_dataset, batch_size=batch_size, sampler=train_sampler)
optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)

for step_num, sample in enumerate(train_dataloader):
    model.train()
    input, targets = sample
    logits = model(input)

    # Flatten (B, C, vocabulary_size) to (B * C, vocabulary_size) for cross-entropy
    logits_view = logits.view(-1, config["vocabulary_size"])
    targets_view = targets.view(-1)

    loss = F.cross_entropy(logits_view, targets_view)
    # Backward propagation
    loss.backward()
    # Update model parameters
    optimizer.step()
    # Set to None to reduce memory usage
    optimizer.zero_grad(set_to_none=True)

    print(f"Step {step_num}. Loss {loss.item():.3f}")
    torch.cuda.empty_cache()
    if step_num % evaluation_interval == 0:
        print("Demo GPT:\n" + generate_with_prompt(model, tokenizer, "\n"))
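
As a sanity check on the loss values printed by the loop, the sketch below reproduces the flattening step with made-up sizes and all-zero logits (both are assumptions for illustration). A model that assigns equal probability to every token has a cross-entropy of `ln(vocabulary_size)`, so a loss near that value at the first steps suggests the model starts from roughly uniform predictions, and training should push it lower.

```python
import math
import torch
import torch.nn.functional as F

batch_size, context_size, vocabulary_size = 2, 5, 7   # toy sizes

# All-zero logits give a uniform distribution over the vocabulary
logits = torch.zeros(batch_size, context_size, vocabulary_size)
targets = torch.randint(0, vocabulary_size, (batch_size, context_size))

# cross_entropy expects (N, classes) logits and (N,) targets
loss = F.cross_entropy(logits.view(-1, vocabulary_size), targets.view(-1))
expected = math.log(vocabulary_size)

print(f"loss = {loss.item():.4f}, ln(V) = {expected:.4f}")
```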

In [None]:
model.eval()
generate_with_prompt(model, tokenizer, "Given that:\n")