Copyright (c) Microsoft Corporation. All rights reserved.   
Licensed under the MIT License.

# Using AML Pipelines for distributed batch prediction
This notebook demonstrates how to run a distributed batch prediction job that applies the pretrained __[Inception-V3 model](https://arxiv.org/abs/1512.00567)__ to unlabeled images from the __[ImageNet](http://image-net.org/)__ dataset.

## Prerequisites
If you haven't already, go through the [00.aml-pipeline-configuration](./00.aml-pipeline-configuration.ipynb) notebook first.

In [None]:
import os
from azureml.core import Workspace, Run, Experiment

ws = Workspace.from_config()

# Summarize the workspace loaded from the local config file, one field per line.
print('\n'.join([
    'Workspace name: ' + ws.name,
    'Azure region: ' + ws.location,
    'Subscription id: ' + ws.subscription_id,
    'Resource group: ' + ws.resource_group,
]))

Create a folder to hold the Python scripts that will run on the remote compute targets.

In [None]:
# Local folder holding the Python scripts that run on the remote compute target.
scripts_folder = "scripts"

# exist_ok=True makes re-running the cell a no-op and avoids the race between
# a separate isdir() check and mkdir().
os.makedirs(scripts_folder, exist_ok=True)

## Create and attach Compute targets
Use the code below to create or attach a compute target. This notebook uses a GPU Batch AI cluster that autoscales between 0 and 3 nodes.

In [None]:
from azureml.core.compute import BatchAiCompute, ComputeTarget
from azureml.core.compute_target import ComputeTargetException

# Batch AI compute: reuse the cluster if it already exists, otherwise provision
# a GPU cluster that autoscales between 0 and 3 nodes.
cluster_name = "gpu-cluster"
try:
    cluster = BatchAiCompute(ws, cluster_name)
    print("found existing cluster.")
except ComputeTargetException:
    # Only a failed lookup of the compute target should lead to creation; a bare
    # `except:` would also swallow KeyboardInterrupt/SystemExit and unrelated bugs.
    print("creating new cluster")
    provisioning_config = BatchAiCompute.provisioning_configuration(vm_size="STANDARD_NC6",
                                                                    autoscale_enabled=True,
                                                                    cluster_min_nodes=0,
                                                                    cluster_max_nodes=3)

    # create the cluster
    cluster = ComputeTarget.create(ws, cluster_name, provisioning_config)
    cluster.wait_for_completion(show_output=True)

# Python scripts to run

`batch_predict.py` reads input images from `dataset_path` and the pretrained model from `model_dir`. Each MPI process (one per node in this example) classifies its own share of the images and collects the results in a list called `predictions`. `comm.gather` then collects every process's `predictions` on rank 0.

Finally, rank 0 writes the aggregated predictions to `result-labels.txt` in the `outputs` directory, whose contents are stored as artifacts associated with the run.

In [None]:
%%writefile $scripts_folder/batch_predict.py
import argparse
import datetime
from math import ceil
import numpy as np
import os
import shutil
import tensorflow as tf
from tensorflow.contrib.slim.python.slim.nets import inception_v3
import time
from azureml.core.model import Model
from mpi4py import MPI

slim = tf.contrib.slim

parser = argparse.ArgumentParser(description="Run distributed Inception-v3 batch prediction")
parser.add_argument('--model_dir', dest="model_dir", required=True,
                    help="directory containing labels.txt and the inception_v3.ckpt checkpoint")
parser.add_argument('--dataset_path', dest="dataset_path", required=True,
                    help="directory of JPEG images to classify")
parser.add_argument('--batch_size', dest="batch_size", type=int, required=True,
                    help="number of images per inference batch (must be positive)")

args = parser.parse_args()
# A non-positive batch size cannot form valid batches; reject it up front with
# a usage error instead of failing later inside the TensorFlow graph.
if args.batch_size <= 0:
    parser.error("--batch_size must be a positive integer")

# Every image is resized to image_size x image_size with num_channel (RGB)
# channels before being fed to Inception-v3.
image_size = 299
num_channel = 3

# MPI world: this process's rank and the total number of processes. The image
# list is partitioned by these values and all results are gathered on rank 0.
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
    
def get_class_label_dict(label_file):
    """Return the lines of ``label_file`` with trailing whitespace removed.

    Despite its name this returns a list, not a dict: callers index it with a
    predicted class id, so line ``i`` of the file is the label for class ``i``.
    The file is read through ``tf.gfile`` and closed once it has been read.
    """
    with tf.gfile.GFile(label_file) as label_fp:
        return [line.rstrip() for line in label_fp.readlines()]


def shard_bounds(total, shard_index, num_shards):
    """Return the ``(start, end)`` slice of ``total`` items owned by ``shard_index``.

    The items are split into ``num_shards`` contiguous shards whose sizes differ
    by at most one, so every item belongs to exactly one shard (unlike
    ``total // num_shards`` per shard, which drops the remainder).
    """
    start = shard_index * total // num_shards
    end = (shard_index + 1) * total // num_shards
    return start, end


class DataIterator:
    """Feeds this MPI rank's shard of the images in ``data_dir`` to the model.

    Uses the module-level MPI ``rank`` and ``size`` to choose the shard.
    Files are sorted by name, so concatenating the shards in rank order yields
    the full sorted listing.
    """

    def __init__(self, data_dir):
        image_list = sorted(os.listdir(data_dir))
        start, end = shard_bounds(len(image_list), rank, size)
        # Directory entries are used verbatim; stripping whitespace could point
        # at a file that does not exist.
        self.file_paths = [os.path.join(data_dir, file_name) for file_name in image_list[start:end]]

        # Placeholder labels: the images are unlabeled, but the input pipeline
        # slices paths and labels together.
        self.labels = [1] * len(self.file_paths)

    @property
    def size(self):
        """Number of images in this rank's shard."""
        return len(self.labels)

    def input_pipeline(self, batch_size):
        """Build a queue-based TF graph that yields batches of decoded images.

        Returns a float32 tensor of shape
        ``[batch_size, image_size, image_size, num_channel]``. Images are read in
        ``file_paths`` order (``shuffle=False``).
        """
        images_tensor = tf.convert_to_tensor(self.file_paths, dtype=tf.string)
        labels_tensor = tf.convert_to_tensor(self.labels, dtype=tf.int64)
        input_queue = tf.train.slice_input_producer([images_tensor, labels_tensor], shuffle=False)
        labels = input_queue[1]
        images_content = tf.read_file(input_queue[0])

        image_reader = tf.image.decode_jpeg(images_content, channels=num_channel, name="jpeg_reader")
        float_caster = tf.cast(image_reader, tf.float32)
        new_size = tf.constant([image_size, image_size], dtype=tf.int32)
        images = tf.image.resize_images(float_caster, new_size)
        # Scales pixel values to [0, 1].
        # NOTE(review): TF-slim's Inception preprocessing maps pixels to [-1, 1];
        # confirm which range this checkpoint was trained with.
        images = tf.divide(tf.subtract(images, [0]), [255])

        image_batch, _ = tf.train.batch([images, labels], batch_size=batch_size, capacity=5 * batch_size)
        return image_batch

def _predict_shard(test_feeder, labels, model_path):
    """Classify every image in ``test_feeder`` and return ``"<file name>: <label>"`` lines.

    The returned lines follow the order of ``test_feeder.file_paths``.
    """
    total_size = test_feeder.size
    batch_size = args.batch_size
    predictions = []
    with tf.Session() as sess:
        test_images = test_feeder.input_pipeline(batch_size=batch_size)
        with slim.arg_scope(inception_v3.inception_v3_arg_scope()):
            input_images = tf.placeholder(tf.float32, [batch_size, image_size, image_size, num_channel])
            logits, _ = inception_v3.inception_v3(input_images,
                                                  num_classes=len(labels),
                                                  is_training=False)
            # Index of the highest-scoring class for each image in the batch.
            predicted_classes = tf.argmax(logits, 1)

        sess.run(tf.global_variables_initializer())
        sess.run(tf.local_variables_initializer())
        coord = tf.train.Coordinator()
        threads = tf.train.start_queue_runners(sess=sess, coord=coord)
        try:
            # Replace the freshly initialized weights with the checkpoint's.
            saver = tf.train.Saver()
            saver.restore(sess, model_path)
            count = 0
            while count < total_size and not coord.should_stop():
                test_images_batch = sess.run(test_images)
                results = sess.run(predicted_classes, feed_dict={input_images: test_images_batch})
                # slice_input_producer is built without num_epochs, so the final
                # batch can wrap around to the start of the shard; keep only the
                # rows that belong to images not yet classified.
                new_add = min(batch_size, total_size - count)
                file_names_batch = test_feeder.file_paths[count:count + new_add]
                for file_path, class_index in zip(file_names_batch, results[:new_add]):
                    predictions.append(os.path.basename(file_path) + ": " + labels[class_index])
                count += new_add
        finally:
            # Stop the queue-runner threads even if inference raised.
            coord.request_stop()
            coord.join(threads)
    return predictions


def main(_):
    """Classify this rank's shard of images and write every rank's results from rank 0.

    Reads ``labels.txt`` and ``inception_v3.ckpt`` from ``args.model_dir``; rank 0
    writes one ``"<file name>: <label>"`` line per image to
    ``outputs/result-labels.txt``, in rank order.
    """
    label_file_name = os.path.join(args.model_dir, "labels.txt")
    labels = get_class_label_dict(label_file_name)
    test_feeder = DataIterator(data_dir=args.dataset_path)
    model_path = os.path.join(args.model_dir, "inception_v3.ckpt")

    # A rank may own no images (fewer images than ranks). It must skip the TF
    # input pipeline but still reach comm.gather, or rank 0 would wait forever.
    predictions = []
    if test_feeder.size > 0:
        predictions = _predict_shard(test_feeder, labels, model_path)

    # Collective call: every rank must participate; only rank 0 receives the list.
    predictions_lst = comm.gather(predictions, root=0)

    if rank == 0:
        out_filename = "outputs/result-labels.txt"
        # Do not assume the working directory already contains outputs/.
        os.makedirs(os.path.dirname(out_filename), exist_ok=True)
        with open(out_filename, "w") as fp:
            for preds in predictions_lst:
                for line in preds:
                    fp.write(line + "\n")
                    

if __name__ == "__main__":
    # tf.app.run invokes main(argv) after TensorFlow's flag handling.
    tf.app.run(main=main)

## Prepare Model and Input data

Copy the ImageNet evaluation images and the `Inception v3` model from the public containers described below into containers in your own storage account. Replace `"<storage-account-name>"` and `"<storage-account-key>"` with your storage account's name and key (or set the `PIPELINE_DATA_ACCOUNT_NAME` and `PIPELINE_DATA_ACCOUNT_KEY` environment variables). Change `data_container_name` and `model_container_name` to your containers' names.

We have shared two public containers, `sample-images` and `inception-model`, in the storage account `pipelinedata`. They hold the data that should go into your two containers.

In [None]:
import os

# Storage account credentials: taken from the environment when set, otherwise
# left as placeholders that must be edited before running the next cells.
account_name = os.environ.get("PIPELINE_DATA_ACCOUNT_NAME", "<storage-account-name>")
account_key = os.environ.get("PIPELINE_DATA_ACCOUNT_KEY", "<storage-account-key>")

# Blob containers holding the input images and the Inception-v3 model files.
data_container_name = "sample-images"
model_container_name = "inception-model"

Register your blob containers as datastores named `images_datastore` and `model_datastore`. The `overwrite=True` argument replaces any datastore previously registered under the same name.


In [None]:
from azureml.core.datastore import Datastore

# Both containers live in the same storage account; overwrite=True replaces any
# datastore already registered under these names.
images_data = Datastore.register_azure_blob_container(
    ws,
    account_name=account_name,
    account_key=account_key,
    datastore_name="images_datastore",
    container_name=data_container_name,
    overwrite=True,
)
inception_model = Datastore.register_azure_blob_container(
    ws,
    account_name=account_name,
    account_key=account_key,
    datastore_name="model_datastore",
    container_name=model_container_name,
    overwrite=True,
)

# Specify where the input data is stored

In [None]:
from azureml.core import Datastore
from azureml.data.data_reference import DataReference
from azureml.pipeline.core import PipelineData

# Step inputs: the image datastore and the model datastore (labels + checkpoint).
input_images = DataReference(
    data_reference_name="input_images",
    datastore=images_data,
)
model_dir = DataReference(
    data_reference_name="input_model",
    datastore=inception_model,
)

# Steps to run

Some of the script's parameters can be exposed as pipeline parameters and supplied when a `PublishedPipeline` is re-run. In this example, the script's `batch_size` is such a parameter.

In [None]:
from azureml.pipeline.core.graph import PipelineParameter

# Exposed as "param_batch_size" so the pipeline can be re-run with a different
# batch size; 20 is used when no value is supplied.
batch_size_param = PipelineParameter(
    default_value=20,
    name="param_batch_size",
)

In [None]:
from azureml.pipeline.steps import MpiStep

# Checkpoint file inside the model datastore; batch_predict.py hard-codes the same name.
inception_model_name = "inception_v3.ckpt"

# Run batch_predict.py as an MPI job with one process on each of 3 nodes,
# matching the cluster's maximum node count.
batch_pred_step = MpiStep(
    name="batch prediction",
    script_name="batch_predict.py",
    arguments=["--dataset_path", input_images,
               "--model_dir", model_dir,
               "--batch_size", batch_size_param],
    target=cluster,
    node_count=3,
    process_count_per_node=1,
    # The two data references used in `arguments`. labels.txt is read from
    # model_dir by the script, so no separate labels input exists (the previous
    # `label_dir` entry was never defined and raised NameError).
    inputs=[input_images, model_dir],
    pip_packages=["mpi4py", "tensorflow-gpu==1.10.0"],
    use_gpu=True,
    source_directory=scripts_folder
)

In [None]:
from azureml.core import Experiment
from azureml.pipeline.core import Pipeline

pipeline = Pipeline(workspace=ws, steps=[batch_pred_step])

# Submit under the 'mpi_batch_prediction' experiment with an explicit batch size.
experiment = Experiment(ws, 'mpi_batch_prediction')
pipeline_run = experiment.submit(pipeline, pipeline_params={"param_batch_size": 20})

# Monitor run

In [None]:
from azureml.train.widgets import RunDetails
# Render the run-monitoring widget for the submitted pipeline run.
run_details = RunDetails(pipeline_run)
run_details.show()

In [None]:
# Block until the pipeline run finishes; show_output=True prints progress while waiting.
pipeline_run.wait_for_completion(show_output=True)

# Download and review output

In [None]:
# The pipeline has a single step, so its first child run is the MPI prediction step.
step_run = list(pipeline_run.get_children())[0]
# Save the artifact explicitly as ./result-labels.txt, the local path the next
# cell reads, rather than relying on the default download location.
step_run.download_file("./outputs/result-labels.txt", output_file_path="result-labels.txt")

In [None]:
import pandas as pd
# Each line is "<file name>: <label>". skipinitialspace drops the space after
# the colon so the Prediction column holds the bare label text.
df = pd.read_csv("result-labels.txt", delimiter=":", header=None,
                 names=["Filename", "Prediction"], skipinitialspace=True)
df.head()

# Publish a pipeline and rerun using a REST call

## Create a published pipeline

In [None]:
# Publish the pipeline that produced this run so it can be re-triggered over REST.
published_pipeline = pipeline_run.publish_pipeline(
    name="Inception v3 prediction",
    description="Batch prediction using Inception v3 model",
    version="1.0",
)
published_id = published_pipeline.id

## Rerun using REST call

## Get AAD token

In [None]:
from azureml.core.authentication import AzureCliAuthentication
import requests

# Authenticate through the Azure CLI and build the request headers that the
# REST call below sends.
cli_auth = AzureCliAuthentication()
aad_token = cli_auth.get_authentication_header()

## Run published pipeline using its REST endpoint

In [None]:
rest_endpoint = published_pipeline.endpoint
# Re-run the published pipeline, overriding the batch size for this run.
response = requests.post(rest_endpoint,
                         headers=aad_token,
                         json={"ExperimentName": "mpi_batch_prediction",
                               "ParameterAssignments": {"param_batch_size": 50}})
# Surface HTTP failures with the status code instead of a confusing
# JSON/KeyError failure when reading the run id below.
response.raise_for_status()
run_id = response.json()["Id"]

## Monitor the new run

In [None]:
from azureml.pipeline.core.run import PipelineRun
# Attach to the run started by the REST call and show its monitoring widget.
published_experiment = ws.experiments["mpi_batch_prediction"]
published_pipeline_run = PipelineRun(published_experiment, run_id)

RunDetails(published_pipeline_run).show()