In [1]:
! pip install torch



In [2]:
import torch
import torch.nn as nn

# attention.py


In [3]:
# Toy attention dimensions:
#   B       - batch size
#   n_heads - number of heads
#   L_q     - number of query positions
#   L_k     - number of key positions
#   d_k     - feature size shared by Q and K
B, n_heads = 2, 3
L_q, L_k = 4, 5
d_k = 1
# torch.randn builds a random tensor of the requested shape.
Q = torch.randn(B, n_heads, L_q, d_k)
K = torch.randn(B, n_heads, L_k, d_k)
print(Q.shape, K.shape, sep="\n")

torch.Size([2, 3, 4, 1])
torch.Size([2, 3, 5, 1])


In [4]:
# Tensor.transpose needs the two dimensions to swap; here the last two,
# turning (B, n_heads, L_k, d_k) into (B, n_heads, d_k, L_k).
# NOTE(review): the original note claimed `.T` defaults to the last two dims.
# Per the PyTorch docs it is `.mT` that swaps the last two dims, while `.T`
# reverses all dims (deprecated unless ndim == 2) — confirm before relying on it.
print(K.transpose(-2, -1).shape)

torch.Size([2, 3, 1, 5])


In [5]:
# Raw attention scores: each query dotted with each key,
# (B, n_heads, L_q, d_k) x (B, n_heads, d_k, L_k) -> (B, n_heads, L_q, L_k).
scores = torch.matmul(Q, K.transpose(-2, -1))
print(scores.shape)

torch.Size([2, 3, 4, 5])


In [6]:
# Display the keys with their last two axes swapped (shape (2, 3, 1, 5) for
# the K created earlier in this notebook).
K.transpose(-2, -1)

tensor([[[[-0.3963,  1.2913,  2.2296,  0.7844, -0.0720]],

         [[-0.7595,  1.3902,  1.5466, -0.4601, -1.1809]],

         [[ 1.4915,  0.9172,  0.2998,  0.2017, -0.1851]]],


        [[[-0.9460,  1.0402,  0.4597,  0.3917,  0.4495]],

         [[ 0.4013,  0.6619, -1.1623,  0.5673,  1.8676]],

         [[ 1.5308,  0.6311, -0.1676,  1.2222, -0.1744]]]])

In [7]:
# tests/test_attention.py
#* attention 的单元测试

import torch
import torch.nn as nn
import math

# 假设项目根目录在 Python 路径中
from transformer_from_scratch.attention import ScaledDotProductAttention, MultiHeadAttention

# --- 1. 设置测试所需的通用变量 ---
def setup_test_variables(seed=0):
    """Build the shared fixture used by every attention test.

    All random tensors are drawn from a private ``torch.Generator`` so that
    repeated calls (and repeated notebook runs) see the same data without
    touching the global RNG state.

    Args:
        seed: Seed for the private generator. Calls with the same seed return
            identical tensors.

    Returns:
        dict with the integer hyper-parameters ``batch_size``, ``d_model``,
        ``n_heads``, ``seq_len`` and ``d_k`` (= d_model // n_heads), plus:
          * ``query_sdpa`` / ``key_sdpa`` / ``value_sdpa``: per-head inputs of
            shape (batch_size, n_heads, seq_len, d_k);
          * ``query_mha`` / ``key_mha`` / ``value_mha``: model-level inputs of
            shape (batch_size, seq_len, d_model).
    """
    gen = torch.Generator().manual_seed(seed)

    params = {
        'batch_size': 4,
        'd_model': 128,
        'n_heads': 8,
        'seq_len': 10,
    }
    params['d_k'] = params['d_model'] // params['n_heads']

    # Inputs for ScaledDotProductAttention: already split into heads.
    sdpa_shape = (params['batch_size'], params['n_heads'], params['seq_len'], params['d_k'])
    for name in ('query_sdpa', 'key_sdpa', 'value_sdpa'):
        params[name] = torch.randn(*sdpa_shape, generator=gen)

    # Inputs for MultiHeadAttention: full model width, heads not yet split.
    mha_shape = (params['batch_size'], params['seq_len'], params['d_model'])
    for name in ('query_mha', 'key_mha', 'value_mha'):
        params[name] = torch.randn(*mha_shape, generator=gen)

    return params

# --- 2. 为 ScaledDotProductAttention 编写测试函数 ---

def test_sdpa_forward_shape(params):
    """Test 1: ScaledDotProductAttention keeps the (batch, heads, seq_len, d_k) shape."""
    print("  - Running test_sdpa_forward_shape...")
    sdpa = ScaledDotProductAttention()
    # The module under test is called as returning only the attended output,
    # so the result is used directly rather than unpacked.
    qkv = [params[name] for name in ('query_sdpa', 'key_sdpa', 'value_sdpa')]
    output = sdpa(*qkv)

    expected_shape = tuple(params[dim] for dim in ('batch_size', 'n_heads', 'seq_len', 'd_k'))
    assert output.shape == expected_shape, f"Expected shape {expected_shape}, but got {output.shape}"

def test_sdpa_with_mask_behavior(params):
    """Test 2 [ENHANCED]: behavioural check that a masked key contributes nothing."""
    print("  - Running test_sdpa_with_mask_behavior...")
    # Dropout is disabled so the check is deterministic.
    attention = ScaledDotProductAttention(dropout=0.0)

    # Boolean mask (batch, 1, 1, seq_len); the last token is set to False (masked).
    mask = torch.ones(params['batch_size'], 1, 1, params['seq_len'], dtype=torch.bool)
    mask[..., -1] = False

    # Values are zero everywhere except at the masked (last) position, which
    # holds 100.0 so that any leak through the mask is obvious in the output.
    special_value = torch.zeros_like(params['value_sdpa'])
    special_value[..., -1, :] = 100.0

    output = attention(params['query_sdpa'], params['key_sdpa'], special_value, mask=mask)

    # With a working mask the 100.0 entries never reach the output, so the
    # largest output element must be (numerically) zero.
    max_output_val = output.max().item()
    assert max_output_val < 1e-6, f"Masking failed. Large value from masked position leaked into output. Max output value is {max_output_val}"

# --- 3. 为 MultiHeadAttention 编写测试函数 ---

def test_mha_forward_shape(params):
    """Test 3: MultiHeadAttention output has shape (batch, seq_len, d_model)."""
    print("  - Running test_mha_forward_shape...")
    mha = MultiHeadAttention(d_model=params['d_model'], n_heads=params['n_heads'])
    qkv = [params[name] for name in ('query_mha', 'key_mha', 'value_mha')]
    output = mha(*qkv)

    expected_shape = tuple(params[dim] for dim in ('batch_size', 'seq_len', 'd_model'))
    assert output.shape == expected_shape, f"Expected shape {expected_shape}, but got {output.shape}"

def test_mha_gradient_flow(params):
    """Test 4: a backward pass populates gradients on every MultiHeadAttention sub-layer."""
    print("  - Running test_mha_gradient_flow...")
    mha = MultiHeadAttention(d_model=params['d_model'], n_heads=params['n_heads'])
    mha.train()

    # Reduce the output to a scalar so backward() can be called on it.
    mha(params['query_mha'], params['key_mha'], params['value_mha']).sum().backward()

    # Each listed sub-module must have received a gradient on its weight.
    for layer_name in ('w_q', 'w_k', 'w_v', 'fc', 'layer_norm'):
        layer = getattr(mha, layer_name)
        assert layer.weight.grad is not None, f"Gradient missing in {layer_name}"

def test_mha_vs_pytorch_implementation(params):
    """Test 5: compare our MultiHeadAttention core against PyTorch's nn.MultiheadAttention.

    The reference module receives a copy of our projection weights, then both
    are run on the same inputs. Our side is replayed by hand up to the output
    projection (i.e. without the Add & Norm step) so the two are comparable.
    """
    print("  - Running test_mha_vs_pytorch_implementation...")
    batch_size, d_model = params['batch_size'], params['d_model']
    n_heads, d_k = params['n_heads'], params['d_k']

    # Model under test, dropout off and in eval mode.
    our_mha = MultiHeadAttention(d_model=d_model, n_heads=n_heads, dropout=0.0)
    our_mha.eval()

    # Reference implementation, configured to match.
    pytorch_mha = nn.MultiheadAttention(embed_dim=d_model, num_heads=n_heads, bias=True, batch_first=True, dropout=0.0)
    pytorch_mha.eval()

    # Copy weights: Q, K, V projections are concatenated (in that order, along
    # dim 0) into the reference module's packed in-projection parameters.
    projections = (our_mha.w_q, our_mha.w_k, our_mha.w_v)
    pytorch_mha.in_proj_weight.data.copy_(torch.cat([proj.weight for proj in projections]))
    pytorch_mha.in_proj_bias.data.copy_(torch.cat([proj.bias for proj in projections]))
    pytorch_mha.out_proj.weight.data.copy_(our_mha.fc.weight.data)
    pytorch_mha.out_proj.bias.data.copy_(our_mha.fc.bias.data)

    # Reference forward pass (attention weights are discarded).
    pytorch_output, _ = pytorch_mha(params['query_mha'], params['key_mha'], params['value_mha'])

    def split_heads(projected):
        """(batch, seq, d_model) -> (batch, n_heads, seq, d_k)."""
        return projected.view(batch_size, -1, n_heads, d_k).transpose(1, 2)

    # Replay our MHA core by hand, stopping before Add & Norm.
    with torch.no_grad():
        query = split_heads(our_mha.w_q(params['query_mha']))
        key = split_heads(our_mha.w_k(params['key_mha']))
        value = split_heads(our_mha.w_v(params['value_mha']))
        context = our_mha.attention(query, key, value, mask=None)
        # Merge the heads back: (batch, n_heads, seq, d_k) -> (batch, seq, d_model).
        context = context.transpose(1, 2).contiguous().view(batch_size, -1, d_model)
        our_output_pre_add_norm = our_mha.fc(context)

    assert torch.allclose(pytorch_output, our_output_pre_add_norm, atol=1e-6), "Our MHA core implementation does not match PyTorch's."

# --- 4. 主执行块 ---
if __name__ == "__main__":
    print("Running tests for attention.py...")
    test_params = setup_test_variables()

    # Every test receives the same shared fixture; order matches the numbering
    # in the test docstrings.
    for run_test in (
        test_sdpa_forward_shape,
        test_sdpa_with_mask_behavior,
        test_mha_forward_shape,
        test_mha_gradient_flow,
        test_mha_vs_pytorch_implementation,
    ):
        run_test(test_params)

    print("\n✅ All attention tests passed!")

Running tests for attention.py...
  - Running test_sdpa_forward_shape...
  - Running test_sdpa_with_mask_behavior...
  - Running test_mha_forward_shape...
  - Running test_mha_gradient_flow...
  - Running test_mha_vs_pytorch_implementation...

✅ All attention tests passed!


# layers.py

In [8]:
# Shape of the toy query tensor created earlier: (B, n_heads, L_q, d_k).
Q.shape

torch.Size([2, 3, 4, 1])

In [9]:
# Mean over the last axis; keepdim=True keeps that axis in the result with size 1.
Q.mean(-1,keepdim=True)

tensor([[[[ 0.9242],
          [-0.1576],
          [ 0.0711],
          [ 0.0508]],

         [[-0.1264],
          [ 0.4284],
          [ 2.8152],
          [-0.8491]],

         [[ 1.1062],
          [-2.5651],
          [-1.0460],
          [ 0.0137]]],


        [[[-0.7050],
          [ 1.0639],
          [-0.1188],
          [-0.6589]],

         [[ 1.4388],
          [ 0.2072],
          [ 0.7617],
          [ 1.4504]],

         [[-1.4042],
          [ 0.2621],
          [-2.0038],
          [ 1.5751]]]])

In [10]:
# With keepdim=True the reduced axis is retained, so the result is still 4-D.
Q.mean(-1,keepdim=True).shape

torch.Size([2, 3, 4, 1])

In [11]:
# Same reduction without keepdim: the last axis is removed from the result.
Q.mean(-1)

tensor([[[ 0.9242, -0.1576,  0.0711,  0.0508],
         [-0.1264,  0.4284,  2.8152, -0.8491],
         [ 1.1062, -2.5651, -1.0460,  0.0137]],

        [[-0.7050,  1.0639, -0.1188, -0.6589],
         [ 1.4388,  0.2072,  0.7617,  1.4504],
         [-1.4042,  0.2621, -2.0038,  1.5751]]])

In [12]:
# Shape after the reduced axis is dropped: one dimension fewer than Q.
Q.mean(-1).shape

torch.Size([2, 3, 4])

# blocks.py and layers.py

In [13]:
# tests/test_blocks.py
#! Remember to restart the kernel before re-running these tests
import torch
from transformer_from_scratch.blocks import EncoderBlock, DecoderBlock

def setup_test_variables(seed=0):
    """Build the shared fixture for the EncoderBlock / DecoderBlock tests.

    Random tensors come from a private ``torch.Generator`` so the fixture is
    reproducible and does not disturb the global RNG state.

    Args:
        seed: Seed for the private generator.

    Returns:
        dict with the hyper-parameters (``batch_size``, ``d_model``,
        ``n_heads``, ``d_ff``, ``src_len``, ``tgt_len``, ``dropout``) plus:
          * ``src``: (batch_size, src_len, d_model) float tensor;
          * ``src_mask``: (batch_size, 1, 1, src_len) bool, last two source
            positions False (masked);
          * ``tgt``: (batch_size, tgt_len, d_model) float tensor;
          * ``enc_output``: (batch_size, src_len, d_model) float tensor;
          * ``tgt_mask``: (batch_size, 1, tgt_len, tgt_len) bool, the causal
            lower-triangular mask AND-ed with a padding mask that hides the
            last target position as a key.
    """
    gen = torch.Generator().manual_seed(seed)

    params = {
        'batch_size': 4,
        'd_model': 128,
        'n_heads': 8,
        'd_ff': 512,
        'src_len': 12,
        'tgt_len': 10,
        'dropout': 0.1
    }
    batch_size, d_model = params['batch_size'], params['d_model']
    src_len, tgt_len = params['src_len'], params['tgt_len']

    params['src'] = torch.randn(batch_size, src_len, d_model, generator=gen)
    params['src_mask'] = torch.ones(batch_size, 1, 1, src_len, dtype=torch.bool)
    params['src_mask'][:, :, :, -2:] = False

    params['tgt'] = torch.randn(batch_size, tgt_len, d_model, generator=gen)
    params['enc_output'] = torch.randn(batch_size, src_len, d_model, generator=gen)

    # The padding mask must act on the KEY axis (last dim), like src_mask does.
    # Masking along the query axis instead would leave the last query row with
    # no attendable key at all, which yields NaN or uniform attention depending
    # on the fill value used by the attention implementation.
    tgt_padding_mask = torch.ones(batch_size, 1, 1, tgt_len, dtype=torch.bool)
    tgt_padding_mask[:, :, :, -1] = False
    tgt_causal_mask = torch.tril(torch.ones(tgt_len, tgt_len, dtype=torch.bool))
    # Broadcasts (B, 1, 1, T) & (T, T) -> (B, 1, T, T).
    params['tgt_mask'] = tgt_padding_mask & tgt_causal_mask
    return params

def test_encoder_block_forward_shape(params):
    """Test 1: EncoderBlock returns a tensor shaped exactly like its input."""
    print("  - Running test_encoder_block_forward_shape...")
    block_kwargs = {key: params[key] for key in ('d_model', 'n_heads', 'd_ff', 'dropout')}
    encoder_block = EncoderBlock(**block_kwargs)

    src = params['src']
    output = encoder_block(src, params['src_mask'])

    assert output.shape == src.shape, f"Expected shape {src.shape}, but got {output.shape}"

def test_encoder_block_gradient_flow(params):
    """Test 2: backward through EncoderBlock reaches both attention and feed-forward weights."""
    print("  - Running test_encoder_block_gradient_flow...")
    block_kwargs = {key: params[key] for key in ('d_model', 'n_heads', 'd_ff', 'dropout')}
    encoder_block = EncoderBlock(**block_kwargs)
    encoder_block.train()

    # Sum to a scalar so a backward pass can be triggered.
    encoder_block(params['src'], params['src_mask']).sum().backward()

    attn_grad = encoder_block.self_attn.w_q.weight.grad
    ffn_grad = encoder_block.feed_forward.w_1.weight.grad
    assert attn_grad is not None, "Gradient missing in self_attn.w_q"
    assert ffn_grad is not None, "Gradient missing in feed_forward.w_1"

def test_decoder_block_forward_shape(params):
    """Test 3: DecoderBlock returns a tensor shaped exactly like the target input."""
    print("  - Running test_decoder_block_forward_shape...")
    block_kwargs = {key: params[key] for key in ('d_model', 'n_heads', 'd_ff', 'dropout')}
    decoder_block = DecoderBlock(**block_kwargs)

    tgt = params['tgt']
    output = decoder_block(tgt, params['enc_output'], params['tgt_mask'], params['src_mask'])
    assert output.shape == tgt.shape, f"Expected shape {tgt.shape}, but got {output.shape}"

def test_decoder_block_cross_attention_dependency(params):
    """Test 4 [NEW]: the DecoderBlock output must depend on the encoder output."""
    print("  - Running test_decoder_block_cross_attention_dependency...")
    # Dropout off + eval mode so the two forward passes differ only in enc_output.
    decoder_block = DecoderBlock(d_model=params['d_model'], n_heads=params['n_heads'], d_ff=params['d_ff'], dropout=0.0)
    decoder_block.eval()

    tgt, tgt_mask, src_mask = params['tgt'], params['tgt_mask'], params['src_mask']
    enc_output = params['enc_output']

    with torch.no_grad():
        output_base = decoder_block(tgt, enc_output, tgt_mask, src_mask)
        # Second pass: identical inputs, but the encoder output is all zeros.
        output_zero_enc = decoder_block(tgt, torch.zeros_like(enc_output), tgt_mask, src_mask)

    outputs_match = torch.allclose(output_base, output_zero_enc)
    assert not outputs_match, "Decoder output should change when encoder output changes."

if __name__ == "__main__":
    print("Running tests for blocks.py...")
    test_params = setup_test_variables()

    # Run the block tests in their numbered order on the shared fixture.
    for run_test in (
        test_encoder_block_forward_shape,
        test_encoder_block_gradient_flow,
        test_decoder_block_forward_shape,
        test_decoder_block_cross_attention_dependency,
    ):
        run_test(test_params)

    print("\n✅ All block tests passed!")

Running tests for blocks.py...
  - Running test_encoder_block_forward_shape...
  - Running test_encoder_block_gradient_flow...
  - Running test_decoder_block_forward_shape...
  - Running test_decoder_block_cross_attention_dependency...

✅ All block tests passed!


# model.py

In [14]:
# tests/test_transformer.py

import torch
from transformer_from_scratch.model import Transformer

def setup_test_variables(seed=0):
    """Build hyper-parameters, dummy token batches and masks for the Transformer tests.

    Token ids are drawn from a private ``torch.Generator`` so the fixture is
    reproducible across runs and leaves the global RNG state untouched.

    Args:
        seed: Seed for the private generator.

    Returns:
        dict with the model hyper-parameters plus:
          * ``src``: (batch_size, src_len) integer token ids in
            [1, src_vocab_size), with the last two tokens of row 0 set to
            ``pad_idx``;
          * ``tgt``: (batch_size, tgt_len) integer token ids in
            [1, tgt_vocab_size), with the last token of row 1 set to
            ``pad_idx``;
          * ``src_mask``: result of ``Transformer.create_padding_mask`` on ``src``;
          * ``tgt_mask``: target padding mask AND-ed with the causal mask from
            ``Transformer.create_causal_mask``.
        NOTE(review): the exact mask shapes are defined by the Transformer
        helpers, which are not visible here — confirm against model.py.
    """
    gen = torch.Generator().manual_seed(seed)

    params = {
        'src_vocab_size': 100,
        'tgt_vocab_size': 120,
        'd_model': 128,
        'num_layers': 2,
        'n_heads': 8,
        'd_ff': 512,
        'max_len': 100,
        'dropout': 0.1,
        'batch_size': 4,
        'src_len': 12,
        'tgt_len': 10,
        'pad_idx': 0
    }
    batch_size = params['batch_size']

    # Ids start at 1 so pad_idx (0) only appears where it is written explicitly below.
    params['src'] = torch.randint(1, params['src_vocab_size'], (batch_size, params['src_len']), generator=gen)
    params['tgt'] = torch.randint(1, params['tgt_vocab_size'], (batch_size, params['tgt_len']), generator=gen)
    params['src'][0, -2:] = params['pad_idx']
    params['tgt'][1, -1:] = params['pad_idx']

    # Build the causal mask on the same device as the token tensors.
    device = params['src'].device
    params['src_mask'] = Transformer.create_padding_mask(params['src'], params['pad_idx'])
    tgt_padding_mask = Transformer.create_padding_mask(params['tgt'], params['pad_idx'])
    tgt_causal_mask = Transformer.create_causal_mask(params['tgt_len'], device)
    params['tgt_mask'] = tgt_padding_mask & tgt_causal_mask

    return params

def test_model_forward_shape(params):
    """Test 1: the forward pass yields (batch_size, tgt_len, tgt_vocab_size)."""
    print("  - Running test_model_forward_shape...")
    model_kwargs = {
        key: params[key]
        for key in ('src_vocab_size', 'tgt_vocab_size', 'd_model', 'num_layers',
                    'n_heads', 'd_ff', 'max_len', 'dropout')
    }
    model = Transformer(**model_kwargs)
    model.eval()

    with torch.no_grad():
        output = model(params['src'], params['tgt'], params['src_mask'], params['tgt_mask'])

    expected_shape = tuple(params[dim] for dim in ('batch_size', 'tgt_len', 'tgt_vocab_size'))
    assert output.shape == expected_shape, f"Expected output shape {expected_shape}, but got {output.shape}"

def test_model_gradient_flow(params):
    """Test 2: a backward pass reaches the embedding, encoder, decoder and output layer."""
    print("  - Running test_model_gradient_flow...")
    model_kwargs = {
        key: params[key]
        for key in ('src_vocab_size', 'tgt_vocab_size', 'd_model', 'num_layers',
                    'n_heads', 'd_ff', 'max_len', 'dropout')
    }
    model = Transformer(**model_kwargs)
    model.train()

    # Sum the output to a scalar so a backward pass can be triggered.
    model(params['src'], params['tgt'], params['src_mask'], params['tgt_mask']).sum().backward()

    # One representative weight from each stage of the model.
    embedding_grad = model.src_embedding.weight.grad
    encoder_grad = model.encoder.layers[0].self_attn.w_q.weight.grad
    decoder_grad = model.decoder.layers[0].cross_attn.w_k.weight.grad
    output_grad = model.fc_out.weight.grad

    assert embedding_grad is not None, "Gradient missing in source embedding."
    assert encoder_grad is not None, "Gradient missing in encoder's attention."
    assert decoder_grad is not None, "Gradient missing in decoder's cross-attention."
    assert output_grad is not None, "Gradient missing in the final output layer."

def test_causal_mask_logic(params):
    """Test 3: changing the target token at t=5 must not affect the output at t=4.

    The model is run twice on sample 0, once with the original target and once
    with the token at position 5 replaced. The output at position 4 must stay
    the same, while the output at position 5 must change (sanity check that the
    edit had an effect at all).
    """
    print("  - Running test_causal_mask_logic...")
    model_kwargs = {
        key: params[key]
        for key in ('src_vocab_size', 'tgt_vocab_size', 'd_model', 'num_layers',
                    'n_heads', 'd_ff', 'max_len')
    }
    # Dropout is switched off so both forward passes are deterministic.
    model = Transformer(**model_kwargs, dropout=0.0)
    model.eval()

    src, tgt = params['src'], params['tgt']
    src_mask, tgt_mask = params['src_mask'], params['tgt_mask']

    with torch.no_grad():
        output_base = model(src, tgt, src_mask, tgt_mask)

        # Replace the token at (sample 0, position 5) with a different id that
        # stays inside the vocabulary and is not the padding index.
        tgt_modified = tgt.clone()
        new_token_val = (tgt[0, 5] + 10) % params['tgt_vocab_size']
        if new_token_val == params['pad_idx']:
            new_token_val += 1
        tgt_modified[0, 5] = new_token_val

        output_modified = model(src, tgt_modified, src_mask, tgt_mask)

    # Position 4 precedes the edited token, so it must be unaffected.
    assert torch.allclose(output_base[0, 4, :], output_modified[0, 4, :], atol=1e-6), \
        "Causal mask failed: Output at t=4 changed when input at t=5 was modified."

    # Position 5 sees its own (edited) token, so its output must differ.
    assert not torch.allclose(output_base[0, 5, :], output_modified[0, 5, :]), \
        "Sanity check failed: Output at t=5 did not change when input at t=5 was modified."

if __name__ == "__main__":
    print("Running tests for model.py (Transformer)...")
    test_params = setup_test_variables()

    # Run the model tests in their numbered order on the shared fixture.
    for run_test in (
        test_model_forward_shape,
        test_model_gradient_flow,
        test_causal_mask_logic,
    ):
        run_test(test_params)

    print("\n✅ All Transformer model tests passed!")

Running tests for model.py (Transformer)...
  - Running test_model_forward_shape...
  - Running test_model_gradient_flow...
  - Running test_causal_mask_logic...

✅ All Transformer model tests passed!
