# Creating a Batch Inferencing Service

In previous labs, you used an Azure ML *pipeline* to automate the training and registration of a model, and you published a model as a web service for real-time *inferencing* (getting predictions from a model). Now you'll combine these two concepts to create a pipeline for *batch inferencing*. What does that mean? Well, imagine a health clinic takes patient measurements all day, saving the details for each patient in a separate file. Then overnight, the diabetes prediction model can be used to process all of the day's patient data as a batch, generating predictions that will be waiting the following morning so that the clinic can follow up with patients who are predicted to be at risk of diabetes. That's what you'll implement in this exercise.

## Before You Start

Before you start this lab, ensure that you have completed the *Create an Azure Machine Learning Workspace* and *Create a Compute Instance* tasks in [Lab 1: Getting Started with Azure Machine Learning](./labdocs/Lab01.md). Then open this notebook in Jupyter on your Compute Instance.

Let's start by ensuring that you have the latest version of the Azure ML SDK installed.

In [None]:
!pip install --upgrade azureml-sdk[notebooks,automl,explain]

## Connect to Your Workspace

Now connect to your workspace using the Azure ML SDK.

> **Note**: You may be prompted to authenticate. Just copy the code and click the link provided to sign in to your Azure subscription, and then return to this notebook.

In [1]:
import azureml.core
from azureml.core import Workspace

# Load the workspace from the config file
ws = Workspace.from_config()
print('Ready to use Azure ML {} to work with {}'.format(azureml.core.VERSION, ws.name))

Ready to use Azure ML 1.18.0 to work with nikhilsuthardp100


## Train and Register a Model

Now run the cell below to train and register a model.

In [2]:
from azureml.core import Experiment
from azureml.core import Model
import pandas as pd
import numpy as np
import joblib
from sklearn.model_selection import train_test_split
from sklearn.tree import DecisionTreeClassifier
from sklearn.metrics import roc_auc_score
from sklearn.metrics import roc_curve

# Create an Azure ML experiment in your workspace
experiment = Experiment(workspace = ws, name = "diabetes-training")
run = experiment.start_logging()
print("Starting experiment:", experiment.name)

# load the diabetes dataset
print("Loading Data...")
diabetes = pd.read_csv('data/diabetes.csv')

# Separate features and labels
X, y = diabetes[['Pregnancies','PlasmaGlucose','DiastolicBloodPressure','TricepsThickness','SerumInsulin','BMI','DiabetesPedigree','Age']].values, diabetes['Diabetic'].values

# Split data into training set and test set
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.30, random_state=0)

# Train a decision tree model
print('Training a decision tree model')
model = DecisionTreeClassifier().fit(X_train, y_train)

# calculate accuracy
y_hat = model.predict(X_test)
acc = np.average(y_hat == y_test)
print('Accuracy:', acc)
run.log('Accuracy', float(acc))  # np.float was removed in NumPy 1.24; the built-in float is equivalent

# calculate AUC
y_scores = model.predict_proba(X_test)
auc = roc_auc_score(y_test,y_scores[:,1])
print('AUC: ' + str(auc))
run.log('AUC', float(auc))  # np.float was removed in NumPy 1.24; the built-in float is equivalent

# Save the trained model
model_file = 'diabetes_model.pkl'
joblib.dump(value=model, filename=model_file)
run.upload_file(name = 'outputs/' + model_file, path_or_stream = './' + model_file)

# Complete the run
run.complete()

# Register the model
run.register_model(model_path='outputs/diabetes_model.pkl', model_name='diabetes_model',
                   tags={'Training context':'Inline Training'},
                   properties={'AUC': run.get_metrics()['AUC'], 'Accuracy': run.get_metrics()['Accuracy']})

print('Model trained and registered.')

Starting experiment: diabetes-training
Loading Data...
Training a decision tree model
Accuracy: 0.8893333333333333
AUC: 0.8761132394646499
Model trained and registered.


## Generate and Upload Batch Data

Since we don't actually have a fully staffed clinic with patients from whom to get new data for this course, you'll generate a random sample from our diabetes CSV file and use it to test the pipeline. Then you'll upload that data to a datastore in the Azure Machine Learning workspace and register a dataset for it.

In [3]:
from azureml.core import Datastore, Dataset
import pandas as pd
import os

# Load the diabetes data
diabetes = pd.read_csv('data/diabetes2.csv')
# Get a 100-item sample of the feature columns (not the diabetic label)
sample = diabetes[['Pregnancies','PlasmaGlucose','DiastolicBloodPressure','TricepsThickness','SerumInsulin','BMI','DiabetesPedigree','Age']].sample(n=100).values

# Create a folder
batch_folder = './batch-data'
os.makedirs(batch_folder, exist_ok=True)
print("Folder created!")

# Save each sample as a separate file
print("Saving files...")
for i in range(100):
    fname = str(i+1) + '.csv'
    sample[i].tofile(os.path.join(batch_folder, fname), sep=",")
print("files saved!")

# Upload the files to the default datastore
print("Uploading files to datastore...")
default_ds = ws.get_default_datastore()
default_ds.upload(src_dir="batch-data", target_path="batch-data", overwrite=True, show_progress=True)

# Register a dataset for the input data
batch_data_set = Dataset.File.from_files(path=(default_ds, 'batch-data/'), validate=False)
try:
    batch_data_set = batch_data_set.register(workspace=ws, 
                                             name='batch-data',
                                             description='batch data',
                                             create_new_version=True)
except Exception as ex:
    print(ex)

print("Done!")

Folder created!
Saving files...
files saved!
Uploading files to datastore...
Uploading an estimated of 100 files
Uploading batch-data/17.csv
Uploaded batch-data/17.csv, 1 files out of an estimated total of 100
Uploading batch-data/1.csv
Uploaded batch-data/1.csv, 2 files out of an estimated total of 100
Uploading batch-data/10.csv
Uploaded batch-data/10.csv, 3 files out of an estimated total of 100
Uploading batch-data/100.csv
Uploaded batch-data/100.csv, 4 files out of an estimated total of 100
Uploading batch-data/11.csv
Uploaded batch-data/11.csv, 5 files out of an estimated total of 100
Uploading batch-data/12.csv
Uploaded batch-data/12.csv, 6 files out of an estimated total of 100
Uploading batch-data/13.csv
Uploaded batch-data/13.csv, 7 files out of an estimated total of 100
Uploading batch-data/14.csv
Uploaded batch-data/14.csv, 8 files out of an estimated total of 100
Uploading batch-data/15.csv
Uploaded batch-data/15.csv, 9 files out of an estimated total of 100
Uploading batc

Done!
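Each file written above holds a single comma-separated row of eight feature values with no header, which is the shape the scoring script later has to read. The sketch below illustrates that round trip using only the Python standard library instead of NumPy; the feature values and the temporary folder are made up for illustration and are not taken from the dataset.

```python
import os
import tempfile

# Hypothetical feature values for one patient, in the column order used above
features = [2.0, 180.0, 74.0, 24.0, 21.0, 23.9, 1.488, 22.0]

with tempfile.TemporaryDirectory() as folder:
    path = os.path.join(folder, "1.csv")

    # Write one header-less, comma-separated line (one patient per file)
    with open(path, "w") as f:
        f.write(",".join(str(v) for v in features))

    # Read it back as a single row of floats
    with open(path) as f:
        row = [float(v) for v in f.read().strip().split(",")]

# A model expects a 2-D input (a list of rows), so wrap the single row
batch = [row]
print(len(batch), len(batch[0]))
```

Because there is no header row, the order of the values is the only thing that ties each number to a feature, so the column order must match the one used for training.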


## Create Compute

We'll need a compute context for the pipeline, so we'll create an Azure Machine Learning compute cluster in your workspace (or use an existing one if you have created it previously).

> **Important**: Before running the code below, change the cluster name (*nikhilvmcluster* in this example) to a unique name for your compute cluster! Cluster names must be globally unique and between 2 and 16 characters in length. Valid characters are letters, digits, and the - character.

In [4]:
from azureml.core.compute import ComputeTarget, AmlCompute
from azureml.core.compute_target import ComputeTargetException

cluster_name = "nikhilvmcluster"

try:
    # Check for existing compute target
    inference_cluster = ComputeTarget(workspace=ws, name=cluster_name)
    print('Found existing cluster, use it.')
except ComputeTargetException:
    # If it doesn't already exist, create it
    try:
        compute_config = AmlCompute.provisioning_configuration(vm_size='STANDARD_DS11_V2', max_nodes=2)
        inference_cluster = ComputeTarget.create(ws, cluster_name, compute_config)
        inference_cluster.wait_for_completion(show_output=True)
    except Exception as ex:
        print(ex)
    

Found existing cluster, use it.
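If you want to catch an invalid cluster name before calling Azure, the naming rule stated in the note above can be checked locally. This is a minimal sketch that encodes only that stated rule (2 to 16 characters; letters, digits, and hyphens); the helper name `is_valid_cluster_name` is hypothetical, and the service may apply further constraints that are not checked here.

```python
import re

# Rule as stated in this lab: 2 to 16 characters, letters, digits, and "-"
_CLUSTER_NAME = re.compile(r"[A-Za-z0-9-]{2,16}")


def is_valid_cluster_name(name: str) -> bool:
    """Return True if the whole name satisfies the stated rule."""
    return _CLUSTER_NAME.fullmatch(name) is not None


print(is_valid_cluster_name("nikhilvmcluster"))       # 15 valid characters
print(is_valid_cluster_name("your_compute_cluster"))  # underscore, too long
```

A local check like this cannot tell you whether the name is already taken; that is only discovered when the cluster is created.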


## Create a Pipeline for Batch Inferencing

Now we're ready to define the pipeline we'll use for batch inferencing. Our pipeline will need Python code to perform the batch inferencing, so let's create a folder where we can keep all the files used by the pipeline:

In [5]:
import os
# Create a folder for the experiment files
experiment_folder = 'batch_pipeline'
os.makedirs(experiment_folder, exist_ok=True)

print(experiment_folder)

batch_pipeline


Now we'll create a Python script to do the actual work, and save it in the pipeline folder:

In [6]:
%%writefile $experiment_folder/batch_diabetes.py
import os
import numpy as np
from azureml.core import Model
import joblib


def init():
    # Runs when the pipeline step is initialized
    global model

    # load the model
    model_path = Model.get_model_path('diabetes_model')
    model = joblib.load(model_path)


def run(mini_batch):
    # This runs for each batch
    resultList = []

    # process each file in the batch
    for f in mini_batch:
        # Read the comma-delimited data into an array
        data = np.genfromtxt(f, delimiter=',')
        # Reshape into a 2-dimensional array for prediction (model expects multiple items)
        prediction = model.predict(data.reshape(1, -1))
        # Append prediction to results
        resultList.append("{}: {}".format(os.path.basename(f), prediction[0]))
    return resultList

Writing batch_pipeline/batch_diabetes.py
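The script follows a two-function contract: `init()` runs once and stores the model in a global, and `run(mini_batch)` receives a list of file paths and returns one result per file. To make that contract concrete, here is a rough local sketch that mimics it without Azure ML. The `StubModel` class and its glucose threshold are invented stand-ins for the registered model, and the files are parsed with plain Python rather than NumPy.

```python
import os
import tempfile


class StubModel:
    """Hypothetical stand-in for the registered model (not the real classifier)."""

    def predict(self, rows):
        # Flag a row when the second feature (PlasmaGlucose) exceeds 140
        return [1 if row[1] > 140 else 0 for row in rows]


def init():
    # Called once per worker; the real script loads the registered model here
    global model
    model = StubModel()


def run(mini_batch):
    # Called once per mini-batch; each item is a path to a single-row file
    results = []
    for path in mini_batch:
        with open(path) as f:
            row = [float(v) for v in f.read().strip().split(",")]
        prediction = model.predict([row])  # model expects a list of rows
        results.append("{}: {}".format(os.path.basename(path), prediction[0]))
    return results


with tempfile.TemporaryDirectory() as folder:
    paths = []
    for i, glucose in enumerate([100.0, 180.0], start=1):
        path = os.path.join(folder, "{}.csv".format(i))
        with open(path, "w") as f:
            f.write("0,{},70,20,30,25.0,0.5,40".format(glucose))
        paths.append(path)

    init()
    results = run(paths)

print(results)
```

Returning exactly one item per input file matters because the number of returned items is how the parallel step accounts for processed inputs.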


Next we'll define a run context that includes the dependencies required by the script:

In [7]:
from azureml.core import Environment
from azureml.core.runconfig import DEFAULT_CPU_IMAGE
from azureml.core.runconfig import CondaDependencies

# Add dependencies required by the model
# For scikit-learn models, you need scikit-learn
# For parallel pipeline steps, you need azureml-core and azureml-dataprep[fuse]
cd = CondaDependencies.create(pip_packages=['scikit-learn','azureml-defaults','azureml-core','azureml-dataprep[fuse]'])

batch_env = Environment(name='batch_environment')
batch_env.python.conda_dependencies = cd
batch_env.docker.enabled = True
batch_env.docker.base_image = DEFAULT_CPU_IMAGE
print('Configuration ready.')

Configuration ready.


You're going to use a pipeline to run the batch prediction script, generate predictions from the input data, and save the results as a text file in the output folder. To do this, you can use a **ParallelRunStep**, which enables the batch data to be processed in parallel and the results collated in a single output file named *parallel_run_step.txt*.

In [8]:
from azureml.pipeline.steps import ParallelRunConfig, ParallelRunStep
from azureml.pipeline.core import PipelineData

default_ds = ws.get_default_datastore()

output_dir = PipelineData(name='inferences', 
                          datastore=default_ds, 
                          output_path_on_compute='diabetes/results')

parallel_run_config = ParallelRunConfig(
    source_directory=experiment_folder,
    entry_script="batch_diabetes.py",
    mini_batch_size="5",
    error_threshold=10,
    output_action="append_row",
    environment=batch_env,
    compute_target=inference_cluster,
    node_count=2)

parallelrun_step = ParallelRunStep(
    name='batch-score-diabetes',
    parallel_run_config=parallel_run_config,
    inputs=[batch_data_set.as_named_input('diabetes_batch')],
    output=output_dir,
    arguments=[],
    allow_reuse=True
)

print('Steps defined')

Steps defined
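To get a feel for what `mini_batch_size="5"` and `output_action="append_row"` mean for the 100 uploaded files, the following sketch simulates the behavior sequentially on one machine. It is only an approximation under stated assumptions: the real step distributes mini-batches across nodes and does not guarantee their order, and `make_mini_batches` and `fake_run` are hypothetical helpers, not part of the SDK.

```python
def make_mini_batches(items, size):
    """Split a list into consecutive chunks of at most `size` items."""
    return [items[i:i + size] for i in range(0, len(items), size)]


def fake_run(mini_batch):
    # Stand-in for the entry script's run(): one result line per file
    return ["{}: 0".format(name) for name in mini_batch]


files = ["{}.csv".format(i) for i in range(1, 101)]
mini_batches = make_mini_batches(files, 5)

# "append_row" behavior, simulated: every returned item becomes one line
# in a single collated output
collated = []
for mini_batch in mini_batches:
    collated.extend(fake_run(mini_batch))

print(len(mini_batches), len(collated))
```

With 100 files and five files per mini-batch, `run()` is invoked 20 times in total, and the collated output ends up with one line per file.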


Now it's time to put the step into a pipeline, and run it.

> **Note**: This may take some time! You can monitor the experiment run in [Azure Machine Learning studio](https://ml.azure.com).

In [9]:
from azureml.core import Experiment
from azureml.pipeline.core import Pipeline

pipeline = Pipeline(workspace=ws, steps=[parallelrun_step])
pipeline_run = Experiment(ws, 'batch_prediction_pipeline').submit(pipeline)
print('Running pipeline...')
pipeline_run.wait_for_completion(show_output=True)

Created step batch-score-diabetes [e3a7cbed][ba9712e0-6e09-4cc4-b589-67720d23bf7d], (This step will run and generate new outputs)
Submitted PipelineRun 99449476-8765-452c-a28f-e734e6f92a0a
Link to Azure Machine Learning Portal: https://ml.azure.com/experiments/batch_prediction_pipeline/runs/99449476-8765-452c-a28f-e734e6f92a0a?wsid=/subscriptions/71bfcf50-7e10-4546-9c9a-fd4f1ee42434/resourcegroups/nikhil-suthardp100/workspaces/nikhilsuthardp100
Running pipeline...
PipelineRunId: 99449476-8765-452c-a28f-e734e6f92a0a
Link to Azure Machine Learning Portal: https://ml.azure.com/experiments/batch_prediction_pipeline/runs/99449476-8765-452c-a28f-e734e6f92a0a?wsid=/subscriptions/71bfcf50-7e10-4546-9c9a-fd4f1ee42434/resourcegroups/nikhil-suthardp100/workspaces/nikhilsuthardp100
PipelineRun Status: NotStarted
PipelineRun Status: Running


StepRunId: 9212ba86-c3d2-4914-89e8-68de3eacc10c
Link to Azure Machine Learning Portal: https://ml.azure.com/experiments/batch_prediction_pipeline/runs/9212ba8


#
# To activate this environment, use
#
#     $ conda activate /azureml-envs/azureml_be341871f2a1af415628bd3b45b58373
#
# To deactivate an active environment, use
#
#     $ conda deactivate


  current version: 4.7.12
  latest version: 4.9.2

Please update conda by running

    $ conda update -n base -c defaults conda


Removing intermediate container 0f983070c0ea
 ---> 4d538f17b8ea
Step 9/15 : ENV PATH /azureml-envs/azureml_be341871f2a1af415628bd3b45b58373/bin:$PATH
 ---> Running in 60eda27ca390
Removing intermediate container 60eda27ca390
 ---> ee0f7c4802f9
Step 10/15 : ENV AZUREML_CONDA_ENVIRONMENT_PATH /azureml-envs/azureml_be341871f2a1af415628bd3b45b58373
 ---> Running in 27b906bd50f0
Removing intermediate container 27b906bd50f0
 ---> c2c439fa3602
Step 11/15 : ENV LD_LIBRARY_PATH /azureml-envs/azureml_be341871f2a1af415628bd3b45b58373/lib:$LD_LIBRARY_PATH
 ---> Running in 13492735998c
Removing intermediate container 13492735998c
 ---> 69503e2b9d9a
Step 12/15 : COPY azureml-e


Streaming azureml-logs/65_job_prep-tvmps_e15e28665ad0e097097afebcb8c998a58e4f7578fb14939532ae6270bc3b1953_d.txt
[2020-11-25T11:17:27.076407] Entering job preparation.
[2020-11-25T11:17:27.795303] Starting job preparation.
[2020-11-25T11:17:27.795345] Extracting the control code.
[2020-11-25T11:17:27.814072] fetching and extracting the control code on master node.
[2020-11-25T11:17:27.814131] Starting extract_project.
[2020-11-25T11:17:27.814419] Starting to extract zip file.
[2020-11-25T11:17:28.755378] Finished extracting zip file.
[2020-11-25T11:17:28.889554] Using urllib.request Python 3.0 or later
[2020-11-25T11:17:28.889616] Start fetching snapshots.
[2020-11-25T11:17:28.889656] Start fetching snapshot.
[2020-11-25T11:17:28.889668] Retrieving project from snapshot: 2e2f939c-cb97-4152-904d-7039469577eb
Starting the daemon thread to refresh tokens in background for process with pid = 78
[2020-11-25T11:17:29.826921] Finished fetching snapshot.
[2020-11-25T11:17:29.826971] Start fetc



PipelineRun Execution Summary
PipelineRun Status: Finished
{'runId': '99449476-8765-452c-a28f-e734e6f92a0a', 'status': 'Completed', 'startTimeUtc': '2020-11-25T11:06:18.888818Z', 'endTimeUtc': '2020-11-25T11:19:39.506076Z', 'properties': {'azureml.runsource': 'azureml.PipelineRun', 'runSource': 'SDK', 'runType': 'SDK', 'azureml.parameters': '{}'}, 'inputDatasets': [], 'outputDatasets': [], 'logFiles': {'logs/azureml/executionlogs.txt': 'https://nikhilsuthardp7211953111.blob.core.windows.net/azureml/ExperimentRun/dcid.99449476-8765-452c-a28f-e734e6f92a0a/logs/azureml/executionlogs.txt?sv=2019-02-02&sr=b&sig=3SJp6dzYmHogZH3tORw0ws4giNwkH7F6J3akDY4O73s%3D&st=2020-11-25T10%3A56%3A29Z&se=2020-11-25T19%3A06%3A29Z&sp=r', 'logs/azureml/stderrlogs.txt': 'https://nikhilsuthardp7211953111.blob.core.windows.net/azureml/ExperimentRun/dcid.99449476-8765-452c-a28f-e734e6f92a0a/logs/azureml/stderrlogs.txt?sv=2019-02-02&sr=b&sig=yLS8aJzxaWAn488OJJxcS5ZiD18zkAcj3PcRFS%2FIzkw%3D&st=2020-11-25T10%3A56%3

'Finished'

When the pipeline has finished running, the resulting predictions will have been saved in the outputs of the experiment associated with the first (and only) step in the pipeline. You can retrieve them as follows:

In [10]:
import os
import pandas as pd
import shutil

shutil.rmtree('diabetes-results', ignore_errors=True)

prediction_run = next(pipeline_run.get_children())
prediction_output = prediction_run.get_output_data('inferences')
prediction_output.download(local_path='diabetes-results')


for root, dirs, files in os.walk('diabetes-results'):
    for file in files:
        if file.endswith('parallel_run_step.txt'):
            result_file = os.path.join(root,file)

# cleanup output format
df = pd.read_csv(result_file, delimiter=":", header=None)
df.columns = ["File", "Prediction"]

# Display the first 20 results
df.head(20)

Unnamed: 0,File,Prediction
0,22.csv,1
1,23.csv,0
2,24.csv,0
3,25.csv,0
4,26.csv,1
5,59.csv,0
6,6.csv,0
7,60.csv,1
8,61.csv,0
9,62.csv,0
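The retrieval code assumes that a file named *parallel_run_step.txt* exists somewhere under the download folder and that each line has the form `name: prediction`. As a hedged illustration of those two assumptions, the sketch below locates the file and parses it with the standard library only, raising a clear error if the file is missing. The helper names and the sample file contents are invented for the example.

```python
import os
import tempfile


def find_result_file(folder, name="parallel_run_step.txt"):
    """Walk `folder` and return the path of the first file called `name`."""
    for root, _dirs, files in os.walk(folder):
        if name in files:
            return os.path.join(root, name)
    raise FileNotFoundError("{} not found under {}".format(name, folder))


def parse_results(path):
    """Parse 'file: prediction' lines into (file, prediction) tuples."""
    rows = []
    with open(path) as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            file_name, prediction = line.split(":", 1)
            rows.append((file_name.strip(), int(prediction)))
    return rows


with tempfile.TemporaryDirectory() as folder:
    # Simulate a downloaded output nested a few folders deep
    nested = os.path.join(folder, "azureml", "some-run-id", "inferences")
    os.makedirs(nested)
    with open(os.path.join(nested, "parallel_run_step.txt"), "w") as f:
        f.write("22.csv: 1\n23.csv: 0\n")

    parsed = parse_results(find_result_file(folder))

print(parsed)
```

Failing fast when the file is absent gives a more helpful message than the `NameError` you would otherwise see if the search loop never assigns `result_file`.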


## Publish the Pipeline and Use Its REST Interface

Now that you have a working pipeline for batch inferencing, you can publish it and use a REST endpoint to run it from an application.

In [11]:
published_pipeline = pipeline_run.publish_pipeline(
    name='Diabetes_Parallel_Batch_Pipeline', description='Batch scoring of diabetes data', version='1.0')

published_pipeline

Name,Id,Status,Endpoint
Diabetes_Parallel_Batch_Pipeline,22a31c28-54cd-4a9c-ad20-bb9b3d7c1610,Active,REST Endpoint


Note that the published pipeline has an endpoint, which you can see in the Azure portal. You can also find it as a property of the published pipeline object:

In [12]:
rest_endpoint = published_pipeline.endpoint
print(rest_endpoint)

https://eastus2.api.azureml.ms/pipelines/v1.0/subscriptions/71bfcf50-7e10-4546-9c9a-fd4f1ee42434/resourceGroups/nikhil-suthardp100/providers/Microsoft.MachineLearningServices/workspaces/nikhilsuthardp100/PipelineRuns/PipelineSubmit/22a31c28-54cd-4a9c-ad20-bb9b3d7c1610


To use the endpoint, client applications need to make a REST call over HTTP. This request must be authenticated, so an authorization header is required. To test this out, we'll use the authorization header from your current connection to your Azure workspace, which you can get using the following code:

> **Note**: A real application would require a service principal with which to be authenticated.

In [13]:
from azureml.core.authentication import InteractiveLoginAuthentication

interactive_auth = InteractiveLoginAuthentication()
auth_header = interactive_auth.get_authentication_header()
print('Authentication header ready.')

Authentication header ready.
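For the service principal case mentioned in the note, one possible approach is the SDK's `ServicePrincipalAuthentication` class, which exposes the same `get_authentication_header()` method. The sketch below is an assumption-laden outline rather than a recommended setup: the environment variable names are hypothetical, the helper functions are invented for this example, and the SDK import is deferred so that the settings check can run without Azure ML installed.

```python
import os

# Hypothetical environment variable names; use whatever your deployment defines
REQUIRED_SETTINGS = ("AZURE_TENANT_ID", "AZURE_CLIENT_ID", "AZURE_CLIENT_SECRET")


def load_service_principal_settings(environ=None):
    """Collect the service principal settings, failing early if any is missing."""
    environ = os.environ if environ is None else environ
    missing = [name for name in REQUIRED_SETTINGS if not environ.get(name)]
    if missing:
        raise KeyError("Missing settings: " + ", ".join(missing))
    return {name: environ[name] for name in REQUIRED_SETTINGS}


def get_service_principal_header(settings):
    """Build an authorization header from a service principal (needs the SDK)."""
    # Imported here so the rest of this sketch runs without the Azure ML SDK
    from azureml.core.authentication import ServicePrincipalAuthentication

    auth = ServicePrincipalAuthentication(
        tenant_id=settings["AZURE_TENANT_ID"],
        service_principal_id=settings["AZURE_CLIENT_ID"],
        service_principal_password=settings["AZURE_CLIENT_SECRET"])
    return auth.get_authentication_header()


# Demonstration with placeholder values only (no call to Azure is made)
demo = load_service_principal_settings({
    "AZURE_TENANT_ID": "tenant-placeholder",
    "AZURE_CLIENT_ID": "client-placeholder",
    "AZURE_CLIENT_SECRET": "secret-placeholder",
})
print(sorted(demo))
```

Keeping the secret in the environment (or a secret store) rather than in the notebook avoids committing credentials alongside the code.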


Now we're ready to call the REST interface. The pipeline runs asynchronously, so we'll get an identifier back, which we can use to track the pipeline experiment as it runs:

In [14]:
import requests

rest_endpoint = published_pipeline.endpoint
response = requests.post(rest_endpoint, 
                         headers=auth_header, 
                         json={"ExperimentName": "Batch_Pipeline_via_REST"})
run_id = response.json()["Id"]
run_id

'48a0bd37-da94-4422-bcab-d861c926d9ac'
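The cell above reads `response.json()["Id"]` directly, which raises an unhelpful `KeyError` if the request was rejected. A slightly more defensive pattern is sketched below; it works on a plain status code and dictionary so it can run without a network call, and `extract_run_id` is a hypothetical helper, not part of `requests` or the Azure ML SDK. In a notebook you would pass `response.status_code` and `response.json()`.

```python
def extract_run_id(status_code, payload):
    """Return the run ID from a pipeline submission response, or raise."""
    if status_code >= 400:
        raise RuntimeError(
            "Pipeline submission failed with HTTP {}: {}".format(status_code, payload))
    run_id = payload.get("Id")
    if not run_id:
        raise ValueError("Response did not contain an 'Id' field: {}".format(payload))
    return run_id


# Example using the run ID shown in the output above
ok_id = extract_run_id(200, {"Id": "48a0bd37-da94-4422-bcab-d861c926d9ac"})
print(ok_id)
```

An expired or missing authorization header is a common cause of a failed submission, so surfacing the status code makes that case easier to spot.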

Since we have the run ID, we can view the experiment as it runs:

In [None]:
from azureml.pipeline.core.run import PipelineRun

published_pipeline_run = PipelineRun(ws.experiments["Batch_Pipeline_via_REST"], run_id)
print('Running pipeline...')
published_pipeline_run.wait_for_completion(show_output=True)

Running pipeline...
PipelineRunId: 48a0bd37-da94-4422-bcab-d861c926d9ac
Link to Azure Machine Learning Portal: https://ml.azure.com/experiments/Batch_Pipeline_via_REST/runs/48a0bd37-da94-4422-bcab-d861c926d9ac?wsid=/subscriptions/71bfcf50-7e10-4546-9c9a-fd4f1ee42434/resourcegroups/nikhil-suthardp100/workspaces/nikhilsuthardp100
PipelineRun Status: Running


StepRunId: ee745dfb-5d31-4584-830e-145f4da50a01
Link to Azure Machine Learning Portal: https://ml.azure.com/experiments/Batch_Pipeline_via_REST/runs/ee745dfb-5d31-4584-830e-145f4da50a01?wsid=/subscriptions/71bfcf50-7e10-4546-9c9a-fd4f1ee42434/resourcegroups/nikhil-suthardp100/workspaces/nikhilsuthardp100
StepRun( batch-score-diabetes ) Status: NotStarted

Streaming azureml-logs/55_azureml-execution-tvmps_9ca2d4dc7491f214fa6ce9e3f79cdb41d04d2fe8b3c333575f6a1616047f1e2e_d.txt
2020-11-25T11:23:07Z Starting output-watcher...
2020-11-25T11:23:07Z IsDedicatedCompute == True, won't poll for Low Pri Preemption
Login Succeeded
Using default 

As before, the results are in the output of the first pipeline step:

In [None]:
import os
import pandas as pd
import shutil

shutil.rmtree("diabetes-results", ignore_errors=True)

prediction_run = next(published_pipeline_run.get_children())
prediction_output = prediction_run.get_output_data("inferences")
prediction_output.download(local_path="diabetes-results")


for root, dirs, files in os.walk("diabetes-results"):
    for file in files:
        if file.endswith('parallel_run_step.txt'):
            result_file = os.path.join(root,file)

# cleanup output format
df = pd.read_csv(result_file, delimiter=":", header=None)
df.columns = ["File", "Prediction"]

# Display the first 20 results
df.head(20)

Now you have a pipeline that can be used to batch process daily patient data.

**More Information**: For more details about using pipelines for batch inferencing, see [How to Run Batch Predictions](https://docs.microsoft.com/azure/machine-learning/how-to-run-batch-predictions) in the Azure Machine Learning documentation.