# 論文 9：GPipe - 使用管線平行化高效訓練巨型神經網路

**論文**：Huang et al. (2019) - GPipe: Efficient Training of Giant Neural Networks using Pipeline Parallelism

**核心洞見**：訓練非常大的神經網路需要將它們分割到多個裝置上。GPipe 引入了**管線平行化**，結合**微批次**和**重新實體化**，以高效訓練無法放入單一加速器的模型。

## 核心概念

### 1. 管線平行化
- 將模型分割為 **K 個分區**，分布在 K 個裝置上
- 每個裝置持有連續的層
- 資料流經管線：裝置 1 → 裝置 2 → ... → 裝置 K

### 2. 微批次處理
- 將大小為 N 的小批次分割為 M 個微批次，每個大小為 N/M
- 微批次依序通過管線處理
- **減少氣泡時間**（裝置閒置時間）

### 3. F-then-B 排程
```
先前向所有 M 個微批次，再反向所有 M 個微批次
裝置 1: F1 F2 F3 F4 ........... B4 B3 B2 B1
裝置 2: .. F1 F2 F3 F4 ....... B4 B3 B2 B1
裝置 3: .... F1 F2 F3 F4 ..... B4 B3 B2 B1
裝置 4: ...... F1 F2 F3 F4 ... B4 B3 B2 B1
```

### 4. 重新實體化（梯度檢查點）
- 不儲存所有激活值（記憶體密集）
- 只在分區邊界設置檢查點
- 在反向傳遞時重新計算中間激活值
- **用計算換取記憶體**

### 5. 氣泡時間
- 裝置閒置時間的比例：**(K-1) / (K-1 + M)**
- 更多微批次 M → 更少氣泡時間
- 更多裝置 K → 更多氣泡時間

---

## 實作概述

我們將實作：
1. 在「模擬」裝置上進行模型分區
2. 微批次分割和排程
3. 通過管線的前向和反向傳遞
4. 梯度累積
5. 記憶體效率的重新實體化
6. 與資料平行化的比較
7. 氣泡時間分析

讓我們開始建構！

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from typing import List, Tuple, Dict, Callable
from dataclasses import dataclass
import time
from collections import defaultdict

np.random.seed(42)

print("程式庫匯入成功！")
print("NumPy 版本:", np.__version__)

# 第一節：模型分區和管線結構

GPipe 的第一步是將大型模型分區為 K 個段落，每個分配給不同的裝置。

## 分區策略

對於有 L 層的模型：
- **均勻分區**：每個分區獲得約 L/K 層
- **平衡分區**：按計算時間或記憶體進行分區

我們將實作一個簡單的多層網路並進行均勻分區。

In [None]:
@dataclass
class Layer:
    """單一神經網路層。"""
    W: np.ndarray  # 權重矩陣
    b: np.ndarray  # 偏置向量
    activation: str = 'relu'  # 'relu'、'tanh' 或 'linear'
    
    def forward(self, x: np.ndarray, store_activation: bool = True) -> Tuple[np.ndarray, np.ndarray]:
        """前向傳遞：z = W @ x + b, a = activation(z)"""
        z = x @ self.W + self.b  # 線性變換
        
        # 應用激活函數
        if self.activation == 'relu':
            a = np.maximum(0, z)
        elif self.activation == 'tanh':
            a = np.tanh(z)
        elif self.activation == 'linear':
            a = z
        else:
            raise ValueError(f"未知的激活函數：{self.activation}")
        
        return a, z if store_activation else None
    
    def backward(self, da: np.ndarray, z: np.ndarray, x: np.ndarray) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
        """反向傳遞：計算梯度。"""
        # 激活函數梯度
        if self.activation == 'relu':
            dz = da * (z > 0)
        elif self.activation == 'tanh':
            dz = da * (1 - np.tanh(z)**2)
        elif self.activation == 'linear':
            dz = da
        else:
            raise ValueError(f"未知的激活函數：{self.activation}")
        
        # 參數梯度
        dW = x.T @ dz
        db = np.sum(dz, axis=0)
        
        # 輸入梯度（用於前一層）
        dx = dz @ self.W.T
        
        return dx, dW, db


@dataclass
class Partition:
    """模型的一個分區（分配給一個裝置的層子集）。"""
    device_id: int
    layers: List[Layer]
    
    def forward(self, x: np.ndarray, store_activations: bool = True) -> Tuple[np.ndarray, List[Tuple]]:
        """此分區中所有層的前向傳遞。"""
        activations = []  # 如需要，儲存每層的 (x, z)
        
        current = x
        for layer in self.layers:
            if store_activations:
                activations.append(current)  # 儲存此層的輸入
            
            current, z = layer.forward(current, store_activation=store_activations)
            
            if store_activations:
                activations.append(z)  # 儲存激活前的值
        
        return current, activations
    
    def backward(self, dout: np.ndarray, activations: List) -> Tuple[np.ndarray, List[Tuple]]:
        """此分區中所有層的反向傳遞。"""
        gradients = []  # 儲存每層的 (dW, db)
        
        da = dout
        # 反向遍歷各層
        for i in range(len(self.layers) - 1, -1, -1):
            layer = self.layers[i]
            
            # 獲取儲存的激活值
            x = activations[2*i]      # 此層的輸入
            z = activations[2*i + 1]  # 激活前的值
            
            # 計算梯度
            da, dW, db = layer.backward(da, z, x)
            gradients.insert(0, (dW, db))
        
        return da, gradients  # da 是相對於分區輸入的梯度


def create_model(layer_dims: List[int], activations: List[str]) -> List[Layer]:
    """建立多層神經網路。
    
    參數：
        layer_dims: [input_dim, hidden1, hidden2, ..., output_dim]
        activations: 每層的激活函數
    """
    layers = []
    for i in range(len(layer_dims) - 1):
        W = np.random.randn(layer_dims[i], layer_dims[i+1]) * np.sqrt(2.0 / layer_dims[i])
        b = np.zeros(layer_dims[i+1])
        layers.append(Layer(W, b, activations[i]))
    return layers


def partition_model(layers: List[Layer], num_partitions: int) -> List[Partition]:
    """將層均勻分區到各裝置。"""
    num_layers = len(layers)
    layers_per_partition = num_layers // num_partitions
    
    partitions = []
    for k in range(num_partitions):
        start = k * layers_per_partition
        if k == num_partitions - 1:
            # 最後一個分區獲得剩餘的層
            end = num_layers
        else:
            end = (k + 1) * layers_per_partition
        
        partition_layers = layers[start:end]
        partitions.append(Partition(device_id=k, layers=partition_layers))
    
    return partitions


# 範例：建立並分區一個 12 層網路
layer_dims = [128] + [256] * 10 + [10]  # 輸入=128，10 個 256 的隱藏層，輸出=10
activations = ['relu'] * 10 + ['linear']  # 隱藏層用 ReLU，輸出層用 linear

model_layers = create_model(layer_dims, activations)
print(f"建立了 {len(model_layers)} 層的模型")

# 分區到 4 個「裝置」
K = 4
partitions = partition_model(model_layers, K)

print(f"\n將模型分區為 {K} 個分區：")
for i, partition in enumerate(partitions):
    print(f"  裝置 {i}：{len(partition.layers)} 層")

print("\n✓ 模型分區完成！")

# 第二節：微批次策略

GPipe 將每個小批次分割為 M 個**微批次**以提高管線利用率。

## 為什麼要微批次處理？

沒有微批次處理：
```
裝置 1: [前向] .................... [反向]
裝置 2:          [前向] .......... [反向]
裝置 3:                   [前向] [反向]
          ^^^^^^^^                     ^^^^^^^^^^
          氣泡                         氣泡
```

有 M 個微批次：
```
裝置 1: F1 F2 F3 F4 ........... B4 B3 B2 B1
裝置 2:    F1 F2 F3 F4 ....... B4 B3 B2 B1
裝置 3:       F1 F2 F3 F4 .... B4 B3 B2 B1
          ^^                              ^^
          更小的氣泡
```

**氣泡比例**：(K-1) / (K-1 + M)
- 更多微批次 → 更少氣泡時間
- 但更多微批次 → 更多開銷

In [None]:
def split_into_microbatches(X: np.ndarray, y: np.ndarray, num_microbatches: int) -> List[Tuple[np.ndarray, np.ndarray]]:
    """將小批次分割為微批次。
    
    參數：
        X: 輸入資料 (batch_size, features)
        y: 標籤 (batch_size, ...)
        num_microbatches: M（微批次數量）
    
    返回：
        (X_micro, y_micro) 元組的列表
    """
    batch_size = X.shape[0]
    microbatch_size = batch_size // num_microbatches
    
    if batch_size % num_microbatches != 0:
        raise ValueError(f"批次大小 {batch_size} 必須能被微批次數 {num_microbatches} 整除")
    
    microbatches = []
    for m in range(num_microbatches):
        start = m * microbatch_size
        end = (m + 1) * microbatch_size
        microbatches.append((X[start:end], y[start:end]))
    
    return microbatches


def compute_bubble_fraction(K: int, M: int) -> float:
    """GPipe 的理論氣泡比例。
    
    公式：(K - 1) / (K - 1 + M)
    
    參數：
        K: 裝置/分區數量
        M: 微批次數量
    """
    return (K - 1) / (K - 1 + M)


# 範例：分析氣泡比例
K_values = [2, 4, 8, 16]
M_values = [1, 2, 4, 8, 16, 32, 64]

print("氣泡比例分析：")
print("\nM（微批次數）→")
print("K ↓\t" + "\t".join(f"{M:d}" for M in M_values))
print("-" * 80)

for K in K_values:
    row = f"{K}\t"
    for M in M_values:
        bubble = compute_bubble_fraction(K, M)
        row += f"{bubble:.3f}\t"
    print(row)

print("\n關鍵觀察：")
print("  - 更多裝置 (K) → 更多氣泡時間（裝置等待管線）")
print("  - 更多微批次 (M) → 更少氣泡時間（管線保持滿載）")
print("  - K=4, M=8 時：氣泡比例 = 27.3%（裝置 27% 時間閒置）")
print("  - K=4, M=32 時：氣泡比例 = 8.6%（好很多！）")

# 微批次處理範例
batch_size = 32
M = 8
X_batch = np.random.randn(batch_size, 128)
y_batch = np.random.randint(0, 10, batch_size)

microbatches = split_into_microbatches(X_batch, y_batch, M)
print(f"\n\n將 {batch_size} 的批次分割為 {M} 個微批次：")
for i, (X_m, y_m) in enumerate(microbatches):
    print(f"  微批次 {i}：X 形狀 {X_m.shape}，y 形狀 {y_m.shape}")

print("\n✓ 微批次處理完成！")

# 第三節：通過管線的前向傳遞（F-then-B 排程）

GPipe 使用 **F-then-B 排程**：
1. 將所有 M 個微批次前向通過管線
2. 將所有 M 個微批次反向通過管線（以相反順序）

## 時間線範例（K=3 裝置，M=4 微批次）：

```
時間 →  0   1   2   3   4   5   6   7   8   9   10  11  12
裝置 0:  F0  F1  F2  F3  ... ... ... B3  B2  B1  B0
裝置 1:  ... F0  F1  F2  F3  ... ... ... B3  B2  B1  B0
裝置 2:  ... ... F0  F1  F2  F3  ... ... ... B3  B2  B1  B0
```

說明：
- **F0** = 微批次 0 前向
- **B3** = 微批次 3 反向
- **...** = 氣泡（裝置閒置）

In [None]:
@dataclass
class PipelineEvent:
    """記錄裝置何時執行操作。"""
    time_step: int
    device_id: int
    operation: str  # 'forward' 或 'backward'
    microbatch_id: int


class GPipePipeline:
    """使用 F-then-B 排程的 GPipe 管線。"""
    
    def __init__(self, partitions: List[Partition]):
        self.partitions = partitions
        self.K = len(partitions)  # 裝置數量
        
        # 用於追蹤執行時間線
        self.events = []  # PipelineEvent 列表
    
    def forward_pipeline(self, microbatches: List[Tuple[np.ndarray, np.ndarray]], 
                        store_activations: bool = True) -> Tuple[List[np.ndarray], List[List]]:
        """前向傳遞：處理所有微批次通過管線。
        
        返回：
            outputs: 每個微批次的最終輸出列表
            all_activations: 激活值列表的列表（每個微批次一個）
        """
        M = len(microbatches)
        
        # 輸出和激活值的儲存
        outputs = [None] * M
        all_activations = [[None] * self.K for _ in range(M)]  # [微批次][分區]
        
        # F-then-B 排程：前向所有微批次
        time_step = 0
        
        for m in range(M):
            X_micro, y_micro = microbatches[m]
            current = X_micro
            
            # 通過每個分區前向
            for k, partition in enumerate(self.partitions):
                self.events.append(PipelineEvent(time_step, k, 'forward', m))
                
                current, activations = partition.forward(current, store_activations)
                all_activations[m][k] = activations
                
                time_step += 1
            
            outputs[m] = current
        
        return outputs, all_activations
    
    def backward_pipeline(self, outputs: List[np.ndarray], 
                         labels: List[np.ndarray],
                         all_activations: List[List]) -> List[List[List[Tuple]]]:
        """反向傳遞：以相反順序處理所有微批次。
        
        返回：
            all_gradients: [微批次][分區][(dW, db) 每層]
        """
        M = len(outputs)
        
        # 梯度的儲存
        all_gradients = [[None] * self.K for _ in range(M)]
        
        # 找到當前時間步（在前向傳遞之後）
        time_step = max(e.time_step for e in self.events) + 1
        
        # 以相反順序反向所有微批次
        for m in range(M - 1, -1, -1):
            # 計算損失梯度（簡單 MSE 示範）
            dout = 2 * (outputs[m] - labels[m]) / labels[m].shape[0]
            
            # 以相反順序通過每個分區反向
            for k in range(self.K - 1, -1, -1):
                partition = self.partitions[k]
                activations = all_activations[m][k]
                
                self.events.append(PipelineEvent(time_step, k, 'backward', m))
                
                dout, gradients = partition.backward(dout, activations)
                all_gradients[m][k] = gradients
                
                time_step += 1
        
        return all_gradients
    
    def get_timeline_matrix(self) -> np.ndarray:
        """將事件轉換為 K×T 矩陣以供視覺化。
        
        矩陣值：
            0 = 氣泡（閒置）
            m+1 = 微批次 m 前向
            -(m+1) = 微批次 m 反向
        """
        max_time = max(e.time_step for e in self.events) + 1
        timeline = np.zeros((self.K, max_time))
        
        for event in self.events:
            value = event.microbatch_id + 1
            if event.operation == 'backward':
                value = -value
            timeline[event.device_id, event.time_step] = value
        
        return timeline


# 測試前向傳遞
print("測試 GPipe 前向傳遞...\n")

# 建立管線
pipeline = GPipePipeline(partitions)

# 建立微批次
M = 4
batch_size = 16
X_batch = np.random.randn(batch_size, 128)
y_batch_onehot = np.eye(10)[np.random.randint(0, 10, batch_size)]

microbatches = split_into_microbatches(X_batch, y_batch_onehot, M)

# 前向傳遞
outputs, all_activations = pipeline.forward_pipeline(microbatches)

print(f"處理了 {M} 個微批次通過 {pipeline.K} 個裝置")
print(f"輸出形狀：{[out.shape for out in outputs]}")
print(f"前向事件總數：{len([e for e in pipeline.events if e.operation == 'forward'])}")

# 反向傳遞
labels = [mb[1] for mb in microbatches]
all_gradients = pipeline.backward_pipeline(outputs, labels, all_activations)

print(f"反向事件總數：{len([e for e in pipeline.events if e.operation == 'backward'])}")
print(f"\n總時間步數：{max(e.time_step for e in pipeline.events) + 1}")

print("\n✓ 管線前向和反向傳遞完成！")

# 第四節：跨微批次的梯度累積

處理完所有 M 個微批次後，我們需要：
1. **累積梯度**來自所有微批次
2. **平均**它們（因為它們來自同一個小批次）
3. **應用**累積的梯度來更新參數

這等同於一次處理整個小批次，但管線利用率更好！

In [None]:
def accumulate_gradients(all_gradients: List[List[List[Tuple]]]) -> List[List[Tuple]]:
    """累積並平均所有微批次的梯度。
    
    參數：
        all_gradients: [微批次][分區][(dW, db) 每層]
    
    返回：
        accumulated: [分區][(dW, db) 每層] - 跨微批次平均
    """
    M = len(all_gradients)  # 微批次數量
    K = len(all_gradients[0])  # 分區數量
    
    # 初始化累積梯度（從第一個微批次複製結構）
    accumulated = []
    for k in range(K):
        partition_grads = []
        for layer_idx in range(len(all_gradients[0][k])):
            # 跨微批次求和梯度
            dW_sum = sum(all_gradients[m][k][layer_idx][0] for m in range(M))
            db_sum = sum(all_gradients[m][k][layer_idx][1] for m in range(M))
            
            # 平均（因為微批次是同一個小批次的一部分）
            dW_avg = dW_sum / M
            db_avg = db_sum / M
            
            partition_grads.append((dW_avg, db_avg))
        
        accumulated.append(partition_grads)
    
    return accumulated


def apply_gradients(partitions: List[Partition], gradients: List[List[Tuple]], learning_rate: float):
    """應用累積的梯度來更新參數。
    
    參數：
        partitions: 模型分區列表
        gradients: [分區][(dW, db) 每層]
        learning_rate: SGD 的學習率
    """
    for k, partition in enumerate(partitions):
        partition_grads = gradients[k]
        
        for layer_idx, layer in enumerate(partition.layers):
            dW, db = partition_grads[layer_idx]
            
            # SGD 更新
            layer.W -= learning_rate * dW
            layer.b -= learning_rate * db


# 測試梯度累積
print("測試梯度累積...\n")

# 我們已經有了前一個 cell 的 all_gradients
accumulated_grads = accumulate_gradients(all_gradients)

print(f"為 {len(accumulated_grads)} 個分區累積了梯度：")
for k, partition_grads in enumerate(accumulated_grads):
    print(f"  分區 {k}：{len(partition_grads)} 層")
    for i, (dW, db) in enumerate(partition_grads[:2]):  # 顯示前 2 層
        print(f"    層 {i}：dW 形狀 {dW.shape}，db 形狀 {db.shape}")
        print(f"             dW 範數：{np.linalg.norm(dW):.6f}，db 範數：{np.linalg.norm(db):.6f}")

# 應用梯度
learning_rate = 0.01
old_W = partitions[0].layers[0].W.copy()

apply_gradients(partitions, accumulated_grads, learning_rate)

new_W = partitions[0].layers[0].W
weight_change = np.linalg.norm(new_W - old_W)

print(f"\n以學習率 {learning_rate} 應用了梯度")
print(f"權重變化（第一層）：{weight_change:.6f}")

print("\n✓ 梯度累積和應用完成！")

# 第五節：重新實體化（梯度檢查點）

**問題**：儲存所有 M 個微批次在 K 個分區的激活值需要 O(M × K × layer_memory) 的記憶體。

**解決方案**：**重新實體化**（梯度檢查點）
- 只在**分區邊界**設置激活值檢查點
- 在反向傳遞時**重新計算**中間激活值
- 權衡：約 33% 額外計算換取約 K 倍更少記憶體

## 記憶體比較

**沒有重新實體化**：
- 儲存所有分區所有層的激活值
- 記憶體：O(M × L)，其中 L = 總層數

**有重新實體化**：
- 只儲存分區邊界的激活值
- 記憶體：O(M × K)，其中 K = 分區數（K << L）
- 根據需要重新計算中間激活值

In [None]:
class GPipePipelineWithRemat:
    """帶重新實體化（梯度檢查點）的 GPipe。"""
    
    def __init__(self, partitions: List[Partition]):
        self.partitions = partitions
        self.K = len(partitions)
        self.events = []
    
    def forward_pipeline_remat(self, microbatches: List[Tuple[np.ndarray, np.ndarray]]) -> Tuple[List, List]:
        """帶重新實體化的前向傳遞：只儲存分區邊界激活值。
        
        返回：
            outputs: 每個微批次的最終輸出
            boundary_inputs: 每個分區的輸入（用於重新計算）
        """
        M = len(microbatches)
        
        outputs = [None] * M
        # 只儲存每個分區的輸入（邊界激活值）
        boundary_inputs = [[None] * self.K for _ in range(M)]
        
        time_step = 0
        
        for m in range(M):
            X_micro, y_micro = microbatches[m]
            current = X_micro
            
            for k, partition in enumerate(self.partitions):
                # 儲存此分區的輸入（邊界）
                boundary_inputs[m][k] = current.copy()
                
                self.events.append(PipelineEvent(time_step, k, 'forward', m))
                
                # 前向傳遞，不儲存中間激活值
                current, _ = partition.forward(current, store_activations=False)
                
                time_step += 1
            
            outputs[m] = current
        
        return outputs, boundary_inputs
    
    def backward_pipeline_remat(self, outputs: List[np.ndarray],
                                labels: List[np.ndarray],
                                boundary_inputs: List[List]) -> List[List[List[Tuple]]]:
        """帶重新實體化的反向傳遞：根據需要重新計算激活值。"""
        M = len(outputs)
        all_gradients = [[None] * self.K for _ in range(M)]
        
        time_step = max(e.time_step for e in self.events) + 1
        
        for m in range(M - 1, -1, -1):
            dout = 2 * (outputs[m] - labels[m]) / labels[m].shape[0]
            
            for k in range(self.K - 1, -1, -1):
                partition = self.partitions[k]
                
                self.events.append(PipelineEvent(time_step, k, 'backward', m))
                
                # 重新計算此分區的激活值
                partition_input = boundary_inputs[m][k]
                _, activations = partition.forward(partition_input, store_activations=True)
                
                # 現在使用重新計算的激活值計算梯度
                dout, gradients = partition.backward(dout, activations)
                all_gradients[m][k] = gradients
                
                time_step += 1
        
        return all_gradients


def estimate_memory_usage(M: int, K: int, layers_per_partition: int, 
                         activation_size_mb: float, with_remat: bool) -> float:
    """估計有無重新實體化的記憶體使用量。
    
    參數：
        M: 微批次數量
        K: 分區數量
        layers_per_partition: 每分區平均層數
        activation_size_mb: 一層激活值的記憶體（MB）
        with_remat: 是否使用重新實體化？
    
    返回：
        估計的記憶體（MB）
    """
    if with_remat:
        # 只儲存邊界輸入（每個微批次 K 個）
        return M * K * activation_size_mb
    else:
        # 儲存所有中間激活值
        total_layers = K * layers_per_partition
        return M * total_layers * activation_size_mb


# 測試重新實體化
print("測試重新實體化...\n")

# 建立帶重新實體化的新管線
pipeline_remat = GPipePipelineWithRemat(partitions)

# 帶重新實體化的前向
outputs_remat, boundary_inputs = pipeline_remat.forward_pipeline_remat(microbatches)

print("帶重新實體化的前向傳遞：")
print(f"  儲存的邊界輸入：{len(boundary_inputs)} 個微批次 × {len(boundary_inputs[0])} 個分區")
print(f"  邊界輸入形狀：{[bi[0].shape for bi in boundary_inputs]}")

# 帶重新實體化的反向
gradients_remat = pipeline_remat.backward_pipeline_remat(outputs_remat, labels, boundary_inputs)

print(f"\n帶重新實體化的反向傳遞：")
print(f"  計算的梯度：{len(gradients_remat)} 個微批次 × {len(gradients_remat[0])} 個分區")

# 記憶體分析
print("\n" + "="*70)
print("記憶體使用量比較")
print("="*70)

M_test = 8
K_test = 4
layers_per_partition = 3
activation_size_mb = 10  # 每層激活值 MB

mem_without = estimate_memory_usage(M_test, K_test, layers_per_partition, activation_size_mb, with_remat=False)
mem_with = estimate_memory_usage(M_test, K_test, layers_per_partition, activation_size_mb, with_remat=True)

print(f"\n配置：M={M_test}，K={K_test}，每分區 {layers_per_partition} 層")
print(f"  無重新實體化：{mem_without:.1f} MB")
print(f"  有重新實體化：{mem_with:.1f} MB")
print(f"  記憶體節省：    {mem_without / mem_with:.1f}×")

print("\n✓ 重新實體化完成！")

# 第六節：管線排程視覺化和氣泡分析

讓我們視覺化 F-then-B 排程並量化氣泡時間。

In [None]:
def visualize_pipeline_schedule(pipeline: GPipePipeline, title: str = "GPipe 排程（F-then-B）"):
    """視覺化管線執行時間線。"""
    timeline = pipeline.get_timeline_matrix()
    K, T = timeline.shape
    
    fig, ax = plt.subplots(figsize=(14, 6))
    
    # 建立顏色映射
    # 正值 = 前向（暖色），負值 = 反向（冷色），0 = 氣泡（白色）
    M = int(np.max(np.abs(timeline)))
    colors_forward = plt.cm.Reds(np.linspace(0.3, 0.9, M))
    colors_backward = plt.cm.Blues(np.linspace(0.3, 0.9, M))
    
    # 繪製時間線
    for k in range(K):
        for t in range(T):
            val = timeline[k, t]
            if val > 0:  # 前向
                color = colors_forward[int(val) - 1]
                label = f'F{int(val)-1}'
            elif val < 0:  # 反向
                color = colors_backward[int(-val) - 1]
                label = f'B{int(-val)-1}'
            else:  # 氣泡
                color = 'white'
                label = ''
            
            rect = plt.Rectangle((t, k), 1, 1, facecolor=color, edgecolor='black', linewidth=1)
            ax.add_patch(rect)
            
            if label:
                ax.text(t + 0.5, k + 0.5, label, ha='center', va='center', 
                       fontsize=9, fontweight='bold')
    
    ax.set_xlim(0, T)
    ax.set_ylim(0, K)
    ax.set_xlabel('時間步', fontsize=12)
    ax.set_ylabel('裝置', fontsize=12)
    ax.set_yticks(np.arange(K) + 0.5)
    ax.set_yticklabels([f'裝置 {k}' for k in range(K)])
    ax.set_xticks(np.arange(T) + 0.5)
    ax.set_xticklabels(np.arange(T))
    ax.set_title(title, fontsize=14, fontweight='bold')
    ax.invert_yaxis()
    
    # 添加圖例
    from matplotlib.patches import Patch
    legend_elements = [
        Patch(facecolor='salmon', label='前向傳遞'),
        Patch(facecolor='lightblue', label='反向傳遞'),
        Patch(facecolor='white', edgecolor='black', label='氣泡（閒置）')
    ]
    ax.legend(handles=legend_elements, loc='upper right')
    
    plt.tight_layout()
    plt.show()


def compute_actual_bubble_time(timeline: np.ndarray) -> float:
    """從時間線計算實際氣泡比例。"""
    total_steps = timeline.size
    bubble_steps = np.sum(timeline == 0)
    return bubble_steps / total_steps


# 視覺化我們之前建立的管線
print("視覺化 GPipe 管線排程...\n")

visualize_pipeline_schedule(pipeline_remat, f"GPipe：K={K} 個裝置，M={M} 個微批次")

# 分析氣泡時間
timeline = pipeline_remat.get_timeline_matrix()
actual_bubble = compute_actual_bubble_time(timeline)
theoretical_bubble = compute_bubble_fraction(K, M)

print(f"\n氣泡時間分析（K={K}，M={M}）：")
print(f"  理論氣泡比例：{theoretical_bubble:.3f}（{theoretical_bubble*100:.1f}%）")
print(f"  實際氣泡比例：{actual_bubble:.3f}（{actual_bubble*100:.1f}%）")
print(f"  管線效率：    {(1-actual_bubble)*100:.1f}%")

print("\n✓ 排程視覺化完成！")

# 第七節：比較 - 管線平行化 vs 資料平行化

讓我們比較 GPipe（管線平行化）與傳統資料平行化。

## 資料平行化
- 在每個裝置上複製整個模型
- 將批次分割到各裝置
- 同步梯度（all-reduce）
- **限制**：模型必須能放入單一裝置

## 管線平行化（GPipe）
- 將模型分割到各裝置
- 所有裝置處理同一批次（不同微批次）
- 不需要梯度同步
- **優勢**：可以訓練比單一裝置記憶體更大的模型

In [None]:
def simulate_data_parallelism(model_layers: List[Layer], 
                             batch_size: int, 
                             num_devices: int) -> Dict[str, float]:
    """模擬資料平行化時間。
    
    返回：
        包含時間分解的字典
    """
    # 每個裝置處理 batch_size/num_devices 個樣本
    local_batch_size = batch_size // num_devices
    
    # 時間（任意單位）
    forward_time = len(model_layers) * 1.0  # 每層一個單位
    backward_time = len(model_layers) * 1.0
    allreduce_time = 2.0  # 通訊開銷
    
    total_time = forward_time + backward_time + allreduce_time
    
    return {
        'forward': forward_time,
        'backward': backward_time,
        'communication': allreduce_time,
        'total': total_time,
        'efficiency': (forward_time + backward_time) / total_time
    }


def simulate_pipeline_parallelism(model_layers: List[Layer],
                                 batch_size: int,
                                 num_devices: int,
                                 num_microbatches: int) -> Dict[str, float]:
    """模擬管線平行化時間。"""
    layers_per_device = len(model_layers) // num_devices
    
    # 一個微批次通過一個分區的時間
    forward_time_per_micro = layers_per_device * 1.0
    backward_time_per_micro = layers_per_device * 1.0
    
    # 總管線時間
    # 填充管線：(K-1) + M 個微批次
    # 每步：通過一個分區前向或反向
    total_forward_steps = (num_devices - 1) + num_microbatches
    total_backward_steps = (num_devices - 1) + num_microbatches
    
    total_time = (total_forward_steps + total_backward_steps) * layers_per_device
    
    # 計算時間（排除氣泡）
    compute_time = 2 * num_microbatches * layers_per_device * num_devices
    
    return {
        'forward': total_forward_steps * layers_per_device,
        'backward': total_backward_steps * layers_per_device,
        'communication': 0,  # 沒有裝置間通訊！
        'total': total_time,
        'efficiency': compute_time / (total_time * num_devices),
        'bubble_fraction': compute_bubble_fraction(num_devices, num_microbatches)
    }


# 比較兩種方法
print("比較管線平行化 vs 資料平行化\n")
print("="*70)

total_layers = 12
batch_size = 32
num_devices = 4
num_microbatches = 8

# 模擬資料平行化
data_parallel_stats = simulate_data_parallelism(model_layers, batch_size, num_devices)

print("資料平行化：")
print(f"  配置：{num_devices} 個裝置，批次大小 {batch_size}")
print(f"  前向時間：       {data_parallel_stats['forward']:.1f} 單位")
print(f"  反向時間：       {data_parallel_stats['backward']:.1f} 單位")
print(f"  通訊時間：       {data_parallel_stats['communication']:.1f} 單位（all-reduce）")
print(f"  總時間：         {data_parallel_stats['total']:.1f} 單位")
print(f"  效率：           {data_parallel_stats['efficiency']*100:.1f}%")
print(f"  ⚠️  限制：模型必須能放入單一裝置！")

print("\n" + "="*70)

# 模擬管線平行化
pipeline_stats = simulate_pipeline_parallelism(model_layers, batch_size, num_devices, num_microbatches)

print("管線平行化（GPipe）：")
print(f"  配置：{num_devices} 個裝置，{num_microbatches} 個微批次")
print(f"  前向時間：       {pipeline_stats['forward']:.1f} 單位")
print(f"  反向時間：       {pipeline_stats['backward']:.1f} 單位")
print(f"  通訊時間：       {pipeline_stats['communication']:.1f} 單位（無！）")
print(f"  總時間：         {pipeline_stats['total']:.1f} 單位")
print(f"  效率：           {pipeline_stats['efficiency']*100:.1f}%")
print(f"  氣泡比例：       {pipeline_stats['bubble_fraction']*100:.1f}%")
print(f"  ✓ 優勢：可以訓練 {num_devices}× 更大的模型！")

print("\n" + "="*70)
print("\n關鍵差異：")
print("  • 資料平行：快速，但模型必須能放入單一裝置")
print("  • 管線平行：可以訓練巨型模型")
print("  • GPipe：沒有通訊開銷（不像資料平行）")
print("  • 權衡：管線有氣泡時間，資料平行有通訊開銷")

print("\n✓ 比較完成！")

# 第八節：完整的 GPipe 訓練迴圈

讓我們把所有內容整合起來：使用 GPipe 的完整訓練迴圈。

In [None]:
def compute_loss(outputs: List[np.ndarray], labels: List[np.ndarray]) -> float:
    """計算跨微批次的平均損失（簡化的 MSE）。"""
    total_loss = 0.0
    for output, label in zip(outputs, labels):
        total_loss += np.mean((output - label) ** 2)
    return total_loss / len(outputs)


def train_gpipe_epoch(pipeline: GPipePipelineWithRemat,
                     X_train: np.ndarray,
                     y_train: np.ndarray,
                     batch_size: int,
                     num_microbatches: int,
                     learning_rate: float) -> List[float]:
    """使用 GPipe 訓練一個 epoch。
    
    返回：
        每個小批次的損失列表
    """
    num_samples = X_train.shape[0]
    num_batches = num_samples // batch_size
    
    losses = []
    
    for batch_idx in range(num_batches):
        # 獲取小批次
        start = batch_idx * batch_size
        end = start + batch_size
        X_batch = X_train[start:end]
        y_batch = y_train[start:end]
        
        # 分割為微批次
        microbatches = split_into_microbatches(X_batch, y_batch, num_microbatches)
        
        # 前向傳遞
        outputs, boundary_inputs = pipeline.forward_pipeline_remat(microbatches)
        
        # 計算損失
        labels = [mb[1] for mb in microbatches]
        loss = compute_loss(outputs, labels)
        losses.append(loss)
        
        # 反向傳遞
        all_gradients = pipeline.backward_pipeline_remat(outputs, labels, boundary_inputs)
        
        # 累積梯度
        accumulated_grads = accumulate_gradients(all_gradients)
        
        # 更新參數
        apply_gradients(pipeline.partitions, accumulated_grads, learning_rate)
    
    return losses


# 生成合成資料集
print("建立合成資料集...\n")

num_train = 256
input_dim = 128
output_dim = 10

X_train = np.random.randn(num_train, input_dim)
y_train_labels = np.random.randint(0, output_dim, num_train)
y_train = np.eye(output_dim)[y_train_labels]

print(f"資料集：{num_train} 個樣本，輸入維度 {input_dim}，輸出維度 {output_dim}")

# 建立新模型和管線
print("\n初始化 GPipe 模型...")

layer_dims = [input_dim] + [256] * 10 + [output_dim]
activations = ['relu'] * 10 + ['linear']
model_layers = create_model(layer_dims, activations)

K = 4
partitions = partition_model(model_layers, K)
pipeline = GPipePipelineWithRemat(partitions)

print(f"  模型：{len(model_layers)} 層")
print(f"  分區：{K} 個裝置")

# 訓練配置
batch_size = 32
num_microbatches = 8
learning_rate = 0.001
num_epochs = 3

print(f"\n訓練配置：")
print(f"  批次大小：{batch_size}")
print(f"  微批次數：{num_microbatches}")
print(f"  學習率：{learning_rate}")
print(f"  Epochs：{num_epochs}")

# 訓練
print("\n" + "="*70)
print("訓練 GPipe 模型...")
print("="*70 + "\n")

all_losses = []

for epoch in range(num_epochs):
    pipeline.events = []  # 重置此 epoch 的事件
    
    losses = train_gpipe_epoch(pipeline, X_train, y_train, 
                               batch_size, num_microbatches, learning_rate)
    
    avg_loss = np.mean(losses)
    all_losses.extend(losses)
    
    print(f"Epoch {epoch+1}/{num_epochs}：平均損失 = {avg_loss:.6f}")

print("\n✓ 訓練完成！")

# 第九節：視覺化和分析

讓我們建立 GPipe 效能的綜合視覺化。

In [None]:
# 視覺化 1：訓練損失曲線
fig, axes = plt.subplots(2, 2, figsize=(14, 10))

# 圖 1：訓練損失
ax = axes[0, 0]
ax.plot(all_losses, linewidth=2, color='darkblue')
ax.set_xlabel('小批次', fontsize=11)
ax.set_ylabel('損失', fontsize=11)
ax.set_title('GPipe 訓練損失', fontsize=12, fontweight='bold')
ax.grid(True, alpha=0.3)

# 圖 2：氣泡比例 vs M（微批次數）
ax = axes[0, 1]
M_range = np.arange(1, 65)
K_values_plot = [2, 4, 8, 16]
colors = ['blue', 'green', 'orange', 'red']

for K_val, color in zip(K_values_plot, colors):
    bubbles = [compute_bubble_fraction(K_val, M) for M in M_range]
    ax.plot(M_range, bubbles, label=f'K={K_val}', linewidth=2, color=color)

ax.set_xlabel('微批次數 (M)', fontsize=11)
ax.set_ylabel('氣泡比例', fontsize=11)
ax.set_title('氣泡時間 vs 微批次數', fontsize=12, fontweight='bold')
ax.legend()
ax.grid(True, alpha=0.3)
ax.set_ylim([0, 1])

# 圖 3：重新實體化的記憶體節省
ax = axes[1, 0]
K_range = np.arange(2, 17)
layers_per_partition = 3
M_fixed = 8
activation_size_mb = 10

mem_without_remat = [estimate_memory_usage(M_fixed, K_val, layers_per_partition, 
                                            activation_size_mb, False) 
                     for K_val in K_range]
mem_with_remat = [estimate_memory_usage(M_fixed, K_val, layers_per_partition, 
                                        activation_size_mb, True) 
                  for K_val in K_range]

ax.plot(K_range, mem_without_remat, label='無重新實體化', linewidth=2, 
        marker='o', color='red', markersize=6)
ax.plot(K_range, mem_with_remat, label='有重新實體化', linewidth=2, 
        marker='s', color='green', markersize=6)
ax.set_xlabel('分區數 (K)', fontsize=11)
ax.set_ylabel('記憶體 (MB)', fontsize=11)
ax.set_title('記憶體使用量：重新實體化的影響', fontsize=12, fontweight='bold')
ax.legend()
ax.grid(True, alpha=0.3)

# 圖 4：管線效率 vs 配置
ax = axes[1, 1]
M_configs = [4, 8, 16, 32]
K_configs = np.arange(2, 17)

for M_val in M_configs:
    efficiencies = [1 - compute_bubble_fraction(K_val, M_val) for K_val in K_configs]
    ax.plot(K_configs, efficiencies, label=f'M={M_val}', linewidth=2, marker='o', markersize=5)

ax.set_xlabel('裝置數 (K)', fontsize=11)
ax.set_ylabel('管線效率', fontsize=11)
ax.set_title('管線效率 vs 配置', fontsize=12, fontweight='bold')
ax.legend()
ax.grid(True, alpha=0.3)
ax.set_ylim([0, 1])

plt.tight_layout()
plt.show()

print("\n✓ 視覺化完成！")

# 第十節：關鍵洞見和現代擴展

## GPipe 總結

### 核心思想
1. **管線平行化**：按層將模型分割到各裝置
2. **微批次處理**：分割小批次以減少氣泡時間
3. **重新實體化**：用計算換取記憶體效率
4. **F-then-B 排程**：先前向所有微批次，再反向所有

### 數學洞見

**氣泡比例**：
$$\text{Bubble} = \frac{K-1}{K-1+M}$$

**記憶體節省**（有重新實體化）：
$$\text{Memory}_{\text{remat}} = \frac{K}{L} \times \text{Memory}_{\text{standard}}$$

其中 L = 總層數，K = 分區數。

**加速比**（相比單一裝置）：
$$\text{Speedup} \approx \frac{K}{1 + \frac{K-1}{M}}$$

### 何時使用 GPipe

**使用 GPipe 當**：
- 模型無法放入單一裝置
- 序列化模型結構（各層）
- 裝置間頻寬有限
- 可以使用大的 M（許多微批次）

**避免 GPipe 當**：
- 模型能放入單一裝置（改用資料平行）
- M 很小（氣泡時間佔主導）
- 非序列化架構（例如，大量跳躍連接）

---

## 現代擴展

### 1. PipeDream（Harlap et al., 2018）
- **1F1B 排程**：交錯前向和反向
- 減少管線深度
- 更好的記憶體效率

### 2. Megatron-LM（Shoeybi et al., 2019）
- 結合管線 + 張量平行化
- 水平分割層（層內）
- 用於 5300 億參數的模型

### 3. ZeRO（Rajbhandari et al., 2020）
- 分區優化器狀態、梯度、參數
- 補充管線平行化
- 減少記憶體而無需複製

### 4. Varuna（Athlur et al., 2022）
- 自動管線排程優化
- 自適應微批次處理
- 處理異質裝置

---

## 實際考量

### 最佳 M（微批次數）
- **太小**：高氣泡比例
- **太大**：微批次管理開銷
- **經驗法則**：M ≈ 4×K

### 分區策略
- 均勻：每裝置相同層數
- 平衡：每裝置相同計算時間
- 記憶體感知：平衡記憶體使用

### 批次大小
- 大批次提高管線利用率
- 但可能影響泛化能力
- 用學習率縮放來補償

---

## 與其他論文的關聯

**論文 5（最優腦損傷）**：剪枝減少模型大小 → 需要更少管線階段

**論文 23（MDL）**：模型複雜度 vs 資料擬合 → 選擇 K（分區數）涉及權衡

**論文 14（神經架構搜索）**：可以使用 GPipe 搜索對單一裝置來說太大的架構

---

## 實際影響

GPipe 實現了：
- **AmoebaNet-B**：5.57 億參數（比之前最好的大 8 倍）
- **在 ImageNet 上訓練**，達到 84.4% top-1 準確率
- **GPT-3**：1750 億參數（結合多種技術包括管線平行化）
- **大型語言模型**：現代 LLM 使用管線 + 張量 + 資料平行化

---

**GPipe 的遺產**：展示了**模型平行化是可行的**，為訓練數千億參數的模型鋪平了道路。結合張量平行化和 ZeRO，它構成了現代大規模訓練的基礎！

In [None]:
# 最終示範：展示 K 和 M 之間的權衡
print("="*70)
print("GPipe 配置指南")
print("="*70)

print("\n1. 選擇 K（裝置數）：")
print("   • 受限於：可用加速器數量")
print("   • 更多 K = 可以訓練更大模型")
print("   • 更多 K = 更多氣泡時間（需要更大 M 來補償）")

print("\n2. 選擇 M（微批次數）：")
print("   • 經驗法則：M ≈ 4×K")
print("   • 更大 M = 更少氣泡時間")
print("   • 更大 M = 更多開銷")
print("   • 必須能整除批次大小")

print("\n3. 配置範例：")
configs = [
    (2, 8, 32),
    (4, 16, 64),
    (8, 32, 128),
    (16, 64, 256),
]

for K, M, batch in configs:
    bubble = compute_bubble_fraction(K, M)
    efficiency = 1 - bubble
    print(f"   K={K:2d}, M={M:2d}, batch={batch:3d} → "
          f"效率={efficiency*100:.1f}%, 氣泡={bubble*100:.1f}%")

print("\n" + "="*70)
print("✓ GPipe 實作完成！")
print("="*70)