# Image Classification Training Pipeline

In this sample, image preprocessing runs on CPU nodes, while training runs on distributed GPU nodes.

In [1]:
import os

from azureml.core import Workspace
from azureml.core.compute import AmlCompute, ComputeTarget
from azureml.core.compute_target import ComputeTargetException
from azureml.pipeline.wrapper import Module, Pipeline, dsl

## Configure workspace and compute

In [2]:
# Connect to the Azure ML workspace; substitute your own workspace details below.
workspace = Workspace.get(
    subscription_id='4aaa645c-5ae2-4ae9-a17a-84b9023bc56a',
    resource_group='itp-pilot-ResGrp',
    name='itp-pilot',
)
# Echo the resolved workspace identity, one field per line.
print(
    *(
        getattr(workspace, field_name)
        for field_name in ('name', 'resource_group', 'location', 'subscription_id')
    ),
    sep='\n',
)

If you run your code in unattended mode, i.e., where you can't give a user input, then we recommend to use ServicePrincipalAuthentication or MsiAuthentication.
Please refer to aka.ms/aml-notebook-auth for different authentication mechanisms in azureml-sdk.
itp-pilot
itp-pilot-ResGrp
westeurope
4aaa645c-5ae2-4ae9-a17a-84b9023bc56a


In [3]:
# Names of the AmlCompute clusters used by this notebook.
gpu_compute_target = 'gpu-cluster'
cpu_compute_target = 'compute-ds3v2'

# Reuse the GPU cluster if the workspace already has one with this name;
# otherwise provision it. Only ComputeTargetException (what the SDK raises
# when the target cannot be retrieved) triggers provisioning, so unrelated
# failures such as authentication errors or a keyboard interrupt are no
# longer mistaken for "cluster missing".
try:
    gpu_compute = AmlCompute(workspace, gpu_compute_target)
    print("Found existing gpu compute target: {}".format(gpu_compute_target))
except ComputeTargetException:
    print("Need to create a new gpu compute")
    provisioning_config = AmlCompute.provisioning_configuration(vm_size="STANDARD_NC6",
                                                                min_nodes=0,
                                                                max_nodes=4)
    gpu_compute = ComputeTarget.create(workspace, gpu_compute_target, provisioning_config)
    gpu_compute.wait_for_completion(show_output=True, min_node_count=None, timeout_in_minutes=20)

# Same lookup-or-create flow for the CPU cluster.
try:
    cpu_compute = AmlCompute(workspace, cpu_compute_target)
    print("Found existing cpu compute target: {}".format(cpu_compute_target))
except ComputeTargetException:
    print("Creating a new cpu compute target: {}".format(cpu_compute_target))
    provisioning_config = AmlCompute.provisioning_configuration(vm_size="STANDARD_DS3_V2",
                                                                min_nodes=0,
                                                                max_nodes=4)
    cpu_compute = ComputeTarget.create(workspace, cpu_compute_target, provisioning_config)
    cpu_compute.wait_for_completion(show_output=True, min_node_count=None, timeout_in_minutes=20)

Found existing gpu compute target: gpu-cluster
Found existing cpu compute target: compute-ds3v2


## Prepare dataset
This smaller ImageNet dataset is a subset of the official one.
- training dataset contains 9100 images (7 categories * 1300 images per category)
- validation dataset contains 350 images (7 categories * 50 images per category)

A zip file is used here to avoid the performance issues of mounting a file dataset that has many subfolders.

In [4]:
# get dataset
from azureml.core.dataset import Dataset

# Look up the training and validation image datasets in the workspace by name.
train_image_dataset = Dataset.get_by_name(
    workspace=workspace,
    name='ImageNetSmallerTrainData',
)
val_image_dataset = Dataset.get_by_name(
    workspace=workspace,
    name='ImageNetSmallerValidData',
)

## Load modules

In [5]:
# Load the three built-in modules from the 'azureml' namespace, in the order
# they are used by the pipeline: convert -> init transform -> apply transform.
convert_func, init_transform_func, apply_transform_func = (
    Module.load(workspace, namespace='azureml', name=builtin_module_name)
    for builtin_module_name in (
        'Convert to Image Directory',
        'Init Image Transformation',
        'Apply Image Transformation',
    )
)

In [6]:
# Load the locally defined training module from its YAML spec.
# Note: this training module is an MPI module.
module_folder = './modules'
yaml_file_name = 'entry.spec.yaml'
train_module = Module.from_yaml(
    workspace,
    yaml_file='/'.join((module_folder, 'ConvNets', yaml_file_name)),
)

## Set up a pipeline

In [7]:
# define pipeline
# The default compute target comes from `cpu_compute_target` (set in the compute
# configuration cell) rather than a repeated string literal, so changing the
# cluster name there also changes where the preprocessing steps run.
@dsl.pipeline(name='image classification', description='image classification', default_compute_target=cpu_compute_target)
def generated_pipeline():
    """Wire up the image classification pipeline.

    Steps, in data-flow order:
      1. Convert the training and validation datasets to image directories.
      2. Build one image transformation (only random horizontal flip enabled).
      3. Apply that transformation to the training set ('For training') and
         to the validation set ('For inference').
      4. Train with the local training module on both transformed sets.

    Every step uses the pipeline's default compute target except the training
    step, which is explicitly configured to run on `gpu_compute_target`.
    """
    convert_train = convert_func(
        input_dataset=train_image_dataset
    )

    convert_val = convert_func(
        input_dataset=val_image_dataset
    )

    # A single transformation definition shared by the training and validation branches.
    init_trans = init_transform_func(
        resize='False',
        size=256,
        center_crop='False',
        crop_size=224,
        pad='False',
        padding=0,
        color_jitter='False',
        grayscale='False',
        random_resized_crop='False',
        random_resized_crop_size=256,
        random_crop='False',
        random_crop_size=224,
        random_horizontal_flip='True',
        random_vertical_flip='False',
        random_rotation='False',
        random_rotation_degrees=0,
        random_affine='False',
        random_affine_degrees=0,
        random_grayscale='False',
        random_perspective='False'
    )

    apply_trans_on_train = apply_transform_func(
        mode='For training',
        input_image_transformation=init_trans.outputs.output_image_transformation,
        input_image_directory=convert_train.outputs.output_image_directory
    )

    apply_trans_on_val = apply_transform_func(
        mode='For inference',
        input_image_transformation=init_trans.outputs.output_image_transformation,
        input_image_directory=convert_val.outputs.output_image_directory
    )

    train = train_module(
        train_data=apply_trans_on_train.outputs.output_image_directory,
        valid_data=apply_trans_on_val.outputs.output_image_directory,
        data_backend='pytorch',
        pretrained_weights=None,
        epochs=4,
        seed=123,
        batch_size=16,
        save_checkpoint_epochs=2
    )
    # perform distributed training with 2 nodes.
    # note: process_count_per_node should be 1 because this module will launch distributed processes based on node device count.
    train.runsettings.configure(target=gpu_compute_target, node_count=2, process_count_per_node=1)

In [8]:
# create a pipeline
# Calling the @dsl.pipeline-decorated function yields the pipeline object that
# the following cells validate and submit.
pipeline = generated_pipeline()

In [9]:
# validate pipeline and visualize the graph
# In the recorded run this returned {'result': 'validation passed', 'errors': []}.
pipeline.validate()

<IPython.core.display.Javascript object>

ValidateView(container_id='container_id_20d51ad4-9ab7-44aa-b4e6-90ec2da7b9c7_widget', env_json='{"subscription…

{'result': 'validation passed', 'errors': []}

In [1]:
# Submit the pipeline under the 'image_classification' experiment and keep the
# returned run handle, then wait for that run to complete.
pipeline_run = pipeline.submit(experiment_name='image_classification')
pipeline_run.wait_for_completion()