## Pipeline Parallelism
이번 세션에서는 파이프라인 병렬화에 대해 알아보겠습니다.

### 1. Inter-layer model parallelism
파이프라인 병렬화는 Inter-layer 모델 병렬화를 개선한 것입니다. Inter-layer 모델 병렬화는 아래와 같이 특정 GPU에 특정 레이어들을 할당하는 모델 병렬화 방법이였죠. 아래 그림에서는 GPU1번에 1,2,3번 레이어가 할당되었고, GPU2번에 4,5번 레이어가 할당 되었는데, 이 때 쪼개진 하나의 조각을 `stage(스테이지)`라고 합니다. 아래 예시의 경우 2개의 스테이지로 분할되었습니다.

![](../images/inter_layer.png)

그러나 이전 레이어의 출력을 다음 레이어의 입력으로 하는 신경망의 특성상 특정 GPU의 연산이 끝나야 다른 GPU가 연산을 시작할 수 있습니다. 즉, 아래의 그림처럼 Inter-layer 모델 병렬화는 동시에 하나의 GPU만 사용할 수 있다는 치명적인 한계를 가지고 있습니다.

![](../images/inter_layer_2.png)
![](../images/inter_layer_3.gif)


## 2. GPipe
GPipe는 Google에서 개발된 파이프라인 병렬화 기법으로 Inter Layer 모델 병렬화 시 GPU가 쉬는 시간 (idle time)을 줄이기 위해 등장했으며, mini-batch를 micro-batch로 한번 더 쪼개서 학습 과정을 파이프라이닝 하는 방식으로 동작합니다.

![](../images/gpipe_1.png)

<br>

### Micro-batch
- Mini-batch는 전체 데이터셋을 n개로 분할한 서브샘플 집합입니다.
- Micro-batch는 Mini-batch를 m개로 한번 더 분할한 서브샘플 집합입니다.

![](../images/gpipe_2.png)

<br>

### Pipelining
GPipe는 미니배치를 마이크로 배치로 쪼개고 연산을 파이프라이닝 합니다. 붉은색 (GPU가 쉬는 부분)을 Bubble time이라고 하는데, Micro batch 사이즈가 커질 수록 Bubble time이 줄어드는 것을 알 수 있습니다.

![](../images/gpipe_3.gif)


## 3. GPipe with PyTorch
kakaobrain에서 공개한 `torchgpipe`를 사용하면 손쉽게 GPipe를 사용할 수 있습니다. 단, `nn.Sequential`로 래핑된 모델만 사용 가능하며 모든 모듈의 입력과 출력 타입은 `torch.Tensor` 혹은 `Tuple[torch.Tensor]`로 제한됩니다. 따라서 코딩하기가 상당히 까다롭습니다.

In [None]:
"""
src/gpipe.py
"""

import torch
import torch.nn as nn
from datasets import load_dataset
from torch.optim import Adam
from torch.utils.data import DataLoader
from torchgpipe import GPipe
from transformers import GPT2Tokenizer, GPT2LMHeadModel
from transformers.models.gpt2.modeling_gpt2 import GPT2Block as GPT2BlockBase


class GPT2Preprocessing(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.embed_dim = config.hidden_size
        self.wte = nn.Embedding(config.vocab_size, self.embed_dim)
        self.wpe = nn.Embedding(config.max_position_embeddings, self.embed_dim)
        self.drop = nn.Dropout(config.embd_pdrop)

    def forward(self, input_ids):
        input_shape = input_ids.size()
        input_ids = input_ids.view(-1, input_shape[-1])
        position_ids = torch.arange(
            0, input_shape[-1], dtype=torch.long, device=input_ids.device
        )
        position_ids = position_ids.unsqueeze(0).view(-1, input_shape[-1])
        inputs_embeds = self.wte(input_ids)
        position_embeds = self.wpe(position_ids)
        hidden_states = inputs_embeds + position_embeds
        hidden_states = self.drop(hidden_states)
        return hidden_states


class GPT2Block(GPT2BlockBase):
    def forward(self, hidden_states):
        hidden_states = super(GPT2Block, self).forward(
            hidden_states=hidden_states,
        )
        return hidden_states[0]


class GPT2Postprocessing(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.ln_f = nn.LayerNorm(
            config.hidden_size,
            eps=config.layer_norm_epsilon,
        )
        self.lm_head = nn.Linear(
            config.hidden_size,
            config.vocab_size,
            bias=False,
        )

    def forward(self, hidden_states):
        hidden_states = self.ln_f(hidden_states)
        lm_logits = self.lm_head(hidden_states)
        return lm_logits


def create_model_from_pretrained(model_name):
    pretrained = GPT2LMHeadModel.from_pretrained(model_name)
    preprocess = GPT2Preprocessing(pretrained.config)
    preprocess.wte.weight = pretrained.transformer.wte.weight
    preprocess.wpe.weight = pretrained.transformer.wpe.weight

    blocks = pretrained.transformer.h
    for i, block in enumerate(blocks):
        block.__class__ = GPT2Block

    postprocess = GPT2Postprocessing(pretrained.config)
    postprocess.ln_f.weight = pretrained.transformer.ln_f.weight
    postprocess.ln_f.bias = pretrained.transformer.ln_f.bias
    postprocess.lm_head.weight.data = pretrained.lm_head.weight.data.clone()

    return nn.Sequential(preprocess, *blocks, postprocess)


if __name__ == "__main__":
    world_size = 4

    tokenizer = GPT2Tokenizer.from_pretrained("gpt2")
    tokenizer.pad_token = tokenizer.eos_token

    model = create_model_from_pretrained(model_name="gpt2")
    model = GPipe(
        model,
        balance=[4, 3, 3, 4],
        devices=[0, 1, 2, 3],
        chunks=world_size,
    )

    datasets = load_dataset("squad").data["train"]["context"]
    datasets = [str(sample) for sample in datasets]
    data_loader = DataLoader(datasets, batch_size=8, num_workers=8)

    optimizer = Adam(model.parameters(), lr=3e-5)
    loss_fn = nn.CrossEntropyLoss()

    for i, data in enumerate(data_loader):
        optimizer.zero_grad()
        tokens = tokenizer(data, return_tensors="pt", truncation=True, padding=True)
        input_ids = tokens.input_ids.to(0)
        labels = tokens.input_ids.to(world_size - 1)

        lm_logits = model(input_ids)
        shift_logits = lm_logits[..., :-1, :].contiguous()
        shift_labels = labels[..., 1:].contiguous()
        loss = nn.CrossEntropyLoss()(
            shift_logits.view(-1, shift_logits.size(-1)), shift_labels.view(-1)
        )
        loss.backward()
        optimizer.step()

        if i % 10 == 0:
            print(f"step: {i}, loss: {loss}")
        if i == 300:
            break


In [47]:
# !python -m torch.distributed.launch --nproc_per_node=4 ../src/gpipe.py
!python ../src/gpipe.py

Reusing dataset squad (/home/ubuntu/.cache/huggingface/datasets/squad/plain_text/1.0.0/d6ec3ceb99ca480ce37cdd35555d6cb2511d223b9150cce08a837ef62ffea453)
100%|█████████████████████████████████████████████| 2/2 [00:00<00:00, 55.94it/s]
step: 0, loss: 6.084661483764648
step: 10, loss: 3.2574026584625244
step: 20, loss: 2.796205759048462
step: 30, loss: 2.5538008213043213
step: 40, loss: 2.8463237285614014
step: 50, loss: 2.3466761112213135
step: 60, loss: 2.5407633781433105
step: 70, loss: 2.2434418201446533
step: 80, loss: 2.4792842864990234
step: 90, loss: 2.9400510787963867
step: 100, loss: 2.8163280487060547
step: 110, loss: 2.4787795543670654
step: 120, loss: 2.9588236808776855
step: 130, loss: 2.3893203735351562
step: 140, loss: 2.9571073055267334
step: 150, loss: 3.9219329357147217
step: 160, loss: 3.023880958557129
step: 170, loss: 3.018484592437744
step: 180, loss: 1.6825034618377686
step: 190, loss: 3.5461761951446533
step: 200, loss: 3.6606838703155518
step: 210, loss: 3.527740


## 4. 1F1B Pipelining (PipeDream)

Microsoft에서 공개한 `PipeDream`은 `GPipe`와는 약간 다른 방식의 파이프라이닝을 수행합니다. 흔히 이 방법을 1F1B라고 부르는데, 모든 Forward가 끝나고 나서 Backward를 수행하는 GPipe와 달리 `PipeDream`은 Forward와 Backward를 번갈아가면서 수행합니다.

<img src="../images/1f1b.png" width=600>

1F1B Pipelining에는 다음과 같은 두가지 챌린지가 존재합니다.
1. Weight version managing
2. Work partitioning

<br>

### 1) Weight version managinig
GPipe의 경우 하나의 weight 버전만 운용하지만 주기적으로 Pipeline flush가 일어납니다. Pipeline flush란 계산된 Gradient를 통해 파라미터를 업데이트 하는 과정입니다. 이러한 flush 과정 중에는 어떠한 forward, backward 연산도 하지 않기 때문에 처리 효율이 떨어집니다.

<img src="../images/pipeline_flush.png" width=600>

PipeDream은 이러한 flush 없이 계속해서 파라미터를 업데이트 해나갑니다. 따라서 forward와 backward가 모두 쉬는 시간이 사라집니다. 그러나 이를 위해서는 여러 버전의 파라미터 상태를 지속적으로 관리해야 합니다. 만약 최신버전의 파라미터만 저장하고 있으면 이전 layer의 출력이 다음 layer로 전송될 때, 다음 layer 부분이 업데이트 될 수도 있기 때문이죠.

<img src="../images/1f1b.gif" width=800>

이러한 문제를 막기 위해 여러 버전의 weight를 저장하여 관리하는데 그러면 weight를 저장하면 메모리 공간을 많이 차지하게 됩니다. 따라서 이 부분에서 트레이드 오프가 발생합니다.
- GPipe: 메모리 효율적, 프로세싱 비효율적
- PipeDream: 메모리 비효율적, 프로세싱 효율적
  
<br>

### 2) Work Partitioning
두번쨰 문제는 뉴럴넷을 어떻게 쪼갤건지에 대한 문제입니다. 단순히 Layer별로 동일한 수의 레이어를 갖게끔 하는 것이 항상 최고의 솔루션이라고 할 수는 없겠죠. 우리에게 가장 중요한 것은 idle time을 최소화을 최소화 하는 것입니다. 그러기 위해서는 각 파티션의 running time이 비슷해야겠죠. 그 이외에도 추가로 parameter size, activation memory 등을 고려해야 합니다.

<img src="../images/pipe_dream.png" width=600>

PipeDream은 Profiling과 Optimizing을 통해 최적의 Partioning 전략을 찾아냅니다.

<br><br>

## 5. Variation of 1F1B Pipelining

PipeDream의 1F1B 파이프라이닝을 개선한 두가지 버전의 파이프라인 전략을 소개합니다.

<br>

### 1) PipeDream 2BW (2-buffered weight update)
PipeDream 2BW는 PipeDream의 메모리 비효율성을 개선하기 위해 등장했습니다. 핵심 아이디어는 파이프라이닝 중에 Gradient Accumulation을 수행하는 것입니다. 여러개의 Gradient들을 모아두다가 한번에 업데이트를 수행하는 방식으로 메모리 비효율성 문제를 해결했죠. 2BW는 이전과 달리 단 두개의 weight version만 유지하면 됩니다.

![](../images/pipe_dream_2bw.png)

<br>

### 2) PipeDream Flush
PipeDream Flush는 1F1B와 Pipeline Flush를 결합한 파이프라이닝 방법입니다. 이 파이프라이닝 방법은 Flush가 일어나기 때문에 GPIpe와 비교하여 idle time은 비슷하나, forward-backward 과정에서 유지해야 하는 **activation memory가 줄어듭니다.** PipeDream Flush는 Flush가 일어나기 때문에 여러버전의 파라미터를 관리할 필요가 없습니다. 따라서 단일 가중치만 유지하면 되기 때문에 PipeDream 2BW보다도 더 메모리 효율적입니다. (지금까지 소개드린 기법들 중 가장 메모리 효율적입니다.)

![](../images/pipe_dream_flush.png)

![](../images/pipe_dream_flush_2.png)

<br>



### 잠깐... 근데 Activation Memory가 뭐야?
대부분의 Layer들은 Backward를 호출하기 전에 Forward에서 나온 출력값들을 저장하고 있습니다. 이는 `torch.autograd.Function`을 사용해보신 분들은 잘 아실텐데요. `ctx`변수에 forward 레이어의 출력값들을 저장해둡니다.


In [50]:
"""
참고: https://pytorch.org/tutorials/beginner/examples_autograd/two_layer_net_custom_function.html
"""

import torch


class ReLU(torch.autograd.Function):

    @staticmethod
    def forward(ctx, input):
        ctx.save_for_backward(input)
        # input 값을 저장하고 있음.
        
        return input.clamp(min=0)

    @staticmethod
    def backward(ctx, grad_output):
        input, = ctx.saved_tensors
        grad_input = grad_output.clone()
        grad_input[input < 0] = 0
        return grad_input

이는 미분값(Gradient)을 계산할때 Forward 과정에서 사용했던 값들이 필요하기 때문입니다. 다음 예시를 봅시다.

![](../images/max_pooling.png)

위는 Max Pooling 연산과 그에 대한 Gradient를 계산한 것입니다. Backward를 수행할때는 [[0.8, 1.2], [0.9, 0.5]]와 같은 (2, 2) 텐서가 입력으로 들어옵니다. 이 값을 가지고 오른쪽의 Gradient Matrix를 찾아내야 하는데 반드시 Forward에서 받았던 (4, 4)의 텐서가 필요합니다. 따라서 이 텐서를 메모리에 저장하고 있는 것이죠. 이렇게 Backward를 수행하기 위해 Forward 당시에 쓰였던 텐서들을 저장해두기 위해 필요한 메모리를 Activation Memory라고 합니다.

이제 Activation Memory가 뭔지 알았으니, PipeDream을 실습해볼까요? **PipeDream Flush는 MS의 분산처리 라이브러리 DeepSpeed에 구현되어 있습니다.** (참고: https://github.com/microsoft/DeepSpeed/issues/1110) 따라서 DeepSpeed를 사용해봅시다.

### DeepSpeed 명령어 사용법
아 참, 그 전에 `deepspeed`가 제공하는 매우 편리한 기능을 먼저 알아보고 가겠습니다. 기존에는 분산처리를 위해 `python -m torch.distributed.launch --nproc_per_node=n OOO.py`를 사용했으나 너무 길어서 불편했죠. DeepSpeed는 `deepspeed` 혹은 `ds`와 같은 명령어를 제공하고 있습니다. 

- `ds --num_gpus=n OOO.py`
- `deepspeed --num_gpus=n OOO.py`

위와 같은 명령어를 입력하면 `torch.distributed.launch`와 동일하게 작동합니다. 이제부터는 모든 분산처리 프로그램에 `deepspeed`의 명령어를 사용하도록 하겠습니다. (솔직히 `torch.distributed.launch`는 너무 길어요 😭)

In [None]:
"""
src/pipe_dream.py
"""
import deepspeed
import torch
import torch.nn as nn
from datasets import load_dataset
from deepspeed import PipelineModule
from torch.optim import Adam
from torch.utils.data import DataLoader
from tqdm import tqdm
from transformers import GPT2Tokenizer, GPT2LMHeadModel
from transformers.models.gpt2.modeling_gpt2 import GPT2Block as GPT2BlockBase
import torch.distributed as dist


class GPT2Preprocessing(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.embed_dim = config.hidden_size
        self.wte = nn.Embedding(config.vocab_size, self.embed_dim)
        self.wpe = nn.Embedding(config.max_position_embeddings, self.embed_dim)
        self.drop = nn.Dropout(config.embd_pdrop)

    def forward(self, input_ids):
        input_shape = input_ids.size()
        input_ids = input_ids.view(-1, input_shape[-1])
        position_ids = torch.arange(
            0, input_shape[-1], dtype=torch.long, device=input_ids.device
        )
        position_ids = position_ids.unsqueeze(0).view(-1, input_shape[-1])
        inputs_embeds = self.wte(input_ids)
        position_embeds = self.wpe(position_ids)
        hidden_states = inputs_embeds + position_embeds
        hidden_states = self.drop(hidden_states)
        return hidden_states


class GPT2Block(GPT2BlockBase):
    def forward(self, hidden_states):
        hidden_states = super(GPT2Block, self).forward(
            hidden_states=hidden_states,
        )
        return hidden_states[0]


class GPT2Postprocessing(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.ln_f = nn.LayerNorm(
            config.hidden_size,
            eps=config.layer_norm_epsilon,
        )
        self.lm_head = nn.Linear(
            config.hidden_size,
            config.vocab_size,
            bias=False,
        )

    def forward(self, hidden_states):
        hidden_states = self.ln_f(hidden_states)
        lm_logits = self.lm_head(hidden_states)
        return lm_logits


def create_model_from_pretrained(model_name):
    pretrained = GPT2LMHeadModel.from_pretrained(model_name)
    preprocess = GPT2Preprocessing(pretrained.config)
    preprocess.wte.weight = pretrained.transformer.wte.weight
    preprocess.wpe.weight = pretrained.transformer.wpe.weight

    blocks = pretrained.transformer.h
    for i, block in enumerate(blocks):
        block.__class__ = GPT2Block

    postprocess = GPT2Postprocessing(pretrained.config)
    postprocess.ln_f.weight = pretrained.transformer.ln_f.weight
    postprocess.ln_f.bias = pretrained.transformer.ln_f.bias
    postprocess.lm_head.weight.data = pretrained.lm_head.weight.data.clone()

    return nn.Sequential(preprocess, *blocks, postprocess)


def collate_fn(batch):
    batch_encoding = tokenizer.pad(
        {"input_ids": batch}, padding="max_length", max_length=1024
    )
    return batch_encoding.input_ids


def batch_fn(data):
    input_ids = data
    labels = data
    return input_ids, labels


def loss_fn(logits, labels):
    logits = logits[..., :-1, :].contiguous()
    labels = labels[..., 1:].contiguous()

    return nn.CrossEntropyLoss()(
        logits.view(-1, logits.size(-1)),
        labels.view(-1),
    )


if __name__ == "__main__":
    dist.init_process_group("nccl")
    world_size, rank = dist.get_world_size(), dist.get_rank()
    batch_size, train_steps = 16, 300
    train_samples = batch_size * train_steps

    tokenizer = GPT2Tokenizer.from_pretrained("gpt2")
    tokenizer.pad_token = tokenizer.eos_token

    model = PipelineModule(
        create_model_from_pretrained(model_name="gpt2"),
        loss_fn=loss_fn,
        num_stages=world_size,
        partition_method="type:GPT2Block"
        # partition_method를 통해 병렬화 하고 싶은 레이어를 고를 수 있습니다.
    )
    engine, optimizer, _, _ = deepspeed.initialize(
        model=model,
        optimizer=Adam(model.parameters(), lr=3e-5),
        config={
            "train_batch_size": batch_size,
            "steps_per_print": 9999999,
            # turn off: https://github.com/microsoft/DeepSpeed/issues/1119
        },
    )
    engine.set_batch_fn(batch_fn)

    datasets = load_dataset("squad").data["train"]["context"]
    datasets = [str(sample) for i, sample in enumerate(datasets) if i < train_samples]
    datasets = [
        tokenizer(data, return_tensors="pt", max_length=1024).input_ids[0]
        for data in tqdm(datasets)
    ]
    data_loader = iter(
        DataLoader(
            sorted(datasets, key=len, reverse=True),
            # uniform length batching
            # https://mccormickml.com/2020/07/29/smart-batching-tutorial/
            batch_size=batch_size,
            num_workers=8,
            collate_fn=collate_fn,
            shuffle=False,
        )
    )

    for i in range(train_steps):
        loss = engine.train_batch(data_loader)

        if i % 10 == 0 and rank == 0:
            print(f"step: {i}, loss: {loss}")


In [48]:
!ds --num_gpus=4 ../src/pipe_dream.py

[2021-10-21 23:11:01,184] [INFO] [runner.py:360:main] cmd = /home/ubuntu/kevin/kevin_env/bin/python3 -u -m deepspeed.launcher.launch --world_info=eyJsb2NhbGhvc3QiOiBbMCwgMSwgMiwgM119 --master_addr=127.0.0.1 --master_port=29500 ../src/pipe_dream.py
[2021-10-21 23:11:02,065] [INFO] [launch.py:80:main] WORLD INFO DICT: {'localhost': [0, 1, 2, 3]}
[2021-10-21 23:11:02,065] [INFO] [launch.py:86:main] nnodes=1, num_local_procs=4, node_rank=0
[2021-10-21 23:11:02,065] [INFO] [launch.py:101:main] global_rank_mapping=defaultdict(<class 'list'>, {'localhost': [0, 1, 2, 3]})
[2021-10-21 23:11:02,065] [INFO] [launch.py:102:main] dist_world_size=4
[2021-10-21 23:11:02,065] [INFO] [launch.py:104:main] Setting CUDA_VISIBLE_DEVICES=0,1,2,3
SEED_LAYERS=False BASE_SEED=1234 SEED_FN=None
Using topology: {ProcessCoord(pipe=0, data=0): 0, ProcessCoord(pipe=1, data=0): 1, ProcessCoord(pipe=2, data=0): 2, ProcessCoord(pipe=3, data=0): 3}
[2021-10-21 23:11:24,460] [INFO] [module.py:365:_partition_layers] Part

Using /home/ubuntu/.cache/torch_extensions as PyTorch extensions root...
Using /home/ubuntu/.cache/torch_extensions as PyTorch extensions root...
Using /home/ubuntu/.cache/torch_extensions as PyTorch extensions root...
Emitting ninja build file /home/ubuntu/.cache/torch_extensions/utils/build.ninja...
Building extension module utils...
Allowing ninja to set a default number of workers... (overridable by setting the environment variable MAX_JOBS=N)
Using /home/ubuntu/.cache/torch_extensions as PyTorch extensions root...
ninja: no work to do.
Loading extension module utils...
Time to load utils op: 0.6987707614898682 seconds
Loading extension module utils...
Time to load utils op: 0.30276012420654297 seconds
[2021-10-21 23:14:06,793] [INFO] [engine.py:77:__init__] CONFIG: micro_batches=1 micro_batch_size=16
Loading extension module utils...
Time to load utils op: 0.3035085201263428 seconds
Loading extension module utils...
Time to load utils op: 0.10213756561279297 seconds
[2021-10-21 23

<br><br>

## 6. Intereaved Scheduling
이전에는 하나의 스테이지(연속된 레이어 집합)를 순차적으로 계산해서 결과 값을 출력했습니다. 예를 들면 8개의 레이어가 있고 2개의 디바이스가 주어졌다고 가정한다면, 일반적으로 1번 device에 1-4번 레이어, 2번 device에 5-8번 레이어에 할당되겠죠. 그러면 1번 device는 1~4번 레이어를 순차적으로 진행하여 출력했습니다. (GPipe, 1F1B 모두 이렇게 동작함)
  
![](../images/interleaved_1.png)

그러나 **Interleaved Scheduling은 Bubble time을 극도로 줄이기 위해 하나의 스테이지를 중첩해서 진행**합니다. 예를 들면  1번 device가 1-4번 레이어에 할당 되었다면, 1-2번 레이어의 동시에 3-4번 레이어를 동시에 수행합니다. 이렇게 되면 Bubble time은 줄어들지만 통신비용이 커지기 때문에 잘 조절할 필요가 있습니다. (트레이드 오프 존재) Interleaved Scheduling은 Megatron-LM에 구현되어 있는데요. 추후에 3D parallelism 실습할 때 사용해보도록 하겠습니다.