# Setup

To set up an Anaconda environment for implementing the Transformer model in PyTorch, follow these steps:

---

### **1. Create a New Conda Environment**
Open a terminal and run:
```bash
conda create --name attention-is-all-you-need python=3.12
```

---

### **2. Activate the Environment**
```bash
conda activate attention-is-all-you-need
```

---

### **3. Install PyTorch**
For GPU (CUDA):
```bash
conda install pytorch torchvision torchaudio pytorch-cuda=11.8 -c pytorch -c nvidia
```
For CPU (if you don’t have a compatible GPU):
```bash
conda install pytorch torchvision torchaudio cpuonly -c pytorch
```
Check if PyTorch is installed correctly:
```bash
python -c "import torch; print(torch.__version__)"
```

---

### **4. Install Essential Libraries**
```bash
pip install numpy pandas matplotlib tqdm
```
- `numpy`: Array operations (interoperates with PyTorch tensors)
- `pandas`: Data handling (optional, useful for datasets)
- `matplotlib`: Visualization
- `tqdm`: Progress bars for training

---

### **5. Install NLP Libraries (If Needed)**
```bash
pip install transformers datasets tokenizers sentencepiece
```
- `transformers`: Pretrained models from Hugging Face (optional)
- `datasets`: NLP datasets from Hugging Face
- `tokenizers`: Efficient tokenization
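- `sentencepiece`: Subword tokenization (the original Transformer paper used byte-pair encoding and word-piece vocabularies; SentencePiece implements comparable BPE and unigram models)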

---

### **6. Install Jupyter Notebook (Optional)**
If you want to develop in Jupyter:
```bash
conda install jupyter
```
Then start Jupyter:
```bash
jupyter notebook
```

---

### **7. Verify Everything**
Run the following to ensure your environment is properly set up:
```python
import torch
print("PyTorch version:", torch.__version__)
print("CUDA available:", torch.cuda.is_available())
```
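
The check above only covers PyTorch. As a minimal sketch, the remaining packages from steps 4 and 5 can be checked without importing them. The helper name `missing_packages` is an assumption made for illustration; it relies only on the standard-library `importlib.util.find_spec`.

```python
import importlib.util

def missing_packages(names):
    """Return the names from `names` that cannot be found in this environment."""
    return [name for name in names if importlib.util.find_spec(name) is None]

# Package lists taken from steps 3 to 5 of this guide
required = ["torch", "numpy", "pandas", "matplotlib", "tqdm"]
optional = ["transformers", "datasets", "tokenizers", "sentencepiece"]

print("Missing required packages:", missing_packages(required))
print("Missing optional packages:", missing_packages(optional))
```

An empty list means every package in that group can be located; anything listed can be installed with the matching `pip install` command from the earlier steps.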

---

### **8. Save the Environment (Optional)**
To export your environment for reproducibility:
```bash
conda env export > environment.yml
```
To recreate it later:
```bash
conda env create -f environment.yml
```

---

# Start

In [1]:
import torch
print("PyTorch version:", torch.__version__)
print("CUDA available:", torch.cuda.is_available())

PyTorch version: 2.5.1
CUDA available: False


In [2]:
import torch
import torch.nn as nn
import torch.optim as optim
from typing import Optional, Tuple

In [3]:
import torch
import torch.nn as nn

class TokenEmbedding(nn.Module):
    def __init__(self, vocab_size: int, d_model: int):
        """
        Initializes the embedding layer.

        Args:
            vocab_size (int): Number of unique tokens in the vocabulary.
            d_model (int): Dimension of the embedding vectors.
        """
        super().__init__()
        
        # TODO: Define the embedding layer that maps token indices to dense vectors.
        self.embedding = nn.Embedding(num_embeddings=vocab_size, embedding_dim=d_model)  

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """
        Forward pass for token embedding.

        Args:
            x (torch.Tensor): Tensor of shape (batch_size, seq_len) containing token indices.

        Returns:
            torch.Tensor: Tensor of shape (batch_size, seq_len, d_model) containing embedded representations.
        """
        # TODO: Implement the lookup operation using the embedding layer.
        embedded = self.embedding(x)  

        return embedded


In [4]:
def run_tests():
    # Test Parameters
    vocab_size = 100
    d_model = 16
    batch_size = 4
    seq_len = 10

    # Create a sample input tensor
    test_input = torch.randint(0, vocab_size, (batch_size, seq_len))

    # Initialize TokenEmbedding
    embedding_layer = TokenEmbedding(vocab_size, d_model)

    # Test 1: Check Output Shape
    output = embedding_layer(test_input)
    assert output.shape == (batch_size, seq_len, d_model), f"Unexpected shape: {output.shape}"
    
    # Test 2: Ensure Output is a Tensor of Correct Type
    assert isinstance(output, torch.Tensor), "Output is not a tensor"
    assert output.dtype == torch.float32, f"Unexpected dtype: {output.dtype}"
    
    # Test 3: Check if the Same Token Index Maps to the Same Embedding
    index = torch.tensor([[5]])
    embedding_1 = embedding_layer(index)
    embedding_2 = embedding_layer(index)
    assert torch.allclose(embedding_1, embedding_2), "Embeddings should be identical for the same index"
    
    # Test 4: Check if Different Indices Give Different Embeddings
    index1 = torch.tensor([[5]])
    index2 = torch.tensor([[8]])
    embedding_1 = embedding_layer(index1)
    embedding_2 = embedding_layer(index2)
    assert not torch.allclose(embedding_1, embedding_2), "Different indices should have different embeddings"
    
    # Test 5: Check if Gradients are Computed
    loss = output.sum()
    loss.backward()
    assert embedding_layer.embedding.weight.grad is not None, "Gradients should not be None"
    assert embedding_layer.embedding.weight.grad.shape == (vocab_size, d_model), "Gradient shape mismatch"
    
    print("✅ All tests passed successfully!")

# Run all tests
run_tests()


✅ All tests passed successfully!


In [5]:
embedding_layer = TokenEmbedding(vocab_size=10, d_model=3)
embedding_layer(torch.tensor(5))

tensor([ 1.9214,  0.2546, -1.1575], grad_fn=<EmbeddingBackward0>)
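
The original Transformer paper multiplies the embedding output by sqrt(d_model) before adding positional encodings, which `TokenEmbedding` above does not do. The following is a sketch of that variant; the class name `ScaledTokenEmbedding` is hypothetical and not part of this notebook.

```python
import math
import torch
import torch.nn as nn

class ScaledTokenEmbedding(nn.Module):
    """Hypothetical variant of TokenEmbedding that scales its output by sqrt(d_model)."""

    def __init__(self, vocab_size: int, d_model: int):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, d_model)
        self.scale = math.sqrt(d_model)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        # Look up the embeddings, then apply the constant scale factor
        return self.embedding(x) * self.scale

torch.manual_seed(0)
layer = ScaledTokenEmbedding(vocab_size=10, d_model=4)
ids = torch.tensor([[1, 2, 3]])      # (batch_size=1, seq_len=3)
out = layer(ids)
print(out.shape)                     # torch.Size([1, 3, 4])
```

Whether to scale is a design choice: it keeps the embedding magnitudes comparable to the positional encodings, whose entries lie in [-1, 1].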

In [6]:
# import torch
# import torch.nn as nn
# import math

# class PositionalEncoding(nn.Module):
#     def __init__(self, d_model: int, max_len: int = 5000):
#         """
#         Initializes positional encoding.

#         Args:
#             d_model (int): Dimension of the embedding vectors.
#             max_len (int): Maximum sequence length.
#         """
#         super().__init__()

#         # TODO: Create an empty tensor to hold positional encodings of shape (max_len, d_model)
#         pe = torch.zeros(size=(max_len, d_model))

#         # TODO: Create a position index tensor of shape (max_len, 1)
#         positions = torch.arange(max_len).unsqueeze(1)  # Replace with the correct initialization

#         # TODO: Compute the denominator term for the sine/cosine functions
#         # div_term = 10**4**(2*positions/d_model)  # Replace with the correct computation
#         div_term = torch.exp(torch.arange(0, d_model, 2) * (-math.log(10000.0) / d_model))

#         # TODO: Compute sin and cos positional encodings
#         # Apply sine to even indices and cosine to odd indices
#         # Hint: Use slicing `self.pe[:, 0::2] = ...` for even indices
#         #       Use slicing `self.pe[:, 1::2] = ...` for odd indices
#         pe[:, 0::2] = torch.sin(positions/div_term)
#         pe[:, 1::2] = torch.cos(positions/div_term)

#         # TODO: Register `self.pe` as a buffer so it doesn't update during training
#         # Hint: Use `self.register_buffer("pe", self.pe)`
#         self.register_buffer("pe", pe)

#     def forward(self, x: torch.Tensor) -> torch.Tensor:
#         """
#         Adds positional encoding to the input embeddings.

#         Args:
#             x (torch.Tensor): Tensor of shape (batch_size, seq_len, d_model) containing input embeddings.

#         Returns:
#             torch.Tensor: Tensor of shape (batch_size, seq_len, d_model) with positional encodings added.
#         """
#         # TODO: Retrieve only the necessary positions up to the input sequence length
#         # Hint: Slice `self.pe` correctly based on `x.size(1)`
#         pe_slice = self.pe[:x.size(1),:].unsqueeze(0)

#         # TODO: Add positional encodings to the input embeddings
#         # Hint: Ensure the positional encodings are on the same device as `x`
#         pe_slice.to(x.device)

#         return x + pe_slice  # Replace with the final tensor with positional encoding added
    

import math
import torch
import torch.nn as nn

class PositionalEncoding(nn.Module):
    def __init__(self, d_model: int, max_len: int = 5000):
        super().__init__()
        # Create a (max_len, d_model) tensor to hold the positional encodings
        pe = torch.zeros(max_len, d_model)            # shape: (max_len, d_model)
        
        # position: shape (max_len, 1)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)

        # div_term: shape (d_model/2,)  -> we’ll use it for the even/odd splits
        # This follows exp(- log(10000) * (2i/d_model)) = 10000^(-2i/d_model).
        div_term = torch.exp(
            torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model)
        )
        
        # Apply sine to even indices (0, 2, 4, ...)
        pe[:, 0::2] = torch.sin(position * div_term)
        
        # Apply cosine to odd indices (1, 3, 5, ...)
        pe[:, 1::2] = torch.cos(position * div_term)
        
        # Register 'pe' as a buffer so it's not trained
        self.register_buffer('pe', pe)
        self.d_model = d_model

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        # x: shape (batch_size, seq_len, d_model)
        seq_len = x.size(1)

        # Grab up to seq_len positions from pe and add to x
        # shape of pe_slice becomes (1, seq_len, d_model)
        pe_slice = self.pe[:seq_len, :].unsqueeze(0).to(x.device)

        return x + pe_slice
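
As a sanity check on the `exp`/`log` form of `div_term`, the sketch below rebuilds the same table in a standalone function and compares one entry against the direct formula PE(pos, 2i) = sin(pos / 10000^(2i/d_model)). The function name `sinusoidal_table` is an assumption for illustration, and the code assumes an even `d_model`.

```python
import math
import torch

def sinusoidal_table(max_len: int, d_model: int) -> torch.Tensor:
    """Build a (max_len, d_model) table of sinusoidal encodings (d_model assumed even)."""
    position = torch.arange(max_len, dtype=torch.float).unsqueeze(1)   # (max_len, 1)
    div_term = torch.exp(
        torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model)
    )                                                                  # (d_model / 2,)
    table = torch.zeros(max_len, d_model)
    table[:, 0::2] = torch.sin(position * div_term)   # even columns
    table[:, 1::2] = torch.cos(position * div_term)   # odd columns
    return table

d_model = 16
table = sinusoidal_table(max_len=50, d_model=d_model)

# Compare one entry with the closed-form expression
pos, i = 7, 3
angle = pos / (10000 ** (2 * i / d_model))
print(abs(table[pos, 2 * i].item() - math.sin(angle)) < 1e-4)      # sine column
print(abs(table[pos, 2 * i + 1].item() - math.cos(angle)) < 1e-4)  # cosine column
```

Position 0 is a useful second check: every sine column is 0 and every cosine column is 1 there.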


In [7]:
def run_positional_encoding_tests():
    d_model = 16
    seq_len = 10
    batch_size = 4

    test_input = torch.zeros((batch_size, seq_len, d_model))  # Placeholder embeddings
    pos_encoding = PositionalEncoding(d_model=d_model)

    # Test 1: Check Output Shape
    output = pos_encoding(test_input)
    assert output.shape == (batch_size, seq_len, d_model), f"Unexpected shape: {output.shape}"
    
    # Test 2: Ensure Output is a Tensor of Correct Type
    assert isinstance(output, torch.Tensor), "Output is not a tensor"
    assert output.dtype == torch.float32, f"Unexpected dtype: {output.dtype}"
    
    # Test 3: Check if Positional Encoding is Being Added
    assert not torch.allclose(test_input, output), "Positional encoding is not being added!"
    
    # Test 4: Check Device Compatibility
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    test_input = test_input.to(device)
    pos_encoding = pos_encoding.to(device)
    output = pos_encoding(test_input)
    assert output.device == test_input.device, f"Device mismatch: {output.device} vs {test_input.device}"
    
    # Test 5: Check if Encodings are Deterministic
    output1 = pos_encoding(test_input)
    output2 = pos_encoding(test_input)
    assert torch.allclose(output1, output2), "Positional encoding should be deterministic!"
    
    print("✅ All positional encoding tests passed successfully!")

# Run all tests
run_positional_encoding_tests()


✅ All positional encoding tests passed successfully!


In [8]:
def scratchboard(max_len, d_model):
    pe = torch.zeros(size=(max_len, d_model))
    positions = torch.arange(max_len).unsqueeze(1)
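    # Note: ** is right-associative, so this evaluates 10 ** (4 ** (2*positions/d_model)),
    # not 10000 ** (...). It also indexes by position rather than by embedding dimension,
    # which is why this draft was replaced by the exp/log form in PositionalEncoding.
    div_term = 10**4**(2*positions/d_model)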
    print(div_term)

scratchboard(13, 10)

tensor([[1.0000e+01],
        [2.0869e+01],
        [5.5094e+01],
        [1.9833e+02],
        [1.0751e+03],
        [1.0000e+04],
        [1.8968e+05],
        [9.2131e+06],
        [1.5473e+09],
        [1.3358e+12],
        [1.0000e+16],
        [1.2946e+21],
        [7.2048e+27]])
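
The first value printed above is 10 rather than 1 because Python's `**` operator is right-associative. The sketch below shows the difference on plain integers and then one way the intended denominators could be written, indexed by embedding dimension instead of position. The variable names are illustrative only.

```python
import torch

# ** groups from the right: 10 ** 4 ** 2 is 10 ** (4 ** 2), not (10 ** 4) ** 2
right_assoc = 10 ** 4 ** 2      # 10 ** 16
intended = (10 ** 4) ** 2       # 10 ** 8
print(right_assoc == 10 ** 16, intended == 10 ** 8)

# Intended denominators 10000^(2i / d_model), one per pair of embedding dimensions
d_model = 10
two_i = torch.arange(0, d_model, 2).float()          # 0, 2, 4, 6, 8
denominators = torch.pow(10000.0, two_i / d_model)   # shape (d_model / 2,)
print(denominators)
```

Dividing positions by these denominators is equivalent to multiplying by the `div_term` used in `PositionalEncoding`, which stores the reciprocals.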


In [9]:
# import torch
# import torch.nn as nn
# from typing import Optional, Tuple

# class ScaledDotProductAttention(nn.Module):
#     def __init__(self, d_k: int):
#         """
#         Initializes scaled dot-product attention.

#         Args:
#             d_k (int): Dimension of the key vectors (used for scaling).
#         """
#         super().__init__()

#         # TODO: Store d_k for scaling attention scores
#         self.d_k = d_k  # Replace with correct initialization

#     def forward(
#         self, 
#         query: torch.Tensor, 
#         key: torch.Tensor, 
#         value: torch.Tensor, 
#         mask: Optional[torch.Tensor] = None
#     ) -> Tuple[torch.Tensor, torch.Tensor]:
#         """
#         Computes the scaled dot-product attention.

#         Args:
#             query (torch.Tensor): Shape (batch_size, num_heads, seq_len, d_k)
#             key (torch.Tensor): Shape (batch_size, num_heads, seq_len, d_k)
#             value (torch.Tensor): Shape (batch_size, num_heads, seq_len, d_v)
#             mask (Optional[torch.Tensor]): Shape (batch_size, 1, seq_len, seq_len) 
#                                            (mask for padding or future tokens in decoder)

#         Returns:
#             Tuple[torch.Tensor, torch.Tensor]: 
#                 - Attention output of shape (batch_size, num_heads, seq_len, d_v)
#                 - Attention weights of shape (batch_size, num_heads, seq_len, seq_len)
#         """
#         # TODO: Compute attention scores as QK^T / sqrt(d_k)
#         attention_scores = torch.matmul(query, torch.transpose(key, -2, -1)) / math.sqrt(self.d_k)   # Replace with correct computation

#         # TODO: Apply mask (if provided) by setting masked positions to a very low value
#         # Hint: Use `float('-inf')` for masked positions before applying softmax
#         if mask is not None:
#             attention_scores = attention_scores.masked_fill(mask == 0, float('-inf'))

#         # TODO: Compute attention weights using softmax
#         attention_weights = torch.softmax(attention_scores, dim=-1)  # Replace with correct computation

#         # TODO: Multiply attention weights by value matrix to get the final output
#         output = torch.matmul(attention_weights, value)  # Replace with correct computation

#         return output, attention_weights  # Return attention output and weights
    

import math

class ScaledDotProductAttention(nn.Module):
    def __init__(self, d_k: int):
        super().__init__()
        self.d_k = d_k   # for scaling

    def forward(
        self,
        query: torch.Tensor,   # (batch_size, num_heads, seq_len, d_k)
        key: torch.Tensor,     # (batch_size, num_heads, seq_len, d_k)
        value: torch.Tensor,   # (batch_size, num_heads, seq_len, d_v)
        mask: Optional[torch.Tensor] = None
    ) -> Tuple[torch.Tensor, torch.Tensor]:
        # (1) QK^T
        # key.transpose(-2, -1) is shape (batch_size, num_heads, d_k, seq_len)
        attention_scores = torch.matmul(query, key.transpose(-2, -1))
        
        # (2) Scale by sqrt(d_k)
        attention_scores = attention_scores / math.sqrt(self.d_k)

        # (3) If mask is provided, set masked positions to -inf
        if mask is not None:
            # Typically a 1/0 mask is used; we want to fill 0’s with -inf
            attention_scores = attention_scores.masked_fill(mask == 0, float('-inf'))

        # (4) Apply softmax over the last dimension (seq_len of the key)
        attention_weights = torch.softmax(attention_scores, dim=-1)

        # (5) Multiply by V
        output = torch.matmul(attention_weights, value)

        return output, attention_weights
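
The `mask == 0` convention above also covers the decoder's look-ahead mask. As a sketch, a lower-triangular mask of ones lets each query attend only to itself and earlier positions; the helper name `causal_mask` is an assumption, not something defined elsewhere in this notebook.

```python
import torch

def causal_mask(seq_len: int) -> torch.Tensor:
    """Return a (1, 1, seq_len, seq_len) mask: 1 = may attend, 0 = blocked."""
    # The two leading singleton dimensions broadcast over batch and heads
    return torch.tril(torch.ones(seq_len, seq_len)).unsqueeze(0).unsqueeze(0)

mask = causal_mask(4)

# Apply it the same way ScaledDotProductAttention does, on uniform scores
scores = torch.zeros(1, 1, 4, 4)
weights = torch.softmax(scores.masked_fill(mask == 0, float("-inf")), dim=-1)
print(weights[0, 0])
```

With uniform scores, row `i` of the result spreads its weight evenly over positions `0..i` and assigns zero to every later position.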



In [10]:
import torch
import math

def test_scaled_dot_product_attention():
    # Make some deterministic random data.
    torch.manual_seed(42)

    batch_size = 2
    num_heads = 3
    seq_len_q = 4  # length of the query
    seq_len_k = 5  # length of the key
    d_k = 6        # dimension per head for query/key
    d_v = 6        # dimension per head for value

    # Create a ScaledDotProductAttention instance
    attention_module = ScaledDotProductAttention(d_k)

    # Create random query, key, value
    query = torch.randn(batch_size, num_heads, seq_len_q, d_k)
    key   = torch.randn(batch_size, num_heads, seq_len_k, d_k)
    value = torch.randn(batch_size, num_heads, seq_len_k, d_v)

    # (1) Test forward pass without mask
    output, attn_weights = attention_module(query, key, value, mask=None)
    
    #  -- Check output shape = (batch_size, num_heads, seq_len_q, d_v)
    assert output.shape == (batch_size, num_heads, seq_len_q, d_v), \
        f"Output shape mismatch. Got {output.shape}"
    
    #  -- Check attention weight shape = (batch_size, num_heads, seq_len_q, seq_len_k)
    assert attn_weights.shape == (batch_size, num_heads, seq_len_q, seq_len_k), \
        f"Attention weights shape mismatch. Got {attn_weights.shape}"
    
    #  -- Check attention weights sum to ~1 across last dimension
    attn_sum = attn_weights.sum(dim=-1)
    assert torch.allclose(attn_sum, torch.ones_like(attn_sum), atol=1e-5), \
        "Attention weights do not sum to 1 along the last dimension."
    
    # (2) Test forward pass with a mask (e.g., masking out the last two positions)
    #     We'll create a mask of shape (batch_size, 1, seq_len_q, seq_len_k).
    #     Suppose we only want the first 3 positions of the key unmasked:
    mask = torch.ones(batch_size, 1, seq_len_q, seq_len_k)
    mask[:, :, :, -2:] = 0  # mask out the last 2 positions
    output_masked, attn_weights_masked = attention_module(query, key, value, mask=mask)

    #  -- The masked positions in the softmax should drop to near 0
    #     We’ll check the last two positions of each attention row in attn_weights_masked
    #     are effectively 0 (within a floating tolerance).
    masked_positions = attn_weights_masked[..., -2:]  # shape (batch_size, num_heads, seq_len_q, 2)
    assert torch.allclose(masked_positions, torch.zeros_like(masked_positions), atol=1e-5), \
        "Masking does not appear to zero out the last two positions."

    print("ScaledDotProductAttention unit test passed!")


# Example usage in a Jupyter cell:
test_scaled_dot_product_attention()



ScaledDotProductAttention unit test passed!


In [11]:
import torch
import torch.nn as nn
from typing import Optional, Tuple

class MultiHeadAttention(nn.Module):
    def __init__(self, d_model: int, num_heads: int):
        """
        Initializes multi-head attention.

        Args:
            d_model (int): Dimension of the model (input and output size).
            num_heads (int): Number of attention heads.
        """
        super().__init__()

        # TODO: Ensure d_model is divisible by num_heads
        assert d_model % num_heads == 0, "d_model must be divisible by num_heads"

        self.d_model = d_model
        self.num_heads = num_heads
        self.d_k = d_model // num_heads  # Dimension per head

        # Linear projections for query, key, and value
        self.W_q = nn.Linear(d_model, d_model)
        self.W_k = nn.Linear(d_model, d_model)
        self.W_v = nn.Linear(d_model, d_model)

        # Output projection applied after the heads are concatenated
        self.W_o = nn.Linear(d_model, d_model)

        # Scaled dot-product attention, scaled by the per-head dimension d_k
        self.attention = ScaledDotProductAttention(self.d_k)

    def forward(
        self,
        query: torch.Tensor,
        key: torch.Tensor,
        value: torch.Tensor,
        mask: Optional[torch.Tensor] = None
    ) -> torch.Tensor:
        """
        Computes multi-head attention.

        Args:
            query (torch.Tensor): Shape (batch_size, seq_len, d_model)
            key (torch.Tensor): Shape (batch_size, seq_len, d_model)
            value (torch.Tensor): Shape (batch_size, seq_len, d_model)
            mask (Optional[torch.Tensor]): Shape (batch_size, 1, seq_len, seq_len)

        Returns:
            torch.Tensor: Shape (batch_size, seq_len, d_model) - Multi-head attention output.
        """
        # Apply the learned linear projections to query, key, and value
        Q = self.W_q(query)
        K = self.W_k(key)
        V = self.W_v(value)

        # TODO: Reshape Q, K, V for multi-head attention
        # Hint: Use `.view()` and `.transpose()` to shape into (batch_size, num_heads, seq_len, d_k)
        batch_size, seq_len, _ = query.shape
        Q = Q.view(batch_size, seq_len, self.num_heads, self.d_k)
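        # Keys and values may have a different length than the queries
        # (e.g. encoder-decoder attention), so let view() infer their sequence length
        K = K.view(batch_size, -1, self.num_heads, self.d_k)
        V = V.view(batch_size, -1, self.num_heads, self.d_k)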

        Q = Q.transpose(1, 2)
        K = K.transpose(1, 2)
        V = V.transpose(1, 2)

        # Apply scaled dot-product attention to all heads at once
        output, attention_weights = self.attention(Q, K, V, mask)

        # Concatenate the heads back and apply the final linear transformation
        # Current shape: (batch_size, num_heads, seq_len, d_k)
        # We first swap num_heads and seq_len
        output = output.transpose(1, 2)  # (batch_size, seq_len, num_heads, d_k)
        output = output.contiguous().view(batch_size, seq_len, self.d_model)

        output = self.W_o(output)

        return output
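
The `view`/`transpose` steps in `forward` can be checked in isolation. This sketch, using made-up sizes, splits a tensor into heads and merges it back, confirming that the round trip is lossless and that head `h` holds the feature slice `h*d_k:(h+1)*d_k`.

```python
import torch

batch_size, seq_len, num_heads, d_k = 2, 5, 4, 3
d_model = num_heads * d_k

x = torch.randn(batch_size, seq_len, d_model)

# Split: (batch, seq, d_model) -> (batch, seq, heads, d_k) -> (batch, heads, seq, d_k)
heads = x.view(batch_size, seq_len, num_heads, d_k).transpose(1, 2)

# Merge: undo the transpose, make memory contiguous, then flatten the head dimensions
merged = heads.transpose(1, 2).contiguous().view(batch_size, seq_len, d_model)

print(heads.shape)                                          # torch.Size([2, 4, 5, 3])
print(torch.equal(x, merged))                               # True
print(torch.equal(heads[:, 1], x[:, :, d_k:2 * d_k]))       # True
```

The `.contiguous()` call matters because `transpose` returns a non-contiguous view, and `view` requires a compatible memory layout.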


In [12]:
def test_multi_head_attention():
    """
    Tests the MultiHeadAttention module for correctness.
    """
    batch_size = 2
    seq_len = 5
    d_model = 16
    num_heads = 4

    # Initialize test input tensors
    query = torch.randn(batch_size, seq_len, d_model)
    key = torch.randn(batch_size, seq_len, d_model)
    value = torch.randn(batch_size, seq_len, d_model)

    # Initialize multi-head attention module
    mha = MultiHeadAttention(d_model, num_heads)

    # Run forward pass without a mask
    output = mha(query, key, value, mask=None)

    # Test 1: Check output shape
    assert output.shape == (batch_size, seq_len, d_model), \
        f"Unexpected output shape: {output.shape}"
    print("✅ Output shape test passed!")

    # Test 2: Ensure output is a tensor
    assert isinstance(output, torch.Tensor), "Output is not a tensor"
    print("✅ Tensor type test passed!")

    # Test 3: Check deterministic output for same input
    output_2 = mha(query, key, value, mask=None)
    assert torch.allclose(output, output_2), "Output should be deterministic!"
    print("✅ Deterministic output test passed!")

    # Test 4: Apply a mask and check that the output shape is preserved
    # The attention module masks positions where mask == 0, so use 1 = keep, 0 = mask
    mask = torch.ones(batch_size, 1, seq_len, seq_len)
    mask[:, :, :, -1] = 0  # Mask the last token

    output_masked = mha(query, key, value, mask=mask)

    # Ensure output is still the correct shape
    assert output_masked.shape == (batch_size, seq_len, d_model), \
        "Masked output shape mismatch"
    print("✅ Masking test passed!")

    print("🎉 All MultiHeadAttention tests passed!")

# Run the test
test_multi_head_attention()

✅ Output shape test passed!
✅ Tensor type test passed!
✅ Deterministic output test passed!
✅ Masking test passed!
🎉 All MultiHeadAttention tests passed!


In [13]:
class PositionwiseFeedForward(nn.Module):
    def __init__(self, d_model: int, d_ff: int):
        super().__init__()
        self.fc1 = nn.Linear(d_model, d_ff)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(d_ff, d_model)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """
        Args:
            x: (batch_size, seq_len, d_model)

        Returns:
            (batch_size, seq_len, d_model) - Transformed representations.
        """
        output = self.fc1(x)
        output = self.relu(output)
        output = self.fc2(output)

        return output
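
For a sense of scale, the two linear layers in this block account for a large share of each layer's parameters. The sketch below counts them by hand; the helper name `ffn_param_count` is hypothetical, and 512/2048 are the base-model sizes reported in the original Transformer paper.

```python
def ffn_param_count(d_model: int, d_ff: int) -> int:
    """Count weights and biases of the two linear layers in a position-wise FFN."""
    fc1 = d_model * d_ff + d_ff       # weight matrix plus bias of the expansion layer
    fc2 = d_ff * d_model + d_model    # weight matrix plus bias of the projection layer
    return fc1 + fc2

print(ffn_param_count(512, 2048))   # 2099712
print(ffn_param_count(16, 32))      # 1072 (the sizes used in the test cell)
```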

In [14]:
def test_positionwise_feedforward():
    """
    Tests the PositionwiseFeedForward module.
    """
    batch_size = 2
    seq_len = 5
    d_model = 16
    d_ff = 32  # Expanded dimension

    # Initialize test input tensor (random)
    x = torch.randn(batch_size, seq_len, d_model)

    # Initialize the feed-forward module
    ffn = PositionwiseFeedForward(d_model, d_ff)

    # Run forward pass
    output = ffn(x)

    # Test 1: Check output shape
    assert output.shape == (batch_size, seq_len, d_model), \
        f"Unexpected output shape: {output.shape}"
    print("✅ Output shape test passed!")

    # Test 2: Ensure output is a tensor
    assert isinstance(output, torch.Tensor), "Output is not a tensor"
    print("✅ Tensor type test passed!")

    # Test 3: Ensure ReLU activation is applied
    hidden_layer_output = ffn.fc1(x)  # Get pre-ReLU values
    assert torch.all((hidden_layer_output > 0) == (ffn.relu(hidden_layer_output) > 0)), \
        "ReLU activation is not applied correctly"
    print("✅ ReLU activation test passed!")

    # Test 4: Check deterministic output for same input
    output_2 = ffn(x)
    assert torch.allclose(output, output_2), "Output should be deterministic!"
    print("✅ Deterministic output test passed!")

    # Test 5: Check gradients (ensuring backpropagation works)
    output.sum().backward()  # Compute gradients
    assert ffn.fc1.weight.grad is not None, "Gradients are not computed for fc1!"
    assert ffn.fc2.weight.grad is not None, "Gradients are not computed for fc2!"
    print("✅ Gradient computation test passed!")

    print("🎉 All PositionwiseFeedForward tests passed!")

# Run the test
test_positionwise_feedforward()


✅ Output shape test passed!
✅ Tensor type test passed!
✅ ReLU activation test passed!
✅ Deterministic output test passed!
✅ Gradient computation test passed!
🎉 All PositionwiseFeedForward tests passed!


In [15]:
class TransformerEncoderLayer(nn.Module):
    def __init__(self, d_model: int, num_heads: int, d_ff: int, dropout: float = 0.1):
        """
        Initializes a single Transformer Encoder Layer.

        Args:
            d_model (int): The embedding dimension (must be divisible by num_heads).
            num_heads (int): Number of attention heads.
            d_ff (int): Hidden layer size of the feed-forward network.
            dropout (float): Dropout rate (default 0.1).
        """
        super().__init__()

        # Multi-head self-attention sublayer
        self.self_attn = MultiHeadAttention(d_model, num_heads)

        # Position-wise feed-forward sublayer (two linear layers with ReLU in between)
        self.ffn = PositionwiseFeedForward(d_model, d_ff)

        # TODO: Define Layer Normalization layers
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model) 

        # TODO: Define Dropout layers
        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)

    def forward(self, x: torch.Tensor, mask: Optional[torch.Tensor] = None) -> torch.Tensor:
        """
        Forward pass for Transformer Encoder Layer.

        Args:
            x (torch.Tensor): Input tensor of shape (batch_size, seq_len, d_model).
            mask (Optional[torch.Tensor]): Mask for attention (default None).

        Returns:
            torch.Tensor: Output tensor of shape (batch_size, seq_len, d_model).
        """
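        # Apply multi-head self-attention.
        # MultiHeadAttention.forward returns a single tensor; unpacking it into two
        # names would silently split along the batch dimension when batch_size == 2.
        attn_output = self.self_attn(x, x, x, mask=mask)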

        # TODO: Apply residual connection and layer normalization
        x = self.norm1(x + self.dropout1(attn_output))

        # TODO: Apply feed-forward network
        ffn_output = self.ffn(x) 

        # TODO: Apply second residual connection and layer normalization
        x = self.norm2(x + self.dropout2(ffn_output))

        return x
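
The `mask` argument is typically used in the encoder to hide padding tokens. As a sketch, assuming padding uses token id 0 (the constant `PAD_ID` and the helper `padding_mask` are hypothetical names), a mask compatible with the `mask == 0` convention can be built from the token ids:

```python
import torch

PAD_ID = 0  # assumed padding token id; use whatever id your tokenizer assigns

def padding_mask(token_ids: torch.Tensor, pad_id: int = PAD_ID) -> torch.Tensor:
    """Map (batch, seq_len) token ids to a (batch, 1, 1, seq_len) mask: 1 = real token, 0 = padding."""
    return (token_ids != pad_id).float().unsqueeze(1).unsqueeze(2)

ids = torch.tensor([[5, 7, 2, 0, 0],
                    [3, 4, 6, 8, 1]])
mask = padding_mask(ids)
print(mask.shape)   # torch.Size([2, 1, 1, 5])

# The mask broadcasts over heads and query positions when applied to attention scores
scores = torch.zeros(2, 3, 5, 5)   # (batch, heads, seq_q, seq_k)
weights = torch.softmax(scores.masked_fill(mask == 0, float("-inf")), dim=-1)
print(weights[0, 0, 0])
```

In the first sequence the two padded key positions receive zero attention weight from every query, while the second sequence is unaffected.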


In [16]:
import torch

def test_transformer_encoder_layer():
    # Set a fixed seed for reproducibility
    torch.manual_seed(42)
    
    # Hyperparameters
    d_model = 8
    num_heads = 2
    d_ff = 16
    dropout = 0.1
    
    batch_size = 2
    seq_len = 4
    
    # Instantiate the encoder layer
    encoder_layer = TransformerEncoderLayer(
        d_model=d_model, 
        num_heads=num_heads, 
        d_ff=d_ff, 
        dropout=dropout
    )
    
    # Create a random input tensor: (batch_size, seq_len, d_model)
    x = torch.randn(batch_size, seq_len, d_model)
    
    # 1) Forward pass without a mask
    out_no_mask = encoder_layer(x)
    
    # -- Check output shape
    assert out_no_mask.shape == (batch_size, seq_len, d_model), \
        f"Output shape mismatch. Expected {(batch_size, seq_len, d_model)}, got {out_no_mask.shape}"
    
    # 2) Forward pass with a mask
    # We'll create a simple mask that zeroes out attention to token #1
    # mask shape: (batch_size, 1, seq_len, seq_len)
    mask = torch.ones(batch_size, 1, seq_len, seq_len)
    mask[:, :, :, 1] = 0  # Mask out second token for all queries
    
    out_with_mask = encoder_layer(x, mask=mask)
    
    # -- Check output shape
    assert out_with_mask.shape == (batch_size, seq_len, d_model), \
        f"Output shape mismatch with mask. Expected {(batch_size, seq_len, d_model)}, got {out_with_mask.shape}"
    
    # 3) (Optional) Check that the outputs differ when using a mask
    #    This may or may not always differ numerically based on random init, but often it does.
    #    If you want to ensure they are always different, you can keep this assertion or comment it out.
    #    We'll do a "not allclose" check with a fairly tight tolerance.
    if not torch.allclose(out_no_mask, out_with_mask, atol=1e-5, rtol=1e-5):
        pass  # They differ, which is usually good
    else:
        print("Warning: out_no_mask and out_with_mask are numerically very close (maybe random init).")
    
    print("TransformerEncoderLayer unit test passed!")


# Example usage:
test_transformer_encoder_layer()


TransformerEncoderLayer unit test passed!


In [17]:
class TransformerDecoderLayer(nn.Module): 
    def __init__(self, d_model: int, num_heads: int, d_ff: int, dropout: float = 0.1):
        """
        Initializes a single Transformer Decoder Layer.

        Args:
            d_model (int): The embedding dimension (must be divisible by num_heads).
            num_heads (int): Number of attention heads.
            d_ff (int): Hidden layer size of the feed-forward network.
            dropout (float): Dropout rate (default 0.1).
        """
        super().__init__()

        # TODO: Define masked multi-head self-attention layer
        self.self_attn = MultiHeadAttention(d_model, num_heads)

        # TODO: Define multi-head attention layer for encoder-decoder attention
        self.enc_dec_attn = MultiHeadAttention(d_model, num_heads)

        # TODO: Define feed-forward network (FFN)
        self.ffn = PositionwiseFeedForward(d_model, d_ff)

        # TODO: Define Layer Normalization layers
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model) 
        self.norm3 = nn.LayerNorm(d_model) 

        # TODO: Define Dropout layers
        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)
        self.dropout3 = nn.Dropout(dropout)

    def forward(self, x: torch.Tensor, memory: torch.Tensor, 
                src_mask: Optional[torch.Tensor] = None, 
                tgt_mask: Optional[torch.Tensor] = None) -> torch.Tensor:
        """
        Forward pass for Transformer Decoder Layer.

        Args:
            x (torch.Tensor): Input tensor of shape (batch_size, seq_len, d_model) (decoder input).
            memory (torch.Tensor): Encoder outputs of shape (batch_size, seq_len_enc, d_model).
            tgt_mask (Optional[torch.Tensor]): Mask for target self-attention (default None).
            src_mask (Optional[torch.Tensor]): Mask for encoder-decoder attention (default None).

        Returns:
            torch.Tensor: Output tensor of shape (batch_size, seq_len, d_model).
        """
        # Masked multi-head self-attention over the decoder input.
        # MultiHeadAttention.forward returns a single tensor, so there is nothing to unpack.
        attn_output = self.self_attn(x, x, x, mask=tgt_mask)

        # Residual connection and layer normalization:
        # dropout is applied to the sublayer output, not to the residual
        x = self.norm1(x + self.dropout1(attn_output))

        # Encoder-decoder attention: queries come from the decoder,
        # keys and values come from the encoder memory
        attn_output_2 = self.enc_dec_attn(x, memory, memory, mask=src_mask)

        # Residual connection and layer normalization
        x = self.norm2(x + self.dropout2(attn_output_2))

        # Position-wise feed-forward network
        ffn_output = self.ffn(x)

        # Final residual connection and layer normalization
        x = self.norm3(x + self.dropout3(ffn_output))

        return x
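
The `tgt_mask` and `src_mask` arguments of the decoder layer are left to the caller. Below is a minimal sketch of how such masks could be built. It assumes that 1 marks a position that may be attended to and 0 a blocked one, and that masks broadcast against attention scores of shape (batch_size, num_heads, query_len, key_len). The helper names `make_causal_mask` and `make_padding_mask` and the `pad_id` parameter are hypothetical and not part of the notebook.

```python
import torch

def make_causal_mask(seq_len: int) -> torch.Tensor:
    """Lower-triangular mask of shape (1, 1, seq_len, seq_len).

    Position i may attend to positions 0..i only (assumed convention: 1 = keep).
    """
    return torch.tril(torch.ones(seq_len, seq_len)).unsqueeze(0).unsqueeze(0)

def make_padding_mask(tokens: torch.Tensor, pad_id: int = 0) -> torch.Tensor:
    """Mask of shape (batch_size, 1, 1, seq_len) that blocks padding keys."""
    return (tokens != pad_id).unsqueeze(1).unsqueeze(2).float()

tokens = torch.tensor([[5, 7, 9, 0, 0],
                       [3, 4, 6, 8, 2]])    # 0 is the assumed padding id
causal = make_causal_mask(tokens.size(1))    # (1, 1, 5, 5)
padding = make_padding_mask(tokens)          # (2, 1, 1, 5)
tgt_mask = causal * padding                  # broadcasts to (2, 1, 5, 5)
print(tgt_mask[0, 0])
```

Whether the attention module expects 1 = keep or a boolean "blocked" mask depends on how `MultiHeadAttention` applies the mask to the scores; invert the mask if the convention differs.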

In [18]:
import torch
import torch.nn as nn
import pytest

def test_transformer_decoder_layer():
    """
    Basic tests for TransformerDecoderLayer to check:
    1) Instantiation without errors
    2) Forward pass shape consistency
    3) Handling of optional masks
    4) Presence of ReLU activation (if expected)
    5) Gradient backprop flow
    """
    d_model = 32
    num_heads = 4
    d_ff = 64
    dropout = 0.1
    batch_size = 2
    seq_len = 5

    # 1) Instantiate the layer
    try:
        decoder_layer = TransformerDecoderLayer(d_model, num_heads, d_ff, dropout)
    except Exception as e:
        pytest.fail(f"Instantiation failed with error: {e}")

    print("✅ Instantiation test passed!")

    # 2) Create dummy inputs
    x = torch.randn(batch_size, seq_len, d_model)
    memory = torch.randn(batch_size, seq_len, d_model)

    # 3) Forward pass shape test without masks
    try:
        output = decoder_layer(x, memory)
    except Exception as e:
        pytest.fail(f"Forward pass failed without masks: {e}")

    # Check shape
    assert output.shape == (batch_size, seq_len, d_model), \
        f"Output shape {output.shape} != {(batch_size, seq_len, d_model)}"
    print("✅ Forward pass (no masks) shape test passed!")

    # 4) Forward pass with all-ones masks (nothing is masked out)
    src_mask = torch.ones(batch_size, 1, seq_len, seq_len)
    tgt_mask = torch.ones(batch_size, 1, seq_len, seq_len)
    try:
        output_masked = decoder_layer(x, memory, src_mask=src_mask, tgt_mask=tgt_mask)
    except Exception as e:
        pytest.fail(f"Forward pass failed with masks: {e}")

    # Check shape again
    assert output_masked.shape == (batch_size, seq_len, d_model), \
        f"Output shape with masks {output_masked.shape} != {(batch_size, seq_len, d_model)}"
    print("✅ Forward pass (with masks) shape test passed!")

    # 5) (Optional) Check for ReLU activation in the layer
    #    Depending on your exact implementation, you may not have a direct ReLU submodule.
    found_relu = False
    for mod in decoder_layer.modules():
        if isinstance(mod, nn.ReLU):
            found_relu = True
            break
    assert found_relu, "No ReLU found in the TransformerDecoderLayer (if you expected one)!"
    print("✅ ReLU activation test passed!")

    # 6) Quick gradient test
    #    Make sure we can do a backward pass without errors
    output_sum = output_masked.sum()
    try:
        output_sum.backward()
    except Exception as e:
        pytest.fail(f"Backward pass failed: {e}")

    print("✅ Backward pass (gradient) test passed!")

    print("All TransformerDecoderLayer tests passed successfully!")

test_transformer_decoder_layer()


✅ Instantiation test passed!
✅ Forward pass (no masks) shape test passed!
✅ Forward pass (with masks) shape test passed!
✅ ReLU activation test passed!
✅ Backward pass (gradient) test passed!
All TransformerDecoderLayer tests passed successfully!
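
A single decoder layer is meant to be stacked several times. Below is a minimal sketch of such a stack built with `nn.ModuleList`. To stay self-contained it uses PyTorch's built-in `nn.TransformerDecoderLayer` as a stand-in for the notebook's `TransformerDecoderLayer`, and the class name `DecoderStack` is hypothetical. The encoder length (7) differs from the decoder length (5) on purpose, since the two need not match.

```python
import torch
import torch.nn as nn

class DecoderStack(nn.Module):
    """Applies num_layers decoder layers in sequence (hypothetical helper)."""

    def __init__(self, num_layers: int, d_model: int, num_heads: int,
                 d_ff: int, dropout: float):
        super().__init__()
        # nn.ModuleList registers each layer so its parameters are tracked.
        self.layers = nn.ModuleList([
            nn.TransformerDecoderLayer(d_model, num_heads, d_ff, dropout,
                                       batch_first=True)
            for _ in range(num_layers)
        ])

    def forward(self, x: torch.Tensor, memory: torch.Tensor) -> torch.Tensor:
        for layer in self.layers:
            x = layer(x, memory)  # every layer attends to the same encoder output
        return x

stack = DecoderStack(num_layers=2, d_model=32, num_heads=4, d_ff=64, dropout=0.1)
x = torch.randn(2, 5, 32)        # (batch_size, tgt_seq_len, d_model)
memory = torch.randn(2, 7, 32)   # (batch_size, src_seq_len, d_model)
out = stack(x, memory)
print(out.shape)                 # torch.Size([2, 5, 32])
```

A plain Python list would not register the layers as submodules, so their parameters would be invisible to the optimizer; `nn.ModuleList` avoids that.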


In [19]:

class TransformerEncoder(nn.Module):
    def __init__(self, num_layers: int, d_model: int, num_heads: int, d_ff: int, dropout: float):
        super().__init__()

    def forward(self, x: torch.Tensor, mask: Optional[torch.Tensor] = None) -> torch.Tensor:
        """
        Args:
            x: (batch_size, seq_len, d_model) - Input sequence.
            mask: (batch_size, 1, seq_len, seq_len) - Optional mask.

        Returns:
            (batch_size, seq_len, d_model) - Encoder output.
        """
        pass


class TransformerDecoder(nn.Module):
    def __init__(self, num_layers: int, d_model: int, num_heads: int, d_ff: int, dropout: float):
        super().__init__()

    def forward(self, x: torch.Tensor, memory: torch.Tensor, 
                src_mask: Optional[torch.Tensor] = None, 
                tgt_mask: Optional[torch.Tensor] = None) -> torch.Tensor:
        """
        Args:
            x: (batch_size, tgt_seq_len, d_model) - Target sequence.
            memory: (batch_size, src_seq_len, d_model) - Encoder output.
            src_mask: broadcastable to (batch_size, 1, tgt_seq_len, src_seq_len) - Optional mask for encoder-decoder attention.
            tgt_mask: (batch_size, 1, tgt_seq_len, tgt_seq_len) - Optional mask for decoder self-attention.

        Returns:
            (batch_size, tgt_seq_len, d_model) - Decoder output.
        """
        pass


class Transformer(nn.Module):
    def __init__(self, vocab_size: int, d_model: int, num_layers: int, num_heads: int, 
                 d_ff: int, dropout: float):
        super().__init__()

    def forward(self, src: torch.Tensor, tgt: torch.Tensor, 
                src_mask: Optional[torch.Tensor] = None, 
                tgt_mask: Optional[torch.Tensor] = None) -> torch.Tensor:
        """
        Args:
            src: (batch_size, src_seq_len) - Source token indices.
            tgt: (batch_size, tgt_seq_len) - Target token indices.
            src_mask: (batch_size, 1, src_seq_len, src_seq_len) - Optional source mask.
            tgt_mask: (batch_size, 1, tgt_seq_len, tgt_seq_len) - Optional target mask.

        Returns:
            (batch_size, tgt_seq_len, vocab_size) - Token probabilities.
        """
        pass


class TransformerTrainer:
    def __init__(self, model: Transformer, learning_rate: float, weight_decay: float):
        """
        Initializes optimizer and loss function for training.
        """
        pass

    def train_step(self, src: torch.Tensor, tgt: torch.Tensor) -> torch.Tensor:
        """
        Runs a single training step.

        Args:
            src: (batch_size, src_seq_len) - Source sequence.
            tgt: (batch_size, tgt_seq_len) - Target sequence.

        Returns:
            Loss value.
        """
        pass

    def evaluate(self, src: torch.Tensor, tgt: torch.Tensor) -> float:
        """
        Evaluates the model on a validation set.

        Args:
            src: (batch_size, src_seq_len) - Source sequence.
            tgt: (batch_size, tgt_seq_len) - Target sequence.

        Returns:
            BLEU score or another evaluation metric.
        """
        pass
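
The `train_step` skeleton above does not say how the target sequence is fed to the decoder. One common approach is teacher forcing: the decoder sees the target without its last token and is trained to predict the target without its first token. Below is a minimal sketch of that idea. `ToySeq2Seq` is a hypothetical stand-in for the `Transformer` class, `PAD_ID = 0` is an assumed padding index, and the function returns a Python float rather than a tensor.

```python
import torch
import torch.nn as nn

class ToySeq2Seq(nn.Module):
    """Stand-in for the Transformer: maps (src, tgt) token ids to logits."""

    def __init__(self, vocab_size: int, d_model: int):
        super().__init__()
        self.embed = nn.Embedding(vocab_size, d_model)
        self.proj = nn.Linear(d_model, vocab_size)

    def forward(self, src: torch.Tensor, tgt: torch.Tensor) -> torch.Tensor:
        # Mix a summary of the source into every target position (toy logic only).
        context = self.embed(src).mean(dim=1, keepdim=True)
        return self.proj(self.embed(tgt) + context)  # (batch, tgt_len, vocab)

def train_step(model, optimizer, criterion, src, tgt) -> float:
    """One teacher-forced update; returns the scalar loss as a float."""
    model.train()
    tgt_in, labels = tgt[:, :-1], tgt[:, 1:]  # shift the target by one position
    logits = model(src, tgt_in)
    loss = criterion(logits.reshape(-1, logits.size(-1)), labels.reshape(-1))
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    return loss.item()

torch.manual_seed(0)
PAD_ID = 0  # assumed padding index, ignored by the loss
model = ToySeq2Seq(vocab_size=20, d_model=16)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3, weight_decay=0.0)
criterion = nn.CrossEntropyLoss(ignore_index=PAD_ID)
src = torch.randint(1, 20, (4, 6))  # (batch_size, src_seq_len)
tgt = torch.randint(1, 20, (4, 7))  # (batch_size, tgt_seq_len)
loss_value = train_step(model, optimizer, criterion, src, tgt)
print(f"loss: {loss_value:.4f}")
```

`nn.CrossEntropyLoss` expects raw logits, so a model used this way should return unnormalized scores rather than softmax probabilities.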