# Transformers and Arithmetic Addition

### Step 0: Some import and later required functions

In [2]:
# Imports
import os
import random
import math
import numpy as np

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import TensorDataset, DataLoader
import mnist
from matplotlib import pyplot as plt
import matplotlib as mpl

# Set a seed for reproducibility
seed = 42
random.seed(seed)

np.random.seed(seed)
torch.manual_seed(seed)
os.environ['PYTHONHASHSEED'] = str(seed)

## Arithmetic Operation as Machine Translation Task

In this exercise, you'll build a transformer model from scratch in PyTorch to perform simple arithmetic addition. Your model will learn to translate an input expression of two numbers (e.g., `"123+456"`) into its correct sum (e.g., `"579"`). This task is framed as a neural machine translation problem where the source is the arithmetic expression and the target is its result.


### Dataset Overview
This dataset is designed for a machine translation task where the goal is to perform addition between two numbers. Each sample consists of:
- **Input (Source):** A string representing an arithmetic expression in the form "a+b", where *a* and *b* are randomly generated numbers.
- **Output (Target):** A string representing the sum of *a* and *b*.

Special tokens such as `<sos>` (start-of-sequence), `<eos>` (end-of-sequence), and `<pad>` (padding) are included in both the source and target vocabularies to facilitate sequence modeling in a Transformer-based neural machine translation framework.

In [7]:
# Assuming SimpleTokenizer is defined as provided:
class SimpleTokenizer:
    def __init__(self, texts=None):
        self.char_to_idx = {}
        self.idx_to_char = {}
        
        if texts:
            self.fit(texts)
    
    def fit(self, texts):
        # Create a set of all unique characters
        unique_chars = set()
        for text in texts:
            unique_chars.update(text)
        
        # Create mapping dictionaries
        self.char_to_idx = {char: idx for idx, char in enumerate(sorted(unique_chars))}
        self.idx_to_char = {idx: char for char, idx in self.char_to_idx.items()}
    
    def encode(self, text):
        return [self.char_to_idx[char] for char in text]
    
    def decode(self, indices):
        return ''.join([self.idx_to_char[idx] for idx in indices])
    
    @property
    def vocab_size(self):
        return len(self.char_to_idx)

class AdditionDataset(Dataset):
    def __init__(self, num_samples=10000, max_digits=6, add_special_tokens=True, tokenizer=None):
        """
        Args:
            num_samples (int): Number of samples to generate.
            max_digits (int): Maximum number of digits for each number.
            add_special_tokens (bool): Whether to add <sos> and <eos> to the sequences.
            tokenizer (SimpleTokenizer, optional): A pre-initialized tokenizer. If None, it will be created.
        """
        super(AdditionDataset, self).__init__()
        self.num_samples = num_samples
        self.max_digits = max_digits
        self.add_special_tokens = add_special_tokens
        self.samples = []
        
        # Generate the dataset samples.
        for _ in range(num_samples):
            a = random.randint(0, 10**max_digits - 1)
            b = random.randint(0, 10**max_digits - 1)
            src = f"{a}+{b}"
            tgt = str(a + b)
            if add_special_tokens:
                src = "<sos>" + src + "<eos>"
                tgt = "<sos>" + tgt + "<eos>"
            self.samples.append((src, tgt))
        
        # If no tokenizer is provided, create one from all texts (both source and target)
        if tokenizer is None:
            all_texts = []
            for src, tgt in self.samples:
                all_texts.append(src)
                all_texts.append(tgt)
            self.tokenizer = SimpleTokenizer(all_texts)
        else:
            self.tokenizer = tokenizer
    
    def __len__(self):
        return len(self.samples)
    
    def __getitem__(self, idx):
        src, tgt = self.samples[idx]
        src_ids = self.tokenizer.encode(src)
        tgt_ids = self.tokenizer.encode(tgt)
        return torch.tensor(src_ids, dtype=torch.long), torch.tensor(tgt_ids, dtype=torch.long)


#### Step 0: Feel the data

In [9]:
dataset = AdditionDataset(num_samples=5, max_digits=6)
for i in range(len(dataset)):
    src_tensor, tgt_tensor = dataset[i]
    # Decode to verify using the dataset's tokenizer
    src_decoded = dataset.tokenizer.decode(src_tensor.tolist())
    tgt_decoded = dataset.tokenizer.decode(tgt_tensor.tolist())
    print(f"Input: {src_decoded} -> Target: {tgt_decoded}")


Input: <sos>229258+243962<eos> -> Target: <sos>473220<eos>
Input: <sos>529903+631262<eos> -> Target: <sos>1161165<eos>
Input: <sos>27824+588508<eos> -> Target: <sos>616332<eos>
Input: <sos>208496+750800<eos> -> Target: <sos>959296<eos>
Input: <sos>681453+735392<eos> -> Target: <sos>1416845<eos>


# Building Transformers from scratch


### Step 2: Multi-Head Attention Mechanism

The multi-head attention mechanism is the "secret sauce" of transformers. Imagine reading a sentence where certain words carry more meaning than others. The attention mechanism allows the model to focus on the most relevant parts of the input sequence when generating each token of the output.

The attention mechanism lets the model focus on relevant parts of the input. The "multi-head" aspect allows it to focus on different relationship patterns simultaneously. "Multi-head" means the model can capture different patterns simultaneously. For example, one head might capture subject-verb relationships while another focuses on adjective-noun connections. This diversity in attention is a key innovation that enhances the model's understanding and performance.

#### Your Task:

During the initialization of the module, make sure to:
- Verify that `d_model` (the embedding dimension) is divisible by `num_heads`.
- Compute the dimension per head (`d_k`), where `d_k = d_model / num_heads`.
- Create linear layers for the query (Q), key (K), and value (V) projections.
- Create a final linear layer for output projection.

With the split_heads method reshape the input tensor to separate the embedding dimension into multiple heads. This allows each head to focus on different aspects of the input.

In the forward pass:
- Project the inputs using the linear layers and split into heads using the split_heads method.
- Calculate attention scores by taking the dot product between queries and keys (transposed)
- Scale the scores by dividing by sqrt(d_k) to prevent extremely small gradients
- Apply a zero-mask if provided (important for preventing the model from "seeing the future" in autoregressive tasks), check `torch.Tensor.masked_fill`
- Apply softmax to get normalized weights that sum to 1
- Apply these weights to the values through matrix multiplication
- Apply the output projection

In [10]:
class MultiHeadAttention(nn.Module):
    def __init__(self, d_model, num_heads):
        super(MultiHeadAttention, self).__init__()
        assert ..., "d_model must be divisible by num_heads"
        
        self.d_model = d_model
        self.num_heads = num_heads
        self.d_k = ...
        
        # Linear projections for Q, K, V
        self.w_q = ...
        self.w_k = ...
        self.w_v = ...
        self.w_o = ...

    def split_heads(self, x, batch_size):
        # Reshape from (batch_size, seq_len, d_model) to (batch_size, num_heads, seq_len, d_k)
        x = x.view(batch_size, -1, self.num_heads, self.d_k)
        return x.permute(0, 2, 1, 3)

     def forward(self, q, k, v, mask=None):
        batch_size = q.size(0)
        
        # Linear projections and split into heads
        q = ...
        k = ...
        v = ...
        
        # Scaled dot-product attention
        scores = ...
        scores = ...

        # Apply mask if provided
        if mask is not None:
            scores = ...
        
        # Apply softmax to get attention weights
        attn_weights = ...
        
        # Apply attention weights to values
        context = ...  # (batch_size, num_heads, seq_len_q, d_k)

        # Reshape back to (batch_size, seq_len_q, d_model)
        context = context.permute(0, 2, 1, 3).contiguous()
        context = context.view(batch_size, -1, self.d_model)
        
        # Final linear projection
        output = ...
        
        return output

### Step 3: Position-wise Feed-Forward Network

After the attention layer, each position in the sequence gets processed by this simple (feed-forward) neural network. It's applied to each position independently, like having the same mini neural network look at each word.

#### Your Task:
Write simple two-layer neural network with a ReLU activation in between.


In [11]:
class PositionwiseFeedForward(nn.Module):
    def __init__(self, d_model, d_ff):
        super(PositionwiseFeedForward, self).__init__()
        self.fc1 = ...
        self.fc2 = ...
        
    def forward(self, x):
        return ...

### Step 4: Positional Encoding

Transformers don't inherently capture the order of tokens in a sequence. To address this, positional encoding is added to each token embedding to inject information about the token positions. This encoding is computed using sine and cosine functions of different frequencies, ensuring that each position in the sequence has a unique representation. The key trick is to use sine functions for even indices and cosine functions for odd indices in the embedding dimension.

#### Your Task:
Fill in the missing parts of the code below according to the steps



In [1]:
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_seq_length=5000):
        super(PositionalEncoding, self).__init__()
        
        # TODO: Create a positional encoding matrix of size (max_seq_length, d_model) filled with zeros.
        pe = ...
        
        # TODO: Create a tensor 'position' with values from 0 to max_seq_length - 1 and reshape it to (max_seq_length, 1).
        position = ...
        
        # TODO: Compute the div_term using exponential decay based on the embedding dimension.
        div_term = ...
        
        # TODO: For even indices (0, 2, ...), fill the positional encoding matrix with sine(position * div_term).
        pe[:, 0::2] = ...
        
        # TODO: For odd indices (1, 3, ...), fill the positional encoding matrix with cosine(position * div_term).
        pe[:, 1::2] = ...
        
        #Register the positional encoding matrix as a buffer so it's not treated as a model parameter.
        self.register_buffer('pe', pe.unsqueeze(0))
        
    def forward(self, x):
        # TODO: Add positional encoding to the input x. Make sure to slice pe based on the input's sequence length.
        return ...


### Step 5: Encoder Layer

The encoder layer is the building block of the Transformer encoder. It combines several important components:

- **Multi-Head Self-Attention Layer:**  
  This layer computes attention where the query, key, and value all come from the same input. It allows the model to weigh the importance of different tokens in the sequence relative to each other. (use the provided `MultiHeadAttention` class).

- **Feed-Forward Network:**  
  A two-layer fully connected network that processes the output from the attention mechanism. (use the provided `PositionwiseFeedForward` class).

- **Residual Connections:**  
  These connections add the input of a sub-layer to its output, which helps the gradient flow during training. (See the last exercise)

- **Layer Normalization:**  
  Layer normalization is applied after adding the residual connection to stabilize and speed up training. See `nn.LayerNorm`

- **Dropout Modules:**  
  Dropout is used to prevent overfitting by randomly setting a fraction of the input units to zero during training. See `nn.Dropout`

#### Your Task:
Fill in the missing parts in the code below by completing the following steps


In [None]:
class EncoderLayer(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, dropout=0.1):
        super(EncoderLayer, self).__init__()
        
        # TODO: Initialize the multi-head self-attention layer
        self.self_attn = ...
        
        # TODO: Initialize the position-wise feed-forward network
        self.feed_forward = ...
        
        # TODO: Initialize two layer normalization modules
        self.norm1 = ...
        self.norm2 = ...
        
        # TODO: Initialize two dropout modules
        self.dropout1 = ...
        self.dropout2 = ...

    def forward(self, x, mask=None):
        # TODO: Step 1: Apply multi-head self-attention with a residual connection and layer normalization.
        # Q, K, and V are all the same input: x without mask for encoder
        attn_output = ...  # Multi-head self-attention
        # TODO: Add dropout and residual connection, then apply layer normalization.
        x = ...
        
        # Step 2: Apply the feed-forward network with a residual connection and layer normalization.
        ff_output = self.feed_forward(x)
        # TODO: Add dropout and residual connection, then apply layer normalization.
        x = ...
        
        return x


### Step 6: Decoder Layer

The decoder layer extends the encoder layer by adding an extra cross-attention mechanism. In addition to self-attention (which uses a mask to prevent future token access), it incorporates cross-attention to allow the decoder to attend to the encoder's output. Like the encoder, the decoder layer uses residual connections and layer normalization to ensure effective gradient flow and stable learning.

#### Your Task:
Complete the following tasks by filling in the missing code sections:

1. **Self-Attention:**  
   Compute self-attention on the decoder input using a mask (`tgt_mask`) to prevent attending to future tokens. Then apply dropout, add a residual connection, and normalize.

2. **Cross-Attention:**  
   Use cross-attention where the queries come from the current decoder representation and the keys and values come from the encoder's output. Again, apply dropout, add the residual connection, and normalize.

3. **Feed-Forward Network:**  
   Process the result with a feed-forward network. Apply dropout, add the residual connection, and perform a final layer normalization.

In [None]:
class DecoderLayer(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, dropout=0.1):
        super(DecoderLayer, self).__init__()
        
        # TODO: Initialize the self-attention layer for the decoder.
        self.self_attn = ...
        
        # TODO: Initialize the cross-attention layer for attending to encoder outputs.
        self.cross_attn = ...
        
        # TODO: Initialize the position-wise feed-forward network.
        self.feed_forward = ...
        
        # TODO: Initialize three layer normalization modules.
        self.norm1 = ...
        self.norm2 = ...
        self.norm3 = ...
        
        # TODO: Initialize three dropout modules.
        self.dropout1 = ...
        self.dropout2 = ...
        self.dropout3 = ...

    def forward(self, x, enc_output, src_mask=None, tgt_mask=None):
        # TODO: Step 1: Self-attention on the decoder input with masking, tgt or src? ;)
        self_attn_output = ...
        # TODO: Apply dropout, add the residual connection, and normalize.
        x = ...
        
        # TODO: Step 2: Cross-attention between decoder representation and encoder outputs with masking, tgt or src? ;)
        cross_attn_output = ...
        # TODO: Apply dropout, add the residual connection, and normalize.
        x = ...
        
        # Step 3: Feed-forward network with residual connection and layer normalization.
        ff_output = self.feed_forward(x)
        # TODO: Apply dropout, add the residual connection, and normalize.
        x = ...
        
        return x


### Step 7: Full Encoder and Decoder

Now we stack multiple encoder and decoder layers. The encoder simply applies multiple encoder layers in sequence. The decoder does the same with decoder layers, passing the encoder output to each layer. These stack multiple encoder or decoder layers to create deeper networks that can learn more complex patterns.

In [15]:
class Encoder(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, num_layers, dropout=0.1):
        super(Encoder, self).__init__()
        
        self.layers = nn.ModuleList([
            EncoderLayer(d_model, num_heads, d_ff, dropout)
            for _ in range(num_layers)
        ])
        
    def forward(self, x, mask=None):
        for layer in self.layers:
            x = layer(x, mask)
        return x

class Decoder(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, num_layers, dropout=0.1):
        super(Decoder, self).__init__()
        
        self.layers = nn.ModuleList([
            DecoderLayer(d_model, num_heads, d_ff, dropout)
            for _ in range(num_layers)
        ])
        
    def forward(self, x, enc_output, src_mask=None, tgt_mask=None):
        for layer in self.layers:
            x = layer(x, enc_output, src_mask, tgt_mask)
        return x

### Step 8: Complete Transformer Language Model

In this step, you'll integrate all previous components to build a complete Transformer-based language model. This model can work in two different configurations: an encoder–decoder architecture or a decoder-only architecture. Understanding the differences between these two approaches is key to designing models for different NLP tasks.

---

#### **Encoder–Decoder Architecture**

- **When to Use:**  
  This setup is ideal for tasks like machine translation where you have a clear input sequence (source language) and an output sequence (target language). The encoder processes the source sequence into a set of contextualized representations, and the decoder uses these representations to generate the target sequence.

- **Pipeline Overview:**  
  1. **Input Embedding:**  
     Convert token IDs from the source sequence into dense vector representations. (Use `nn.Embedding`)
  2. **Positional Encoding:**  
     Add positional encodings to the embeddings so the model knows the order of tokens.
  3. **Encoder Stack:**  
     Pass the source embeddings through the encoder stack to generate a context-rich representation.
  4. **Target Embedding & Positional Encoding:**  
     Convert token IDs from the target sequence into embeddings ((Use `nn.Embedding`)) and add positional encodings.
  5. **Decoder Stack:**  
     Use the decoder stack to generate the target sequence. The decoder attends both to its own past predictions (self-attention with masking) and the encoder outputs (cross-attention).
  6. **Output Projection:**  
     Map the decoder's final outputs to vocabulary scores, which can then be used to predict the next token in the sequence.

- **Hint for Implementation:**  
  Make sure you correctly manage two inputs—the source and target sequences. The encoder output must be passed as an additional argument to the decoder so it can perform cross-attention.

---

#### **Decoder-Only Architecture**

- **When to Use:**  
  This configuration is common in models like GPT for tasks such as language modeling or text generation. In this mode, the model uses only a decoder. The single input sequence is processed to predict the next token, relying on self-attention across all previous tokens.

- **Pipeline Overview:**  
  1. **Input Embedding:**  
     Convert token IDs into vector representations (Use `nn.Embedding`).
  2. **Positional Encoding:**  
     Add positional information to the embeddings.
  3. **Decoder Stack:**  
     Process the sequence through a decoder stack that only employs self-attention (with a mask to prevent looking ahead).
  4. **Output Projection:**  
     Convert the processed vectors into vocabulary scores for next-token prediction.

- **Hint for Implementation:**  
  Since you only have one sequence, the same tensor is used both for the self-attention and as the input to subsequent layers. Ensure that you properly apply masking to prevent the decoder from "cheating" by looking at future tokens.

---

#### **Key Components Across Both Architectures**

1. **Embedding & Positional Encoding:**  
   - **Embedding:** This layer converts each token (represented by its ID) into a high-dimensional vector.  
   - **Scaling:** Multiplying by `sqrt(d_model)` helps balance the variance between embeddings and positional encodings.
   - **Positional Encoding:** Adds order information, making sure that tokens' positions are known to the model.

2. **Encoder and/or Decoder Stacks:**  
   - **Encoder:** In the encoder–decoder model, the encoder transforms the input sequence into context-rich representations using self-attention and feed-forward networks.
   - **Decoder:** Whether it's decoder-only or part of an encoder–decoder system, the decoder uses self-attention (and cross-attention in encoder–decoder) to generate output sequences.

3. **Output Projection:**  
   - This is typically a linear layer that maps the final hidden states from the decoder to logits over the vocabulary. These logits can be turned into probabilities to predict the next token.

---

#### **Suggested Implementation Pipeline**

1. **Start with the Embedding Layers:**  
   - Write code to create the token embedding layer.
   - Multiply the output by `sqrt(d_model)` for proper scaling.

2. **Implement Positional Encoding:**  
   - Create a module that adds positional encodings to the embeddings.
   - Verify that the added positional information correctly alters the embeddings.

3. **Build the Encoder and Decoder Stacks:**  
   - For the encoder–decoder setup, create separate modules for the encoder and decoder.  
   - For the decoder-only setup, focus on a robust decoder implementation that supports masking.

4. **Integrate the Stacks with a Control Flag:**  
   - Introduce a flag (e.g., `use_encoder_decoder`) in your model’s initialization.
   - Use conditional logic in the forward pass to switch between encoder–decoder and decoder-only pipelines.

5. **Add the Output Projection Layer:**  
   - Implement a final linear layer that projects the decoder outputs to the size of your vocabulary.
   - Test the end-to-end flow to ensure the model produces logits that can be used for training.

6. **Test with Dummy Data:**  
   - Before training, run a few forward passes with dummy data to verify that the dimensions and data flows are correct.
   - Experiment with both modes (encoder–decoder and decoder-only) to understand the differences in output.

---

In [None]:
class TransformerLM(nn.Module):
    def __init__(self, vocab_size, d_model, num_heads, d_ff, num_layers, dropout=0.1, use_encoder_decoder=False):
        """
        Args:
            vocab_size (int): Vocabulary size.
            d_model (int): Embedding dimension.
            num_heads (int): Number of attention heads.
            d_ff (int): Dimension of the feed-forward network.
            num_layers (int): Number of layers in the encoder/decoder stack.
            dropout (float): Dropout rate.
            use_encoder_decoder (bool): If True, use an encoder-decoder architecture; otherwise, use decoder-only.
        """
        super(TransformerLM, self).__init__()
        self.d_model = d_model
        self.use_encoder_decoder = use_encoder_decoder
        
        # TODO: Initialize token embedding layer
        self.embedding = ...
        # TODO: Initialize positional encoding module
        self.positional_encoding = ...
        self.dropout = ...
        
        if self.use_encoder_decoder:
            # Initialize the encoder stack for the encoder-decoder architecture
            self.encoder = Encoder(d_model, num_heads, d_ff, num_layers, dropout)
            # Initialize the decoder stack for the encoder-decoder architecture
            self.decoder = Decoder(d_model, num_heads, d_ff, num_layers, dropout)
        else:
            # Initialize the decoder stack for the decoder-only architecture (like GPT)
            self.decoder = Decoder(d_model, num_heads, d_ff, num_layers, dropout)
        
        # TODO: Initialize the output projection layer to map decoder outputs to vocabulary scores
        self.fc_out = ...
    
    def forward(self, x, src_mask=None, tgt_mask=None, encoder_input=None):
        """
        Args:
            x (Tensor): For decoder-only, the input sequence (batch, seq_len).
                        For encoder-decoder, the target sequence (batch, tgt_seq_len).
            src_mask (Tensor, optional): Mask for the encoder (if using encoder-decoder).
            tgt_mask (Tensor, optional): Mask for the decoder.
            encoder_input (Tensor, optional): Source sequence for the encoder (required if use_encoder_decoder=True).
        """
        if self.use_encoder_decoder:
            # --- Encoder–Decoder Setup ---
            assert encoder_input is not None, "encoder_input must be provided for encoder-decoder architecture."
            
            # TODO: Compute encoder embeddings: scale embeddings, add positional encoding, apply dropout.
            enc_emb = ...
            enc_emb = ...
            enc_emb = ...
            # TODO: Pass encoder embeddings through the encoder stack.
            enc_output = ...
            
            # TODO: Compute decoder embeddings: scale embeddings, add positional encoding, apply dropout.
            dec_emb = ...
            dec_emb = ...
            dec_emb = ...
            # TODO: Pass decoder embeddings through the decoder stack, using encoder outputs.
            dec_output = ...
        else:
            # --- Decoder-Only Setup ---
            # TODO: Compute decoder embeddings: scale embeddings, add positional encoding, apply dropout.
            dec_emb = ...
            dec_emb = ...
            dec_emb = ...
            # TODO: Pass decoder embeddings through the decoder stack (using self-attention only).
            dec_output = ...
        
        # Project the decoder output to vocabulary scores using the output projection layer.
        output = self.fc_out(dec_output)
        return output


### Step 9: Causal Masking for Autoregressive Generation
In language modeling, we need to ensure the model only looks at past tokens when predicting the next one, by creating a triangular mask that allows each position to attend only to previous positions and itself.

In [None]:
def create_causal_mask(seq_len):
    # TODO: Create a causal attention mask to prevent attending to future tokens.
    mask = ...
    return mask == 0  # Convert to boolean mask where True values are kept

### Step 10: Training and Inference Function

Now i the follwoing, you have both training and inference pipelines, it's time to explore and compare the performance of the decoder-only and encoder–decoder architectures. In this step, you will:

1. **Plot the Training Loss:**  
   Track and plot the training loss over epochs to understand how well each model converges during training.

2. **Evaluate on a Length Generalization Task:**  
   In a length generalization task, you test the model on arithmetic expressions that are longer than those seen during training. For example, if you trained your model with numbers up to 6 digits, you could test it on arithmetic expressions with 7 or 8 digits. The goal is to assess whether the model can extrapolate its learned arithmetic operations to longer sequences. Report the performance of the models within this task.
   
   **What is Length Generalization?**  
   Length generalization examines a model's ability to handle sequences longer than those encountered during training. In our arithmetic task, it tests whether the model, trained on relatively short input sequences, can correctly perform addition when the numbers have more digits than the training examples. This is a key challenge in many sequence-to-sequence tasks and is an active area of research.

3. **Research on Solutions for Length Generalization:**  
   As you analyze the results, do some research on strategies that have been proposed to improve length generalization. For instance, look into:
   - Modifications in positional encoding
   - Modifications on non-linear activations 
   - Architectures that incorporate recurrence or other forms of extrapolation-friendly inductive biases
   - Techniques like curriculum learning, where the model is gradually exposed to longer sequences

#### Your Tasks:
- **Task 1:** Modify your training loop to record the loss after every epoch (or batch) and plot these loss curves for both decoder-only and encoder–decoder models.
- **Task 2:** Create a test set with arithmetic expressions involving longer numbers (e.g., 7- or 8-digit numbers) and evaluate the performance of your trained models on this set.
- **Task 3:** Document your observations. How do the two architectures compare in terms of training loss and performance on the length generalization task?
- **Task 4:** Perform a brief literature search (or research online) to identify at least two different strategies that have been proposed to enhance length generalization. Summarize your findings.

#### Hints for Implementation:
- **Creating a Length Generalization Test Set:**  
  Modify your data generation function (or create a new dataset) where `max_digits` is increased (e.g., 7 or 8) to simulate the extrapolation scenario.


In [18]:
def train_transformer_lm(model, dataloader, num_epochs, learning_rate, device):
    """
    Trains the transformer language model.
    
    For encoder–decoder mode:
        - Each batch returns (src, tgt).
        - We create a causal mask for the target.
        - The model is called with: model(tgt, src_mask, tgt_mask, encoder_input=src)
        
    For decoder-only mode:
        - We combine src and tgt into a single sequence.
        - We create a causal mask for the combined sequence.
        - The model is called with: model(combined, causal_mask)
    """
    model.to(device)
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
    criterion = nn.CrossEntropyLoss()
    
    for epoch in range(num_epochs):
        model.train()
        total_loss = 0
        
        for batch_idx, (src, tgt) in enumerate(dataloader):
            # Move data to device
            src, tgt = src.to(device), tgt.to(device)
            
            optimizer.zero_grad()
            
            if model.use_encoder_decoder:
                # --- Encoder–Decoder Mode ---
                # (For arithmetic translation: src = "123+456", tgt = "579")
                # Create target causal mask to prevent looking ahead
                tgt_mask = create_causal_mask(tgt.size(1)).to(device)
                # For this task, we do not apply a source mask (or you can create one if needed)
                src_mask = None
                # Forward pass: decoder receives tgt and encoder gets src.
                outputs = model(tgt, src_mask, tgt_mask, encoder_input=src)
                # In teacher forcing, we compute loss against the target sequence.
                loss = criterion(outputs.view(-1, outputs.size(-1)), tgt.view(-1))
            else:
                # --- Decoder-Only Mode ---
                # For a translation-style task using a decoder-only model,
                # we combine the source and target into one sequence.
                # For instance, if src = "<sos>123+456<eos>" and tgt = "<sos>579<eos>",
                # we combine as: combined = src + tgt[1:] to avoid duplicating <sos>.
                combined = torch.cat([src, tgt[:, 1:]], dim=1)
                seq_len = combined.size(1)
                causal_mask = create_causal_mask(seq_len).to(device)
                outputs = model(combined, causal_mask)
                # The target is the combined sequence shifted by one.
                loss = criterion(outputs.view(-1, outputs.size(-1)), combined.view(-1))
            
            # Backpropagation and optimization
            loss.backward()
            optimizer.step()
            
            total_loss += loss.item()
            if (batch_idx + 1) % 10 == 0:
                print(f'Epoch {epoch+1}/{num_epochs}, Batch {batch_idx+1}/{len(dataloader)}, '
                      f'Loss: {total_loss / (batch_idx + 1):.4f}')
        
        print(f'Epoch {epoch+1}/{num_epochs}, Average Loss: {total_loss / len(dataloader):.4f}')
    
    return model


In [19]:
def generate_text(model, tokenizer, start_text, max_length, temperature=1.0, device='cpu', src_text=None):
    """
    Generates text from the transformer model.
    
    For encoder–decoder mode:
        - src_text is the input arithmetic expression (e.g. "123+456" with special tokens).
        - start_text is the initial target prompt (e.g. "<sos>").
    For decoder-only mode:
        - start_text is used as the prompt.
    """
    model.eval()
    
    if model.use_encoder_decoder:
        # --- Encoder–Decoder Mode ---
        # Ensure a source text is provided.
        assert src_text is not None, "For encoder-decoder mode, please provide src_text."
        
        # Encode the source text and move to device.
        encoder_input = tokenizer.encode(src_text)
        encoder_input = torch.tensor([encoder_input], dtype=torch.long).to(device)
        
        # Encode the starting target text.
        input_seq = tokenizer.encode(start_text)
        input_tensor = torch.tensor([input_seq], dtype=torch.long).to(device)
        
        for _ in range(max_length):
            tgt_mask = create_causal_mask(input_tensor.size(1)).to(device)
            # Forward pass with encoder input.
            outputs = model(input_tensor, None, tgt_mask, encoder_input=encoder_input)
            next_token_logits = outputs[0, -1, :] / temperature
            probabilities = F.softmax(next_token_logits, dim=-1)
            next_token = torch.multinomial(probabilities, 1).item()
            input_tensor = torch.cat([
                input_tensor, 
                torch.tensor([[next_token]], dtype=torch.long).to(device)
            ], dim=1)
            if next_token == tokenizer.char_to_idx.get("<eos>", -1):
                break
        
        generated_tokens = input_tensor[0].tolist()
        generated_text = tokenizer.decode(generated_tokens)
        return generated_text
    
    else:
        # --- Decoder-Only Mode ---
        input_seq = tokenizer.encode(start_text)
        input_tensor = torch.tensor([input_seq], dtype=torch.long).to(device)
        for _ in range(max_length):
            seq_len = input_tensor.size(1)
            causal_mask = create_causal_mask(seq_len).to(device)
            outputs = model(input_tensor, causal_mask)
            next_token_logits = outputs[0, -1, :] / temperature
            probabilities = F.softmax(next_token_logits, dim=-1)
            next_token = torch.multinomial(probabilities, 1).item()
            input_tensor = torch.cat([
                input_tensor,
                torch.tensor([[next_token]], dtype=torch.long).to(device)
            ], dim=1)
            if next_token == tokenizer.char_to_idx.get("<eos>", -1):
                break
        
        generated_tokens = input_tensor[0].tolist()
        generated_text = tokenizer.decode(generated_tokens)
        return generated_text


In [20]:
# Hyperparameters
batch_size = 16
d_model = 16
num_heads = 4
d_ff = 16
num_layers = 2
dropout = 0.1
num_epochs = 10
learning_rate = 0.001

In [23]:
# Determine device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

# Initialize our arithmetic dataset
# Our dataset returns (src, tgt) pairs with special tokens

from torch.nn.utils.rnn import pad_sequence

def collate_fn(batch):
    # batch is a list of tuples (src_tensor, tgt_tensor)
    src_seqs, tgt_seqs = zip(*batch)
    
    # Determine the padding token index from the tokenizer. Here we assume <pad> is present.
    pad_idx = dataset.tokenizer.char_to_idx.get("<pad>", 0)
    
    # Pad the sequences
    src_padded = pad_sequence(src_seqs, batch_first=True, padding_value=pad_idx)
    tgt_padded = pad_sequence(tgt_seqs, batch_first=True, padding_value=pad_idx)
    
    return src_padded, tgt_padded

dataset = AdditionDataset(num_samples=10000, max_digits=6, add_special_tokens=True)
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True, collate_fn=collate_fn)

# Set up vocabulary size from the dataset's tokenizer
# (Our modified AdditionDataset creates a tokenizer based on the data if none is provided)
vocab_size = dataset.tokenizer.vocab_size if hasattr(dataset, 'tokenizer') else len(dataset.src_vocab)

# Initialize the model
# Set use_encoder_decoder=True for encoder-decoder mode or False for decoder-only mode.
model = TransformerLM(
    vocab_size=vocab_size,
    d_model=d_model,
    num_heads=num_heads,
    d_ff=d_ff,
    num_layers=num_layers,
    dropout=dropout,
    use_encoder_decoder=True  # Change this flag to switch architectures
)

# Print model summary and parameter count
print(model)
print(f"Number of parameters: {sum(p.numel() for p in model.parameters())}")

# Train the model
model = train_transformer_lm(model, dataloader, num_epochs, learning_rate, device)

# Generate text:
# For encoder-decoder mode, we need a source text (the arithmetic expression) and a starting target prompt.
# For example, src_text is "<sos>123+456<eos>" and start_text is "<sos>"
start_txt = "<sos>"
src_text = "<sos>123+456<eos>"  # Change this expression to test different arithmetic problems

generated_text = generate_text(model, dataset.tokenizer, start_txt, max_length=30, temperature=1.0, device=device, src_text=src_text)
print(f"Generated text:\n{generated_text}")


Using device: cpu
Epoch 1/10, Batch 10/625, Loss: 2.6297
Epoch 1/10, Batch 20/625, Loss: 2.4006
Epoch 1/10, Batch 30/625, Loss: 2.2132
Epoch 1/10, Batch 40/625, Loss: 2.0554
Epoch 1/10, Batch 50/625, Loss: 1.9205
Epoch 1/10, Batch 60/625, Loss: 1.8045
Epoch 1/10, Batch 70/625, Loss: 1.7010
Epoch 1/10, Batch 80/625, Loss: 1.6097
Epoch 1/10, Batch 90/625, Loss: 1.5264
Epoch 1/10, Batch 100/625, Loss: 1.4504
Epoch 1/10, Batch 110/625, Loss: 1.3808
Epoch 1/10, Batch 120/625, Loss: 1.3173
Epoch 1/10, Batch 130/625, Loss: 1.2577
Epoch 1/10, Batch 140/625, Loss: 1.2025
Epoch 1/10, Batch 150/625, Loss: 1.1512
Epoch 1/10, Batch 160/625, Loss: 1.1036
Epoch 1/10, Batch 170/625, Loss: 1.0592
Epoch 1/10, Batch 180/625, Loss: 1.0177


KeyboardInterrupt: 