<a href="https://colab.research.google.com/github/praveena0506/Transformer-from-scratch/blob/main/transformer.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Cell 1: Imports
import torch
import torch.nn as nn
import torch.nn.functional as F
import math
import numpy as np # You'll likely need it later

In [None]:
# Cell 2: Transformer Building Blocks (The "Blueprint")

# --- 1. Positional Encoding ---
class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal positional encodings to a batch of embeddings.

    A table of shape [1, max_len, d_model] is computed once in the
    constructor: even feature columns hold sin(position * freq) and odd
    columns hold the matching cos. The table is registered as the buffer
    ``pe`` so it is not a trainable parameter.

    Args:
        d_model (int): Feature dimension of the embeddings. Odd values are
            supported (the last sine column then has no cosine partner).
        max_len (int): Longest sequence length the table covers.
    """

    def __init__(self, d_model, max_len=5000):
        super().__init__()
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(
            torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model)
        )
        # angles: [max_len, ceil(d_model / 2)]
        angles = position * div_term

        pe = torch.zeros(max_len, d_model)
        pe[:, 0::2] = torch.sin(angles)
        # When d_model is odd there is one fewer odd column than even column,
        # so the cosine half must be trimmed to d_model // 2 columns.
        pe[:, 1::2] = torch.cos(angles[:, : d_model // 2])

        # Leading batch axis of size 1 so the table broadcasts over the batch.
        self.register_buffer('pe', pe.unsqueeze(0))

    def forward(self, x):
        """Return ``x`` plus the encodings for its first ``x.size(1)`` positions.

        Args:
            x (torch.Tensor): Shape [batch_size, seq_len, d_model].

        Returns:
            torch.Tensor: Same shape as ``x``.

        Raises:
            ValueError: If seq_len exceeds the ``max_len`` the table was built for.
        """
        seq_len = x.size(1)
        max_len = self.pe.size(1)
        if seq_len > max_len:
            raise ValueError(
                f"Sequence length {seq_len} exceeds positional encoding "
                f"max_len {max_len}"
            )
        return x + self.pe[:, :seq_len, :]

# --- 2. Scaled Dot-Product Attention (Helper Function) ---
def scaled_dot_product_attention(q, k, v, mask=None):
    """Compute softmax(q @ k^T / sqrt(d_k)) @ v, optionally masked.

    Args:
        q (torch.Tensor): Queries, shape [..., len_q, d_k].
        k (torch.Tensor): Keys, shape [..., len_k, d_k].
        v (torch.Tensor): Values, shape [..., len_k, d_v].
        mask (torch.Tensor, optional): Broadcastable to [..., len_q, len_k].
            Positions where ``mask == 0`` are excluded from attention.

    Returns:
        tuple: ``(output, attention_weights)`` where output has shape
        [..., len_q, d_v] and attention_weights has shape [..., len_q, len_k].
    """
    d_k = q.size(-1)
    scores = torch.matmul(q, k.transpose(-2, -1)) / math.sqrt(d_k)

    if mask is not None:
        # -1e9 is not representable in float16; clamp the fill value to the
        # most negative finite number of the score dtype. For float32/float64
        # this is still exactly -1e9, so existing results are unchanged.
        fill_value = max(-1e9, torch.finfo(scores.dtype).min)
        scores = scores.masked_fill(mask == 0, fill_value)

    attention_weights = F.softmax(scores, dim=-1)
    output = torch.matmul(attention_weights, v)
    return output, attention_weights

# --- 3. Multi-Head Attention (The Main Module) ---
class MultiHeadAttention(nn.Module):
    """Multi-head attention built from four d_model -> d_model projections.

    Queries, keys and values are linearly projected, split into ``h`` heads
    of width ``d_model // h``, passed through
    ``scaled_dot_product_attention``, re-merged and projected by ``w_o``.

    Args:
        d_model (int): Model dimension; must be divisible by ``h``.
        h (int): Number of attention heads (positive).

    Raises:
        ValueError: If ``h`` is not positive or does not divide ``d_model``.
    """

    def __init__(self, d_model, h):
        super().__init__()
        # Explicit check instead of `assert`, which is stripped under `python -O`.
        if h <= 0 or d_model % h != 0:
            raise ValueError(
                f"d_model ({d_model}) must be divisible by a positive number "
                f"of heads h ({h})"
            )
        self.d_model = d_model
        self.h = h
        self.d_k = d_model // h
        self.w_q = nn.Linear(d_model, d_model)
        self.w_k = nn.Linear(d_model, d_model)
        self.w_v = nn.Linear(d_model, d_model)
        self.w_o = nn.Linear(d_model, d_model)

    def split_heads(self, x, batch_size):
        """Reshape [batch, seq_len, d_model] into [batch, h, seq_len, d_k]."""
        x = x.view(batch_size, -1, self.h, self.d_k)
        return x.transpose(1, 2)

    def forward(self, q_in, k_in, v_in, mask=None):
        """Apply multi-head attention.

        Args:
            q_in (torch.Tensor): Query source, shape [batch, len_q, d_model].
            k_in (torch.Tensor): Key source, shape [batch, len_k, d_model].
            v_in (torch.Tensor): Value source, shape [batch, len_k, d_model].
            mask (torch.Tensor, optional): Forwarded unchanged to
                ``scaled_dot_product_attention``; it must broadcast against
                scores of shape [batch, h, len_q, len_k].

        Returns:
            torch.Tensor: Shape [batch, len_q, d_model].
        """
        batch_size = q_in.size(0)

        q = self.split_heads(self.w_q(q_in), batch_size)
        k = self.split_heads(self.w_k(k_in), batch_size)
        v = self.split_heads(self.w_v(v_in), batch_size)

        # The attention weights are computed but not exposed by this module.
        context, _ = scaled_dot_product_attention(q, k, v, mask)

        # [batch, h, len_q, d_k] -> [batch, len_q, h * d_k]; .contiguous() is
        # required because .view() cannot operate on the transposed layout.
        context = context.transpose(1, 2).contiguous()
        context = context.view(batch_size, -1, self.d_model)

        return self.w_o(context)
# Cell 2 (Continued): Add this code

# --- 4. Position-wise Feed-Forward Network ---
class PositionwiseFeedForward(nn.Module):
    """Two-layer feed-forward network applied along the last dimension.

    Computes ``w_2(dropout(relu(w_1(x))))``.
    """

    def __init__(self, d_model, d_ff, dropout=0.1):
        """
        Build the two linear maps, the activation and the dropout layer.

        Args:
            d_model (int): Input and output feature size (e.g., 512).
            d_ff (int): Hidden feature size (e.g., 2048 for d_model = 512).
            dropout (float): Dropout probability applied after the ReLU.
        """
        super().__init__()
        self.w_1 = nn.Linear(d_model, d_ff)   # expand: d_model -> d_ff
        self.w_2 = nn.Linear(d_ff, d_model)   # contract: d_ff -> d_model
        self.dropout = nn.Dropout(dropout)
        self.relu = nn.ReLU()

    def forward(self, x):
        """
        Run the feed-forward network.

        Args:
            x (torch.Tensor): Shape [batch_size, seq_len, d_model].

        Returns:
            torch.Tensor: Shape [batch_size, seq_len, d_model].
        """
        # [..., d_model] -> [..., d_ff], non-linearity, then dropout
        hidden = self.dropout(self.relu(self.w_1(x)))
        # [..., d_ff] -> [..., d_model]
        return self.w_2(hidden)
# Cell 2 (Continued): Add this code

# --- 5. The Full Encoder Layer ---
class EncoderLayer(nn.Module):
    """One encoder block: self-attention then feed-forward.

    Each sub-layer is wrapped as ``LayerNorm(dropout(sublayer(x)) + x)``,
    i.e. normalization is applied after the residual addition.
    """

    def __init__(self, d_model, h, d_ff, dropout=0.1):
        """
        Build the two sub-layers with their dropout and normalization.

        Args:
            d_model (int): Model dimension (e.g., 512).
            h (int): Number of attention heads (e.g., 8).
            d_ff (int): Hidden size of the feed-forward sub-layer (e.g., 2048).
            dropout (float): Dropout probability.
        """
        super().__init__()

        # Sub-layer 1: multi-head self-attention
        self.self_attn = MultiHeadAttention(d_model, h)
        self.norm1 = nn.LayerNorm(d_model)
        self.dropout1 = nn.Dropout(dropout)

        # Sub-layer 2: position-wise feed-forward
        self.ffn = PositionwiseFeedForward(d_model, d_ff, dropout)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout2 = nn.Dropout(dropout)

    def forward(self, x, mask):
        """
        Run the block on ``x``.

        Args:
            x (torch.Tensor): Shape [batch_size, seq_len, d_model].
            mask (torch.Tensor): Attention mask passed to the self-attention
                sub-layer.

        Returns:
            torch.Tensor: Shape [batch_size, seq_len, d_model].
        """
        # Self-attention: queries, keys and values all come from x.
        attended = self.self_attn(q_in=x, k_in=x, v_in=x, mask=mask)
        x = self.norm1(self.dropout1(attended) + x)

        # Feed-forward, again with dropout + residual + normalization.
        transformed = self.ffn(x)
        x = self.norm2(self.dropout2(transformed) + x)

        return x
# Cell 2 (Continued): Add this code

# --- 6. The Full Encoder ---
class Encoder(nn.Module):
    """A stack of ``N`` EncoderLayer blocks followed by one LayerNorm."""

    def __init__(self, d_model, h, d_ff, N, dropout=0.1):
        """
        Build the layer stack and the closing normalization.

        Args:
            d_model (int): Model dimension (e.g., 512).
            h (int): Number of heads (e.g., 8).
            d_ff (int): FFN inner dimension (e.g., 2048).
            N (int): How many EncoderLayer blocks to stack (e.g., 6).
            dropout (float): Dropout probability.
        """
        super().__init__()

        blocks = []
        for _ in range(N):
            blocks.append(EncoderLayer(d_model, h, d_ff, dropout))
        # ModuleList so that every block's parameters are registered.
        self.layers = nn.ModuleList(blocks)

        # Normalization applied to the output of the last block.
        self.norm = nn.LayerNorm(d_model)

    def forward(self, x, mask):
        """
        Feed ``x`` through every block in order, then normalize.

        Args:
            x (torch.Tensor): Shape [batch_size, seq_len, d_model].
            mask (torch.Tensor): Attention mask handed to every block.

        Returns:
            torch.Tensor: Shape [batch_size, seq_len, d_model].
        """
        hidden = x
        for block in self.layers:
            hidden = block(hidden, mask)
        return self.norm(hidden)
# Cell 2 (Continued): Add this code

# --- 7. The Full Transformer Classifier ---
class TransformerClassifier(nn.Module):
    """Encoder-only Transformer with a mean-pooled linear classification head.

    Pipeline: token embedding (scaled by sqrt(d_model)) -> positional
    encoding -> dropout -> encoder stack -> mean over non-masked positions
    -> linear layer producing class logits.
    """

    def __init__(self, vocab_size, d_model, h, d_ff, N, num_classes, dropout=0.1):
        """
        Build the classifier.

        Args:
            vocab_size (int): Number of rows in the embedding table (e.g., 30000).
            d_model (int): Model dimension (e.g., 512).
            h (int): Number of heads (e.g., 8).
            d_ff (int): FFN inner dimension (e.g., 2048).
            N (int): Number of EncoderLayer blocks (e.g., 6).
            num_classes (int): Number of output classes (e.g., 2 for pos/neg).
            dropout (float): Dropout probability.
        """
        super().__init__()

        self.d_model = d_model

        # Token ID -> d_model vector
        self.embedding = nn.Embedding(vocab_size, d_model)

        # Adds position information to the embeddings
        self.pos_encoding = PositionalEncoding(d_model)

        # Stack of N encoder layers
        self.encoder = Encoder(d_model, h, d_ff, N, dropout)

        # Pooled sentence vector -> class logits
        self.classifier_head = nn.Linear(d_model, num_classes)

        self.dropout = nn.Dropout(dropout)

    @staticmethod
    def _masked_mean_pool(x, mask):
        """Average ``x`` over the sequence axis, skipping masked-out positions.

        A position counts as visible when at least one query is allowed to
        attend to it (``mask != 0``), so both [batch, 1, 1, seq_len] padding
        masks and [batch, 1, seq_len, seq_len] masks are accepted. With
        ``mask=None`` this is a plain mean over all positions.
        """
        if mask is None:
            return x.mean(dim=1)

        # Collapse every axis between batch and key position: [batch, seq_len]
        visible = (mask != 0).reshape(mask.size(0), -1, mask.size(-1)).any(dim=1)
        weights = visible.unsqueeze(-1).to(x.dtype)      # [batch, seq_len, 1]
        # Clamp so a row with no visible tokens yields zeros instead of NaN.
        token_counts = weights.sum(dim=1).clamp(min=1.0)  # [batch, 1]
        return (x * weights).sum(dim=1) / token_counts

    def forward(self, x_input, mask=None):
        """
        Compute class logits for a batch of token ID sequences.

        Args:
            x_input (torch.Tensor): Token IDs. Shape: [batch_size, seq_len]
            mask (torch.Tensor, optional): Attention mask where 0 marks
                positions to ignore, e.g. [batch_size, 1, 1, seq_len] as
                produced by ``create_padding_mask`` or
                [batch_size, 1, seq_len, seq_len]. It is used both inside
                attention and to exclude padding from the pooled average.

        Returns:
            torch.Tensor: Class logits. Shape: [batch_size, num_classes]
        """
        # [batch_size, seq_len] -> [batch_size, seq_len, d_model],
        # scaled by sqrt(d_model) as in the original Transformer paper.
        x = self.embedding(x_input) * math.sqrt(self.d_model)

        x = self.pos_encoding(x)
        x = self.dropout(x)

        # [batch_size, seq_len, d_model]
        x = self.encoder(x, mask)

        # Pool to one vector per sequence. Padded positions are left out so
        # the prediction does not depend on how much padding was appended.
        # [batch_size, seq_len, d_model] -> [batch_size, d_model]
        x_pooled = self._masked_mean_pool(x, mask)

        # [batch_size, d_model] -> [batch_size, num_classes]
        return self.classifier_head(x_pooled)
# Cell 2 (Continued): Add this new function

def create_padding_mask(input_ids, pad_token_id):
    """
    Build a boolean mask that is True at every non-padding token.

    Args:
        input_ids (torch.Tensor): Token IDs. Shape: [batch_size, seq_len]
        pad_token_id (int): ID used for padding.

    Returns:
        torch.Tensor: Boolean mask of shape [batch_size, 1, 1, seq_len];
                      the two singleton axes let it broadcast over heads
                      and query positions in attention.
    """
    is_real_token = input_ids.ne(pad_token_id)   # [batch_size, seq_len]
    return is_real_token[:, None, None, :]       # [batch_size, 1, 1, seq_len]

In [None]:
# Cell 3: Shape smoke test for the full TransformerClassifier

print("\n--- Testing final model ---")

# --- Model hyperparameters (d_model/h/d_ff/N follow the paper's base model) ---
vocab_size, num_classes = 30000, 2   # dictionary size; positive vs. negative
d_model, h, d_ff, N = 512, 8, 2048, 6

# --- Mock input: random token IDs (integers), not embedding vectors ---
batch_size, seq_len = 3, 10
mock_input_ids = torch.randint(0, vocab_size, (batch_size, seq_len))
print(f"Mock input IDs shape: {mock_input_ids.shape}")

# All-ones mask: every position may attend to every other position
mask = torch.ones((batch_size, 1, seq_len, seq_len))

# --- Build the model and run one forward pass ---
model = TransformerClassifier(
    vocab_size=vocab_size,
    d_model=d_model,
    h=h,
    d_ff=d_ff,
    N=N,
    num_classes=num_classes,
)
final_output = model(mock_input_ids, mask)

print(f"Final model output shape: {final_output.shape}")

# --- One row of logits per example, one column per class ---
expected_shape = torch.Size((batch_size, num_classes))
assert final_output.shape == expected_shape

print("\nSuccess! Your full Transformer Classifier is built and working correctly!")

In [None]:
# Cell 4: Load Data and Tokenizer
!pip install datasets transformers

from datasets import load_dataset
from transformers import AutoTokenizer
from torch.utils.data import DataLoader

# --- 1. Load a Tokenizer ---
# Reuse the pre-trained "bert-base-uncased" tokenizer rather than building
# a vocabulary from scratch.
tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")

# --- 2. Create a Preprocessing Function ---
def preprocess_function(examples):
    """Tokenize ``examples["text"]``, padding/truncating every item to 512 tokens."""
    texts = examples["text"]
    encoded = tokenizer(
        texts,
        max_length=512,          # fixed sequence length
        truncation=True,         # cut longer texts down to max_length
        padding="max_length",    # pad shorter texts up to max_length
    )
    return encoded

# --- 3. Load the IMDB Dataset ---
print("Loading IMDB dataset...")
# This will download the dataset and cache it.
# We only keep 5000 examples from train/test to make it run FAST.
# NOTE: the IMDB splits are stored grouped by label, so a plain 'train[:5000]'
# slice would very likely contain a single class. Shuffle with a fixed seed
# first, then take the subset, so both classes are represented.
subset_size = 5000
dataset = load_dataset("imdb", split={
    'train': 'train',
    'test': 'test'
})
dataset = dataset.shuffle(seed=42)
for split_name in ("train", "test"):
    n_keep = min(subset_size, len(dataset[split_name]))
    dataset[split_name] = dataset[split_name].select(range(n_keep))

# --- 4. Apply the Preprocessing ---
print("Tokenizing and preprocessing data...")
tokenized_dataset = dataset.map(preprocess_function, batched=True)

# --- 5. Set Format and Create DataLoaders ---
# Expose only the listed columns, returned as PyTorch tensors
tokenized_dataset.set_format(type="torch", columns=["input_ids", "attention_mask", "label"])

# Set up our training and testing DataLoaders
batch_size = 16 # You can make this smaller (e.g., 8) if you run out of GPU memory
train_dataloader = DataLoader(tokenized_dataset["train"], batch_size=batch_size, shuffle=True)
test_dataloader = DataLoader(tokenized_dataset["test"], batch_size=batch_size)

print("\nData preparation complete!")
print(f"Number of training batches: {len(train_dataloader)}")
print(f"Number of testing batches: {len(test_dataloader)}")

In [None]:
# Cell 5: Initialize Model, Loss, and Optimizer

# --- 1. Define Device ---
# Use the GPU when one is available, otherwise fall back to the CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# --- 2. Define Model Hyperparameters ---
# Deliberately smaller than the paper's base model (512 / 8 / 2048 / 6)
# so that training is faster.
vocab_size = tokenizer.vocab_size       # taken from the tokenizer
pad_token_id = tokenizer.pad_token_id   # ID of the padding token
num_classes = 2                         # positive and negative
d_model, h, d_ff, N = 256, 4, 1024, 3
dropout = 0.1

# --- 3. Initialize the Model ---
model = TransformerClassifier(
    vocab_size, d_model, h, d_ff, N, num_classes, dropout
).to(device)

# --- 4. Initialize Loss Function ---
# CrossEntropyLoss takes the raw logits the model outputs
criterion = nn.CrossEntropyLoss()

# --- 5. Initialize Optimizer ---
# Plain Adam with a fixed learning rate instead of the paper's schedule
optimizer = torch.optim.Adam(model.parameters(), lr=5e-5)

print("Model, Loss, and Optimizer are ready.")

In [None]:
# Cell 6: The Training Loop
import time

num_epochs = 3  # number of full passes over the training data

print("Starting training...")

for epoch in range(num_epochs):
    # ---------------- Training phase ----------------
    model.train()  # enables dropout
    batch_losses = []
    start_time = time.time()

    for batch in train_dataloader:
        # Move the batch to the target device
        input_ids = batch["input_ids"].to(device)
        labels = batch["label"].to(device)

        # Padding mask derived from the token IDs
        mask = create_padding_mask(input_ids, pad_token_id).to(device)

        # Forward pass (after clearing gradients from the previous step)
        optimizer.zero_grad()
        outputs = model(input_ids, mask)
        loss = criterion(outputs, labels)

        # Backward pass and parameter update
        loss.backward()
        optimizer.step()

        batch_losses.append(loss.item())

    # ---------------- Epoch summary ----------------
    total_loss = sum(batch_losses)
    avg_train_loss = total_loss / len(train_dataloader)
    epoch_time = time.time() - start_time
    print(f"\n--- Epoch {epoch+1}/{num_epochs} ---")
    print(f"Average Training Loss: {avg_train_loss:.4f}")
    print(f"Epoch Time: {epoch_time:.2f}s")

    # ---------------- Evaluation phase ----------------
    model.eval()  # disables dropout
    total_correct = 0
    total_samples = 0

    with torch.no_grad():  # gradients are not needed for evaluation
        for batch in test_dataloader:
            input_ids = batch["input_ids"].to(device)
            labels = batch["label"].to(device)
            mask = create_padding_mask(input_ids, pad_token_id).to(device)

            outputs = model(input_ids, mask)

            # Index of the largest logit is the predicted class
            predicted = torch.max(outputs, 1).indices

            total_samples += labels.size(0)
            total_correct += predicted.eq(labels).sum().item()

    accuracy = (total_correct / total_samples) * 100
    print(f"Test Accuracy: {accuracy:.2f}%")

print("\n--- Training Complete! ---")

In [None]:
# Cell 7: Test on your own sentences

def predict_sentiment(sentence):
    """Classify one sentence with the global ``model``.

    Returns "Positive" when the predicted class index is 1, otherwise
    "Negative".
    """
    model.eval()  # disables dropout

    # Tokenize, padding/truncating to 512 tokens, as PyTorch tensors
    encoded = tokenizer(
        sentence,
        max_length=512,
        truncation=True,
        padding="max_length",
        return_tensors="pt",
    )
    input_ids = encoded["input_ids"].to(device)

    # The tokenizer's attention_mask already marks real tokens with 1;
    # add two singleton axes to get [1, 1, 1, seq_len].
    attention_mask = encoded["attention_mask"].to(device)
    mask = attention_mask[:, None, None, :]

    with torch.no_grad():
        logits = model(input_ids, mask)

    # Index of the largest logit
    predicted_class = torch.max(logits, 1).indices

    return "Positive" if predicted_class.item() == 1 else "Negative"

# --- Try it out! ---
for review in (
    "This movie was absolutely fantastic! I loved it.",
    "It was a complete waste of time. The plot was terrible.",
):
    print(predict_sentiment(review))