Copyright (c) Microsoft Corporation. All rights reserved.

Licensed under the MIT License.

# Distributed Image Classification with PyTorch

In this tutorial, you will train a PyTorch model on the [CIFAR10](http://www.cs.toronto.edu/~kriz/cifar.html) dataset using distributed training with PyTorch's `DistributedDataParallel` module across an AKS-HCI CPU cluster. The training dataset is stored on an NFS server.

## Prerequisites

*    [A Kubernetes cluster deployed on Azure Stack HCI, connected to Azure through Arc](https://docs.microsoft.com/en-us/azure-stack/aks-hci/connect-to-arc).

*    [Set up an NFS server](https://github.com/Azure/AML-Kubernetes/blob/master/docs/setup-ephemeral-nfs-volume.md).

*    An environment that can run this notebook, with the `azureml-core`, `azureml-opendatasets`, `numpy`, `matplotlib`, and `requests` packages installed.

     If you are using an Azure Machine Learning Notebook VM, you are all set. Otherwise, go through the configuration notebook in the [MachineLearningNotebooks repository](https://github.com/Azure/MachineLearningNotebooks) first. It sets you up with a working config file that has information on your workspace, subscription id, etc.

In [1]:
# Check core SDK version number
import azureml.core

print("SDK version:", azureml.core.VERSION)

SDK version: 1.34.0


## Initialize AzureML workspace

Initialize a [Workspace](https://docs.microsoft.com/azure/machine-learning/service/concept-azure-machine-learning-architecture#workspace) object from the existing workspace you created in the Prerequisites step. `Workspace.from_config()` creates a workspace object from the details stored in `config.json`. 

If you have not done so already, open the `config.json` file and fill in your workspace information.

In [2]:
from azureml.core.workspace import Workspace,  ComputeTarget
from azureml.exceptions import ComputeTargetException

ws = Workspace.from_config()
print('Workspace name: ' + ws.name, 
      'Azure region: ' + ws.location, 
      'Subscription id: ' + ws.subscription_id, 
      'Resource group: ' + ws.resource_group, sep='\n')

If you run your code in unattended mode, i.e., where you can't give a user input, then we recommend to use ServicePrincipalAuthentication or MsiAuthentication.
Please refer to aka.ms/aml-notebook-auth for different authentication mechanisms in azureml-sdk.


Workspace name: demo-ws
Azure region: eastus
Subscription id: 86204643-5a96-427b-b6bb-b35b2bd6e6ce
Resource group: AKS-HCI


## Download cifar10 data

The function below downloads the CIFAR10 archive into a local `cifar10-data` folder. It skips the download if the archive is already there, so you can rerun this notebook without downloading the data again. The download itself may take about 5 minutes.

In [None]:
import os
import requests

def download_cifar10_data():
    url = 'https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz'
    downloaded_folder = os.path.join(os.getcwd(), 'cifar10-data')
    os.makedirs(downloaded_folder, exist_ok=True)  # download data to 'cifar10-data' folder

    archive_path = os.path.join(downloaded_folder, url.split('/')[-1])
    if not os.path.exists(archive_path):  # reuse the archive from a previous run
        response = requests.get(url, allow_redirects=True)
        response.raise_for_status()  # fail early on an HTTP error
        with open(archive_path, 'wb') as f:
            f.write(response.content)

    return downloaded_folder

downloaded_folder = download_cifar10_data()
downloaded_folder

## Copy the cifar10 data to the NFS server

The following cells copy `cifar-10-python.tar.gz` to the NFS server. Set `nfs_mount_path` to the local path where the NFS share is mounted.

In [None]:
nfs_mount_path = "/home/jiadu/data"

import os, shutil
cifar10_dir = os.path.join(nfs_mount_path, 'cifar10')
shutil.rmtree(cifar10_dir, ignore_errors=True)
os.makedirs(cifar10_dir, exist_ok=True)

In [None]:
for filename in os.listdir(downloaded_folder):
    filepath = os.path.join(downloaded_folder, filename)
    destpath = os.path.join(cifar10_dir, filename)
    print(f"Copying files from {filepath} to {destpath}")
    shutil.copyfile(filepath, destpath)
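Copies to a network share can be interrupted, so it may be worth confirming that the archive arrived intact. The sketch below compares SHA-256 checksums of the source and destination files. The helper names `sha256_of` and `copies_match` are hypothetical and not part of this tutorial's scripts.

```python
import hashlib
from pathlib import Path


def sha256_of(path, chunk_size=1 << 20):
    """Return the hex SHA-256 digest of a file, reading it in chunks."""
    digest = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def copies_match(src_dir, dst_dir):
    """Check that every file in src_dir has an identical copy in dst_dir."""
    for src in Path(src_dir).iterdir():
        if not src.is_file():
            continue
        dst = Path(dst_dir) / src.name
        if not dst.is_file() or sha256_of(src) != sha256_of(dst):
            return False
    return True
```

In this notebook, the check would be called as `copies_match(downloaded_folder, cifar10_dir)` after the copy loop.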

## Set up compute target

Find the attach name of your Arc-enabled Kubernetes cluster in your AzureML workspace and use it to create a ComputeTarget.

`attach_name` is the name you gave the cluster when you attached it to the workspace in [this step](https://github.com/Azure/AML-Kubernetes/blob/master/docs/ASH/AML-ARC-Compute.md).

In [None]:
from azureml.core.compute import KubernetesCompute

attach_name = "<NAME_OF_AML_ATTACHED_COMPUTE_OF_YOUR_ASH_CLUSTER>"
arcK_target = KubernetesCompute(ws, attach_name)
print("using compute target: ", arcK_target.name)

## Configure the training job and submit

### Create an experiment

In [None]:
from azureml.core import Experiment

experiment_name = 'pytorch-cifar-distr'
experiment = Experiment(ws, name=experiment_name)

### Create an environment

In [None]:
from azureml.core import Environment

pytorch_env = Environment.from_conda_specification(name = 'pytorch-1.6-cpu', file_path = 'pytorch-script/conda_dependencies.yml')

# Specify a CPU base image
pytorch_env.docker.enabled = True
pytorch_env.docker.base_image = 'mcr.microsoft.com/azureml/openmpi3.1.2-ubuntu18.04'


### Configure the training job: torch.distributed with GLOO backend

Create a ScriptRunConfig object to specify the configuration details of your training job, including your training script, environment to use, and the compute target to run on.

To run a distributed PyTorch job with **torch.distributed** using the GLOO backend, create a `PyTorchConfiguration` and pass it to the `distributed_job_config` parameter of the ScriptRunConfig constructor. Specify `communication_backend='Gloo'` in the PyTorchConfiguration. The code below sets `node_count=2`, which is the number of worker nodes; the number of distributed jobs will be 3 if one master node is used. GLOO is the recommended backend for communication between CPUs.

The script for distributed training of CIFAR10 is already provided for you at `pytorch-script/cifar_dist_main.py`. In practice, you should be able to take any custom PyTorch training script as is and run it with Azure ML without having to modify your code.

With `node_count=2`, training for one epoch may take about 20 minutes with a VM size comparable to Standard_DS3_v2.
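To make the division of work concrete, the sketch below shows one way a dataset can be split across the distributed jobs. It is a plain-Python illustration, not the code in `cifar_dist_main.py`. It assumes unshuffled sharding with wrap-around padding, similar to what `torch.utils.data.DistributedSampler` does when shuffling is off, and the name `shard_indices` is hypothetical. The world size of 3 follows the text above (2 workers plus one master), and 50,000 is the size of the CIFAR10 training set.

```python
import math


def shard_indices(dataset_size, world_size, rank):
    """Return the sample indices one process would see in an epoch.

    The index list is padded by wrapping around so that it divides evenly,
    then each rank takes every world_size-th index starting at its own rank.
    """
    per_rank = math.ceil(dataset_size / world_size)
    total = per_rank * world_size
    padded = [i % dataset_size for i in range(total)]  # wrap-around padding
    return padded[rank:total:world_size]


# Each of the 3 processes receives an equally sized, interleaved slice.
for rank in range(3):
    print(rank, len(shard_indices(50_000, 3, rank)))
```

Under these assumptions each process handles 16,667 of the 50,000 training images per epoch, which is why adding nodes shortens the time per epoch.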

In [None]:
from azureml.core import ScriptRunConfig
from azureml.core.runconfig import PyTorchConfiguration
import os

data_folder = os.path.join("/nfs_share", "cifar10")  # training data are saved to <mountPoint>/cifar10
args = [
    '--data-folder', data_folder,
    '--dist-backend', 'gloo',
    '--epochs', 1,  # 20
]

# AML PyTorch distributed job configuration
distributed_job_config = PyTorchConfiguration(communication_backend='Gloo', node_count=2)

project_folder = "pytorch-script"
run_script = "cifar_dist_main.py"
src = ScriptRunConfig(
    source_directory=project_folder,
    script=run_script,
    arguments=args,
    compute_target=arcK_target,
    environment=pytorch_env,
    distributed_job_config=distributed_job_config,
)

### Submit the job
Run your experiment by submitting your ScriptRunConfig object. The `submit` call is asynchronous and returns immediately; `wait_for_completion` then blocks until the run finishes.

In [None]:
run = experiment.submit(src)
run.wait_for_completion(show_output=True) # this provides a verbose log

### Register the model

Register the trained model.

In [None]:
register_model_name = 'cifar10torch'
model = run.register_model(model_name=register_model_name, model_path='outputs/cifar10torch.pt')

The machine learning model named "cifar10torch" should be registered in your AzureML workspace.

### Get the model

In [None]:
from azureml.core.model import Model
model = Model(ws, register_model_name)  # look up the model by the name used at registration
model_id = f"azureml:{model.name}:{model.version}"
print(f"Get {model.name}, latest version {model.version}, id in endpoint.yml: {model_id}")

## Deploy and score a machine learning model by using a managed online endpoint

Deployment to an online endpoint is currently supported only through the Azure CLI.

In [None]:
endpoint = 'pytorch-cifar10-jiadu'

import os
from pathlib import Path
prefix = Path(os.getcwd())
endpoint_file = str(prefix.joinpath("pytorch-cifar10.yml"))
print(f"Using endpoint file: {endpoint_file}. Please replace the model id (<model-id>) and compute target id (<compute-id>) according to the output above.")
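If you prefer not to edit the endpoint file by hand, the two placeholders can be substituted in code. The sketch below is one possible approach. The function name `fill_endpoint_template` and the sample file content are hypothetical; only the `<model-id>` and `<compute-id>` placeholders come from this notebook.

```python
def fill_endpoint_template(template_text, model_id, compute_id):
    """Substitute the two placeholders used in the endpoint file."""
    for placeholder in ("<model-id>", "<compute-id>"):
        if placeholder not in template_text:
            raise ValueError(f"placeholder {placeholder} not found")
    return (template_text
            .replace("<model-id>", model_id)
            .replace("<compute-id>", compute_id))


# Hypothetical file content, used only to demonstrate the substitution.
template = "model: <model-id>\ntarget: <compute-id>\n"
print(fill_endpoint_template(template, "azureml:cifar10torch:1", "my-compute-id"))
```

To apply it to the real file, read `endpoint_file` into a string, pass it through the function together with `model_id` and your compute target id, and write the result back.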

In [None]:
import helpers
from azureml.core.workspace import Workspace

ws = Workspace.from_config()
print('Workspace name: ' + ws.name, 
      'Azure region: ' + ws.location, 
      'Subscription id: ' + ws.subscription_id, 
      'Resource group: ' + ws.resource_group, sep='\n')
helpers.run(f"az ml endpoint create -n {endpoint} -f {endpoint_file} -w {ws.name} -g {ws.resource_group}")

## Test trained model

Get `score_uri` and `access_token` with the Azure CLI (currently the only supported way).

In [None]:
# get score_url and access_token from AZ CLI
import helpers
from azureml.core.workspace import Workspace
ws = Workspace.from_config()
cmd = f"az ml endpoint show -n {endpoint} -w {ws.name} -g {ws.resource_group}"
properties = helpers.run(cmd, return_output=True, no_output=True)

cmd = f"az ml endpoint get-credentials -n {endpoint} -w {ws.name} -g {ws.resource_group}"
credentials = helpers.run(cmd, return_output=True, no_output=True)
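The `helpers` module used above ships alongside this notebook and its implementation is not shown here. If it is not available, a stand-in along the following lines may work. This is a hypothetical sketch built on `subprocess`, not the actual `helpers.run`; the name `run_cli` is an assumption and it only mirrors the `return_output` argument.

```python
import shlex
import subprocess


def run_cli(cmd, return_output=False):
    """Run a command line and optionally return its standard output.

    Raises subprocess.CalledProcessError if the command exits with a
    non-zero status. The command is split with POSIX shell rules, so
    Windows paths containing backslashes need extra care.
    """
    completed = subprocess.run(
        shlex.split(cmd), check=True, capture_output=True, text=True
    )
    if return_output:
        return completed.stdout
    print(completed.stdout)
    return None
```

With this stand-in, the endpoint properties would be fetched with `run_cli(cmd, return_output=True)`.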

### Test with inputs

For testing purposes, the first image (a cat) from the test batch has been extracted and saved at `test_imgs/test_img_0_cat.jpg`. It is shown here:

![cat](test_imgs/test_img_0_cat.jpg)

After some preprocessing, the image is converted to JSON as input for the trained model. The outputs are logits for each class per image:

In [None]:
import json
prop_response = json.loads(properties.replace(os.linesep,""))
score_uri = prop_response["scoring_uri"]

cred_response = json.loads(credentials.replace(os.linesep, ""))
access_token = cred_response["accessToken"]

with open("cifar_test_input_pytorch.json", "r") as fp:
    inputs_json = json.load(fp)
    
inputs = json.dumps(inputs_json)

import requests
# second number should be 2
test = inputs
headers = {'Content-Type': 'application/json', 'Authorization': f"Bearer {access_token}"}
r = requests.post(score_uri, data=test, headers=headers)
print(f"predictions: {r.content}")
predicts = r.content
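The preprocessing that produced `cifar_test_input_pytorch.json` is not shown in this notebook. A minimal sketch of what such a step might look like follows. It assumes the model expects a batch of channel-first float images normalized with mean 0.5 and standard deviation 0.5 per channel, and that the payload is a bare nested list. Both are assumptions, so check them against your training and scoring scripts. The name `image_to_json` is hypothetical.

```python
import json

import numpy as np


def image_to_json(image_hwc):
    """Turn one HxWx3 uint8 image into a JSON batch holding one CHW float image."""
    scaled = image_hwc.astype(np.float32) / 255.0   # map pixels to [0, 1]
    normalized = (scaled - 0.5) / 0.5               # assumed mean/std, maps to [-1, 1]
    chw = np.transpose(normalized, (2, 0, 1))       # HWC -> CHW
    batch = chw[np.newaxis, ...]                    # add the batch dimension
    return json.dumps(batch.tolist())


# A blank 32x32 RGB image stands in for the CIFAR10 test image here.
payload = image_to_json(np.zeros((32, 32, 3), dtype=np.uint8))
print(len(payload))
```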

You can then map the logits returned by the endpoint to predicted labels:

In [None]:
import numpy as np
classes = ('plane', 'car', 'bird', 'cat', 'deer',
           'dog', 'frog', 'horse', 'ship', 'truck')
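# r.content is raw bytes; parse it first (assumes the endpoint returns a JSON list of per-image logits)
np_predicts = np.array(json.loads(predicts))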
pred_indexes = np.argmax(np_predicts, 1)

predict_labels = [classes[i] for i in pred_indexes]
print(predict_labels)

Depending on your model, you may or may not get the correct label, which is "cat".
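Because the endpoint returns raw logits, it can help to see how confident the model is in its top choice. The sketch below applies a softmax to one image's logits and reports the most likely class with its probability. The logit values are made up for illustration, and the helper names are hypothetical.

```python
import math

CLASSES = ('plane', 'car', 'bird', 'cat', 'deer',
           'dog', 'frog', 'horse', 'ship', 'truck')


def softmax(logits):
    """Convert a list of logits to probabilities that sum to 1."""
    peak = max(logits)  # subtract the max for numerical stability
    exps = [math.exp(x - peak) for x in logits]
    total = sum(exps)
    return [e / total for e in exps]


def top_label(logits, classes=CLASSES):
    """Return (label, probability) for the highest-scoring class."""
    probs = softmax(logits)
    best = max(range(len(probs)), key=probs.__getitem__)
    return classes[best], probs[best]


# Made-up logits for a single image; index 3 ('cat') is the largest.
example_logits = [-1.2, -0.8, 0.3, 2.1, 0.1, 1.4, 0.6, -0.5, -1.0, -1.6]
label, prob = top_label(example_logits)
print(f"{label}: {prob:.2f}")
```

A low top probability suggests the single training epoch used in this tutorial was not enough, in which case raising `--epochs` is the first thing to try.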

## Next steps

1. Learn how to [download a model and upload it to Azure Storage blobs](../AML-model-download-upload.ipynb)
2. Learn how to [run inference using KFServing with a model in Azure Storage Blobs](https://aka.ms/kfas)
3. Learn Pipeline Steps with [Object Segmentation](../object-segmentation-on-azure-stack/)