Copyright (c) Microsoft Corporation. All rights reserved.

Licensed under the MIT License.

# Image classification distributed training

This sample demonstrates distributed training in Azure Machine Learning. In this pipeline, we use designer built-in modules to preprocess images on CPU nodes, and an MPI custom module to train on distributed GPU nodes.


You will learn how to:
* Use unregistered modules: load modules from a local YAML spec using the module SDK.
* Build a pipeline with unregistered modules and AzureML built-in modules.


## Prerequisites
* Install the Azure CLI with the azure-cli-ml extension by following the [instructions here](setup-environment.ipynb).

## Configure workspace and compute

In [None]:
from azureml.core import Workspace

# TODO: Configure your workspace information here.
# Workspace.get looks up an existing workspace by name within the given
# subscription and resource group.
workspace = Workspace.get(
    name='Your workspace name',
    subscription_id='Your subscription ID',
    resource_group='Your resource group name',
)

# Echo the resolved workspace identity, one field per line.
workspace_summary = (
    workspace.name,
    workspace.resource_group,
    workspace.location,
    workspace.subscription_id,
)
print(*workspace_summary, sep='\n')

## Retrieve or create an Azure Machine Learning compute target
Azure Machine Learning Compute is a service for provisioning and managing clusters of Azure virtual machines for running machine learning workloads. Create a new Azure Machine Learning Compute in the current workspace, if it doesn't already exist. We will then run the pipeline on this compute target.

If no compute target with the given name exists, we create a new one here. This process is broken down into the following steps:

1. Create the configuration
2. Create the Azure Machine Learning compute

**This process takes a few minutes and provides only sparse output while it runs. Please wait until the call returns before moving to the next cell.**

In [None]:
from azureml.core.compute import ComputeTarget, AmlCompute
from azureml.exceptions import ComputeTargetException

def get_or_create_aml_compute(ws, name, vm_size, min_nodes=0, max_nodes=4,
                              timeout_in_minutes=20):
    """Return the AmlCompute cluster ``name`` in ``ws``, provisioning it if absent.

    Args:
        ws: Workspace that owns (or will own) the cluster.
        name: Compute target name to look up or create.
        vm_size: VM SKU to use if the cluster has to be created.
        min_nodes: Minimum node count for a newly created cluster.
        max_nodes: Maximum node count for a newly created cluster.
        timeout_in_minutes: How long to wait for provisioning to finish.

    Returns:
        The existing or newly provisioned compute target.
    """
    try:
        compute = AmlCompute(ws, name)
    except ComputeTargetException:
        # Only "compute target not found" should trigger creation.  A bare
        # ``except:`` would also swallow auth/network errors and Ctrl-C and
        # then try to create a cluster, masking the real failure.
        print("Creating a new compute target: {}".format(name))
        provisioning_config = AmlCompute.provisioning_configuration(
            vm_size=vm_size, min_nodes=min_nodes, max_nodes=max_nodes)
        compute = ComputeTarget.create(ws, name, provisioning_config)
        compute.wait_for_completion(show_output=True, min_node_count=None,
                                    timeout_in_minutes=timeout_in_minutes)
    else:
        print("Found existing compute target: {}".format(name))
    return compute


# specify aml compute name.
gpu_compute_target = 'gpu-cluster'
cpu_compute_target = 'compute-ds3v2'

gpu_compute = get_or_create_aml_compute(workspace, gpu_compute_target, vm_size="STANDARD_NC6")
cpu_compute = get_or_create_aml_compute(workspace, cpu_compute_target, vm_size="STANDARD_DS3_V2")

## Prepare dataset
This dataset is a subset of the [Open Images Dataset](https://storage.googleapis.com/openimages/web/index.html).
- training dataset contains 24 images (3 categories * 8 images per category)
- validation dataset contains 6 images (3 categories * 2 images per category)

This is an extremely small dataset, intended only for demonstrating this notebook. Use larger datasets to train models for production.

NOTE: A zip file is used here to avoid the performance cost of mounting a file dataset that has many subfolders.

In [None]:
import hashlib
from pathlib import Path
from azureml.core import Dataset

def register_dataset_by_path(ws: Workspace, dataset_name, path):
    """Upload a local folder to the workspace's default datastore and register it.

    The folder is uploaded under ``/data/<sha256 of the path string>`` and
    registered as a FileDataset matching every file beneath that folder.

    Args:
        ws: Workspace whose default datastore receives the upload.
        dataset_name: Name the dataset is registered under (registered with
            ``create_new_version=True``).
        path: Local folder to upload; a ``pathlib.Path`` or a path string.

    Returns:
        The dataset fetched back by name via ``Dataset.get_by_name``.

    Raises:
        ValueError: If ``path`` is not an existing directory.
    """
    # Accept plain strings as well as Path objects.
    path = Path(path)
    if not path.is_dir():
        raise ValueError(f"Dataset must be a folder: {path}")

    # The datastore folder name is derived from the local path string (not the
    # file contents), so repeated runs with the same path reuse one folder.
    ds_path = hashlib.sha256(str(path).encode()).hexdigest()
    folder_on_datastore = f'/data/{ds_path}'

    datastore = ws.get_default_datastore()
    # overwrite=True: the target folder is keyed by path rather than content,
    # so without it edited local files would be skipped on re-upload and the
    # newly registered version would silently reference stale data.
    datastore.upload(str(path), target_path=folder_on_datastore, overwrite=True)

    # Create a FileDataset matching every file under the uploaded folder.
    datastore_paths = [(datastore, folder_on_datastore + '/**')]
    dataset = Dataset.File.from_files(datastore_paths)
    print(f"Registering dataset for path {path}")
    registered = dataset.register(ws, name=dataset_name, create_new_version=True)
    print("Dataset registered", registered)
    return Dataset.get_by_name(ws, name=dataset_name)

# Register the local train/validation image folders as workspace datasets.
image_assets_root = Path('assets/image-classification')
train_image_dataset = register_dataset_by_path(
    workspace, dataset_name='TrainData', path=image_assets_root / 'train')
val_image_dataset = register_dataset_by_path(
    workspace, dataset_name='ValidData', path=image_assets_root / 'val')

## Load modules

In [None]:
from azureml.pipeline.wrapper import Module

def _load_builtin_module(display_name):
    """Load a module from the 'azureml' built-in namespace by its display name."""
    return Module.load(workspace, namespace='azureml', name=display_name)


# Built-in designer modules used for image preprocessing.
convert_func = _load_builtin_module('Convert to Image Directory')
init_transform_func = _load_builtin_module('Init Image Transformation')
apply_transform_func = _load_builtin_module('Apply Image Transformation')

In [None]:
# Local (unregistered) module loaded from its YAML spec on disk.
# This training module is an MPI module, used for distributed training.
train_module_spec = 'modules/imagecnn-train/entry.spec.yaml'
train_module = Module.from_yaml(workspace, yaml_file=train_module_spec)

## Set up a pipeline

Create a pipeline from the modules, and submit an experiment to AzureML using the module SDK.

Here is a [preview of the pipeline](assets/image-classification/pipeline.png).

In [None]:
from azureml.pipeline.wrapper import dsl

# define pipeline
# default_compute_target: steps use the CPU cluster unless overridden; the
# training step below is explicitly moved to the GPU cluster.
@dsl.pipeline(name='image classification', description='image classification', default_compute_target=cpu_compute_target)
def generated_pipeline():
    """Build the image-classification pipeline graph.

    Converts the train and validation datasets to image directories, builds
    one image transformation (only random horizontal flip enabled), applies it
    in training mode to the train images and in inference mode to the
    validation images, and feeds both results into the MPI training module,
    which is configured to run on the GPU cluster across 2 nodes.
    """
    # Convert both raw datasets into image-directory format.
    convert_train = convert_func(
        input_dataset=train_image_dataset
    )
    
    convert_val = convert_func(
        input_dataset=val_image_dataset
    )
    
    # A single transformation spec shared by train and validation.  Every
    # option except random_horizontal_flip is disabled.  Flags are passed as
    # the strings 'True'/'False' (presumably what the built-in module expects
    # -- confirm against the module spec).
    init_trans = init_transform_func(
        resize='False',
        size=256,
        center_crop='False',
        crop_size=224,
        pad='False',
        padding=0,
        color_jitter='False',
        grayscale='False',
        random_resized_crop='False',
        random_resized_crop_size=256,
        random_crop='False',
        random_crop_size=224,
        random_horizontal_flip='True',
        random_vertical_flip='False',
        random_rotation='False',
        random_rotation_degrees=0,
        random_affine='False',
        random_affine_degrees=0,
        random_grayscale='False',
        random_perspective='False'
    )
    
    # Training mode for the train images.
    apply_trans_on_train = apply_transform_func(
        mode='For training',
        input_image_transformation=init_trans.outputs.output_image_transformation,
        input_image_directory=convert_train.outputs.output_image_directory
    )
    
    # Inference mode for the validation images.
    apply_trans_on_val = apply_transform_func(
        mode='For inference',
        input_image_transformation=init_trans.outputs.output_image_transformation,
        input_image_directory=convert_val.outputs.output_image_directory
    )
    
    # Train from scratch (pretrained_weights=None) on the transformed images.
    train = train_module(
        train_data=apply_trans_on_train.outputs.output_image_directory,
        valid_data=apply_trans_on_val.outputs.output_image_directory,
        data_backend='pytorch',
        pretrained_weights=None,
        epochs=4,
        seed=123,
        batch_size=16,
        save_checkpoint_epochs=2
    )
    # perform distributed training with 2 nodes.
    # note: process_count_per_node should be 1 because this module will launch distributed processes based on node device count.
    train.runsettings.configure(target=gpu_compute_target, node_count=2, process_count_per_node=1)

In [None]:
# Calling the @dsl.pipeline-decorated function builds the pipeline object.
pipeline = generated_pipeline()

In [None]:
# Check the pipeline graph for errors and show a visualization of it.
pipeline.validate()

In [None]:
# Submit the pipeline as a run of the 'image_classification' experiment.
run = pipeline.submit(experiment_name='image_classification')

# Leaving `run` as the cell's final expression makes the notebook display it.
run


In [None]:
# Block this cell until the submitted pipeline run finishes.
run.wait_for_completion()