In [1]:
#需要使用L4GPU，然后官方代码的专家数，从64改为16
#把依赖文件kernel.py上传
#注意本地没有GPU没法运行这个代码，要用colab或者autodl，torch版本太低不可以，因为没有rms归一化，还有本地没安装triton不可以

In [2]:
# !pip list|grep triton
# 要求结果是triton 3.1.0

In [3]:
!ls

kernel.py  __pycache__	sample_data


In [4]:
import math
from dataclasses import dataclass
from typing import Tuple, Optional, Literal

import torch
from torch import nn
import torch.nn.functional as F
import torch.distributed as dist


print(torch.__version__)

from kernel import act_quant, weight_dequant, fp8_gemm

# 定义全局变量
world_size = 1  # 世界大小（用于分布式计算）
rank = 0  # 当前进程的排名
block_size = 128  # 块大小（可能用于矩阵运算优化）
gemm_impl: Literal["bf16", "fp8"] = "bf16"  # 矩阵乘法实现方式（bf16 或 fp8）
attn_impl: Literal["naive", "absorb"] = "absorb"  # 注意力机制实现方式

@dataclass
class ModelArgs:
    """
    用于定义模型参数和超参数的数据类。

    属性：
        max_batch_size (int): 最大批量大小。
        max_seq_len (int): 最大序列长度。
        dtype (Literal["bf16", "fp8"]): 计算数据类型（bf16 或 fp8）。
        vocab_size (int): 词汇表大小。
        dim (int): 模型隐藏层维度。
        inter_dim (int): MLP 层的中间层维度。
        moe_inter_dim (int): MoE 层的中间层维度。
        n_layers (int): Transformer 层的数量。
        n_dense_layers (int): 模型中的全连接层数量。
        n_heads (int): 注意力头的数量。
        n_routed_experts (int): MoE 层中可路由的专家数量。
        n_shared_experts (int): MoE 层中共享的专家数量。
        n_activated_experts (int): MoE 层中每次激活的专家数量。
        n_expert_groups (int): MoE 层中的专家组数量。
        n_limited_groups (int): MoE 路由中的受限组数量。
        score_func (Literal["softmax", "sigmoid"]): MoE 路由的评分函数。
        route_scale (float): MoE 路由评分的缩放因子。
        q_lora_rank (int): 查询（Query）投影的 LoRA 秩。
        kv_lora_rank (int): 键值（Key-Value）投影的 LoRA 秩。
        qk_nope_head_dim (int): 无位置编码的 Query-Key 投影维度。
        qk_rope_head_dim (int): 使用旋转位置编码（RoPE）的 Query-Key 投影维度。
        v_head_dim (int): 值（Value）投影维度。
        original_seq_len (int): 原始序列长度。
        rope_theta (float): 旋转位置编码的基数。
        rope_factor (float): 扩展序列长度的缩放因子。
        beta_fast (int): 快速 Beta 修正因子。
        beta_slow (int): 慢速 Beta 修正因子。
        mscale (float): 扩展注意力的缩放因子。
    """
    max_batch_size: int = 8  # 训练时的最大批次大小
    max_seq_len: int = 4096 * 4  # 允许的最大序列长度（可能用于扩展序列训练）
    dtype: Literal["bf16", "fp8"] = "bf16"  # 计算数据类型，默认使用 bfloat16
    vocab_size: int = 102400  # 词汇表大小
    dim: int = 2048  # 模型隐藏层维度
    inter_dim: int = 10944  # MLP 层的中间层维度
    moe_inter_dim: int = 1408  # MoE 层的中间层维度
    n_layers: int = 27  # Transformer 层数
    n_dense_layers: int = 1  # 全连接层数（可能用于 MoE 或 MLP 结构）
    n_heads: int = 16  # 注意力头数

    # MoE（Mixture of Experts）相关参数
    n_routed_experts: int = 16  # 可被路由的专家数量,论文里DeepSeekV3用了256个专家
    n_shared_experts: int = 2  # 共享专家数量，始终都激活的专家数量，来保证模型的基线性能
    n_activated_experts: int = 6  # 每次激活的专家数量
    n_expert_groups: int = 1  # 专家组数量（可能用于分组专家路由）
    n_limited_groups: int = 1  # 受限专家组数量
    score_func: Literal["softmax", "sigmoid"] = "softmax"  # MoE 路由评分函数，默认使用 softmax
    route_scale: float = 1.0  # 路由评分的缩放因子

    # MLA（Multi-Level Attention）相关参数
    q_lora_rank: int = 0  # Query 投影的 LoRA 秩（低秩适配）
    kv_lora_rank: int = 512  # Key-Value 投影的 LoRA 秩
    qk_nope_head_dim: int = 128  # 无 RoPE 的 Query-Key 维度
    qk_rope_head_dim: int = 64  # 使用 RoPE 的 Query-Key 维度
    v_head_dim: int = 128  # Value 维度

    # YARN（Yet Another RoPE Network）相关参数
    original_seq_len: int = 4096  # 原始序列长度
    rope_theta: float = 10000.0  # RoPE 旋转位置编码的基数
    rope_factor: float = 40  # 扩展序列长度的缩放因子
    beta_fast: int = 32  # 快速 Beta 修正因子
    beta_slow: int = 1  # 慢速 Beta 修正因子
    mscale: float = 1.0  # 扩展注意力的缩放因子

2.5.1+cu124


In [5]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.distributed as dist
from typing import Optional

# 假设 world_size 和 rank 由分布式训练环境提供
world_size = dist.get_world_size() if dist.is_initialized() else 1
rank = dist.get_rank() if dist.is_initialized() else 0

class ParallelEmbedding(nn.Module):
    """
    并行嵌入层（ParallelEmbedding），支持分布式训练环境下的词向量分片。

    参数:
        vocab_size (int): 词表大小，即整个模型的词汇总数。
        dim (int): 词向量的维度。

    说明:
        - 词表在多个进程（GPU）之间进行分片，每个进程仅存储词表的一部分。
        - vocab_size 必须能够被 world_size 整除，以确保各 GPU 拥有相同大小的词向量片段。
    """
    def __init__(self, vocab_size: int, dim: int):
        super().__init__()
        self.vocab_size = vocab_size  # 词表大小
        self.dim = dim  # 词向量维度
        assert vocab_size % world_size == 0, f"词表大小必须能被 world_size 整除 (world_size={world_size})"

        # 计算当前进程（GPU）负责的词表片段大小
        self.part_vocab_size = vocab_size // world_size
        # 计算当前进程的词表起始和结束索引
        self.vocab_start_idx = rank * self.part_vocab_size
        self.vocab_end_idx = self.vocab_start_idx + self.part_vocab_size
        # 初始化当前进程的词向量参数
        self.weight = nn.Parameter(torch.empty(self.part_vocab_size, self.dim))

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """
        并行嵌入层的前向传播。

        参数:
            x (torch.Tensor): 输入张量，包含 token 的索引。

        返回:
            torch.Tensor: 词向量表示。

        处理流程:
            1. 如果使用多 GPU 训练（world_size > 1），检查 token 是否属于当前 GPU 负责的词表范围:
                - 若 token 超出当前 GPU 负责的范围，设为 0（避免索引超界）。
            2. 计算词嵌入。
            3. 若是多 GPU 训练，则进行 all_reduce 操作，将所有 GPU 的嵌入求和（同步）。
        """
        if world_size > 1:
            # 生成掩码，标记不属于当前进程词表范围的 token
            mask = (x < self.vocab_start_idx) | (x >= self.vocab_end_idx)
            # 将输入索引映射到当前进程的词表范围内
            x = x - self.vocab_start_idx
            # 将超出范围的索引设为 0，避免索引超界
            x[mask] = 0

        # 获取嵌入
        y = F.embedding(x, self.weight)

        if world_size > 1:
            # 对超出范围的 token 设置为 0
            y[mask] = 0
            # all_reduce 操作，确保所有进程得到相同的词嵌入
            dist.all_reduce(y)

        return y


def linear(x: torch.Tensor, weight: torch.Tensor, bias: Optional[torch.Tensor] = None) -> torch.Tensor:
    """
    线性变换函数，实现 y = xA^T + b，支持量化权重的计算。

    参数:
        x (torch.Tensor): 输入张量。
        weight (torch.Tensor): 权重张量，可能是量化后的，需要进行解量化处理。
        bias (Optional[torch.Tensor]): 偏置项（可选），默认为 None。

    返回:
        torch.Tensor: 线性变换后的结果。

    说明:
        - 若 weight 不是量化的，则直接调用 F.linear 计算。
        - 若 weight 是量化的（element_size() == 1），则需要先进行解量化，再进行计算。
        - 当 gemm_impl == "bf16" 时，使用 bf16 计算。
        - 其他情况，对 x 进行量化，然后使用 fp8_gemm 计算。
    """
    if weight.element_size() > 1:
        # 直接使用标准的 F.linear 计算
        return F.linear(x, weight, bias)
    elif gemm_impl == "bf16":
        # 量化权重，需要解量化
        weight = weight_dequant(weight, weight.scale)
        return F.linear(x, weight, bias)
    else:
        # 其他情况：对 x 进行量化，并使用 fp8_gemm 计算
        x, scale = act_quant(x, block_size)
        y = fp8_gemm(x, scale, weight, weight.scale)
        if bias is not None:
            y += bias
        return y


class Linear(nn.Module):
    """
    自定义线性层，支持量化权重，并提供可选的偏置项。

    参数:
        in_features (int): 输入特征维度。
        out_features (int): 输出特征维度。
        bias (bool): 是否包含偏置项，默认为 False。
        dtype (可选): 计算数据类型，默认为 torch.bfloat16。

    说明:
        - 如果 weight 是量化的，则需要额外存储 scale 参数。
        - bias 可选，若不使用，则注册为 None。
    """
    dtype = torch.bfloat16  # 默认数据类型

    def __init__(self, in_features: int, out_features: int, bias: bool = False, dtype=None):
        super().__init__()
        self.in_features = in_features  # 输入特征维度
        self.out_features = out_features  # 输出特征维度
        self.weight = nn.Parameter(torch.empty(out_features, in_features, dtype=dtype or Linear.dtype))

        # 若权重是量化的（element_size() == 1），则需要 scale 参数
        if self.weight.element_size() == 1:
            scale_out_features = (out_features + block_size - 1) // block_size
            scale_in_features = (in_features + block_size - 1) // block_size
            # 存储量化 scale 参数
            self.weight.scale = self.scale = nn.Parameter(torch.empty(scale_out_features, scale_in_features, dtype=torch.float32))
        else:
            # 非量化情况，无需 scale 参数
            self.register_parameter("scale", None)

        # 处理偏置
        if bias:
            self.bias = nn.Parameter(torch.empty(out_features))
        else:
            self.register_parameter("bias", None)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """
        线性层的前向传播。

        参数:
            x (torch.Tensor): 输入张量。

        返回:
            torch.Tensor: 经过线性变换的张量。

        说明:
            - 调用 linear 函数，自动处理量化权重和偏置项的计算。
        """
        return linear(x, self.weight, self.bias)


In [6]:
class ColumnParallelLinear(Linear):
    """
    列并行线性层（Column Parallel Linear），将输出特征分割到多个分布式进程中。

    参数：
        in_features (int): 输入特征的数量。
        out_features (int): 总输出特征数量。
        bias (bool): 是否包含偏置项，默认为 False。
        dtype (optional): 数据类型，默认为 `torch.bfloat16`。
    """
    def __init__(self, in_features: int, out_features: int, bias: bool = False, dtype = None):
        # 确保总输出特征数量可以被世界大小整除，以实现均匀分割
        assert out_features % world_size == 0, f"输出特征数必须能被 world_size 整除 (world_size={world_size})"

        # 计算当前进程负责的部分输出特征数
        self.part_out_features = out_features // world_size

        # 调用父类 Linear 的初始化，创建一个 in_features 到 part_out_features 的线性层
        super().__init__(in_features, self.part_out_features, bias, dtype)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """
        列并行线性层的前向传播。

        参数：
            x (torch.Tensor): 输入张量。

        返回：
            torch.Tensor: 经过线性变换后的张量，进行列并行计算。
        """
        # 进行线性变换
        y = linear(x, self.weight, self.bias)
        return y


class RowParallelLinear(Linear):
    """
    行并行线性层（Row Parallel Linear），将输入特征分割到多个分布式进程中。

    参数：
        in_features (int): 总输入特征数量。
        out_features (int): 输出特征的数量。
        bias (bool): 是否包含偏置项，默认为 False。
        dtype (optional): 数据类型，默认为 `torch.bfloat16`。
    """
    def __init__(self, in_features: int, out_features: int, bias: bool = False, dtype = None):
        # 确保输入特征数量可以被世界大小整除，以实现均匀分割
        assert in_features % world_size == 0, f"输入特征数必须能被 world_size 整除 (world_size={world_size})"

        # 计算当前进程负责的部分输入特征数
        self.part_in_features = in_features // world_size

        # 调用父类 Linear 的初始化，创建一个 part_in_features 到 out_features 的线性层
        super().__init__(self.part_in_features, out_features, bias, dtype)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """
        行并行线性层的前向传播。

        参数：
            x (torch.Tensor): 输入张量。

        返回：
            torch.Tensor: 经过线性变换后的张量，进行行并行计算。
        """
        # 进行线性变换
        y = linear(x, self.weight)

        # 如果是分布式环境（world_size > 1），则对 y 进行 all_reduce 操作，使所有进程的计算结果进行累加
        if world_size > 1:
            dist.all_reduce(y)

        # 如果存在偏置项，则加上偏置
        if self.bias is not None:
            y += self.bias

        return y


class RMSNorm(nn.Module):
    """
    均方根归一化（RMSNorm），用于对输入张量进行归一化。

    该方法不同于标准 LayerNorm，不依赖均值，而是基于均方根（RMS）进行归一化。

    参数：
        dim (int): 输入张量的维度。
        eps (float): 用于数值稳定性的 epsilon 值，默认为 1e-6。
    """
    def __init__(self, dim: int, eps: float = 1e-6):
        super().__init__()
        self.dim = dim  # 记录输入维度
        self.eps = eps  # 记录 epsilon 值

        # 归一化的可训练缩放参数，初始化为全 1
        self.weight = nn.Parameter(torch.ones(dim))

    def forward(self, x: torch.Tensor):
        """
        均方根归一化的前向传播。

        参数：
            x (torch.Tensor): 输入张量。

        返回：
            torch.Tensor: 归一化后的张量，保持输入形状不变。
        """
        # 使用 torch 的 rms_norm 进行均方根归一化
        return F.rms_norm(x, (self.dim,), self.weight, self.eps)


In [7]:
import torch
import torch.nn as nn
import math
from typing import Optional

# 预计算旋转位置编码的频率值
# 该函数用于计算基于旋转位置编码的复指数值
# 主要目的是为了加速计算，避免在每次前向传播时重新计算这些值
def precompute_freqs_cis(args: ModelArgs) -> torch.Tensor:
    """
    预计算旋转位置编码的频率值。

    参数：
        args (ModelArgs): 包含位置编码参数的模型参数。

    返回：
        torch.Tensor: 预计算的复指数值，用于旋转位置编码。
    """
    dim = args.qk_rope_head_dim  # 旋转位置编码的维度,是64
    seqlen = args.max_seq_len  # 最大序列长度
    beta_fast = args.beta_fast  # 快速调整参数
    beta_slow = args.beta_slow  # 缓慢调整参数
    base = args.rope_theta  # 旋转位置编码的基数
    factor = args.rope_factor  # 旋转位置编码的缩放因子

    # 计算旋转位置编码修正维度
    def find_correction_dim(num_rotations, dim, base, max_seq_len):
        """
        计算旋转位置编码的修正维度。
        """
        return dim * math.log(max_seq_len / (num_rotations * 2 * math.pi)) / (2 * math.log(base))

    # 计算旋转位置编码修正范围
    def find_correction_range(low_rot, high_rot, dim, base, max_seq_len):
        """
        计算旋转位置编码修正范围。
        """
        low = math.floor(find_correction_dim(low_rot, dim, base, max_seq_len))
        high = math.ceil(find_correction_dim(high_rot, dim, base, max_seq_len))
        return max(low, 0), min(high, dim-1)

    # 计算线性斜坡因子，用于平滑过渡
    def linear_ramp_factor(min, max, dim):
        """
        计算线性斜坡因子。
        """
        if min == max:
            max += 0.001
        linear_func = (torch.arange(dim, dtype=torch.float32) - min) / (max - min)
        ramp_func = torch.clamp(linear_func, 0, 1)
        return ramp_func

    # 计算基础频率
    freqs = 1.0 / (base ** (torch.arange(0, dim, 2, dtype=torch.float32) / dim))

    # 如果序列长度超过原始最大长度，则进行修正
    if seqlen > args.original_seq_len:
        low, high = find_correction_range(beta_fast, beta_slow, dim, base, args.original_seq_len)
        smooth = 1 - linear_ramp_factor(low, high, dim // 2)
        freqs = freqs / factor * (1 - smooth) + freqs * smooth

    t = torch.arange(seqlen)
    freqs = torch.outer(t, freqs)
    freqs_cis = torch.polar(torch.ones_like(freqs), freqs)
    return freqs_cis

# 应用旋转位置编码到输入张量
# 该函数使用预计算的复指数值对输入进行旋转编码

def apply_rotary_emb(x: torch.Tensor, freqs_cis: torch.Tensor) -> torch.Tensor:
    """
    应用旋转位置编码到输入张量。

    参数：
        x (torch.Tensor): 输入张量。
        freqs_cis (torch.Tensor): 预计算的复指数值。

    返回：
        torch.Tensor: 旋转编码后的张量。
    """
    dtype = x.dtype
    x = torch.view_as_complex(x.float().view(*x.shape[:-1], -1, 2))  # 变换为复数表示
    freqs_cis = freqs_cis.view(1, x.size(1), 1, x.size(-1))  # 调整形状
    y = torch.view_as_real(x * freqs_cis).flatten(3)  # 进行旋转编码并转换回实数
    return y.to(dtype)

# 多头注意力层（MLA）
# 该类实现了标准的多头注意力机制，并结合了旋转位置编码
class MLA(nn.Module):
    """
    多头注意力层（MLA）。

    属性:
        dim (int): 输入特征的维度。
        n_heads (int): 注意力头的数量。
        n_local_heads (int): 分布式系统中用于局部注意力的头数量。
        q_lora_rank (int): 查询低秩投影的秩。
        kv_lora_rank (int): 键值低秩投影的秩。
        qk_nope_head_dim (int): 非位置查询/键投影的维度。
        qk_rope_head_dim (int): 旋转位置查询/键投影的维度。
        qk_head_dim (int): 查询/键投影的总维度。
        v_head_dim (int): 值投影的维度。
        softmax_scale (float): 注意力计算中Softmax的缩放因子。
    """
    def __init__(self, args: ModelArgs):
        super().__init__()
        # 初始化各个参数
        self.dim = args.dim  # 输入的特征维度
        self.n_heads = args.n_heads  # 注意力头的数量
        self.n_local_heads = args.n_heads // world_size  # 分布式环境中的局部注意力头数
        self.q_lora_rank = args.q_lora_rank  # 查询低秩投影的秩
        self.kv_lora_rank = args.kv_lora_rank  # 键值低秩投影的秩
        self.qk_nope_head_dim = args.qk_nope_head_dim  # 非位置查询/键的维度
        self.qk_rope_head_dim = args.qk_rope_head_dim  # 旋转位置查询/键的维度
        self.qk_head_dim = args.qk_nope_head_dim + args.qk_rope_head_dim  # 查询/键投影的总维度
        self.v_head_dim = args.v_head_dim  # 值投影的维度

        # 如果q_lora_rank为0，直接使用列并行线性层，否则使用低秩投影和标准化
        if self.q_lora_rank == 0:
            self.wq = ColumnParallelLinear(self.dim, self.n_heads * self.qk_head_dim)
        else:
            self.wq_a = Linear(self.dim, self.q_lora_rank)  # 低秩投影的第一部分，不为0，为512，先从2048变为512
            self.q_norm = RMSNorm(self.q_lora_rank)  # 低秩投影的标准化
            self.wq_b = ColumnParallelLinear(self.q_lora_rank, self.n_heads * self.qk_head_dim)  # 低秩投影的第二部分

        # 键值投影和标准化
        self.wkv_a = Linear(self.dim, self.kv_lora_rank + self.qk_rope_head_dim)  # 键值低秩投影
        self.kv_norm = RMSNorm(self.kv_lora_rank)  # 键值的标准化
        self.wkv_b = ColumnParallelLinear(self.kv_lora_rank, self.n_heads * (self.qk_nope_head_dim + self.v_head_dim))  # 键值投影
        self.wo = RowParallelLinear(self.n_heads * self.v_head_dim, self.dim)  # 输出投影
        self.softmax_scale = self.qk_head_dim ** -0.5  # Softmax缩放因子

        # 如果最大序列长度大于原始序列长度，调整softmax_scale
        if args.max_seq_len > args.original_seq_len:
            mscale = 0.1 * args.mscale * math.log(args.rope_factor) + 1.0
            self.softmax_scale = self.softmax_scale * mscale * mscale

        # 根据注意力实现类型选择不同的缓存方式
        if attn_impl == "naive":
            # 在"naive"实现下缓存k和v
            self.register_buffer("k_cache", torch.zeros(args.max_batch_size, args.max_seq_len, self.n_local_heads, self.qk_head_dim), persistent=False)
            self.register_buffer("v_cache", torch.zeros(args.max_batch_size, args.max_seq_len, self.n_local_heads, self.v_head_dim), persistent=False)
        else:
            # 在其他实现下缓存kv和pe
            self.register_buffer("kv_cache", torch.zeros(args.max_batch_size, args.max_seq_len, self.kv_lora_rank), persistent=False)
            self.register_buffer("pe_cache", torch.zeros(args.max_batch_size, args.max_seq_len, self.qk_rope_head_dim), persistent=False)

    def forward(self, x: torch.Tensor, start_pos: int, freqs_cis: torch.Tensor, mask: Optional[torch.Tensor]):
        """
        多头注意力层的前向传播。

        参数:
            x (torch.Tensor): 输入张量，形状为(batch_size, seq_len, dim)。
            start_pos (int): 缓存的起始位置。
            freqs_cis (torch.Tensor): 预计算的旋转嵌入的复数指数值。
            mask (Optional[torch.Tensor]): 掩码张量，用于排除某些位置的注意力计算。

        返回:
            torch.Tensor: 输出张量，形状与输入相同。
        """
        bsz, seqlen, _ = x.size()  # 获取输入张量的batch size和序列长度
        end_pos = start_pos + seqlen  # 计算结束位置

        # 计算查询（q）
        if self.q_lora_rank == 0:
            q = self.wq(x)  # 使用列并行线性层计算查询
        else:
            q = self.wq_b(self.q_norm(self.wq_a(x)))  # 使用低秩投影和标准化计算查询

        # 重塑查询张量的形状
        q = q.view(bsz, seqlen, self.n_local_heads, self.qk_head_dim)
        #使用f格式打印q的形状
        print(f"q shape: {q.shape}")
        # 分割查询张量为非位置部分和位置编码部分
        q_nope, q_pe = torch.split(q, [self.qk_nope_head_dim, self.qk_rope_head_dim], dim=-1)
        #使用f格式打印q_nope, q_pe的形状，把密集向量分成两部分，q_pe计算旋转位置编码
        print(f"q_nope shape: {q_nope.shape}, q_pe shape: {q_pe.shape}")
        q_pe = apply_rotary_emb(q_pe, freqs_cis)  # 应用旋转位置编码

        # 计算键值（kv）
        kv = self.wkv_a(x)
        #打印kv shape
        print("kv shape:", kv.shape)
        kv, k_pe = torch.split(kv, [self.kv_lora_rank, self.qk_rope_head_dim], dim=-1)
        #一行代码f格式打印kv，k_pe shape
        print(f"kv shape: {kv.shape}, k_pe shape: {k_pe.shape}")
        k_pe = apply_rotary_emb(k_pe.unsqueeze(2), freqs_cis)  # 应用旋转位置编码

        # 判断是否使用"naive"注意力实现
        if attn_impl == "naive":
            q = torch.cat([q_nope, q_pe], dim=-1)  # 拼接查询的非位置部分和位置编码部分
            kv = self.wkv_b(self.kv_norm(kv))  # 对键值进行标准化
            kv = kv.view(bsz, seqlen, self.n_local_heads, self.qk_nope_head_dim + self.v_head_dim)  # 重塑键值张量形状
            # 分割键值张量为非位置部分和值（v）
            k_nope, v = torch.split(kv, [self.qk_nope_head_dim, self.v_head_dim], dim=-1)
            k = torch.cat([k_nope, k_pe.expand(-1, -1, self.n_local_heads, -1)], dim=-1)  # 拼接键的非位置部分和位置编码部分
            #一行代码f格式打印k shape
            print(f"k shape: {k.shape}")
            self.k_cache[:bsz, start_pos:end_pos] = k  # 更新k缓存，是推理阶段
            self.v_cache[:bsz, start_pos:end_pos] = v  # 更新v缓存
            # 计算注意力得分
            scores = torch.einsum("bshd,bthd->bsht", q, self.k_cache[:bsz, :end_pos]) * self.softmax_scale
        else:
            # 在其他实现下，处理wkv_b的权重，并计算q_nope的注意力得分
            wkv_b = self.wkv_b.weight if self.wkv_b.scale is None else weight_dequant(self.wkv_b.weight, self.wkv_b.scale, block_size)
            wkv_b = wkv_b.view(self.n_local_heads, -1, self.kv_lora_rank)
            q_nope = torch.einsum("bshd,hdc->bshc", q_nope, wkv_b[:, :self.qk_nope_head_dim])
            self.kv_cache[:bsz, start_pos:end_pos] = self.kv_norm(kv)  # 更新kv缓存
            self.pe_cache[:bsz, start_pos:end_pos] = k_pe.squeeze(2)  # 更新位置编码缓存
            # 计算注意力得分
            scores = (torch.einsum("bshc,btc->bsht", q_nope, self.kv_cache[:bsz, :end_pos]) +
                      torch.einsum("bshr,btr->bsht", q_pe, self.pe_cache[:bsz, :end_pos])) * self.softmax_scale

        # 如果有mask，添加到得分上
        if mask is not None:
            scores += mask.unsqueeze(1)

        # 计算softmax得分
        scores = scores.softmax(dim=-1, dtype=torch.float32).type_as(x)

        # 根据注意力实现类型，选择不同的计算方式
        if attn_impl == "naive":
            x = torch.einsum("bsht,bthd->bshd", scores, self.v_cache[:bsz, :end_pos])  # 计算输出
        else:
            x = torch.einsum("bsht,btc->bshc", scores, self.kv_cache[:bsz, :end_pos])  # 计算输出
            x = torch.einsum("bshc,hdc->bshd", x, wkv_b[:, -self.v_head_dim:])  # 计算最终输出

        # 通过行并行线性层投影到原始维度
        x = self.wo(x.flatten(2))
        return x



In [8]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from typing import Tuple

class MLP(nn.Module):
    """
    多层感知机（MLP），用于前馈计算。

    该模块包含三个线性变换层，分别是 w1、w2 和 w3，用于特征变换和计算。

    属性:
        w1 (nn.Module): 线性层，用于从输入层到隐藏层的转换。
        w2 (nn.Module): 线性层，用于从隐藏层到输出层的转换。
        w3 (nn.Module): 额外的线性层，用于特征变换。
    """
    def __init__(self, dim: int, inter_dim: int):
        """
        初始化 MLP 层。

        参数:
            dim (int): 输入和输出的维度（维度保持一致）。
            inter_dim (int): 隐藏层的维度。
        """
        super().__init__()
        self.w1 = ColumnParallelLinear(dim, inter_dim)  # 第一层线性变换
        self.w2 = RowParallelLinear(inter_dim, dim)     # 第二层线性变换（回到原始维度）
        self.w3 = ColumnParallelLinear(dim, inter_dim)  # 额外的线性变换层

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """
        MLP 的前向计算。

        参数:
            x (torch.Tensor): 输入张量，形状为 (batch_size, dim)。

        返回:
            torch.Tensor: 经过 MLP 计算后的输出张量，形状为 (batch_size, dim)。
        """
        return self.w2(F.silu(self.w1(x)) * self.w3(x))  # 使用 SiLU 激活函数进行非线性变换，并结合 w3(x) 进行特征变换


class Gate(nn.Module):
    """
    用于 Mixture-of-Experts（MoE）模型的门控机制（Gating Mechanism）。

    该模块用于在多个专家（Expert）之间进行路由选择，决定每个输入数据应该被送到哪些专家进行计算。

    属性:
        dim (int): 输入特征的维度。
        topk (int): 每个输入激活的专家数（选择 top-k 个专家）。
        n_groups (int): 专家被划分的组数（用于路由分组）。
        topk_groups (int): 每个输入路由到的专家组数。
        score_func (str): 计算分数的函数（可选 "softmax" 或 "sigmoid"）。
        route_scale (float): 路由权重的缩放因子。
        weight (torch.nn.Parameter): 可训练参数，表示门控网络的权重矩阵。
        bias (Optional[torch.nn.Parameter]): 可选的偏置项，仅当输入维度为 7168 时存在。
    """
    def __init__(self, args: ModelArgs):
        """
        初始化 Gate 模块。

        参数:
            args (ModelArgs): 传入的模型参数对象，包含 MoE 相关的超参数。
        """
        super().__init__()
        self.dim = args.dim  # 输入特征的维度
        self.topk = args.n_activated_experts  # 选择的前 top-k 个专家
        self.n_groups = args.n_expert_groups  # 专家分组的数量
        self.topk_groups = args.n_limited_groups  # 选择的前 top-k 组
        self.score_func = args.score_func  # 计算分数的方式
        self.route_scale = args.route_scale  # 路由权重的缩放比例

        # 可训练权重参数（用于计算门控分数）
        self.weight = nn.Parameter(torch.empty(args.n_routed_experts, args.dim))

        # 只有当 dim = 7168 时，才会添加可训练的偏置项
        self.bias = nn.Parameter(torch.empty(args.n_routed_experts)) if self.dim == 7168 else None

    def forward(self, x: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
        """
        计算门控权重，并确定选择哪些专家进行计算。

        参数:
            x (torch.Tensor): 输入张量，形状为 (batch_size, dim)。

        返回:
            Tuple[torch.Tensor, torch.Tensor]:
                - 选择的专家权重 (batch_size, topk)
                - 选择的专家索引 (batch_size, topk)
        """
        # 一行代码f格式打印x的形状
        print(f"Gate x shape: {x.shape}")
        scores = linear(x, self.weight)  # 计算输入与门控权重的线性变换

        # 根据 score_func 选择 softmax 或 sigmoid 进行归一化
        if self.score_func == "softmax":
            scores = scores.softmax(dim=-1, dtype=torch.float32)
        else:
            scores = scores.sigmoid()

        original_scores = scores  # 保存原始分数

        # 若存在偏置项，则加上偏置
        if self.bias is not None:
            scores = scores + self.bias

        # 若使用多个专家组，则进行分组处理
        if self.n_groups > 1:
            scores = scores.view(x.size(0), self.n_groups, -1)  # 重新 reshape 为 (batch_size, n_groups, 每组的专家数)

            # 计算每组的得分，若无偏置，则取最大值；若有偏置，则取 top-2 得分之和
            if self.bias is None:
                group_scores = scores.amax(dim=-1)
            else:
                group_scores = scores.topk(2, dim=-1)[0].sum(dim=-1)

            # 选择得分最高的 topk_groups 组，并生成掩码
            indices = group_scores.topk(self.topk_groups, dim=-1)[1]
            mask = torch.zeros_like(scores[..., 0]).scatter_(1, indices, True)
            scores = (scores * mask.unsqueeze(-1)).flatten(1)  # 仅保留选中的专家分数

        # 选择得分最高的 top-k 个专家
        indices = torch.topk(scores, self.topk, dim=-1)[1]
        #打印indices
        print("Gate indices shape:", indices.shape)
        # 计算最终的专家权重
        weights = original_scores.gather(1, indices)
        #打印weights
        # print("Gate weights :", weights)
        # 若使用 sigmoid，则需要归一化
        if self.score_func == "sigmoid":
            weights /= weights.sum(dim=-1, keepdim=True)

        weights *= self.route_scale  # 乘以路由缩放因子
        return weights.type_as(x), indices  # 返回计算后的权重和选择的专家索引


class Expert(nn.Module):
    """
    专家（Expert）层，用于 Mixture-of-Experts（MoE）模型。

    该模块实现了一个独立的专家网络，每个专家由三层线性变换层组成。

    属性:
        w1 (nn.Module): 线性层，从输入到隐藏层的变换。
        w2 (nn.Module): 线性层，从隐藏层到输出的变换。
        w3 (nn.Module): 额外的线性层，用于特征变换。
    """
    def __init__(self, dim: int, inter_dim: int):
        """
        初始化 Expert 层。

        参数:
            dim (int): 输入和输出的维度。
            inter_dim (int): 隐藏层的维度。
        """
        super().__init__()
        self.w1 = nn.Linear(dim, inter_dim)  # 输入到隐藏层的线性变换
        self.w2 = nn.Linear(inter_dim, dim)  # 隐藏层到输出层的线性变换
        self.w3 = nn.Linear(dim, inter_dim)  # 额外的线性变换层

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """
        Expert 层的前向计算。

        参数:
            x (torch.Tensor): 输入张量，形状为 (batch_size, dim)。

        返回:
            torch.Tensor: 经过 Expert 计算后的输出张量，形状为 (batch_size, dim)。
        """
        return self.w2(F.silu(self.w1(x)) * self.w3(x))  # 先经过 w1 进行非线性变换，再与 w3 计算的结果相乘，最后通过 w2 输出


In [9]:
import torch
import torch.nn as nn
import torch.distributed as dist
from typing import Optional

class MoE(nn.Module):
    """
    MoE（Mixture-of-Experts，专家混合）模块，用于选择性地激活多个专家网络，提高计算效率。

    主要属性：
        dim (int): 输入特征的维度。
        n_routed_experts (int): 该模型中的总专家数量。
        n_local_experts (int): 在分布式环境中，每个设备处理的专家数量。
        n_activated_experts (int): 每个输入激活的专家数量。
        gate (nn.Module): 用于计算输入到各专家的分配权重的门控机制。
        experts (nn.ModuleList): 专家网络列表，每个专家都是一个神经网络模块。
        shared_experts (nn.Module): 共享专家网络，对所有输入均生效。
    """
    def __init__(self, args: ModelArgs):
        """
        初始化 MoE 模块。

        参数：
            args (ModelArgs): 包含 MoE 参数的模型配置。
        """
        super().__init__()
        self.dim = args.dim

        # 确保专家数量可以被世界大小整除（用于分布式训练）。
        assert args.n_routed_experts % world_size == 0, f"专家数量必须被 world_size 整除 (world_size={world_size})"

        self.n_routed_experts = args.n_routed_experts
        self.n_local_experts = args.n_routed_experts // world_size
        self.n_activated_experts = args.n_activated_experts

        # 计算当前设备负责的专家索引范围。
        self.experts_start_idx = rank * self.n_local_experts
        self.experts_end_idx = self.experts_start_idx + self.n_local_experts

        # 门控机制，用于决定输入数据分配给哪些专家。
        self.gate = Gate(args)

        # 仅在当前设备上初始化其负责的专家，其余设为 None。
        self.experts = nn.ModuleList([Expert(args.dim, args.moe_inter_dim) if self.experts_start_idx <= i < self.experts_end_idx else None
                                      for i in range(self.n_routed_experts)])

        # 共享专家，对所有输入均适用。
        self.shared_experts = MLP(args.dim, args.n_shared_experts * args.moe_inter_dim)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """
        MoE 模块的前向传播。

        参数：
            x (torch.Tensor): 输入张量。

        返回：
            torch.Tensor: 经过专家计算后的输出张量。
        """
        shape = x.size()
        #打印x的shape
        print(f"MoE x shape: {x.shape}")
        x = x.view(-1, self.dim) #把x的bs和seq_len展平了

        # 通过门控机制获取专家索引及其权重。
        weights, indices = self.gate(x)
        y = torch.zeros_like(x)

        # 统计各专家被选中的次数。
        counts = torch.bincount(indices.flatten(), minlength=self.n_routed_experts).tolist()

        # 遍历当前设备管理的专家，并处理其对应的输入。
        for i in range(self.experts_start_idx, self.experts_end_idx):
            if counts[i] == 0:
                continue  # 如果当前专家未被选中，则跳过。
            expert = self.experts[i]
            idx, top = torch.where(indices == i)
            print(f'moe-idx{idx}')
            y[idx] += expert(x[idx]) * weights[idx, top, None]

        # 共享专家计算。
        z = self.shared_experts(x)

        # 若为多设备环境，则进行全局同步。
        if world_size > 1:
            dist.all_reduce(y)
        #打印，y，z的shape
        print(f"MoE y shape: {y.shape}, MoE z shape: {z.shape}")
        return (y + z).view(shape)




import torch
import torch.nn as nn
import torch.distributed as dist
from typing import Optional

class Block(nn.Module):
    """
    Transformer 块，结合了注意力机制和前馈网络。

    属性:
        attn (nn.Module): 多头注意力（MLA, Multi-Head Attention）。
        ffn (nn.Module): 前馈神经网络（MLP 或 MoE）。
        attn_norm (nn.Module): 注意力层的归一化层。
        ffn_norm (nn.Module): 前馈网络的归一化层。
    """
    def __init__(self, layer_id: int, args: ModelArgs):
        """
        初始化 Transformer 块。

        参数:
            layer_id (int): 该层在 Transformer 中的索引。
            args (ModelArgs): 包含 Transformer 相关参数的配置对象。
        """
        super().__init__()
        self.attn = MLA(args)  # 多头注意力机制
        # 如果 layer_id 小于稠密层数量，则使用 MLP，否则使用 MoE 结构
        self.ffn = MLP(args.dim, args.inter_dim) if layer_id < args.n_dense_layers else MoE(args)
        self.attn_norm = RMSNorm(args.dim)  # 注意力归一化层
        self.ffn_norm = RMSNorm(args.dim)   # 前馈归一化层

    def forward(self, x: torch.Tensor, start_pos: int, freqs_cis: torch.Tensor, mask: Optional[torch.Tensor]) -> torch.Tensor:
        """
        Transformer 块的前向传播。

        参数:
            x (torch.Tensor): 输入张量。
            start_pos (int): 序列的起始位置。
            freqs_cis (torch.Tensor): 预计算的旋转嵌入复指数值。
            mask (Optional[torch.Tensor]): 掩码张量，用于排除特定位置的注意力。

        返回:
            torch.Tensor: 经过 Transformer 块计算后的输出张量。
        """
        x = x + self.attn(self.attn_norm(x), start_pos, freqs_cis, mask)  # 归一化后进行注意力计算并残差连接
        x = x + self.ffn(self.ffn_norm(x))  # 归一化后进入前馈网络并残差连接
        return x


class Transformer(nn.Module):
    """
    Transformer 模型，包括嵌入层、多层 Transformer 块、最终归一化层和输出层。

    属性:
        max_seq_len (int): 最大序列长度。
        embed (nn.Module): 词嵌入层。
        layers (torch.nn.ModuleList): Transformer 块的列表。
        norm (nn.Module): 所有 Transformer 层之后的归一化层。
        head (nn.Module): 输出投影层，将隐藏状态映射到词汇表大小。
        freqs_cis (torch.Tensor): 预计算的旋转嵌入复指数值。
    """
    def __init__(self, args: ModelArgs):
        """
        初始化 Transformer 模型。

        参数:
            args (ModelArgs): 包含 Transformer 相关参数的配置对象。
        """
        global world_size, rank
        world_size = dist.get_world_size() if dist.is_initialized() else 1  # 获取分布式训练的总进程数
        rank = dist.get_rank() if dist.is_initialized() else 0  # 获取当前进程的 rank 值
        Linear.dtype = torch.float8_e4m3fn if args.dtype == "fp8" else torch.bfloat16  # 设置默认数据类型
        super().__init__()
        self.max_seq_len = args.max_seq_len  # 最大序列长度
        self.embed = ParallelEmbedding(args.vocab_size, args.dim)  # 词嵌入层，支持并行计算
        self.layers = torch.nn.ModuleList()
        for layer_id in range(args.n_layers):
            self.layers.append(Block(layer_id, args))  # 添加多个 Transformer 块
        self.norm = RMSNorm(args.dim)  # 归一化层
        self.head = ColumnParallelLinear(args.dim, args.vocab_size, dtype=torch.get_default_dtype())  # 输出投影层
        self.register_buffer("freqs_cis", precompute_freqs_cis(args), persistent=False)  # 预计算旋转位置编码

    @torch.inference_mode()
    def forward(self, tokens: torch.Tensor, start_pos: int = 0):
        """
        Transformer 的前向传播。

        参数:
            tokens (torch.Tensor): 形状为 (batch_size, seq_len) 的输入 token ID。
            start_pos (int, 可选): 序列的起始位置，默认为 0。

        返回:
            torch.Tensor: 形状为 (batch_size, vocab_size) 的 logits。
        """
        seqlen = tokens.size(1)  # 获取输入序列长度
        h = self.embed(tokens)  # 通过词嵌入层获取 token 表示
        freqs_cis = self.freqs_cis[start_pos:start_pos+seqlen]  # 获取对应位置的旋转位置编码
        mask = None
        if seqlen > 1:
            mask = torch.full((seqlen, seqlen), float("-inf"), device=tokens.device).triu_(1)  # 构造上三角掩码（防止未来信息泄露）
        for layer in self.layers:
            h = layer(h, start_pos, freqs_cis, mask)  # 依次通过每个 Transformer 块
        h = self.norm(h)[:, -1]  # 归一化后取最后一个时间步的输出
        logits = self.head(h)  # 通过输出投影层计算 logits
        if world_size > 1:
            all_logits = [torch.empty_like(logits) for _ in range(world_size)]
            dist.all_gather(all_logits, logits)  # 在所有进程间收集 logits
            logits = torch.cat(all_logits, dim=-1)  # 拼接所有进程的 logits
        return logits


if __name__ == "__main__":
    torch.set_default_dtype(torch.bfloat16)  # 设置默认数据类型
    torch.set_default_device("cuda")  # 设置默认计算设备为 GPU
    torch.manual_seed(0)  # 设置随机种子，保证可复现性
    args = ModelArgs()  # 初始化模型参数
    x = torch.randint(0, args.vocab_size, (2, 128))  # 随机生成 token ID 作为输入
    model = Transformer(args)  # 初始化 Transformer 模型
    print(model(x).size())  # 运行模型并打印输出张量的形状，为啥只有一个词，因为只取了最后一个的输出，可以理解这个样例像咱们的bert基座一样，让大家去练习一个分类问题



q shape: torch.Size([2, 128, 16, 192])
q_nope shape: torch.Size([2, 128, 16, 128]), q_pe shape: torch.Size([2, 128, 16, 64])
kv shape: torch.Size([2, 128, 576])
kv shape: torch.Size([2, 128, 512]), k_pe shape: torch.Size([2, 128, 64])
q shape: torch.Size([2, 128, 16, 192])
q_nope shape: torch.Size([2, 128, 16, 128]), q_pe shape: torch.Size([2, 128, 16, 64])
kv shape: torch.Size([2, 128, 576])
kv shape: torch.Size([2, 128, 512]), k_pe shape: torch.Size([2, 128, 64])
MoE x shape: torch.Size([2, 128, 2048])
Gate x shape: torch.Size([256, 2048])
Gate indices shape: torch.Size([256, 6])
Gate weights : tensor([[0.0625, 0.0625, 0.0625, 0.0625, 0.0625, 0.0625],
        [0.0625, 0.0625, 0.0625, 0.0625, 0.0625, 0.0625],
        [0.0625, 0.0625, 0.0625, 0.0625, 0.0625, 0.0625],
        ...,
        [0.0625, 0.0625, 0.0625, 0.0625, 0.0625, 0.0625],
        [0.0625, 0.0625, 0.0625, 0.0625, 0.0625, 0.0625],
        [0.0625, 0.0625, 0.0625, 0.0625, 0.0625, 0.0625]], device='cuda:0',
       dtype=torc