In [1]:
from typing import Tuple, Optional, List
import math
import random
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F

In [4]:
SEED = 42

# Seed every random number generator the notebook touches so that a fresh
# "Restart & Run All" reproduces the same numbers.
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

# Prefer Apple's MPS backend (the original target), but fall back to CUDA and
# then CPU so the notebook also runs on machines without MPS.
# `torch.backends.mps` is looked up defensively because older torch builds do
# not ship that submodule.
_mps_backend = getattr(torch.backends, "mps", None)
if _mps_backend is not None and _mps_backend.is_available():
    DEVICE = torch.device("mps")
elif torch.cuda.is_available():
    DEVICE = torch.device("cuda")
else:
    DEVICE = torch.device("cpu")

In [5]:
DEVICE

device(type='mps')

# 1. Intuition for attention

## 1.1 What is attention

我有三份信息（Values, V）要综合，比如 V1, V2, V3；我手上还有一组权重（weights），比如 0.7, 0.2, 0.1（加起来=1）。
那综合结果就是：0.7*V1 + 0.2*V2 + 0.1*V3。
——这就是注意力结果（只不过真正的权重不是手填，而是“算出来”的）。

In [6]:
def weighted_average(values, weights):
    """
    Blend N vectors into a single vector using one weight per vector.

    Inputs:
    values: np.ndarray, shape [N, D], N pieces of information, each a D-dimensional vector
    weights: np.ndarray, shape [N], one weight per row of `values`

    Outputs:
    out: np.ndarray, shape [D], the weighted average of the rows of `values`
    """
    # Rescale the weights so they sum to 1; the epsilon avoids dividing by zero.
    total = weights.sum() + 1e-12
    normalized = weights / total
    # Broadcast each weight across its row, then collapse the N axis.
    return np.sum(normalized[:, None] * values, axis=0)

# Three 2-D value vectors and hand-picked weights that already sum to 1.
V = np.array([[10., 0.],   # V1
              [ 0.,10.],   # V2
              [ 5., 5.]])  # V3
w = np.array([0.7, 0.2, 0.1])  # weights for V1, V2, V3
out = weighted_average(V, w)
print(out)  # 0.7*V1 + 0.2*V2 + 0.1*V3

[7.5 2.5]


In [7]:
w @ V

array([7.5, 2.5])

## 1.2 Where do the weights come from?

想法：

* 我有一个“问题”向量 Q（Query），表示“我现在想要什么”。

* 还有一堆“候选”向量 K（Keys），每个候选都有对应的 V（Values） 信息。

* 打分：用 Q 去和每个 K 做“相似度”（我们用点积，越像分越高）。

* 归一化：把这些分数做 softmax（指数归一化）→ 得到 0~1 的权重，且和=1。

* 带权平均：用这些权重去加权 V，得到输出。

这就是常说的 Scaled Dot-Product Attention（缩放点积注意力）

In [9]:
def softmax(x):
    """
    Turn a vector of scores into a probability distribution.

    inputs:
    x: np.ndarray, shape[N]

    output:
    p: np.ndarray, shape[N]

    how it works:
    p[i] = exp(x[i]) / sum_j exp(x[j])
    The maximum of x is subtracted first; this leaves the result unchanged
    mathematically but keeps exp() from overflowing on large scores.
    """
    shifted = x - np.max(x)
    exps = np.exp(shifted)
    # The epsilon keeps the division well-defined even for a vanishing sum.
    denom = np.sum(exps) + 1e-12
    return exps / denom

In [10]:
def attention_once(Q, K, V):
    """
    Dot-product attention for a single query (no scaling, no masking).

    inputs:
    Q: np.ndarray, shape [D], one query vector
    K: np.ndarray, shape [N, D], N keys
    V: np.ndarray, shape [N, Dv], the value attached to each key

    output:
    out: np.ndarray, shape [Dv], attention output
    weights: np.ndarray, shape [N], attention weights (sum ≈ 1)
    scores: np.ndarray, shape [N], raw similarity scores before softmax

    how it works:
    1) scores[i] = dot(Q, K[i])
    2) weights = softmax(scores)
    3) out = sum(weights[i] * V[i]) over i
    """
    similarity = K @ Q  # one dot product per key
    attn_weights = softmax(similarity)
    blended = weighted_average(V, attn_weights)
    return blended, attn_weights, similarity

In [11]:
Q = np.array([1.0, 0.0])                    # query: "I want the information along the x axis"
K = np.array([[1.0, 0.0],                   # K1: identical to Q
              [0.7, 0.2],                   # K2: roughly aligned with Q
              [-1.0, 0.0]])                 # K3: opposite direction to Q
V = np.array([[10., 0.],                    # V1
              [ 0.,10.],                    # V2
              [ 5., 5.]], dtype=np.float32) # V3

out, w, s = attention_once(Q, K, V)
print("打分 scores :", s)       # which key is most relevant (higher = more relevant)
print("权重 weights:", w)       # softmax turns the scores into a distribution that sums to 1
print("输出 out    :", out)     # the resulting weighted average of V

打分 scores : [ 1.   0.7 -1. ]
权重 weights: [0.53300543 0.39486013 0.07213444]
输出 out    : [5.69072648 4.30927352]


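Note: when the vector dimension D is large, the dot products grow in magnitude and push softmax towards a one-hot output. Multiplying the scores by $1/\sqrt{D}$ before the softmax keeps them in a reasonable range — this is the "scaled" in Scaled Dot-Product Attention.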

## 1.3 What is mask?

* Padding Mask（填充掩码）：补齐出来的 <pad> 位置是“无效的”，权重要变成 0。
做法：在分数上把这些位置加上一个超大负数（如 -1e9），softmax 后几乎就是 0。

* Causal Mask（因果掩码）：解码时不能看未来（当前位置只能看它前面），把“未来位置”也加上超大负数。

In [12]:
def apply_mask(scores, mask):
    """
    Hide selected positions from a later softmax.

    Inputs:
    scores: np.ndarray, shape [N], original scores
    mask: np.ndarray, shape [N], dtype = bool, True marks a position to hide

    output:
    masked_score: np.ndarray, shape [N], a new array; `scores` is not modified

    how it works:
    Positions where mask is True are overwritten with a very large negative
    number, so after softmax their weight is (almost) exactly 0.
    """
    hidden_value = -1e9
    result = scores.copy()  # work on a copy so the caller's array stays intact
    result[mask] = hidden_value
    return result

In [13]:
scores = np.array([3.0, 1.0, -2.0], dtype=np.float32)
mask   = np.array([False, True, False])  # the second position is padding and must be hidden
masked_scores = apply_mask(scores, mask)

In [14]:
# Compare the softmax of the raw scores with the softmax of the masked scores.
print("原分数:", scores, "-> softmax", softmax(scores))
print("打掩码:", masked_scores, "-> softmax", softmax(masked_scores))  # the middle weight becomes ~0

原分数: [ 3.  1. -2.] -> softmax [0.8756006  0.11849965 0.00589975]
打掩码: [ 3.e+00 -1.e+09 -2.e+00] -> softmax [0.9933072  0.         0.00669285]


## 1.4 Self-attention

关键：现在 Q/K/V 都来自同一句话的向量表示 X。
最简单的演示：我们先不做任何线性变换，直接令 Q=K=V=X（现实里会各自过一层线性投影，这里先别管）。

这样你能看到：每个词的位置会按相似度从其他词那里“取信息”（加权平均）。

In [15]:
def self_attention_minimal(X):
    """
    Bare-bones self-attention with Q = K = V = X (no projections, no scaling).

    inputs:
    X: np.ndarray shape [T, D], one sequence of T vectors (each vector is D-dimensional)

    output:
    Y: np.ndarray shape [T, D], self-attention output; row i is a new
       representation of position i built as a weighted average of all positions

    how it works:
    for every i:
        scores[i, j] = dot(X[i], X[j])
        weights[i] = softmax(scores[i])
        Y[i] = sum(weights[i, j] * X[j]) over j
    """
    seq_len, _ = X.shape  # unpacking also insists that X is 2-D
    mixed = np.zeros_like(X)
    for pos in range(seq_len):
        # Position `pos` acts as the query against every row of X.
        attn = softmax(X @ X[pos])
        mixed[pos] = weighted_average(X, attn)
    return mixed

In [16]:
# Re-seed locally so this cell prints the same X regardless of earlier cells.
np.random.seed(0)
X = np.random.randn(4, 3).astype(np.float32)  # T=4 tokens, D=3 dimensions
Y = self_attention_minimal(X)
print("输入 X 形状:", X.shape)
print("输出 Y 形状:", Y.shape)

输入 X 形状: (4, 3)
输出 Y 形状: (4, 3)


In [17]:
X

array([[ 1.7640524 ,  0.4001572 ,  0.978738  ],
       [ 2.2408931 ,  1.867558  , -0.9772779 ],
       [ 0.95008844, -0.1513572 , -0.10321885],
       [ 0.41059852,  0.14404356,  1.4542735 ]], dtype=float32)

In [19]:
Y

array([[ 1.7975295 ,  0.85911125,  0.3104185 ],
       [ 2.238525  ,  1.8615676 , -0.9702689 ],
       [ 1.7368875 ,  0.9578309 , -0.05566131],
       [ 1.0923121 ,  0.30132565,  1.0670953 ]], dtype=float32)

# 2. Recap of Part 1 in NumPy: Weighted Average -> One-shot Attention -> Self-Attention (Q=K=V=X)

In [20]:
def weighted_average(values, weights):
    """
    Weighted average of the rows of `values`.

    Inputs:
    values [N,D]
    weights [N]

    Outputs:
    [D] vector; the weights are rescaled to sum to 1 before averaging
    """
    scale = weights.sum() + 1e-12  # epsilon avoids dividing by zero
    unit_weights = weights / scale
    return np.sum(unit_weights[:, None] * values, axis=0)

In [21]:
def softmax_np(x):
    """
    Numerically stable softmax over a 1-D array.

    x shape [N]
    returns shape [N], non-negative and summing to (approximately) 1
    """
    # Shifting by the maximum keeps exp() from overflowing.
    exps = np.exp(x - np.max(x))
    return exps / (np.sum(exps) + 1e-12)

In [22]:
def attention_once_numpy(Q, K, V, scale, mask):
    """
    Dot-product attention for one query, with optional scaling and masking.

    Inputs:
    Q: [D] single query
    K: [N, D], N keys
    V: [N, Dv], N values
    scale: bool, whether to multiply the scores by 1/sqrt(D)
    mask: shape [N] dtype=bool or None; True hides that key from attention

    Outputs:
    out: [Dv] attention output (weighted average of V)
    weights: [N] the attention weights (sum ≈ 1)
    scores: [N] the scores fed to softmax, i.e. after scaling and masking
    """
    raw = K @ Q
    scores = raw / np.sqrt(Q.shape[0]) if scale else raw

    if mask is not None:
        # Copy before writing so no array shared with the caller is modified.
        scores = scores.copy()
        scores[mask] = -1e9

    weights = softmax_np(scores)
    out = np.sum(weights[:, None] * V, axis=0)
    return out, weights, scores

In [23]:
def self_attention_minimal_numpy(X, scale):
    """
    Self-attention in its simplest form: every row of X is used as query, key and value.

    Inputs:
    X [T, D], one sequence of T D-dimensional vectors (Q = K = V = X)
    scale: bool, forwarded to attention_once_numpy (1/sqrt(D) scaling)

    Outputs:
    Y [T, D], attention output

    This only illustrates the mechanics of attention; real models first
    project X into separate Q, K and V.
    """
    seq_len, _ = X.shape  # unpacking also insists that X is 2-D
    Y = np.zeros_like(X)
    for pos in range(seq_len):
        # Only the attended vector is needed; weights and scores are dropped.
        attended = attention_once_numpy(X[pos], X, X, scale=scale, mask=None)[0]
        Y[pos] = attended
    return Y

In [27]:
def demo_numpy_block():
    """Print a walkthrough of the NumPy helpers: weighted average, one-shot attention, self-attention."""
    print("\n[Demo] 1) NumPy attention basics")

    # Step 1: fixed, hand-picked weights.
    values = np.array([[10., 0.], [0., 10.], [5., 5.]], dtype=np.float32)  # [3,2]
    fixed_weights = np.array([0.7, 0.2, 0.1], dtype=np.float32)            # [3]
    blended = weighted_average(values, fixed_weights)
    print("weighted_average:", blended)  # approximately [7.5, 2.5]

    # Step 2: the weights are now computed from a query and a set of keys.
    query = np.array([1., 0.], dtype=np.float32)
    keys = np.array([[1., 0.], [0.7, 0.2], [-1., 0.]], dtype=np.float32)   # [3,2]
    out, weights, scores = attention_once_numpy(query, keys, values, scale=True, mask=None)
    print("scores:", np.round(scores, 3), "weights:", np.round(weights, 3), "out:", np.round(out, 3))

    # Step 3: self-attention, where the sequence attends to itself (Q=K=V=X).
    tokens = np.array([[1.0, 0.0],
                       [0.8, 0.2],
                       [0.1, 0.9]], dtype=np.float32)  # toy vectors for ["i","love","nlp"]
    attended = self_attention_minimal_numpy(tokens, scale=True)
    print("X:\n", np.round(tokens, 3), "\nY:\n", np.round(attended, 3))

In [28]:
demo_numpy_block()


[Demo] 1) NumPy attention basics
weighted_average: [7.5 2.5]
scores: [ 0.707  0.495 -0.707] weights: [0.487 0.394 0.118] out: [5.466 4.534]
X:
 [[1.  0. ]
 [0.8 0.2]
 [0.1 0.9]] 
Y:
 [[0.729 0.271]
 [0.693 0.307]
 [0.545 0.455]]


In [87]:
# Same three toy 2-D vectors used inside demo_numpy_block, now at notebook
# scope so the following cells can recompute self-attention step by step.
X = np.array([[1.0, 0.0],
                  [0.8, 0.2],
                  [0.1, 0.9]], dtype=np.float32)

In [88]:
# All pairwise dot products in one matrix product, scaled by 1/sqrt(2)
# (X has 2 columns). Entry [i, j] is the score of query i against key j.
scores = (X @ X.T) / np.sqrt(2)

In [89]:
scores

array([[0.70710677, 0.56568545, 0.07071068],
       [0.56568545, 0.48083267, 0.18384776],
       [0.07071068, 0.18384776, 0.57982755]], dtype=float32)

In [90]:
# Row-wise softmax: each row of `scores` becomes one attention distribution.
weights = [softmax_np(t) for t in scores]

In [91]:
weights

[array([0.4171325 , 0.3621225 , 0.22074491], dtype=float32),
 array([0.38443005, 0.3531557 , 0.2624142 ], dtype=float32),
 array([0.26429808, 0.29595715, 0.43974477], dtype=float32)]

In [92]:
# Weighted average of the rows of X, once per attention distribution.
Y = [(weight[:, None] * X).sum(axis=0) for weight in weights]

In [93]:
Y

[array([0.72890496, 0.27109492], dtype=float32),
 array([0.69319606, 0.3068039 ], dtype=float32),
 array([0.5450383 , 0.45496172], dtype=float32)]

In [94]:
# Cross-check: the helper applied to the last row (as query) should reproduce Y[2].
out, _, _ = attention_once_numpy(X[2], X, X, True, None)

In [95]:
out

array([0.5450383 , 0.45496172], dtype=float32)

In [96]:
# Explicit per-query loop over the 3 rows of X: score -> softmax -> weighted average.
for i in range(3):
    q = X[i]
    scores = X @ q / np.sqrt(2)  # note: overwrites the matrix `scores` with a 1-D vector
    weights = softmax_np(scores)  # note: overwrites the list `weights` with a 1-D array
    out = (weights[:, None] * X).sum(axis=0)
    print(out)

[0.72890496 0.27109492]
[0.69319606 0.3068039 ]
[0.5450383  0.45496172]


In [101]:
# Scaled scores and row-wise softmax in a single expression.
W = [softmax_np(t) for t in X @ X.T / np.sqrt(2)]

In [102]:
W

[array([0.4171325 , 0.3621225 , 0.22074491], dtype=float32),
 array([0.38443005, 0.3531557 , 0.2624142 ], dtype=float32),
 array([0.26429808, 0.29595715, 0.43974477], dtype=float32)]

In [103]:
# Apply each attention distribution in W to the rows of X.
Y = [(weight[:, None] * X).sum(axis=0) for weight in W]

In [104]:
Y

[array([0.72890496, 0.27109492], dtype=float32),
 array([0.69319606, 0.3068039 ], dtype=float32),
 array([0.5450383 , 0.45496172], dtype=float32)]

# 3. Single head self attention in PyTorch

In [109]:
class SelfAttentionSimple(nn.Module):
    """
    Single-head self-attention with optional linear projections and
    padding / causal masking.

    - if use_projection is False, Q = K = V = x
    - if use_projection is True,  Q = x Wq, K = x Wk, V = x Wv

    Inputs (forward)
    x: [B, T, d_model]
    key_padding_mask: [B, T] or None; truthy entries mark keys that must be ignored
    causal: bool; if True, position i may only attend to positions <= i

    Outputs
    y: [B, T, d_model] (here d_v = d_model)
    attn: [B, T, T] attention weights, one TxT map per sample

    Purpose:
    scores = Q K^T / sqrt(d) -> mask -> softmax -> weights -> weights @ V

    A query row whose keys are ALL masked (e.g. a fully padded sample) gets
    all-zero weights and a zero output instead of NaN.
    """

    def __init__(self, d_model=32, use_projection=False, dropout=0.0):
        super().__init__()
        self.d_model = d_model
        self.use_proj = use_projection
        if use_projection:
            self.Wq = nn.Linear(d_model, d_model, bias=False)
            self.Wk = nn.Linear(d_model, d_model, bias=False)
            self.Wv = nn.Linear(d_model, d_model, bias=False)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, key_padding_mask=None, causal=False):
        B, T, D = x.shape
        if self.use_proj:
            Q, K, V = self.Wq(x), self.Wk(x), self.Wv(x)
        else:
            Q = K = V = x

        # K: [B, T, D] -> [B, D, T]; batched matmul gives scores of shape [B, T, T].
        scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(D)

        # Collect every masking rule into one boolean tensor (True = blocked).
        blocked = None
        if key_padding_mask is not None:
            # Accept int/float masks and masks living on another device.
            pad = key_padding_mask.to(device=x.device, dtype=torch.bool)
            blocked = pad[:, None, :]  # [B, 1, T], broadcasts over the query axis
        if causal:
            # Strict upper triangle = "future" positions.
            future = torch.triu(
                torch.ones(T, T, dtype=torch.bool, device=x.device), diagonal=1
            )[None]  # [1, T, T], broadcasts over the batch axis
            blocked = future if blocked is None else (blocked | future)

        dead_rows = None
        if blocked is not None:
            scores = scores.masked_fill(blocked, float("-inf"))
            # Rows with every key blocked would be softmax(all -inf) = NaN.
            # Give them finite scores for the softmax and zero them afterwards,
            # which keeps both the forward values and the gradients finite.
            dead_rows = blocked.all(dim=-1, keepdim=True)
            scores = scores.masked_fill(dead_rows, 0.0)

        attn = F.softmax(scores, dim=-1)  # [B, T, T]
        if dead_rows is not None:
            attn = attn.masked_fill(dead_rows, 0.0)
        attn = self.dropout(attn)
        y = torch.matmul(attn, V)  # [B, T, T] @ [B, T, D] -> [B, T, D]
        return y, attn

In [110]:
def demo_torch_selfattn():
    """Run SelfAttentionSimple once with a padding mask and once with a causal mask, printing shapes and row sums."""
    print("\n[Demo] 2) PyTorch SelfAttentionSimple (with masks)")
    torch.manual_seed(SEED)
    batch, seq_len, dim = 2, 5, 8
    x = torch.randn(batch, seq_len, dim)

    # Padding mask: the last two positions of sample 0 and the last three of
    # sample 1 are padding.
    pad_mask = torch.tensor([[False, False, False, True, True],
                             [False, False, True,  True, True]])

    layer = SelfAttentionSimple(d_model=dim, use_projection=False)
    padded_out, padded_attn = layer(x, key_padding_mask=pad_mask, causal=False)
    print("y shape:", padded_out.shape, "attn shape:", padded_attn.shape)  # [B,T,D], [B,T,T]

    # Same layer again, this time with causal self-attention.
    _, causal_attn = layer(x, key_padding_mask=None, causal=True)
    print("causal attn upper-triangle ~0? row sums:", causal_attn.sum(-1))  # each row sums to ~1

In [111]:
demo_torch_selfattn()


[Demo] 2) PyTorch SelfAttentionSimple (with masks)
y shape: torch.Size([2, 5, 8]) attn shape: torch.Size([2, 5, 5])
causal attn upper-triangle ~0? row sums: tensor([[1.0000, 1.0000, 1.0000, 1.0000, 1.0000],
        [1.0000, 1.0000, 1.0000, 1.0000, 1.0000]])


# 4. nn.MultiheadAttention in PyTorch

In [115]:
def make_causal_mask(T, device=None):
    """
    Build an additive causal mask for attention over a sequence of length T.

    Inputs:
    T: int, sequence length
    device: optional torch device on which to create the mask

    Outputs:
    attn_mask: float32 tensor [T, T]; entries strictly above the diagonal are
               -inf (future positions, removed by softmax), all others are 0
    """
    # The mask is ADDED to the attention scores, so blocked positions must be
    # -inf: softmax then assigns them exactly zero weight. (+inf would instead
    # force attention onto the future and produce NaN rows.)
    blocked = torch.full((T, T), float("-inf"), dtype=torch.float32, device=device)
    # triu keeps the strict upper triangle and writes 0 everywhere else.
    return torch.triu(blocked, diagonal=1)

In [116]:
def demo_torch_mha_api():
    """Call nn.MultiheadAttention as self-attention, first with a padding mask and then with a causal mask, printing shapes."""
    print("\n[Demo] 3) nn.MultiheadAttention API (batch_first=True)")
    torch.manual_seed(SEED)
    batch, seq_len, embed_dim, heads = 2, 5, 16, 4
    x = torch.randn(batch, seq_len, embed_dim)

    # key_padding_mask convention: True = padded position.
    pad_mask = torch.tensor([[False, False, False, True, True],
                             [False, False, True,  True, True]])

    mha = nn.MultiheadAttention(embed_dim=embed_dim, num_heads=heads, batch_first=True, dropout=0.0)

    # Self-attention: query, key and value are all x.
    # need_weights=True asks for the attention weights;
    # average_attn_weights=False keeps one weight map per head.
    padded_out, padded_w = mha(
        x, x, x,
        key_padding_mask=pad_mask,
        need_weights=True,
        average_attn_weights=False,
    )
    print("output:", padded_out.shape, "weights:", padded_w.shape)  # [B,T,D], [B,H,T,T]

    # Decoder-style self-attention with a causal mask.
    causal = make_causal_mask(seq_len, device=x.device)  # [T,T]
    causal_out, causal_w = mha(
        x, x, x,
        attn_mask=causal,
        need_weights=True,
        average_attn_weights=False,
    )
    print("causal output:", causal_out.shape, "weights:", causal_w.shape)

In [117]:
demo_torch_mha_api()


[Demo] 3) nn.MultiheadAttention API (batch_first=True)
output: torch.Size([2, 5, 16]) weights: torch.Size([2, 4, 5, 5])
causal output: torch.Size([2, 5, 16]) weights: torch.Size([2, 4, 5, 5])


# 5. Minimal Encoder Block

In [122]:
class PositionwiseFFN(nn.Module):
    """
    Two-layer feed-forward network applied along the last dimension of the input.

    Inputs:
    x [B, T, d_model]

    Outputs:
    y [B, T, d_model]

    Purpose: Linear(d_model->d_ff) -> GELU -> Dropout -> Linear(d_ff->d_model)
    """
    def __init__(self, d_model, d_ff, dropout=0.1):
        super().__init__()
        # Expand to the hidden width, then project back to the model width.
        self.fc1 = nn.Linear(d_model, d_ff)
        self.fc2 = nn.Linear(d_ff, d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        hidden = F.gelu(self.fc1(x))
        return self.fc2(self.dropout(hidden))

In [123]:
class EncoderBlock(nn.Module):
    """
    Encoder block: LayerNorm -> multi-head self-attention -> residual, followed by
    LayerNorm -> position-wise FFN -> residual. The normalization is applied
    before each sub-layer.

    Inputs:
    x [B, T, d_model]
    key_padding_mask [B, T], forwarded to nn.MultiheadAttention

    Outputs:
    y [B, T, d_model]
    """
    def __init__(self, d_model=128, num_heads=4, d_ff=512, dropout=0.1):
        super().__init__()
        # Attention sub-layer.
        self.ln1 = nn.LayerNorm(d_model)
        self.mha = nn.MultiheadAttention(d_model, num_heads, batch_first=True, dropout=dropout)
        # Feed-forward sub-layer.
        self.ln2 = nn.LayerNorm(d_model)
        self.ffn = PositionwiseFFN(d_model, d_ff, dropout)
        # Dropout applied to each sub-layer's output before the residual add.
        self.drop = nn.Dropout(dropout)

    def forward(self, x, key_padding_mask=None):
        normed = self.ln1(x)
        attended, _ = self.mha(
            normed, normed, normed,
            key_padding_mask=key_padding_mask,
            need_weights=False,
        )
        x = x + self.drop(attended)  # residual around attention
        return x + self.drop(self.ffn(self.ln2(x)))  # residual around the FFN

In [124]:
def demo_encoder_block():
    """Push a random padded batch through one EncoderBlock and print the output shape."""
    print("\n[Demo] 4) EncoderBlock forward")
    torch.manual_seed(SEED)
    batch, seq_len, dim = 2, 6, 32
    x = torch.randn(batch, seq_len, dim)
    # True marks padded positions: the last two of sample 0, the last three of sample 1.
    pad_mask = torch.tensor([[False, False, False, False, True,  True],
                             [False, False, False, True,  True,  True]])
    encoder = EncoderBlock(d_model=dim, num_heads=4, d_ff=128, dropout=0.1)
    encoded = encoder(x, key_padding_mask=pad_mask)
    print("encoder block output:", encoded.shape)  # [B,T,D]

In [125]:
demo_encoder_block()


[Demo] 4) EncoderBlock forward
encoder block output: torch.Size([2, 6, 32])
