In [2]:
import torch
import torch.nn as nn

In [3]:
class CasualAttention(nn.Module):
    def __init__(self, 
                 dim_in, 
                 dim_out, 
                 context_len, 
                 dropout, 
                 qkv_bias=False):
        super().__init__()
        
        self.d_out   = dim_out
        self.W_query = nn.Linear(dim_in, dim_out, bias=qkv_bias)
        self.W_key   = nn.Linear(dim_in, dim_out, bias=qkv_bias)
        self.W_value = nn.Linear(dim_in, dim_out, bias=qkv_bias)
        self.dropout = nn.Dropout(dropout)

        self.register_buffer(
            "mask",
            torch.triu(torch.ones(context_len, context_len), diagonal=1)
        )

    def forward(self, x):
        b, num_tokens, dim_in = x.size()
        
        keys    = self.W_key(x)
        queries = self.W_query(x)
        values  = self.W_value(x)

        attn_scores = queries @ keys.transpose(1,2)
        attn_scores.masked_fill(self.mask.bool()[:num_tokens, :num_tokens], -torch.inf)

        attn_weights = torch.softmax(attn_scores / keys.shape[-1] ** 0.5, dim=-1)
        attn_weights = self.dropout(attn_weights)

        context_vecs = attn_weights @ values
        return context_vecs

In [7]:
class MultiheadAttention(nn.Module):
    def __init__(self, 
                 dim_in, 
                 dim_out, 
                 context_length, 
                 dropout, 
                 num_heads,
                 qkv_bias=False):
        super().__init__()

        self.heads = nn.ModuleList([
            CasualAttention(dim_in, dim_out, context_length, dropout, qkv_bias) for _ in range(num_heads)
        ])

    def forward(self, x):
        return torch.cat([head(x) for head in self.heads], dim=-1)
        

In [15]:
torch.manual_seed(123)
inputs = torch.tensor(
    [[0.43, 0.15, 0.89], # Your
     [0.55, 0.87, 0.66], # journey
     [0.57, 0.85, 0.64], # starts
     [0.22, 0.58, 0.33], # with
     [0.77, 0.25, 0.10], # one
     [0.05, 0.80, 0.55]  # step
    ])
batch = torch.stack((inputs, inputs), dim=0) # [2, 6, 3]
print(batch.shape)

dim_in, dim_out = batch.shape[-1], 2
context_length = batch.shape[1]

mha = MultiheadAttention(dim_in, dim_out, context_length, 0.0, num_heads=2)
context_vecs = mha(batch)

print(context_vecs)
print(context_vecs.shape)

torch.Size([2, 6, 3])
tensor([[[-0.5337, -0.1051,  0.5085,  0.3508],
         [-0.5323, -0.1080,  0.5084,  0.3508],
         [-0.5323, -0.1079,  0.5084,  0.3506],
         [-0.5297, -0.1076,  0.5074,  0.3471],
         [-0.5311, -0.1066,  0.5076,  0.3446],
         [-0.5299, -0.1081,  0.5077,  0.3493]],

        [[-0.5337, -0.1051,  0.5085,  0.3508],
         [-0.5323, -0.1080,  0.5084,  0.3508],
         [-0.5323, -0.1079,  0.5084,  0.3506],
         [-0.5297, -0.1076,  0.5074,  0.3471],
         [-0.5311, -0.1066,  0.5076,  0.3446],
         [-0.5299, -0.1081,  0.5077,  0.3493]]], grad_fn=<CatBackward0>)
torch.Size([2, 6, 4])


In [16]:
class MultiheadAttention(nn.Module):
    def __init__(self, dim_in, dim_out,
                context_length, dropout, num_heads, qkv_bias=False):
        super().__init__()

        assert dim_out % num_heads == 0, \
            "dim_out must be divisible by num_heads"

        self.dim_out = dim_out
        self.num_heads = num_heads
        self.head_dim = dim_out // num_heads

        self.W_query = nn.Linear(dim_in, dim_out, bias=qkv_bias)
        self.W_key = nn.Linear(dim_in, dim_out, bias=qkv_bias)
        self.W_value = nn.Linear(dim_in, dim_out, bias=qkv_bias)

        self.out_proj = nn.Linear(dim_out, dim_out)
        self.dropout = nn.Dropout(dropout)

        self.register_buffer(
            "mask",
            torch.triu(torch.ones(context_length, context_length), diagonal=1)
        )

    def split_heads(self, x, batch_size, num_tokens):
        x = x.view(batch_size, num_tokens, self.num_heads, self.head_dim)
        x = x.transpose(1, 2)
        return x

    def forward(self, x):
        b, num_tokens, dim_in = x.shape
        
        keys = self.W_key(x)
        queries = self.W_query(x)
        values = self.W_value(x)

        keys = self.split_heads(keys, b, num_tokens)
        queries = self.split_heads(queries, b, num_tokens)
        values = self.split_heads(values, b, num_tokens)

        attn_scores = queries @ keys.transpose(2, 3)
        mask_bool = self.mask.bool()[:num_tokens, :num_tokens]
        attn_scores = attn_scores.masked_fill(mask_bool, -torch.inf)
        
        attn_weights = torch.softmax(attn_scores / keys.shape[-1] ** 0.5, dim=-1)
        attn_weights = self.dropout(attn_weights)

        context_vecs = (attn_weights @ values).transpose(1, 2)
        context_vecs = context_vecs.contiguous().view(b, num_tokens, self.dim_out)

        context_vecs = self.out_proj(context_vecs)
        return context_vecs
        

In [18]:
torch.manual_seed(123)
mha = MultiheadAttention(3, 2, 6, 0, 2)

context_vecs = mha(batch)
print(context_vecs)
print(context_vecs.shape)

tensor([[[0.3190, 0.4858],
         [0.2943, 0.3897],
         [0.2856, 0.3593],
         [0.2693, 0.3873],
         [0.2639, 0.3928],
         [0.2575, 0.4028]],

        [[0.3190, 0.4858],
         [0.2943, 0.3897],
         [0.2856, 0.3593],
         [0.2693, 0.3873],
         [0.2639, 0.3928],
         [0.2575, 0.4028]]], grad_fn=<ViewBackward0>)
torch.Size([2, 6, 2])
