# 「分散データ並列訓練入門」

【原題】Getting Started with Distributed Data Parallel 

【原著】[Shen Li](https://mrshenli.github.io/)、[Joe Zhu](https://github.com/gunandrose4u)

【元URL】https://pytorch.org/tutorials/intermediate/ddp_tutorial.html

【翻訳】電通国際情報サービスISID HCM事業部　櫻井 亮佑

【日付】2020年11月20日

【チュトーリアル概要】

前提知識
- [PyTorch Distributed について](https://pytorch.org/tutorials/beginner/dist_overview.html)
- [DistributedDataParallel API ドキュメント](https://pytorch.org/docs/master/generated/torch.nn.parallel.DistributedDataParallel.html)
- [DistributedDataParallel について](https://pytorch.org/docs/master/notes/ddp.html)

DistributedDataParallel (DDP) は、複数のマシン間で実行可能な、モジュールレベルのデータ並列化を実装可能にします。

DDPを使用するアプリケーションでは、複数のプロセスを生成し、プロセスごとに単一のDDPインスタンスを作成する必要があります。

DDP は [torch.distributed](https://pytorch.org/tutorials/intermediate/dist_tuto.html) パッケージの集合通信を使用して、勾配とバッファを同期できます。

より具体的には、DDPでは `model.parameters()` で得られる各パラメータに対して自動微分のフックを登録し、対応する勾配がバックワードパスで計算されたときに、登録されたフックが起動します。

そして、DDPはその信号を使用して、プロセス間の勾配の同期を実施します。

詳細は、[DDPの設計について](https://pytorch.org/docs/master/notes/ddp.html) を参照してください。

推奨されるDDPの使用方法は、複製したモデルを複数のデバイスに展開できる状態にし、複製したモデルごとに1つのプロセスを生成することです。

DDPのプロセスは、同一のマシン、またはマシンを横断して存在することが可能ですが、プロセスをまたいでGPUデバイスを共有することはできません。

本チュートリアルでは、基本的なDDPのユースケースから始め、その後モデルのチェックポイントやDDPとモデル並列の組み合わせなど、より発展的なユースケースを解説します。



**留意事項**

本チュートリアルのコードは、8つのGPUを有するサーバー上で動作しますが、その他の環境にも適用できる内容になっています。

## `DataParallel` と `DistributedDataParallel`

内容に踏み込む前に、なぜ `DataParallel` ではなく、処理がより複雑になる`DistributedDataParallel` の使用を検討するのかを明確にしましょう。

- `DataParallel` は、シングルプロセス・マルチスレッドでシングルマシンでのみ機能しますが、`DistributedDataParallel` では、マルチプロセスでシングルマシン訓練、マルチマシン訓練のどちらでも機能するためです。
  通常、`DataParallel` は、スレッド間のGILの競合、イテレーション毎に複製するモデル、そして入力の分割と出力の収集によって発生するオーバーヘッドが原因となり、単一のマシン上であっても、`DistributedDataParallel` よりも遅くなります。
- 前のチュートリアル（日本語訳6_2）で、モデルが大きすぎて単一のGPUに収まらない場合は、モデル並列を利用してモデルを複数のGPUに分割する必要があったことを思い出してください。
  `DistributedDataParallel` は、モデル並列と共に動作できます。
  一方、 `DataParallel` はモデル並列と共に使うことはできません。
  DDPとモデル並列を組み合わせた場合、各DDPプロセスはモデル並列を使用し、すべてのプロセスが共同してデータ並列を使用することになります。
- モデルをマルチマシンに展開する必要がある場合、またはユースケースがデータ並列化のパラダイムに収まらない場合は、[RPC API](https://pytorch.org/docs/stable/rpc.html) にて、より一般的な分散訓練のサポートについて参照できます。

## 基本的なユースケース

DDPモジュールを作成するには、まずプロセスグループを適切にセットアップします。
より詳細な内容については、「PyTorchで実装する分散アプリケーション」（日本語チュートリアル6_4） に記載があります。

In [1]:
import os
import sys
import tempfile
import torch
import torch.distributed as dist
import torch.nn as nn
import torch.optim as optim
import torch.multiprocessing as mp

from torch.nn.parallel import DistributedDataParallel as DDP


def setup(rank, world_size):
    if sys.platform == 'win32':
        # Windowsプラットフォーム上では、Distribuedパッケージは
        # Glooバックエンドの集合通信のみサポートしています。
        # init_process_group内のinit_method パラメーターをローカルのファイルに設定してください。
        # 例 init_method="file:///f:/libtmp/some_file"
        init_method="file:///{your local file path}"

        # プロセスグループの初期化
        dist.init_process_group(
            "gloo",
            init_method=init_method,
            rank=rank,
            world_size=world_size
        )
    else:
        os.environ['MASTER_ADDR'] = 'localhost'
        os.environ['MASTER_PORT'] = '12355'

        # プロセスグループの初期化
        dist.init_process_group("gloo", rank=rank, world_size=world_size)

def cleanup():
    dist.destroy_process_group()

それでは、簡易的なモジュールを作って、DDPでラップし、ダミーの入力データを与えてみましょう。

なお、DDPは、モデルの状態を、ランク0のプロセスから、DDPのコンストラクター内に存在するその他すべてのプロセスにブロードキャストするため、異なるDDPプロセスが異なるモデルのパラメータの初期値から開始する点については考慮・心配する必要はありません。

In [2]:
class ToyModel(nn.Module):
    def __init__(self):
        super(ToyModel, self).__init__()
        self.net1 = nn.Linear(10, 10)
        self.relu = nn.ReLU()
        self.net2 = nn.Linear(10, 5)

    def forward(self, x):
        return self.net2(self.relu(self.net1(x)))


def demo_basic(rank, world_size):
    print(f"Running basic DDP example on rank {rank}.")
    setup(rank, world_size)

    # モデルを作成し、ランクidと共にGPUに移動
    model = ToyModel().to(rank)
    ddp_model = DDP(model, device_ids=[rank])

    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)

    optimizer.zero_grad()
    outputs = ddp_model(torch.randn(20, 10))
    labels = torch.randn(20, 5).to(rank)
    loss_fn(outputs, labels).backward()
    optimizer.step()

    cleanup()


def run_demo(demo_fn, world_size):
    mp.spawn(demo_fn,
             args=(world_size,),
             nprocs=world_size,
             join=True)

上記のように、DDPは下位レベルの分散通信の詳細部分を隠ぺいし、ローカルモデルであるかのようなクリーンなAPIを提供しています（日本語訳注：通常の、並列処理でない場合の実装とほぼ同じように実装を記載できます）。


勾配を同期する通信はバックワードパスの最中に発生し、バックワードの演算処理と重複して行われます。

そのため、`backward()` が返ってきたタイミングで、`param.grad` は同期化された勾配テンソルを既に含んでいます。




基本的なユースケースであれば、DDPにはプロセスグループをセットアップするためのコードがもう少し必要になります。

そして、より発展的なユースケースにDDPを適用する場合には、いくつか注意点があります。

## 歪んだ処理速度

DDPでは、コンストラクター、フォワードパス、そしてバックワードパスが分散処理の同期ポイントになります。

そして、異なるプロセスは同じ数の同期処理を起動し、同じ順序でこれらの同期ポイントに到達し、ほぼ同時に各同期ポイントに入ることが期待されています。

このようにしなければ、速く行われるプロセスが早く到着し、出遅れたプロセスを待ってタイムアウトしてしまうかもしれません。

したがって、ユーザーにはプロセス間でワークロードの分散を均等にする責任があります。



ですが、処理速度の歪み、バラつきは、例えば、ネットワークの遅延、リソースの競合、または予測不能なワークロードの急増によって不可避的に発生することがあります。

このような状況でのタイムアウトを避けるには、[init_process_group](https://pytorch.org/docs/stable/distributed.html#torch.distributed.init_process_group) を呼び出す際に、十分な `timeout` 値を与えておくことです。


## チェックポイントの保存と読み込み

並列処理において、`torch.save` で訓練中にモジュールのチェックポイントを取得し、`torch.load` でチェックポイントから復元するのは、通常の使用方法の場合と共通しています。


詳細は、[モデルの保存と読み込み](https://pytorch.org/tutorials/beginner/saving_loading_models.html) を参照してください。



DDPを使用する際の1つの最適化として、1つのプロセスのみでモデルを保存し、その後、すべてのプロセスで保存したモデルを読み込むことで、書き込みのオーバーヘッドを減らす手段があります。

これは、すべてのプロセスが同じパラメーターから開始し、勾配はバックワードパスにおいて同期されるため、最適化関数がパラメーターに同じ値を設定し続けるはずである、という点で正しい考え方です。

ただし、この最適化手法を用いる場合、モデルの保存が終了する前に、すべてのプロセスでモデルの読み込みが始まらないようにしてください。

また、モジュールを読み込む際は、あるプロセスが他のデバイスに入り込まないように、`map_location` 引数を適切に与える必要もあります。
`map_location` が抜けている場合、`torch.load` はまずモジュールをCPUに読み込み、その後モジュールが保存されたデバイスへと各パラメーターをコピーします。

つまり、同一マシン上のすべてのプロセスが同じデバイスのセットを使用する状況に陥ってしまいます。

より発展的な障害回復と柔軟性のあるサポートについては、[TorchElastic](https://pytorch.org/elastic) を参照してください。

In [3]:
def demo_checkpoint(rank, world_size):
    print(f"Running DDP checkpoint example on rank {rank}.")
    setup(rank, world_size)

    model = ToyModel().to(rank)
    ddp_model = DDP(model, device_ids=[rank])

    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)

    CHECKPOINT_PATH = tempfile.gettempdir() + "/model.checkpoint"
    if rank == 0:
        # すべてのプロセスが同じランダムなパラメーターから始まっており、バックワードパスで勾配が同期されるため、
        # すべてのプロセスが同じパラメーターを扱う必要があります。
        # そして、これがモデルを一つのプロセスに保存すれば十分である理由です。
        torch.save(ddp_model.state_dict(), CHECKPOINT_PATH)

    # プロセス0がモデルを保存した後にプロセス1がモデルを読み込めるように、barrier()を使用します。
    dist.barrier()
    # map_location を適切に設定します。
    map_location = {'cuda:%d' % 0: 'cuda:%d' % rank}
    ddp_model.load_state_dict(
        torch.load(CHECKPOINT_PATH, map_location=map_location))

    optimizer.zero_grad()
    outputs = ddp_model(torch.randn(20, 10))
    labels = torch.randn(20, 5).to(rank)
    loss_fn = nn.MSELoss()
    loss_fn(outputs, labels).backward()
    optimizer.step()
    
    # DDPのバックワードパスにおいてAllReduceの処理がすでに同期処理として機能しているため、
    # 下記のファイル削除処理をガードする目的で ist.barrier() を使用する必要はありません。
    if rank == 0:
        os.remove(CHECKPOINT_PATH)

    cleanup()

## DDPとモデル並列化の組み合わせ

DDPは、マルチGPUモデル（モデル並列化）と共に使用することも可能です。

マルチGPUモデルを包含したDDPは、膨大なデータ量で大規模なモデルを訓練する際に特に役立ちます。

In [4]:
class ToyMpModel(nn.Module):
    def __init__(self, dev0, dev1):
        super(ToyMpModel, self).__init__()
        self.dev0 = dev0
        self.dev1 = dev1
        self.net1 = torch.nn.Linear(10, 10).to(dev0)
        self.relu = torch.nn.ReLU()
        self.net2 = torch.nn.Linear(10, 5).to(dev1)

    def forward(self, x):
        x = x.to(self.dev0)
        x = self.relu(self.net1(x))
        x = x.to(self.dev1)
        return self.net2(x)

マルチGPUモデルをDDPに与える際は、`device_ids` と `output_device` を設定してはいけません。

入力データと出力データは、アプリケーション、またはモデルの `forward()` メソッドによって適切なデバイスに配置されます。

In [None]:
def demo_model_parallel(rank, world_size):
    print(f"Running DDP with model parallel example on rank {rank}.")
    setup(rank, world_size)

    # このプロセスで使用するmp_modelとデバイスをセットアップ
    dev0 = rank * 2
    dev1 = rank * 2 + 1
    mp_model = ToyMpModel(dev0, dev1)
    ddp_mp_model = DDP(mp_model)

    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(ddp_mp_model.parameters(), lr=0.001)

    optimizer.zero_grad()
    # 出力は dev1 に行われる。
    outputs = ddp_mp_model(torch.randn(20, 10))
    labels = torch.randn(20, 5).to(dev1)
    loss_fn(outputs, labels).backward()
    optimizer.step()

    cleanup()


if __name__ == "__main__":
    n_gpus = torch.cuda.device_count()
    if n_gpus < 8:
        print(f"Requires at least 8 GPUs to run, but got {n_gpus}.")
    else:
        run_demo(demo_basic, 8)
        run_demo(demo_checkpoint, 8)
        run_demo(demo_model_parallel, 4)