# VIII - Parallel and Distributed Execution
In this notebook, we will execute training across multiple nodes (the same approach also applies to running in parallel across multiple GPUs within a single node). Specifically, we will train a ResNet-20 image classification model on the CIFAR-10 data set across multiple nodes.

Azure Batch and Batch Shipyard can perform "gang scheduling", that is, scheduling multiple nodes for a single task. This is most commonly used for Message Passing Interface (MPI) jobs.

* [Setup](#section1)
* [Configure MPI Job and Submit](#section2)
* [Delete Multi-Instance Job](#section3)

<a id='section1'></a>

## Setup

Create a simple alias for Batch Shipyard

In [None]:
%alias shipyard SHIPYARD_CONFIGDIR=config python $HOME/batch-shipyard/shipyard.py %l

Check that everything is working

In [None]:
shipyard

Let's first delete the pool used in the non-advanced notebooks and wait for it to be removed, which frees up our core quota. We need to create a new pool with different settings and a different Docker image.

In [None]:
shipyard pool del -y --wait

Read in the account information we saved earlier

In [None]:
import json
import os

def read_json(filename):
    with open(filename, 'r') as infile:
        return json.load(infile)
    
def write_json_to_file(json_dict, filename):
    """ Simple function to write JSON dictionaries to files
    """
    with open(filename, 'w') as outfile:
        json.dump(json_dict, outfile)

account_info = read_json('account_information.json')

storage_account_key = account_info['storage_account_key']
storage_account_name = account_info['storage_account_name']
STORAGE_ALIAS = account_info['STORAGE_ALIAS']

Create the `resource_files` list, which downloads the CNTK train and test data (and the raw CIFAR-10 batches) from randomly selected storage accounts and containers:

In [None]:
import random

IMAGE_NAME = 'alfpark/cntk:2.0.rc2-gpu-1bit-sgd-python3.5-cuda8.0-cudnn5.1'

CNTK_TRAIN_DATA_FILE = 'Train_cntk_text.txt'
CNTK_TEST_DATA_FILE = 'Test_cntk_text.txt'
CNTK_DATA_BATCHES_FILE = 'cifar-10-batches-py.tar.gz'
URL_FMT = 'https://{}.blob.core.windows.net/{}/{}'

def select_random_data_storage_container():
    """Randomly select a storage account and container for CNTK train/test data.
    This is specific to the workshop and helps distribute attendee load across
    several storage accounts and containers."""
    ss = random.randint(0, 4)
    cs = random.randint(0, 4)
    sa = '{}{}bigai'.format(ss, chr(ord('z') - ss))
    cont = '{}{}{}'.format(cs, chr(ord('i') - cs * 2), chr(ord('j') - cs * 2))
    return sa, cont

def create_resource_file_list():
    sa, cont = select_random_data_storage_container()
    ret = [{
        'file_path': CNTK_TRAIN_DATA_FILE,
        'blob_source': URL_FMT.format(sa, cont, CNTK_TRAIN_DATA_FILE)
    }]
    sa, cont = select_random_data_storage_container()
    ret.append({
        'file_path': CNTK_TEST_DATA_FILE,
        'blob_source': URL_FMT.format(sa, cont, CNTK_TEST_DATA_FILE)
    })
    sa, cont = select_random_data_storage_container()
    ret.append({
        'file_path': CNTK_DATA_BATCHES_FILE,
        'blob_source': URL_FMT.format(sa, cont, CNTK_DATA_BATCHES_FILE)
    })
    return ret
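To see which storage accounts and containers the randomized selection above can resolve to, the sketch below re-implements the same naming scheme deterministically and enumerates every possibility. The `storage_names` helper and the seeded `random.Random` instance are illustrative additions, not part of the workshop code.

```python
import random

URL_FMT = 'https://{}.blob.core.windows.net/{}/{}'

def storage_names(ss, cs):
    """Return the (storage account, container) pair for indices ss and cs,
    using the same naming scheme as select_random_data_storage_container()."""
    sa = '{}{}bigai'.format(ss, chr(ord('z') - ss))
    cont = '{}{}{}'.format(cs, chr(ord('i') - cs * 2), chr(ord('j') - cs * 2))
    return sa, cont

# Enumerate every storage account (ss = 0..4) and container (cs = 0..4)
accounts = [storage_names(i, 0)[0] for i in range(5)]
containers = [storage_names(0, i)[1] for i in range(5)]
print(accounts)    # ['0zbigai', '1ybigai', '2xbigai', '3wbigai', '4vbigai']
print(containers)  # ['0ij', '1gh', '2ef', '3cd', '4ab']

# A seeded generator makes the "random" choice reproducible when debugging
rng = random.Random(42)
sa, cont = storage_names(rng.randint(0, 4), rng.randint(0, 4))
print(URL_FMT.format(sa, cont, 'Train_cntk_text.txt'))
```

Each of the three data files picks its account and container independently, so a single node may pull from up to three different storage accounts.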

Create the data set conversion scripts to be uploaded. In real production runs, the data would already be pre-converted rather than converted at node startup.

In [None]:
%%writefile convert_cifar10.py
from __future__ import print_function
import cifar_utils as ut

print('Converting train data to png images...')
ut.saveTrainImages(r'./Train_cntk_text.txt', 'train')
print('Done.')
print('Converting test data to png images...')
ut.saveTestImages(r'./Test_cntk_text.txt', 'test')
print('Done.')

In [None]:
%%writefile convert_cifar10.sh
#!/usr/bin/env bash

set -e
set -o pipefail

In [None]:
with open('convert_cifar10.sh', 'a') as fd:
    fd.write('\n\nIMAGE_NAME="{}"\n'.format(IMAGE_NAME))

In [None]:
%%writefile -a convert_cifar10.sh
CIFAR_DATA=$AZ_BATCH_NODE_SHARED_DIR/cifar10_data
CIFAR_BATCHES=cifar-10-batches-py.tar.gz

mv $AZ_BATCH_TASK_WORKING_DIR/*_cntk_text.txt $AZ_BATCH_TASK_WORKING_DIR/$CIFAR_BATCHES $CIFAR_DATA
echo "Converting CNTK train/test data, this will take some time..."
pushd $CIFAR_DATA
tar zxvpf $CIFAR_BATCHES
rm $CIFAR_BATCHES
chmod 755 run_cifar10.sh
mv run_cifar10.sh $AZ_BATCH_NODE_SHARED_DIR
popd
docker run --rm -v $CIFAR_DATA:$CIFAR_DATA -w $CIFAR_DATA $IMAGE_NAME /bin/bash -c "source /cntk/activate-cntk; cp /cntk/Examples/Image/DataSets/CIFAR-10/* .; python convert_cifar10.py"

Additionally, we'll create an MPI helper script for executing the MPI job. This helper script does the following:
1. Ensures that there are GPUs available to execute the task
2. Parses `$AZ_BATCH_HOST_LIST` for all of the hosts participating in the MPI job and creates a `hostfile` from it
3. Computes the total number of MPI processes (slots): the number of nodes times the number of GPUs per node
4. Sets the CNTK training directory, script, and options
5. Executes the MPI job via `mpirun`
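Steps 2 and 3 can be sketched in Python to make the hostfile format and the process count explicit. This mirrors the bash logic in `run_cifar10.sh`; the `build_hostfile` name and the IP addresses in the example are hypothetical.

```python
def build_hostfile(host_list, ngpus):
    """Sketch of the hostfile logic in run_cifar10.sh.

    host_list: comma-separated hosts, as in $AZ_BATCH_HOST_LIST
    ngpus: GPUs per node, as counted by `nvidia-smi -L | wc -l`
    Returns the hostfile lines and the total number of MPI processes.
    """
    if ngpus == 0:
        raise RuntimeError('No GPUs detected.')
    hosts = [h for h in host_list.split(',') if h]
    # one line per node, with as many slots as the node has GPUs
    lines = ['{} slots={} max-slots={}'.format(h, ngpus, ngpus) for h in hosts]
    # one MPI process per GPU across all nodes
    np_total = len(hosts) * ngpus
    return lines, np_total

# Hypothetical host list for 3 STANDARD_NC6 nodes (1 GPU each)
lines, np_total = build_hostfile('10.0.0.4,10.0.0.5,10.0.0.6', 1)
print('\n'.join(lines))
print('num mpi processes:', np_total)  # 3
```

Setting `slots` to the GPU count means `mpirun` places one rank per GPU on each node, which is what the `-np $np` argument assumes.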

In [None]:
%%writefile run_cifar10.sh
#!/usr/bin/env bash

set -e
set -o pipefail

# get number of GPUs on machine
ngpus=$(nvidia-smi -L | wc -l)
echo "num gpus: $ngpus"

if [ $ngpus -eq 0 ]; then
    echo "No GPUs detected."
    exit 1
fi

# get number of nodes
IFS=',' read -ra HOSTS <<< "$AZ_BATCH_HOST_LIST"
nodes=${#HOSTS[@]}

# create hostfile
touch hostfile
>| hostfile
for node in "${HOSTS[@]}"
do
    echo $node slots=$ngpus max-slots=$ngpus >> hostfile
done

# compute number of processors
np=$(($nodes * $ngpus))

# print configuration
echo "num nodes: $nodes"
echo "hosts: ${HOSTS[@]}"
echo "num mpi processes: $np"

# set cntk related vars
modeldir=/cntk/Examples/Image/Classification/ResNet/Python
trainscript=TrainResNet_CIFAR10_Distributed.py

# set training options
trainopts="--datadir $AZ_BATCH_NODE_SHARED_DIR/cifar10_data --outputdir $AZ_BATCH_TASK_WORKING_DIR/output --network resnet20 -q 1 -a 0"

# execute mpi job
/root/openmpi/bin/mpirun --allow-run-as-root --mca btl_tcp_if_exclude docker0 \
    -np $np --hostfile hostfile -x LD_LIBRARY_PATH --wdir $modeldir \
    /bin/bash -c "source /cntk/activate-cntk; python -u $trainscript $trainopts $*"

Move the files into a directory to be uploaded.

In [None]:
INPUT_CONTAINER = 'input-dist'
UPLOAD_DIR = 'dist_upload'

!rm -rf $UPLOAD_DIR
!mkdir -p $UPLOAD_DIR
!mv convert_cifar10.* run_cifar10.sh $UPLOAD_DIR
!ls -alF $UPLOAD_DIR

We will create the configuration so that it directly references these files for ingress into Azure Storage. This removes the need to call `blobxfer` ourselves, since the upload is done for us during pool creation.

In [None]:
config = {
    "batch_shipyard": {
        "storage_account_settings": STORAGE_ALIAS
    },
    "global_resources": {
        "docker_images": [
            IMAGE_NAME
        ],
        "files": [
            {
                "source": {
                    "path": UPLOAD_DIR
                },
                "destination": {
                    "storage_account_settings": STORAGE_ALIAS,
                    "data_transfer": {
                        "container": INPUT_CONTAINER
                    }
                }
            }
        ]
    }
}
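Before creating the pool, it can help to check locally which files the `files` entry will ingress. The sketch below is a hypothetical helper (`list_ingress_files`) that walks each `source.path` in a config dict shaped like the one above; it only inspects the local directory and does not contact Azure Storage. The demo uses a temporary directory in place of `dist_upload`.

```python
import os
import tempfile

def list_ingress_files(config):
    """Return (container, relative file path) pairs for every local file
    referenced by global_resources.files in a Batch Shipyard-style config."""
    pairs = []
    for entry in config['global_resources'].get('files', []):
        src = entry['source']['path']
        container = entry['destination']['data_transfer']['container']
        for root, _, names in os.walk(src):
            for name in sorted(names):
                rel = os.path.relpath(os.path.join(root, name), src)
                pairs.append((container, rel))
    return pairs

# Demo with a temporary stand-in for the upload directory
with tempfile.TemporaryDirectory() as upload_dir:
    for fname in ('convert_cifar10.py', 'convert_cifar10.sh', 'run_cifar10.sh'):
        open(os.path.join(upload_dir, fname), 'w').close()
    demo_config = {
        'global_resources': {
            'files': [{
                'source': {'path': upload_dir},
                'destination': {'data_transfer': {'container': 'input-dist'}},
            }]
        }
    }
    pairs = list_ingress_files(demo_config)
    print(pairs)
```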

Now we'll create the pool specification with a few modifications for this particular execution:
- `inter_node_communication_enabled` ensures that nodes are allocated such that they can communicate with each other (e.g., send and receive network packets)
- `input_data` downloads the scripts we created above from `INPUT_CONTAINER` into `$AZ_BATCH_NODE_SHARED_DIR/cifar10_data` on each node
- `transfer_files_on_pool_creation` uploads the `files` specified in `global_resources` to Azure Storage during pool creation (i.e., `pool add`)
- `resource_files` are the CNTK train and test data files (plus the raw CIFAR-10 batches) to download to each node
- `additional_node_prep_commands` are commands to execute during node preparation on every compute node. Here, the command runs the conversion script we created above

**Note:** It is usually better to scale up before scaling out. Because the default core quota is just 20 cores, we are using 3 `STANDARD_NC6` nodes (18 cores). In real production runs, we would most likely first scale up to multiple GPUs within a single node (parallel execution), such as `STANDARD_NC12` or `STANDARD_NC24`, before scaling out to multiple NC nodes (parallel and distributed execution).
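The quota arithmetic behind this choice can be sketched as follows, assuming the original (Tesla K80) NC-series sizes: 6, 12, and 24 cores with 1, 2, and 4 GPUs respectively. The `fit_to_quota` helper is illustrative only.

```python
# Cores and GPUs per VM for the original (Tesla K80) NC-series sizes
NC_SIZES = {
    'STANDARD_NC6': {'cores': 6, 'gpus': 1},
    'STANDARD_NC12': {'cores': 12, 'gpus': 2},
    'STANDARD_NC24': {'cores': 24, 'gpus': 4},
}

def fit_to_quota(vm_size, core_quota):
    """Return (max nodes, total GPUs) that fit within a core quota."""
    spec = NC_SIZES[vm_size]
    max_nodes = core_quota // spec['cores']
    return max_nodes, max_nodes * spec['gpus']

for size in NC_SIZES:
    nodes, gpus = fit_to_quota(size, 20)
    print('{}: {} node(s), {} GPU(s)'.format(size, nodes, gpus))
```

With a 20-core quota, 3 `STANDARD_NC6` nodes yield the most GPUs (3), while a single `STANDARD_NC24` would require raising the quota.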

In [None]:
POOL_ID = 'gpupool-multi-instance'

pool = {
    "pool_specification": {
        "id": POOL_ID,
        "vm_size": "STANDARD_NC6",
        "vm_count": 3,
        "publisher": "Canonical",
        "offer": "UbuntuServer",
        "sku": "16.04-LTS",
        "ssh": {
            "username": "docker"
        },
        "inter_node_communication_enabled": True,
        "reboot_on_start_task_failed": False,
        "block_until_all_global_resources_loaded": True,
        "input_data": {
            "azure_storage": [
                {
                    "storage_account_settings": STORAGE_ALIAS,
                    "container": INPUT_CONTAINER,
                    "destination": "$AZ_BATCH_NODE_SHARED_DIR/cifar10_data"
                }
            ]
        },
        "transfer_files_on_pool_creation": True,
        "resource_files": create_resource_file_list(),
        "additional_node_prep_commands": [
            "/bin/bash $AZ_BATCH_NODE_SHARED_DIR/cifar10_data/convert_cifar10.sh"
        ]
    }
}

In [None]:
!mkdir -p config
write_json_to_file(config, os.path.join('config', 'config.json'))
write_json_to_file(pool, os.path.join('config', 'pool.json'))
print(json.dumps(config, indent=4, sort_keys=True))
print(json.dumps(pool, indent=4, sort_keys=True))

Create the pool. Please be patient while the compute nodes are allocated.

In [None]:
shipyard pool add -y

Ensure that all compute nodes are `idle` and ready to accept tasks:

In [None]:
shipyard pool listnodes

<a id='section2'></a>

## Configure MPI Job and Submit

MPI jobs in Batch must run as multi-instance tasks, which allow multiple compute nodes to be used for a single task.

A few things to note in this job's configuration:
- The `COMMAND` executes the `run_cifar10.sh` script, which the node preparation (conversion) script moved into `$AZ_BATCH_NODE_SHARED_DIR`.
- `auto_complete` is set to `True`, which moves the job from the `active` to the `completed` state once all of its tasks complete. Note that once a job is in the `completed` state, no new tasks can be added to it.
- The `multi_instance` property allows multiple nodes (`num_instances`) to participate in executing this task. The `coordination_command` runs on all nodes before the `command`. Here, it simply starts an SSH server inside the Docker container so that the MPI launcher can start its daemons (e.g., `orted` or Hydra) on all of the nodes before the application command runs.

In [None]:
JOB_ID = 'cntk-mpi-job'

# reduce the number of epochs to 20 for purposes of this notebook
COMMAND = '$AZ_BATCH_NODE_SHARED_DIR/run_cifar10.sh -e 20'

jobs = {
    "job_specifications": [
        {
            "id": JOB_ID,
            "auto_complete": True,
            "tasks": [
                {
                    "image": IMAGE_NAME,
                    "remove_container_after_exit": True,
                    "command": COMMAND,
                    "gpu": True,
                    "multi_instance": {
                        "num_instances": "pool_current_dedicated",
                        "coordination_command": "/usr/sbin/sshd -D -p 23"
                    },
                }
            ],
        }
    ]
}
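To experiment with different epoch counts or node counts, the job specification above could be wrapped in a small builder. This is a hypothetical sketch: `make_mpi_job` and the `cntk-mpi-job-short` ID are illustrative, and passing an integer for `num_instances` is an assumption to verify against the Batch Shipyard jobs configuration documentation.

```python
import json

def make_mpi_job(job_id, image, command,
                 num_instances='pool_current_dedicated',
                 coordination_command='/usr/sbin/sshd -D -p 23'):
    """Build a jobs config dict with one multi-instance GPU task."""
    return {
        "job_specifications": [
            {
                "id": job_id,
                # complete the job automatically once all tasks finish
                "auto_complete": True,
                "tasks": [
                    {
                        "image": image,
                        "remove_container_after_exit": True,
                        "command": command,
                        "gpu": True,
                        "multi_instance": {
                            "num_instances": num_instances,
                            # runs on every node before `command`
                            "coordination_command": coordination_command,
                        },
                    }
                ],
            }
        ]
    }

# Example: a shorter 10-epoch run on two nodes (assumes integer support)
job = make_mpi_job('cntk-mpi-job-short',
                   'alfpark/cntk:2.0.rc2-gpu-1bit-sgd-python3.5-cuda8.0-cudnn5.1',
                   '$AZ_BATCH_NODE_SHARED_DIR/run_cifar10.sh -e 10',
                   num_instances=2)
print(json.dumps(job, indent=4, sort_keys=True))
```

The result could then be saved with `write_json_to_file(job, os.path.join('config', 'jobs.json'))` in place of the hand-written `jobs` dict.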

In [None]:
write_json_to_file(jobs, os.path.join('config', 'jobs.json'))
print(json.dumps(jobs, indent=4, sort_keys=True))

Submit the job and tail `stdout.txt`:

In [None]:
shipyard jobs add --tail stdout.txt

You can check the status of the job's tasks with the `shipyard jobs listtasks` command in the next section. Once all tasks have an exit code, we can continue. You can also view the **heatmap** of this pool in the [Azure Portal](https://portal.azure.com), under your Batch account, to monitor the progress of this job on the compute nodes.

<a id='section3'></a>

## Delete Multi-instance Job

Deleting multi-instance jobs that run as Docker containers requires a little more care. We first need to ensure that the job has entered the `completed` state. In the `jobs` configuration above, we set `auto_complete` to `True`, which lets the Batch service automatically complete the job when all tasks finish. This also allows automatic cleanup of the Docker containers used to execute the MPI job.

Special logic is required to clean up MPI jobs because the `coordination_command` leaves an SSH server running in a detached container. The job auto-completion logic that Batch Shipyard injects ensures that these containers are killed.
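If you script this step instead of checking manually, the "wait until `completed`, then delete" pattern can be sketched as a generic polling loop. The `get_state` callable is hypothetical: you would supply one that returns the job's current state as a string from whatever source you trust. The demo feeds it a fake state sequence rather than querying Batch.

```python
import time

def wait_for_state(get_state, target='completed', timeout=3600, interval=30):
    """Poll get_state() until it returns `target`.

    get_state is a hypothetical zero-argument callable returning the job's
    current state as a string (e.g. 'active' or 'completed').
    Raises TimeoutError if the target state is not reached in time.
    """
    deadline = time.monotonic() + timeout
    while True:
        state = get_state()
        if state == target:
            return state
        if time.monotonic() >= deadline:
            raise TimeoutError(
                'job still {!r} after {} seconds'.format(state, timeout))
        time.sleep(interval)

# Demo with a fake state sequence instead of a real Batch query
fake_states = iter(['active', 'active', 'completed'])
print(wait_for_state(lambda: next(fake_states), interval=0))  # completed
```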

In [None]:
shipyard jobs listtasks

Once we are sure that the job has completed, we issue the standard delete command:

In [None]:
shipyard jobs del -y --termtasks --wait

## Next Steps
You can proceed to the [Notebook: Clean Up](05_Clean_Up.ipynb) if you are done for now, or proceed to one of the following additional Notebooks:
* [Notebook: Automatic Model Selection](06_Advanced_Auto_Model_Selection.ipynb)
* [Notebook: Tensorboard Visualization](07_Advanced_Tensorboard.ipynb) - note this requires running this notebook on your own machine
* [Notebook: Keras with TensorFlow](09_Keras_Single_GPU_Training_With_Tensorflow.ipynb)