<a href="https://colab.research.google.com/github/liuguliu/study_notebook/blob/main/Megatron_LM%E7%AC%94%E8%AE%B0.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Launch script for Megatron-LM GPT pretraining.
# DATA_PATH, VOCAB_FILE, MERGE_FILE, TENSORBOARD_PATH, CHECKPOINT_PATH and
# DISTRIBUTED_ARGS are not defined in this snippet; they are expected to be
# set by the caller before it runs.

# Model size, parallel layout, and optimizer settings.
GPT_ARGS=" \
  --tensor-model-parallel-size 4 \
  --pipeline-model-parallel-size 2 \
  --sequence-parallel \
  --num-layers 32 \
  --hidden-size 2560 \
  --num-attention-heads 32 \
  --seq-length 1024 \
  --max-position-embeddings 1024 \
  --micro-batch-size 4 \
  --global-batch-size 1024 \
  --lr 0.8e-4 \
  --train-iters 500000 \
  --lr-decay-style cosine \
  --min-lr 0.8e-5 \
  --weight-decay 1e-1 \
  --lr-warmup-fraction .01 \
  --clip-grad 1.0 \
  --fp16 \
  --use-rotary-position-embeddings \
  --no-gradient-accumulation-fusion \
  --recompute-activations \
"
echo "$GPT_ARGS"

# Dataset and tokenizer files.
# --split takes comma-separated train/validation/test weights; without the
# commas the value is read as a single weight and no validation data is left
# for the periodic evaluation configured below.
DATA_ARGS=" \
  --data-path $DATA_PATH \
  --vocab-file $VOCAB_FILE \
  --merge-file $MERGE_FILE \
  --data-impl mmap \
  --split 949,50,1 \
"
echo "$DATA_ARGS"

# Logging, checkpointing, and evaluation cadence.
OUTPUT_ARGS=" \
  --log-interval 1 \
  --tensorboard-dir $TENSORBOARD_PATH \
  --save-interval 10000 \
  --eval-interval 1000 \
  --eval-iters 10 \
"

# The *_ARGS variables are intentionally unquoted so the shell splits them
# back into individual command-line arguments.
torchrun $DISTRIBUTED_ARGS pretrain_gpt.py \
  $GPT_ARGS \
  $DATA_ARGS \
  $OUTPUT_ARGS \
  --distributed-backend nccl \
  --save $CHECKPOINT_PATH \
  --load $CHECKPOINT_PATH



   --tensor-model-parallel-size 4 


In [None]:
from megatron.core import mpu, tensor_parallel

# Initialize model parallelism through Megatron's mpu module.
# The four settings are passed positionally, so their order must be kept.
parallel_layout = (
    args.tensor_model_parallel_size,
    args.pipeline_model_parallel_size,
    args.virtual_pipeline_model_parallel_size,
    args.pipeline_model_parallel_split_rank,
)
mpu.initialize_model_parallel(*parallel_layout)

In [None]:
'''colossalai 2D张量并行'''

import colossalai
import colossalai.nn as col_nn
import torch
from colossalai.utils import print_rank_0
from colossalai.context import ParallelMode
from colossalai.core import global_context as gpc
from colossalai.utils import get_current_device

# Parallel settings: data and pipeline sizes of 1, tensor parallelism of
# size 4 in '2d' mode.
CONFIG = {
    'parallel': {
        'data': 1,
        'pipeline': 1,
        'tensor': {'size': 4, 'mode': '2d'},
    },
}

# Parse the launch arguments (rank, world_size, local_rank, host, port) that
# are read below. The original built the parser but never produced `args`.
parser = colossalai.get_default_parser()
args = parser.parse_args()

# Start the Colossal-AI runtime using the parallel layout in CONFIG.
colossalai.launch(config=CONFIG,
                  rank=args.rank,
                  world_size=args.world_size,
                  local_rank=args.local_rank,
                  host=args.host,
                  port=args.port)

class MLP(torch.nn.Module):
  """Two-layer feed-forward block built from Colossal-AI ``col_nn`` layers.

  The hidden width is four times ``dim``. Weight shapes are logged with
  ``print_rank_0`` at construction time and activation shapes on every
  forward pass.

  Args:
    dim: Input and output feature size of the block.
  """

  def __init__(self, dim: int = 256):
    super().__init__()
    intermediate_dim = dim * 4
    self.dense_1 = col_nn.Linear(dim, intermediate_dim)
    print_rank_0(f'Weight of the first linear layer: {self.dense_1.weight.shape}')
    self.activation = torch.nn.GELU()
    self.dense_2 = col_nn.Linear(intermediate_dim, dim)
    print_rank_0(f'Weight of the second linear layer: {self.dense_2.weight.shape}')
    self.dropout = col_nn.Dropout(0.1)

  # forward must be a method of the class; nested inside __init__ it was only
  # a local function and the module could not be called.
  def forward(self, x):
    """Apply linear -> GELU -> linear -> dropout and return the result."""
    x = self.dense_1(x)
    print_rank_0(f'Output of the first linear layer: {x.shape}')
    x = self.activation(x)
    x = self.dense_2(x)
    print_rank_0(f'Output of the second linear layer: {x.shape}')
    x = self.dropout(x)
    return x


# Build the model.
m = MLP()

# Feed the model some random data.
x = torch.randn((16, 256), device=get_current_device())

# Partition the input: broadcast from rank 0, then keep this rank's slice,
# splitting dim 0 by the 2D column rank and the last dim by the 2D row rank.
torch.distributed.broadcast(x, src=0)
col_rank = gpc.get_local_rank(ParallelMode.PARALLEL_2D_COL)
row_rank = gpc.get_local_rank(ParallelMode.PARALLEL_2D_ROW)
x = x.chunk(2, dim=0)[col_rank]
x = x.chunk(2, dim=-1)[row_rank]
print_rank_0(f'Input: {x.shape}')

x = m(x)


In [None]:
'''colossalai 2.5D张量并行'''

# Parallel settings: data and pipeline sizes of 1, tensor parallelism of
# size 8 in '2.5d' mode with depth 2.
# NOTE(review): presumably this CONFIG is meant to be passed to
# colossalai.launch, as the 2D configuration is — confirm.
CONFIG = {
    'parallel': {
        'data': 1,
        'pipeline': 1,
        'tensor': {'size': 8, 'mode': '2.5d', 'depth': 2},
    },
}

# Build the model.
m = MLP()

# Feed the model some random data.
x = torch.randn((16, 256), device=get_current_device())

# Partition the input: broadcast from rank 0, then keep this rank's slice.
# Dim 0 is split twice (by depth rank, then by column rank) and the last dim
# once (by row rank).
torch.distributed.broadcast(x, src=0)
x = torch.chunk(x, 2, dim=0)[gpc.get_local_rank(ParallelMode.PARALLEL_2P5D_DEP)]
x = torch.chunk(x, 2, dim=0)[gpc.get_local_rank(ParallelMode.PARALLEL_2P5D_COL)]
x = torch.chunk(x, 2, dim=-1)[gpc.get_local_rank(ParallelMode.PARALLEL_2P5D_ROW)]
print_rank_0(f'Input:{x.shape}')

x = m(x)

In [None]:
'''colossalai 3D张量并行'''

# Parallel settings: data and pipeline sizes of 1, tensor parallelism of
# size 8 in '3d' mode. The key is 'mode', as in the 2D and 2.5D
# configurations; the original spelled it 'model'.
# NOTE(review): presumably this CONFIG is meant to be passed to
# colossalai.launch, as the 2D configuration is — confirm.
CONFIG = dict(parallel=dict(
    data=1,
    pipeline=1,
    tensor=dict(size=8, mode='3d'),
))

# Build the model.
m = MLP()

# Feed the model some random data.
x = torch.randn((16, 256), device=get_current_device())

# Partition the input: broadcast from rank 0, then keep this rank's slice.
# Dim 0 is split twice (by weight rank, then by input rank) and the last dim
# once (by output rank).
torch.distributed.broadcast(x, src=0)
weight_rank = gpc.get_local_rank(ParallelMode.PARALLEL_3D_WEIGHT)
x = x.chunk(2, dim=0)[weight_rank]
input_rank = gpc.get_local_rank(ParallelMode.PARALLEL_3D_INPUT)
x = x.chunk(2, dim=0)[input_rank]
output_rank = gpc.get_local_rank(ParallelMode.PARALLEL_3D_OUTPUT)
x = x.chunk(2, dim=-1)[output_rank]
print_rank_0(f'Input: {x.shape}')

x = m(x)

In [None]:
'''pytorch张量并行'''

from torch.distributed._tensor import DeviceMesh
from torch.distributed.tensor.parallel import PairwiseParallel, parallelize_module

# Create the device mesh that drives the sharding plan, covering ranks
# 0 .. world_size - 1 on CUDA.
device_mesh = DeviceMesh("cuda", torch.arange(0, args.world_size))

# Create the model and move it to this rank's GPU.
model = ToyModel().cuda(rank)

# Create the optimizer for the module that is parallelized below.
LR = 0.25
optimizer = torch.optim.SGD(model.parameters(), lr=LR)

# Parallelize the module with the given parallel style (PairwiseParallel here):
# colwise and rowwise styles are chained as fixed pairs, as done in
# Megatron-LM (https://arxiv.org/abs/1909.08053).
# The imported name is `parallelize_module`; the original call misspelled it.
model = parallelize_module(model, device_mesh, PairwiseParallel())

# Run several forward/backward passes and optimizer updates on the sharded module.
for i in range(args.iter_nums):
  # For TP, every TP rank needs the same input; seeding with the iteration
  # index mimics the behavior of a data loader.
  if rank == 0:
    print(f"-----------{i}-----------------")
  torch.manual_seed(i)
  inp = torch.rand(20, 10).cuda(rank)
  if rank == 0:
    print(f"rank: {rank}, input shape: {inp.shape}")
  # Clear gradients from the previous iteration so each step uses only the
  # current batch's gradients.
  optimizer.zero_grad()
  output = model(inp)
  if rank == 0:
    # This line reports the output tensor; the original labelled it "input shape".
    print(f"rank: {rank}, output shape: {output.shape}")
  output.sum().backward()
  optimizer.step()



# DDP (Distributed Data Parallel)


In [None]:
import os

import torch
import torch.distributed as dist
import torch.multiprocessing as mp
import torch.nn as nn
import torch.optim as optim
from torch.nn.parallel import DistributedDataParallel as DDP

def example(rank, world_size):
  """Run a single DDP training step in the process for ``rank``.

  Args:
    rank: Index of this process; also used as the device the model and
      tensors are moved to.
    world_size: Total number of processes in the group.
  """
  # Join the default process group.
  dist.init_process_group("gloo", rank=rank, world_size=world_size)

  # Build the local model and wrap it in DDP in one go.
  ddp_model = DDP(nn.Linear(10, 10).to(rank), device_ids=[rank])

  # Loss function and optimizer.
  criterion = nn.MSELoss()
  optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)

  # Forward pass on random inputs, then draw random targets.
  inputs = torch.randn(20, 10).to(rank)
  outputs = ddp_model(inputs)
  targets = torch.randn(20, 10).to(rank)

  # Backward pass followed by the parameter update.
  loss = criterion(outputs, targets)
  loss.backward()
  optimizer.step()

def main():
  """Spawn two ``example`` worker processes and wait for them to finish."""
  num_procs = 2
  # mp.spawn passes the process index as the first argument to `example`.
  mp.spawn(example, args=(num_procs,), nprocs=num_procs, join=True)


if __name__ == "__main__":
  # Environment variables that must be set when using c10d's default "env"
  # initialization mode.
  os.environ.update({"MASTER_ADDR": "localhost", "MASTER_PORT": "29500"})
  main()

# FSDP (Fully Sharded Data Parallel)

In [None]:
from torch.distributed.fsdp import(
    FullyShardedDataParallel,
    CPUOffload,
)
from torch.distributed.fsdp.wrap import(
    default_auto_wrap_policy,
)
import torch.nn as nn

class model(nn.Module):
  """Toy network of three linear layers (8 -> 4 -> 16 -> 4)."""

  def __init__(self):
    super().__init__()
    self.layer1 = nn.Linear(8, 4)
    self.layer2 = nn.Linear(4, 16)
    self.layer3 = nn.Linear(16, 4)

  # Without a forward method the module (and any wrapper around it) cannot
  # be called on an input.
  def forward(self, x):
    """Pass ``x`` through layer1, layer2 and layer3 in order."""
    return self.layer3(self.layer2(self.layer1(x)))

# Local import: this cell does not otherwise bring DistributedDataParallel
# into scope under this name.
from torch.nn.parallel import DistributedDataParallel

# DDP-wrapped instance. It is bound to its own name so that `model` keeps
# referring to the class; rebinding `model` here would make the `model()`
# call below invoke the DDP wrapper with no input instead of building a
# new module.
ddp_model = DistributedDataParallel(model())

# FSDP-wrapped instance using the auto-wrap policy and CPU parameter offload.
# NOTE(review): newer PyTorch releases name this keyword `auto_wrap_policy`
# — confirm against the installed version.
fsdp_model = FullyShardedDataParallel(
    model(),
    fsdp_auto_wrap_policy=default_auto_wrap_policy,
    cpu_offload=CPUOffload(offload_params=True),
)

In [None]:
# 手动包装（Manual Wrapping）
# https://zhuanlan.zhihu.com/p/650002268
from torch.distributed.fsdp import(
    FullyShardedDataParallel,
    CPUOffload,
)
from torch.distributed.fsdp.wrap import(
    enable_wrap,
    wrap,
)

import torch.nn as nn
from typing import Dict

class model(nn.Module):
  """Toy network of three linear layers (8 -> 4 -> 16 -> 4).

  layer1 and layer3 are passed through ``wrap`` at construction time;
  layer2 is left as a plain ``nn.Linear``.
  """

  def __init__(self):
    super().__init__()
    self.layer1 = wrap(nn.Linear(8, 4))
    self.layer2 = nn.Linear(4, 16)
    self.layer3 = wrap(nn.Linear(16, 4))

  # Without a forward method the module (and any wrapper around it) cannot
  # be called on an input.
  def forward(self, x):
    """Pass ``x`` through layer1, layer2 and layer3 in order."""
    return self.layer3(self.layer2(self.layer1(x)))

# Keyword arguments forwarded to the wrapper class by enable_wrap.
# Built with the builtin dict: typing.Dict is a type-annotation alias and
# raises TypeError when called.
wrapper_kwargs = dict(cpu_offload=CPUOffload(offload_params=True))

# Inside this context, wrap() uses FullyShardedDataParallel with the keyword
# arguments above, both for the layers wrapped in model.__init__ and for the
# outer module.
with enable_wrap(wrapper_cls=FullyShardedDataParallel, **wrapper_kwargs):
  fsdp_model = wrap(model())

# Training loop over the FSDP-wrapped model.
# `next_batch` and `criterion` are not defined in this snippet; they are
# expected to be provided by the caller.
optim = torch.optim.Adam(fsdp_model.parameters(), lr=0.0001)
for sample, label in next_batch():
  # Clear gradients from the previous batch before computing new ones.
  optim.zero_grad()
  # Feed the batch itself; the original passed the builtin `input` function.
  out = fsdp_model(sample)
  loss = criterion(out, label)
  loss.backward()
  optim.step()


