# Caffe2 GPU Distributed


## Introduction

This example demonstrates how to run the standard Caffe2 [resnet50_trainer.py](https://github.com/caffe2/caffe2/blob/master/caffe2/python/examples/resnet50_trainer.py) example using Batch AI. You can run it on a single compute node or on multiple nodes.

## Details

- The standard Caffe2 sample script [resnet50_trainer.py](https://github.com/caffe2/caffe2/blob/master/caffe2/python/examples/resnet50_trainer.py) is used;
- The MNIST dataset has been converted into an LMDB database and can be downloaded from http://download.caffe2.ai/databases/mnist-lmdb.zip;
- NFS is used for the rendezvous temp files that coordinate the shards/nodes;
- Standard output of the job is stored on an Azure File Share.

## Instructions

### Install Dependencies and Create Configuration File
Follow the [instructions](/recipes) to install all dependencies and create the configuration file.

### Read Configuration and Create Batch AI client

In [1]:
from __future__ import print_function

import time
from datetime import datetime
import os
import sys
import zipfile

from azure.storage.file import FileService, FilePermissions
import azure.mgmt.batchai.models as models

# utilities.py contains helper functions used by different notebooks
sys.path.append('../../')
import utilities

cfg = utilities.Configuration('../../configuration.json')
client = utilities.create_batchai_client(cfg)

### Create File Share

For this example we will create a new File Share named `batchaichaisample` under your storage account.

**Note** You don't need to create a new file share for every cluster. We do it in this sample to simplify resource management for you.

In [2]:
azure_file_share_name = 'batchaichaisample'
service = FileService(cfg.storage_account_name, cfg.storage_account_key)
service.create_share(azure_file_share_name, fail_on_exist=False)

False

### Create Single Node NFS

- To run distributed Caffe2 training, we need to mount a single-node NFS file server on all the GPU nodes, so that they can use a shared directory for the rendezvous temp files that coordinate the shards/nodes.

The file server will have the following parameters:

In [3]:
fs_Name = 'caffe2nfs'
parameters = models.FileServerCreateParameters(
    location = cfg.location,
    vm_size='Standard_D14_V2',
    ssh_configuration=models.SshConfiguration(
        user_account_settings=models.UserAccountSettings(
            admin_user_name=cfg.admin,
            admin_user_password=cfg.admin_password)),
    data_disks=models.DataDisks(
        disk_size_in_gb=10,
        disk_count=2,
        storage_account_type='Standard_LRS')
)

Create the single-node NFS file server:

In [4]:
utilities.create_resource_group(cfg)
_ = client.file_servers.create(cfg.resource_group, fs_Name, parameters)

Check the file server's provisioning state and wait until it reaches `succeeded`:

In [5]:
fs = client.file_servers.get(cfg.resource_group, fs_Name)
nfs_id = fs.id
print ("FileServer {0} provisioning state = {1}".format(nfs_id, fs.provisioning_state.name))

FileServer /subscriptions/8001b49e-e169-4ee6-a5ff-9c3ea5385053/resourceGroups/batchaitests/providers/Microsoft.BatchAI/fileservers/caffe2nfs provisioning state = succeeded
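The cell above reads the provisioning state only once, so it has to be re-run by hand until the state changes. A minimal sketch of a polling loop follows. `wait_for_state` is a hypothetical helper (it is not part of `utilities.py` or the Batch AI SDK), and the `'failed'` state name, timeout and interval are assumptions chosen for illustration.

```python
import time


def wait_for_state(get_state, target='succeeded', failed=('failed',),
                   timeout_sec=1800, interval_sec=15, sleep=time.sleep):
    """Poll get_state() until it returns target (hypothetical helper).

    get_state is any zero-argument callable returning the state as a string.
    Raises RuntimeError if a failed state is seen or the timeout expires.
    """
    waited = 0
    while True:
        state = get_state()
        if state == target:
            return state
        if state in failed:
            raise RuntimeError('Provisioning ended in state: {0}'.format(state))
        if waited >= timeout_sec:
            raise RuntimeError('Timed out in state: {0}'.format(state))
        sleep(interval_sec)
        waited += interval_sec
```

In this notebook it could be called as `wait_for_state(lambda: client.file_servers.get(cfg.resource_group, fs_Name).provisioning_state.name)`, which reuses the same lookup as the cell above.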


### Configure Compute Cluster

- For this example we will use a GPU cluster of `STANDARD_NC6` nodes. The number of nodes in the cluster is configured with the `nodes_count` variable;
- We will mount the file share at a folder named `external`. The full path of this folder on a compute node will be `$AZ_BATCHAI_MOUNT_ROOT/external`;
- We will mount NFS at a folder named `fileserver`. The full path of this folder on a compute node will be `$AZ_BATCHAI_MOUNT_ROOT/fileserver`;
- We will call the cluster `dsvm`.

So, the cluster will have the following parameters:

In [6]:
azure_file_share = 'external'
nfs_file_share = 'fileserver'
nodes_count = 2
cluster_name = 'dsvm'

volumes = models.MountVolumes(
    azure_file_shares=[
        models.AzureFileShareReference(
            account_name=cfg.storage_account_name,
            credentials=models.AzureStorageCredentialsInfo(
                account_key=cfg.storage_account_key),
            azure_file_url = 'https://{0}.file.core.windows.net/{1}'.format(
                cfg.storage_account_name, azure_file_share_name),
            relative_mount_path=azure_file_share)],
    file_servers = [
         models.FileServerReference(
             file_server = models.ResourceId(nfs_id),
             relative_mount_path = nfs_file_share,
             mount_options = "rw")
    ]
)

parameters = models.ClusterCreateParameters(
    location=cfg.location,
    vm_size="STANDARD_NC6",
    virtual_machine_configuration=models.VirtualMachineConfiguration(
        image_reference=models.ImageReference(
            publisher="microsoft-ads",
            offer="linux-data-science-vm-ubuntu",
            sku="linuxdsvmubuntu",
            version="latest")),
    scale_settings=models.ScaleSettings(
        manual=models.ManualScaleSettings(target_node_count=nodes_count)
    ),
    node_setup=models.NodeSetup(
        mount_volumes=volumes
    ),
    user_account_settings=models.UserAccountSettings(
        admin_user_name=cfg.admin,
        admin_user_password=cfg.admin_password,
        admin_user_ssh_public_key=cfg.admin_ssh_key
    )
)

### Create Compute Cluster

In [7]:
_ = client.clusters.create(cfg.resource_group, cluster_name, parameters)

### Monitor Cluster Creation

Get the newly created cluster. utilities.py contains a helper function that prints the cluster's node counts by state.

In [8]:
cluster = client.clusters.get(cfg.resource_group, cluster_name)
utilities.print_cluster_status(cluster)

Cluster state: AllocationState.steady Target: 2; Allocated: 2; Idle: 0; Unusable: 0; Running: 2; Preparing: 0


### Deploy MNIST Dataset

For demonstration purposes, we will download the preprocessed MNIST dataset to the current directory and upload it to a file share directory named `mnist`.

In [9]:
mnist_dataset_url = 'http://download.caffe2.ai/databases/mnist-lmdb.zip'
if not os.path.exists('mnist-train-nchw-lmdb') or not os.path.exists('mnist-test-nchw-lmdb'):
    utilities.download_file(mnist_dataset_url, 'caffe2_mnist_dataset.zip')
    print('Extracting Caffe2 MNIST dataset...', end='')
    with zipfile.ZipFile('caffe2_mnist_dataset.zip', 'r') as z:
        z.extractall('.')
    print('Done')

Downloading http://download.caffe2.ai/databases/mnist-lmdb.zip ...Done
Extracting Caffe2 MNIST dataset...Done


#### Create Directory and Upload MNIST Dataset

In [10]:
mnist_dataset_directory = 'mnist'

There are multiple ways to create folders and upload files into an Azure File Share: you can use the [Azure Portal](https://ms.portal.azure.com), [Storage Explorer](http://storageexplorer.com/), [Azure CLI 2.0](/azure-cli-extension) or the Azure SDK for your preferred programming language.
In this example we will use the Azure SDK for Python to copy files into the file share.

In [11]:
service = FileService(cfg.storage_account_name, cfg.storage_account_key)
service.create_directory(
    azure_file_share_name, mnist_dataset_directory,
    fail_on_exist=False)
# Since uploading can take significant time, let's check first if the
# file has been uploaded already.
for d in ['mnist-train-nchw-lmdb', 'mnist-test-nchw-lmdb']:
    service.create_directory(
        azure_file_share_name, os.path.join(mnist_dataset_directory, d),
        fail_on_exist=False)
    for f in ['data.mdb', 'lock.mdb']:
        # exists() takes the directory and the file name as separate arguments
        if service.exists(azure_file_share_name, os.path.join(mnist_dataset_directory, d), f):
            continue
        service.create_file_from_path(
            azure_file_share_name, os.path.join(mnist_dataset_directory, d), f, os.path.join(d,f))
print('Done')

Done


### Deploy Sample Script and Configure the Input Directories

- For each job we will create a folder containing a copy of [resnet50_trainer.py](https://github.com/caffe2/caffe2/blob/master/caffe2/python/examples/resnet50_trainer.py). This allows each job to have its own copy of the sample script (in case you would like to change it).

In [12]:
caffe2SampleDir = "Caffe2Samples"
service = FileService(cfg.storage_account_name, cfg.storage_account_key)
service.create_directory(
    azure_file_share_name, caffe2SampleDir, fail_on_exist=False)
service.create_file_from_path(
    azure_file_share_name, caffe2SampleDir, 'resnet50_trainer.py', 'resnet50_trainer.py')

- The job needs to know where to find the resnet50_trainer.py script and the dataset, so we will configure an input directory for each:

In [13]:
input_directories = [
    models.InputDirectory(
        id='SCRIPT',
        path='$AZ_BATCHAI_MOUNT_ROOT/{0}/{1}'.format(azure_file_share, caffe2SampleDir)),
    models.InputDirectory(
        id='DATASET',
        path='$AZ_BATCHAI_MOUNT_ROOT/{0}/{1}'.format(azure_file_share, mnist_dataset_directory))
]

The job will be able to reference those directories using ```$AZ_BATCHAI_INPUT_SCRIPT``` and ```$AZ_BATCHAI_INPUT_DATASET``` environment variables.
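A minimal sketch of how a script running inside the job could resolve those directories. `input_dir` is a hypothetical helper; the only thing it takes from the text above is the `AZ_BATCHAI_INPUT_<ID>` naming pattern.

```python
import os


def input_dir(directory_id, environ=os.environ):
    """Return the path exposed for an input directory id (hypothetical helper).

    Follows the AZ_BATCHAI_INPUT_<ID> naming pattern and fails loudly when
    the variable is missing, for example when run outside a Batch AI job.
    """
    name = 'AZ_BATCHAI_INPUT_{0}'.format(directory_id.upper())
    try:
        return environ[name]
    except KeyError:
        raise RuntimeError('{0} is not set'.format(name))
```

For example, `input_dir('dataset')` would return the value of `$AZ_BATCHAI_INPUT_DATASET` on a compute node. Passing a plain dictionary as `environ` makes the helper easy to try locally.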

### Configure Output Directories
We will store the standard output and error streams of the job on the Azure File Share:

In [14]:
std_output_path_prefix = "$AZ_BATCHAI_MOUNT_ROOT/{0}".format(azure_file_share)

The temp output will be stored on the NFS file server:

In [15]:
output_directories = [
    models.OutputDirectory(
        id='TEMP',
        path_prefix='$AZ_BATCHAI_MOUNT_ROOT/{0}'.format(nfs_file_share),
        path_suffix="temp"),
]

### Configure Job
- The job will use the `caffe2ai/caffe2` container;
- It will use the previously configured input and output directories;
- It will run the modified `resnet50_trainer.py` from the SCRIPT input directory;
- It will write the standard output and error streams to the file share;
- It will use the TEMP output directory as the shared directory for the rendezvous temp files that coordinate the nodes;
- For demonstration purposes, we will run only 5 epochs with an epoch size of 2000.


In [16]:
job_name = datetime.utcnow().strftime("caffe2_%m_%d_%Y_%H%M%S")
parameters = models.job_create_parameters.JobCreateParameters(
     location=cfg.location,
     cluster=models.ResourceId(cluster.id),
     node_count=2,
     input_directories=input_directories,
     std_out_err_path_prefix=std_output_path_prefix,
     output_directories=output_directories,
     container_settings=models.ContainerSettings(
         models.ImageSourceRegistry(image='caffe2ai/caffe2')),
     caffe2_settings = models.Caffe2Settings(
         python_script_file_path='$AZ_BATCHAI_INPUT_SCRIPT/resnet50_trainer.py',
         command_line_args='--num_shards 2 --shard_id $AZ_BATCHAI_TASK_INDEX --run_id 0 --epoch_size 2000 --num_epochs 5 --train_data $AZ_BATCHAI_INPUT_DATASET/mnist-train-nchw-lmdb --file_store_path $AZ_BATCHAI_OUTPUT_TEMP'))
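The cell above hard-codes `2` both in `node_count` and in `--num_shards`, so the two can drift apart if the cluster size changes. A minimal sketch of building the argument string from a single value follows. `build_trainer_args` is a hypothetical helper; the flags are exactly the ones used in this notebook.

```python
def build_trainer_args(num_shards, epoch_size=2000, num_epochs=5, run_id=0):
    """Build the resnet50_trainer.py argument string used in this notebook.

    The $AZ_BATCHAI_* variables are left unexpanded on purpose: they are
    resolved on the compute node, not in the notebook.
    """
    parts = [
        '--num_shards {0}'.format(num_shards),
        '--shard_id $AZ_BATCHAI_TASK_INDEX',
        '--run_id {0}'.format(run_id),
        '--epoch_size {0}'.format(epoch_size),
        '--num_epochs {0}'.format(num_epochs),
        '--train_data $AZ_BATCHAI_INPUT_DATASET/mnist-train-nchw-lmdb',
        '--file_store_path $AZ_BATCHAI_OUTPUT_TEMP',
    ]
    return ' '.join(parts)
```

With this helper, `node_count=nodes_count` and `command_line_args=build_trainer_args(nodes_count)` would keep the number of shards equal to the number of nodes.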

### Create a Training Job


In [17]:
_ = client.jobs.create(cfg.resource_group, job_name, parameters)
print('Created Job: {}'.format(job_name))

Created Job: caffe2_09_29_2017_011958


### Wait for Job to Finish
The job will start running when the cluster has enough idle nodes. The following code waits for the job to start running, printing the cluster state while it waits. While the job runs, the code prints the current content of stderr-0.txt.

**Note** Execution may take several minutes to complete.

In [18]:
utilities.wait_for_job_completion(client, cfg.resource_group, job_name, cluster_name, 'stdouterr', 'stderr-0.txt')

Cluster state: AllocationState.steady Target: 2; Allocated: 2; Idle: 0; Unusable: 0; Running: 2; Preparing: 0
Job state: running ExitCode: None
Waiting for job output to become available...
2017/09/29 01:20:16 cmd is  export LD_LIBRARY_PATH=/usr/local/nvidia/lib:/usr/local/nvidia/lib64;export PYTHONPATH=/usr/local; python /mnt/batch/tasks/shared/LS_root/mounts/external/Caffe2Samples/resnet50_trainer.py --num_shards 2 --shard_id 0 --run_id 0 --epoch_size 2000 --num_epochs 5 --train_data /mnt/batch/tasks/shared/LS_root/mounts/external/mnist/mnist-train-nchw-lmdb --file_store_path /mnt/batch/tasks/shared/LS_root/mounts/fileserver/8001b49e-e169-4ee6-a5ff-9c3ea5385053/batchaitests/jobs/caffe2_09_29_2017_011958/outputs/temp 
INFO:resnet50_trainer:Running on GPUs: [0]
INFO:resnet50_trainer:Using epoch size: 1984
INFO:data_parallel_model:Parallelizing model for devices: [0]
INFO:data_parallel_model:Create input and model training operators
INFO:data_parallel_model:Model for GPU : 0
INFO:data_p
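The log reports an epoch size of 1984 although `--epoch_size 2000` was passed. This is consistent with the trainer rounding the epoch size down to a whole number of global batches. The sketch below reproduces the arithmetic under the assumption of a batch size of 32 per shard, which is not set anywhere in this notebook; `effective_epoch_size` is a hypothetical name, not a function of the trainer.

```python
def effective_epoch_size(epoch_size, batch_size, num_shards):
    """Round epoch_size down to a whole number of global batches."""
    global_batch_size = batch_size * num_shards
    return (epoch_size // global_batch_size) * global_batch_size


# 2000 requested, assumed batch size 32, 2 shards: 31 global batches of 64
print(effective_epoch_size(2000, 32, 2))  # 1984
```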

### Download the stdout and stderr Files for the Job

In [None]:
files = client.jobs.list_output_files(cfg.resource_group, job_name, models.JobsListOutputFilesOptions("stdOuterr")) 
for file in list(files):
    utilities.download_file(file.download_url, file.name)
print("All files Downloaded")

In [None]:
for n in range(nodes_count):
    print('stderr-{0}.txt content:'.format(n))
    with open('stderr-{0}.txt'.format(n)) as f:
        print(f.read())

### Delete the Job

In [None]:
client.jobs.delete(cfg.resource_group, job_name)

### Delete the Cluster
When you are finished with the sample and don't want to submit any more jobs you can delete the cluster using the following code.

In [None]:
client.clusters.delete(cfg.resource_group, cluster_name)

### Delete File Share
When you are finished with the sample and don't want to submit any more jobs you can delete the file share completely with all files using the following code.

In [None]:
service = FileService(cfg.storage_account_name, cfg.storage_account_key)
service.delete_share(azure_file_share_name)