# DDP: Distributed Data Parallel

複数のGPUを活用した分散学習。

In [1]:
import torch
from torch import nn, optim
from torch.utils.data import DataLoader
from torchvision.datasets import MNIST

## 分散学習

深層学習において、必要な計算を複数のコンピュータに分散させること。いくつかの種類があって、例えばデータを分散させるものとモデルを分散させるもの、またパラメータの更新を同期するか否か、など。今回は、DDPと呼ばれる、モデルを分散させ、パラメータの更新を同期する分散学習について説明する。

\*DDPというのはPyTorchが用意したAPIの名前で、一般的な名前かと言われるとそうではないと思う。ただここではDDPと呼ぶことにする。

$N$個のデータをバッチサイズ$B$で分割し、$M=N/B$個のバッチを得たとする。

$R$個のデバイスがあるとき、DDPでは$M$個のバッチを均等に（$M/R$個ずつ）分配する。また各デバイスが同じモデルのコピーを持っているとする。学習が始まると、各デバイスで一つずつバッチを処理する。ここで、バッチを一つ処理する度に各デバイスで勾配を共有し、パラメータを更新する。パラメータが更新されたら次のバッチへ進む。これを繰り返すことで並列的な学習を行う。

勾配を共有というのは単に足し合わせているか平均をとっていると思ってよい。単純にバッチサイズが$B\times R$になったような感じ。各デバイスに同じ勾配が渡るので、更新後のパラメータもデバイス間で同じになる。

デバイス間でバッチの処理速度に違いがある場合、遅い方に合わせられる。全てのデバイスがバッチを処理するまで待つということ。

このあたりの図解が下記資料に

- https://www.cc.u-tokyo.ac.jp/events/lectures/111/20190124-1.pdf

## PyTorchでの実装

各デバイスで実行するプロセスを呼び出し可能なオブジェクトとして定義し、`torch.multiprocessing`で動かす。デバイス間での勾配の共有には`torch.nn.parallel.DistributedDataParallel`を使う。

[Getting Started with Distributed Data Parallel](https://pytorch.org/tutorials/intermediate/ddp_tutorial.html)

### `DistributedSampler`

[torch.utils.data.distributed.DistributedSampler](https://pytorch.org/docs/stable/data.html#torch.utils.data.distributed.DistributedSampler)

データセットを分割する際に用いる。

In [2]:
from torch.utils.data import Dataset, DistributedSampler

適当なデータセットを用意。

In [17]:
class MyDataset(Dataset):
    def __init__(self, data):
        self.data = data

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        return self.data[idx]

ds = MyDataset(torch.arange(20))

ここで二つの`Sampler`を用意してみる。

In [18]:
sampler1 = DistributedSampler(ds, num_replicas=2, rank=0, shuffle=False)
sampler2 = DistributedSampler(ds, num_replicas=2, rank=1, shuffle=False)

`num_replicas`はデバイス数、`rank`はデバイスID。

これを使って二つの`DataLoader`を作成する。

In [19]:
dataloader1 = DataLoader(ds, batch_size=5, sampler=sampler1)
dataloader2 = DataLoader(ds, batch_size=5, sampler=sampler2)

for x in dataloader1:
    print(x)
print()
for x in dataloader2:
    print(x)

tensor([0, 2, 4, 6, 8])
tensor([10, 12, 14, 16, 18])

tensor([1, 3, 5, 7, 9])
tensor([11, 13, 15, 17, 19])


重複無しで綺麗に二つに分割されている。これを使ってデバイスごとにデータを分割する。

`shuffle=True`にするとランダムにデータを分割する。

In [20]:
sampler1 = DistributedSampler(ds, num_replicas=2, rank=0, shuffle=True)
sampler2 = DistributedSampler(ds, num_replicas=2, rank=1, shuffle=True)
dataloader1 = DataLoader(ds, batch_size=5, sampler=sampler1)
dataloader2 = DataLoader(ds, batch_size=5, sampler=sampler2)
for x in dataloader1:
    print(x)
print()
for x in dataloader2:
    print(x)

tensor([ 4, 13,  7,  3,  9])
tensor([11, 16, 10, 15,  1])

tensor([ 5, 19, 14,  6, 17])
tensor([ 2, 18, 12,  8,  0])


重複はしないようになっている。

ちなみに、何回実行しても同じ分け方になる。中でシードが固定されているのだと思う。epochを変えると分け方が変わる。

In [21]:
sampler1.set_epoch(1)
sampler2.set_epoch(1)
for x in dataloader1:
    print(x)
print()
for x in dataloader2:
    print(x)

tensor([ 5,  2, 19,  1,  4])
tensor([ 0, 16, 15,  6, 12])

tensor([13, 11, 18,  9,  7])
tensor([14, 10,  3, 17,  8])


データ数がデバイス数で割り切れない場合、余りの数だけ重複を発生させて数を揃えてくれる。

In [15]:
ds = MyDataset(torch.arange(21)) # 2で割り切れない数にした
sampler1 = DistributedSampler(ds, num_replicas=2, rank=0, shuffle=False)
sampler2 = DistributedSampler(ds, num_replicas=2, rank=1, shuffle=False)
dataloader1 = DataLoader(ds, batch_size=5, sampler=sampler1)
dataloader2 = DataLoader(ds, batch_size=5, sampler=sampler2)
for x in dataloader1:
    print(x)
print()
for x in dataloader2:
    print(x)

tensor([0, 2, 4, 6, 8])
tensor([10, 12, 14, 16, 18])
tensor([20])

tensor([1, 3, 5, 7, 9])
tensor([11, 13, 15, 17, 19])
tensor([0])


### `DistributedDataParallel`

勾配を共有してくれる。

[Distributed Data Parallel](https://pytorch.org/docs/stable/notes/ddp.html)

In [None]:
from torch.nn.parallel import DistributedDataParallel as DDP

使い方は簡単で、バックエンドを指定してモデルをラップするだけ。あとは逆伝播を行うときに勝手に勾配を共有してくれる（多分）。

```python
import torch.distributed as dist
dist.init_process_group("nccl", rank=rank, world_size=n_gpu)
model = model.to(rank)
model = DDP(model, device_ids=[rank])
```

<u>バックエンドを指定して</u>とか言ったけど、俺は全く理解していない。とりあえずGPU使うなら`nccl`指定しておけば良いっぽい。詳細は: [Distributed communication package - torch.distributed](https://pytorch.org/docs/stable/distributed.html)

あと追加で環境変数の設定も必要。pythonから設定しちゃえばいいと思う。

```python
import os
os.environ["MASTER_ADDR"] = "localhost"
os.environ["MASTER_PORT"] = "12355"
```

バックエンドや環境変数の設定はおまじないだと思ってコピペしておけばいい。

一つ注意があって、DDPでラップしたモデルは`model.module`でアクセスする必要がある。`model`だけだとDDPのオブジェクトが返ってくる。

```python
torch.save(model.module.state_dict(), "model.pth")
```

### `torch.multiprocessing`

最後にこれらをまとめる。

[torch.multiprocessing](https://pytorch.org/docs/stable/multiprocessing.html)

以下の形で実行する。

```python
import torch.multiprocessing as mp
mp.spawn(
    train,
    args=(n_gpu, ...),
    nprocs=n_gpu,
    join=True
)
```

これはjupyterからは動かせないので注意。pythonファイルとして実行する必要がある。

あと、`if __name__ == "__main__":`の中に書かないと動かない。というかもしかしたらこれを書けばjupyterからも動くかもしれない。試してないから知らんけど。

`train`はデバイスごとに実行する関数。`args`は`train`に渡す引数。それ以外はおまじない。

`train`はこんな感じで書けばいいかな。

```python
def train(rank, n_gpu, model, dataset, n_epochs):
    dist.init_process_group("nccl", rank=rank, world_size=n_gpu)
    model = model.to(rank)
    model = DDP(model, device_ids=[rank])

    sampler = DistributedSampler(dataset, num_replicas=n_gpu, rank=rank, shuffle=True)
    loader = DataLoader(dataset, sampler=sampler, batch_size=8)
    for n in range(n_epochs):
        sampler.set_epoch(n)
        for x, label in loader:
            x = x.to(rank)
            label = label.to(rank)
            y_pred = model(x)
            loss = criterion(y_pred, label)
            loss.backward()
            optimizer.step()
            optimizer.zero_grad()
```

## 実践MNIST

実際にMNISTをDDPで学習するサンプルコードを載っけておーしまい。（動作確認してない。あとでやる（どうせやらない）。）

↑のように、呼び出し可能なオブジェクトを関数として実装してもいいけど、classでまとめた方が扱い易そうだったのでそうした。

```python
import os
import argparse

import torch
from torch import nn, optim
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data.distributed import DistributedSampler
import torch.multiprocessing as mp


class SimpleCNN(nn.Module):
    def __init__(self):
        super(SimpleCNN, self).__init__()
        self.conv1 = nn.Conv2d(1, 32, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2, padding=0)
        self.fc1 = nn.Linear(64 * 7 * 7, 128)
        self.fc2 = nn.Linear(128, 10)
        self.relu = nn.ReLU()

    def forward(self, x):
        x = self.pool(self.relu(self.conv1(x)))
        x = self.pool(self.relu(self.conv2(x)))
        x = x.view(-1, 64 * 7 * 7)
        x = self.relu(self.fc1(x))
        x = self.fc2(x)
        return x


class Trainer:
    def __init__(self, model, dataset, loss_fn, optimizer, n_gpu, batch_size, n_epochs):
        self.model = model
        self.dataset = dataset
        self.loss_fn = loss_fn
        self.optimizer = optimizer
        self.n_gpu = n_gpu
        self.batch_size = batch_size
        self.n_epochs = n_epochs

    def setup(self, rank):
        dist.init_process_group("nccl", rank=rank, world_size=self.n_gpu)
        self.model = self.model.to(rank)
        self.model = DDP(self.model, device_ids=[rank])
        self.model = torch.compile(self.model)
        torch.set_float32_matmul_precision("high")

    def __call__(self, rank):
        self.setup(rank)
        sampler = DistributedSampler(
            self.dataset,
            num_replicas=self.n_gpu,
            rank=rank,
            shuffle=True
        )
        dataloader = DataLoader(
            self.dataset,
            batch_size=self.batch_size,
            sampler=sampler,
            shuffle=(sampler is None), # shuffleはsampler側で決める。この書き方は慣習
        )
        for n in range(self.n_epochs):
            sampler.set_epoch(n)
            for x, label in dataloader:
                x = x.to(rank)
                label = label.to(rank)
                self.optimizer.zero_grad()
                out = self.model(x)
                loss = self.loss_fn(out, label)
                loss.backward()
                self.optimizer.step()
            if rank == 0:
                torch.save(self.model._orig_mod.state_dict(), "model.pth")


def main(args):
    os.environ["MASTER_ADDR"] = "localhost"
    os.environ["MASTER_PORT"] = "12355"
    dataset = datasets.MNIST(
        root='./data',
        train=True,
        download=True,
        transform=transforms.ToTensor(),
    )
    model = SimpleCNN()
    loss_fn = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    batch_size = 8
    n_epochs = 10
    trainer = Trainer(
        model,
        dataset,
        loss_fn,
        optimizer,
        args.n_gpus,
        batch_size,
        n_epochs
    )
    mp.spawn(
        trainer,
        nprocs=args.n_gpus,
        join=True,
    )


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--n_gpus", type=int)
    args = parser.parse_args()
    main(args)
```