# Orchestrate remote training with Azure Machine Learning service



![AML Arch](https://github.com/jakazmie/AIDays/raw/master/DataScientistTrack/02-AML-EndToEndWalkthrough/images/amlarch.png)

### Create AML workspace


In [None]:
from azureml.core import Workspace

# Fill in these three identifiers before running the cell.
subscription_id = '<your subscription id>'
resource_group = '<your resource group>'
workspace_name = '<your workspace name>'

try:
    # Attach to the workspace if it already exists.
    ws = Workspace(subscription_id=subscription_id,
                   resource_group=resource_group,
                   workspace_name=workspace_name)

    print('Workspace configuration succeeded. You are all set!')
except Exception:
    # `except Exception` instead of a bare `except` so that interrupting the
    # cell (KeyboardInterrupt) or SystemExit is not mistaken for a missing
    # workspace and does not trigger a creation attempt.
    print('Workspace not found. Creating...')
    workspace_region = 'southcentralus'
    ws = Workspace.create(name=workspace_name,
                          subscription_id=subscription_id,
                          resource_group=resource_group,
                          location=workspace_region,
                          create_resource_group=True,
                          exist_ok=True)

ws.get_details()
# Persist the workspace settings to a local config file.
ws.write_config()

### Create a training script

To submit the job to the cluster, first create a training script. Run the following cells to create the `./script` directory and write the training script `train.py` into it. The script encapsulates the code we prepared in the first part of the lab.

In [None]:
import os
# Local folder that receives train.py; it is later passed to the Estimator
# as its source_directory.
script_folder = './script'
os.makedirs(script_folder, exist_ok=True)

In [None]:
%%writefile $script_folder/train.py

import os
import tensorflow as tf
from tensorflow.keras.applications import resnet50
from tensorflow.keras.preprocessing import image
from tensorflow.keras.utils import to_categorical

import numpy as np
import random
    
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Dense, GlobalAveragePooling2D, Dropout, Flatten, Input
from tensorflow.keras.regularizers import l1_l2

from azureml.core import Run



# This is a generator that yields batches of preprocessed images
class ImageGenerator(tf.keras.utils.Sequence):
    """Keras ``Sequence`` that yields batches of preprocessed images.

    ``img_dir`` is expected to contain one sub-directory per class; the
    sorted sub-directory names are mapped to the numeric labels 0..N-1.
    The image list is shuffled once at construction time, and the labels
    returned by ``get_labels`` follow that same shuffled order.

    Args:
        img_dir: Root folder holding one sub-folder of images per class.
        preprocess_fn: Optional callable applied to each loaded batch array.
        batch_size: Number of images per batch.
        target_size: ``(height, width)`` every image is resized to on load.

    Raises:
        ValueError: If no images are found under ``img_dir``.
    """

    def __init__(self, img_dir, preprocess_fn=None, batch_size=64,
                 target_size=(224, 224)):
        super().__init__()

        # Create the dictionary that maps class names into numeric labels.
        # Only sub-directories count as classes; stray files in the root
        # would otherwise make the os.listdir() call below fail.
        folders = sorted(
            entry for entry in os.listdir(img_dir)
            if os.path.isdir(os.path.join(img_dir, entry))
        )
        label_map = {folder: label for label, folder in enumerate(folders)}
        self.num_classes = len(label_map)

        # Create a list of all images in a root folder with associated numeric labels
        labeled_image_list = [
            (os.path.join(img_dir, folder, filename), label_map[folder])
            for folder in folders
            for filename in os.listdir(os.path.join(img_dir, folder))
        ]
        if not labeled_image_list:
            # Without this check the zip(*...) unpacking below fails with an
            # unhelpful "not enough values to unpack" ValueError.
            raise ValueError('No images found under {0}'.format(img_dir))

        # Shuffle the list
        random.shuffle(labeled_image_list)
        # Set image list and associated label list
        self.image_list, self.label_list = zip(*labeled_image_list)
        # Set batch size
        self.batch_size = batch_size
        # Set the size images are resized to when loaded
        self.target_size = tuple(target_size)

        # Set the pre-processing function passed as a parameter
        self.preprocess_fn = preprocess_fn

        # Set number of batches (ceiling division: the last batch may be partial)
        self.n_batches = (len(self.image_list) + self.batch_size - 1) // self.batch_size

    def __len__(self):
        """Return the number of batches in the sequence."""
        return self.n_batches

    def __getitem__(self, index):
        """Return batch ``index`` as an array of images (labels are not included)."""
        pathnames = self.image_list[index*self.batch_size:(index+1)*self.batch_size]
        images = self.__load_images(pathnames)

        return images

    # Load a set of images passed as a parameter into a NumPy array
    def __load_images(self, pathnames):
        images = []
        for pathname in pathnames:
            # load_img expects target_size as (height, width), without channels
            img = image.load_img(pathname, target_size=self.target_size)
            img = image.img_to_array(img)
            images.append(img)
        images = np.asarray(images)
        if self.preprocess_fn is not None:
            images = self.preprocess_fn(images)

        return images

    # Return labels in one-hot encoding
    def get_labels(self):
        """Return one-hot labels for all images, in the same order as the batches."""
        return to_categorical(np.asarray(self.label_list), self.num_classes)
    

def fcn_classifier(input_shape=(2048,), units=512, classes=6, l1=0.01, l2=0.01):
    """Build and compile a small fully connected classifier.

    The network is: input -> Dense(units, relu) -> Dropout(0.5) ->
    Dense(classes, softmax). The L1/L2 penalty is applied to the kernel of
    the softmax layer only.

    Args:
        input_shape: Shape of one input feature vector.
        units: Width of the hidden dense layer.
        classes: Number of output classes.
        l1: L1 regularization factor for the output layer.
        l2: L2 regularization factor for the output layer.

    Returns:
        The compiled Keras ``Model`` (adadelta optimizer, categorical
        cross-entropy loss, accuracy metric).
    """
    inputs = Input(shape=input_shape)

    hidden = Dense(units, activation='relu')(inputs)
    hidden = Dropout(0.5)(hidden)

    penalty = l1_l2(l1=l1, l2=l2)
    probabilities = Dense(classes,
                          activation='softmax',
                          kernel_regularizer=penalty)(hidden)

    classifier = Model(inputs=inputs, outputs=probabilities)
    classifier.compile(loss='categorical_crossentropy',
                       optimizer='adadelta',
                       metrics=['accuracy'])
    return classifier


def train_evaluate(run):
    """Featurize the images with ResNet50, train the classifier and save it.

    All settings are read from the module-level ``FLAGS``: ``data_folder``
    (must contain ``train`` and ``valid`` sub-folders), ``batch_size``,
    ``epochs``, ``units``, ``l1``, ``l2`` and ``save_model_dir``.

    Args:
        run: Run object that the final-epoch training metrics are logged to
            through ``run.log``. Pass ``None`` to skip metric logging.
    """
    # Create bottleneck features
    train_images_dir = os.path.join(FLAGS.data_folder, 'train')
    valid_images_dir = os.path.join(FLAGS.data_folder, 'valid')

    train_generator = ImageGenerator(train_images_dir, resnet50.preprocess_input,
                                     batch_size=FLAGS.batch_size)
    valid_generator = ImageGenerator(valid_images_dir, resnet50.preprocess_input,
                                     batch_size=FLAGS.batch_size)

    # ResNet50 without its classification head; global average pooling
    # turns each image into a single feature vector.
    featurizer = resnet50.ResNet50(
                weights = 'imagenet',
                input_shape=(224,224,3),
                include_top = False,
                pooling = 'avg')

    print("Generating bottleneck features")
    train_features = featurizer.predict_generator(train_generator, verbose=1)
    train_labels = train_generator.get_labels()

    valid_features = featurizer.predict_generator(valid_generator, verbose=1)
    valid_labels = valid_generator.get_labels()

    # Create a classifier
    model = fcn_classifier(input_shape=(2048,), units=FLAGS.units, l1=FLAGS.l1, l2=FLAGS.l2)

    # Start training. Batch size and epoch count come from the command-line
    # flags so that the values passed by the caller are actually honoured.
    print("Starting training")
    history = model.fit(train_features, train_labels,
                        batch_size=FLAGS.batch_size,
                        epochs=FLAGS.epochs,
                        shuffle=True,
                        validation_data=(valid_features, valid_labels))

    # Record the last-epoch value of every metric Keras tracked in the run history
    if run is not None:
        for metric_name, values in history.history.items():
            run.log(metric_name, float(values[-1]))

    # Save the trained model; the default save_model_dir is ./outputs, which is
    # a standard folder expected by AML
    print("Training completed.")
    os.makedirs(FLAGS.save_model_dir, exist_ok=True)
    model_file = os.path.join(FLAGS.save_model_dir, 'model.hd5')
    print("Saving model to: {0}".format(model_file))
    model.save(model_file)
    

FLAGS = tf.app.flags.FLAGS

# Command-line parameters of the training script and their defaults
tf.app.flags.DEFINE_integer('batch_size', 32, "Number of images per batch")
tf.app.flags.DEFINE_integer('epochs', 10, "Number of epochs to train")
tf.app.flags.DEFINE_integer('units', 512, "Number of units in the hidden dense layer")
tf.app.flags.DEFINE_float('l1', 0.01, "L1 regularization factor of the output layer")
tf.app.flags.DEFINE_float('l2', 0.01, "L2 regularization factor of the output layer")
tf.app.flags.DEFINE_string('data_folder', 'aerialsmall', "Folder with training and validation images")
tf.app.flags.DEFINE_string('save_model_dir', './outputs', "A folder for saving trained model")


def main(argv=None):
    """Script entry point: fetch the submitted run and train within it.

    Args:
        argv: Unused; present because ``tf.app.run`` passes the argument list.
    """
    # Handle to the run this script was submitted as
    submitted_run = Run.get_submitted_run()
    train_evaluate(submitted_run)


if __name__ == '__main__':
    # Hand control to TensorFlow's app runner, which invokes main()
    tf.app.run()

### Create Experiment

**Experiment** is a logical container in an Azure ML Workspace. It hosts run records which can include run metrics and output artifacts from your experiments.

In [None]:
from azureml.core import Experiment

# Name of the experiment that will host the run records in the workspace
experiment_name = 'keras-training-on-gpu-cluster'
exp = Experiment(name=experiment_name, workspace=ws)

### Create remote compute cluster

**Creation of the cluster takes approximately 5 minutes.** If the cluster is already in the workspace this code uses it and skips the creation process.

In [None]:
from azureml.core.compute import ComputeTarget, BatchAiCompute
from azureml.core.compute_target import ComputeTargetException

# choose a name for your cluster
batchai_cluster_name = ws.name + 'gpucluster'

try:
    # look for the existing cluster by name; only this lookup is expected to
    # raise ComputeTargetException, so it is the only statement in the try body
    compute_target = ComputeTarget(workspace=ws, name=batchai_cluster_name)
except ComputeTargetException:
    print('creating a new compute target...')
    compute_config = BatchAiCompute.provisioning_configuration(vm_size="STANDARD_NC6", # GPU-based VM
                                                                #vm_priority='lowpriority', # optional
                                                                autoscale_enabled=True,
                                                                cluster_min_nodes=1,
                                                                cluster_max_nodes=4)

    # create the cluster
    compute_target = ComputeTarget.create(ws, batchai_cluster_name, compute_config)

    # can poll for a minimum number of nodes and for a specific timeout.
    # if no min node count is provided it uses the scale settings for the cluster
    compute_target.wait_for_completion(show_output=True, min_node_count=None, timeout_in_minutes=20)

    # Use the 'status' property to get a detailed status for the current cluster.
    print(compute_target.status.serialize())
else:
    # a compute target with this name already exists: check that it is a Batch AI cluster
    if isinstance(compute_target, BatchAiCompute):
        print('found compute target {}, just use it.'.format(batchai_cluster_name))
    else:
        print('{} exists but it is not a Batch AI cluster. Please choose a different name.'.format(batchai_cluster_name))

### Configure datastore
The dataset we will use for training has been uploaded to a public Azure blob storage container. We will register this container as the datastore with the workspace. This will create a connection to data for remote compute targets. 

In [None]:
from azureml.core import Datastore

datastore_name = 'aerialsmall'
try:
    # Register the public blob container as a datastore of the workspace.
    ds = Datastore.register_azure_blob_container(workspace=ws, datastore_name=datastore_name,
                                            container_name='aerialsmall',
                                            account_name='azureailabs')
except Exception:
    # Registration failed (presumably because the datastore is already
    # registered - verify if this message appears unexpectedly), so fall back
    # to looking it up by name. `except Exception` rather than a bare `except`
    # keeps KeyboardInterrupt / SystemExit from being swallowed.
    ds = Datastore(ws, datastore_name)
    print('Found existing datastore:', ds.name)
else:
    print('Creating new datastore')

print(ds.name, ds.datastore_type, ds.account_name, ds.container_name)

### Configure data access



In [None]:
from azureml.core.runconfig import DataReferenceConfiguration
# Describe how the datastore is made available on the compute target:
# its files are downloaded into a folder named after the datastore,
# overwriting what is already there.
dr = DataReferenceConfiguration(
    datastore_name=ds.name,
    path_on_datastore=None,
    path_on_compute=datastore_name,
    mode='download',
    overwrite=True,
)

### Create an estimator

An estimator object is used to submit the training run.  Create the estimator by running the following code to define:

* The name of the estimator object, `est`
* The directory that contains your scripts. All the files in this directory are uploaded into the cluster nodes for execution. 
* The compute target.  In this case you will use the Batch AI cluster you created
* The training script name, train.py
* Parameters required from the training script 
* Python packages needed for training

In this tutorial, the target is the Batch AI cluster. All files in the script folder are uploaded into the cluster nodes for execution. The data_folder is set to use the datastore (`ds.as_download`).

In [None]:
from azureml.train.estimator import Estimator

# Command-line arguments handed to train.py
script_params = {
    '--data_folder': ds.as_download(),  # datastore downloaded to the compute target
    '--l1': 0.01,
    '--l2': 0.01,
    '--units': 512,
    '--epochs': 10
}

# Estimator describing the run: what to execute, where, and with which packages
est = Estimator(
    source_directory=script_folder,
    entry_script='train.py',
    script_params=script_params,
    compute_target=compute_target,
    node_count=1,
    process_count_per_node=1,
    use_gpu=True,
    pip_packages=['h5py', 'pillow', 'tensorflow-gpu'],
)

### Submit the job to the cluster

Run the experiment by submitting the estimator object.

In [None]:
# Submit the estimator to the experiment; the bare `run` expression makes the
# notebook display the returned run object as the cell output.
run = exp.submit(config=est)
run

Since the call is asynchronous, it returns a **Preparing** or **Running** state as soon as the job is started.

### Monitor a remote run

In total, the first run takes **approximately 10 minutes**. But for subsequent runs, as long as the script dependencies don't change, the same image is reused and hence the container start up time is much faster.

Here is what's happening while you wait:

- **Image creation**: A Docker image is created matching the Python environment specified by the estimator. The image is uploaded to the workspace. This stage happens once for each Python environment since the container is cached for subsequent runs.  During image creation, logs are streamed to the run history. You can monitor the image creation progress using these logs.

- **Scaling**: If the remote cluster requires more nodes to execute the run than currently available, additional nodes are added automatically. 

- **Running**: In this stage, the necessary scripts and files are sent to the compute target, then data stores are mounted/copied, then the entry_script is run. While the job is running, stdout and the ./logs directory are streamed to the run history. You can monitor the run's progress using these logs.

- **Post-Processing**: The ./outputs directory of the run is copied over to the run history in your workspace so you can access these results.


You can check the progress of a running job in multiple ways. This tutorial uses a Jupyter widget as well as a `wait_for_completion` method. 

### Jupyter widget

Watch the progress of the run with a Jupyter widget.  Like the run submission, the widget is asynchronous and provides live updates every 10-15 seconds until the job completes. 

Note: Currently, there is a problem with RunDetails widget in DSVM. 

In [None]:
from azureml.train.widgets import RunDetails
# Show the Jupyter widget that tracks the progress of the submitted run.
RunDetails(run).show()

### Get log results upon completion

Model training and monitoring happen in the background. Wait until the model has completed training before running more code. Use `wait_for_completion` to show when the model training is complete.

In [None]:
# Wait here until the remote run has completed before running more code.
run.wait_for_completion(show_output=True) # specify True for a verbose log

### Display run results

The training has completed. You can see the logs generated during the run by executing `Run.get_file_names()` method.

In [None]:
# Print the names of the files recorded for the run.
print(run.get_file_names())

## Register model
The last step in the training script wrote the file `model.hd5` in the directory named `outputs` in the VM of the cluster where the job is executed. `outputs` is a special directory in that all content in this  directory is automatically uploaded to your workspace.  This content appears in the run record in the experiment under your workspace. Hence, the model file is now also available in your workspace.

You can register the model so that it can later be queried, examined and deployed.

In [None]:
# Register the model file that the training script saved under outputs/
model = run.register_model(model_path='outputs/model.hd5',
                           model_name='aerial_classifier')
# Name, id and version of the registered model, tab-separated
print('\t'.join(str(value) for value in (model.name, model.id, model.version)))

The model is ready for deployment.