# 四、分布式数据并行训练

### 一 分布式数据并行训练（Distributed Data Parallel Training）

在作业的这一部分中，我们将探索使用多张 GPU 训练语言模型的方法，重点关注**数据并行（data parallelism）**。
我们将首先对 PyTorch 中的分布式通信进行一个入门介绍。接着，我们会学习一种朴素（naive）的分布式数据并行训练实现方式，并在此基础上实现和评测多种用于提升通信效率的改进方案。

---

### 1.1 PyTorch 中的单机分布式通信（Single-Node Distributed Communication in PyTorch）

我们先来看一个 PyTorch 中的简单分布式应用，其目标是：
**生成四个随机整数张量，并计算它们的和。**

在下面的分布式示例中，我们会启动 **4 个 worker 进程**，每个进程都会生成一个随机整数张量。
为了对这些进程中的张量求和，我们将调用 **all-reduce** 这一集合通信（collective communication）操作。

`all-reduce` 的作用是：

> 将所有进程中的张量进行规约（这里是求和），并用规约后的结果 **原地替换** 每个进程中的原始张量。

也就是说，在 all-reduce 完成后，每个进程中的张量都会变成相同的“全局求和结果”。

下面我们来看代码。

---

在运行下面的脚本后，我们会得到如下输出。
可以看到：在 all-reduce 之前，每个 worker 进程持有的张量都不同；而在 all-reduce 操作之后，由于所有进程中的张量被求和并同步，每个进程中的 `data` 都被**原地修改**为相同的结果张量。

---

如果你多次运行这个脚本，你会发现打印输出的顺序并不是确定的。这是因为该应用运行在分布式环境中，我们无法控制各个进程执行打印语句的精确顺序。
**唯一可以保证的是**：在 all-reduce 操作完成之后，所有进程都会持有**逐位（bitwise）完全一致**的结果张量。

---




In [1]:
%%writefile distributed_demo.py
# =========================
# 标准库 & PyTorch 相关导入
# =========================

import os
import torch
import torch.distributed as dist
import torch.multiprocessing as mp


# =========================
# 分布式环境初始化函数
# =========================
def setup(rank, world_size):
    """
    初始化分布式通信环境（Process Group）

    参数：
    - rank: 当前进程的编号（0 ~ world_size-1）
    - world_size: 总进程数
    """

    # 指定“主进程”的地址
    # 所有进程都会通过这个地址进行 rendezvous（集合）
    # 单机多进程时使用 localhost
    os.environ["MASTER_ADDR"] = "localhost"

    # 指定主进程监听的端口号
    # 所有进程必须保持一致
    os.environ["MASTER_PORT"] = "29500"

    # 初始化进程组（非常关键的一步）
    # backend="gloo"：使用 gloo 通信后端（CPU 通用，支持多平台）
    # rank：当前进程在所有进程中的唯一编号
    # world_size：参与通信的总进程数
    dist.init_process_group(
        backend="gloo",
        rank=rank,
        world_size=world_size
    )


# =========================
# 每个进程实际执行的函数
# =========================
def distributed_demo(rank, world_size):
    """
    每个进程都会执行这个函数一次
    """

    # 初始化分布式通信环境
    setup(rank, world_size)

    # 每个进程各自生成一个本地张量
    # torch.randint 是“本地操作”，不同 rank 得到的值不同
    # 这里生成一个 shape 为 (3,) 的一维张量
    data = torch.randint(0, 10, (3,))

    # 打印 all-reduce 之前的数据
    # 注意：由于是多进程并行执行，打印顺序不确定
    print(f"rank {rank} data (before all-reduce): {data}")

    # 核心操作：all-reduce
    # - 对所有 rank 上的 data 做规约（默认是 SUM）
    # - 结果会“原地”写回到每个 rank 的 data 中
    # - async_op=False 表示同步执行（阻塞，直到完成）
    dist.all_reduce(data, async_op=False)

    # 打印 all-reduce 之后的数据
    # 此时每个 rank 的 data 都是完全相同的
    print(f"rank {rank} data (after all-reduce): {data}")


# =========================
# 程序主入口
# =========================
if __name__ == "__main__":

    # world_size 表示总进程数
    # 在 DDP 中通常等于 GPU 数量
    world_size = 4

    # 使用 torch.multiprocessing.spawn 启动多进程
    mp.spawn(
        fn=distributed_demo,   # 每个进程要执行的函数
        args=(world_size,),    # 传给函数的额外参数（rank 会自动作为第一个参数）
        nprocs=world_size,     # 启动的进程数量
        join=True              # 主进程是否等待所有子进程结束
    )


Overwriting distributed_demo.py


由于分布式环境不支持ipynb的notebook格式，只能先写入.py文件再在notebook中运行

In [2]:
!python distributed_demo.py


[Gloo] Rank [Gloo] Rank [Gloo] Rank [Gloo] Rank 021 is connected to 3 is connected to  is connected to 3 is connected to 33 peer ranks. 3 peer ranks.  peer ranks. Expected number of connected peer ranks is :  peer ranks. Expected number of connected peer ranks is : Expected number of connected peer ranks is : 3Expected number of connected peer ranks is : 33

3

rank 2 data (before all-reduce): tensor([1, 8, 8])rank 3 data (before all-reduce): tensor([2, 6, 8])rank 0 data (before all-reduce): tensor([0, 3, 4])


rank 1 data (before all-reduce): tensor([1, 9, 3])
rank 0 data (after all-reduce): tensor([ 4, 26, 23])
rank 3 data (after all-reduce): tensor([ 4, 26, 23])
rank 2 data (after all-reduce): tensor([ 4, 26, 23])rank 1 data (after all-reduce): tensor([ 4, 26, 23])



## 这段代码在“分布式视角”下做了什么？

> **它在一台机器上启动 4 个独立进程，每个进程生成一个随机张量，然后通过 `all_reduce` 把所有进程的张量求和，并让每个进程都拿到相同的结果。**

这正是 **数据并行训练中“梯度同步”**的最小原型。

---

## 二、整体执行流程（非常重要）

当运行：

```bash
python distributed_hello_world.py
```

实际发生的是： **主进程启动**、 `mp.spawn` 启动 **4 个子进程**。

每个子进程 被分配一个唯一的 `rank`（0～3），加入同一个 `process group`， 各自生成数据， 通过 `all_reduce` 进行通信。

注意：**每个 rank 是一个独立的 Python 进程，不是线程**

---

## 三、逐段精讲代码

---

### 1 导入模块

```python
import os
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
```

作用拆解：

| 模块                      | 作用             |
| ----------------------- | -------------- |
| `os`                    | 设置分布式通信所需的环境变量 |
| `torch`                 | 张量与随机数         |
| `torch.distributed`     | 分布式通信核心 API    |
| `torch.multiprocessing` | 启动多个进程         |

---

### 2 setup：初始化分布式通信

```python
def setup(rank, world_size):
    os.environ["MASTER_ADDR"] = "localhost"
    os.environ["MASTER_PORT"] = "29500"
    dist.init_process_group("gloo", rank=rank, world_size=world_size)
```

这是 **分布式程序的“入群”步骤**。

#### （1）`MASTER_ADDR` & `MASTER_PORT`

```text
谁是“总协调者”（Rendezvous Server）
```

* 所有进程都会连到这个地址
* 单机多进程 → 用 `localhost`
* 多机 → 用主节点 IP

---

#### （2）`init_process_group`

```python
dist.init_process_group(
    backend="gloo",
    rank=rank,
    world_size=world_size
)
```

这是最关键的一行。

参数解释：

| 参数               | 含义             |
| ---------------- | -------------- |
| `backend="gloo"` | 通信后端（CPU / 通用） |
| `rank`           | 当前进程的唯一编号      |
| `world_size`     | 进程总数           |

 **核心概念**：

* `rank`：我是谁
* `world_size`：我们一共有多少人

在 DDP 里：

* **1 个 GPU ≈ 1 个 rank**

---

### 3  distributed_demo：每个进程执行的逻辑

```python
def distributed_demo(rank, world_size):
    setup(rank, world_size)
```

> 每个子进程都会执行这个函数一次

---

#### 3.1 生成本地数据

```python
data = torch.randint(0, 10, (3,))
```

* 每个 rank 生成 **不同的随机张量**
* 例如：

  * rank 0 → `[4, 4, 7]`
  * rank 1 → `[9, 5, 3]`
  * rank 2 → `[6, 0, 7]`
  * rank 3 → `[3, 7, 8]`

---

#### 3.2 all-reduce 前打印

```python
print(f"rank {rank} data (before all-reduce): {data}")
```

 打印顺序**不保证**，因为 4 个进程是并行的。

---

#### 3.3 核心：`all_reduce`

```python
dist.all_reduce(data, async_op=False)
```

这是整段代码的**灵魂**。

### all-reduce 做了什么？

对所有 rank 上的 `data`：

1. **收集**
2. **求和（默认是 SUM）**
3. **把结果写回每个 rank 的 `data`（原地）**

数学上等价于：

```python
data = data_rank0 + data_rank1 + data_rank2 + data_rank3
```

关键点：

* **in-place 操作**
* 所有 rank 最终结果 **完全一致**
* 默认是 `SUM`（梯度同步就是用这个）

---

#### 3.4 all-reduce 后打印

```python
print(f"rank {rank} data (after all-reduce): {data}")
```

你会看到所有 rank 都变成：

```text
tensor([22, 16, 25])
```

---

### 4 主入口：spawn 多进程

```python
if __name__ == "__main__":
    world_size = 4
```

* 定义进程数
* 在 DDP 中通常 = GPU 数量

---

```python
mp.spawn(
    fn=distributed_demo,
    args=(world_size,),
    nprocs=world_size,
    join=True
)
```

解释：

| 参数          | 含义           |
| ----------- | ------------ |
| `fn`        | 每个进程执行的函数    |
| `args`      | 传给函数的参数      |
| `nprocs`    | 启动多少个进程      |
| `join=True` | 主进程等待所有子进程结束 |

`spawn` 会自动给每个进程分配 `rank = 0, 1, 2, 3`，把 `rank` 作为 **第一个参数** 传入 `fn`。

---

## 四、这段代码 ↔ 真正的 DDP 训练

| 本例                 | DDP 中            |
| ------------------ | ---------------- |
| `data`             | 梯度（`param.grad`） |
| `torch.randint`    | 前向 + 反向计算        |
| `all_reduce(data)` | 梯度同步             |
| 单次操作               | 每个 iteration 都做  |

本质上：

> **DDP = 前向各算各的，反向之后用 all-reduce 同步梯度**




## 2.1.1 分布式应用基准测试的最佳实践

（Best Practices for Benchmarking Distributed Applications）

在本作业的这一部分中，你将对**分布式应用进行基准测试**，以更好地理解**通信带来的开销**。以下是一些推荐的最佳实践：

* **尽可能在同一台机器上运行基准测试**，以便进行可控、公平的对比。

* **在正式计时之前进行多次 warm-up（预热）**。
  这一点对 **NCCL 通信调用**尤为重要。通常 **5 次预热迭代** 就足够了。

* **在 GPU 上进行基准测试时，务必调用 `torch.cuda.synchronize()`**，以等待 CUDA 操作真正完成。
  注意：即使在通信操作中设置了 `async_op=False`，这一步仍然是必须的，因为该调用只保证操作被**加入 GPU 队列**，而不是通信实际完成。³

* **不同 rank 的测量时间可能略有差异**，因此通常会在所有 rank 之间**聚合测量结果**以提高估计的稳定性。
  可以使用 **all-gather 集合通信操作**（特别是 `dist.all_gather_object`）来收集所有 rank 的结果。

* **一般建议先在 CPU 上使用 Gloo 后端进行本地调试**，然后在实际问题中根据需要使用 GPU + NCCL 进行基准测试。
  在两种后端之间切换通常只需要修改：

  * `init_process_group` 中的 backend 参数
  * 张量的设备（CPU / GPU）转换

 


# 作业一

## 问题（distributed_communication_single_node）：5 分

请编写一个脚本，用于在**单节点多进程（single-node multi-process）**设置下，**基准测试 all-reduce 操作的运行时间**。

你可以参考前面给出的示例代码作为起点，并尝试改变以下配置参数：

### 实验设置要求

* **后端 + 设备类型**

  * Gloo + CPU
  * NCCL + GPU

* **all-reduce 数据规模**

  * 数据类型：`float32`
  * 张量大小：

    * 1MB
    * 10MB
    * 100MB
    * 1GB

* **进程数量**

  * 2 个进程
  * 4 个进程
  * 6 个进程

### 资源限制

* 最多可使用 **6 张 GPU**
* 每一次基准测试运行时间 **不得超过 5 分钟**

---

## 交付内容（Deliverable）

* 提供 **图表（plot）和/或表格（table）**，用于比较不同实验设置下的性能表现
* 并附上 **2–3 句话的分析说明**，总结你的实验结果，以及你对各因素（后端、数据规模、进程数等）如何相互影响的理解


In [3]:

%%writefile distributed_communication_single_node_demo.py
import os
import time
import torch
import torch.distributed as dist
import argparse
import torch
from torch.multiprocessing import spawn


# ============================================================
# 1. 分布式环境初始化与清理
# ============================================================

def init_distributed_environment(
    master_addr: str,
    master_port: int,
    global_rank: int,
    world_size: int,
    backend: str
):
    """
    初始化 PyTorch 分布式通信环境（进程组）

    参数说明：
    - master_addr : rank 0 所在节点的 IP 地址
    - master_port : 用于进程间通信的端口
    - global_rank : 当前进程在所有进程中的唯一编号
    - world_size  : 总进程数
    - backend     : 通信后端（gloo / nccl / mpi）
    """

    # --------------------------------------------------------
    # 1.1 设置 rendezvous（所有进程汇合的地址）
    # --------------------------------------------------------
    # 所有进程必须通过 MASTER_ADDR:MASTER_PORT 建立初始连接
    os.environ["MASTER_ADDR"] = master_addr
    os.environ["MASTER_PORT"] = str(master_port)

    # --------------------------------------------------------
    # 1.2 初始化进程组（分布式的“总开关”）
    # --------------------------------------------------------
    # 这是使用 torch.distributed 的前置条件
    dist.init_process_group(
        backend=backend,
        rank=global_rank,
        world_size=world_size
    )


def destroy_distributed_environment():
    """
    清理分布式环境并释放资源
    """

    # 销毁进程组，防止资源泄漏或死锁
    dist.destroy_process_group()

    # GPU 场景下，清空 PyTorch 的 CUDA cache（非强制，但推荐）
    if torch.cuda.is_available():
        torch.cuda.empty_cache()


# ============================================================
# 2. All-Reduce 通信性能 Benchmark
# ============================================================

def run_all_reduce_benchmark(
    global_rank: int,
    world_size: int,
    tensor_size_mb: int,
    backend: str,
    device: str,
    master_addr: str,
    master_port: int
):
    """
    对 dist.all_reduce 进行性能测试

    测试指标：
    - 单次 all-reduce 平均耗时
    - 理论通信带宽（GB/s）
    """

    # --------------------------------------------------------
    # 2.1 初始化分布式环境
    # --------------------------------------------------------
    init_distributed_environment(
        master_addr=master_addr,
        master_port=master_port,
        global_rank=global_rank,
        world_size=world_size,
        backend=backend
    )

    # --------------------------------------------------------
    # 2.2 绑定 GPU（仅在 CUDA 场景下）
    # --------------------------------------------------------
    if device == "cuda":
        # 通常约定：rank i → GPU i
        torch.cuda.set_device(global_rank)
        torch.cuda.empty_cache()

    # --------------------------------------------------------
    # 2.3 构造通信测试用的 Tensor
    # --------------------------------------------------------
    # 将 MB 转换为字节
    tensor_num_bytes = tensor_size_mb * 1024 * 1024

    # float32 占 4 字节
    bytes_per_element = 4
    num_elements = tensor_num_bytes // bytes_per_element

    # 随机生成测试数据（数值本身不重要）
    communication_tensor = torch.randn(
        num_elements,
        device=device,
        dtype=torch.float32
    )

    # --------------------------------------------------------
    # 2.4 Warm-up（非常重要）
    # --------------------------------------------------------
    # 原因：
    # - NCCL 通信器初始化
    # - CUDA kernel lazy initialization
    # - GPU 频率爬升
    warmup_iterations = 5
    for _ in range(warmup_iterations):
        dist.all_reduce(
            communication_tensor,
            op=dist.ReduceOp.SUM
        )

        # CUDA 是异步执行，必须显式同步
        if device == "cuda":
            torch.cuda.synchronize()

    # 确保所有进程在同一时刻开始正式测试
    dist.barrier()

    # --------------------------------------------------------
    # 2.5 正式 benchmark（计时）
    # --------------------------------------------------------
    benchmark_iterations = 20

    start_time = time.time()

    for _ in range(benchmark_iterations):
        dist.all_reduce(
            communication_tensor,
            op=dist.ReduceOp.SUM
        )

    # GPU 场景下等待所有 kernel 完成
    if device == "cuda":
        torch.cuda.synchronize()

    end_time = time.time()

    # --------------------------------------------------------
    # 2.6 性能指标计算
    # --------------------------------------------------------
    total_elapsed_time = end_time - start_time
    avg_latency_seconds = total_elapsed_time / benchmark_iterations

    # 带宽 = 数据量 / 时间（单位：GB/s）
    bandwidth_gbps = (tensor_num_bytes / avg_latency_seconds) / 1e9

    # --------------------------------------------------------
    # 2.7 打印结果（仅 rank 0）
    # --------------------------------------------------------
    if global_rank == 0:
        print(
            f"[All-Reduce Benchmark]\n"
            f"  Backend        : {backend}\n"
            f"  Device         : {device}\n"
            f"  World Size     : {world_size}\n"
            f"  Tensor Size    : {tensor_size_mb} MB\n"
            f"  Avg Latency    : {avg_latency_seconds * 1000:.4f} ms\n"
            f"  Bandwidth      : {bandwidth_gbps:.4f} GB/s\n"
        )

    # --------------------------------------------------------
    # 2.8 汇总所有 rank 的结果（教学用）
    # --------------------------------------------------------
    local_result = {
        "rank": global_rank,
        "world_size": world_size,
        "backend": backend,
        "device": device,
        "tensor_size_mb": tensor_size_mb,
        "avg_latency_ms": avg_latency_seconds * 1000,
        "bandwidth_gbps": bandwidth_gbps,
    }

    gathered_results = [None for _ in range(world_size)]
    dist.all_gather_object(gathered_results, local_result)

    # --------------------------------------------------------
    # 2.9 清理环境
    # --------------------------------------------------------
    destroy_distributed_environment()

    # 只让主进程返回结果
    if global_rank == 0:
        return gathered_results


def main():

    for size in [1,10,100,1000]:
        for Backend in ["gloo", "nccl"]:
            for world_size in [1]:# 有多少个GPU就增加测试用例，当然一个也能跑
                parser = argparse.ArgumentParser("All-Reduce Benchmark")

                parser.add_argument("--world_size", type=int, default=world_size,
                                    help="进程总数（通常等于 GPU 数）")
                parser.add_argument("--tensor_size_mb", type=int, default=size,
                                    help="通信 tensor 大小（MB）")
                parser.add_argument("--backend", type=str, default=Backend,
                                    choices=["gloo", "nccl"],
                                    help="分布式通信后端")
                parser.add_argument("--device", type=str, default="cuda",
                                    choices=["cpu", "cuda"],
                                    help="运行设备")
                parser.add_argument("--master_addr", type=str, default="127.0.0.1")
                parser.add_argument("--master_port", type=int, default=29500)

                args = parser.parse_args()

                # GPU 数量检查
                if args.device == "cuda":
                    assert torch.cuda.is_available(), "CUDA 不可用"
                    assert args.world_size <= torch.cuda.device_count(), (
                        "world_size 不能超过 GPU 数量"
                    )

                # 使用 spawn 启动多进程
                spawn(
                    fn=run_all_reduce_benchmark,
                    args=(
                        args.world_size,
                        args.tensor_size_mb,
                        args.backend,
                        args.device,
                        args.master_addr,
                        args.master_port
                    ),
                    nprocs=args.world_size,
                    join=True
                )


if __name__ == "__main__":
    main()



Overwriting distributed_communication_single_node_demo.py


In [4]:
!python distributed_communication_single_node_demo.py

[Gloo] Rank 0 is connected to 0 peer ranks. Expected number of connected peer ranks is : 0
[All-Reduce Benchmark]
  Backend        : gloo
  Device         : cuda
  World Size     : 1
  Tensor Size    : 1 MB
  Avg Latency    : 0.2600 ms
  Bandwidth      : 4.0336 GB/s

[All-Reduce Benchmark]
  Backend        : nccl
  Device         : cuda
  World Size     : 1
  Tensor Size    : 1 MB
  Avg Latency    : 0.0162 ms
  Bandwidth      : 64.5348 GB/s

[Gloo] Rank 0 is connected to 0 peer ranks. Expected number of connected peer ranks is : 0
[All-Reduce Benchmark]
  Backend        : gloo
  Device         : cuda
  World Size     : 1
  Tensor Size    : 10 MB
  Avg Latency    : 1.9385 ms
  Bandwidth      : 5.4092 GB/s

[All-Reduce Benchmark]
  Backend        : nccl
  Device         : cuda
  World Size     : 1
  Tensor Size    : 10 MB
  Avg Latency    : 0.0178 ms
  Bandwidth      : 589.5505 GB/s

[Gloo] Rank 0 is connected to 0 peer ranks. Expected number of connected peer ranks is : 0
[All-Reduce Be

## 一、Gloo和NCCL的基本概念

在 PyTorch 中，多进程/多 GPU 之间进行通信（比如 `all_reduce`, `broadcast`, `all_gather`）需要 **通信后端（backend）**。
常用的两种是：

| 名称       | 适用设备            | 特点                                                 |
| -------- | --------------- | -------------------------------------------------- |
| **Gloo** | CPU / GPU（有限支持） | 跨平台，支持 CPU，多节点可以用 TCP/IP 通信，适合 CPU 张量，简单可靠         |
| **NCCL** | GPU（NVIDIA GPU） | NVIDIA 官方库，专门优化 GPU 间通信，支持高速 PCIe、NVLink、网络通信，性能最好 |

---

## 二、Gloo

Gloo 在 CPU 和 GPU 都可以用，但 GPU 支持有限，通常用在 CPU 张量上，它支持跨节点网络通信（TCP/IP）。并且使用方便，启动和调试比 NCCL 容易。在 CPU 环境下是**默认且可靠的选择**。

它通常使用在单机 CPU 多进程训练，或者小规模实验和调试，没有 GPU 的服务器环境。

在CPU 通信效率中等，GPU 上比 NCCL 慢，不适合大规模深度学习训练。

---

## 三、NCCL（NVIDIA Collective Communication Library）


它是专门为 NVIDIA GPU 设计的，支持多 GPU、多节点高速通信。并且优化了**PCIe 直连**，**NVLink（GPU 内部高速总线）**，**网络互联（Ethernet / InfiniBand）**，PyTorch 会自动使用 NCCL 的 ring / tree 通信算法做 `all_reduce`、`all_gather` 等操作。

通常在单机多 GPU 训练（如 2~8 GPU）上使用。分布式训练深度学习大模型，高性能 GPU 集群使用较多。

能将GPU 间带宽最大化，并且延迟极低（特别是大张量）， NCCL + CUDA 几乎是训练大模型的默认选择。

---

## 四、总结对比

| 特性           | Gloo            | NCCL               |
| ------------ | --------------- | ------------------ |
| 设备           | CPU（主要），GPU（有限） | GPU（NVIDIA 专用）     |
| 性能           | 中等              | 很高                 |
| 跨节点支持        | TCP/IP          | NVLink / PCIe / 网络 |
| 用途           | CPU 分布式 / 小实验   | GPU 多卡训练 / 大模型     |
| PyTorch 默认推荐 | CPU → Gloo      | GPU → NCCL         |




## 一、代码概览

这段代码的目的只有一个：

**在分布式环境下 benchmark `all_reduce` 的通信性能**
测量指标包括：单次 `all_reduce` 的平均耗时，实际通信带宽（GB/s）适用场景在多进程（多卡 / 多节点），对比不同 backend（`gloo` / `nccl`）， 对比 CPU vs GPU 通信性能。

## 二、模块介绍

### 1. `torch.distributed as dist`

它是PyTorch 的**分布式通信核心模块**，它提供进程组管理、collective ops（`all_reduce`, `broadcast`, `all_gather` 等）。

### 2. `torch.multiprocessing.spawn`

它是**多进程启动工具**，每个进程对应一个 `rank`，通常 rank + GPU id。当然分布式训练不等于多线程，**每个 rank 是一个独立 Python 进程**。


## 三、初始化分布式环境

```python
def setup(master_addr, master_port, rank, world_size, backend):
```

### 1. MASTER_ADDR / MASTER_PORT

```python
os.environ['MASTER_ADDR'] = master_addr
os.environ['MASTER_PORT'] = str(master_port)
```

定义**主节点（rank 0）的地址**，所有进程都通过这个地址进行 rendezvous，即使是**单机多卡**，也必须设置。

### 2. 初始化进程组

```python
dist.init_process_group(
    backend,
    rank=rank,
    world_size=world_size
)
```

| 参数         | 含义        |
| ---------- | --------- |
| backend    | 通信后端      |
| rank       | 当前进程的全局编号 |
| world_size | 总进程数      |

#### 常见 backend 对比

| backend | 适用场景            |
| ------- | --------------- |
| `gloo`  | CPU / 小规模       |
| `nccl`  | GPU / 高性能（事实标准） |
| `mpi`   | HPC 环境          |

---

## 四、`cleanup()`：资源回收

```python
def cleanup():
    dist.destroy_process_group()
```

它显式释放通信资源，防止程序异常退出导致死锁

```python
if torch.cuda.is_available():
    torch.cuda.empty_cache()
```

清空 CUDA cache（**不是释放显存，只是释放 PyTorch cache**），benchmark 中避免显存碎片影响。

---

## 五、`benchmark_all_reduce()`


### Step 1. 设置环境 & GPU 绑定

```python
setup(...)
```

#### GPU 场景下：

```python
torch.cuda.set_device(rank)
```

**为什么是 `rank`？**

通常约定：

```
rank 0 → GPU 0
rank 1 → GPU 1
...
```


### Step 2. 构造测试 Tensor

```python
tensor_size_bytes = tensor_size_mb * 1024 * 1024
num_elements = tensor_size_bytes // 4
tensor_data = torch.randn(num_elements, device=device)
```

`float32 = 4 bytes`，`randn` 只是为了填充数据，**值本身无关紧要**


### Step 3 Warm-up

```python
for _ in range(5):
    dist.all_reduce(tensor_data, op=dist.ReduceOp.SUM)
```

#### 为什么一定要 warm-up？

1. CUDA kernel lazy initialization
2. NCCL communicator 建立
3. GPU 时钟频率爬升
4. cache / pipeline 填充



GPU 场景下：

```python
torch.cuda.synchronize()
```

CUDA 是**异步执行**不同步，测到的是 launch 时间。

---

### Barrier：所有进程对齐

```python
dist.barrier()
```

确保所有 rank **同时开始计时**，防止快的进程提前进入 benchmark。


### Step 4. 正式 benchmark

```python
start_time = time.time()
```

```python
for _ in range(num_iterations):
    dist.all_reduce(tensor_data, op=dist.ReduceOp.SUM)
```

`num_iterations = 20`，用来取平均，降低噪声。

GPU 同步：

```python
torch.cuda.synchronize()
```

---

### Step 5. 计算耗时与带宽

#### 平均耗时

```python
avg_time = duration / num_iterations
```

#### 带宽计算

```python
bandwidth_gbps = (tensor_size_bytes / avg_time) / 1e9
```

含义：

```
GB/s = 数据量 / 时间
```

当然这是 **简化模型**，没区分 ring / tree，适合对比，不是绝对值。


### Step 6. 只让 rank 0 打印

```python
if rank == 0:
    print(...)
```

避免：N 个进程打印 N 份日志

---

### Step 7. `all_gather_object` 汇总结果

```python
dist.all_gather_object(gathered_results, local_result)
```

它支持 **Python 对象**，比 `all_gather(tensor)` 更灵活，常用于 benchmark / profiling，这样 rank 0 能拿到所有 rank 的结果。

---

### Step 8. 清理 & 返回结果

```python
cleanup()
```

```python
if rank == 0:
    return gathered_results
```

只有主进程返回， 子进程自动退出。




# 作业二

## 问题（naive_ddp）：5 分

**交付物**：编写一个脚本来通过在反向传播后对各个参数梯度进行全规约，以简单方式执行分布式数据并行训练。为了验证你的DDP实现的正确性，使用它在随机生成的数据上训练一个小玩具模型，并验证其权重与单进程训练的结果相匹配。

---

## 问题（naive_ddp_benchmarking）：3 分

在这种简单的DDP实现中，参数在每个反向传播后在各个rank之间单独进行全规约。为了更好地理解数据并行训练的开销，创建一个脚本来对之前实现的语言模型进行基准测试，该模型使用这种简单的DDP实现进行训练。测量每个训练步骤的总时间以及用于通信梯度的时间比例。收集在单节点设置（1个节点 x 2个GPU）下对XL模型的测量结果，如§1.1.2中所述。

**交付物**：描述你的基准测试设置，以及每个训练迭代的测量时间以及用于通信梯度的时间。

---

In [1]:
%%writefile ddp_model_demo.py

import os
import time
import random
import numpy as np

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch.distributed as dist

from torchvision import datasets, transforms
from torch.multiprocessing.spawn import spawn


# ============================
# 一、定义一个简单的全连接神经网络
# ============================
class SimpleNet(nn.Module):
    """
    一个最简单的三层全连接网络，用于 MNIST 分类
    输入：28×28 展平后的 784 维向量
    输出：10 类（0~9）
    """

    def __init__(self, input_dim: int, hidden_dim: int, num_classes: int):
        super().__init__()

        # 第一层全连接：输入层 -> 隐藏层
        self.fc1 = nn.Linear(input_dim, hidden_dim)

        # 第二层全连接：隐藏层 -> 隐藏层
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)

        # 输出层：隐藏层 -> 类别数
        self.fc3 = nn.Linear(hidden_dim, num_classes)

    def forward(self, x):
        """
        前向传播逻辑
        """
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)

        # 使用 log_softmax，方便后续配合 NLLLoss
        return F.log_softmax(x, dim=1)


# ============================
# 二、分布式环境初始化与清理
# ============================
def init_distributed_env(rank: int, world_size: int, backend: str):
    """
    初始化分布式进程组（所有进程都必须调用）
    """
    # 主进程地址和端口
    # 在单机多卡场景中，localhost 即可
    os.environ["MASTER_ADDR"] = "localhost"
    os.environ["MASTER_PORT"] = "29500"

    # 初始化进程组
    dist.init_process_group(
        backend=backend,
        rank=rank,
        world_size=world_size
    )


def destroy_distributed_env():
    """
    销毁进程组并清理资源
    """
    dist.destroy_process_group()

    if torch.cuda.is_available():
        torch.cuda.empty_cache()


# ============================
# 三、DDP 训练主逻辑（教学重点）
# ============================
def ddp_training_worker(rank: int, world_size: int, backend: str):
    """
    每一个进程都会执行这个函数
    rank      : 当前进程编号
    world_size: 总进程数
    """

    # ------------------------------------------------
    # 1. 固定随机种子（确保所有进程行为一致）
    # ------------------------------------------------
    seed = 42
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    np.random.seed(seed)
    random.seed(seed)

    # 确保 cudnn 的确定性行为（教学更容易对齐结果）
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

    # ------------------------------------------------
    # 2. 初始化分布式环境
    # ------------------------------------------------
    init_distributed_env(rank, world_size, backend)

    # ------------------------------------------------
    # 3. 为当前进程分配设备
    # ------------------------------------------------
    if torch.cuda.is_available():
        torch.cuda.set_device(rank)
        device = torch.device(f"cuda:{rank}")
        print(f"[进程 {rank}] 使用 GPU：{device}")
    else:
        device = torch.device("cpu")
        print(f"[进程 {rank}] CUDA 不可用，使用 CPU")

    # ------------------------------------------------
    # 4. 数据预处理与加载
    # ------------------------------------------------
    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.1307,), (0.3081,)),
        # 将 28×28 展平为 784
        transforms.Lambda(lambda x: x.view(-1))
    ])

    full_dataset = datasets.MNIST(
        root="../data",
        train=True,
        download=True,
        transform=transform
    )

    # ------------------------------------------------
    # 5. 手动划分数据（教学版 DDP）
    #    每个进程只处理数据集的一部分
    # ------------------------------------------------
    total_samples = len(full_dataset)
    samples_per_rank = total_samples // world_size

    start_index = rank * samples_per_rank
    end_index = start_index + samples_per_rank

    subset_dataset = torch.utils.data.Subset(
        full_dataset,
        range(start_index, end_index)
    )

    # DataLoader 使用相同随机种子，确保 shuffle 行为可复现
    data_generator = torch.Generator()
    data_generator.manual_seed(seed)

    train_loader = torch.utils.data.DataLoader(
        subset_dataset,
        batch_size=64,
        shuffle=True,
        generator=data_generator
    )

    print(
        f"[进程 {rank}] 负责样本区间："
        f"{start_index} ~ {end_index - 1}，"
        f"共 {samples_per_rank} 条数据"
    )

    # ------------------------------------------------
    # 6. 创建模型与优化器
    # ------------------------------------------------
    model = SimpleNet(
        input_dim=784,
        hidden_dim=50,
        num_classes=10
    ).to(device)

    optimizer = optim.Adam(model.parameters(), lr=1e-3)

    # ------------------------------------------------
    # 7. 步骤一：广播模型参数（关键教学点）
    #    确保所有进程从“完全相同”的初始模型开始
    # ------------------------------------------------
    for param in model.parameters():
        dist.broadcast(param.data, src=0)

    model.train()

    # ------------------------------------------------
    # 8. 正式开始训练
    # ------------------------------------------------
    for epoch in range(1, 3):
        for batch_idx, (data, target) in enumerate(train_loader):
            data = data.to(device)
            target = target.to(device)

            # ----------------------------
            # 步骤二：前向 + 反向传播
            # 每个进程只计算自己那一部分数据的梯度
            # ----------------------------
            optimizer.zero_grad()

            output = model(data)
            loss = F.nll_loss(output, target)

            step_start_time = time.time()
            loss.backward()

            # ----------------------------
            # 步骤三：梯度 All-Reduce
            # 将所有进程的梯度相加，再取平均
            # ----------------------------
            comm_start_time = time.time()

            for param in model.parameters():
                if param.grad is not None:
                    dist.all_reduce(
                        param.grad.data,
                        op=dist.ReduceOp.SUM
                    )
                    param.grad.data /= world_size

            comm_time = time.time() - comm_start_time

            # ----------------------------
            # 步骤四：参数更新
            # 所有进程使用“完全相同”的平均梯度
            # ----------------------------
            optimizer.step()

            step_time = time.time() - step_start_time

            if batch_idx % 50 == 0:
                print(
                    f"[进程 {rank}] "
                    f"Epoch {epoch} | "
                    f"Batch {batch_idx} | "
                    f"Loss = {loss.item():.6f}"
                )

            if rank == 0 and batch_idx % 50 == 0:
                print(
                    f"[主进程] 单步耗时：{step_time * 1000:.2f} ms，"
                    f"通信耗时：{comm_time * 1000:.2f} ms"
                )

    # ------------------------------------------------
    # 9. 仅在 rank 0 保存模型
    # ------------------------------------------------
    if rank == 0:
        torch.save(model.state_dict(), "mnist_simple_ddp.pt")
        print("[主进程] 模型已保存：mnist_simple_ddp.pt")

    destroy_distributed_env()


# ============================
# 四、程序入口
# ============================
def main():
    # world_size = 使用的进程（GPU）数量
    # 如果只有一张 GPU，就设为 1（等价于普通训练）
    world_size = 1

    # NCCL：GPU 通信首选后端
    backend = "nccl"

    # 检查数据是否能被平均分配
    dataset_size = len(
        datasets.MNIST("../data", train=True, download=True)
    )

    if dataset_size % world_size != 0:
        print(
            f"警告：数据量 {dataset_size} 不能被 world_size={world_size} 整除，"
            f"将丢弃部分样本以保证均分"
        )

    print(f"启动分布式训练，进程数：{world_size}")
    print(f"每个进程处理样本数：{dataset_size // world_size}")

    # 启动多进程
    spawn(
        ddp_training_worker,
        args=(world_size, backend),
        nprocs=world_size,
        join=True
    )

    print("分布式训练结束")


if __name__ == "__main__":
    main()


Overwriting ddp_model_demo.py


In [2]:
!python ddp_model_demo.py

启动分布式训练，进程数：1
每个进程处理样本数：60000
[进程 0] 使用 GPU：cuda:0
[进程 0] 负责样本区间：0 ~ 59999，共 60000 条数据
[进程 0] Epoch 1 | Batch 0 | Loss = 2.294834
[主进程] 单步耗时：178.22 ms，通信耗时：14.50 ms
[进程 0] Epoch 1 | Batch 50 | Loss = 0.515532
[主进程] 单步耗时：1.15 ms，通信耗时：0.30 ms
[进程 0] Epoch 1 | Batch 100 | Loss = 0.286108
[主进程] 单步耗时：1.21 ms，通信耗时：0.32 ms
[进程 0] Epoch 1 | Batch 150 | Loss = 0.235657
[主进程] 单步耗时：1.63 ms，通信耗时：0.49 ms
[进程 0] Epoch 1 | Batch 200 | Loss = 0.305218
[主进程] 单步耗时：1.06 ms，通信耗时：0.29 ms
[进程 0] Epoch 1 | Batch 250 | Loss = 0.421952
[主进程] 单步耗时：1.36 ms，通信耗时：0.30 ms
[进程 0] Epoch 1 | Batch 300 | Loss = 0.376283
[主进程] 单步耗时：1.17 ms，通信耗时：0.32 ms
[进程 0] Epoch 1 | Batch 350 | Loss = 0.159866
[主进程] 单步耗时：1.60 ms，通信耗时：0.39 ms
[进程 0] Epoch 1 | Batch 400 | Loss = 0.211988
[主进程] 单步耗时：1.91 ms，通信耗时：0.65 ms
[进程 0] Epoch 1 | Batch 450 | Loss = 0.401293
[主进程] 单步耗时：1.24 ms，通信耗时：0.32 ms
[进程 0] Epoch 1 | Batch 500 | Loss = 0.201109
[主进程] 单步耗时：1.12 ms，通信耗时：0.31 ms
[进程 0] Epoch 1 | Batch 550 | Loss = 0.257683
[主进程] 单步耗时：1.62 ms，通信

# 问题（minimal_ddp_flat_benchmarking）：2分

修改您的最小DDP实现，以从所有参数中传递一个扁平化的梯度张量。将其性能与在先前使用条件下（1个节点 x 2个GPU，XL模型大小如§1.1.3中所述）为每个参数张量发出一个all-reduce的最小DDP实现进行比较。

交付物：在单次批量all-reduce调用下，未分布式数据并行训练中每个训练迭代的测量时间以及梯度通信所花费的时间。1-2句话比较批量与单独通信梯度的结果。

**验证：把“很多小的梯度通信”合并成“一次大的梯度通信”，能否显著降低 DDP 的通信开销**

这在真实系统里是一个**非常核心的工程问题**。



## naive DDP （对照基线）

之前的最小 DDP 实现是这样的逻辑：

```text
for param in model.parameters():
    all_reduce(param.grad)
```

也就是：**每个参数张量**，**单独一次 all-reduce**，通信调用次数 = 参数个数，XL 模型来说：参数张量数量几百～上千个，NCCL / GPU 通信，每一次 all-reduce 都有 **启动开销（latency）**，小 tensor → 极其低效，这就是作业的**优化的瓶颈**

---

## 改什么

###  核心改动：**梯度展平 + 单次通信**

把：

```python
grad_1, grad_2, grad_3, ..., grad_n
```

变成：

```python
flat_grad = concat([grad_1, grad_2, ..., grad_n])
```

然后只做：

```python
dist.all_reduce(flat_grad)
```

再把结果 **切回每个参数的 grad**



In [7]:
%%writefile minimal_ddp_flat_benchmarking.py
import torch
import torch.distributed as dist
import os
from torchvision import datasets, transforms
import torch.optim as optim
import time
from torch.multiprocessing.spawn import spawn
import numpy as np
import random
import torch.nn as nn
import torch.nn.functional as F

class SimpleNet(nn.Module):
    def __init__(self, input_size, hidden_size, num_classes):
        super(SimpleNet, self).__init__()
        # 定义第一个隐藏层
        self.fc1 = nn.Linear(input_size, hidden_size)
        # 定义第二个隐藏层
        self.fc2 = nn.Linear(hidden_size, hidden_size)
        # 定义输出层
        self.fc3 = nn.Linear(hidden_size, num_classes)
    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return F.log_softmax(x, dim=1)  
# 示例：创建一个SimpleNet实例

def setup(rank, world_size, backend):
    """ 初始化分布式环境 """
    os.environ['MASTER_ADDR'] = 'localhost' # 设置主节点IP地址以及端口，其他节点需要通过这个地址连接到主节点
    os.environ['MASTER_PORT'] = '29500'
    # 根据后端初始化进程组
    dist.init_process_group(backend, rank=rank, world_size=world_size) # 初始化进程组，rank是当前进程的rank，world_size是总进程数，backend: 这是指定通信后端的参数。常见的后端有：gloo(CPU), nccl(GPU), mpi等。

def cleanup():
    """ 清理分布式环境 """
    dist.destroy_process_group()
    # 清理GPU缓存
    if torch.cuda.is_available():
        torch.cuda.empty_cache()

def ddp_train(rank, world_size, backend):
    """ Naive DDP训练函数，实现图片中描述的4个步骤 """
    # 设置随机种子确保可重现性（所有进程使用相同种子）
    seed = 42
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    np.random.seed(seed)
    random.seed(seed)
    
    # 设置确定性行为
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
    
    # 初始化分布式环境
    setup(rank, world_size, backend)
    
    # 为每个进程分配不同的GPU设备
    # 使用GPU 0和1，它们当前是空闲的
    gpu_id = rank  # rank 0 -> GPU 0, rank 1 -> GPU 1
    if torch.cuda.is_available():
        torch.cuda.set_device(gpu_id)
        device = torch.device(f"cuda:{gpu_id}")
        print(f"Rank {rank} using GPU: {device}")
    else:
        device = torch.device("cpu")
        print(f"CUDA not available, using CPU for rank {rank}")
    
    # 数据加载和预处理
    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.1307,), (0.3081,)),
        transforms.Lambda(lambda x: x.view(-1))
    ])
    
    dataset = datasets.MNIST('../data', train=True, download=True, transform=transform)
    
    # 为了验证DDP正确性，我们让每个进程处理不同的数据子集
    # 这样总的有效batch size = 64 * world_size
    total_samples = len(dataset)
    samples_per_process = total_samples // world_size
    start_idx = rank * samples_per_process
    end_idx = start_idx + samples_per_process
    
    # 创建当前进程的数据子集
    subset = torch.utils.data.Subset(dataset, range(start_idx, end_idx))
    
    # 设置确定性的数据加载器
    # 重要：为了与单进程训练比较，我们需要确保数据顺序一致
    generator = torch.Generator()
    generator.manual_seed(seed)  # 所有进程使用相同的种子确保一致性
    train_loader = torch.utils.data.DataLoader(subset, batch_size=64, shuffle=True, generator=generator)
    
    print(f"Rank {rank} processing samples {start_idx} to {end_idx-1} ({samples_per_process} samples)")
    print(f"Rank {rank} effective batch size: 64, total distributed batch size: {64 * world_size}")
    
    # 创建模型和优化器（每个设备都创建相同的模型）
    model = SimpleNet(input_size=784, hidden_size=50, num_classes=10).to(device)
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    # ===== 新增：记录梯度展平所需的元信息 =====
    grad_shapes = []
    grad_numels = []

    for p in model.parameters():
        grad_shapes.append(p.shape)
        grad_numels.append(p.numel())

    total_grad_numel = sum(grad_numels)

    # 步骤1: Broadcast模型参数从rank 0到所有其他ranks
    # 确保所有设备从相同的初始模型和优化器状态开始
    for param in model.parameters():
        # 确保参数在正确的设备上进行broadcast
        dist.broadcast(param.data, src=0)
    
    # 同步优化器状态（如果有的话）
    for group in optimizer.param_groups:
        for param in group['params']:
            if param in optimizer.state:
                for key, value in optimizer.state[param].items():
                    if torch.is_tensor(value):
                        dist.broadcast(value, src=0)
    
    model.train()
    
    for epoch in range(1, 3):  # Train for 2 epochs
        for batch_idx, (data, target) in enumerate(train_loader):
            data, target = data.to(device), target.to(device)
            
            # 步骤2: 每个设备使用本地模型参数进行前向传播和反向传播
            # 计算n/d个样本的梯度
            optimizer.zero_grad()
            output = model(data)
            loss = torch.nn.functional.nll_loss(output, target)
            step_start = time.time()
            loss.backward()
            
            comm_start = time.time()
            # 步骤3: All-reduce梯度
            # 将所有设备的梯度求平均，使每个设备都持有所有n个样本的平均梯度

            # ===== 新的步骤3：Flattened gradient all-reduce =====
            flat_grad = torch.zeros(total_grad_numel, device=device)

            offset = 0
            for p in model.parameters():
                if p.grad is not None:
                    numel = p.grad.numel()
                    flat_grad[offset:offset + numel] = p.grad.view(-1)
                    offset += numel

            # 单次通信
            dist.all_reduce(flat_grad, op=dist.ReduceOp.SUM)
            flat_grad /= world_size

            # 写回每个参数的梯度
            offset = 0
            for p, shape, numel in zip(model.parameters(), grad_shapes, grad_numels):
                if p.grad is not None:
                    p.grad.copy_(
                        flat_grad[offset:offset + numel].view(shape)
                    )
                    offset += numel

            comm_time = time.time() - comm_start
            # 步骤4: 优化器步骤 - 每个设备使用相同的平均梯度更新参数
            # 由于所有设备从相同初始状态开始并使用相同梯度，参数会保持同步
            optimizer.step()
            step_time = time.time() - step_start
            if batch_idx % 50 == 0:
                print(f'Rank {rank}, Epoch: {epoch} [{batch_idx * len(data)}/{len(train_loader.dataset)} ({100. * batch_idx / len(train_loader):.0f}%)]\tLoss: {loss.item():.6f}')
            if rank == 0 and batch_idx % 50 == 0:
                print(f"Step time: {step_time*1000:.2f} ms | Comm time: {comm_time*1000:.2f} ms")

    # 只在rank 0保存模型
    if rank == 0:
        torch.save(model.state_dict(), "mnist_simple_ddp.pt")
        print("Model saved!")
            
    cleanup()

def main():
    world_size = 1 # gpu只有一个就改成1，但是这样和平时普通的训练就没有区别了
    backend = 'nccl'  # 使用gloo后端，更适合CPU训练
    # 只检查数据集大小是否能被world_size整除
    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.1307,), (0.3081,))
    ])
    
    dataset = datasets.MNIST('../data', train=True, download=True, transform=transform)
    data_size = len(dataset)

    if data_size % world_size != 0:
        print(f"Warning: Data size {data_size} is not divisible by world size {world_size}")
        print(f"Some samples will be ignored to ensure equal distribution")
    
    print(f"Starting distributed training with {world_size} processes")
    print(f"Each process will handle {data_size // world_size} samples")
    print(f"Total samples: {data_size}")
    
    # 启动分布式训练
    spawn(ddp_train, args=(world_size, backend), nprocs=world_size, join=True)
    
    print("Distributed training completed!")

if __name__ == '__main__':
    main()

Overwriting minimal_ddp_flat_benchmarking.py


In [8]:
!python minimal_ddp_flat_benchmarking.py


Traceback (most recent call last):
  File "/mnt/d/code/项目/cs336/CS336-Chinese-co-construction/coursework/Assignment2_System/cs336_systems/minimal_ddp_flat_benchmarking.py", line 4, in <module>
    from torchvision import datasets, transforms
ModuleNotFoundError: No module named 'torchvision'


# 作业三


## 问题 (ddp_overlap_individual_parameters): 5 分

实现一个 Python 类来处理分布式数据并行训练。该类应包装一个任意的 PyTorch `nn.Module` 并在训练之前处理权重的广播（以便所有 ranks 具有相同的初始参数）以及发出通信调用以进行梯度平均。我们建议以下公共接口：

```python
def __init__(self, module: torch.nn.Module): 给定一个实例化的 PyTorch nn.Module 要并行化，构建一个 DDP 容器，该容器将处理跨 ranks 的梯度同步。

def forward(self, *inputs, **kwargs): 调用被包装模块的 `forward()` 方法，并使用提供的定位参数和关键字参数。

def finish_gradient_synchronization(self): 当调用时，等待异步通信
```

*在高级情况下，如果你使用多个 CUDA 流，你可能需要显式同步跨流以确保输出准备好进行后续操作。* 参见 [CUDA Streams](https://pytorch.org/docs/stable/notes/cuda.html#cuda-streams)。

### GPU 上的调用

为了使用此类执行分布式训练，我们将一个模块传递给该模块进行包装，然后在我们运行 `optimizer.step()` 之前添加一个对 `finish_gradient_synchronization()` 的调用，以确保依赖于梯度的优化器步骤可以排队：

```python
model = ToyModel().to(device)
ddp_model = DDP(model)

for _ in range(train_steps):
    x, y = get_batch()
    logits = ddp_model(x)
    loss = loss_fn(logits, y)
    loss.backward()
    ddp_model.finish_gradient_synchronization()
    optimizer.step()
```

### 交付物：实现一个容器类来处理分布式数据并行训练。该类应重叠梯度通信和反向传播的计算。为了测试你的 DDP 类，首先实现适配器 `adapters.get_ddp_individual_parameters` 和 `adapters.ddp_individual_parameters_on_after_backward`（后者是可选的，取决于你的实现，你可能不需要它）。

然后，执行测试，通过运行 `pytest tests/test_ddp_individual_parameters.py`。我们建议多次运行测试（例如，5 次）以确保其可靠通过。

---

In [None]:
%%writefile ddp_overlap_individual_parameters.py
'''
实现一个手动版本的分布式数据并行（DDP）训练框架，核心特性是梯度分桶（Gradient Bucketing）+ 计算通信重叠（Computation-Communication Overlap）。
'''


import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.distributed as dist
import os

# ============================
# 二、手动 DDP（按桶通信梯度）
# ============================
class DDPOverlapBucketed(nn.Module):
    def __init__(self, module: torch.nn.Module, bucket_size_mb: float):
        super(DDPOverlapBucketed, self).__init__()
        self.module = module
        self.bucket_size_bytes = bucket_size_mb * 1024 * 1024
        self.handles = []   # 保存异步 all_reduce 句柄
        self.buckets = []   # 梯度桶
        self.world_size = dist.get_world_size()
        
        # --------------------
        # 初始化：广播参数 + 创建桶 + 注册钩子
        # --------------------
        self._broadcast_parameters()  # 初始参数广播
        self._create_bucket()         # 按大小划分梯度桶
        self._register_hook()         # 注册梯度钩子实现异步通信
    
    # --------------------
    # 广播初始参数
    # --------------------
    def _broadcast_parameters(self):
        """
        将 rank 0 的参数广播到所有进程，确保初始状态一致
        """
        if self.world_size > 1:
            for param in self.module.parameters():
                dist.broadcast(param.data, src=0)
    
    # --------------------
    # 按 bucket_size 创建梯度桶
    # --------------------
    def _create_bucket(self):
        current_bucket_size = 0
        current_bucket = []

        # 倒序遍历参数，构建桶
        for p in reversed(list(self.module.parameters())):
            if p.requires_grad:
                p_size = p.numel() * p.element_size()
                
                # 当前桶满了则保存并创建新桶
                if p_size + current_bucket_size > self.bucket_size_bytes and current_bucket:
                    self.buckets.append(current_bucket)
                    current_bucket_size = 0
                    current_bucket = []
                
                current_bucket.append(p)
                current_bucket_size += p_size
        
        # 如果还有剩余参数，作为最后一个桶
        if current_bucket:
            self.buckets.append(current_bucket)
        
        # 为每个桶创建缓冲区
        for i, bucket_params in enumerate(self.buckets):
            if not bucket_params:
                continue
            buffer_size = sum(p.numel() for p in bucket_params)
            buffer = torch.zeros(buffer_size, device=bucket_params[0].device, dtype=bucket_params[0].dtype)
            self.buckets[i] = {
                "params": bucket_params,
                "buffer": buffer,
                "ready_params": set(),
                "triggered": False
            }
    
    # --------------------
    # 注册梯度钩子
    # --------------------
    def _register_hook(self):
        for bucket_idx, bucket_info in enumerate(self.buckets):
            for param in bucket_info["params"]:
                # 闭包捕获当前 bucket_idx
                def make_hook(idx):
                    return lambda grad, param=param: self._create_hook(grad, param, idx)
                param.register_hook(make_hook(bucket_idx))
    
    # --------------------
    # 梯度钩子逻辑：延迟执行 all_reduce
    # --------------------
    def _create_hook(self, grad, param, bucket_idx):
        bucket_info = self.buckets[bucket_idx]
        bucket_info["ready_params"].add(param)

        # 当桶内所有参数梯度都准备好，且未触发通信
        if len(bucket_info["ready_params"]) == len(bucket_info["params"]) and not bucket_info["triggered"]:
            bucket_info["triggered"] = True  # 标记为已触发

            def delayed_sync():
                # 将桶内所有梯度拷贝到扁平缓冲区
                offset = 0
                for p in bucket_info["params"]:
                    numel = p.numel()
                    if p.grad is not None:
                        bucket_info["buffer"][offset:offset+numel].copy_(p.grad.view(-1))
                    else:
                        bucket_info["buffer"][offset:offset+numel].zero_()
                    offset += numel
                
                # 启动异步 all_reduce，实现计算与通信重叠
                handle = dist.all_reduce(bucket_info["buffer"], async_op=True)
                self.handles.append((handle, bucket_idx))
            
            # 延迟执行，确保所有梯度计算完成
            import torch.autograd as autograd
            autograd.Variable._execution_engine.queue_callback(delayed_sync)
    
    # --------------------
    # forward 前清理状态
    # --------------------
    def forward(self, x):
        if self.world_size > 1:
            for bucket in self.buckets:
                bucket["triggered"] = False
                bucket["ready_params"].clear()
        self.handles.clear()
        return self.module(x)

    # --------------------
    # 等待所有异步通信完成
    # --------------------
    def finish_gradient_synchronization(self):
        """
        等待所有 all_reduce 完成，并将梯度写回各参数
        需在 optimizer.step() 之前调用
        """
        for handle, bucket_idx in self.handles:
            handle.wait()  # 等待通信完成

            bucket_info = self.buckets[bucket_idx]
            buffer = bucket_info["buffer"]

            # 求平均梯度
            buffer.div_(self.world_size)

            offset = 0
            for p in bucket_info["params"]:
                numel = p.numel()
                if p.grad is not None:
                    p.grad.view(-1).copy_(buffer[offset:offset+numel])
                offset += numel
        
        # 清空 handles，为下一次迭代准备
        self.handles.clear()


# ============================
# 三、简单示例运行
# ============================
def run():
    # 初始化分布式环境（torchrun 已设置环境变量）
    dist.init_process_group(backend="nccl")
    local_rank = int(os.environ["LOCAL_RANK"])
    torch.cuda.set_device(local_rank)

    model = nn.Linear(10, 5).cuda()
    ddp_model = DDPOverlapBucketed(model, bucket_size_mb=1)  # 1 MB 小桶方便观察
    opt = torch.optim.SGD(ddp_model.parameters(), lr=0.1)

    for step in range(3):
        x = torch.randn(4, 10).cuda()
        loss = ddp_model(x).sum()
        loss.backward()
        ddp_model.finish_gradient_synchronization()  # 必须手动调用
        opt.step()
        opt.zero_grad()
        print(f"[rank {dist.get_rank()}] step {step} loss={loss.item()}")


if __name__ == "__main__":
    run()


Overwriting ddp_overlap_individual_parameters.py


In [7]:
!torchrun --nproc_per_node=1 ddp_overlap_individual_parameters.py # 有多少张卡设置nproc_per_node=几

[rank 0] step 0 loss=2.803687572479248
[rank 0] step 1 loss=-8.734395980834961
[rank 0] step 2 loss=-22.41156768798828


# 作业四

### 问题 (ddp_bucketed_benchmarking)：3分

**(a)** 使用与之前实验相同的配置（1个节点，2个GPU，XL模型大小）对你的分桶DDP实现进行基准测试，变化最大桶大小（1, 10, 100, 1000 MB）。将你的结果与之前没有分桶的实验进行比较——结果是否符合你的预期？如果不符合，为什么？你可能需要使用PyTorch分析器来更好地理解通信调用的顺序和/或执行方式。你期望对实验设置进行哪些更改才能使结果符合预期？

**交付物：** 各种桶大小下每次训练迭代的测量时间。对结果、你的预期以及任何不匹配的可能原因进行3-4句评论。

**(b)** 假设计算一个桶的梯度所需的时间与通信该梯度桶所需的时间相同。写一个方程来建模DDP的通信开销（即反向传播后额外花费的时间），作为以下变量的函数：
- 模型参数的总大小（字节）$s$
- all-reduce算法带宽 $w$（计算为每个rank的数据大小除以完成all-reduce所需的时间）
- 每次通信调用的开销（秒）$o$
- 桶的数量 $n_b$

从这个方程，写出最小化DDP开销的最优桶大小的方程。

**交付物：** 建模DDP开销的方程，以及最优桶大小的方程。

In [None]:

%%writefile distributed_communication_single_node_demo.py

'''
实现了一个单节点多进程分布式通信性能基准测试工具，用于测量 PyTorch 中 all_reduce 操作的延迟和带宽性能。
'''

import os
import time
import torch
import torch.distributed as dist
import argparse
import torch
from torch.multiprocessing import spawn


# ============================================================
# 1. 分布式环境初始化与清理
# ============================================================

def init_distributed_environment(
    master_addr: str,
    master_port: int,
    global_rank: int,
    world_size: int,
    backend: str
):
    """
    初始化 PyTorch 分布式通信环境（进程组）

    参数说明：
    - master_addr : rank 0 所在节点的 IP 地址
    - master_port : 用于进程间通信的端口
    - global_rank : 当前进程在所有进程中的唯一编号
    - world_size  : 总进程数
    - backend     : 通信后端（gloo / nccl / mpi）
    """

    # --------------------------------------------------------
    # 1.1 设置 rendezvous（所有进程汇合的地址）
    # --------------------------------------------------------
    # 所有进程必须通过 MASTER_ADDR:MASTER_PORT 建立初始连接
    os.environ["MASTER_ADDR"] = master_addr
    os.environ["MASTER_PORT"] = str(master_port)

    # --------------------------------------------------------
    # 1.2 初始化进程组（分布式的“总开关”）
    # --------------------------------------------------------
    # 这是使用 torch.distributed 的前置条件
    dist.init_process_group(
        backend=backend,
        rank=global_rank,
        world_size=world_size
    )


def destroy_distributed_environment():
    """
    清理分布式环境并释放资源
    """

    # 销毁进程组，防止资源泄漏或死锁
    dist.destroy_process_group()

    # GPU 场景下，清空 PyTorch 的 CUDA cache（非强制，但推荐）
    if torch.cuda.is_available():
        torch.cuda.empty_cache()


# ============================================================
# 2. All-Reduce 通信性能 Benchmark
# ============================================================

def run_all_reduce_benchmark(
    global_rank: int,
    world_size: int,
    tensor_size_mb: int,
    backend: str,
    device: str,
    master_addr: str,
    master_port: int
):
    """
    对 dist.all_reduce 进行性能测试

    测试指标：
    - 单次 all-reduce 平均耗时
    - 理论通信带宽（GB/s）
    """

    # --------------------------------------------------------
    # 2.1 初始化分布式环境
    # --------------------------------------------------------
    init_distributed_environment(
        master_addr=master_addr,
        master_port=master_port,
        global_rank=global_rank,
        world_size=world_size,
        backend=backend
    )

    # --------------------------------------------------------
    # 2.2 绑定 GPU（仅在 CUDA 场景下）
    # --------------------------------------------------------
    if device == "cuda":
        # 通常约定：rank i → GPU i
        torch.cuda.set_device(global_rank)
        torch.cuda.empty_cache()

    # --------------------------------------------------------
    # 2.3 构造通信测试用的 Tensor
    # --------------------------------------------------------
    # 将 MB 转换为字节
    tensor_num_bytes = tensor_size_mb * 1024 * 1024

    # float32 占 4 字节
    bytes_per_element = 4
    num_elements = tensor_num_bytes // bytes_per_element

    # 随机生成测试数据（数值本身不重要）
    communication_tensor = torch.randn(
        num_elements,
        device=device,
        dtype=torch.float32
    )

    # --------------------------------------------------------
    # 2.4 Warm-up（非常重要）
    # --------------------------------------------------------
    # 原因：
    # - NCCL 通信器初始化
    # - CUDA kernel lazy initialization
    # - GPU 频率爬升
    warmup_iterations = 5
    for _ in range(warmup_iterations):
        dist.all_reduce(
            communication_tensor,
            op=dist.ReduceOp.SUM
        )

        # CUDA 是异步执行，必须显式同步
        if device == "cuda":
            torch.cuda.synchronize()

    # 确保所有进程在同一时刻开始正式测试
    dist.barrier()

    # --------------------------------------------------------
    # 2.5 正式 benchmark（计时）
    # --------------------------------------------------------
    benchmark_iterations = 20

    start_time = time.time()

    for _ in range(benchmark_iterations):
        dist.all_reduce(
            communication_tensor,
            op=dist.ReduceOp.SUM
        )

    # GPU 场景下等待所有 kernel 完成
    if device == "cuda":
        torch.cuda.synchronize()

    end_time = time.time()

    # --------------------------------------------------------
    # 2.6 性能指标计算
    # --------------------------------------------------------
    total_elapsed_time = end_time - start_time
    avg_latency_seconds = total_elapsed_time / benchmark_iterations

    # 带宽 = 数据量 / 时间（单位：GB/s）
    bandwidth_gbps = (tensor_num_bytes / avg_latency_seconds) / 1e9

    # --------------------------------------------------------
    # 2.7 打印结果（仅 rank 0）
    # --------------------------------------------------------
    if global_rank == 0:
        print(
            f"[All-Reduce Benchmark]\n"
            f"  Backend        : {backend}\n"
            f"  Device         : {device}\n"
            f"  World Size     : {world_size}\n"
            f"  Tensor Size    : {tensor_size_mb} MB\n"
            f"  Avg Latency    : {avg_latency_seconds * 1000:.4f} ms\n"
            f"  Bandwidth      : {bandwidth_gbps:.4f} GB/s\n"
        )

    # --------------------------------------------------------
    # 2.8 汇总所有 rank 的结果（教学用）
    # --------------------------------------------------------
    local_result = {
        "rank": global_rank,
        "world_size": world_size,
        "backend": backend,
        "device": device,
        "tensor_size_mb": tensor_size_mb,
        "avg_latency_ms": avg_latency_seconds * 1000,
        "bandwidth_gbps": bandwidth_gbps,
    }

    gathered_results = [None for _ in range(world_size)]
    dist.all_gather_object(gathered_results, local_result)

    # --------------------------------------------------------
    # 2.9 清理环境
    # --------------------------------------------------------
    destroy_distributed_environment()

    # 只让主进程返回结果
    if global_rank == 0:
        return gathered_results


def main():

    for size in [1,10,100,1000]:
        for Backend in ["gloo", "nccl"]:
            for world_size in [1]:
                parser = argparse.ArgumentParser("All-Reduce Benchmark")

                parser.add_argument("--world_size", type=int, default=world_size,
                                    help="进程总数（通常等于 GPU 数）")
                parser.add_argument("--tensor_size_mb", type=int, default=size,
                                    help="通信 tensor 大小（MB）")
                parser.add_argument("--backend", type=str, default=Backend,
                                    choices=["gloo", "nccl"],
                                    help="分布式通信后端")
                parser.add_argument("--device", type=str, default="cuda",
                                    choices=["cpu", "cuda"],
                                    help="运行设备")
                parser.add_argument("--master_addr", type=str, default="127.0.0.1")
                parser.add_argument("--master_port", type=int, default=29500)

                args = parser.parse_args()

                # GPU 数量检查
                if args.device == "cuda":
                    assert torch.cuda.is_available(), "CUDA 不可用"
                    assert args.world_size <= torch.cuda.device_count(), (
                        "world_size 不能超过 GPU 数量"
                    )

                # 使用 spawn 启动多进程
                spawn(
                    fn=run_all_reduce_benchmark,
                    args=(
                        args.world_size,
                        args.tensor_size_mb,
                        args.backend,
                        args.device,
                        args.master_addr,
                        args.master_port
                    ),
                    nprocs=args.world_size,
                    join=True
                )


if __name__ == "__main__":
    main()



Overwriting distributed_communication_single_node_demo.py


In [14]:
!python distributed_communication_single_node_demo.py

[Gloo] Rank 0 is connected to 0 peer ranks. Expected number of connected peer ranks is : 0
[All-Reduce Benchmark]
  Backend        : gloo
  Device         : cuda
  World Size     : 1
  Tensor Size    : 1 MB
  Avg Latency    : 0.2266 ms
  Bandwidth      : 4.6281 GB/s

[All-Reduce Benchmark]
  Backend        : nccl
  Device         : cuda
  World Size     : 1
  Tensor Size    : 1 MB
  Avg Latency    : 0.0165 ms
  Bandwidth      : 63.7398 GB/s

[Gloo] Rank 0 is connected to 0 peer ranks. Expected number of connected peer ranks is : 0
[All-Reduce Benchmark]
  Backend        : gloo
  Device         : cuda
  World Size     : 1
  Tensor Size    : 10 MB
  Avg Latency    : 2.0388 ms
  Bandwidth      : 5.1430 GB/s

[All-Reduce Benchmark]
  Backend        : nccl
  Device         : cuda
  World Size     : 1
  Tensor Size    : 10 MB
  Avg Latency    : 0.0149 ms
  Bandwidth      : 704.8151 GB/s

[Gloo] Rank 0 is connected to 0 peer ranks. Expected number of connected peer ranks is : 0
[All-Reduce Be

In [None]:
%%writefile ddp_bucketed_benchmark_complete.py
import os
import time
import torch
import torch.nn as nn
import torch.distributed as dist
import torch.profiler
from torch.multiprocessing import spawn, Manager
import argparse
from typing import List, Dict
import json
import tempfile
import pickle

# ============================================================
# 1. 无分桶DDP实现（用于对比）
# ============================================================

class DDPNoBucket(nn.Module):
    """
    无分桶DDP：所有参数作为一个整体进行all_reduce
    """
    def __init__(self, module: nn.Module):
        super().__init__()
        self.module = module
        self.world_size = dist.get_world_size()
        self.handles = []
        
        self._broadcast_parameters()
        self._register_hook()
    
    def _broadcast_parameters(self):
        if self.world_size > 1:
            for param in self.module.parameters():
                dist.broadcast(param.data, src=0)
    
    def _register_hook(self):
        self.params_with_grad = []
        for param in self.module.parameters():
            if param.requires_grad:
                self.params_with_grad.append(param)
                param.register_hook(lambda grad, p=param: self._hook_callback(p))
    
    def _hook_callback(self, param):
        if all(p.grad is not None for p in self.params_with_grad):
            flat_grads = []
            for p in self.params_with_grad:
                flat_grads.append(p.grad.view(-1))
            
            if flat_grads:
                buffer = torch.cat(flat_grads)
                handle = dist.all_reduce(buffer, async_op=True)
                self.handles.append((handle, buffer, self.params_with_grad))
    
    def forward(self, x):
        self.handles.clear()
        for p in self.params_with_grad:
            p.grad = None
        return self.module(x)
    
    def finish_gradient_synchronization(self):
        for handle, buffer, params in self.handles:
            handle.wait()
            buffer.div_(self.world_size)
            
            offset = 0
            for p in params:
                numel = p.numel()
                p.grad.view(-1).copy_(buffer[offset:offset+numel])
                offset += numel
        self.handles.clear()


# ============================================================
# 2. 分桶DDP实现（优化版）
# ============================================================

class DDPBucketed(nn.Module):
    """
    分桶DDP：按指定桶大小分组梯度，实现计算通信重叠
    """
    def __init__(self, module: nn.Module, bucket_size_mb: float):
        super().__init__()
        self.module = module
        self.bucket_size_bytes = bucket_size_mb * 1024 * 1024
        self.world_size = dist.get_world_size()
        self.handles = []
        self.buckets = []
        
        self._broadcast_parameters()
        self._create_buckets()
        self._register_hooks()
    
    def _broadcast_parameters(self):
        if self.world_size > 1:
            for param in self.module.parameters():
                dist.broadcast(param.data, src=0)
    
    def _create_buckets(self):
        """按桶大小创建梯度桶"""
        current_bucket = []
        current_bucket_size = 0
        
        for p in reversed(list(self.module.parameters())):
            if not p.requires_grad:
                continue
            
            p_size = p.numel() * p.element_size()
            
            if current_bucket and (current_bucket_size + p_size > self.bucket_size_bytes):
                self._finalize_bucket(current_bucket)
                current_bucket = []
                current_bucket_size = 0
            
            current_bucket.append(p)
            current_bucket_size += p_size
        
        if current_bucket:
            self._finalize_bucket(current_bucket)
    
    def _finalize_bucket(self, params: List[torch.nn.Parameter]):
        """为桶创建缓冲区"""
        if not params:
            return
        
        buffer_size = sum(p.numel() for p in params)
        buffer = torch.zeros(
            buffer_size, 
            device=params[0].device, 
            dtype=params[0].dtype
        )
        
        self.buckets.append({
            "params": params,
            "buffer": buffer,
            "ready_count": 0,
            "triggered": False,
            "total_params": len(params)
        })
    
    def _register_hooks(self):
        """为每个参数注册梯度钩子"""
        for bucket_idx, bucket in enumerate(self.buckets):
            for param in bucket["params"]:
                param.register_hook(
                    lambda grad, b_idx=bucket_idx: self._on_gradient_ready(b_idx)
                )
    
    def _on_gradient_ready(self, bucket_idx: int):
        """梯度就绪回调"""
        bucket = self.buckets[bucket_idx]
        bucket["ready_count"] += 1
        
        if (bucket["ready_count"] == bucket["total_params"] and 
            not bucket["triggered"]):
            
            bucket["triggered"] = True
            
            def launch_all_reduce():
                offset = 0
                for p in bucket["params"]:
                    numel = p.numel()
                    if p.grad is not None:
                        bucket["buffer"][offset:offset+numel].copy_(p.grad.view(-1))
                    else:
                        bucket["buffer"][offset:offset+numel].zero_()
                    offset += numel
                
                handle = dist.all_reduce(bucket["buffer"], async_op=True)
                self.handles.append((handle, bucket_idx))
            
            torch.autograd.Variable._execution_engine.queue_callback(launch_all_reduce)
    
    def forward(self, x):
        """前向传播，重置状态"""
        for bucket in self.buckets:
            bucket["triggered"] = False
            bucket["ready_count"] = 0
        
        self.handles.clear()
        return self.module(x)
    
    def finish_gradient_synchronization(self):
        """等待所有通信完成并写回梯度"""
        for handle, bucket_idx in self.handles:
            handle.wait()
            
            bucket = self.buckets[bucket_idx]
            bucket["buffer"].div_(self.world_size)
            
            offset = 0
            for p in bucket["params"]:
                numel = p.numel()
                if p.grad is not None:
                    p.grad.view(-1).copy_(bucket["buffer"][offset:offset+numel])
                offset += numel
        
        self.handles.clear()


# ============================================================
# 3. 0.5B模型定义，可以自行修改参数配置，这里是让笔记本也能跑起来
# ============================================================

class XLModel(nn.Module):
    """
    模拟XL大小的模型
    总参数量约 0.5B（5亿参数）
    """
    def __init__(self, hidden_size: int = 1024, num_layers: int = 24):
        super().__init__()
        self.hidden_size = hidden_size
        
        # Embedding层: 50000 * 1024 = 51.2M
        self.embedding = nn.Embedding(50000, hidden_size)
        
        # Transformer层: 24层
        self.layers = nn.ModuleList([
            nn.TransformerEncoderLayer(
                d_model=hidden_size,
                nhead=16,
                dim_feedforward=hidden_size * 4,
                batch_first=True
            )
            for _ in range(num_layers)
        ])
        
        # 输出层: 1024 * 50000 = 51.2M
        self.output = nn.Linear(hidden_size, 50000)
        
        self._init_weights()
    
    def _init_weights(self):
        for p in self.parameters():
            if p.dim() > 1:
                nn.init.xavier_uniform_(p)
    
    def forward(self, x):
        x = self.embedding(x)
        for layer in self.layers:
            x = layer(x)
        return self.output(x)


# ============================================================
# 4. 基准测试主逻辑
# ============================================================

def run_benchmark(
    global_rank: int,
    world_size: int,
    bucket_size_mb: float,
    use_bucket: bool,
    num_iterations: int,
    master_addr: str,
    master_port: int,
    result_queue=None,
    result_file=None
):
    """
    运行DDP基准测试
    """
    
    # 设置设备
    torch.cuda.set_device(global_rank)
    device = torch.device(f"cuda:{global_rank}")
    
    # 初始化分布式环境 - 指定device_id消除警告
    os.environ["MASTER_ADDR"] = master_addr
    os.environ["MASTER_PORT"] = str(master_port)
    
    # 修复：使用device_id指定设备，避免NCCL猜测
    dist.init_process_group(
        backend="nccl",
        rank=global_rank,
        world_size=world_size,
        device_id=device  # 关键修复：明确指定设备ID
    )
    
    if global_rank == 0:
        config_str = f"{'Bucketed' if use_bucket else 'No Bucket'}"
        if use_bucket:
            config_str += f" (bucket={bucket_size_mb}MB)"
        print(f"\n{'='*60}")
        print(f"Testing: {config_str}")
        print(f"World size: {world_size}")
        print(f"{'='*60}")
    
    model = XLModel().to(device)
    
    # 包装DDP
    if use_bucket:
        ddp_model = DDPBucketed(model, bucket_size_mb=bucket_size_mb)
    else:
        ddp_model = DDPNoBucket(model)
    
    optimizer = torch.optim.AdamW(ddp_model.parameters(), lr=1e-4)
    
    # 预热
    if global_rank == 0:
        print("Warming up...")
    
    for _ in range(3):
        dummy_input = torch.randint(0, 50000, (2, 512), device=device)
        output = ddp_model(dummy_input)
        loss = output.mean()
        loss.backward()
        ddp_model.finish_gradient_synchronization()
        optimizer.step()
        optimizer.zero_grad()
    
    # 修复：使用device参数明确指定barrier设备
    dist.barrier(device_ids=[global_rank])
    torch.cuda.synchronize()
    
    # 正式测试
    if global_rank == 0:
        print(f"Running benchmark ({num_iterations} iterations)...")
    
    iteration_times = []
    
    for iter_idx in range(num_iterations):
        input_ids = torch.randint(0, 50000, (2, 512), device=device)
        
        start_event = torch.cuda.Event(enable_timing=True)
        end_event = torch.cuda.Event(enable_timing=True)
        
        start_event.record()
        
        output = ddp_model(input_ids)
        loss = output.mean()
        loss.backward()
        ddp_model.finish_gradient_synchronization()
        optimizer.step()
        optimizer.zero_grad()
        
        end_event.record()
        torch.cuda.synchronize()
        
        elapsed_ms = start_event.elapsed_time(end_event)
        iteration_times.append(elapsed_ms)
    
    # 收集所有rank的结果
    local_result = {
        "rank": global_rank,
        "bucket_size_mb": bucket_size_mb if use_bucket else float('inf'),
        "use_bucket": use_bucket,
        "iteration_times_ms": iteration_times,
        "avg_time_ms": sum(iteration_times) / len(iteration_times),
        "min_time_ms": min(iteration_times),
        "max_time_ms": max(iteration_times),
    }
    
    all_results = [None] * world_size
    dist.all_gather_object(all_results, local_result)
    
    # PyTorch Profiler（仅rank 0，且仅分桶模式）
    if global_rank == 0 and use_bucket and bucket_size_mb <= 100:
        try:
            os.makedirs("./profiler_log", exist_ok=True)
            with torch.profiler.profile(
                activities=[
                    torch.profiler.ProfilerActivity.CPU,
                    torch.profiler.ProfilerActivity.CUDA,
                ],
                schedule=torch.profiler.schedule(wait=1, warmup=1, active=2, repeat=1),
                on_trace_ready=torch.profiler.tensorboard_trace_handler(
                    f"./profiler_log/bucket_{bucket_size_mb}MB"
                ),
                record_shapes=False,
                profile_memory=False,
                with_stack=False
            ) as prof:
                
                for _ in range(4):
                    input_ids = torch.randint(0, 50000, (2, 512), device=device)
                    output = ddp_model(input_ids)
                    loss = output.mean()
                    loss.backward()
                    ddp_model.finish_gradient_synchronization()
                    optimizer.step()
                    optimizer.zero_grad()
                    prof.step()
        except Exception as e:
            print(f"Profiler warning: {e}")
    
    # 只有rank 0返回结果
    if global_rank == 0:
        result_data = {
            "config": {
                "use_bucket": use_bucket,
                "bucket_size_mb": bucket_size_mb if use_bucket else None,
                "world_size": world_size,
            },
            "ranks": all_results,
            "global_avg_time_ms": sum(r["avg_time_ms"] for r in all_results) / world_size,
        }
        
        # 通过queue返回
        if result_queue is not None:
            result_queue.put(result_data)
        
        # 同时写入文件（备用）
        if result_file is not None:
            with open(result_file, 'wb') as f:
                pickle.dump(result_data, f)
    
    dist.destroy_process_group()


def benchmark_worker(rank, world_size, bucket_size, use_bucket, num_iter, 
                     master_addr, master_port, result_queue, result_file):
    """包装函数用于spawn"""
    try:
        run_benchmark(rank, world_size, bucket_size, use_bucket, num_iter,
                      master_addr, master_port, result_queue, result_file)
    except Exception as e:
        print(f"Error in worker {rank}: {e}")
        raise


def run_single_test(world_size, bucket_size, use_bucket, num_iterations,
                   master_addr, master_port):
    """运行单次测试并返回结果"""
    
    with Manager() as manager:
        result_queue = manager.Queue()
        
        with tempfile.NamedTemporaryFile(delete=False, suffix='.pkl') as tmp:
            result_file = tmp.name
        
        try:
            spawn(
                benchmark_worker,
                args=(world_size, bucket_size, use_bucket, num_iterations,
                      master_addr, master_port, result_queue, result_file),
                nprocs=world_size,
                join=True
            )
            
            if not result_queue.empty():
                return result_queue.get()
            else:
                with open(result_file, 'rb') as f:
                    return pickle.load(f)
        finally:
            if os.path.exists(result_file):
                os.unlink(result_file)


def main():
    parser = argparse.ArgumentParser(description="DDP Bucketed Benchmark")
    parser.add_argument("--world_size", type=int, default=2, 
                       help="GPU数量")
    parser.add_argument("--num_iterations", type=int, default=20,
                       help="每次测试的迭代次数")
    parser.add_argument("--master_addr", type=str, default="127.0.0.1")
    parser.add_argument("--master_port", type=int, default=29500)
    
    args = parser.parse_args()
    
    assert torch.cuda.is_available(), "需要CUDA"
    assert args.world_size <= torch.cuda.device_count(), "GPU数量不足"
    
    results = []
    bucket_sizes = [1, 10, 100, 1000]
    
    print("=" * 70)
    print("DDP Bucketed Benchmark")
    print(f"Configuration: 1 node, {args.world_size} GPUs, 0.5B model")
    print("=" * 70)
    
    # 1. 先测试无分桶版本（baseline）
    print("\n[1/5] Testing No Bucket (Baseline)...")
    result = run_single_test(
        args.world_size, 0, False, args.num_iterations,
        args.master_addr, args.master_port
    )
    results.append(result)
    
    # 2. 测试各种桶大小
    for i, bucket_size in enumerate(bucket_sizes, 2):
        print(f"\n[{i}/5] Testing Bucket Size = {bucket_size} MB...")
        result = run_single_test(
            args.world_size, bucket_size, True, args.num_iterations,
            args.master_addr, args.master_port
        )
        results.append(result)
    
    # 汇总结果
    print("\n" + "=" * 70)
    print("BENCHMARK RESULTS SUMMARY")
    print("=" * 70)
    
    baseline = None
    bucketed_results = []
    
    for r in results:
        if not r["config"]["use_bucket"]:
            baseline = r
        else:
            bucketed_results.append(r)
    
    if baseline is None:
        print("ERROR: Baseline result not found!")
        return
    
    baseline_time = baseline["global_avg_time_ms"]
    print(f"\nBaseline (No Bucket): {baseline_time:.2f} ms/iteration")
    print("-" * 60)
    print(f"{'Config':<20} {'Time (ms)':<12} {'Overhead':<12} {'Speedup':<10}")
    print("-" * 60)
    
    print(f"{'No Bucket':<20} {baseline_time:<12.2f} {0.0:<12.2f} {1.0:<10.2f}x")
    
    for r in sorted(bucketed_results, key=lambda x: x["config"]["bucket_size_mb"]):
        bucket_size = r["config"]["bucket_size_mb"]
        time_ms = r["global_avg_time_ms"]
        overhead = time_ms - baseline_time
        speedup = baseline_time / time_ms if time_ms > 0 else float('inf')
        
        config_str = f"Bucket={bucket_size}MB"
        print(f"{config_str:<20} {time_ms:<12.2f} {overhead:<12.2f} {speedup:<10.2f}x")
    
    # 保存详细结果
    with open("benchmark_results.json", "w") as f:
        json_results = []
        for r in results:
            json_r = {
                "config": r["config"],
                "global_avg_time_ms": float(r["global_avg_time_ms"]),
                "ranks": [
                    {
                        "rank": rank_r["rank"],
                        "bucket_size_mb": float(rank_r["bucket_size_mb"]) if rank_r["bucket_size_mb"] != float('inf') else "inf",
                        "use_bucket": rank_r["use_bucket"],
                        "avg_time_ms": float(rank_r["avg_time_ms"]),
                        "min_time_ms": float(rank_r["min_time_ms"]),
                        "max_time_ms": float(rank_r["max_time_ms"]),
                    }
                    for rank_r in r["ranks"]
                ]
            }
            json_results.append(json_r)
        json.dump(json_results, f, indent=2)
    
    print(f"\n详细结果已保存到 benchmark_results.json")


if __name__ == "__main__":
    main()

Overwriting ddp_bucketed_benchmark_complete.py


In [9]:
!python ddp_bucketed_benchmark_complete.py --world_size 1 --num_iterations 20

DDP Bucketed Benchmark
Configuration: 1 node, 1 GPUs, 0.5B model

[1/5] Testing No Bucket (Baseline)...

Testing: No Bucket
World size: 1
Warming up...
Running benchmark (20 iterations)...

[2/5] Testing Bucket Size = 1 MB...

Testing: Bucketed (bucket=1MB)
World size: 1
Warming up...
Running benchmark (20 iterations)...

[3/5] Testing Bucket Size = 10 MB...

Testing: Bucketed (bucket=10MB)
World size: 1
Warming up...
Running benchmark (20 iterations)...

[4/5] Testing Bucket Size = 100 MB...

Testing: Bucketed (bucket=100MB)
World size: 1
Warming up...
Running benchmark (20 iterations)...

[5/5] Testing Bucket Size = 1000 MB...

Testing: Bucketed (bucket=1000MB)
World size: 1
Warming up...
Running benchmark (20 iterations)...

BENCHMARK RESULTS SUMMARY

Baseline (No Bucket): 8681.96 ms/iteration
------------------------------------------------------------
Config               Time (ms)    Overhead     Speedup   
------------------------------------------------------------
No Bucket   

# 作业五

## 3 优化器状态分片

分布式数据并行训练在概念上简单且通常非常高效，但要求每个进程持有模型参数和优化器状态的不同副本。这种冗余往往会导致显著的内存开销。例如，AdamW优化器为每个参数维护两个浮点数，这意味着其占用的内存是模型权重的两倍。Rajbhandari等人[2020]描述了几种方法，通过将（1）优化器状态、（2）梯度以及（3）参数在各进程间进行划分，并根据需要在不同工作节点之间通信，从而有效降低数据并行训练中的冗余问题。

在本部分作业中，我们将通过实现优化器状态分片的简化版本，来降低每个Rank的内存消耗。具体而言，我们不会为所有参数都保存优化器状态，而是让每个Rank的优化器实例仅处理一部分参数（大约为1 / 世界规模）。当每个Rank的优化器执行一步更新时，它只会更新其分片中对应的那一部分模型参数。随后，各Rank会将其更新后的参数广播给其他Rank，以确保在每一步优化器更新后，模型参数始终保持同步。

---

### 问题（优化器状态分片）：15分

实现一个Python类，用于处理优化器状态的分片。该类应封装任意输入的PyTorch `optim.Optimizer`，并在每次优化器步骤后负责同步更新的参数。我们建议采用以下公共接口：

**`def __init__(self, params, optimizer_cls: Type[Optimizer], **kwargs: Any)`**：初始化分片状态优化器。`params` 是待优化的参数集合（或参数组，如果用户希望为模型的不同部分使用不同的超参数，例如学习率）；这些参数将在所有进程间进行分片。`optimizer_cls` 参数指定了要封装的优化器类型（例如，`optim.AdamW`）。最后，任何剩余的关键字参数将被转发到 `optimizer_cls` 的构造函数中。请务必在此方法中调用 `torch.optim.Optimizer` 超类的构造函数。

**`def step(self, closure, **kwargs)`**：调用包装优化器的 `step()` 方法，并传入相应参数；提供了闭包和关键字参数。更新参数后，与其他进程同步。

**`def add_param_group(self, param_group: dict[str, Any])`**：该方法应添加一个参数组传递给分片优化器。此方法在超类构造函数构建分片优化器时被调用，也可能在训练过程中被调用（例如，用于逐步解冻模型中的各层）。因此，该方法应负责将模型的参数分配到各个进程 ranks 中。

---

**交付物**：实现一个容器类，以支持优化器状态的分片处理。为测试你的分片优化器，请先实现适配器 `[adapters.get_sharded_optimizer]`。随后，运行 `uv run pytest tests/test_sharded_optimizer.py` 来执行测试。我们建议多次运行测试（例如5次），以确保测试能够稳定通过。

In [14]:
%%writefile sharded_optimizer.py
from typing import Any, Type, Iterable, Dict, List
import torch
import torch.distributed as dist
from torch.optim import Optimizer


class ShardedOptimizer(Optimizer):
    """
    Optimizer State Sharding (ZeRO-1 style, simplified)

    Each rank owns a shard of parameters and maintains optimizer
    states only for those parameters. After each step, updated
    parameters are broadcast to all other ranks.
    """

    def __init__(
        self,
        params: Iterable,
        optimizer_cls: Type[Optimizer],
        **kwargs: Any,
    ):
        if not dist.is_initialized():
            raise RuntimeError("torch.distributed must be initialized")

        self.rank = dist.get_rank()
        self.world_size = dist.get_world_size()
        self.optimizer_cls = optimizer_cls
        self.optimizer_kwargs = kwargs

        # super() initializes param_groups and calls add_param_group
        super().__init__(params, defaults={})

        # Build the local (sharded) optimizer
        self._build_local_optimizer()

    def _build_local_optimizer(self):
        """
        Create the wrapped optimizer using only parameters
        assigned to this rank.
        """
        local_param_groups: List[Dict[str, Any]] = []

        for group in self.param_groups:
            local_params = [
                p for p in group["params"]
                if self._param_owner(p) == self.rank
            ]

            if len(local_params) == 0:
                continue

            local_group = dict(group)
            local_group["params"] = local_params
            local_param_groups.append(local_group)

        self.local_optimizer = self.optimizer_cls(
            local_param_groups,
            **self.optimizer_kwargs,
        )

    def _param_owner(self, param: torch.nn.Parameter) -> int:
        """
        Determine which rank owns this parameter.
        """
        return self._global_param_index(param) % self.world_size

    def _global_param_index(self, param: torch.nn.Parameter) -> int:
        """
        Assign a stable global index to each parameter.
        """
        if not hasattr(self, "_param_to_index"):
            self._param_to_index = {}
            idx = 0
            for group in self.param_groups:
                for p in group["params"]:
                    self._param_to_index[p] = idx
                    idx += 1
        return self._param_to_index[param]

    @torch.no_grad()
    def step(self, closure=None, **kwargs):
        """
        Run optimizer step on local shard, then synchronize parameters.
        """
        loss = None
        if closure is not None:
            with torch.enable_grad():
                loss = closure()

        # Step only updates local shard
        self.local_optimizer.step(**kwargs)

        # Synchronize updated parameters
        self._sync_parameters()

        return loss

    def _sync_parameters(self):
        """
        Broadcast updated parameters from owning rank to all others.
        """
        for group in self.param_groups:
            for p in group["params"]:
                owner = self._param_owner(p)
                dist.broadcast(p.data, src=owner)

    def add_param_group(self, param_group: Dict[str, Any]):
        """
        Add a parameter group and rebuild local optimizer.
        """
        super().add_param_group(param_group)
        self._build_local_optimizer()


Overwriting sharded_optimizer.py


In [16]:
%%writefile test_sharded_optimizer.py
import os
import torch
import torch.nn as nn
import torch.distributed as dist

from sharded_optimizer import ShardedOptimizer   # 假设你把之前代码存成这个文件


def setup_distributed():
    dist.init_process_group(backend="nccl")
    torch.cuda.set_device(int(os.environ["LOCAL_RANK"]))


def main():
    setup_distributed()

    rank = dist.get_rank()
    device = torch.device("cuda")

    # 一个简单模型
    model = nn.Sequential(
        nn.Linear(1024, 1024),
        nn.ReLU(),
        nn.Linear(1024, 10),
    ).to(device)

    # 确保模型初始化一致
    for p in model.parameters():
        dist.broadcast(p.data, src=0)

    optimizer = ShardedOptimizer(
        model.parameters(),
        torch.optim.AdamW,
        lr=1e-3
    )

    loss_fn = nn.CrossEntropyLoss()

    for step in range(5):
        x = torch.randn(8, 1024, device=device)
        y = torch.randint(0, 10, (8,), device=device)

        optimizer.zero_grad()
        loss = loss_fn(model(x), y)
        loss.backward()
        optimizer.step()

        if rank == 0:
            print(f"step={step}, loss={loss.item():.4f}")

    dist.destroy_process_group()


if __name__ == "__main__":
    main()


Writing test_sharded_optimizer.py


In [17]:
!torchrun --nproc_per_node=1 test_sharded_optimizer.py

step=0, loss=2.5581
step=1, loss=2.4269
step=2, loss=2.3533
step=3, loss=2.2617
step=4, loss=2.4645
