# config

In [146]:
GPT_CONFIG = {
    "vocab_size": 50257,     # 词汇表大小
    "context_len": 1024,  # 上下文长度
    "emb_dim": 768,          # 嵌入维度
    "n_heads": 8,           # 注意力头的数量
    "n_layers": 12,          # 层数
    "drop_rate": 0.1,        # dropout率
    "qkv_bias": False        # 查询-键-值偏置
}



# Define Test Model

In [147]:
import torch
import torch.nn as nn
from torch.testing import assert_close
torch.manual_seed(42)


class DummyTransformerBlock(nn.Module):
    def __init__(self, cfg):
        super().__init__()
    
    def forward(self,x):
        return x
    
class DummyLayerNorm(nn.Module):
    def __init__(self, norm_shape,eps=1e-5):
        super().__init__()
        
    def forward(self,x):
        return x
        

class DummyGPT(nn.Module):
    def __init__(self,cfg):
        super().__init__()
        self.tok_emb = nn.Embedding(cfg['vocab_size'],cfg['emb_dim'])
        self.pos_emb = nn.Embedding(cfg['context_len'],cfg['emb_dim'])
        self.drop_emb = nn.Dropout(cfg["drop_rate"])
        self.trf_blocks =  nn.Sequential(
            *[DummyTransformerBlock(cfg) for _ in range(cfg["n_layers"])]
        )
        self.final_norm = DummyLayerNorm(cfg["emb_dim"])
        self.out_head = nn.Linear(
            cfg['emb_dim'],cfg['vocab_size'],bias=False
        )
       
    def forward(self,in_idx):
        batch_size, seq_len = in_idx.shape  #in_idx 通常是一个整数张量（Tensor），形状一般为 (batch_size, seq_len)
        tok_embeds = self.tok_emb(in_idx) #(batch_size, seq_len, emb_dim)
        pos_embeds = self.pos_emb(torch.arange(seq_len,device=in_idx.device))  #生成一个从 0 到 seq_len-1 的整数序列
        x = tok_embeds + pos_embeds
        x = self.drop_emb(x)
        x = self.trf_blocks(x)
        x = self.final_norm(x)
        logits = self.out_head(x)
        return logits
        
        

### view model parameters

In [148]:
model = DummyGPT(GPT_CONFIG)

model

DummyGPT(
  (tok_emb): Embedding(50257, 768)
  (pos_emb): Embedding(1024, 768)
  (drop_emb): Dropout(p=0.1, inplace=False)
  (trf_blocks): Sequential(
    (0): DummyTransformerBlock()
    (1): DummyTransformerBlock()
    (2): DummyTransformerBlock()
    (3): DummyTransformerBlock()
    (4): DummyTransformerBlock()
    (5): DummyTransformerBlock()
    (6): DummyTransformerBlock()
    (7): DummyTransformerBlock()
    (8): DummyTransformerBlock()
    (9): DummyTransformerBlock()
    (10): DummyTransformerBlock()
    (11): DummyTransformerBlock()
  )
  (final_norm): DummyLayerNorm()
  (out_head): Linear(in_features=768, out_features=50257, bias=False)
)

# Define layerNorm

In [149]:
class LayerNorm(nn.Module):
    def __init__(self, emb_dim):
        super().__init__()
        self.eps = 1e-5
        self.scale = nn.Parameter(torch.ones(emb_dim))
        self.shift = nn.Parameter(torch.zeros(emb_dim))
        
    def forward(self,x):
        mean = x.mean(dim=-1, keepdim=True)
        var = x.var(dim =-1 ,keepdim =True, unbiased =False)
        norm_x = (x-mean)/torch.sqrt(var+self.eps)
        return self.scale*norm_x + self.shift

### test layerNorm

In [150]:
def test_layer_norm():
   
    batch_size = 2
    seq_len = 5
    emb_dim = 3  
    

    x = torch.randn(batch_size, seq_len, emb_dim)  # 随机生成输入张量
    

    custom_ln = LayerNorm(emb_dim)
    official_ln = nn.LayerNorm(emb_dim, eps=1e-5, elementwise_affine=True)
    

    official_ln.weight.data.copy_(custom_ln.scale.data)
    official_ln.bias.data.copy_(custom_ln.shift.data)
    
 
    custom_out = custom_ln(x)
    official_out = official_ln(x)
    print(custom_out)
    print(official_out)

    assert_close(
        custom_out, 
        official_out, 
        rtol=1e-5,  # 相对误差容忍度
        atol=1e-5   # 绝对误差容忍度
    )
    print("测试通过：自定义LayerNorm与官方实现输出一致")

test_layer_norm()

tensor([[[-7.1567e-01, -6.9833e-01,  1.4140e+00],
         [ 1.3173e+00, -2.1323e-01, -1.1041e+00],
         [-9.2948e-04, -1.2243e+00,  1.2252e+00],
         [ 1.2962e+00, -1.1379e+00, -1.5825e-01],
         [ 9.1337e-02,  1.1765e+00, -1.2678e+00]],

        [[ 9.7590e-01,  3.9845e-01, -1.3743e+00],
         [-1.1870e+00, -7.2228e-02,  1.2593e+00],
         [ 1.3680e+00, -9.9436e-01, -3.7366e-01],
         [ 1.0581e-01, -1.2742e+00,  1.1684e+00],
         [ 9.7371e-01, -1.3751e+00,  4.0135e-01]]], grad_fn=<AddBackward0>)
tensor([[[-7.1567e-01, -6.9833e-01,  1.4140e+00],
         [ 1.3174e+00, -2.1323e-01, -1.1041e+00],
         [-9.2952e-04, -1.2243e+00,  1.2252e+00],
         [ 1.2962e+00, -1.1379e+00, -1.5825e-01],
         [ 9.1337e-02,  1.1765e+00, -1.2678e+00]],

        [[ 9.7590e-01,  3.9845e-01, -1.3743e+00],
         [-1.1870e+00, -7.2228e-02,  1.2593e+00],
         [ 1.3680e+00, -9.9436e-01, -3.7366e-01],
         [ 1.0581e-01, -1.2742e+00,  1.1684e+00],
         [ 9.7371e-0

# Define activate function

Φ(x) ≈ 0.5 * (1 + tanh(√(2/π) * (x + 0.044715 * x³)))

In [151]:
class GELU(nn.Module):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        
    def forward(self,x):
        return 0.5*x*(1+ 
                      torch.tanh(torch.sqrt(torch.tensor(2/torch.pi))
                                 *(x+0.044715*torch.pow(x,3))
                                )
                      )
        


In [152]:
def test_gelu():


    x = torch.tensor([-3.0, -1.0, 0.0, 0.5, 1.0, 2.0, 5.0])
    

    custom_gelu = GELU()
    official_gelu = nn.GELU()

    custom_out = custom_gelu(x)
    official_out = official_gelu(x)
    
    # 打印结果进行直观对比
    print("输入值:", x)
    print("自定义GELU输出:", custom_out)
    print("官方GELU输出:", official_out)
 
    assert_close(
        custom_out,
        official_out,
        rtol=1e-3,  # 相对误差容忍度
        atol=1e-3   # 绝对误差容忍度
    )
    print("\n测试通过：自定义GELU与官方实现近似一致")
    
test_gelu()

输入值: tensor([-3.0000, -1.0000,  0.0000,  0.5000,  1.0000,  2.0000,  5.0000])
自定义GELU输出: tensor([-3.6374e-03, -1.5881e-01,  0.0000e+00,  3.4571e-01,  8.4119e-01,
         1.9546e+00,  5.0000e+00])
官方GELU输出: tensor([-4.0499e-03, -1.5866e-01,  0.0000e+00,  3.4573e-01,  8.4134e-01,
         1.9545e+00,  5.0000e+00])

测试通过：自定义GELU与官方实现近似一致


# Define FFN
通过两层线性变换和激活函数，对注意力机制输出的特征进行非线性加工，增强模型表达能力。

In [153]:
class FeedForward(nn.Module):
    def __init__(self, cfg):
        super().__init__()
        self.layers = nn.Sequential(
            #中间层hidden_dim通常设为4*emb_dim（如原始 Transformer 中为 512→2048→512），通过扩展维度捕捉更丰富的特征
            nn.Linear(cfg['emb_dim'],4*cfg['emb_dim']),  
            GELU(),
            nn.Dropout(cfg['drop_rate']),
            nn.Linear(4*cfg['emb_dim'],cfg['emb_dim'])
        )
        
    def forward(self,x):
        return self.layers(x)

# Define MultiAttention

In [154]:
class CausalAttention(nn.Module):
    def __init__(self, d_in, d_out,context_len,dropout,qkv_bias=False):
        super().__init__()
        self.d_out =d_out
        self.W_q = nn.Linear(d_in,d_out,bias= qkv_bias)
        self.W_k = nn.Linear(d_in,d_out,bias= qkv_bias)
        self.W_v = nn.Linear(d_in,d_out,bias= qkv_bias)
        self.dropout = nn.Dropout(dropout)
        #缓冲区（buffer）是模型中不需要被训练的参数（与 nn.Parameter 不同，后者是可学习参数），但会随模型一起保存（state_dict 中包含）
        self.register_buffer(
            'mask', 
            torch.triu(torch.ones(context_len,context_len),diagonal=1)
        )
    
    def forward(self,x):
        b,num_tokens,d_in = x.shape
        keys = self.W_k(x)
        queries = self.W_q(x)
        values = self.W_v(x)
        
        att_score = queries @ keys.transpose(1,2)
        att_score.masked_fill_(self.mask.bool()[:num_tokens,:num_tokens],-torch.inf) # 上面的register_buffer  形状为 (num_tokens, num_tokens) 的子矩阵
        att_weight = torch.softmax(att_score/keys.shape[-1]**0.5, dim=-1)
        att_weight = self.dropout(att_weight)
        context_vec = att_weight @ values
        return context_vec
        
class MultiHeadAttendtion(nn.Module):
    def __init__(self, d_in, d_out,context_len,dropout,num_heads,qkv_bias=False):
        super().__init__()
        # ModuleList与nn.Sequential不同，它不自动执行前向传播，而是需要手动遍历调用，适合需要单独处理每个子模块的场景
        self.heads = nn.ModuleList(
            [CausalAttention(d_in,d_out,context_len,dropout,qkv_bias) for _ in range(num_heads)]
        )
        
    def forward(self,x):
        return torch.cat([head(x) for head in self.heads],dim=-1)

In [155]:
# TODO 更高效的MutiAttention 减少计算量
#参数规模更小（d_model×d_model 对比 num_heads×d_model×head_dim
class MultiHeadAttendtion_new(nn.Module):
    def __init__(self, d_in, d_out,context_len,dropout,num_heads,qkv_bias=False):
        super().__init__()
        self.d_out =d_out
        self.num_heads = num_heads
        self.head_dim = d_out // num_heads
        self.W_q = nn.Linear(d_in,d_out,bias= qkv_bias)
        self.W_k = nn.Linear(d_in,d_out,bias= qkv_bias)
        self.W_v = nn.Linear(d_in,d_out,bias= qkv_bias)
        self.out_proj =nn.Linear(d_out,d_out) # out_proj 可以学习如何 “融合” 这些头的信息（例如对不同头的特征赋予不同权重），而不是简单保留原始拼接结果
        self.dropout = nn.Dropout(dropout)
        self.register_buffer(
            'mask', 
            torch.triu(torch.ones(context_len,context_len),diagonal=1)
        )
    
    def forward(self,x):
        b,num_tokens,d_in = x.shape
        keys = self.W_k(x)
        queries = self.W_q(x)
        values = self.W_v(x)
        
        keys = keys.view(b,num_tokens,self.num_heads,self.head_dim)
        queries = queries.view(b,num_tokens,self.num_heads,self.head_dim)
        values = values.view(b,num_tokens,self.num_heads,self.head_dim)
         
        #(b,num_tokens,num_heads,head_dim) --> (b,num_heads,num_tokens,head_dim)         
        keys = keys.transpose(1,2)
        queries = queries .transpose(1,2)
        values = values.transpose(1,2)
        
        
        att_score = queries @ keys.transpose(2,3)
        att_score.masked_fill_(self.mask.bool()[:num_tokens,:num_tokens],-torch.inf)
        att_weight = torch.softmax(att_score/keys.shape[-1]**0.5, dim=-1)
        att_weight = self.dropout(att_weight)
        context_vec = (att_weight @ values).transpose(1,2)
        context_vec = context_vec.contiguous.view(b,num_tokens,self.d_out)
        context_vec = self.out_proj(context_vec)
        return context_vec

# Define Transformer block

In [156]:
class TransformerBlock(nn.Module):
    def __init__(self, cfg):
        super().__init__()
        self.att = MultiHeadAttendtion_new(
            d_in= cfg["emb_dim"],
            d_out= cfg['emb_dim'],
            context_len=  cfg['context_len'],
            num_heads= cfg["n_heads"],
            dropout= cfg["drop_rate"],
            qkv_bias=cfg["qkv_bias"]
        )
        self.ff =FeedForward(cfg)
        self.norm1 = LayerNorm(cfg['emb_dim']) #norm1：用于注意力模块（self.att）的输入归一化
        self.norm2 = LayerNorm(cfg['emb_dim']) #norm2：用于前馈网络（self.ff）的输入归一化
        self.dropout = nn.Dropout(cfg['drop_rate'])
        
    
    def forward(self,x):
       # 注意力分支：LayerNorm -> 注意力 -> Dropout -> 残差连接
        x = x + self.dropout(self.att(self.norm1(x))) 
        # FFN分支：LayerNorm -> FFN -> Dropout -> 残差连接
        x = x + self.dropout(self.ff(self.norm2(x)))  
        return x
        
         

# Define GPT Model

In [157]:
class GPTModel(nn.Module):
    def __init__(self,cfg):
        super().__init__()
        self.tok_emb = nn.Embedding(cfg['vocab_size'],cfg['emb_dim'])
        self.pos_emb = nn.Embedding(cfg['context_len'],cfg['emb_dim'])
        self.drop_emb = nn.Dropout(cfg["drop_rate"])
        self.trf_blocks =  nn.Sequential(
            *[TransformerBlock(cfg) for _ in range(cfg["n_layers"])]
        )
        self.final_norm = DummyLayerNorm(cfg["emb_dim"])
        self.out_head = nn.Linear(
            cfg['emb_dim'],cfg['vocab_size'],bias=False
        )
       
    def forward(self,in_idx):
        batch_size, seq_len = in_idx.shape  #in_idx 通常是一个整数张量（Tensor），形状一般为 (batch_size, seq_len)
        tok_embeds = self.tok_emb(in_idx) #(batch_size, seq_len, emb_dim)
        pos_embeds = self.pos_emb(torch.arange(seq_len,device=in_idx.device))  #生成一个从 0 到 seq_len-1 的整数序列
        x = tok_embeds + pos_embeds
        x = self.drop_emb(x)
        x = self.trf_blocks(x)
        x = self.final_norm(x)
        logits = self.out_head(x)
        return logits
        

In [158]:
model = GPTModel(GPT_CONFIG)

model

GPTModel(
  (tok_emb): Embedding(50257, 768)
  (pos_emb): Embedding(1024, 768)
  (drop_emb): Dropout(p=0.1, inplace=False)
  (trf_blocks): Sequential(
    (0): TransformerBlock(
      (att): MultiHeadAttendtion_new(
        (W_q): Linear(in_features=768, out_features=768, bias=False)
        (W_k): Linear(in_features=768, out_features=768, bias=False)
        (W_v): Linear(in_features=768, out_features=768, bias=False)
        (out_proj): Linear(in_features=768, out_features=768, bias=True)
        (dropout): Dropout(p=0.1, inplace=False)
      )
      (ff): FeedForward(
        (layers): Sequential(
          (0): Linear(in_features=768, out_features=3072, bias=True)
          (1): GELU()
          (2): Dropout(p=0.1, inplace=False)
          (3): Linear(in_features=3072, out_features=768, bias=True)
        )
      )
      (norm1): LayerNorm()
      (norm2): LayerNorm()
      (dropout): Dropout(p=0.1, inplace=False)
    )
    (1): TransformerBlock(
      (att): MultiHeadAttendtion_ne

In [None]:
# attention_new 参数减少  (304,556,544 - 163,008,000)
total_params =sum(p.numel() for p in model.parameters())


print(f"Total number of parameters: {total_params:,}")

Total number of parameters: 163,008,000
