# Exercise 9 - Batch Inferencing with Pipelines

In the previous exercise, you used an Azure ML *pipeline* to automate the training and registration of a model, and prior to that you published a model as a web service for real-time *inferencing* (getting predictions from a model). Now you'll combine these two concepts to create a pipeline for *batch inferencing*. What does that mean? Well, imagine a health clinic takes patient measurements all day, saving the details for each patient in a separate file. Then overnight, the diabetes prediction model can be used to process all of the day's patient data as a batch, generating predictions that will be waiting the following morning so that the clinic can follow up with patients who are predicted to be at risk of diabetes. That's what we'll implement in this exercise.

> **Important**: This exercise assumes you have completed the previous exercises in this series - specifically, you must have:
>
> - Created an Azure ML Workspace.
> - Created an Azure ML Compute cluster.
> - Trained and registered a diabetes model.
>
> If you haven't done that, you'll need to do so before proceeding any further!

## Task 1: Connect to Your Workspace

The first thing you need to do is to connect to your workspace using the Azure ML SDK. Let's start by ensuring you still have the latest version installed.

In [None]:
!pip install --upgrade azureml-sdk[notebooks]
import azureml.core
print("Ready to use Azure ML", azureml.core.VERSION)

Now you're ready to connect to your workspace.

> **Note**: If the authenticated session with your Azure subscription has expired since you completed the previous exercise, you'll be prompted to reauthenticate.

In [None]:
from azureml.core import Workspace

# Load the workspace from the saved config file
ws = Workspace.from_config()
print('Ready to work with', ws.name)

## Task 2: Generate and Upload Batch Data

Since we don't actually have a fully staffed clinic with patients from whom to get new data, we'll generate a random sample from our diabetes CSV file and use it to test the pipeline. Then we'll upload that data to a datastore in the Azure ML workspace.

> **Note**: In reality, you'd likely use an existing blob container that you've added to the workspace as a datastore rather than the default datastore that was created with your workspace, but we'll ignore that detail for now.

In [None]:
from azureml.core import Datastore
import pandas as pd
import os

# Load the diabetes data
diabetes = pd.read_csv('data/diabetes2.csv')
# Get a 100-item sample of the feature columns (not the diabetic label)
sample = diabetes[['Pregnancies','PlasmaGlucose','DiastolicBloodPressure','TricepsThickness','SerumInsulin','BMI','DiabetesPedigree','Age']].sample(n=100).values

# Create a folder
batch_folder = './batch_data'
os.makedirs(batch_folder, exist_ok=True)
print("Folder created!")

# Save each sample as a separate file
print("Saving files...")
for i in range(100):
    fname = str(i+1) + '.csv'
    sample[i].tofile(os.path.join(batch_folder, fname), sep=",")
print("Files saved!")

# Upload the files to the default datastore
print("Uploading files to datastore...")
default_ds = ws.get_default_datastore()
default_ds.upload(src_dir="batch_data", target_path="batch_data", overwrite=True, show_progress=True)
print("Done!")
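
To make the file layout concrete, here is a minimal standard-library sketch of what each per-patient file contains and how it can be read back. The cell above writes each row with NumPy's `tofile`, which produces a single comma-separated line with no header row. The helper names `write_patient_file` and `read_patient_file` and the sample values are illustrative assumptions, not part of the exercise code.

```python
import os
import tempfile

# Feature order used in the exercise (the diabetic label is not included)
FEATURES = ['Pregnancies', 'PlasmaGlucose', 'DiastolicBloodPressure',
            'TricepsThickness', 'SerumInsulin', 'BMI', 'DiabetesPedigree', 'Age']


def write_patient_file(folder, index, values):
    """Write one patient's features as a single comma-separated line (hypothetical helper)."""
    path = os.path.join(folder, f"{index}.csv")
    with open(path, "w") as f:
        f.write(",".join(str(v) for v in values))
    return path


def read_patient_file(path):
    """Read the single line back as a list of floats (hypothetical helper)."""
    with open(path) as f:
        return [float(v) for v in f.read().strip().split(",")]


# Made-up values, for illustration only
example = [2, 180.0, 74.0, 24.0, 21.0, 23.9, 1.49, 22]

with tempfile.TemporaryDirectory() as folder:
    path = write_patient_file(folder, 1, example)
    round_trip = read_patient_file(path)

# Pair each value with its feature name to show what one file represents
print(dict(zip(FEATURES, round_trip)))
```

Because the files carry no header, the column order shown in `FEATURES` is the only thing tying each value to its meaning, so the scoring code must read the values in the same order the model was trained on.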

## Task 3: Create Compute

We'll need a compute context for the pipeline, so we'll use the Azure ML compute cluster you used in the previous exercises (it will be created if it doesn't already exist).

In [None]:
from azureml.core.compute import ComputeTarget, AmlCompute
from azureml.core.compute_target import ComputeTargetException

# Choose a name for your CPU cluster
cpu_cluster_name = "cpu-cluster"

# Check whether the cluster already exists
try:
    cpu_cluster = ComputeTarget(workspace=ws, name=cpu_cluster_name)
    print('Found existing cluster, use it.')
except ComputeTargetException:
    # Create an Azure ML compute cluster (a cluster of virtual machines)
    compute_config = AmlCompute.provisioning_configuration(vm_size='STANDARD_DS12_V2', max_nodes=4)
    cpu_cluster = ComputeTarget.create(ws, cpu_cluster_name, compute_config)

cpu_cluster.wait_for_completion(show_output=True)

## Task 4: Create a Pipeline for Batch Inferencing

Now we're ready to define the pipeline we'll use for batch inferencing. First, we'll need a way to pass the patient files into the pipeline, so we'll create a *DataReference* object for the input data.

In [None]:
from azureml.data.data_reference import DataReference
from azureml.pipeline.core import PipelineData

input_dir = DataReference(datastore=default_ds, 
                             data_reference_name="batch_data",
                             path_on_datastore="batch_data",
                             mode="download"
                            )

print("Data reference created!")

Our pipeline will need Python code to perform the batch inferencing, so let's create a folder where we can keep all the files used by the pipeline:

In [None]:
import os
# Create a folder for the experiment files
experiment_name = 'batch_pipeline'
experiment_folder = './' + experiment_name
os.makedirs(experiment_folder, exist_ok=True)

print(experiment_folder)

Now we'll create a Python script to do the actual work, and save it in the pipeline folder:

In [None]:
%%writefile $experiment_folder/batch_diabetes.py

import argparse
import os
from glob import glob

import joblib
import numpy as np
from azureml.core import Model, Run
import azureml.train.automl # Required for AutoML models

# Get the experiment run context
run = Run.get_context()

# Get parameters
parser = argparse.ArgumentParser()
parser.add_argument('--input_dir', type=str, dest='input_dir', default="batch_data", help='folder containing input data')
args = parser.parse_args()
input_dir = args.input_dir
run.log("Input Folder", input_dir)

# load the model
model_path = Model.get_model_path('diabetes_model')
model = joblib.load(model_path)

# Load the input data
file_path = os.path.join(input_dir, "*.csv")
file_names = glob(file_path)
run.log("File Count", len(file_names))
input_data = np.asarray([np.genfromtxt(f, delimiter=',') for f in file_names])

# Score the input data
predictions = model.predict(input_data)

# Save the results
output_dir = "outputs"
os.makedirs(output_dir, exist_ok=True)
output_path = os.path.join(output_dir, "results.txt")
# The with block closes the file automatically when it exits
with open(output_path, 'w') as output_file:
    for file_name, prediction in zip(file_names, predictions):
        output_file.write(os.path.basename(file_name) + ": " + str(prediction) + "\n")

# We're done
run.complete()
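
Because the script only runs inside a pipeline step, it can help to sanity-check the load, score, and write logic locally first. The following is a minimal sketch that uses a stand-in model: `ThresholdModel`, `score_folder`, the glucose cutoff, and the sample rows are all hypothetical, and exist only so the flow can run without a workspace or a registered model.

```python
import os
import tempfile
from glob import glob


class ThresholdModel:
    """Stand-in for the registered model (hypothetical).

    Flags a row when PlasmaGlucose (column 1) exceeds a made-up cutoff.
    """
    def predict(self, rows):
        return [1 if row[1] > 140 else 0 for row in rows]


def score_folder(input_dir, model):
    """Read every CSV file in input_dir, score it, and return 'filename: prediction' lines."""
    # Sorting makes the output order deterministic; glob alone gives no ordering guarantee
    file_names = sorted(glob(os.path.join(input_dir, "*.csv")))
    rows = []
    for name in file_names:
        with open(name) as f:
            rows.append([float(v) for v in f.read().strip().split(",")])
    predictions = model.predict(rows)
    return [f"{os.path.basename(n)}: {p}" for n, p in zip(file_names, predictions)]


with tempfile.TemporaryDirectory() as folder:
    # Two made-up patients, one above and one below the cutoff
    samples = {"1.csv": "2,180,74,24,21,23.9,1.49,22",
               "2.csv": "0,90,70,20,30,21.5,0.2,35"}
    for name, line in samples.items():
        with open(os.path.join(folder, name), "w") as f:
            f.write(line)
    results = score_folder(folder, ThresholdModel())

print("\n".join(results))
```

The output lines follow the same `filename: prediction` layout that the pipeline script writes to `results.txt`, so the downstream parsing code can be tried against them as well.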

Next, we'll define a run configuration that includes the dependencies required by the script:

In [None]:
from azureml.core.conda_dependencies import CondaDependencies
from azureml.core.runconfig import DEFAULT_CPU_IMAGE, RunConfiguration

# Add dependencies required by the model
# For scikit-learn models, you need scikit-learn
# If the model was trained using AutoML and includes pre-processing, you need the Azure ML AutoML package
cd = CondaDependencies.create(pip_packages=["scikit-learn", "azureml-sdk[automl]"])

amlcompute_run_config = RunConfiguration(conda_dependencies=cd)
amlcompute_run_config.environment.docker.enabled = True
amlcompute_run_config.environment.docker.base_image = DEFAULT_CPU_IMAGE
print("Configuration ready.")

OK, now we're ready to define a pipeline step, which will run the Python script to load the model, use it to generate predictions from the input data, and save the results as a text file in the output folder.

In [None]:
from azureml.pipeline.steps import PythonScriptStep

batch_score_step = PythonScriptStep(
    name="batch_scoring",
    source_directory = experiment_folder,
    script_name="batch_diabetes.py",
    arguments=["--input_dir", input_dir],
    compute_target=cpu_cluster,
    inputs=[input_dir],
    runconfig=amlcompute_run_config
)

print(batch_score_step.name)

Now it's time to put all of the pieces together in a pipeline, and run it.

In [None]:
from azureml.core import Experiment
from azureml.pipeline.core import Pipeline

pipeline = Pipeline(workspace=ws, steps=[batch_score_step])
pipeline_run = Experiment(ws, experiment_name).submit(pipeline)
pipeline_run.wait_for_completion(show_output=True)

When the pipeline has finished running, the resulting predictions will have been saved in the outputs of the run for the first (and only) step in the pipeline, so we can retrieve them as follows:

In [None]:
import pandas as pd

# Get the first step (it will be the first child of the pipeline run)
step_run = list(pipeline_run.get_children())[0]

# Get the results.txt file from the outputs
step_run.download_file("./outputs/results.txt")

# Load the file into a pandas dataframe
df = pd.read_csv("results.txt", delimiter=":", header=None)
df.columns = ["Filename", "Prediction"]

# Display the first 20 results
df.head(20)
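
The results file holds one `filename: prediction` pair per line, so it is also easy to summarise without pandas. The following is a minimal sketch, assuming that a prediction of 1 means the patient is predicted to be at risk; `parse_results` and the sample lines are hypothetical and stand in for the contents of the downloaded file.

```python
def parse_results(lines):
    """Map each file name to its integer prediction (hypothetical helper)."""
    results = {}
    for line in lines:
        line = line.strip()
        if not line:
            continue  # skip blank lines, such as a trailing newline
        name, _, value = line.partition(":")
        results[name.strip()] = int(float(value))
    return results


# Made-up lines in the same layout as results.txt
sample_lines = ["1.csv: 0", "2.csv: 1", "3.csv: 1"]

results = parse_results(sample_lines)
# Assumption: 1 marks a patient predicted to be at risk
at_risk = sorted(name for name, pred in results.items() if pred == 1)
print(f"{len(at_risk)} of {len(results)} patients flagged:", at_risk)
```

With the real file, the same function could be fed the lines from `open("results.txt")` in place of `sample_lines`.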

## Task 5: Publish the Pipeline and use its REST Interface

Now that you have a working pipeline for batch inferencing, you can publish it and use a REST endpoint to run it from an application.

In [None]:
published_pipeline = pipeline_run.publish_pipeline(
    name="Diabetes_Batch_Pipeline", description="Batch scoring of diabetes data", version="1.0")

published_pipeline

Note that the published pipeline has an endpoint, which you can see in the Azure portal. You can also find it as a property of the published pipeline object:

In [None]:
rest_endpoint = published_pipeline.endpoint
print(rest_endpoint)

To use the endpoint, client applications need to make a REST call over HTTP. This request must be authenticated, so an authorization header is required. To test this out, we'll use the authorization header from your current connection to your Azure workspace, which you can get using the following code:

> **Note**: A real application would authenticate with a service principal rather than an interactive login.

In [None]:
from azureml.core.authentication import InteractiveLoginAuthentication

interactive_auth = InteractiveLoginAuthentication()
auth_header = interactive_auth.get_authentication_header()
print(auth_header)

Now we're ready to call the REST interface. The pipeline runs asynchronously, so we'll get an identifier back, which we can use to track the pipeline experiment as it runs:

In [None]:
import requests

rest_endpoint = published_pipeline.endpoint
response = requests.post(rest_endpoint, 
                         headers=auth_header, 
                         json={"ExperimentName": "Batch_Pipeline_via_REST"})
run_id = response.json()["Id"]
run_id
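
In an application, you would usually confirm that the request succeeded before reading the run ID from the response. The following is a minimal sketch of that check. `submit_pipeline` is a hypothetical wrapper, and its `post` parameter is expected to behave like `requests.post`; it is passed in so that the sketch can be exercised here with a fake response and no network access. The endpoint URL and token are placeholders.

```python
def submit_pipeline(post, endpoint, auth_header, experiment_name):
    """POST to a published pipeline endpoint and return the new run's ID.

    `post` is expected to behave like requests.post (hypothetical wrapper).
    """
    response = post(endpoint,
                    headers=auth_header,
                    json={"ExperimentName": experiment_name})
    # Raise on 4xx/5xx responses instead of failing later on a missing key
    response.raise_for_status()
    payload = response.json()
    if "Id" not in payload:
        raise ValueError("Response did not contain a run Id")
    return payload["Id"]


class FakeResponse:
    """Stand-in for a successful HTTP response, for illustration only."""
    def raise_for_status(self):
        pass

    def json(self):
        return {"Id": "example-run-id"}


def fake_post(url, headers=None, json=None):
    return FakeResponse()


run_id = submit_pipeline(fake_post,
                         "https://example.invalid/pipeline",
                         {"Authorization": "Bearer <token>"},
                         "Batch_Pipeline_via_REST")
print(run_id)
```

In the notebook, passing `requests.post`, `rest_endpoint`, and `auth_header` would make the same call as the cell above, with the added status check.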

Since we have the run ID, we can use the **RunDetails** widget to view the experiment as it runs:

In [None]:
from azureml.pipeline.core.run import PipelineRun
from azureml.widgets import RunDetails

published_pipeline_run = PipelineRun(ws.experiments["Batch_Pipeline_via_REST"], run_id)
RunDetails(published_pipeline_run).show()

As before, the results are in the output of the first pipeline step:

In [None]:
import pandas as pd

step_run = list(published_pipeline_run.get_children())[0]
step_run.get_file_names()
step_run.download_file("./outputs/results.txt")
df = pd.read_csv("results.txt", delimiter=":", header=None)
df.columns = ["Filename", "Prediction"]
df.head(100)

Now you have a pipeline that can be used to batch process daily patient data.

**More Information**: For more details about using pipelines for batch inferencing, see [How to Run Batch Predictions](https://docs.microsoft.com/en-us/azure/machine-learning/service/how-to-run-batch-predictions) in the Azure Machine Learning documentation.