# ChainerMN with InfiniBand sample code

## Check the Azure ML service Python SDK version

In [9]:
import azureml.core

print("SDK version:", azureml.core.VERSION)

SDK version: 1.0.15


## Connect to the workspace
Connect to the Azure Machine Learning service [workspace](https://docs.microsoft.com/azure/machine-learning/service/concept-azure-machine-learning-architecture#workspace). This step requires authentication against Azure.

In [10]:
from azureml.core.workspace import Workspace

ws = Workspace.get(name='azureml',
                   subscription_id='9c0f91b8-eb2f-484c-979c-15848c098a6b',
                   resource_group='amlservice')
# ws = Workspace.from_config()
print('Workspace name: ' + ws.name,
      'Azure region: ' + ws.location,
      'Subscription id: ' + ws.subscription_id,
      'Resource group: ' + ws.resource_group, sep='\n')

Workspace name: azureml
Azure region: southeastasia
Subscription id: 9c0f91b8-eb2f-484c-979c-15848c098a6b
Resource group: amlservice
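
The commented-out `Workspace.from_config()` call reads the workspace identifiers from a `config.json` file instead of hard-coding them in the notebook. The following is a minimal sketch of writing such a file, assuming the three keys `subscription_id`, `resource_group`, and `workspace_name`. The helper name `write_workspace_config` and the placeholder subscription ID are illustrative, and the folder the SDK searches by default may depend on the SDK version, so passing the path explicitly is the safer option.

```python
import json
import os
import tempfile


def write_workspace_config(folder, subscription_id, resource_group, workspace_name):
    """Write a config.json holding the three workspace identifiers (hypothetical helper)."""
    os.makedirs(folder, exist_ok=True)
    path = os.path.join(folder, "config.json")
    config = {
        "subscription_id": subscription_id,
        "resource_group": resource_group,
        "workspace_name": workspace_name,
    }
    with open(path, "w") as f:
        json.dump(config, f, indent=2)
    return path


# Demo in a temporary folder; the subscription ID is a placeholder.
demo_dir = tempfile.mkdtemp()
config_path = write_workspace_config(
    demo_dir, "<your-subscription-id>", "amlservice", "azureml")

with open(config_path) as f:
    loaded = json.load(f)
print(loaded["workspace_name"])
```

With such a file in place, `Workspace.from_config(path=config_path)` should be able to replace the explicit `Workspace.get(...)` call.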


## Create a new Machine Learning Compute (formerly Batch AI) cluster or reuse an existing one
You will need to create a [compute target](https://docs.microsoft.com/azure/machine-learning/service/concept-azure-machine-learning-architecture#compute-target) to execute your training script on. In this tutorial, you create an Azure Machine Learning Compute cluster (formerly [Azure Batch AI](https://docs.microsoft.com/azure/batch-ai/overview)) as your training compute resource. This code creates a cluster for you if it does not already exist in your workspace.

**Creation of the cluster takes approximately 5 minutes.** If the cluster is already in your workspace this code will skip the cluster creation process.

In [11]:
from azureml.core.compute import ComputeTarget, AmlCompute
from azureml.core.compute_target import ComputeTargetException

batchai_cluster_name = "gpucluster-ib"
vm_size = "Standard_NC24rs_v3"

try:
    # Check for existing cluster
    compute_target = ComputeTarget(ws,batchai_cluster_name)
    print('Found existing compute target ' + batchai_cluster_name)
except ComputeTargetException:
    # Else, create new one
    print('Creating a new compute target...')
    provisioning_config = AmlCompute.provisioning_configuration(
        vm_size=vm_size,
        vm_priority="lowpriority",
        min_nodes=0,
        max_nodes=2)
    compute_target = ComputeTarget.create(ws, batchai_cluster_name, provisioning_config)
    # can poll for a minimum number of nodes and for a specific timeout. 
    # if no min node count is provided it will use the scale settings for the cluster
    compute_target.wait_for_completion(show_output=True, min_node_count=None, timeout_in_minutes=20)

Found existing compute target gpucluster-ib


## Develop the model on the remote compute
Now that we have the cluster ready to go, let's run our distributed training job.

### Create a project folder
Create a directory that will contain all the necessary code from your local machine that you will need access to on the remote resource. This includes the training script and any additional files your training script depends on.

In [12]:
import os

project_folder = './script/chainer'
os.makedirs(project_folder, exist_ok=True)
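
Because everything in the project folder is sent to the remote resource, it can help to check what the folder contains before submitting. The sketch below lists each file with its size using only the standard library. The helper name `summarize_folder` and the temporary demo folder are illustrative and not part of the Azure ML SDK.

```python
import os
import tempfile


def summarize_folder(folder):
    """Return sorted (relative_path, size_in_bytes) pairs for every file under folder."""
    entries = []
    for root, _dirs, files in os.walk(folder):
        for name in files:
            path = os.path.join(root, name)
            entries.append((os.path.relpath(path, folder), os.path.getsize(path)))
    return sorted(entries)


# Demo on a throwaway folder that stands in for the project folder.
demo_folder = tempfile.mkdtemp()
with open(os.path.join(demo_folder, "chainer_mnist.py"), "w") as f:
    f.write("print('hello')\n")

summary = summarize_folder(demo_folder)
for rel_path, size in summary:
    print(rel_path, size)
```

In the notebook, calling `summarize_folder(project_folder)` after writing the training script would show exactly which files accompany the job.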

In [13]:
%%writefile {project_folder}/chainer_mnist.py

from __future__ import print_function

import argparse

import chainer
import chainer.functions as F
import chainer.links as L
from chainer import training
from chainer.training import extensions

import chainermn


class MLP(chainer.Chain):

    def __init__(self, n_units, n_out):
        super(MLP, self).__init__(
            # layer sizes are given explicitly; 784 = 28 x 28 MNIST pixels
            l1=L.Linear(784, n_units),  # n_in -> n_units
            l2=L.Linear(n_units, n_units),  # n_units -> n_units
            l3=L.Linear(n_units, n_out),  # n_units -> n_out
        )

    def __call__(self, x):
        h1 = F.relu(self.l1(x))
        h2 = F.relu(self.l2(h1))
        return self.l3(h2)


def main():
    parser = argparse.ArgumentParser(description='ChainerMN example: MNIST')
    parser.add_argument('--batchsize', '-b', type=int, default=100,
                        help='Number of images in each mini-batch')
    parser.add_argument('--communicator', type=str,
                        default='hierarchical', help='Type of communicator')
    parser.add_argument('--epoch', '-e', type=int, default=20,
                        help='Number of sweeps over the dataset to train')
    parser.add_argument('--gpu', '-g', action='store_true',
                        help='Use GPU')
    parser.add_argument('--out', '-o', default='result',
                        help='Directory to output the result')
    parser.add_argument('--resume', '-r', default='',
                        help='Resume the training from snapshot')
    parser.add_argument('--unit', '-u', type=int, default=1000,
                        help='Number of units')
    args = parser.parse_args()

    # Prepare ChainerMN communicator.

    if args.gpu:
        if args.communicator == 'naive':
            print("Error: 'naive' communicator does not support GPU.\n")
            exit(-1)
        comm = chainermn.create_communicator(args.communicator)
        device = comm.intra_rank
    else:
        if args.communicator != 'naive':
            print('Warning: using naive communicator '
                  'because only naive supports CPU-only execution')
        comm = chainermn.create_communicator('naive')
        device = -1

    if comm.rank == 0:
        print('==========================================')
        print('Num process (COMM_WORLD): {}'.format(comm.size))
        if args.gpu:
            print('Using GPUs')
        print('Using {} communicator'.format(args.communicator))
        print('Num unit: {}'.format(args.unit))
        print('Num Minibatch-size: {}'.format(args.batchsize))
        print('Num epoch: {}'.format(args.epoch))
        print('==========================================')

    model = L.Classifier(MLP(args.unit, 10))
    if device >= 0:
        chainer.cuda.get_device_from_id(device).use()
        model.to_gpu()

    # Create a multi node optimizer from a standard Chainer optimizer.
    optimizer = chainermn.create_multi_node_optimizer(
        chainer.optimizers.Adam(), comm)
    optimizer.setup(model)

    # Split and distribute the dataset. Only worker 0 loads the whole dataset.
    # Datasets of worker 0 are evenly split and distributed to all workers.
    if comm.rank == 0:
        train, test = chainer.datasets.get_mnist()
    else:
        train, test = None, None
    train = chainermn.scatter_dataset(train, comm, shuffle=True)
    test = chainermn.scatter_dataset(test, comm, shuffle=True)

    train_iter = chainer.iterators.SerialIterator(train, args.batchsize)
    test_iter = chainer.iterators.SerialIterator(test, args.batchsize,
                                                 repeat=False, shuffle=False)

    updater = training.StandardUpdater(train_iter, optimizer, device=device)
    trainer = training.Trainer(updater, (args.epoch, 'epoch'), out=args.out)

    # Create a multi node evaluator from a standard Chainer evaluator.
    evaluator = extensions.Evaluator(test_iter, model, device=device)
    evaluator = chainermn.create_multi_node_evaluator(evaluator, comm)
    trainer.extend(evaluator)

    # Some display and output extensions are necessary only for one worker.
    # (Otherwise, there would just be repeated outputs.)
    if comm.rank == 0:
        trainer.extend(extensions.dump_graph('main/loss'))
        trainer.extend(extensions.LogReport())
        trainer.extend(extensions.PrintReport(
            ['epoch', 'main/loss', 'validation/main/loss',
             'main/accuracy', 'validation/main/accuracy', 'elapsed_time']))
        trainer.extend(extensions.ProgressBar())

    if args.resume:
        chainer.serializers.load_npz(args.resume, trainer)

    trainer.run()


if __name__ == '__main__':
    main()

Overwriting ./script/chainer/chainer_mnist.py
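
The training script loads MNIST on rank 0 only and relies on `chainermn.scatter_dataset` to hand every worker its own share. The following pure-Python sketch illustrates the idea of shuffling and splitting sample indices across workers. It is a simplified illustration under the assumption of a plain near-equal split, not ChainerMN's actual implementation, and the function name `scatter_indices` is hypothetical.

```python
import random


def scatter_indices(n_samples, n_workers, seed=0):
    """Shuffle sample indices and split them into n_workers near-equal chunks."""
    indices = list(range(n_samples))
    random.Random(seed).shuffle(indices)

    # The first `extra` workers receive one additional sample each.
    base, extra = divmod(n_samples, n_workers)
    chunks = []
    start = 0
    for rank in range(n_workers):
        size = base + (1 if rank < extra else 0)
        chunks.append(indices[start:start + size])
        start += size
    return chunks


# MNIST has 60,000 training images; this notebook runs 4 workers.
chunks = scatter_indices(60000, 4)
print([len(c) for c in chunks])  # [15000, 15000, 15000, 15000]
```

Each worker then iterates only over its own chunk, which is why the per-epoch iteration count shrinks as workers are added.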


### Create an Experiment
Create an [Experiment](https://docs.microsoft.com/azure/machine-learning/service/concept-azure-machine-learning-architecture#experiment) to track the Chainer model training runs.

In [14]:
from azureml.core import Experiment

experiment_name = 'chainermn-remote-IB'
experiment = Experiment(ws, name=experiment_name)

In [15]:
script_params = {
    '-g': '',
    '--communicator': 'non_cuda_aware',
    '-o': './outputs'
}
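
The `script_params` dictionary ends up as command-line arguments for `chainer_mnist.py`. The sketch below shows one way such a dictionary could be flattened, assuming that an empty string marks a flag without a value, and then parses the result with a reduced copy of the script's argument parser. The helper `flatten_script_params` is hypothetical; the SDK's own expansion logic may differ.

```python
import argparse


def flatten_script_params(params):
    """Turn {flag: value} into an argv-style list; empty values become bare flags."""
    argv = []
    for flag, value in params.items():
        argv.append(flag)
        if value != '':
            argv.append(str(value))
    return argv


script_params = {
    '-g': '',
    '--communicator': 'non_cuda_aware',
    '-o': './outputs'
}
argv = flatten_script_params(script_params)

# Reduced version of the parser defined in chainer_mnist.py.
parser = argparse.ArgumentParser()
parser.add_argument('--communicator', type=str, default='hierarchical')
parser.add_argument('--gpu', '-g', action='store_true')
parser.add_argument('--out', '-o', default='result')
args = parser.parse_args(argv)

print(argv)
print(args.gpu, args.communicator, args.out)
```

The flattened list matches the `Arguments` entry reported in the run summary at the end of this notebook.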

In [16]:
from azureml.train.dnn import Chainer

estimator = Chainer(source_directory=project_folder,
                    compute_target=compute_target,
                    entry_script='chainer_mnist.py',
                    script_params=script_params,
                    node_count=2,
                    process_count_per_node=2,
                    distributed_backend='mpi',
                    pip_packages=['mpi4py'],
                    use_gpu=True)

estimator.run_config.environment.environment_variables['NCCL_SOCKET_IFNAME'] = 'eth0'
estimator.run_config.environment.environment_variables['NCCL_IB_DISABLE'] = '1'
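
The estimator settings determine how many MPI processes take part in training and, together with the script defaults, how many iterations the job runs. A short worked calculation, assuming the standard MNIST training set of 60,000 images and the script's default batch size of 100 and 20 epochs:

```python
node_count = 2               # from the estimator
process_count_per_node = 2   # from the estimator
n_train = 60000              # MNIST training set size
batchsize = 100              # default of --batchsize in chainer_mnist.py
epochs = 20                  # default of --epoch in chainer_mnist.py

# Total number of MPI processes (the size of COMM_WORLD).
world_size = node_count * process_count_per_node

# scatter_dataset gives every worker an equal share of the training data.
samples_per_worker = n_train // world_size

# Each worker draws `batchsize` samples per iteration from its own share.
iters_per_epoch = samples_per_worker // batchsize
total_iters = iters_per_epoch * epochs

# One optimizer step combines the gradients of all workers.
global_batch = batchsize * world_size

print(world_size, samples_per_worker, iters_per_epoch, total_iters, global_batch)
# 4 15000 150 3000 400
```

These figures agree with the run log further down, which reports 4 processes and 3000 iterations for 20 epochs.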

### Submit the job
Run your experiment by submitting your estimator object. Note that this call is asynchronous.

In [17]:
run = experiment.submit(estimator)
print(run)

Run(Experiment: chainermn-remote-IB,
Id: chainermn-remote-IB_1550723898_68697f3d,
Type: azureml.scriptrun,
Status: Queued)
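
Because submission is asynchronous, the returned run object is still `Queued` at this point. The sketch below shows the general shape of a polling loop that waits for a terminal state. It takes any zero-argument callable that returns a status string, so it runs here against a fake status source; with a real run, `run.get_status` could be passed instead. The function name `wait_until_done` and the set of terminal state names are assumptions made for illustration.

```python
import time

# Assumed terminal states; adjust if the service reports others.
TERMINAL_STATES = {"Completed", "Failed", "Canceled"}


def wait_until_done(get_status, poll_seconds=10.0, timeout_seconds=3600.0,
                    sleep=time.sleep):
    """Poll get_status() until it returns a terminal state or the timeout expires."""
    waited = 0.0
    status = get_status()
    while status not in TERMINAL_STATES:
        if waited >= timeout_seconds:
            raise TimeoutError("run still in state {!r}".format(status))
        sleep(poll_seconds)
        waited += poll_seconds
        status = get_status()
    return status


# Demo with a fake status sequence instead of a real run.
fake_statuses = iter(["Queued", "Running", "Running", "Completed"])
final = wait_until_done(lambda: next(fake_statuses), poll_seconds=0.0)
print(final)
```

The SDK's own `run.wait_for_completion(...)`, used later in this notebook, covers the same need and additionally streams the logs.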


### Monitor the run
You can monitor the progress of the run with a Jupyter widget. Like the run submission, the widget is asynchronous and provides live updates every 10-15 seconds until the job completes.

In [21]:
from azureml.widgets import RunDetails
RunDetails(run).show() 


Alternatively, you can block until the script has completed training before running more code.

In [22]:
run.wait_for_completion(show_output=True) # this provides a verbose log

RunId: chainermn-remote-IB_1550723898_68697f3d

Streaming azureml-logs/60_control_log_rank_0.txt

This is an MPI job. Rank:0
Streaming log file azureml-logs/60_control_log_rank_0.txt
Streaming log file azureml-logs/80_driver_log_rank_0.txt

Streaming azureml-logs/80_driver_log_rank_0.txt

Num process (COMM_WORLD): 4
Using GPUs
Using non_cuda_aware communicator
Num unit: 1000
Num Minibatch-size: 100
Num epoch: 20
Downloading from http://yann.lecun.com/exdb/mnist/train-images-idx3-ubyte.gz...
Downloading from http://yann.lecun.com/exdb/mnist/train-labels-idx1-ubyte.gz...
Downloading from http://yann.lecun.com/exdb/mnist/t10k-images-idx3-ubyte.gz...
Downloading from http://yann.lecun.com/exdb/mnist/t10k-labels-idx1-ubyte.gz...
epoch       main/loss   validation/main/loss  main/accuracy  validation/main/accuracy  elapsed_time
     total [#.................................................]  3.33%
this epoch [#################################.................] 66.67%
       100 iter, 0 ep

     total [################################################..] 96.67%
this epoch [################..................................] 33.33%
      2900 iter, 19 epoch / 20 epochs
    70.581 iters/sec. Estimated time to finish: 0:00:01.416817.

     total [##################################################] 100.00%
this epoch [..................................................]  0.00%
      3000 iter, 20 epoch / 20 epochs
    70.611 iters/sec. Estimated time to finish: 0:00:00.

The experiment completed successfully. Finalizing run...
Cleaning up all outstanding Run operations, waiting 300.0 seconds
1 items cleaning up...
Cleanup took 0.10081291198730469 seconds

Execution Summary
RunId: chainermn-remote-IB_1550723898_68697f3d



{'runId': 'chainermn-remote-IB_1550723898_68697f3d',
 'target': 'gpucluster-ib',
 'status': 'Completed',
 'startTimeUtc': '2019-02-21T04:45:38.442617Z',
 'endTimeUtc': '2019-02-21T04:49:39.45075Z',
 'properties': {'azureml.runsource': 'experiment',
  'ContentSnapshotId': '25cb9c56-a461-4274-85ca-1b514a6a1ef1'},
 'runDefinition': {'Script': 'chainer_mnist.py',
  'Arguments': ['-g', '--communicator', 'non_cuda_aware', '-o', './outputs'],
  'SourceDirectoryDataStore': 'workspaceblobstore',
  'Framework': 0,
  'Communicator': 5,
  'Target': 'gpucluster-ib',
  'DataReferences': {'workspaceblobstore': {'DataStoreName': 'workspaceblobstore',
    'Mode': 'Mount',
    'PathOnDataStore': None,
    'PathOnCompute': None,
    'Overwrite': False}},
  'JobName': None,
  'AutoPrepareEnvironment': True,
  'MaxRunDurationSeconds': None,
  'NodeCount': 2,
  'Environment': {'Python': {'InterpreterPath': 'python',
    'UserManagedDependencies': False,
    'CondaDependencies': {'name': 'project_environment