<a href="https://colab.research.google.com/github/rczhen/code_ml/blob/main/attention.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
import math
import inspect
from dataclasses import dataclass

import torch
import torch.nn as nn
from torch.nn import functional as F

In [3]:
@dataclass
class GPTConfig:
    """Hyperparameter bundle with GPT-2-style default values.

    NOTE(review): nothing in the visible part of this notebook reads this
    class, so the per-field notes below are inferred from the field names and
    the existing comments -- confirm against the code that consumes it.
    """
    block_size: int = 1024  # presumably the maximum sequence length -- TODO confirm
    vocab_size: int = 50304 # GPT-2 vocab_size of 50257, padded up to nearest multiple of 64 for efficiency
    n_layer: int = 12  # presumably the number of transformer blocks -- TODO confirm
    n_head: int = 12  # presumably the number of attention heads -- TODO confirm
    n_embd: int = 768  # presumably the embedding / model width -- TODO confirm
    dropout: float = 0.0  # presumably a dropout probability; 0.0 would disable dropout
    bias: bool = True # True: bias in Linears and LayerNorms, like GPT-2. False: a bit better and faster

@dataclass
class Config:
    """Tiny hyperparameter bundle used by the attention demos in this notebook.

    The CausalSelfAttention layers read ``block_size``, ``n_heads``,
    ``d_model`` and ``dropout_rate`` from it.
    """
    block_size: int = 8  # side length of the causal mask built by the attention layers
    vocab_size: int = 50304 # GPT-2 vocab_size of 50257, padded up to nearest multiple of 64 for efficiency
    n_layers: int = 2  # not read by the attention layers in this section; presumably the block count
    n_heads: int = 4  # number of attention heads; must divide d_model evenly
    d_model: int = 16  # model width; each head gets d_model // n_heads channels
    dropout_rate: float = 0.0  # probability handed to nn.Dropout in the attention layers
    bias: bool = True # True: bias in Linears and LayerNorms, like GPT-2. False: a bit better and faster

# v1: standard CausalSelfAttention

In [4]:
class CausalSelfAttention(nn.Module):
    """Multi-head causal self-attention (standard scaled dot-product).

    Args:
        config: any object exposing ``d_model``, ``n_heads``, ``block_size``
            and ``dropout_rate``. An optional ``bias`` attribute (treated as
            ``True`` when absent) switches the bias terms of the QKV and
            output projections on or off.

    Shapes:
        input:  ``(B, T, d_model)`` with ``T <= block_size``
        output: ``(B, T, d_model)``
    """

    def __init__(self, config) -> None:
        super().__init__()
        assert config.d_model % config.n_heads == 0, "d_model must be divisible by n_heads"

        self.n_heads = config.n_heads
        self.d_model = config.d_model
        self.head_size = config.d_model // config.n_heads
        self.block_size = config.block_size

        self.attention_dropout = nn.Dropout(config.dropout_rate)  # applied to the softmax output
        self.residual_dropout = nn.Dropout(config.dropout_rate)  # applied to the block output, before the residual add

        # Honour the config's bias switch; configs without the field keep
        # the nn.Linear default (bias enabled).
        use_bias = getattr(config, "bias", True)
        self.w_qkv = nn.Linear(self.d_model, 3 * self.d_model, bias=use_bias)  # fused Q, K, V projection
        self.w_o = nn.Linear(self.d_model, self.d_model, bias=use_bias)

        # Lower-triangular matrix of ones, stored as (1, 1, block_size, block_size)
        # so it broadcasts over (B, n_heads, T, T) score tensors.
        causal = torch.tril(torch.ones(self.block_size, self.block_size))
        self.register_buffer("mask", causal.view(1, 1, self.block_size, self.block_size))

    def forward(self, x):
        """Apply causal self-attention.

        Args:
            x: tensor of shape ``(B, T, d_model)``.

        Returns:
            Tensor of shape ``(B, T, d_model)``.

        Raises:
            ValueError: if ``T`` is larger than ``block_size``; the stored mask
                cannot cover such a sequence.
        """
        B, T, D = x.size()  # batch size, sequence length, d_model
        if T > self.block_size:
            # Without this check the mask slice is too small: a cryptic
            # broadcast error, or (block_size == 1) a silently unmasked result.
            raise ValueError(
                f"sequence length {T} exceeds block_size {self.block_size}"
            )

        # Project once, then split into per-head query / key / value tensors.
        q, k, v = self.w_qkv(x).split(self.d_model, dim=2)  # (B, T, 3D) --> 3 x (B, T, D)
        q = q.view(B, T, self.n_heads, self.head_size).transpose(1, 2)  # (B, nh, T, hs)
        k = k.view(B, T, self.n_heads, self.head_size).transpose(1, 2)  # (B, nh, T, hs)
        v = v.view(B, T, self.n_heads, self.head_size).transpose(1, 2)  # (B, nh, T, hs)

        # Scaled dot-product scores: (B, nh, T, hs) @ (B, nh, hs, T) --> (B, nh, T, T)
        attention = (q @ k.transpose(-1, -2)) * self.head_size ** -0.5
        # Positions above the diagonal (future keys) get -inf, i.e. zero weight after softmax.
        attention = attention.masked_fill(self.mask[:, :, :T, :T] == 0, float('-inf'))
        attention = F.softmax(attention, dim=-1)
        attention = self.attention_dropout(attention)

        # Weighted sum of values, then merge the heads back into the model width.
        y = attention @ v  # (B, nh, T, T) @ (B, nh, T, hs) --> (B, nh, T, hs)
        y = y.transpose(1, 2).contiguous().view(B, T, D)  # (B, nh, T, hs) --> (B, T, nh, hs) --> (B, T, D)
        y = self.w_o(y)  # (B, T, D) --> (B, T, D)
        y = self.residual_dropout(y)

        return y


"""
Why scaled?
Without normalization, the variance of the raw attention scores (q @ k^T) is on the order of head_size (4 with the Config above).
Dividing by sqrt(head_size) brings the variance back to about 1.
"Scaled" attention additionally multiplies the attention scores by 1/sqrt(head_size),
so when the inputs Q and K have unit variance, the attention scores have unit variance too,
and softmax stays diffuse instead of saturating.
"""


'\nWhy scaled?\nWithout normalization, the variance of the raw attention scores (q @ k^T) is on the order of head_size (4 with the Config above).\nDividing by sqrt(head_size) brings the variance back to about 1.\n"Scaled" attention additionally multiplies the attention scores by 1/sqrt(head_size),\nso when the inputs Q and K have unit variance, the attention scores have unit variance too,\nand softmax stays diffuse instead of saturating.\n'

In [5]:
# Smoke test for the v1 layer: one random batch of shape (1, block_size, d_model).
# The input is drawn before the layer is built, so the order of these statements
# determines which random numbers go to the input and which to the weights.
x = torch.rand(1, Config.block_size, Config.d_model)
print(x)
# The Config class itself (not an instance) is passed; the layer only reads
# attributes, and the dataclass defaults are available as class attributes.
layer = CausalSelfAttention(Config)
print(layer(x))

tensor([[[0.1335, 0.5033, 0.0119, 0.9521, 0.0588, 0.9284, 0.3936, 0.6225,
          0.6452, 0.3971, 0.9477, 0.7729, 0.7849, 0.9677, 0.5737, 0.1898],
         [0.0463, 0.7900, 0.2029, 0.2007, 0.9862, 0.8995, 0.8501, 0.4497,
          0.0779, 0.4141, 0.6336, 0.6112, 0.7698, 0.5033, 0.5788, 0.3431],
         [0.2839, 0.9898, 0.9354, 0.7228, 0.7328, 0.5463, 0.4106, 0.4918,
          0.5689, 0.0622, 0.2844, 0.7631, 0.0073, 0.0599, 0.2667, 0.1991],
         [0.1219, 0.4216, 0.6814, 0.1920, 0.7979, 0.9971, 0.0258, 0.6272,
          0.6598, 0.0274, 0.6750, 0.7370, 0.5341, 0.4181, 0.9826, 0.5319],
         [0.5073, 0.6594, 0.3810, 0.3264, 0.5320, 0.2145, 0.2166, 0.9515,
          0.6532, 0.6341, 0.8759, 0.1684, 0.2516, 0.3369, 0.1295, 0.6470],
         [0.8165, 0.4878, 0.8657, 0.7369, 0.4041, 0.3254, 0.9028, 0.6001,
          0.1756, 0.1283, 0.1517, 0.7840, 0.8976, 0.4481, 0.9875, 0.7592],
         [0.1032, 0.7290, 0.5401, 0.6724, 0.4569, 0.0051, 0.1456, 0.9306,
          0.9603, 0.6033, 0.2806

# v2: Talking Heads Attention

In [8]:
class CausalSelfAttention(nn.Module):
    """Multi-head causal self-attention with optional talking-heads mixing.

    Talking heads adds two small learnable ``(n_heads, n_heads)`` linear maps
    that mix information across heads: one applied to the scaled scores right
    before masking/softmax, one applied to the attention weights right after
    softmax.

    Both mixing maps are bias-free. A bias after softmax would put non-zero
    weight on masked (future) positions and break causality; a bias before
    softmax adds the same constant to every key of a head and is cancelled by
    the softmax, so it would be a dead parameter.

    Args:
        config: any object exposing ``d_model``, ``n_heads``, ``block_size``
            and ``dropout_rate``. An optional ``bias`` attribute (treated as
            ``True`` when absent) switches the bias terms of the QKV and
            output projections on or off.
        talking_heads: when ``False`` the layer is plain scaled dot-product
            attention and the two mixing maps are not created.

    Shapes:
        input:  ``(B, T, d_model)`` with ``T <= block_size``
        output: ``(B, T, d_model)``
    """

    def __init__(self, config, talking_heads=True) -> None:
        super().__init__()
        assert config.d_model % config.n_heads == 0, "d_model must be divisible by n_heads"

        self.n_heads = config.n_heads
        self.d_model = config.d_model
        self.head_size = config.d_model // config.n_heads
        self.block_size = config.block_size

        self.attention_dropout = nn.Dropout(config.dropout_rate)  # applied to the attention weights
        self.residual_dropout = nn.Dropout(config.dropout_rate)  # applied to the block output, before the residual add

        # Honour the config's bias switch; configs without the field keep
        # the nn.Linear default (bias enabled).
        use_bias = getattr(config, "bias", True)
        self.w_qkv = nn.Linear(self.d_model, 3 * self.d_model, bias=use_bias)  # fused Q, K, V projection
        self.w_o = nn.Linear(self.d_model, self.d_model, bias=use_bias)

        # Talking heads: two small (n_heads, n_heads) matrices applied
        # immediately before and immediately after the softmax.
        #   (1) w_talking_weights mixes the scaled scores (after Q @ K^T and
        #       scaling, before masking and softmax);
        #   (2) w_talking_logits mixes the attention weights (after masking
        #       and softmax, before the weighted sum over the values).
        # NOTE: the attribute names are kept for compatibility even though
        # they read the other way round from what each map is applied to.
        self.talking_heads = talking_heads
        if self.talking_heads:
            self.w_talking_weights = nn.Linear(self.n_heads, self.n_heads, bias=False)
            self.w_talking_logits = nn.Linear(self.n_heads, self.n_heads, bias=False)

        # Lower-triangular matrix of ones, stored as (1, 1, block_size, block_size)
        # so it broadcasts over (B, n_heads, T, T) score tensors.
        causal = torch.tril(torch.ones(self.block_size, self.block_size))
        self.register_buffer("mask", causal.view(1, 1, self.block_size, self.block_size))

    def forward(self, x):
        """Apply causal self-attention.

        Args:
            x: tensor of shape ``(B, T, d_model)``.

        Returns:
            Tensor of shape ``(B, T, d_model)``.

        Raises:
            ValueError: if ``T`` is larger than ``block_size``; the stored mask
                cannot cover such a sequence.
        """
        B, T, D = x.size()  # batch size, sequence length, d_model
        if T > self.block_size:
            # Without this check the mask slice is too small: a cryptic
            # broadcast error, or (block_size == 1) a silently unmasked result.
            raise ValueError(
                f"sequence length {T} exceeds block_size {self.block_size}"
            )

        # Project once, then split into per-head query / key / value tensors.
        q, k, v = self.w_qkv(x).split(self.d_model, dim=2)  # (B, T, 3D) --> 3 x (B, T, D)
        q = q.view(B, T, self.n_heads, self.head_size).transpose(1, 2)  # (B, nh, T, hs)
        k = k.view(B, T, self.n_heads, self.head_size).transpose(1, 2)  # (B, nh, T, hs)
        v = v.view(B, T, self.n_heads, self.head_size).transpose(1, 2)  # (B, nh, T, hs)

        # Scaled dot-product scores: (B, nh, T, hs) @ (B, nh, hs, T) --> (B, nh, T, T)
        attention = (q @ k.transpose(-1, -2)) * self.head_size ** -0.5
        if self.talking_heads:
            # nn.Linear acts on the last dim, so move the head axis there and back.
            # (B, nh, T_q, T_k) --> (B, T_q, T_k, nh); also valid when query and key lengths differ.
            attention = attention.permute(0, 2, 3, 1)
            attention = self.w_talking_weights(attention)  # (B, T, T, nh) @ (nh, nh) --> (B, T, T, nh)
            attention = attention.permute(0, 3, 1, 2)  # (B, T, T, nh) --> (B, nh, T, T)
        # Positions above the diagonal (future keys) get -inf, i.e. zero weight after softmax.
        attention = attention.masked_fill(self.mask[:, :, :T, :T] == 0, float('-inf'))
        attention = F.softmax(attention, dim=-1)
        if self.talking_heads:
            # Bias-free mixing keeps masked positions at exactly zero.
            attention = attention.permute(0, 2, 3, 1)  # (B, nh, T, T) --> (B, T, T, nh)
            attention = self.w_talking_logits(attention)  # (B, T, T, nh) @ (nh, nh) --> (B, T, T, nh)
            attention = attention.permute(0, 3, 1, 2)  # (B, T, T, nh) --> (B, nh, T, T)
        attention = self.attention_dropout(attention)

        # Weighted sum of values, then merge the heads back into the model width.
        y = attention @ v  # (B, nh, T, T) @ (B, nh, T, hs) --> (B, nh, T, hs)
        y = y.transpose(1, 2).contiguous().view(B, T, D)  # (B, nh, T, hs) --> (B, T, nh, hs) --> (B, T, D)
        y = self.w_o(y)  # (B, T, D) --> (B, T, D)
        y = self.residual_dropout(y)

        return y

In [9]:
# Smoke test for the v2 layer: one random batch of shape (1, block_size, d_model).
# The input is drawn before the layer is built, so the order of these statements
# determines which random numbers go to the input and which to the weights.
x = torch.rand(1, Config.block_size, Config.d_model)
print(x)
# talking_heads is left at its default (True); the Config class itself is
# passed because the layer only reads attributes from it.
layer = CausalSelfAttention(Config)
print(layer(x))

tensor([[[0.3813, 0.6721, 0.2603, 0.6302, 0.2283, 0.3321, 0.0882, 0.1664,
          0.2291, 0.5071, 0.8659, 0.1470, 0.1494, 0.0381, 0.5740, 0.2529],
         [0.3449, 0.9917, 0.6983, 0.1112, 0.0900, 0.0011, 0.0673, 0.5607,
          0.7021, 0.1708, 0.8078, 0.9408, 0.2962, 0.9869, 0.4653, 0.2010],
         [0.6817, 0.4621, 0.2949, 0.4915, 0.9488, 0.6643, 0.8228, 0.3356,
          0.9932, 0.5010, 0.5966, 0.1304, 0.2402, 0.8760, 0.5761, 0.6892],
         [0.5934, 0.0857, 0.3116, 0.7343, 0.2737, 0.7082, 0.7385, 0.3015,
          0.6424, 0.8895, 0.7715, 0.2097, 0.6284, 0.7914, 0.7759, 0.0373],
         [0.9506, 0.1744, 0.7485, 0.9021, 0.7730, 0.3930, 0.5014, 0.2275,
          0.1787, 0.6184, 0.5131, 0.0772, 0.2575, 0.4504, 0.1519, 0.9919],
         [0.9278, 0.5570, 0.5830, 0.9215, 0.5986, 0.2957, 0.8409, 0.5637,
          0.7597, 0.0286, 0.2481, 0.5496, 0.9791, 0.4818, 0.2578, 0.9870],
         [0.8867, 0.8018, 0.6264, 0.6848, 0.3668, 0.9750, 0.4369, 0.2738,
          0.5364, 0.0901, 0.5150

# v3: GQA, MQA, MLA