In [1]:
# 导入必要的库
import torch
import math
from torch import nn
from dataclasses import dataclass
from transformers import BertTokenizer
import torch.nn.functional as F
import numpy as np
import matplotlib.pyplot as plt
print("所有库导入成功！")


ModuleNotFoundError: No module named 'torch'

In [None]:
@dataclass
class ModelArgs:
    """模型配置参数"""
    n_embd: int       # 嵌入维度 (Embedding dimension)
    n_heads: int      # 多头注意力的头数 (Number of attention heads)  
    dim: int          # 模型维度 (Model dimension)
    dropout: float    # Dropout率 (Dropout rate)
    max_seq_len: int  # 最大序列长度 (Maximum sequence length)
    vocab_size: int   # 词汇表大小 (Vocabulary size)
    block_size: int   # 块大小 (Block size)
    n_layer: int      # 层数 (Number of layers)

# 示例配置
args_example = ModelArgs(
    n_embd=512,      # 嵌入维度
    n_heads=8,       # 8个注意力头
    dim=512,         # 模型维度  
    dropout=0.1,     # 10% dropout
    max_seq_len=1024, # 最大1024个token
    vocab_size=30000, # 3万词汇量
    block_size=1024,  # 块大小
    n_layer=6        # 6层
)

print(f"配置示例: {args_example}")
print(f"每个注意力头的维度: {args_example.dim // args_example.n_heads}")
print(f"模型总参数量估计: ~{(args_example.vocab_size * args_example.n_embd + args_example.n_layer * 4 * args_example.dim**2) / 1e6:.1f}M")


In [None]:
class MultiHeadAttention(nn.Module):
    """多头注意力机制 - 逐行详细解释"""
    
    def __init__(self, args: ModelArgs, is_causal=False):
        super().__init__()
        
        # 第17行：检查维度能否被头数整除
        assert args.dim % args.n_heads == 0, f"dim({args.dim})必须能被n_heads({args.n_heads})整除"
        
        # 第19-20行：模型并行相关（这里简化为1）
        model_parallel_size = 1
        self.n_local_heads = args.n_heads // model_parallel_size
        
        # 第22行：计算每个头的维度
        self.head_dim = args.dim // args.n_heads
        
        print(f"多头注意力初始化:")
        print(f"  总维度: {args.dim}")
        print(f"  头数: {self.n_local_heads}")  
        print(f"  每个头维度: {self.head_dim}")
        
        # 第29-31行：Q、K、V的权重矩阵
        # 注意：这里用一个大矩阵代替多个小矩阵，提高效率
        self.wq = nn.Linear(args.n_embd, args.n_heads * self.head_dim, bias=False)
        self.wk = nn.Linear(args.n_embd, args.n_heads * self.head_dim, bias=False) 
        self.wv = nn.Linear(args.n_embd, args.n_heads * self.head_dim, bias=False)
        
        # 第33行：输出投影矩阵W^O
        self.wo = nn.Linear(args.n_heads * self.head_dim, args.dim, bias=False)
        
        # 第35-37行：Dropout层
        self.attn_dropout = nn.Dropout(args.dropout)
        self.resid_dropout = nn.Dropout(args.dropout)
        self.is_causal = is_causal
        
        # 第39-46行：创建因果掩码（用于解码器）
        if is_causal:
            # 创建上三角掩码，防止看到"未来"信息
            mask = torch.full((1, 1, args.max_seq_len, args.max_seq_len), float("-inf"))
            mask = torch.triu(mask, diagonal=1)  # 上三角矩阵
            self.register_buffer("mask", mask)
            print(f"  创建因果掩码: {mask.shape}")
    
    def forward(self, q: torch.Tensor, k: torch.Tensor, v: torch.Tensor):
        """前向传播 - 逐步解释注意力计算过程"""
        
        # 第51行：获取批次大小和序列长度
        bsz, seqlen, _ = q.shape
        print(f"\n=== 多头注意力前向传播 ===")
        print(f"输入形状: batch={bsz}, seq_len={seqlen}, dim={q.shape[2]}")
        
        # 第54行：通过线性层计算Q、K、V
        # [B, T, n_embd] -> [B, T, n_heads * head_dim]
        xq, xk, xv = self.wq(q), self.wk(k), self.wv(v)
        print(f"QKV线性变换后: {xq.shape}")
        
        # 第59-62行：重塑为多头格式
        # [B, T, n_heads * head_dim] -> [B, T, n_heads, head_dim]  
        xq = xq.view(bsz, seqlen, self.n_local_heads, self.head_dim)
        xk = xk.view(bsz, seqlen, self.n_local_heads, self.head_dim)
        xv = xv.view(bsz, seqlen, self.n_local_heads, self.head_dim)
        
        # 第63-65行：转置到注意力计算格式
        # [B, T, n_heads, head_dim] -> [B, n_heads, T, head_dim]
        xq = xq.transpose(1, 2)
        xk = xk.transpose(1, 2) 
        xv = xv.transpose(1, 2)
        print(f"多头重塑后: {xq.shape}")
        
        # 第68行：计算注意力分数 QK^T/√d_k
        # [B, n_heads, T, head_dim] × [B, n_heads, head_dim, T] -> [B, n_heads, T, T]
        scores = torch.matmul(xq, xk.transpose(2, 3)) / math.sqrt(self.head_dim)
        print(f"注意力分数: {scores.shape}, 缩放因子: {math.sqrt(self.head_dim):.2f}")
        
        # 第70-73行：应用因果掩码（如果需要）
        if self.is_causal:
            assert hasattr(self, 'mask')
            scores = scores + self.mask[:, :, :seqlen, :seqlen]
            print(f"应用掩码后分数范围: [{scores.min():.2f}, {scores.max():.2f}]")
        
        # 第75-77行：Softmax + Dropout
        scores = F.softmax(scores.float(), dim=-1).type_as(xq)
        scores = self.attn_dropout(scores)
        
        # 第79行：注意力加权求和
        # [B, n_heads, T, T] × [B, n_heads, T, head_dim] -> [B, n_heads, T, head_dim]
        output = torch.matmul(scores, xv)
        print(f"注意力输出: {output.shape}")
        
        # 第84行：合并多头
        # [B, n_heads, T, head_dim] -> [B, T, n_heads, head_dim] -> [B, T, n_heads*head_dim]
        output = output.transpose(1, 2).contiguous().view(bsz, seqlen, -1)
        
        # 第87-88行：最终投影和dropout
        output = self.wo(output)
        output = self.resid_dropout(output)
        
        print(f"最终输出: {output.shape}")
        return output


In [None]:
# 测试多头注意力机制
print("=== 测试多头注意力机制 ===")

# 创建小型配置用于测试
args_small = ModelArgs(
    n_embd=64,      # 嵌入维度
    n_heads=4,      # 4个注意力头
    dim=64,         # 模型维度
    dropout=0.1,    # dropout率
    max_seq_len=128, # 最大序列长度
    vocab_size=1000, # 词汇表大小
    block_size=128,  # 块大小
    n_layer=2       # 层数
)

# 创建多头注意力层（编码器版本，无掩码）
print("\n1. 创建编码器版本的多头注意力:")
mha_encoder = MultiHeadAttention(args_small, is_causal=False)

# 创建测试输入
batch_size, seq_len = 2, 8
x = torch.randn(batch_size, seq_len, args_small.n_embd)
print(f"\n2. 创建测试输入: {x.shape}")

# 前向传播（自注意力：Q=K=V）
print(f"\n3. 执行自注意力计算...")
with torch.no_grad():
    output_encoder = mha_encoder(x, x, x)

print(f"\n编码器自注意力完成！")
print(f"输入: {x.shape} -> 输出: {output_encoder.shape}")

# 创建解码器版本（带掩码）
print(f"\n4. 创建解码器版本的多头注意力:")
mha_decoder = MultiHeadAttention(args_small, is_causal=True)

# 测试解码器版本
print(f"\n5. 执行掩码自注意力计算...")
with torch.no_grad():
    output_decoder = mha_decoder(x, x, x)

print(f"\n解码器掩码自注意力完成！")
print(f"输入: {x.shape} -> 输出: {output_decoder.shape}")

print(f"\n=== 多头注意力测试完成 ===")


In [None]:
# 完整的Transformer实现 - 从原始代码逐行解释

# 先定义剩余的核心组件

class LayerNorm(nn.Module):
    '''Layer Norm 层 - 第95-110行详解'''
    def __init__(self, features, eps=1e-6):
        super(LayerNorm, self).__init__()
        # 线性变换参数 - 可学习的缩放和偏移
        self.a_2 = nn.Parameter(torch.ones(features))    # γ (gamma) 缩放参数
        self.b_2 = nn.Parameter(torch.zeros(features))   # β (beta) 偏移参数
        self.eps = eps  # 防止除零的小常数
        
    def forward(self, x):
        # 在最后一个维度计算均值和标准差
        mean = x.mean(-1, keepdim=True)  # 均值：[batch, seq_len, 1]
        std = x.std(-1, keepdim=True)    # 标准差：[batch, seq_len, 1]
        # LayerNorm公式：γ * (x-μ)/(σ+ε) + β
        return self.a_2 * (x - mean) / (std + self.eps) + self.b_2


class MLP(nn.Module):
    '''前馈神经网络 - 第112-128行详解'''
    def __init__(self, dim: int, hidden_dim: int, dropout: float):
        super().__init__()
        # 第一层：维度扩展 (通常 hidden_dim = 4 * dim)
        self.w1 = nn.Linear(dim, hidden_dim, bias=False)
        # 第二层：恢复原始维度
        self.w2 = nn.Linear(hidden_dim, dim, bias=False)
        # Dropout防止过拟合
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        # FFN公式：dropout(W2 * ReLU(W1 * x))
        return self.dropout(self.w2(F.relu(self.w1(x))))


class PositionalEncoding(nn.Module):
    '''位置编码 - 第246-278行详解'''
    def __init__(self, args):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=args.dropout)

        # 创建位置编码矩阵 [block_size, n_embd]
        pe = torch.zeros(args.block_size, args.n_embd)
        position = torch.arange(0, args.block_size).unsqueeze(1)  # [block_size, 1]
        
        # 计算频率项：10000^(-2i/d_model)
        div_term = torch.exp(
            torch.arange(0, args.n_embd, 2) * -(math.log(10000.0) / args.n_embd)
        )
        
        # 位置编码公式：
        pe[:, 0::2] = torch.sin(position * div_term)  # 偶数位置用sin
        pe[:, 1::2] = torch.cos(position * div_term)  # 奇数位置用cos
        
        pe = pe.unsqueeze(0)  # 添加batch维度：[1, block_size, n_embd]
        self.register_buffer("pe", pe)  # 注册为缓冲区（不更新梯度）

    def forward(self, x):
        # 将位置编码加到token嵌入上
        x = x + self.pe[:, : x.size(1)].requires_grad_(False)
        return self.dropout(x)

print("核心组件定义完成！")


In [None]:
# 编码器和解码器实现

class EncoderLayer(nn.Module):
    """编码器层 - 第131-147行详解"""
    def __init__(self, args):
        super().__init__()
        # Pre-LayerNorm架构：先归一化再计算
        self.attention_norm = LayerNorm(args.n_embd)
        # 编码器使用非因果（双向）注意力
        self.attention = MultiHeadAttention(args, is_causal=False)
        self.fnn_norm = LayerNorm(args.n_embd)
        # FFN隐藏层维度通常是输入维度的4倍
        self.feed_forward = MLP(args.dim, args.dim * 4, args.dropout)

    def forward(self, x):
        # 第一个子层：多头自注意力 + 残差连接
        # Pre-Norm：先LayerNorm再注意力
        norm_x = self.attention_norm(x)
        h = x + self.attention.forward(norm_x, norm_x, norm_x)  # 残差连接
        
        # 第二个子层：前馈网络 + 残差连接  
        norm_h = self.fnn_norm(h)
        out = h + self.feed_forward.forward(norm_h)  # 残差连接
        return out


class Encoder(nn.Module):
    '''编码器 - 第149-160行详解'''
    def __init__(self, args):
        super(Encoder, self).__init__()
        # 堆叠N个编码器层
        self.layers = nn.ModuleList([EncoderLayer(args) for _ in range(args.n_layer)])
        # 最终的层归一化
        self.norm = LayerNorm(args.n_embd)

    def forward(self, x):
        # 逐层通过编码器
        for layer in self.layers:
            x = layer(x)
        return self.norm(x)  # 最终归一化


class DecoderLayer(nn.Module):
    '''解码器层 - 第162-186行详解'''
    def __init__(self, args):
        super().__init__()
        # 第一个注意力：掩码自注意力
        self.attention_norm_1 = LayerNorm(args.n_embd)
        self.mask_attention = MultiHeadAttention(args, is_causal=True)
        
        # 第二个注意力：编码器-解码器交叉注意力
        self.attention_norm_2 = LayerNorm(args.n_embd)
        self.attention = MultiHeadAttention(args, is_causal=False)
        
        # 前馈网络
        self.ffn_norm = LayerNorm(args.n_embd)
        self.feed_forward = MLP(args.dim, args.dim * 4, args.dropout)

    def forward(self, x, enc_out):
        # 子层1：掩码自注意力 + 残差连接
        norm_x1 = self.attention_norm_1(x)
        x = x + self.mask_attention.forward(norm_x1, norm_x1, norm_x1)
        
        # 子层2：编码器-解码器注意力 + 残差连接
        # Q来自解码器，K和V来自编码器
        norm_x2 = self.attention_norm_2(x)
        h = x + self.attention.forward(norm_x2, enc_out, enc_out)
        
        # 子层3：前馈网络 + 残差连接
        norm_h = self.ffn_norm(h)
        out = h + self.feed_forward.forward(norm_h)
        return out


class Decoder(nn.Module):
    '''解码器 - 第188-199行详解'''
    def __init__(self, args):
        super(Decoder, self).__init__()
        # 堆叠N个解码器层
        self.layers = nn.ModuleList([DecoderLayer(args) for _ in range(args.n_layer)])
        # 最终的层归一化
        self.norm = LayerNorm(args.n_embd)

    def forward(self, x, enc_out):
        # 逐层通过解码器，每层都需要编码器输出
        for layer in self.layers:
            x = layer(x, enc_out)
        return self.norm(x)

print("编码器和解码器定义完成！")


In [None]:
# 完整的Transformer模型

class Transformer(nn.Module):
    '''完整的Transformer模型 - 第280-349行详解'''

    def __init__(self, args):
        super().__init__()
        # 验证必要参数
        assert args.vocab_size is not None, "必须指定vocab_size"
        assert args.block_size is not None, "必须指定block_size"
        
        self.args = args
        
        # 主要组件 - 使用ModuleDict管理
        self.transformer = nn.ModuleDict(dict(
            wte=nn.Embedding(args.vocab_size, args.n_embd),  # Token嵌入
            wpe=PositionalEncoding(args),                     # 位置编码
            drop=nn.Dropout(args.dropout),                    # Dropout
            encoder=Encoder(args),                            # 编码器
            decoder=Decoder(args),                            # 解码器
        ))
        
        # 语言建模头：将隐藏状态映射到词汇表
        self.lm_head = nn.Linear(args.n_embd, args.vocab_size, bias=False)
        
        # 权重初始化
        self.apply(self._init_weights)
        
        # 打印参数统计
        print(f"模型参数数量: {self.get_num_params()/1e6:.2f}M")

    def get_num_params(self, non_embedding=False):
        """统计参数数量"""
        n_params = sum(p.numel() for p in self.parameters())
        if non_embedding:
            n_params -= self.transformer.wte.weight.numel()
        return n_params

    def _init_weights(self, module):
        """权重初始化 - 使用正态分布"""
        if isinstance(module, nn.Linear):
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)
            if module.bias is not None:
                torch.nn.init.zeros_(module.bias)
        elif isinstance(module, nn.Embedding):
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)

    def forward(self, idx, targets=None):
        """前向传播 - 第326-349行详解
        
        Args:
            idx: 输入token索引 [batch_size, seq_len]
            targets: 目标序列（训练时使用） [batch_size, seq_len]
        
        Returns:
            logits: 输出概率分布
            loss: 损失值（如果提供targets）
        """
        device = idx.device
        b, t = idx.size()
        
        # 检查序列长度限制
        assert t <= self.args.block_size, f"序列长度{t}超过最大长度{self.args.block_size}"

        print(f"\\n=== Transformer前向传播 ===")
        print(f"输入idx形状: {idx.shape}")

        # 1. Token嵌入：将token索引转换为向量
        tok_emb = self.transformer.wte(idx)  # [B, T, n_embd]
        print(f"Token嵌入后: {tok_emb.shape}")

        # 2. 位置编码：添加位置信息
        pos_emb = self.transformer.wpe(tok_emb)
        print(f"位置编码后: {pos_emb.shape}")

        # 3. Dropout
        x = self.transformer.drop(pos_emb)
        print(f"Dropout后: {x.shape}")

        # 4. 编码器：理解输入序列
        enc_out = self.transformer.encoder(x)
        print(f"编码器输出: {enc_out.shape}")

        # 5. 解码器：生成输出序列
        x = self.transformer.decoder(x, enc_out)
        print(f"解码器输出: {x.shape}")

        if targets is not None:
            # 训练模式：计算所有位置的损失
            logits = self.lm_head(x)  # [B, T, vocab_size]
            loss = F.cross_entropy(
                logits.view(-1, logits.size(-1)), 
                targets.view(-1), 
                ignore_index=-1
            )
            print(f"训练模式 - logits: {logits.shape}, loss: {loss.item():.4f}")
        else:
            # 推理模式：只计算最后一个位置
            logits = self.lm_head(x[:, [-1], :])  # [B, 1, vocab_size]
            loss = None
            print(f"推理模式 - logits: {logits.shape}")

        return logits, loss

print("完整Transformer模型定义完成！")


In [None]:
# 完整示例运行
def main_demo():
    """主函数演示 - 原代码第351-375行详解"""
    print("="*50)
    print("   Transformer模型完整演示")
    print("="*50)
    
    # 第352行：配置模型参数
    args = ModelArgs(
        n_embd=128,       # 嵌入维度（简化版，原为100）
        n_heads=8,        # 注意力头数（原为10）
        dim=128,          # 模型维度（原为100）
        dropout=0.1,      # Dropout率
        max_seq_len=512,  # 最大序列长度
        vocab_size=21128, # 词汇表大小（BERT中文）
        block_size=512,   # 块大小
        n_layer=3         # 层数（简化版，原为2）
    )
    print(f"模型配置: {args}")
    
    # 第353行：测试文本
    text = "我喜欢快乐地学习大模型"
    print(f"\\n输入文本: '{text}'")
    
    # 第354-361行：分词处理
    try:
        # 尝试加载BERT分词器
        tokenizer = BertTokenizer.from_pretrained('bert-base-chinese')
        inputs_token = tokenizer(
            text,
            return_tensors='pt',
            max_length=args.max_seq_len,
            truncation=True,
            padding='max_length'
        )
        inputs_id = inputs_token['input_ids']
        args.vocab_size = tokenizer.vocab_size
        
        print(f"分词成功!")
        print(f"  Token IDs (前20个): {inputs_id[0][:20].tolist()}")
        print(f"  实际序列长度: {(inputs_id[0] != 0).sum().item()}")
        print(f"  词汇表大小: {args.vocab_size}")
        
    except Exception as e:
        print(f"无法加载BERT分词器: {e}")
        print("使用模拟数据...")
        # 使用随机数据
        inputs_id = torch.randint(1, 1000, (1, 20))  # 避免使用0（padding）
        tokenizer = None
        print(f"  模拟Token IDs: {inputs_id[0].tolist()}")
    
    # 第363行：创建Transformer模型
    print(f"\\n创建Transformer模型...")
    transformer = Transformer(args)
    
    # 第364-365行：推理模式
    print(f"\\n" + "="*30 + " 推理模式 " + "="*30)
    with torch.no_grad():
        logits, loss = transformer.forward(inputs_id)
    
    # 第367-375行：结果分析
    print(f"\\n推理结果分析:")
    print(f"  Logits形状: {logits.shape}")
    print(f"  Logits数值范围: [{logits.min():.3f}, {logits.max():.3f}]")
    
    # 获取概率最高的token
    predicted_ids = torch.argmax(logits, dim=-1)
    predicted_token_id = predicted_ids[0, 0].item()
    print(f"  预测的下一个Token ID: {predicted_token_id}")
    
    if tokenizer is not None:
        try:
            predicted_token = tokenizer.decode([predicted_token_id])
            print(f"  预测的下一个Token: '{predicted_token}'")
        except:
            print(f"  无法解码Token ID: {predicted_token_id}")
    
    # 训练模式演示
    print(f"\\n" + "="*30 + " 训练模式 " + "="*30)
    # 创建目标序列（向右偏移一位）
    targets = torch.roll(inputs_id, shifts=-1, dims=1)
    targets[:, -1] = -1  # 最后一位设为ignore_index
    
    print(f"创建训练目标:")
    print(f"  输入序列: {inputs_id[0][:10].tolist()}...")  
    print(f"  目标序列: {targets[0][:10].tolist()}...")
    
    with torch.no_grad():
        logits_train, loss_train = transformer.forward(inputs_id, targets)
    
    print(f"\\n训练结果:")
    print(f"  训练Logits形状: {logits_train.shape}")
    print(f"  交叉熵损失: {loss_train:.4f}")
    print(f"  困惑度: {torch.exp(loss_train):.2f}")
    
    print(f"\\n" + "="*50)
    print("   演示完成！")
    print("="*50)
    
    return transformer, args, tokenizer

# 运行完整演示
if __name__ == "__main__":
    model, config, tokenizer = main_demo()
