<a href="https://colab.research.google.com/github/mohammadreza-mohammadi94/Transformers-Hub/blob/main/Encoder-Only%20From%20Scratch/Transformer_From_Scratch_Encoder_Only.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Implementing Transformer - Encoder Only

### Import Libraries

In [None]:
!pip install -q datasets

[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/480.6 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m480.6/480.6 kB[0m [31m11.9 MB/s[0m eta [36m0:00:00[0m
[?25h[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/116.3 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m116.3/116.3 kB[0m [31m8.5 MB/s[0m eta [36m0:00:00[0m
[?25h[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/179.3 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m179.3/179.3 kB[0m [31m10.0 MB/s[0m eta [36m0:00:00[0m
[?25h[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/143.5 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m143.5/143.5 kB[0m [31m11.2 MB/s[0m eta [36m0:00:00[0m
[?25h[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import dataset
from torch.utils.data import DataLoader

import numpy as np
import math
import matplotlib.pyplot as plt
from datetime import datetime

from datasets import load_dataset
from transformers import AutoTokenizer, DataCollatorWithPadding

### Multi-Head Attention Module

In [None]:
class MultiHeadAttention(nn.Module):
    """
    Multi-head scaled dot-product attention as described in the Transformer architecture.

    The value dimension of each head is assumed to equal the key dimension
    (d_v = d_k). Queries may have a different sequence length than keys/values,
    so the module works for both self-attention and cross-attention.

    Args:
        d_k (int): Dimension of each attention head.
        d_model (int): Total dimension of the model (input/output embeddings).
        n_heads (int): Number of attention heads.
    """
    def __init__(self, d_k, d_model, n_heads):
        super().__init__()

        # Assume d_v = d_k
        self.d_k = d_k
        self.n_heads = n_heads

        # Input projections; all heads are computed by one wide linear layer each.
        self.key = nn.Linear(d_model, d_k * n_heads)
        self.query = nn.Linear(d_model, d_k * n_heads)
        self.value = nn.Linear(d_model, d_k * n_heads)

        # Final linear layer mixing the concatenated heads back to d_model.
        self.fc = nn.Linear(d_k * n_heads, d_model)

    def forward(self, q, k, v, mask=None):
        """
        Forward pass for Multi-Head Attention.

        Args:
            q (torch.Tensor): Query tensor of shape (batch_size, T_q, d_model).
            k (torch.Tensor): Key tensor of shape (batch_size, T_k, d_model).
            v (torch.Tensor): Value tensor of shape (batch_size, T_k, d_model).
            mask (torch.Tensor, optional): Positions equal to 0 are excluded
                from attention. Accepted shapes:
                (batch_size, T_k) padding mask over keys,
                (batch_size, T_q, T_k) per-query mask, or any 4-D tensor
                broadcastable to (batch_size, n_heads, T_q, T_k).

        Returns:
            torch.Tensor: Attention output of shape (batch_size, T_q, d_model).
        """
        q = self.query(q)  # N x T_q x (h * d_k)
        k = self.key(k)    # N x T_k x (h * d_k)
        v = self.value(v)  # N x T_k x (h * d_k)

        N = q.shape[0]
        T_q = q.shape[1]
        # Keys/values carry their own length so cross-attention works too.
        T_k = k.shape[1]

        # (N, T, h, d_k) --> (N, h, T, d_k) so the matrix multiply below
        # runs independently per head.
        q = q.view(N, T_q, self.n_heads, self.d_k).transpose(1, 2)
        k = k.view(N, T_k, self.n_heads, self.d_k).transpose(1, 2)
        v = v.view(N, T_k, self.n_heads, self.d_k).transpose(1, 2)

        # Scaled dot-product scores:
        # (N, h, T_q, d_k) @ (N, h, d_k, T_k) --> (N, h, T_q, T_k)
        attn_scores = q @ k.transpose(-2, -1) / math.sqrt(self.d_k)
        if mask is not None:
            if mask.dim() == 2:
                mask = mask[:, None, None, :]
            elif mask.dim() == 3:
                mask = mask[:, None, :, :]
            # Use the most negative finite value instead of -inf: masked
            # positions still receive exactly zero weight after softmax, but a
            # row whose keys are all masked no longer turns into NaN.
            attn_scores = attn_scores.masked_fill(
                mask == 0, torch.finfo(attn_scores.dtype).min)
        attn_weights = F.softmax(attn_scores, dim=-1)

        # Attention-weighted values:
        # (N, h, T_q, T_k) @ (N, h, T_k, d_k) --> (N, h, T_q, d_k)
        A = attn_weights @ v
        # Merge the heads back: (N, h, T_q, d_k) --> (N, T_q, h * d_k).
        # transpose() yields a non-contiguous tensor, hence contiguous() before view().
        A = A.transpose(1, 2)
        A = A.contiguous().view(N, T_q, self.n_heads * self.d_k)

        # Output projection
        return self.fc(A)

### Transformer Block Module

In [None]:
class TransformerBlock(nn.Module):
    """
    One encoder layer: multi-head self-attention followed by a position-wise
    feed-forward network, each wrapped in a residual connection and a
    LayerNorm applied after the addition.

    Args:
        d_k (int): Dimension of each attention head.
        d_model (int): Total dimension of the model.
        n_heads (int): Number of attention heads.
        dropout_prob (float): Dropout probability.
    """
    def __init__(self, d_k, d_model, n_heads, dropout_prob=0.1):
        super().__init__()

        hidden_dim = d_model * 4  # feed-forward expansion width

        self.layer_norm1 = nn.LayerNorm(d_model)
        self.layer_norm2 = nn.LayerNorm(d_model)
        self.multi_head_attn = MultiHeadAttention(d_k, d_model, n_heads)

        feed_forward_layers = [
            nn.Linear(d_model, hidden_dim),
            nn.GELU(),
            nn.Linear(hidden_dim, d_model),
            nn.Dropout(dropout_prob),
        ]
        self.ann = nn.Sequential(*feed_forward_layers)
        self.dropout = nn.Dropout(p=dropout_prob)

    def forward(self, x, mask=None):
        """
        Run the block on a batch of sequences.

        Args:
            x (torch.Tensor): Input tensor of shape (batch_size, seq_len, d_model).
            mask (torch.Tensor, optional): Attention mask forwarded unchanged
                to the attention module.

        Returns:
            torch.Tensor: Tensor with the same shape as ``x``.
        """
        # Self-attention sub-layer: query, key and value are all the input.
        attended = self.multi_head_attn(x, x, x, mask)
        x = self.layer_norm1(x + attended)

        # Feed-forward sub-layer (carries its own dropout inside `ann`).
        transformed = self.ann(x)
        x = self.layer_norm2(x + transformed)

        # Dropout is applied to the normalized block output.
        return self.dropout(x)

### Positional Encoding

In [None]:
class PositionalEncoding(nn.Module):
    """
    Adds fixed sinusoidal positional encodings to input embeddings to inject
    order information into the model.

    Even feature indices hold sin(pos / 10000^(i / d_model)) and odd indices
    hold the matching cosine, where i is the even index of the pair. The table
    is precomputed once for ``max_len`` positions and stored as a buffer, so it
    moves with the module across devices but is not a trainable parameter.

    Args:
        d_model (int): Total dimension of the model (even or odd).
        max_len (int): Maximum sequence length.
        dropout_prob (float): Dropout probability.
    """
    def __init__(self, d_model, max_len=2048, dropout_prob=0.1):
        super().__init__()
        self.dropout = nn.Dropout(p=dropout_prob)

        position = torch.arange(max_len).unsqueeze(1)  # (max_len, 1)
        exp_term = torch.arange(0, d_model, 2)         # even feature indices
        # Base 10000 as in the original Transformer formulation; computed via
        # exp/log instead of a direct power.
        div_term = torch.exp(exp_term * (-math.log(10000.0) / d_model))
        # Create a positional encoding matrix
        pe = torch.zeros(1, max_len, d_model)
        pe[0, :, 0::2] = torch.sin(position * div_term)
        # For odd d_model there is one fewer cosine column than sine columns,
        # so trim the frequencies to the number of odd indices.
        pe[0, :, 1::2] = torch.cos(position * div_term[: d_model // 2])
        self.register_buffer('pe', pe)

    def forward(self, x):
        """
        Forward pass to add positional encoding.

        Args:
            x (torch.Tensor): Input tensor of shape (batch_size, seq_len, d_model).

        Returns:
            torch.Tensor: Input tensor with positional encoding added
            (followed by dropout).

        Raises:
            ValueError: If ``seq_len`` exceeds the ``max_len`` the table was
                built for.
        """
        seq_len = x.size(1)
        if seq_len > self.pe.size(1):
            raise ValueError(
                f"Sequence length {seq_len} exceeds the maximum supported "
                f"length {self.pe.size(1)}")
        # The leading dimension of size 1 broadcasts over the batch.
        x = x + self.pe[:, :seq_len, :]
        return self.dropout(x)

### Encoder Module

In [None]:
class Encoder(nn.Module):
    """
    Transformer encoder for sequence classification: token embedding,
    positional encoding, a stack of Transformer blocks, then a linear
    classification head applied to the first position of the sequence.

    Args:
        vocab_size (int): Size of the input vocabulary.
        max_len (int): Maximum sequence length.
        d_k (int): Dimension of each attention head.
        d_model (int): Total dimension of the model.
        n_heads (int): Number of attention heads.
        n_layers (int): Number of Transformer blocks.
        n_classes (int): Number of output classes.
        dropout_prob (float): Dropout probability.
    """
    def __init__(self,
                 vocab_size,
                 max_len,
                 d_k,
                 d_model,
                 n_heads,
                 n_layers,
                 n_classes,
                 dropout_prob):
        super().__init__()

        self.embedding = nn.Embedding(vocab_size, d_model)
        self.pos_encoding = PositionalEncoding(d_model, max_len, dropout_prob)
        # ModuleList rather than Sequential: the blocks are iterated manually
        # in forward() because each one takes the attention mask as a second
        # argument. Parameter names (transformer_blocks.<i>....) are unchanged.
        self.transformer_blocks = nn.ModuleList([
            TransformerBlock(
                d_k,
                d_model,
                n_heads,
                dropout_prob) for _ in range(n_layers)])
        self.layer_norm = nn.LayerNorm(d_model)
        self.fc = nn.Linear(d_model, n_classes)


    def forward(self, x, mask=None):
        """
        Forward pass for the Encoder.

        Args:
            x (torch.Tensor): Token-id tensor of shape (batch_size, seq_len).
            mask (torch.Tensor, optional): Attention mask passed to every
                Transformer block.

        Returns:
            torch.Tensor: Unnormalized class scores of shape (batch_size, n_classes).
        """
        x = self.embedding(x)
        x = self.pos_encoding(x)
        for block in self.transformer_blocks:
            x = block(x, mask)

        # Many-to-one: x is N x T x D; keep only the representation at the
        # first position (presumably the tokenizer's leading [CLS] token —
        # depends on the tokenizer used by the caller).
        x = x[:, 0, :]

        x = self.layer_norm(x)
        x = self.fc(x)
        return x

### Preparing Dataset

In [None]:
# Load the tokenizer that ships with a pretrained checkpoint.
# Only the tokenizer is taken from the checkpoint; the Encoder model used
# later in this notebook is built and trained from scratch.
checkpoint = "distilbert-base-cased"
tokenizer = AutoTokenizer.from_pretrained(checkpoint)

# Load the SST2 configuration of the GLUE benchmark.
# The "train" and "validation" splits of this object are used further below.
raw_datasets = load_dataset("glue", "sst2")

In [None]:
# Define tokenization function
def tokenize_fn(batch):
    """
    Run the module-level tokenizer over the sentences of one batch.

    Sequences longer than the tokenizer's limit are truncated; no padding is
    requested here.

    Args:
        batch (dict): Batch with a 'sentence' entry holding the raw texts.

    Returns:
        The tokenizer's output for those sentences.
    """
    sentences = batch['sentence']
    encoded = tokenizer(sentences, truncation=True)
    return encoded

In [None]:
# Tokenize every split of the dataset; batched=True hands tokenize_fn a batch
# of examples per call instead of one example at a time.
tokenized_datasets = raw_datasets.map(tokenize_fn, batched=True)
# tokenize_fn does not pad, so padding is left to this collator, which is
# used as the DataLoader collate_fn below (pads per batch — see the
# DataCollatorWithPadding documentation).
data_collator = DataCollatorWithPadding(tokenizer=tokenizer)

Map:   0%|          | 0/872 [00:00<?, ? examples/s]

In [None]:
# Prepare the dataset for training:
# drop the raw-text and index columns, which are no longer needed after
# tokenization, and rename 'label' to 'labels' — the key the training and
# evaluation loops below read from each batch.
tokenized_datasets = tokenized_datasets.remove_columns(["sentence", "idx"])
tokenized_datasets = tokenized_datasets.rename_column("label", "labels")

In [None]:
# Create DataLoaders for training and validation.
# Training batches are shuffled; both loaders use batches of 32 examples and
# the padding collator defined earlier.
train_dataloader = DataLoader(
    tokenized_datasets["train"],
    shuffle=True,
    batch_size=32,
    collate_fn=data_collator)

# Validation data keeps its original order (no shuffle argument).
valid_dataloader = DataLoader(
    tokenized_datasets["validation"],
    batch_size=32,
    collate_fn=data_collator)

# Sanity check: print the keys and tensor shapes of the first training batch
# only (the loop exits after one iteration).
for batch in train_dataloader:
    for k, v in batch.items():
        print("Key:", k, "Value Shape:", v.shape)
    break

Key: labels Value Shape: torch.Size([32])
Key: input_ids Value Shape: torch.Size([32, 45])
Key: attention_mask Value Shape: torch.Size([32, 45])


### Define Model

In [None]:
# Instantiate the Transformer encoder classifier (the `Encoder` class defined above).
# Vocabulary size and maximum sequence length are taken from the tokenizer so
# the embedding table and positional-encoding table match its outputs.
# Note: d_k * n_heads = 16 * 4 = 64 = d_model.
model = Encoder(
    vocab_size=tokenizer.vocab_size,
    max_len=tokenizer.model_max_length,
    d_k=16,
    d_model=64,
    n_heads=4,
    n_layers=2,
    dropout_prob=0.1,
    n_classes=2
)

# Move the model to the available device (GPU if CUDA is available, else CPU)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

# Define loss function and optimizer.
# CrossEntropyLoss takes the model's raw class scores; Adam is used with its
# default hyperparameters (none are passed).
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters())

In [None]:
def train(model, criterion, optimizer, train_loader, valid_loader, epochs):
    """
    Trains the model and evaluates it on the validation data after every epoch.

    Each batch is expected to be a dict with 'input_ids', 'attention_mask'
    and 'labels' tensors. Batches are moved to the device of the model's
    parameters, so the model must already live on the intended device.

    Args:
        model (nn.Module): The Transformer model; called as
            ``model(input_ids, attention_mask)``.
        criterion: Loss function taking (outputs, labels).
        optimizer: Optimization algorithm bound to the model's parameters.
        train_loader (DataLoader): DataLoader for training data.
        valid_loader (DataLoader): DataLoader for validation data.
        epochs (int): Number of training epochs.

    Returns:
        tuple: Two NumPy arrays of length ``epochs`` with the per-example
        average training and validation loss of each epoch.
    """
    train_losses = np.zeros(epochs)
    test_losses = np.zeros(epochs)

    # Use the model's own device instead of relying on a module-level global.
    first_param = next(model.parameters(), None)
    model_device = (first_param.device if first_param is not None
                    else torch.device("cpu"))

    for it in range(epochs):
        model.train()
        t0 = datetime.now()
        train_loss = 0
        n_train = 0
        for batch in train_loader:
            batch = {k: v.to(model_device) for k, v in batch.items()}
            optimizer.zero_grad()
            outputs = model(batch['input_ids'], batch['attention_mask'])
            loss = criterion(outputs, batch['labels'])
            loss.backward()
            optimizer.step()

            # criterion returns a batch mean; weight it by the batch size so
            # the epoch average is exact even when the last batch is smaller.
            train_loss += loss.item() * batch['input_ids'].size(0)
            n_train += batch['input_ids'].size(0)

        train_loss = train_loss / n_train

        model.eval()
        test_loss = 0
        n_test = 0
        # No gradients are needed for validation; skipping autograd saves
        # memory and time.
        with torch.no_grad():
            for batch in valid_loader:
                batch = {k: v.to(model_device) for k, v in batch.items()}
                outputs = model(batch['input_ids'], batch['attention_mask'])
                loss = criterion(outputs, batch['labels'])
                test_loss += loss.item() * batch['input_ids'].size(0)
                n_test += batch['input_ids'].size(0)
        test_loss = test_loss / n_test

        # Save Losses
        train_losses[it] = train_loss
        test_losses[it] = test_loss

        dt = datetime.now() - t0
        print(f"Epoch: {it+1}/{epochs} | Train Loss: {train_loss:.4f} | Test Loss: {test_loss:.4f} | Duration: {dt}")

    return train_losses, test_losses

### Train Model

In [None]:
# Train the model for 10 epochs (last positional argument) and keep the
# per-epoch training and validation losses returned by train().
train_losses, test_losses = train(model,
                                  criterion,
                                  optimizer,
                                  train_dataloader,
                                  valid_dataloader,
                                  10)

Epoch: 1/10 | Train Loss: 0.5342 | Test Loss: 0.5027 | Duration: 0:00:18.005578
Epoch: 2/10 | Train Loss: 0.3669 | Test Loss: 0.4694 | Duration: 0:00:18.918710
Epoch: 3/10 | Train Loss: 0.2965 | Test Loss: 0.4761 | Duration: 0:00:18.407780
Epoch: 4/10 | Train Loss: 0.2550 | Test Loss: 0.5028 | Duration: 0:00:19.309505
Epoch: 5/10 | Train Loss: 0.2262 | Test Loss: 0.5282 | Duration: 0:00:17.934885
Epoch: 6/10 | Train Loss: 0.2056 | Test Loss: 0.5566 | Duration: 0:00:18.220725
Epoch: 7/10 | Train Loss: 0.1876 | Test Loss: 0.5350 | Duration: 0:00:19.045539
Epoch: 8/10 | Train Loss: 0.1747 | Test Loss: 0.6142 | Duration: 0:00:17.878078
Epoch: 9/10 | Train Loss: 0.1603 | Test Loss: 0.7427 | Duration: 0:00:18.985393
Epoch: 10/10 | Train Loss: 0.1497 | Test Loss: 0.7182 | Duration: 0:00:17.998161


In [None]:
# Evaluate the model's accuracy
def evaluate_accuracy(model, data_loader):
    """
    Evaluates the model's classification accuracy on a given DataLoader.

    Each batch is expected to be a dict with 'input_ids', 'attention_mask'
    and 'labels' tensors. Batches are moved to the device of the model's
    parameters. The model is switched to eval mode and left in it.

    Args:
        model (nn.Module): The trained model; called as
            ``model(input_ids, attention_mask)`` and expected to return
            per-class scores of shape (batch_size, n_classes).
        data_loader (DataLoader): DataLoader containing the evaluation data.

    Returns:
        float: Fraction of examples whose highest-scoring class equals the label.
    """
    model.eval()
    n_correct = 0
    n_total = 0

    # Use the model's own device instead of relying on a module-level global.
    first_param = next(model.parameters(), None)
    model_device = (first_param.device if first_param is not None
                    else torch.device("cpu"))

    with torch.no_grad():
        for batch in data_loader:
            batch = {k: v.to(model_device) for k, v in batch.items()}
            outputs = model(batch['input_ids'], batch['attention_mask'])
            # Index of the largest score along the class dimension.
            _ , predictions = torch.max(outputs, dim=1)

            n_correct += (predictions == batch['labels']).sum().item()
            n_total += batch['labels'].size(0)

    return n_correct / n_total

# Calculate accuracy for train and validation sets.
# evaluate_accuracy puts the model in eval mode, so dropout is disabled for
# both measurements; the shuffling of train_dataloader does not affect the
# result because accuracy is aggregated over all batches.
train_acc = evaluate_accuracy(model, train_dataloader)
valid_acc = evaluate_accuracy(model, valid_dataloader)

# Report both accuracies with four decimal places.
print(f"Train Accuracy: {train_acc:.4f}")
print(f"Validation Accuracy: {valid_acc:.4f}")

Train Accuracy: 0.9304
Validation Accuracy: 0.7959
