# Problem: Build a Transformer Model from Scratch

## Objective
Implement a **Transformer model** in PyTorch for sequence processing and prediction. The model should include an embedding layer, a Transformer encoder, and an output projection layer.

## Tasks

1. Implement Positional Encoding to inject sequence order into embeddings  
Create sinusoidal positional encodings that are added to input embeddings to provide order information.

2. Implement Multi-Head Self Attention mechanism  
Apply attention in parallel across multiple heads to capture different representation subspaces.

3. Linear projection of queries, keys, and values  
Use a single linear layer to project input into concatenated Q, K, V tensors.

4. Scaled dot-product attention  
Compute attention scores by scaled dot product of queries and keys, followed by softmax and application to values.

5. Output projection after head concatenation  
Concatenate the outputs of all heads and project back to the original embedding dimension.

6. Implement FeedForward layer used within Transformer blocks  
Build a two-layer MLP with a ReLU activation in between to process each token independently.

7. Connect components in a TransformerEncoderLayer with proper layer normalization and residual connections  
Apply residual connections and layer normalization around the attention and feedforward sublayers.


## Requirements

- Support padded input sequences for variable-length data.
- Ensure the model handles batched inputs with correct tensor shapes.


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import math

In [None]:
class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal position information to a batch of embeddings.

    Even feature columns hold ``sin(pos / 10000^(2i / d_model))`` and odd
    columns hold the matching cosine. The table is precomputed once and kept
    as a buffer, so it follows the module across devices without being trained.

    Args:
        d_model: Width of the embeddings the encoding is added to.
        max_len: Longest sequence length the table is precomputed for.
    """

    def __init__(self, d_model, max_len=5000):
        super().__init__()
        position = torch.arange(max_len, dtype=torch.float32).unsqueeze(1)
        # Inverse frequencies, computed in log space for numerical stability.
        div_term = torch.exp(
            torch.arange(0, d_model, 2, dtype=torch.float32)
            * (-math.log(10000.0) / d_model)
        )
        pe = torch.zeros(max_len, d_model)
        pe[:, 0::2] = torch.sin(position * div_term)
        # With an odd d_model there is one fewer cosine column than sine column.
        pe[:, 1::2] = torch.cos(position * div_term[: d_model // 2])
        # Leading singleton dimension lets the table broadcast over the batch.
        self.register_buffer("pe", pe.unsqueeze(0))

    def forward(self, x):
        """Return ``x`` plus the positional table for its sequence length.

        Args:
            x: Tensor of shape (batch, seq_len, d_model).

        Raises:
            ValueError: If ``seq_len`` exceeds the precomputed ``max_len``.
        """
        seq_len = x.size(1)
        if seq_len > self.pe.size(1):
            raise ValueError(
                f"sequence length {seq_len} exceeds max_len {self.pe.size(1)}"
            )
        return x + self.pe[:, :seq_len]


class MultiHeadSelfAttention(nn.Module):
    """Multi-head scaled dot-product self-attention.

    A single linear layer produces the concatenated queries, keys and values;
    the heads attend in parallel and their outputs are concatenated and
    projected back to ``embed_dim``.

    Args:
        embed_dim: Width of the input and output features.
        num_heads: Number of attention heads; must divide ``embed_dim``.

    Raises:
        ValueError: If ``embed_dim`` is not divisible by ``num_heads``.
    """

    def __init__(self, embed_dim, num_heads):
        super().__init__()
        if embed_dim % num_heads != 0:
            raise ValueError(
                f"embed_dim ({embed_dim}) must be divisible by num_heads ({num_heads})"
            )
        self.embed_dim = embed_dim
        self.num_heads = num_heads
        self.head_dim = embed_dim // num_heads
        self.qkv_proj = nn.Linear(embed_dim, 3 * embed_dim)
        self.out_proj = nn.Linear(embed_dim, embed_dim)

    def forward(self, x, key_padding_mask=None):
        """Attend over the sequence dimension of ``x``.

        Args:
            x: Tensor of shape (batch, seq_len, embed_dim).
            key_padding_mask: Optional bool tensor of shape (batch, seq_len),
                True at padded positions that must not be attended to.

        Returns:
            Tensor of shape (batch, seq_len, embed_dim).
        """
        batch_size, seq_len, _ = x.shape
        qkv = self.qkv_proj(x)
        qkv = qkv.view(batch_size, seq_len, 3, self.num_heads, self.head_dim)
        # Each of q, k, v becomes (batch, heads, seq_len, head_dim).
        q, k, v = qkv.permute(2, 0, 3, 1, 4).unbind(0)

        scores = torch.matmul(q, k.transpose(-2, -1)) / math.sqrt(self.head_dim)
        if key_padding_mask is not None:
            # A large finite negative (not -inf) keeps softmax free of NaNs
            # even when every key in a row is padding.
            scores = scores.masked_fill(
                key_padding_mask[:, None, None, :], torch.finfo(scores.dtype).min
            )
        weights = torch.softmax(scores, dim=-1)

        context = torch.matmul(weights, v)
        # Merge the heads back into a single feature dimension.
        context = context.transpose(1, 2).reshape(batch_size, seq_len, self.embed_dim)
        return self.out_proj(context)


class FeedForward(nn.Module):
    """Position-wise two-layer MLP with a ReLU in between.

    Args:
        embed_dim: Width of the input and output features.
        ff_dim: Width of the hidden layer.
    """

    def __init__(self, embed_dim, ff_dim):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(embed_dim, ff_dim),
            nn.ReLU(),
            nn.Linear(ff_dim, embed_dim),
        )

    def forward(self, x):
        """Apply the MLP to the last dimension of ``x`` (each token independently)."""
        return self.net(x)


class TransformerEncoderLayer(nn.Module):
    """One encoder block: self-attention and feed-forward sublayers.

    Each sublayer is wrapped in a residual connection followed by layer
    normalization (post-norm arrangement).

    Args:
        embed_dim: Width of the token features.
        num_heads: Number of attention heads.
        ff_dim: Hidden width of the feed-forward sublayer.
    """

    def __init__(self, embed_dim, num_heads, ff_dim):
        super().__init__()
        self.attention = MultiHeadSelfAttention(embed_dim, num_heads)
        self.norm1 = nn.LayerNorm(embed_dim)
        self.feed_forward = FeedForward(embed_dim, ff_dim)
        self.norm2 = nn.LayerNorm(embed_dim)

    def forward(self, x, key_padding_mask=None):
        """Run the block on ``x`` of shape (batch, seq_len, embed_dim).

        Args:
            x: Input features.
            key_padding_mask: Optional bool tensor (batch, seq_len), True at
                padded positions; forwarded to the attention sublayer.
        """
        x = self.norm1(x + self.attention(x, key_padding_mask))
        x = self.norm2(x + self.feed_forward(x))
        return x


class TransformerModel(nn.Module):
    """Token-level Transformer encoder with a pooled classification head.

    Pipeline: embedding -> sinusoidal positional encoding -> ``num_layers``
    encoder blocks -> mean over (non-padded) positions -> linear projection.

    Args:
        vocab_size: Number of distinct token ids.
        embed_dim: Width of the token embeddings.
        num_heads: Attention heads per encoder block.
        num_layers: Number of stacked encoder blocks.
        ff_dim: Hidden width of each feed-forward sublayer.
        output_dim: Number of output logits per sequence.
        pad_token_id: Optional id of the padding token. When given, positions
            holding this id are masked out of attention and pooling. Defaults
            to None (no masking), since a task may use every id as real data.
    """

    def __init__(self, vocab_size, embed_dim, num_heads, num_layers, ff_dim,
                 output_dim, pad_token_id=None):
        super().__init__()
        self.pad_token_id = pad_token_id
        self.embedding = nn.Embedding(vocab_size, embed_dim)
        self.pos_encoding = PositionalEncoding(embed_dim)
        self.layers = nn.ModuleList(
            [TransformerEncoderLayer(embed_dim, num_heads, ff_dim) for _ in range(num_layers)]
        )
        self.output_proj = nn.Linear(embed_dim, output_dim)

    def forward(self, x, padding_mask=None):
        """Compute per-sequence logits.

        Args:
            x: Long tensor of token ids, shape (batch, seq_len).
            padding_mask: Optional bool tensor (batch, seq_len), True at padded
                positions. If omitted and ``pad_token_id`` was set, the mask is
                derived from ``x``.

        Returns:
            Tensor of shape (batch, output_dim).
        """
        if padding_mask is None and self.pad_token_id is not None:
            padding_mask = x == self.pad_token_id

        hidden = self.pos_encoding(self.embedding(x))
        for layer in self.layers:
            hidden = layer(hidden, padding_mask)

        if padding_mask is None:
            pooled = hidden.mean(dim=1)
        else:
            # Average only over real tokens; the clamp avoids division by zero
            # for a sequence that is entirely padding.
            keep = (~padding_mask).unsqueeze(-1).to(hidden.dtype)
            pooled = (hidden * keep).sum(dim=1) / keep.sum(dim=1).clamp(min=1.0)
        return self.output_proj(pooled)


# Symmetry Detection Test
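A synthetic task where the model must determine whether a sequence is a mirror of itself, i.e. a palindrome whose second half is the first half reversed. Because positive and negative examples can contain exactly the same tokens in a different order, it is a good test for positional encoding. If everything is working correctly, you should see test accuracy above 95% after 10 epochs.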

In [None]:
# Seed PyTorch's global RNG so the run is repeatable.
torch.manual_seed(42)

# Size of the synthetic dataset and of each sequence in it.
num_samples = 10_000
seq_length = 10
# One token id per position: valid ids are 0 .. seq_length - 1.
vocab_size = seq_length

def create_mirror_data(num_samples, seq_length, vocab_size):
    """Generate labelled sequences for the mirror (palindrome) detection task.

    Every sample starts as a palindrome built from a random first half. About
    half of the samples are kept as-is; the rest are randomly permuted so they
    contain the same tokens in a different order.

    The label always reflects the final sequence: 1 if it reads the same in
    both directions, 0 otherwise. A permutation that happens to produce a
    palindrome again (unavoidable when all tokens are equal) is therefore
    labelled 1 rather than being mislabelled as a negative.

    Args:
        num_samples: Number of sequences to generate.
        seq_length: Length of each sequence. Odd lengths get a random middle
            token between the two mirrored halves.
        vocab_size: Token ids are drawn uniformly from ``0 .. vocab_size - 1``.

    Returns:
        Tuple ``(X, y)`` of long tensors with shapes
        ``(num_samples, seq_length)`` and ``(num_samples,)``.
    """
    half_len = seq_length // 2
    has_middle = seq_length % 2 == 1
    X = torch.zeros((num_samples, seq_length), dtype=torch.long)
    y = torch.zeros(num_samples, dtype=torch.long)

    for i in range(num_samples):
        # 1. Always create a mirror sequence first
        base_half = torch.randint(0, vocab_size, (half_len,))
        parts = [base_half]
        if has_middle:
            # An odd length needs a pivot token so the sequence fills the row.
            parts.append(torch.randint(0, vocab_size, (1,)))
        parts.append(torch.flip(base_half, dims=[0]))
        mirror_seq = torch.cat(parts)

        if torch.rand(1) > 0.5:
            # Positive case: keep the mirror order
            X[i] = mirror_seq
        else:
            # Candidate negative: same tokens, randomly reordered
            X[i] = mirror_seq[torch.randperm(seq_length)]

        # Label from the actual content so accidental palindromes are not
        # recorded as negatives.
        y[i] = int(torch.equal(X[i], torch.flip(X[i], dims=[0])))

    return X, y

# Training set and a separate held-out set for the mirror task.
X, y = create_mirror_data(num_samples, seq_length, vocab_size)
X_test, y_test = create_mirror_data(1000, seq_length, vocab_size)


embed_dim = 64
num_heads = 4
num_layers = 2
ff_dim = 128

# Two output logits: class 0 (not a mirror) and class 1 (mirror).
model = TransformerModel(vocab_size, embed_dim, num_heads, num_layers, ff_dim, output_dim=2)

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=2e-4)

epochs = 10
batch_size = 64
# Actual number of mini-batches per epoch, including the final partial batch.
# Dividing the summed loss by num_samples // batch_size would undercount by one
# whenever batch_size does not divide num_samples.
num_batches = len(range(0, num_samples, batch_size))
for epoch in range(epochs):
    avg_loss = 0.0
    for i in range(0, num_samples, batch_size):
        X_batch = X[i:i+batch_size]
        y_batch = y[i:i+batch_size]

        # Forward pass
        predictions = model(X_batch)
        loss = criterion(predictions, y_batch)
        avg_loss += loss.item()

        # Backward pass and optimization
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()


    # Evaluate on the held-out set without tracking gradients, then switch
    # back to training mode for the next epoch.
    model.eval()
    with torch.no_grad():
        test_predictions = model(X_test)
        test_loss = criterion(test_predictions, y_test)
        _, predicted_classes = torch.max(test_predictions, 1)
        accuracy = (predicted_classes == y_test).float().mean().item()
    model.train()

    print(f"Epoch [{epoch + 1}/{epochs}], Avg Train Loss: {avg_loss/num_batches:.4f}, Test Loss: {test_loss.item():.4f}, Test Accuracy: {accuracy:.4f}")


# Sentiment Analysis Test
Real-world application of the Transformer model for sentiment analysis. Note: this is not a great test for positional encoding, but it is a good sanity check for the overall model.

## Install required libraries
`pip install datasets transformers`

In [None]:
from datasets import load_dataset
from torch.utils.data import DataLoader
from transformers import AutoTokenizer

# Step 1: take the first 5000 training examples of GLUE SST-2.
dataset = load_dataset("glue", "sst2", split="train[:5000]")

# Step 2: convert sentences to token ids with the BERT uncased tokenizer.
tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")


def tokenize_function(examples):
    """Tokenize a batch of examples, padding/truncating each sentence to 16 tokens."""
    sentences = examples["sentence"]
    return tokenizer(
        sentences,
        truncation=True,
        max_length=16,
        padding="max_length",
    )


tokenized_dataset = dataset.map(tokenize_function, batched=True)
# Expose only the model input and the target, as PyTorch tensors.
tokenized_dataset.set_format(columns=['input_ids', 'label'], type='torch')

# Step 3: shuffled mini-batches of 32 examples for training.
train_loader = DataLoader(tokenized_dataset, shuffle=True, batch_size=32)

In [None]:
# Hyperparameters for the sentiment-analysis run
vocab_size = tokenizer.vocab_size  # one embedding row per tokenizer id
embed_dim = 32                     # deliberately small so training is quick
num_heads = 4
num_layers = 2
ff_dim = 128
output_dim = 2                     # class 0 = negative, class 1 = positive

# Build a fresh model whose embedding table matches the tokenizer vocabulary.
model = TransformerModel(
    vocab_size=vocab_size,
    embed_dim=embed_dim,
    num_heads=num_heads,
    num_layers=num_layers,
    ff_dim=ff_dim,
    output_dim=output_dim,
)

# Cross-entropy over the two class logits, optimised with Adam.
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-4)

In [None]:
# Train the sentiment model, reporting the mean batch loss every fifth epoch.
model.train()
epochs = 50
for epoch in range(epochs):
    avg_loss = 0.0
    for batch in train_loader:
        # Token ids (batch_size, seq_length) and their labels (batch_size,)
        X, y = batch['input_ids'], batch['label']

        # Clear stale gradients, then forward, backward and parameter update
        optimizer.zero_grad()
        predictions = model(X)
        loss = criterion(predictions, y)
        loss.backward()
        optimizer.step()

        avg_loss += loss.item()

    if (epoch + 1) % 5 != 0:
        continue
    print(f"Epoch [{epoch + 1}/{epochs}], Avg Loss: {avg_loss/len(train_loader):.4f}")

In [None]:
# Step 1: switch the model to evaluation mode before scoring examples.
model.eval()

test_sequences = [
    "This was the worst film I have ever seen.",
    "I absolutely loved this movie!"
]

# Step 2: tokenize with the same padding/truncation as training and ask for
# PyTorch tensors via return_tensors="pt".
tokenized_test = tokenizer(
    test_sequences,
    padding="max_length",
    truncation=True,
    max_length=16,
    return_tensors="pt",
)
X_test = tokenized_test['input_ids']  # Shape: (2, seq_length)

# Step 3: score without building an autograd graph.
with torch.no_grad():
    logits = model(X_test)
    probabilities = logits.softmax(dim=-1)
    predictions = probabilities.argmax(dim=-1)

print(f"Raw Logits: {logits.tolist()}")
print(f"Probabilities: {probabilities.tolist()}")
# The first sentence is negative and the second positive, so the hoped-for
# output is [0, 1].
print(f"Predicted Classes (0 negative, 1 positive): {predictions.tolist()}")