In [1]:
# https://docs.microsoft.com/en-us/azure/machine-learning/how-to-access-data#python-sdk

In [2]:
from azureml.core import Workspace
# Connect to the Azure ML workspace described by the local workspace config file
# (Workspace.from_config); every later cell uses this `ws` handle.
ws = Workspace.from_config()

### Create Datastore for sample images (from public account)

In [3]:
from azureml.core.datastore import Datastore

In [4]:
# Register the "sampledata" container of the public "pipelinedata" storage account
# as the datastore "images_datastore". No account key or SAS token is passed, so this
# presumably relies on the container allowing anonymous read access.
# overwrite=True lets re-running this cell replace an existing registration.
batchscore_blob = Datastore.register_azure_blob_container(ws, 
                      datastore_name="images_datastore", 
                      container_name="sampledata", 
                      account_name="pipelinedata", 
                      overwrite=True)
# The workspace's default datastore; the pipeline's "scores" output is stored here.
def_data_store = ws.get_default_datastore()

### Create Dataset Objects

In [5]:
from azureml.core.dataset import Dataset
from azureml.pipeline.core import PipelineData

In [22]:
# FileDataset definitions for the sample images and the labels folder, both
# relative to the "images_datastore" container (printed below as definitions only).
input_images = Dataset.File.from_files((batchscore_blob, "batchscoring/images/"))
label_ds = Dataset.File.from_files((batchscore_blob, "batchscoring/labels/"))
# Pipeline output named "scores" on the default datastore; the ParallelRunStep
# writes its results here and the last cell downloads them by this name.
output_dir = PipelineData(name="scores", 
                          datastore=def_data_store, 
                          output_path_on_compute="batchscoring/results")

In [24]:
# Show the image FileDataset definition (source datastore/path and definition steps).
print(input_images)

FileDataset
{
  "source": [
    "('images_datastore', 'batchscoring/images/')"
  ],
  "definition": [
    "GetDatastoreFiles"
  ]
}


In [25]:
# Show the labels FileDataset definition (source datastore/path and definition steps).
print(label_ds)

FileDataset
{
  "source": [
    "('images_datastore', 'batchscoring/labels/')"
  ],
  "definition": [
    "GetDatastoreFiles"
  ]
}


In [7]:
# Display the default datastore's connection details (name, container, account).
def_data_store

{
  "name": "workspaceblobstore",
  "container_name": "azureml-blobstore-2dd703c0-9f8b-4e31-bb4d-9ff3f50aa329",
  "account_name": "amlworkbook2803154736",
  "protocol": "https",
  "endpoint": "core.windows.net"
}

In [18]:
# Optionally register the datasets to the workspace (to reuse them later).
# create_new_version=True keeps this cell re-runnable: registering a name that
# already exists without it raises an error, which breaks Restart & Run All.
input_images = input_images.register(workspace=ws, name="input_images", create_new_version=True)
label_ds = label_ds.register(workspace=ws, name="label_ds", create_new_version=True)

In [20]:
# After registration the printed definition also includes registration metadata
# (id, name, version, workspace).
print(input_images)

FileDataset
{
  "source": [
    "('images_datastore', 'batchscoring/images/')"
  ],
  "definition": [
    "GetDatastoreFiles"
  ],
  "registration": {
    "id": "a0f585cb-7cb3-4968-b6cd-642355cce7c7",
    "name": "input_images",
    "version": 1,
    "workspace": "Workspace.create(name='aml-workbook', subscription_id='b060ea58-a590-43cc-86ea-8ee676be2a76', resource_group='aml-workbook')"
  }
}


In [21]:
# After registration the printed definition also includes registration metadata
# (id, name, version, workspace).
print(label_ds)

FileDataset
{
  "source": [
    "('images_datastore', 'batchscoring/labels/')"
  ],
  "definition": [
    "GetDatastoreFiles"
  ],
  "registration": {
    "id": "f590d9ba-13c5-4efd-a15c-10cd0306ec14",
    "name": "label_ds",
    "version": 1,
    "workspace": "Workspace.create(name='aml-workbook', subscription_id='b060ea58-a590-43cc-86ea-8ee676be2a76', resource_group='aml-workbook')"
  }
}


### Download and register the model

In [9]:
import os
import tarfile
import urllib.request

MODEL_DIR = "models"
MODEL_CKPT = os.path.join(MODEL_DIR, "inception_v3.ckpt")
MODEL_ARCHIVE = "model.tar.gz"
MODEL_URL = "https://download.tensorflow.org/models/inception_v3_2016_08_28.tar.gz"

# Fetch and unpack the pretrained Inception V3 checkpoint only when it is not
# already on disk, so re-running the notebook skips the download.
os.makedirs(MODEL_DIR, exist_ok=True)

if not os.path.exists(MODEL_CKPT):
    urllib.request.urlretrieve(MODEL_URL, MODEL_ARCHIVE)
    # The context manager closes the archive even if extraction fails.
    # NOTE: extractall trusts member paths inside the archive; acceptable for this
    # known source, but use an extraction filter for untrusted archives.
    with tarfile.open(MODEL_ARCHIVE, "r:gz") as tar:
        tar.extractall(MODEL_DIR)

In [10]:
from azureml.core.model import Model
 
# Register the extracted checkpoint file as workspace model "inception"; the
# scoring script resolves it by this name via Model.get_model_path(--model_name).
model = Model.register(model_path="models/inception_v3.ckpt",
                       model_name="inception",
                       tags={"pretrained": "inception"},
                       description="Imagenet trained tensorflow inception",
                       workspace=ws)

Registering model inception


### Create (or Reuse) the Remote Compute Target

In [11]:
from azureml.core.compute import AmlCompute, ComputeTarget
from azureml.exceptions import ComputeTargetException
# Name of the AML compute cluster to reuse or create.
compute_name = "gpu-cluster-js01"

# checks to see if compute target already exists in workspace, else create it
try:
    # Reuse an existing cluster; ComputeTargetException signals that no compute
    # target with this name exists yet.
    compute_target = ComputeTarget(workspace=ws, name=compute_name)
except ComputeTargetException:
    # Single-node, low-priority GPU cluster; min_nodes=0 lets it scale down to
    # zero nodes when idle.
    config = AmlCompute.provisioning_configuration(vm_size="STANDARD_NC6",
                                                   vm_priority="lowpriority", 
                                                   min_nodes=0, 
                                                   max_nodes=1)

    compute_target = ComputeTarget.create(workspace=ws, name=compute_name, provisioning_configuration=config)
    # Block until provisioning finishes (waiting at most 20 minutes).
    compute_target.wait_for_completion(show_output=True, min_node_count=None, timeout_in_minutes=20)

Creating
Succeeded
AmlCompute wait for completion finished

Minimum number of nodes requested have been provisioned


### Write a scoring script

In [26]:
%%writefile batch_scoring.py

import os
import argparse
import datetime
import time
import tensorflow as tf
from math import ceil
import numpy as np
import shutil
from tensorflow.contrib.slim.python.slim.nets import inception_v3

from azureml.core import Run
from azureml.core.model import Model
from azureml.core.dataset import Dataset

# TF-Slim alias; its arg_scope is used when building the Inception V3 graph in init().
slim = tf.contrib.slim

# Side length (pixels) every image is resized to; also the placeholder's spatial size.
image_size = 299
# Colour channels of the model input; images are decoded with channels=3 to match.
num_channel = 3


def get_class_label_dict(labels_dir):
    """Read the class labels used to turn predicted class ids into names.

    Args:
        labels_dir: Directory (passed via --labels_dir) that contains ``labels.txt``.

    Returns:
        A list of label strings, one per line of ``labels.txt`` with trailing
        whitespace stripped; the list index is the class id. Despite the
        function name, this is a list, not a dict.
    """
    labels_path = os.path.join(labels_dir, 'labels.txt')
    # Close the file handle deterministically instead of leaving it to the GC.
    with tf.gfile.GFile(labels_path) as labels_file:
        return [line.rstrip() for line in labels_file.readlines()]


def init():
    """Initialise the worker: parse args, load labels, build the graph, restore weights.

    This is the init hook of the entry script (batch_scoring.py) used by the
    ParallelRunStep. It must run before run(), which reads the module globals
    set here.

    Command-line args:
        --model_name: name of the registered model to restore (e.g. "inception").
        --labels_dir: directory containing labels.txt.

    Sets globals:
        g_tf_sess: tf.Session holding the restored Inception V3 graph.
        probabilities: tf.argmax over the logits. Despite the name, it yields
            predicted class indices, not probabilities.
        label_dict: list of label strings indexed by class id.
        input_images: float32 placeholder of shape
            [1, image_size, image_size, num_channel]. It is unrelated to the
            notebook's dataset variable of the same name.
    """
    global g_tf_sess, probabilities, label_dict, input_images

    parser = argparse.ArgumentParser(description="Start a tensorflow model serving")
    parser.add_argument('--model_name', dest="model_name", required=True)
    parser.add_argument('--labels_dir', dest="labels_dir", required=True)
    # parse_known_args ignores extra command-line arguments (the ParallelRunStep
    # driver invocation carries many of its own).
    args, _ = parser.parse_known_args()
    print("args:", args)
    
    # Debug only: look up the named dataset inputs ("labels_input" and
    # "input_images", the names given via as_named_input in the notebook) and
    # log them. The values are not used afterwards. A mismatched name would
    # presumably raise here.
    dl_li = Run.get_context().input_datasets['labels_input']
    dl_ii = Run.get_context().input_datasets['input_images']
    print("li:", dl_li)
    print("ii:", dl_ii)

    label_dict = get_class_label_dict(args.labels_dir)
    # One output class per line of labels.txt.
    classes_num = len(label_dict)

    # Build the Inception V3 inference graph for one image per session call.
    with slim.arg_scope(inception_v3.inception_v3_arg_scope()):
        input_images = tf.placeholder(tf.float32, [1, image_size, image_size, num_channel])
        logits, _ = inception_v3.inception_v3(input_images,
                                              num_classes=classes_num,
                                              is_training=False)
        probabilities = tf.argmax(logits, 1)

    # allow_growth: allocate GPU memory on demand rather than all up front,
    # presumably so the two worker processes per node can share one GPU.
    config = tf.ConfigProto()
    config.gpu_options.allow_growth = True
    g_tf_sess = tf.Session(config=config)
    g_tf_sess.run(tf.global_variables_initializer())
    g_tf_sess.run(tf.local_variables_initializer())

    # Overwrite the freshly initialised variables with the registered checkpoint.
    model_path = Model.get_model_path(args.model_name)
    saver = tf.train.Saver()
    saver.restore(g_tf_sess, model_path)


def file_to_tensor(file_path):
    """Build TF ops that load one image file and preprocess it for the model.

    Args:
        file_path: Image path as a Python string or a scalar tf.string tensor.
            It is handed directly to tf.read_file.

    Returns:
        A float32 tensor of static shape [image_size, image_size, num_channel].
        Its pixel values are the resized values divided by 255.

    Only graph ops are created here. Nothing is read until the returned tensor
    is evaluated in a session.
    """
    encoded = tf.read_file(file_path)
    # channels=3 forces an RGB result so the depth matches the model input.
    decoded = tf.image.decode_image(encoded, channels=3)
    # decode_image does not give a static rank; pin it so resizing can proceed.
    decoded.set_shape([None, None, None])

    resized = tf.image.resize_images(decoded, [image_size, image_size])
    # NOTE(review): this scales to [0, 1]. TF-Slim's Inception preprocessing maps
    # to [-1, 1] ((x / 255 - 0.5) * 2). Confirm which range this checkpoint expects.
    scaled = tf.divide(tf.subtract(resized, [0]), [255])
    scaled.set_shape([image_size, image_size, num_channel])
    return scaled


# Preprocessing sub-graph, built lazily on the first run() call and then reused
# by every later call in this worker process.
_file_path_input = None
_preprocessed_image = None


def run(mini_batch):
    """Score one mini-batch of image files.

    Args:
        mini_batch: Iterable of image file paths supplied by the ParallelRunStep.

    Returns:
        A list of "<basename>: <label>" strings, one per file. With
        output_action="append_row", these rows are collected into the step's
        results file.
    """
    global _file_path_input, _preprocessed_image
    # Build the decode/resize ops once and feed each path through a placeholder.
    # Calling file_to_tensor(file_path) per image would add new ops to the
    # default graph every time. The graph (and memory use) would then keep
    # growing for the life of the worker process.
    if _file_path_input is None:
        _file_path_input = tf.placeholder(tf.string, shape=[])
        _preprocessed_image = file_to_tensor(_file_path_input)

    result_list = []
    for file_path in mini_batch:
        print("fp:", file_path)
        out = g_tf_sess.run(_preprocessed_image, feed_dict={_file_path_input: file_path})
        # `input_images` is the image placeholder set by init(); batch size is 1.
        result = g_tf_sess.run(probabilities, feed_dict={input_images: [out]})
        result_list.append(os.path.basename(file_path) + ": " + label_dict[result[0]])
    return result_list

Overwriting batch_scoring.py


### Build the pipeline

In [27]:
from azureml.core import Environment
from azureml.core.conda_dependencies import CondaDependencies
from azureml.core.runconfig import DEFAULT_GPU_IMAGE

# Python dependencies for batch_scoring.py:
#  - TF 1.15 (GPU build); the script uses tf.contrib.slim, which exists only in TF 1.x.
#  - azureml-core for Run/Model.
#  - azureml-dataprep[fuse], presumably needed to mount the FileDataset inputs.
cd = CondaDependencies.create(pip_packages=["tensorflow-gpu==1.15.2",
                                            "azureml-core", "azureml-dataprep[fuse]"])
env = Environment(name="parallelenv")
env.python.conda_dependencies = cd
# AzureML's default GPU base image, to match the NC-series GPU cluster.
env.docker.base_image = DEFAULT_GPU_IMAGE

In [28]:
from azureml.pipeline.steps import ParallelRunConfig

# How batch_scoring.py is fanned out over the GPU cluster.
parallel_run_config = ParallelRunConfig(
    environment=env,
    # Written by the %%writefile cell; provides init() and run().
    entry_script="batch_scoring.py",
    source_directory=".",
    # Collect the lists returned by run() into one results file
    # (parallel_run_step.txt, downloaded in the last cell).
    output_action="append_row",
    # For a FileDataset input: number of files passed to each run() call.
    mini_batch_size="20",
    # Failed items tolerated before the job is aborted.
    error_threshold=1,
    compute_target=compute_target,
    # Two worker processes on the single node (each runs init() and run()).
    process_count_per_node=2,
    node_count=1
)

### Create the Pipeline Step

In [29]:
from azureml.pipeline.steps import ParallelRunStep
from datetime import datetime

# Timestamped step name so each submission is easy to tell apart in the portal.
parallel_step_name = "batchscoring-" + datetime.now().strftime("%Y%m%d%H%M")

# Named labels input; init() in batch_scoring.py looks up
# input_datasets['labels_input'] and receives its path via --labels_dir.
label_config = label_ds.as_named_input("labels_input")

batch_score_step = ParallelRunStep(
    name=parallel_step_name,
    # Main input: the image files, split into mini-batches for run().
    inputs=[input_images.as_named_input("input_images")],
    output=output_dir,
    # label_config in the argument list is presumably resolved to the labels
    # path at runtime. It is also passed as a side input, so it is not
    # partitioned into mini-batches like the main input.
    arguments=["--model_name", "inception",
               "--labels_dir", label_config],
    side_inputs=[label_config],
    parallel_run_config=parallel_run_config,
    # Always run the step instead of reusing outputs from an earlier submission.
    allow_reuse=False
)

### Submit the Pipeline

In [30]:
from azureml.core import Experiment
from azureml.pipeline.core import Pipeline

# Single-step pipeline, submitted under the "batch_scoring" experiment. The logs
# are streamed here, and the cell blocks until the run finishes.
pipeline = Pipeline(workspace=ws, steps=[batch_score_step])
pipeline_run = Experiment(ws, 'batch_scoring').submit(pipeline)
pipeline_run.wait_for_completion(show_output=True)

Created step batchscoring-202010212327 [dc3364fd][9889e7cf-6242-44c2-816f-f2ef95da8ca9], (This step will run and generate new outputs)
Using data reference input_images_0 for StepId [c678f2ce][d9e70b7f-3a5b-4292-b7bc-7fb2b00e0139], (Consumers of this data are eligible to reuse prior runs.)Using data reference labels_input_0 for StepId [b5883ba2][ffd377a9-d8f7-4adc-a2c6-ac66eea45dc0], (Consumers of this data are eligible to reuse prior runs.)

Submitted PipelineRun 8166fa84-5c06-4d9f-b0d7-1803c0fef5fc
Link to Azure Machine Learning Portal: https://ml.azure.com/experiments/batch_scoring/runs/8166fa84-5c06-4d9f-b0d7-1803c0fef5fc?wsid=/subscriptions/b060ea58-a590-43cc-86ea-8ee676be2a76/resourcegroups/aml-workbook/workspaces/aml-workbook
PipelineRunId: 8166fa84-5c06-4d9f-b0d7-1803c0fef5fc
Link to Azure Machine Learning Portal: https://ml.azure.com/experiments/batch_scoring/runs/8166fa84-5c06-4d9f-b0d7-1803c0fef5fc?wsid=/subscriptions/b060ea58-a590-43cc-86ea-8ee676be2a76/resourcegroups/aml-w


Streaming azureml-logs/70_driver_log.txt
2020/10/21 21:34:33 logger.go:297: Attempt 1 of http call to http://10.0.0.4:16384/sendlogstoartifacts/info
2020/10/21 21:34:33 logger.go:297: Attempt 1 of http call to http://10.0.0.4:16384/sendlogstoartifacts/status
[2020-10-21T21:34:34.761186] Entering context manager injector.
[context_manager_injector.py] Command line Options: Namespace(inject=['ProjectPythonPath:context_managers.ProjectPythonPath', 'Dataset:context_managers.Datasets', 'RunHistory:context_managers.RunHistory', 'TrackUserError:context_managers.TrackUserError'], invocation=['driver/amlbi_main.py', '--client_sdk_version', '1.15.0', '--scoring_module_name', 'batch_scoring.py', '--mini_batch_size', '20', '--error_threshold', '1', '--output_action', 'append_row', '--logging_level', 'INFO', '--run_invocation_timeout', '60', '--run_max_try', '3', '--create_snapshot_at_runtime', 'True', '--output', '/mnt/batch/tasks/shared/LS_root/jobs/aml-workbook/azureml/301df1ee-ce24-42ae-869c-5



PipelineRun Execution Summary
PipelineRun Status: Finished
{'runId': '8166fa84-5c06-4d9f-b0d7-1803c0fef5fc', 'status': 'Completed', 'startTimeUtc': '2020-10-21T21:27:48.939559Z', 'endTimeUtc': '2020-10-21T21:36:52.554779Z', 'properties': {'azureml.runsource': 'azureml.PipelineRun', 'runSource': 'SDK', 'runType': 'SDK', 'azureml.parameters': '{}'}, 'inputDatasets': [], 'logFiles': {'logs/azureml/executionlogs.txt': 'https://amlworkbook2803154736.blob.core.windows.net/azureml/ExperimentRun/dcid.8166fa84-5c06-4d9f-b0d7-1803c0fef5fc/logs/azureml/executionlogs.txt?sv=2019-02-02&sr=b&sig=JzPOd0HMMIys4srwJn7C6VlGP8GOScdVtvvq%2BHa5jPQ%3D&st=2020-10-21T21%3A18%3A28Z&se=2020-10-22T05%3A28%3A28Z&sp=r', 'logs/azureml/stderrlogs.txt': 'https://amlworkbook2803154736.blob.core.windows.net/azureml/ExperimentRun/dcid.8166fa84-5c06-4d9f-b0d7-1803c0fef5fc/logs/azureml/stderrlogs.txt?sv=2019-02-02&sr=b&sig=jQscJLCdFpVqWS9cEdidnakETrT7buiLFnzrlfpq%2FmM%3D&st=2020-10-21T21%3A18%3A28Z&se=2020-10-22T05%3A28

'Finished'

### Download and review output

In [31]:
import os

import pandas as pd

# The pipeline has a single step, so its only child run is the ParallelRunStep.
batch_run = next(pipeline_run.get_children())
batch_output = batch_run.get_output_data("scores")
batch_output.download(local_path="inception_results")

# Locate the aggregated results file written by output_action="append_row".
# Fail loudly if it is missing, rather than raising NameError or silently
# reusing a stale `result_file` from an earlier run.
result_files = [
    os.path.join(root, name)
    for root, _dirs, names in os.walk("inception_results")
    for name in names
    if name.endswith("parallel_run_step.txt")
]
if not result_files:
    raise FileNotFoundError("No *parallel_run_step.txt found under inception_results")
result_file = result_files[-1]

# Each row is "<filename>: <label>" (as produced by run() in batch_scoring.py).
# Labels may contain commas, quotes or colons, so split each line on the first
# ": " instead of parsing it as delimited/quoted text.
with open(result_file, encoding="utf-8") as results:
    rows = [line.rstrip().split(": ", 1) for line in results if line.strip()]

df = pd.DataFrame(rows, columns=["Filename", "Prediction"])
print("Prediction has ", df.shape[0], " rows")
df.head(10)

Prediction has  110  rows


Unnamed: 0,Filename,Prediction
0,n02691156_5992_airplane.jpg,rapeseed
1,n02764044_41120_axe.jpg,"""plane, carpenters plane, woodworking plane"""
2,n02766320_8764_baby_bed.jpg,"crib, cot"
3,n02769748_23541_backpack.jpg,"backpack, back pack, knapsack, packsack, ruck..."
4,n02769748_3207_backpack.jpg,"backpack, back pack, knapsack, packsack, ruck..."
5,n02799071_5092_baseball.jpg,baseball
6,n02802426_5133_basketball.jpg,basketball
7,n02815834_4632_beaker.jpg,beaker
8,n02828884_8228_bench.jpg,park bench
9,n02834778_5255_bicycle.jpg,"mountain bike, all-terrain bike, off-roader"
