# 导入必要的库
本笔记本演示了如何使用Python进行模型并行组的初始化。首先，我们导入类型注解和参数解析所需的模块。

In [None]:
from typing import List, Optional, Callable
import argparse

# 定义工具函数
以下函数用于并行组计算所需的数学操作，包括`prefix_product`、`inner_product`和`decompose`。

In [None]:
from typing import List

# 工具函数：用于并行组的数学计算

def prefix_product(a: List[int], init=1) -> List[int]:
    """Return the running products of ``a``, starting from ``init``.

    The result always has ``len(a) + 1`` entries: ``init`` itself, followed
    by ``init`` multiplied by each successively longer prefix of ``a``.
    For example, ``a=[2, 3, 4]`` yields ``[1, 2, 6, 24]``.

    Args:
        a: Sizes of the parallel dimensions, e.g. ``[tp, dp, pp]``.
        init: Value the running product starts from. Defaults to 1.

    Returns:
        The list of prefix products, one element longer than ``a``.
    """
    products = [init]
    for size in a:
        # Extend the running product by one more dimension.
        products.append(products[-1] * size)
    return products

def inner_product(a: List[int], b: List[int]) -> int:
    """Return the sum of the pairwise products of ``a`` and ``b``.

    Pairs are formed with ``zip``, so if the lists differ in length the
    extra trailing entries of the longer one are ignored.

    Args:
        a: Multi-dimensional index, one coordinate per dimension.
        b: Stride of each dimension.

    Returns:
        The inner product, i.e. the flat index the coordinates map to.
    """
    total = 0
    for coordinate, step in zip(a, b):
        total += coordinate * step
    return total

def decompose(index, shape, stride=None):
    """Split a flat index into one coordinate per dimension.

    Coordinate ``i`` is computed as ``(index // stride[i]) % shape[i]``.
    For example, ``index=5, shape=[2, 3], stride=[1, 2]`` returns ``[1, 2]``
    because ``1 * 1 + 2 * 2 == 5``.

    Args:
        index: Flat index to decompose.
        shape: Size of each dimension.
        stride: Stride of each dimension. When omitted it is
            ``prefix_product(shape)``, which carries one extra trailing entry
            (the total size). A caller-supplied list may have either exactly
            one entry per dimension or that extra trailing entry.

    Returns:
        The list of per-dimension coordinates, same length as ``shape``.

    Raises:
        AssertionError: If the coordinates do not recombine to ``index``,
            e.g. when ``index`` lies outside the range covered by ``shape``.
    """
    if stride is None:
        stride = prefix_product(shape)
    idx = [(index // d) % s for s, d in zip(shape, stride)]
    # Verify the round trip. zip() stops at the shorter sequence, so this
    # handles both an n-entry stride and the (n + 1)-entry prefix-product
    # form; slicing off the last stride would drop a real dimension whenever
    # the caller passes exactly one stride per dimension.
    assert (
        sum(x * y for x, y in zip(idx, stride)) == index
    ), f"idx {index} with shape {shape} mismatch the return idx {idx}"
    return idx

# 定义RankGenerator类
`RankGenerator`类用于生成不同并行模式下的rank分组，包括张量、流水线、数据、专家和上下文并行。

In [None]:
# 生成并行组的核心函数和类

def generate_masked_orthogonal_rank_groups(
    world_size: int, parallel_size: List[int], mask: List[bool]
) -> List[List[int]]:
    """Build the rank groups spanned by the masked parallel dimensions.

    Dimensions whose mask entry is True vary inside a group; the remaining
    dimensions select which group a rank belongs to.

    Args:
        world_size: Total number of ranks (e.g. the number of GPUs).
        parallel_size: Size of each parallel dimension, e.g. ``[tp, dp, pp]``.
        mask: One flag per dimension; True means the dimension takes part in
            the grouping, False means it does not.

    Returns:
        A list of groups, each a list of global rank numbers.
    """
    strides = prefix_product(parallel_size)

    # Partition sizes and strides into "inside the group" and "across groups".
    member_shape, member_stride = [], []
    group_shape, group_stride = [], []
    for size, stride, selected in zip(parallel_size, strides, mask):
        if selected:
            member_shape.append(size)
            member_stride.append(stride)
        else:
            group_shape.append(size)
            group_stride.append(stride)

    group_size = prefix_product(member_shape)[-1]  # ranks per group
    group_count = world_size // group_size  # number of groups

    groups = []
    for group_index in range(group_count):
        # Contribution of the non-grouped dimensions, shared by the whole group.
        base = inner_product(decompose(group_index, group_shape), group_stride)
        groups.append(
            [
                inner_product(decompose(member, member_shape), member_stride) + base
                for member in range(group_size)
            ]
        )
    return groups

class RankGenerator:
    """Generate rank groups for the different parallelism modes.

    Handles tensor (tp), data (dp), pipeline (pp), expert (ep) and context
    (cp) parallel dimensions.
    """

    def __init__(self, tp: int, ep: int, dp: int, pp: int, cp: int, order: str, rank_offset: int = 0) -> None:
        """Record the parallel sizes and normalise the dimension order.

        Args:
            tp: Tensor parallel size.
            ep: Expert parallel size.
            dp: Data parallel size.
            pp: Pipeline parallel size.
            cp: Context parallel size.
            order: Dash-separated dimension order, e.g. ``'tp-dp-pp'``.
                It is lower-cased, and any dimension of size 1 that is
                missing from it is appended at the end.
            rank_offset: Offset added to every rank returned by
                ``get_ranks`` when it is greater than 0.

        Raises:
            AssertionError: If both ``ep`` and ``cp`` differ from 1.
            RuntimeError: If a dimension whose size is not 1 is missing
                from ``order``.
        """
        assert (
            ep == 1 or cp == 1
        ), "EP和CP不能同时大于1。CP只在默认RankGenerator中，EP只在专家RankGenerator中。"
        self.tp, self.ep, self.dp, self.pp, self.cp = tp, ep, dp, pp, cp
        self.rank_offset = rank_offset
        self.world_size = tp * dp * pp * cp * ep  # total number of ranks
        self.name_to_size = {
            "tp": self.tp,
            "pp": self.pp,
            "dp": self.dp,
            "ep": self.ep,
            "cp": self.cp,
        }
        # Keep the caller's spelling around for the error message below.
        self.order = order

        normalized = order.lower()
        for name, size in self.name_to_size.items():
            if name in normalized:
                continue
            if size != 1:
                raise RuntimeError(
                    f"并行类型({name})的大小为({self.name_to_size[name]})，但未在顺序({self.order})中指定。"
                )
            # A missing dimension of size 1 is harmless: append it.
            normalized = normalized + '-' + name

        self.order = normalized
        self.ordered_size = [self.name_to_size[token] for token in normalized.split('-')]

    def get_mask(self, order: str, token: str):
        """Return a boolean mask selecting the dimensions named in ``token``.

        Args:
            order: Dash-separated dimension order, e.g. ``'tp-dp-pp'``.
            token: Dash-separated dimensions to group over, e.g. ``'tp-dp'``.

        Returns:
            A list with one flag per entry of ``order``; True where that
            dimension appears in ``token``.

        Raises:
            ValueError: If ``token`` names a dimension absent from ``order``.
        """
        dimensions = order.split('-')
        mask = [False] * len(dimensions)
        for name in token.split('-'):
            position = dimensions.index(name)
            mask[position] = True
        return mask

    def get_ranks(self, token):
        """Return the rank groups for the dimensions named in ``token``.

        Args:
            token: Dash-separated dimensions to group over, e.g. ``'tp-dp'``.

        Returns:
            A list of rank groups, each shifted by ``rank_offset`` when that
            offset is greater than 0.
        """
        groups = generate_masked_orthogonal_rank_groups(
            self.world_size, self.ordered_size, self.get_mask(self.order, token)
        )
        if self.rank_offset > 0:
            groups = [[rank + self.rank_offset for rank in group] for group in groups]
        return groups

# 定义initialize_model_parallel函数
`initialize_model_parallel`函数根据输入参数计算并打印数据、上下文、模型、张量、流水线以及专家相关的各类并行组（仅用于模拟分组结果，不会创建实际的通信组）。

In [None]:
def initialize_model_parallel(
    tensor_model_parallel_size: int = 1,
    pipeline_model_parallel_size: int = 1,
    context_parallel_size: int = 1,
    expert_model_parallel_size: int = 1,
    expert_tensor_parallel_size: Optional[int] = None,
    order: str = "tp-cp-ep-dp-pp",
    world_size: int = 1,
) -> None:
    """Compute and print every parallel rank group for a configuration.

    Nothing is returned; each family of groups is written to stdout.

    Args:
        tensor_model_parallel_size: Tensor parallel size (tp).
        pipeline_model_parallel_size: Pipeline parallel size (pp).
        context_parallel_size: Context parallel size (cp).
        expert_model_parallel_size: Expert parallel size (ep).
        expert_tensor_parallel_size: Tensor parallel size used for the
            expert groups; ``None`` falls back to
            ``tensor_model_parallel_size``.
        order: Dash-separated order of the parallel dimensions.
        world_size: Total number of ranks.

    Raises:
        RuntimeError: If ``world_size`` is not divisible by ``tp * pp * cp``,
            or if the resulting world size is not divisible by
            ``etp * ep * pp``.
    """
    total_model_size = (
        tensor_model_parallel_size * pipeline_model_parallel_size * context_parallel_size
    )
    if world_size % total_model_size != 0:
        raise RuntimeError(f"world_size ({world_size}) is not divisible by {total_model_size}")
    data_parallel_size: int = world_size // total_model_size
    decoder_world_size = total_model_size * data_parallel_size

    # Generator for the dense (non-expert) groups: ep is fixed to 1.
    dense_generator = RankGenerator(
        tp=tensor_model_parallel_size,
        ep=1,
        dp=data_parallel_size,
        pp=pipeline_model_parallel_size,
        cp=context_parallel_size,
        order=order,
        rank_offset=0,
    )

    if expert_tensor_parallel_size is None:
        expert_tensor_parallel_size = tensor_model_parallel_size
    expert_tensor_model_pipeline_parallel_size = (
        expert_tensor_parallel_size * expert_model_parallel_size * pipeline_model_parallel_size
    )
    expert_data_parallel_size = decoder_world_size // expert_tensor_model_pipeline_parallel_size
    if decoder_world_size % expert_tensor_model_pipeline_parallel_size != 0:
        raise RuntimeError(
            f"decoder world_size ({decoder_world_size}) is not divisible by expert_tensor_model_pipeline_parallel size ({expert_tensor_model_pipeline_parallel_size})"
        )

    # Generator for the expert groups: cp is fixed to 1.
    expert_generator = RankGenerator(
        tp=expert_tensor_parallel_size,
        ep=expert_model_parallel_size,
        dp=expert_data_parallel_size,
        pp=pipeline_model_parallel_size,
        cp=1,
        order=order,
        rank_offset=0,
    )

    # (heading, dimensions to group over, use the expert generator?)
    report = (
        ("Data Parallel groups (DP)", "dp", False),
        ("Data Parallel groups with cp (DP_with_cp)", "dp-cp", False),
        ("Context Parallel groups (CP)", "cp", False),
        ("Model Parallel groups (MP)", "tp-pp", False),
        ("Tensor Model Parallel groups (TP)", "tp", False),
        ("Pipeline Model Parallel groups (PP)", "pp", False),
        ("Expert Model Parallel groups (EP)", "ep", True),
        ("Expert Tensor Parallel groups (ETP)", "tp", True),
        ("Expert Tensor and Model Parallel groups", "tp-ep", True),
        ("Expert Tensor Model Pipeline Parallel groups", "tp-ep-pp", True),
        ("Expert Data Parallel groups (EDP)", "dp", True),
    )
    for heading, group_type, is_expert in report:
        generator = expert_generator if is_expert else dense_generator
        groups = list(generator.get_ranks(group_type))
        print(f"{heading}: \n{groups}")

# 参数解析与初始化
在笔记本中，我们手动为参数赋值，而不是使用命令行解析。这样可以方便地交互式实验不同的并行组配置。

In [None]:
# Example parameters for model parallel group initialization
# Total number of ranks to simulate.
world_size = 8
# Tensor (tp), pipeline (pp) and context (cp) parallel sizes; world_size must
# be divisible by their product, and the quotient becomes the data parallel size.
tensor_model_parallel_size = 2
pipeline_model_parallel_size = 2
context_parallel_size = 1
# Expert parallel size (ep).
expert_model_parallel_size = 1
# None makes initialize_model_parallel fall back to tensor_model_parallel_size.
expert_tensor_parallel_size = None
# Dash-separated order of the parallel dimensions; dimensions of size 1 that
# are not listed here are appended automatically by RankGenerator.
order = "tp-pp-dp"


# 模型并行组初始化模拟
我们现在使用示例参数调用`initialize_model_parallel`函数，并展示生成的各类并行组。

In [None]:
# Echo the chosen configuration, then print every simulated parallel group.
print("Begin simulate model parallel group initialization...")
print(
    f"world size: {world_size}, tp: {tensor_model_parallel_size}, "
    f"pp: {pipeline_model_parallel_size}, ep: {expert_model_parallel_size}, "
    f"etp: {expert_tensor_parallel_size}, cp: {context_parallel_size}"
)
initialize_model_parallel(
    tensor_model_parallel_size=tensor_model_parallel_size,
    pipeline_model_parallel_size=pipeline_model_parallel_size,
    context_parallel_size=context_parallel_size,
    expert_model_parallel_size=expert_model_parallel_size,
    expert_tensor_parallel_size=expert_tensor_parallel_size,
    order=order,
    world_size=world_size,
)