In [1]:
import torch
import torch.nn as nn

inputs = torch.tensor(
  [
    [0.43, 0.15, 0.89], # Your     (x^1)
    [0.55, 0.87, 0.66], # journey  (x^2)
    [0.57, 0.85, 0.64], # starts   (x^3)
    [0.22, 0.58, 0.33], # with     (x^4)
    [0.77, 0.25, 0.10], # one      (x^5)
    [0.05, 0.80, 0.55]  # step     (x^6)
  ] 
)

batch = torch.stack((inputs, inputs), dim=0)
print(batch.shape)


torch.Size([2, 6, 3])


In [2]:
class CausalAttention(nn.Module):

    def __init__(self, d_in, d_out, context_length, dropout):
        super().__init__()
        self.W_query = nn.Linear(d_in, d_out, bias=False)
        self.W_key = nn.Linear(d_in, d_out, bias=False)
        self.W_value = nn.Linear(d_in, d_out, bias=False)
        self.dropout = nn.Dropout(dropout)
        self.register_buffer(
            'mask',
            torch.triu(
                torch.ones(context_length, context_length),
                diagonal=1
            )
        )

    def forward(self, x):
        b, num_tokens, d_in = x.shape
        queries = self.W_query(x)
        keys = self.W_key(x)
        values = self.W_value(x)
        
        attn_scores = queries @ keys.transpose(1, 2)
        attn_scores.masked_fill_(
            self.mask.bool()[:num_tokens, :num_tokens],
            -torch.inf
        )
        attn_weights = torch.softmax(attn_scores / (keys.shape[-1] * 0.5), dim=-1)
        attn_weights = self.dropout(attn_weights)
        context_vectors = attn_weights @ values
        return context_vectors


In [3]:
class MultiHeadAttentionWrapper(nn.Module):
    def __init__(self, d_in, d_out, context_length, dropout, num_heads):
        super().__init__()
        self.heads = nn.ModuleList(
            [CausalAttention(
                 d_in, d_out, context_length, dropout
             ) 
             for _ in range(num_heads)]
        )
 
    def forward(self, x):
        return torch.cat([head(x) for head in self.heads], dim=-1)

In [4]:
torch.manual_seed(123)
context_length = batch.shape[1] # This is the number of tokens
d_in, d_out = 3, 2
mha = MultiHeadAttentionWrapper(
    d_in, d_out, context_length, 0.0, num_heads=2
)
context_vecs = mha(batch)
 
print(context_vecs)
print("context_vecs.shape:", context_vecs.shape)

tensor([[[-0.4519,  0.2216,  0.4772,  0.1063],
         [-0.5893,  0.0029,  0.5940,  0.3352],
         [-0.6316, -0.0656,  0.6241,  0.3936],
         [-0.5685, -0.0853,  0.5487,  0.3620],
         [-0.5540, -0.0984,  0.5325,  0.3439],
         [-0.5308, -0.1089,  0.5081,  0.3520]],

        [[-0.4519,  0.2216,  0.4772,  0.1063],
         [-0.5893,  0.0029,  0.5940,  0.3352],
         [-0.6316, -0.0656,  0.6241,  0.3936],
         [-0.5685, -0.0853,  0.5487,  0.3620],
         [-0.5540, -0.0984,  0.5325,  0.3439],
         [-0.5308, -0.1089,  0.5081,  0.3520]]], grad_fn=<CatBackward0>)
context_vecs.shape: torch.Size([2, 6, 4])


In [5]:
class MultiHeadAttention(nn.Module):
    def __init__(self, d_in, d_out, 
                 context_length, dropout, num_heads, qkv_bias=False):
        super().__init__()
        assert (d_out % num_heads == 0), \
            "d_out must be divisible by num_heads"
 
        self.d_out = d_out
        self.num_heads = num_heads
        self.head_dim = d_out // num_heads
        self.W_query = nn.Linear(d_in, d_out, bias=qkv_bias)
        self.W_key = nn.Linear(d_in, d_out, bias=qkv_bias)
        self.W_value = nn.Linear(d_in, d_out, bias=qkv_bias)
        self.out_proj = nn.Linear(d_out, d_out)
        self.dropout = nn.Dropout(dropout)
        self.register_buffer(
            "mask",
            torch.triu(torch.ones(context_length, context_length),
                       diagonal=1)
        )
 
 
 
    def forward(self, x):
        b, num_tokens, d_in = x.shape
        keys = self.W_key(x)
        queries = self.W_query(x)
        values = self.W_value(x)
        print(f"values {values}")
 
        keys = keys.view(b, num_tokens, self.num_heads, self.head_dim)
        queries = queries.view(b, num_tokens, self.num_heads, self.head_dim)
        values = values.view(b, num_tokens, self.num_heads, self.head_dim)
        print(f"values_2 {values}")
 
        keys = keys.transpose(1, 2)
        queries = queries.transpose(1, 2)
        values = values.transpose(1, 2)
 
        attn_scores = queries @ keys.transpose(2, 3)
        mask_bool = self.mask.bool()[:num_tokens, :num_tokens]
  
        attn_scores.masked_fill_(mask_bool, -torch.inf)
 
        attn_weights = torch.softmax(
            attn_scores / keys.shape[-1]**0.5, dim=-1)
        attn_weights = self.dropout(attn_weights)
 
        context_vec = (attn_weights @ values).transpose(1, 2)

        context_vec = context_vec.contiguous().view(
            b, num_tokens, self.d_out
        )
        context_vec = self.out_proj(context_vec)
        return context_vec

In [7]:
torch.manual_seed(123)
batch_size, context_length, d_in = batch.shape
d_out = 2
mha = MultiHeadAttention(d_in, d_out, context_length, 0.0, num_heads=2)
context_vecs = mha(batch)
print(context_vecs)
# print("context_vecs.shape:", context_vecs.shape)

values tensor([[[-0.4519,  0.2216],
         [-0.7142, -0.1961],
         [-0.7127, -0.1971],
         [-0.3809, -0.1557],
         [-0.4861, -0.1597],
         [-0.4213, -0.1501]],

        [[-0.4519,  0.2216],
         [-0.7142, -0.1961],
         [-0.7127, -0.1971],
         [-0.3809, -0.1557],
         [-0.4861, -0.1597],
         [-0.4213, -0.1501]]], grad_fn=<UnsafeViewBackward0>)
values_2 tensor([[[[-0.4519],
          [ 0.2216]],

         [[-0.7142],
          [-0.1961]],

         [[-0.7127],
          [-0.1971]],

         [[-0.3809],
          [-0.1557]],

         [[-0.4861],
          [-0.1597]],

         [[-0.4213],
          [-0.1501]]],


        [[[-0.4519],
          [ 0.2216]],

         [[-0.7142],
          [-0.1961]],

         [[-0.7127],
          [-0.1971]],

         [[-0.3809],
          [-0.1557]],

         [[-0.4861],
          [-0.1597]],

         [[-0.4213],
          [-0.1501]]]], grad_fn=<ViewBackward0>)
tensor([[[0.3190, 0.4858],
         [0.2943, 