In [2]:
import math
import torch
import torch.nn as nn
from torch.nn import functional as F


class MultiHeadSelfAttention(nn.Module):
    """
    A simple multi-head masked (causal) self-attention layer.

    The embedding dimension is split evenly across ``n_head`` heads; each head
    performs scaled dot-product attention under a lower-triangular mask, and
    the head outputs are concatenated and passed through a final projection.

    Attributes:
        key (torch.nn.Linear): Linear layer to compute the key matrix.
        query (torch.nn.Linear): Linear layer to compute the query matrix.
        value (torch.nn.Linear): Linear layer to compute the value matrix.
        attn_drop (torch.nn.Dropout): Dropout layer for attention weights.
        resid_drop (torch.nn.Dropout): Dropout layer for output.
        proj (torch.nn.Linear): Linear layer for final output projection.
        mask (torch.Tensor): Buffer of shape (1, 1, block_size, block_size) holding a
            lower-triangular matrix of ones; zeros mark the (future) positions to mask out.
        n_head (int): Number of attention heads.
    """

    def __init__(self, config):
        """
        Initializes the MultiHeadSelfAttention layer.

        Args:
            config (object): Configuration object containing attributes n_embd, n_head, attn_pdrop,
                resid_pdrop, and block_size.

        Raises:
            AssertionError: If ``config.n_embd`` is not divisible by ``config.n_head``.
        """
        super().__init__()
        assert config.n_embd % config.n_head == 0, "Embedding dimension must be divisible by the number of heads."

        # key, query, value projections for all heads
        self.key = nn.Linear(config.n_embd, config.n_embd)
        self.query = nn.Linear(config.n_embd, config.n_embd)
        self.value = nn.Linear(config.n_embd, config.n_embd)

        # regularization
        self.attn_drop = nn.Dropout(config.attn_pdrop)
        self.resid_drop = nn.Dropout(config.resid_pdrop)

        # output projection
        self.proj = nn.Linear(config.n_embd, config.n_embd)
        self.n_head = config.n_head

        # Creating a causal mask to mask out future tokens in the sequence, ensuring that predictions for a position
        # can depend only on the known outputs at positions before it.
        # Registered as a buffer so it follows the module across devices without being a trainable parameter.
        self.register_buffer(
            "mask",
            torch.tril(torch.ones(config.block_size, config.block_size)).view(
                1, 1, config.block_size, config.block_size
            )
        )

    def forward(self, x):
        """
        Forward pass for the multi-head masked self-attention layer.

        Args:
            x (torch.Tensor): Input tensor of shape (B, T, C) where B is the batch size, T is the sequence length,
                              and C is the embedding dimension. T must not exceed the block size the layer
                              was constructed with.

        Returns:
            torch.Tensor: Output tensor of the same shape as input.

        Raises:
            ValueError: If the sequence length T is larger than the causal mask (block size).
        """
        B, T, C = x.size()

        # The causal mask only covers block_size positions. Slicing it with a larger T would silently
        # clamp to block_size and fail later with an opaque broadcasting error, so reject it up front.
        block_size = self.mask.size(-1)
        if T > block_size:
            raise ValueError(
                f"Sequence length {T} exceeds the block size {block_size} of the causal mask."
            )

        hs = C // self.n_head  # Size of each head

        # Project and split into heads: (B, T, C) -> (B, n_head, T, hs)
        k = self.key(x).view(B, T, self.n_head, hs).transpose(1, 2)
        q = self.query(x).view(B, T, self.n_head, hs).transpose(1, 2)
        v = self.value(x).view(B, T, self.n_head, hs).transpose(1, 2)

        # Calculate attention scores using scaled dot-product attention mechanism
        k_t = k.transpose(-2, -1)
        d_k = k.size(-1)
        att = q @ k_t / math.sqrt(d_k)  # (B, n_head, T, T)
        print(f"Attention score shape before mask: {att.shape}")

        # Apply causal mask to prevent attending to future tokens; -inf becomes weight 0 after softmax
        att = att.masked_fill(self.mask[:, :, :T, :T] == 0, float('-inf'))
        print(f"Attention score shape after mask: {att.shape}")

        # Apply softmax over the key dimension to convert scores to probabilities
        att = F.softmax(att, dim=-1)
        att = self.attn_drop(att)  # Apply dropout to attention weights

        # Multiply attention weights with value matrix
        y = torch.matmul(att, v)  # (B, n_head, T, hs)
        print(f"Output shape before re-assembling: {y.shape}")

        # Re-assemble all head outputs side by side; contiguous() is needed before view() after transpose
        y = y.transpose(1, 2).contiguous().view(B, T, C)

        # Output projection
        y = self.resid_drop(self.proj(y))
        return y


In [3]:
# Test Attention Mechanism
# Define a class for GPT configuration settings
class GPTConfig:
    """Hyperparameters used to build the attention layer for the test below."""

    # Regularization: both dropout probabilities are set to zero
    attn_pdrop = 0.0   # dropout on the attention weights
    resid_pdrop = 0.0  # dropout on the layer output (residual path)

    # Model dimensions
    n_embd = 12  # embedding dimensionality
    n_head = 4   # number of attention heads
    # n_layer = 1  # number of layers, not used in this exercise

    # Data dimensions
    block_size = 5   # size of each block of tokens
    vocab_size = 11  # size of the vocabulary


# Import necessary modules from PyTorch
from torch import nn
import torch

# Initialize the multi-head self-attention module using GPTConfig settings
attention = MultiHeadSelfAttention(GPTConfig())

# Input embeddings: a single batch of 5 tokens with 12 dimensions each.
# The first token is [1,1,1, 2,2,2, 3,3,3, 4,4,4]; the remaining four tokens are all ones.
first_token = [float(value) for value in (1, 2, 3, 4) for _ in range(3)]
uniform_tokens = [[1.0] * 12 for _ in range(4)]
x = torch.tensor([[first_token, *uniform_tokens]])

# Give every weight and bias the same constant (0.1) so the outputs and gradients are deterministic
for weight in attention.parameters():
    nn.init.constant_(weight, val=0.1)

# Optionally set the model to evaluation mode to disable dropout layers during inference
# attention.eval()

# Forward pass; the layer must preserve the input shape
y = attention(x)
assert y.shape == x.shape

# Show the input next to the result of the forward pass
print("=== Showing the input and output ===")
for shown in (x, y):
    print(shown)

# Reduce the output to a scalar so gradients can be back-propagated
loss = y.sum()
loss.backward()

# NaNs in the output usually point at a problem in the softmax computation
output_has_nan = torch.isnan(y).any().item()
if output_has_nan:
    raise ValueError(
        "It appears that the output contains NaNs. Perhaps the softmax dimension is incorrect?"
    )

# Truncated sum of squared gradients, ordered as:
# query weight, query bias, key weight, key bias, value weight, value bias
gradients = [
    int(param.grad.pow(2).sum().item())
    for layer in (attention.query, attention.key, attention.value)
    for param in (layer.weight, layer.bias)
]

print("Gradients:", gradients)

# Compare against the known-good fingerprint; known failure patterns get a targeted hint
if gradients != [161, 13, 294, 0, 37187, 432]:
    if gradients == [1, 0, 2, 0, 38787, 432]:
        failure = "Error: Did you remember to divide by the square root of d_k?"
    elif gradients[-1] == 432:
        failure = "There is an error in your implementation. Please check your code. Did you use -inf as the masked_fill_value?"
    else:
        failure = "There is an error in your implementation. Please check your code."
    raise RuntimeError(failure)

print("Correct")



Attention score shape before mask: torch.Size([1, 4, 5, 5])
Attention score shape after mask: torch.Size([1, 4, 5, 5])
Output shape before re-assembling: torch.Size([1, 4, 5, 3])
=== Showing the input and output ===
tensor([[[1., 1., 1., 2., 2., 2., 3., 3., 3., 4., 4., 4.],
         [1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.],
         [1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.],
         [1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.],
         [1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.]]])
tensor([[[3.8200, 3.8200, 3.8200, 3.8200, 3.8200, 3.8200, 3.8200, 3.8200,
          3.8200, 3.8200, 3.8200, 3.8200],
         [3.7831, 3.7831, 3.7831, 3.7831, 3.7831, 3.7831, 3.7831, 3.7831,
          3.7831, 3.7831, 3.7831, 3.7831],
         [3.7475, 3.7475, 3.7475, 3.7475, 3.7475, 3.7475, 3.7475, 3.7475,
          3.7475, 3.7475, 3.7475, 3.7475],
         [3.7130, 3.7130, 3.7130, 3.7130, 3.7130, 3.7130, 3.7130, 3.7130,
          3.7130, 3.7130, 3.7130, 3.7130],
         [3.6797, 3