# 动⼿实现⼀个 LLaMA2 ⼤模型

## 定义超参数

In [1]:
# 在Python终端/Jupyter中执行以下代码，查看Python位数
import platform
import sys
print(f"Python位数：{platform.architecture()[0]}")  # 必须输出64bit
print(f"Python路径：{sys.executable}")  # 确认是d:\python\py310\python.exe

Python位数：64bit
Python路径：D:\python\py310\python.exe


In [2]:
import sys
# 输出当前内核使用的Python可执行文件路径
print("当前Jupyter内核的Python路径：", sys.executable)
# 输出当前内核的site-packages路径（模块安装目录）
print("当前内核的site-packages路径：", sys.path)

当前Jupyter内核的Python路径： D:\python\py310\python.exe
当前内核的site-packages路径： ['D:\\python\\py310\\python310.zip', 'D:\\python\\py310\\DLLs', 'D:\\python\\py310\\lib', 'D:\\python\\py310', '', 'C:\\Users\\xuexu\\AppData\\Roaming\\Python\\Python310\\site-packages', 'D:\\python\\py310\\lib\\site-packages', 'D:\\python\\py310\\lib\\site-packages\\win32', 'D:\\python\\py310\\lib\\site-packages\\win32\\lib', 'D:\\python\\py310\\lib\\site-packages\\Pythonwin']


In [3]:
#pip install torch==2.2.0+cpu torchvision==0.17.0+cpu --index-url https://download.pytorch.org/whl/cpu

In [4]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from typing import Tuple
from transformers import PretrainedConfig, PreTrainedModel
from typing import Optional, Tuple, Iterable, Any
import math
from transformers.modeling_outputs import CausalLMOutputWithPast
from unittest.mock import MagicMock

  from .autonotebook import tqdm as notebook_tqdm


In [5]:
from transformers import PretrainedConfig

class ModelConfig(PretrainedConfig):
    model_type = "Tiny-K"
    def __init__(
        self,
        dim: int = 768,#模型维度
        n_layers: int = 12, #Transformer的层数
        n_heads: int = 16, #注意力机制的头数
        n_kv_heads: int = 8,#键/值 头的数量
        vocab_size: int = 6144,#词汇表大小
        hidden_dim: int = None,#隐藏层维度;每个注意力头的特征维度
        multiple_of: int = 64,#调整模型中间层维度的参数，核心作用是确保某些层的维度（通常是前馈网络的隐藏层维度）是 multiple_of 的整数倍
        norm_eps:float = 1e-5,#归一化层的eps
        max_seq_len: int =512,#输入序列的最大长度
        dropout: float =0.0,#dropout概率
        flash_attn:bool =True,#是否使用Flash attention
        **kwargs,
    
    ):
        self.dim = dim
        self.n_layers =n_layers
        self.n_heads = n_heads
        self.n_kv_heads = n_kv_heads
        self.vocab_size = vocab_size
        self.hidden_dim = hidden_dim
        self.multiple_of = multiple_of
        self.norm_eps = norm_eps
        self.max_seq_len = max_seq_len
        self.dropout = dropout
        self.flash_attn = flash_attn
        super().__init__(**kwargs)


In [6]:
args = ModelConfig()

In [7]:
args

ModelConfig {
  "dim": 768,
  "dropout": 0.0,
  "flash_attn": true,
  "hidden_dim": null,
  "max_seq_len": 512,
  "model_type": "Tiny-K",
  "multiple_of": 64,
  "n_heads": 16,
  "n_kv_heads": 8,
  "n_layers": 12,
  "norm_eps": 1e-05,
  "transformers_version": "4.57.3",
  "vocab_size": 6144
}

##  构建 RMSNorm  归一化

In [8]:
###公式：output = (x / sqrt(mean(x²) + eps)) * weight

In [9]:
class RMSNorm(nn.Module):
    def __init__(self, dim:int ,eps:float):
        super().__init__()
        self.eps = eps #防止归一化时候分母为0
        self.weight = nn.Parameter(torch.ones(dim))#缩放参数，可学习参数、初始化为1
        
    def _norm(self,x):
        return x*torch.rsqrt(x.pow(2).mean(-1,keepdim = True) + self.eps)
        
    def forward(self,x):#前向传播
        output = self._norm(x.float()).type_as(x)#首先将输入x转为float类型，然后进行RMSNorm，最后再转回原来的数据类型
        return output * self.weight

In [10]:
###测试RMSNorm
norm = RMSNorm(args.dim, args.norm_eps)
x = torch.randn(1, 50, args.dim)
output = norm(x)
print(output.shape)

#out: torch.Size([1, 50, 768])


torch.Size([1, 50, 768])


## 构建 LLaMA2 Attention

In [11]:
#将键和值的维度扩展到和查询的维度⼀样
def repeat_kv(x:torch.Tensor , n_rep:int ) -> torch.Tensor:#函数返回值类型注解（也叫类型提示 / Type Hint），->是用来标记函数返回值的类型信息的符号，目的是告诉开发者（和 IDE、静态检查工具）这个函数执行后会返回什么类型的数据。
    bs,slen,n_kv_heads,head_dim = x.shape   # 获取输⼊张量的形状：批量⼤⼩、序列⻓度、键/值对头的数量、每个头的维度⼤⼩
    if n_rep == 1: # 如果重复次数为1，则不需要重复，直接返回原始张量；每个 K/V 头需要复制的次数（比如 8 个 Q 头 ÷ 2 个 K/V 头 = 4 次，即n_rep=4）
        return x 
    return (
        x[:,:,:,None,:]#在第三个维度（即键/值对头的维度）之后添加⼀个新的维度，形成 x[:, :, :, None, :]
        .expand(bs,slen,n_kv_heads,n_rep,head_dim)#使⽤ expand ⽅法将新添加的维度扩展到 n_rep ⼤⼩，实现键/值对的重复效果
        .reshape(bs,slen,n_kv_heads*n_rep,head_dim)#通过 reshape ⽅法重新塑形，将扩展后的维度合并回键/值对头的数量中，即 x.reshape(bs, slen, n_kv_heads * n_rep, head_dim) ，这样最终的张量形状就达到了与查询维度⼀致的效果。
    )


In [12]:
# 旋转嵌⼊---为注意⼒机制提供更强的上下⽂信息
# 注意：此处的dim应为 dim//n_head，因为我们是对每个head进⾏旋转嵌⼊
def precompute_freqs_cis(dim:int ,end:int ,theta:float=10000.0):
    freqs = 1.0/(theta**( torch.arange(0,dim,2)[:(dim//2)].float()/dim)) #频率值
    t = torch.arange(end,device = freqs.device)#创建的序列张量[0,1,2,...,end-1]，要和freqs张量放在同一个硬件设备上
    freqs = torch.outer(t,freqs).float()#位置 × 频率的角度矩阵,;计算外积，得到⼀个⼆维矩阵，每⼀⾏是t的元素乘以freqs的元素
    #每个值对应该位置、该维度对的 cos/sin 值
    freqs_cos = torch.cos(freqs)#旋转嵌入的实部
    freqs_sin = torch.sin(freqs)#旋转嵌入的虚部
    
    return freqs_cos,freqs_sin


In [13]:
#调整张量形状---是调整 freqs_cis 的形状，使其在进⾏⼴播操作时与 x 的维度对⻬，从⽽能够进⾏正确的张量运算
# freqs_cis[序列长度--x个 token，每个注意力头的维度]
#x[批量大小，序列长度，注意力头数---ndim，头维度]
def reshape_for_broadcast(freqs_cis:torch.Tensor,x:torch.Tensor):
    ndim = x.ndim# 获取x的维度数----注意力头数
    assert 0<=1<=ndim# 断⾔，确保 x 有第二维
    assert freqs_cis.shape == (x.shape[1],x.shape[-1])# 断⾔，确保freqs_cis的形状与x的第⼆维（序列长度）和最后⼀维（每个注意力头的维度）相同
    shape = [d if i==1 or i==ndim-1 else 1 for i ,d in enumerate(x.shape)]# 构造⼀个新的形状，除了第⼆维（序列长度）和最后⼀维（每个注意力头的维度），其他维度都为1，这样做是为了能够将freqs_cis与x进⾏⼴播操作
    return freqs_cis.view(shape)# 将freqs_cis调整为新的形状，并返回
    

In [14]:
###旋转嵌入
def apply_rotary_emb(
    xq:torch.Tensor,
    xk:torch.Tensor,
    freqs_cos:torch.Tensor,
    freqs_sin:torch.Tensor
)->Tuple[torch.Tensor,torch.Tensor]:
    # 将查询和键张量转换为浮点数，并重塑形状以分离实部和虚部
    xq_r,xq_i = xq.float().reshape(xq.shape[:-1]+(-1,2)).unbind(-1)
    xk_r,xk_i = xk.float().reshape(xk.shape[:-1]+(-1,2)).unbind(-1)
    # 重新塑形频率张量以进⾏⼴播
    freqs_cos = reshape_for_broadcast(freqs_cos,xq_r)
    freqs_sin = reshape_for_broadcast(freqs_sin,xq_r)
    # 应⽤旋转，分别计算旋转后的实部和虚部
    # 旋转公式：z' = z * (cosθ + i sinθ) → 实部= r*cos - i*sin，虚部= r*sin + i*cos
    xq_out_r = xq_r * freqs_cos - xq_i * freqs_sin
    xq_out_i = xq_r * freqs_sin + xq_i * freqs_cos
    xk_out_r = xk_r * freqs_cos - xk_i * freqs_sin
    xk_out_i = xk_r * freqs_sin + xk_i * freqs_cos
    # 将最后两个维度合并，并还原为原始张量的形状
    xq_out = torch.stack([xq_out_r,xq_out_i],dim = -1).flatten(3)
    xk_out = torch.stack([xk_out_r,xk_out_i],dim = -1).flatten(3)
    # 还原为原始张量的数据类型（比如从float32转回float16）
    return xq_out.type_as(xq),xk_out.type_as(xk)

In [15]:
xq = torch.randn(1, 50, 6, 48) # bs, seq_len, dim//n_head, n_head_dim
xk = torch.randn(1, 50, 6, 48) # bs, seq_len, dim//n_head, n_head_dim

# 使用 precompute_freqs_cis 函数获取 sin和cos
cos, sin = precompute_freqs_cis(288//6, 50)
print(cos.shape, sin.shape)
xq_out, xk_out = apply_rotary_emb(xq, xk, cos, sin)

xq_out.shape, xk_out.shape


torch.Size([50, 24]) torch.Size([50, 24])


(torch.Size([1, 50, 6, 48]), torch.Size([1, 50, 6, 48]))

In [16]:
#组装LLaMA2 Attention
class Attention(nn.Module):
    def __init__(self,args:ModelConfig):
        super().__init__()
        # 根据是否指定n_kv_heads，确定⽤于键（key）和值（value）的头的数量。
        self.n_kv_heads = args.n_heads if args.n_kv_heads is None else args.n_kv_heads
        # 确保总头数可以被键值头数整除
        assert args.n_heads % self.n_kv_heads == 0
        # 模型并⾏处理⼤⼩，默认为1。
        model_parallel_size = 1
        # 本地计算头数，等于总头数除以模型并⾏处理⼤⼩。
        self.n_local_heads = args.n_heads // model_parallel_size
        # 本地键值头数，等于键值头数除以模型并⾏处理⼤⼩。
        self.n_local_kv_heads = args.n_kv_heads // model_parallel_size
        # 重复次数，⽤于扩展键和值的尺⼨。
        self.n_rep = self.n_local_heads // self.n_local_kv_heads
         # 每个头的维度，等于模型维度除以头的总数。
        self.head_dim = args.dim // args.n_heads
        
         # 定义权重矩阵。
        self.wq = nn.Linear(args.dim,args.n_heads * self.head_dim ,bias =False)
        self.wk = nn.Linear(args.dim,self.n_kv_heads * self.head_dim,bias = False)
        self.wv = nn.Linear(args.dim,self.n_kv_heads * self.head_dim,bias = False)
         # 输出权重矩阵。
        self.wo = nn.Linear(args.n_heads * self.head_dim ,args.dim ,bias = False)
        # 定义dropout。
        self.attn_dropout = nn.Dropout(args.dropout)
        self.resid_dropout = nn.Dropout(args.dropout)
        # 保存dropout概率。
        self.dropout =  args.dropout
         # 检查是否使⽤Flash Attention（需要PyTorch >= 2.0）。
        self.flash = hasattr(torch.nn.functional,'scaled_dot_product_attention')
        if not self.flash : # 若不⽀持Flash Attention，则使⽤⼿动实现的注意⼒机制，并设置mask。
            print("WARNING : using slow attention. Flash Attention requires PyTorch >= 2.0 ")
             # 创建⼀个上三⻆矩阵，⽤于遮蔽未来信息。
            mask = torch.full((1,1,args.max_seq_len,args.max_seq_len),float("-inf"))
            mask = torch.triu(mask,diagonal=1)
            # 注册为模型的缓冲区
            self.register_buffer("mask",mask)
    def forward(self, x:torch.Tensor,freqs_cos:torch.Tensor,freqs_sin:torch):
          # 获取批次⼤⼩和序列⻓度，[batch_size, seq_len, dim]
        bsz,seqlen,_ = x.shape
         # 计算查询（Q）、键（K）、值（V）
        xq,xk,xv = self.wq(x),self.wk(x),self.wv(x)
         # 调整形状以适应头的维度。
        xq = xq.view(bsz,seqlen,self.n_local_heads,self.head_dim)
        xk = xk.view(bsz,seqlen,self.n_local_kv_heads,self.head_dim)
        xv = xv.view(bsz,seqlen,self.n_local_kv_heads,self.head_dim)
        # 应⽤旋转位置嵌⼊（RoPE）
        xq,xk = apply_rotary_emb(xq,xk,freqs_cos,freqs_sin)
         # 对键和值进⾏扩展以适应重复次数。
        xk = repeat_kv(xk,self.n_rep)
        xv = repeat_kv(xv,self.n_rep)
         # 将头作为批次维度处理，(bsz批次大小, seqlen序列长度, n_local_heads本地注意力头数, head_dim每个注意力头的维度)--->(bsz, n_local_heads, seqlen, head_dim)
        xq = xq.transpose(1,2)
        xk = xk.transpose(1,2)
        xv = xv.transpose(1,2)
         # 根据是否⽀持Flash Attention，选择实现⽅式
        if self.flash:
            # 使⽤Flash Attention
            output = torch.nn.functional.scaled_dot_product_attention(xq,xk,xv,attn_mask=None,
                    dropout_p=self.dropout if self.training else 0.0,is_causal = True)
        else: # 使⽤⼿动实现的注意⼒机制。
            scores = troch.matmul(xq,xk.transpose(2,3))/math.sqrt(self.head_dim)
            assert hasattr(self,'mask')
            scores = scores + self.mask[:,:,:seqlen,:seqlen]
            scores = F.softmax(scores,float(),dim=-1).type_as(xq)
            scores = self.attn_dropout(scores)
            output = torch.matmul(scores,xv)
        # 恢复时间维度并合并头
        output = output.transpose(1,2).contiguous().view(bsz,seqlen,-1)
         # 最终投影回残差流
        output = self.wo(output)
        output = self.resid_dropout(output)
        return output

In [17]:
# 创建Attention实例
attention_model = Attention(args)

# 模拟输入数据
batch_size = 1
seq_len = 50  # 假设实际使用的序列长度为50
dim = args.dim
x = torch.rand(batch_size, seq_len, dim)  # 随机生成输入张量
# freqs_cos = torch.rand(seq_len, dim // 2)  # 模拟cos频率，用于RoPE
# freqs_sin = torch.rand(seq_len, dim // 2)  # 模拟sin频率，用于RoPE

freqs_cos, freqs_sin = precompute_freqs_cis(dim//args.n_heads, seq_len)

# 运行Attention模型
output = attention_model(x, freqs_cos, freqs_sin)

# attention出来之后的形状 依然是[batch_size, seq_len, dim]
print("Output shape:", output.shape)


Output shape: torch.Size([1, 50, 768])


##  构建LLaMA2 MLP（Multi-Layer Perceptron）模块----线性变换层和一个激活函数

In [18]:
class MLP(nn.Module):
    def __init__(self , dim:int , hidden_dim:int , multiple_of:int , dropout:float):
        super().__init__()
        if hidden_dim is None:
            # 如果没有指定隐藏层的维度，我们将其设置为输⼊维度的4倍
            hidden_dim = 4*dim
             # 然后将其减少到2/3
            hidden_dim = int(2*hidden_dim/3)
            #确保它是multiple_of的倍数
            hidden_dim = multiple_of * ((hidden_dim + multiple_of - 1)//multiple_of)
        # 定义第⼀层线性变换，从输⼊维度到隐藏维度
        self.w1 = nn.Linear(dim,hidden_dim,bias=False)
        # 定义第⼆层线性变换，从隐藏维度到输⼊维度
        self.w2 = nn.Linear(hidden_dim,dim,bias=False)
        # 定义第三层线性变换，从输⼊维度到隐藏维度
        self.w3 = nn.Linear(dim,hidden_dim,bias=False)
         # 定义dropout层，⽤于防⽌过拟合
        self.dropout = nn.Dropout(dropout)
        # F.silu 是激活函数，公式为silu(x) = x * sigmoid(x)；self.dropout (...)：随机丢弃，正则化，维度不变
    def forward(self,x):
         # 前向传播函数
        # ⾸先，输⼊x通过第⼀层线性变换和SILU激活函数
        # 然后，结果乘以输⼊x通过第三层线性变换的结果
        # 最后，通过第⼆层线性变换和dropout层
        return self.dropout(      self.w2(     F.silu(self.w1(x)) * self.w3(x)    )     )

In [19]:
#创建实例mlp
mlp = MLP(args.dim,args.hidden_dim,args.multiple_of,args.dropout)
x = torch.randn(1,50,args.dim)
output = mlp(x)
output,output.shape

(tensor([[[ 0.1903, -0.0837,  0.0198,  ..., -0.0112, -0.1498,  0.0918],
          [-0.0385, -0.0892, -0.1282,  ...,  0.1547, -0.1080,  0.0659],
          [-0.0009, -0.1448, -0.1081,  ..., -0.0095, -0.0075,  0.2419],
          ...,
          [ 0.0301,  0.0385,  0.0852,  ..., -0.1626,  0.2265,  0.0170],
          [-0.1769,  0.0666, -0.0467,  ...,  0.0284,  0.0935,  0.0332],
          [-0.0431,  0.1322, -0.0535,  ...,  0.0838,  0.0649,  0.0841]]],
        grad_fn=<UnsafeViewBackward0>),
 torch.Size([1, 50, 768]))

##  构建 LLaMA2 Decoder Layer

In [20]:
class DecoderLayer(nn.Module):
    def __init__(self,layer_id:int,args:ModelConfig):
        super().__init__()
         # 定义多头注意⼒的头数
        self.n_heads = args.n_heads
         # 定义输⼊维度
        self.dim = args.dim
        # 定义每个头的维度，等于输⼊维度除以头数
        self.head_dim = args.dim // args.n_heads
        # 定义LLaMA2Attention对象，⽤于进⾏多头注意⼒计算
        self.attention = Attention(args)
         # 定义LLaMA  MLP对象，⽤于进⾏前馈神经⽹络计算
        self.feed_forward = MLP(
            dim =args.dim,
            hidden_dim =args.hidden_dim,
            multiple_of = args.multiple_of,
            dropout = args.dropout,
        )
        # 定义层的ID，当前解码器层在整个模型中的编号（索引）
        self.layer_id = layer_id
         # 定义注意⼒计算的归⼀化层
        self.attention_norm = RMSNorm(args.dim, eps = args.norm_eps)
         # 定义前馈神经⽹络计算的归⼀化层
        self.ffn_norm = RMSNorm(args.dim , eps = args.norm_eps)
        
    def forward(self,x,freqs_cos,freqs_sin): # 前向传播函数
         # ⾸先，输⼊x经过注意⼒归⼀化层，然后进⾏注意⼒计算，结果与输⼊x相加得到h
        h = x + self.attention.forward( self.attention_norm(x),freqs_cos,freqs_sin )
        # h经过前馈神经⽹络归⼀化层，然后进⾏前馈神经⽹络计算，结果与h相加得到输出
        out = h + self.feed_forward.forward(self.ffn_norm(h))
        return out
        

In [21]:
##DecoderLayer实例
decoderlayer = DecoderLayer(0,args)#0为layer_id:int
dim = args.dim
seq_len = 50
x = torch.randn(1,seq_len,dim)
freqs_cos ,freqs_sin = precompute_freqs_cis(dim//args.n_heads,seq_len)
out = decoderlayer(x,freqs_cos,freqs_sin)
x,x.shape,out,out.shape

(tensor([[[-0.7873,  0.3936, -0.7192,  ..., -0.0743,  1.2577, -0.3635],
          [ 0.5187, -0.5349, -0.2202,  ..., -1.5718, -0.2472, -0.1420],
          [ 0.2706, -0.2408, -1.0117,  ...,  0.0044, -0.6116, -0.0244],
          ...,
          [ 0.6035, -0.7309, -1.8942,  ...,  0.2627,  0.4336, -1.3416],
          [ 1.0965, -2.1346,  1.3750,  ...,  0.1465,  0.5675,  0.9712],
          [-1.5775,  0.3258, -1.3788,  ..., -0.1122, -0.8187,  0.8832]]]),
 torch.Size([1, 50, 768]),
 tensor([[[-1.0137,  0.6413, -0.2364,  ..., -0.3789,  0.9855, -0.1646],
          [ 0.2198, -0.5613, -0.1305,  ..., -1.5832, -0.2571, -0.0605],
          [-0.0999, -0.2659, -1.2229,  ...,  0.0981, -0.5334,  0.1146],
          ...,
          [ 0.5414, -0.8398, -1.8394,  ...,  0.4887,  0.4185, -1.3680],
          [ 1.1526, -2.0901,  1.4264,  ...,  0.0723,  0.4947,  1.2367],
          [-1.6481,  0.1264, -1.4654,  ..., -0.1283, -0.9463,  0.9406]]],
        grad_fn=<AddBackward0>),
 torch.Size([1, 50, 768]))

##  构建LLaMA2模型

In [22]:
try:
    from torch._utils import _cuda
except ImportError:
    mock_cuda = MagicMock()
    sys.modules['torch._utils._cuda'] = mock_cuda
    import torch._utils
    torch._utils._cuda = mock_cuda

In [23]:
class Transformer(PreTrainedModel):
    config_class = ModelConfig #模型的词汇表、嵌入维度等参数类
    last_loss : Optional[torch.Tensor]# 记录最后⼀次计算的损失

        # 1.9.1 初始化权重的函数
    def _init_weights(self, module):
        if isinstance (module,nn.Linear):#线性层
            #均值为 0、标准差为 0.02 的标准正态分布随机初始化
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)
            if module.bias is not None:#偏置如果存在，初始化为全 0
                torch.nn.init.zeros_(module.bias)
        elif isinstance(module,nn.Embedding):#嵌入层
            torch.nn.init.normal_(module.weight, mean = 0.0 ,std = 0.02)
    
    #第一部分：模型初始化
    #1.1 继承类，保存配置
    def __init__(self, args:ModelConfig = None):
        super().__init__(args)
        self.args = args
        self.vocab_size = args.vocab_size
        self.n_layers = args.n_layers
    #1.2 词嵌入层，将tokenID转化为向量   
        self.tok_embeddings = nn.Embedding(args.vocab_size,args.dim)
    #1.3 dropout层，作用于词嵌入后的张量，随机将最后一维的维特征中X%的位置置为 0，防止过拟合
        self.dropout = nn.Dropout(args.dropout)
    #1.4 解码器层列表（layers）    
        self.layers = torch.nn.ModuleList()#ModuleList[用于存储和管理神经网络模块]中存储n_layers个 DecoderLayer实例
        for layer_id in range(args.n_layers):# 循环n_layers次
            self.layers.append(DecoderLayer(layer_id,args)) # 加入n_layers个解码器层
    #1.5 归一化层        
        self.norm = RMSNorm(args.dim, eps = args.norm_eps)
    #1.6 输出层（output）：把向量映射回词汇表    
        self.output = nn.Linear(args.dim, args.vocab_size, bias = False)
    #1.7 共享词嵌入和输出层的权重：减少参数
        self.tok_embeddings.weight = self.output.weight
    #1.8 预计算RoPE--旋转位置编码
        freqs_cos, freqs_sin = precompute_freqs_cis(self.args.dim // self.args.n_heads, self.args.max_seq_len)
        # register_buffer表示这些张量不是模型参数（不会被梯度更新）
        # persistent=False表示保存模型时不保存这些张量
        self.register_buffer("freqs_cos" ,freqs_cos,persistent = False)
        self.register_buffer("freqs_sin",freqs_sin,persistent =False)
    #1.9 参数初始化：给权重赋初始值
        self.apply(self._init_weights)# 遍历所有层，调用_init_weights
        for pn,p in self.named_parameters():
            # 对特定参数进行精细初始化
            # 对前馈网络的参数（w3/wo）按层数调整标准差，避免深层模型的梯度爆炸
            if pn.endswith('w3.weight') or pn.endswith('wo.weight'):
                torch.nn.init.normal_(p, mean =0.0, std=0.02/math.sqrt(2*args.n_layers))
    #1.10 初始化最后⼀次前向传播的损失属性
        self.last_loss = None # 存储最新的损失值
        self.OUT = CausalLMOutputWithPast() # 封装模型输出（logits和loss）
        self._no_split_modules = [name for name , _ in self.named_modules()] # 不分割的模块列表

            
    #第二部分：前向传播
    #2.1 输入 token ID--tok_embeddings转为语义特征向量--dropouts随机失活--截取对应序列长度的旋转位置编码--
     #   遍历所有DecoderLayer---
     #   归一化（把h的数值缩放到合理范围，保证后续计算的稳定性（避免梯度消失 / 爆炸），但不会改变其 “语义特征向量” 的本质）
     #   --output层：将特征向量映射为词汇表维度的原始得分（logits）
     #  分支处理：若有目标值（训练阶段），计算交叉熵损失；若无目标值（推理阶段），仅取最后一个token的logits →
     #  封装logits和loss到OUT对象中返回
    def forward(self, tokens:torch.Tensor,targets:Optional[torch.Tensor]=None, **keyargs) ->torch.Tensor:
        if 'input_ids' in keyargs:
            tokens = keyargs['input_ids']
        if 'attention_mask' in keyargs:
            targets = keyargs['attention_mask']
        _bsz, seqlen = tokens.shape
        h = self.tok_embeddings(tokens)
        h = self.dropout(h)
        freqs_cos = self.freqs_cos[:seqlen]
        freqs_sin = self.freqs_sin[:seqlen]
        for layer in self.layers:
            h = layer(h,freqs_cos,freqs_sin)
        h = self.norm(h)
        if targets is not None:
            logits = self.output(h)
            #计算交叉熵损失
            self.last_loss = F.cross_entropy(logits.view(-1,logits.size(-1)),
                                            targets.view(-1),ignore_index=0,
                                            reduction='none')
        else:
            #推理阶段，只需要预测下一个 token，所以只取最后一个 token 的向量进行映射
            logits = self.output(h[:,[-1],:])
            self.last_loss = None
        
        self.OUT.__setitem__('logits',logits)
        self.OUT.__setitem__('last_loss',self.last_loss)
        return self.OUT
        
    #第三部分：文本生成--一个循环过程，每次用当前的 token ID 序列作为输入，通过模型前向传播得到预测的 logits（得分），再从 logits 中采样出下一个 token ID，拼接到原序列后重复此过程，直到满足停止条件    
    @torch.inference_mode()#@torch.inference_mode()装饰器会关闭梯度计算，加快推理速度
    #idx--用户提供的初始 token ID 序列；
    #temperature--控制生成的随机性：值越大，随机性越强；值为 0 时是贪心采样（选得分最高的 token）
    #Top-K 采样的参数：只保留得分最高的k个 token 作为候选，其余排除；为None时不启用 Top-K
    def generate(self, idx, stop_id=None, max_new_tokens=256, temperature=1.0, top_k=None):
        index = idx.shape[1]
        for _ in range(max_new_tokens):
            #判断输入序列长度是否超过模型的max_seq_len-->序列截断
            idx_cond = idx if idx.size(1) <= self.args.max_seq_len else idx[:,-self.args.max_seq_len:]
            # 前向传播获取logits
            # self(idx_cond)——模型实例被调用，执行forward方法，返回的是模型的输出对象--self.OUT
            # .logits—— 从输出对象中提取 logits（预测得分）
            logits = self(idx_cond).logits
            logits = logits[:,-1,:]#提取最后一个 token 的 logits
            if temperature == 0.0:#贪心采样，生成的文本最 “通顺” 但缺乏多样性
                _,idx_next = torch.topk(logits, k=1, dim=-1)#选得分最高的 token
            else :#带温度的随机采样
                logits = logits / temperature#调整 logits 的分布，控制随机性
                if top_k is not None:
                    #取张量指定维度上前 k 个最大值，返回值（v）和对应的索引（idx）
                    v,_ = torch.topk(logits,min(top_k,logits.size(-1)))
                    #废掉所有低于阈值的 token；v[:,[-1]] → 取出 k 个最高分中的 “最低分”（阈值）
                    logits[logits<v[:,[-1]]] = -float('Inf')
                #按概率选一个token
                #将得分转换为概率
                probs = F.softmax(logits, dim=-1)#对调整后的 logits 在最后一个维度（词汇表维度）做 softmax 归一化，转换为概率分布（所有值之和为 1）
                #从概率分布probs中随机采样 1 个样本（即下一个 token ID）
                idx_next = torch.multinomial(probs,num_samples=1)
            #如果采样到的下一个 token ID 是停止符（stop_id）（比如句号、<EOS>标记），立即跳出循环，停止生成
            if idx_next == stop_id:
                break
            idx = torch.cat((idx,idx_next),dim=1)
        return idx[:,index:]#只返回生成的新 token，去掉初始输入


In [24]:
x = torch.randint(0, 6144, (1, 50)) # [bs, seq_len]
# 实例化LLaMA2Model
model = Transformer(args=args)
# 计算model的全部参数
num_params = sum(p.numel() for p in model.parameters())
print('Number of parameters:', num_params)
out = model(x)
print(out.logits.shape) # [batch_size, 1, vocab_size]

Number of parameters: 82594560
torch.Size([1, 1, 6144])


In [25]:
x,model

(tensor([[ 752, 5951, 3313, 2801, 4074,  135, 5020, 5110, 5147, 3141, 4235, 5707,
          4619, 4474, 1940, 4055, 1446,  947, 2796, 3119, 1931, 4872,  817, 2554,
          1176, 3222, 2920, 5790, 5037,  182, 1659,   69, 1102, 1000, 6019,   58,
          5459, 1518,  594,  543, 2836, 1351, 1365, 4381, 5158, 1229,  846,  421,
          5957,  820]]),
 Transformer(
   (tok_embeddings): Embedding(6144, 768)
   (dropout): Dropout(p=0.0, inplace=False)
   (layers): ModuleList(
     (0-11): 12 x DecoderLayer(
       (attention): Attention(
         (wq): Linear(in_features=768, out_features=768, bias=False)
         (wk): Linear(in_features=768, out_features=384, bias=False)
         (wv): Linear(in_features=768, out_features=384, bias=False)
         (wo): Linear(in_features=768, out_features=768, bias=False)
         (attn_dropout): Dropout(p=0.0, inplace=False)
         (resid_dropout): Dropout(p=0.0, inplace=False)
       )
       (feed_forward): MLP(
         (w1): Linear(in_features=7

# 训练Tokenizer--BPE

In [26]:
#⽤于加载训练数据和和加载训练完成后的 Tokenizer
#pip install tokenizers datasets transformers

In [27]:
import random
import json
import os
from transformers import AutoTokenizer ,PreTrainedTokenizerFast
from tokenizers import (
     decoders,models,pre_tokenizers,trainers,Tokenizer,)
from tokenizers.normalizers import NFKC
from typing import Generator
print("导入成功，无报错！")

导入成功，无报错！


## 加载训练数据

In [28]:
# 从JSONL格式的文件中逐行读取数据，提取每一行中text字段的内容并返回
def read_texts_from_jsonl(file_path:str)->Generator[str,None,None]:
    with open(file_path, 'r', encoding = 'utf-8') as f:
        for line_num, line in enumerate(f,1):#line_num：当前行的行号，line：当前行的字符串内容
            try:
                data = json.loads(line)  #json.loads(line)：将单行字符串解析为 JSON 对象（字典类型）
                if 'text' not in data:
                    raise KeyError(f"Missing 'text' field in line {line_num}")
                yield data['text']   #将当前行的text字段内容返回给迭代器。此时函数会暂停执行，直到下一次迭代时继续处理下一行
            except json.JSONDecodeError:  #捕获JSON解析错误（如行内容格式错误、缺少引号等）
                print(f"Error decoding JSON in line {line_num}")
                continue  #报错后，跳过当前行，继续处理下一行
            except KeyError as e:
                print (e)
                continue

## 创建配置文件

In [29]:
#生成配置文件，--使得训练后的 BPE Tokenizer 与 Hugging Face 生态兼容

def create_tokenizer_config(save_dir: str)->None: #指定两个配置文件的保存目录save_dir
    # 定义分词器的核心配置字典
    #构建tokenizer_config字典并保存为tokenizer_config.json
    config = {
        "add_bos_token":False, #分词时 不在 文本开头添加bos_token（<|im_start|>），因为是聊天场景
        "add_eos_token":False, #分词时 不在 文本结束添加bos_token（<|im_end|>）
        "add_prefix_space":False, # 不在 文本前添加空格（针对某些子词分词器的兼容设置）
        "bos_token":"<|im_start|>",  # 自定义的开始标记（用于聊天场景的消息开头，如<|im_start|>user）
        "eos_token":"<|im_end|>", # 自定义的结束标记（用于聊天场景的消息结尾）
        "pad_token":"<|im_end|>", #填充标记
        "unk_token":"<unk>",  # UNK（Unknown）token：未知字符标记（遇到分词器不认识的字符时使用）
        "model_max_length": 1000000000000000019884624838656, #分词器支持的最大序列长度--分词后生成的 token ID的长度-- token 数量
        "clean_up_tokenization_spaces":False, #不 自动清理分词后的多余空格
        "tokenizer_class":"PreTrainedTokenizerFast", #指定分词器类为快速分词器
         # 聊天模板（Jinja2语法）：用于格式化多轮对话数据
        "chat_template":(  
            #{%...%}控制流函数，{{ ... }}把变量的值输出成文本
            "{% for message in messages %}"
            "{% if message['role'] == 'system' %}"
            "<|im_start|>system\n{{message['content']}}<|im_end|>\n"
            "{% elif message['role'] == 'user' %}"
            "<|im_start|>user\n {{message['content']}}<|im_end|>\n"
            "{% elif message['role'] == 'assistant' %}"
            "<|im_start|>assistant\n{{message['content']}}<|im_send|>\n"
            "{% endif %}"
            "{% endfor %}"
            #添加生成提示
            "{% if ass_generation_prompt %}"
            "{{'<|im_start|>assistant\n'}}"
            "{% endif %}"
        )
    }
    # 将config字典写入tokenizer_config.json文件
    with open(os.path.join(save_dir,"tokenizer_config.json"),"w",encoding='utf-8') as f:
        json.dump(config, f, ensure_ascii=False, indent=4)
    
    # 构建special_tokens_map字典
    special_tokens_map = {
        "bos_token":"<|im_start|>", # BOS token映射
        "eos_token":"<|im_end|>",
        "unk_token":"<unk>",
        "pad_token":"<|im_end|>",
        "additional_special_tokens":["<s>","</s>"]# 额外的特殊token（供分词器识别，不与上述冲突）
    }
    with open(os.path.join(save_dir,"special_tokens_map.json"),"w",encoding='utf-8') as f:
        json.dump(special_tokens_map,f,ensure_ascii=False,indent=4)

## 训练BPE Tokenizer--分词器

In [30]:
##初始化 BPE 分词器并配置各项预处理 / 后处理规则；
##从 JSONL 格式的语料文件中读取文本（调用【2.1加载训练数据】的read_texts_from_jsonl生成器）
##用 BPE 算法训练分词器，生成指定大小的词汇表；
##验证特殊 token 的 ID 是否符合预期；
##保存分词器文件，并调用【2.2创建配置文件】create_tokenizer_config生成适配transformers库的配置文件；
##最终让训练后的分词器既能用tokenizers库快速处理文本，又能接入transformers生态进行模型训练 / 推理。

In [31]:
def train_tokenizer(data_path: str, save_dir: str, vocab_size: int = 8192) -> None:
    os.makedirs(save_dir, exist_ok=True)
    #创建一个 “空的” BPE 分词器。设置未知 token 为<unk>
    tokenizer = Tokenizer(model.BPE(unk_token="<unk>"))
    #将文本中的字符转换为统一的标准形式，减少词汇表的冗余
    tokenizer.normalizer = NFKC()
    #对文本进行初步拆分。按UTF-8 字节拆分；add_prefix_space = False不在【中文】文本开头添加前缀空格
    tokenizer.pre_tokenizer = pre_tokenizers.ByteLevel(add_prefix_space = False)
    #解码器，将分词后的 token 序列还原为原始文本
    tokenizer.decoder = decoders.ByteLevel()
    # 定义特殊 token 
    special_tokens = ["<unk>","<s>","</s>","<|im_start|>","<|im_end|>" ]
    # 初始化BPE训练器
    trainer = trainers.BpeTrainer(
        vocab_size = vocab_size,
        special_tokens = special_tokens,
        min_frequency = 2,#将数据集中出现次数≥min_frequency的相邻字节/子词对，合并成一个新的单一符号（新子词），并加入词汇表的操作
        show_progress = True, # 显示训练进度条（直观看到训练过程）
        initial_alphabet = pre_tokenizers.ByteLevel.alphabet() # 初始字母表
    )
    print(f"Training tokenizer with data from {data_path}")

    ###！读取语料并训练分词器！
    # 调用生成器函数，读取JSONL中的文本（惰性迭代，节省内存）
    texts = read_texts_from_jsonl(data_path)
    # 从BPE迭代器训练分词器
    tokenizer.train_from_iterator(texts, trainer = trainer, length = os.path.getsize(data_path))
    #验证特殊 token 的 ID
    try:
        assert tokenizer.token_to_id("<unk>") == 0
        assert tokenizer.token_to_id("<s>") == 1
        assert tokenizer.token_to_id("</s>") == 2
        assert tokenizer.token_to_id("<|im_start|>") == 3
        assert tokenizer.token_to_id("<|im_end|>") == 4
    except AssertionError as e:
        print("Special tokens mapping error:",e)
        raise
    # 保存分词器并生成配置文件
     #tokenizer.json：是tokenizers库的标准输出文件，包含分词器的所有【规则】（BPE 模型、词汇表、归一化器、预分词器等），可以直接用tokenizers.Tokenizer.from_file加载
    tokenizer.save(os.path.join(save_dir,"tokenizer.json"))
     #生成tokenizer_config.json和special_tokens_map.json，让训练后的分词器能被transformers.AutoTokenizer.from_pretrained加载
    create_tokenizer_config(save_dir)
    print(f"Tokenizer save to {save_dir}")

## 使用训练完成的Tokenizer

In [32]:
#验证分词器是否能正常工作
def eval_tokenizer(tokenizer_path:str) ->None:
    try :
        tokenizer = AutoTokenizer.from_pretrained(tokenizer_path)
    except Exception as e:
        print(f"Error loading tokenizer : {e}")
        return
    print("\n=== Tokenizer基本信息 ===")
    print(f"Vocab size: {len(tokenizer)}")
    print(f"Special tokens: {tokenizer.all_special_tokens}")
    print(f"Special token IDs: {tokenizer.all_special_ids}")
    messages = [
        {"role":"system","content":"你是一个AI助手。"},
        {"role":"user","content":"How are you?"},
        {"role":"assistant","content":"I'm fine,thank you. and you ?"},
        {"role":"user","content":"I'm good too."},
        {"role":"assistant","content":"That's great to hear!"},
    ]
    print("\n====聊天模板测试===")
    prompt = tokenizer.apply_chat_template(
        messages,tokenize = False,
        #add_generation_prompt = True
    )
    print("Generated prompt:\n",prompt,sep="")
    print("\n====编码解码测试===")
    encoded = tokenizer(prompt, truncation = True, max_length = 256)
    decoded = tokenizer.decode(encoded["input_ids"], skip_special_tokens = False)
    print("Decoded text matches original :",decoded == prompt)
    print("\n====特殊token处理 ===")
    test_text = "<|im_start|>user\nHello<|im_end|>"
    encoded = tokenizer(test_text).input_ids
    decoded = tokenizer.decode(encoded)
    print(f"Original : {test_text}")
    print(f"Decoded : {decoded}")
    print("Special tokens preserved : ",decoded == test_text)

## 下载和处理好的数据集

In [36]:
eval_tokenizer(r"C:\Users\xuexu\NLP_LLM\Data")


=== Tokenizer基本信息 ===
Vocab size: 6144
Special tokens: ['<|im_start|>', '<|im_end|>', '<unk>', '<s>', '</s>']
Special token IDs: [3, 4, 0, 1, 2]

====聊天模板测试===
Generated prompt:
<|im_start|>system
你是一个AI助手。<|im_end|>
<|im_start|>user
How are you?<|im_end|>
<|im_start|>assistant
I'm fine,thank you. and you ?<|im_end|>
<|im_start|>user
I'm good too.<|im_end|>
<|im_start|>assistant
That's great to hear!<|im_end|>


====编码解码测试===
Decoded text matches original : True

====特殊token处理 ===
Original : <|im_start|>user
Hello<|im_end|>
Decoded : <|im_start|>user
Hello<|im_end|>
Special tokens preserved :  True
