# Batch endpoints for batch scoring

In [2]:
from azure.ai.ml import MLClient, Input
from azure.ai.ml.entities import (
    BatchEndpoint,
    BatchDeployment,
    Model,
    Environment,
    BatchRetrySettings,
    CodeConfiguration,
)
from azure.identity import DefaultAzureCredential
from azure.ai.ml.constants import AssetTypes, BatchDeploymentOutputAction

Configure the workspace

In [3]:
# enter details of your AML workspace
subscription_id = "7bb22fd9-06b5-445f-b8d4-570117630941"
resource_group = "RG_AML_Blogpost2022"
workspace = "AML_Blogpost2022"

ml_client = MLClient(
    DefaultAzureCredential(), subscription_id, resource_group, workspace
)

#  Create Batch Endpoint

Batch endpoints are endpoints that are used batch inferencing on large volumes of data over a period of time. Batch endpoints receive pointers to data and run jobs asynchronously to process the data in parallel on compute clusters. Batch endpoints store outputs to a data store for further analysis.

To create an online endpoint we will use BatchEndpoint. This class allows user to configure the following key aspects:

name - Name of the endpoint. Needs to be unique at the Azure region level
auth_mode - The authentication method for the endpoint. Currently only Azure Active Directory (Azure AD) token-based (aad_token) authentication is supported.
identity- The managed identity configuration for accessing Azure resources for endpoint provisioning and inference.
defaults - Default settings for the endpoint.
deployment_name - Name of the deployment that will serve as the default deployment for the endpoint.
description- Description of the endpoint.


## Configure endpoint

In [4]:
import random
import string

# Creating a unique endpoint name by including a random suffix
allowed_chars = string.ascii_lowercase + string.digits
endpoint_suffix = "".join(random.choice(allowed_chars) for x in range(5))
endpoint_name = "mnist-batch-" + endpoint_suffix

# endpoint configuration
endpoint = BatchEndpoint(
    name=endpoint_name,
    description="A batch endpoint for scoring images from the MNIST dataset.",
    tags={"type": "deep-learning"},
)

and finally, we create an endpoint

In [5]:
ml_client.begin_create_or_update(endpoint).result()

<azure.ai.ml._restclient.v2022_05_01.models._models_py3.BatchEndpointData at 0x7fb52c13ec50>

# Registering the model

We are going to deploy a model created using Torch to solve the typical MNIST ( )classification problem.

## Registering the model in the workspace
We need to register the model in order to use it with Azure Machine Learning:

In [8]:
model_name = "mnist-classification-torch"
model_local_path = "./Day22/model/"

In [9]:
if not any(filter(lambda m: m.name == model_name, ml_client.models.list())):
    print(f"Model {model_name} is not registered. Creating...")
    model = ml_client.models.create_or_update(
        Model(name=model_name, path=model_local_path, type=AssetTypes.CUSTOM_MODEL)
    )

Model mnist-classification-torch is not registered. Creating...


Uploading model (1.13 MBs):   0%|          | 0/1131465 [00:00<?, ?it/s]Uploading model (1.13 MBs): 100%|██████████| 1131465/1131465 [00:00<00:00, 15412747.23it/s]




In [21]:
#Let's get a reference to the model:
model = ml_client.models.get(name=model_name, label="latest")

# Create a deployment
A deployment is a set of resources required for hosting the model that does the actual inferencing.

In [11]:
%%writefile Day22/code/batch_driver.py

import os
import numpy as np
import pandas as pd
import tensorflow as tf
from PIL import Image
from azureml.core import Model


def init():
    global g_tf_sess

    # AZUREML_MODEL_DIR is an environment variable created during deployment
    # It is the path to the model folder (./azureml-models)
    # Please provide your model's folder name if there's one
    model_path = os.path.join(os.environ["AZUREML_MODEL_DIR"], "model")

    # contruct graph to execute
    tf.reset_default_graph()
    saver = tf.train.import_meta_graph(os.path.join(model_path, "mnist-tf.model.meta"))
    g_tf_sess = tf.Session(config=tf.ConfigProto(device_count={"GPU": 0}))
    saver.restore(g_tf_sess, os.path.join(model_path, "mnist-tf.model"))


def run(mini_batch):
    print(f"run method start: {__file__}, run({mini_batch})")
    resultList = []
    in_tensor = g_tf_sess.graph.get_tensor_by_name("network/X:0")
    output = g_tf_sess.graph.get_tensor_by_name("network/output/MatMul:0")

    for image in mini_batch:
        # prepare each image
        data = Image.open(image)
        np_im = np.array(data).reshape((1, 784))
        # perform inference
        inference_result = output.eval(feed_dict={in_tensor: np_im}, session=g_tf_sess)
        # find best probability, and add to result list
        best_result = np.argmax(inference_result)
        resultList.append([os.path.basename(image), best_result])

    return pd.DataFrame(resultList)

Overwriting Day22/code/batch_driver.py


# Creating the compute

Batch deployments can run on any Azure ML compute that already exists in the workspace. That means that multiple batch deployments can share the same compute infrastructure. In this example, we are going to work on an AzureML compute cluster called cpu-cluster. Let's verify the compute exists on the workspace or create it otherwise.

In [23]:
from azure.ai.ml.entities import AmlCompute

compute_name = "cpu-cluster"

if not any(filter(lambda m: m.name == compute_name, ml_client.compute.list())):
    print(f"Compute {compute_name} is not created. Creating...")
    compute_cluster = AmlCompute(
        name=compute_name,
        description="CPU cluster compute",
        min_instances=0,
        max_instances=1,
    )
    ml_client.compute.begin_create_or_update(compute_cluster).result()

Compute cpu-cluster is not created. Creating...


And create an environment

In [24]:
env = Environment(
    conda_file="./Day22/environment/conda.yaml",
    image="mcr.microsoft.com/azureml/openmpi3.1.2-ubuntu18.04:latest",
)

Configure the deployment

In [25]:
deployment = BatchDeployment(
    name="mnist-torch-dpl",
    description="A deployment using Torch to solve the MNIST classification dataset.",
    endpoint_name=endpoint_name,
    model=model,
    code_configuration=CodeConfiguration(
        code="./Day22/code/", scoring_script="batch_driver.py"
    ),
    environment=env,
    compute=compute_name,
    instance_count=2,
    max_concurrency_per_instance=2,
    mini_batch_size=10,
    output_action=BatchDeploymentOutputAction.APPEND_ROW,
    output_file_name="predictions.csv",
    retry_settings=BatchRetrySettings(max_retries=3, timeout=30),
    logging_level="info",
)

Create the deployment

In [26]:
ml_client.begin_create_or_update(deployment).result()

BatchDeployment({'endpoint_name': 'mnist-batch-r8c5k', 'type': None, 'name': 'mnist-torch-dpl', 'description': 'A deployment using Torch to solve the MNIST classification dataset.', 'tags': {}, 'properties': {}, 'id': '/subscriptions/7bb22fd9-06b5-445f-b8d4-570117630941/resourceGroups/RG_AML_Blogpost2022/providers/Microsoft.MachineLearningServices/workspaces/AML_Blogpost2022/batchEndpoints/mnist-batch-r8c5k/deployments/mnist-torch-dpl', 'Resource__source_path': None, 'base_path': '/mnt/batch/tasks/shared/LS_root/mounts/clusters/amlblog2022-ds12-v2/code/Users/tomaz.kastrun', 'creation_context': None, 'serialize': <msrest.serialization.Serializer object at 0x7fb4f3f40d90>, 'model': '/subscriptions/7bb22fd9-06b5-445f-b8d4-570117630941/resourceGroups/RG_AML_Blogpost2022/providers/Microsoft.MachineLearningServices/workspaces/AML_Blogpost2022/models/mnist-classification-torch/versions/1', 'code_configuration': {'code': '/subscriptions/7bb22fd9-06b5-445f-b8d4-570117630941/resourceGroups/RG_AM

In [27]:
#Let's update the default deployment name in the endpoint:
endpoint = ml_client.batch_endpoints.get(endpoint_name)
endpoint.defaults.deployment_name = deployment.name
ml_client.batch_endpoints.begin_create_or_update(endpoint).result()

<azure.ai.ml._restclient.v2022_05_01.models._models_py3.BatchEndpointData at 0x7fb4f3f43400>

# Invoke the endpoint

Let's use public data available in an Azure Storage Account.

In [28]:
input = Input(
    type="uri_folder",
    path="https://pipelinedata.blob.core.windows.net/sampledata/mnist",
)

invoke the job

In [30]:
job = ml_client.batch_endpoints.invoke(
    endpoint_name=endpoint_name,
    input=input,
)

GEt the job detail

In [31]:
ml_client.jobs.get(job.name)

Experiment,Name,Type,Status,Details Page
mnist-batch-r8c5k,967eaa94-8af4-4c73-8b9e-fdc111ce7918,pipeline,Failed,Link to Azure Machine Learning studio


We wait until the job finished

In [32]:
from time import sleep

print(f"Waiting for batch deployment job {job.name}", end="")
while ml_client.jobs.get(name=job.name).status not in ["Completed", "Failed"]:
    sleep(10)
    print(".", end="")

print(" [DONE]")

Waiting for batch deployment job 967eaa94-8af4-4c73-8b9e-fdc111ce7918 [DONE]


Download the results

In [35]:
scoring_job = list(ml_client.jobs.list(parent_job_name=job.name))[0]
#output the results
ml_client.jobs.download(name=scoring_job.name, download_path=".", output_name="score")

In [36]:
# and read using pandas
import pandas as pd

score = pd.read_csv(
    "Day22/score/predictions.csv",
    header=None,
    names=["file", "class"],
    sep=" ",
)
score

FileNotFoundError: [Errno 2] No such file or directory: 'Day22/score/predictions.csv'

In [37]:
# cleanup
ml_client.batch_endpoints.begin_delete(name=endpoint_name)

<azure.core.polling._poller.LROPoller at 0x7fb4e52f1930>

...........