# Creating a Batch Inferencing Service

In previous labs, you used an Azure ML *pipeline* to automate the training and registration of a model, and you published a model as a web service for real-time *inferencing* (getting predictions from a model). Now you'll combine these two concepts to create a pipeline for *batch inferencing*. What does that mean? Well, imagine a health clinic takes patient measurements all day, saving the details for each patient in a separate file. Then overnight, the diabetes prediction model can be used to process all of the day's patient data as a batch, generating predictions that will be waiting the following morning so that the clinic can follow up with patients who are predicted to be at risk of diabetes. That's what you'll implement in this exercise.


## Connect to Your Workspace

Now connect to your workspace using the Azure ML SDK.

> **Note**: You may be prompted to authenticate. Just copy the code and click the link provided to sign into your Azure subscription, and then return to this notebook.

In [26]:
from azureml import core, pipeline
from azureml.pipeline import steps
from azureml.core import compute, conda_dependencies, runconfig, authentication
import pandas as pd
import numpy as np
import joblib
from sklearn import model_selection, tree, metrics
import os
import shutil
import requests


In [2]:
ws = core.Workspace.from_config()
print(f'Ready to use Azure ML {core.VERSION} to work with {ws.name}')

Ready to use Azure ML 1.12.0 to work with workspace
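`Workspace.from_config()` gets the workspace details from a *config.json* file. By default it searches the current directory and its parent directories. If you are working outside a preconfigured lab environment, a minimal sketch like the one below can create that file. The helper name `write_workspace_config` is hypothetical. If you already have a connected `Workspace` object, you can call `ws.write_config()` instead.

```python
import json
from pathlib import Path


def write_workspace_config(folder, subscription_id, resource_group, workspace_name):
    """Write a config.json in the layout that Workspace.from_config() reads (sketch)."""
    config = {
        'subscription_id': subscription_id,
        'resource_group': resource_group,
        'workspace_name': workspace_name,
    }
    path = Path(folder) / 'config.json'
    path.write_text(json.dumps(config, indent=2))
    return path
```

For example, `write_workspace_config('.', '<subscription-id>', '<resource-group>', '<workspace-name>')` writes the file next to the notebook, where `from_config()` will find it.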


## Train and Register a Model

Now run the cell below to train and register a model.

In [3]:
experiment = core.Experiment(ws, 'diabetes-training')
run = experiment.start_logging()
print('Starting experiment:', experiment.name)

print('Loading Data...')
diabetes: pd.DataFrame = pd.read_csv('data/diabetes.csv')

X = diabetes[
        [
            'Pregnancies', 'PlasmaGlucose', 'DiastolicBloodPressure',
            'TricepsThickness', 'SerumInsulin', 'BMI', 'DiabetesPedigree',
            'Age'
        ]
    ].to_numpy()
y = diabetes['Diabetic'].to_numpy()

X_train, X_test, y_train, y_test = model_selection.train_test_split(
    X, y, test_size=0.30, random_state=0
)

print('Training a decision tree model')
model = tree.DecisionTreeClassifier().fit(X_train, y_train)

y_hat = model.predict(X_test)
acc = np.average(y_hat == y_test)
print('Accuracy:', acc)
run.log('Accuracy', acc)

y_scores = model.predict_proba(X_test)
auc = metrics.roc_auc_score(y_test,y_scores[:,1])
print('AUC:', auc)
run.log('AUC', auc)

model_file = 'diabetes_model.pkl'
joblib.dump(model, model_file)
run.upload_file(f'outputs/{model_file}', model_file)

run.complete()

run.register_model(
    model_path='outputs/diabetes_model.pkl', model_name='diabetes_model',
    tags={'Training context':'Inline Training'},
    properties={
        'AUC': run.get_metrics()['AUC'], 
        'Accuracy': run.get_metrics()['Accuracy']
    }
)

print('Model trained and registered.')


Starting experiment: diabetes-training
Loading Data...
Training a decision tree model
Accuracy: 0.8893333333333333
AUC: 0.8770884123588237
Model trained and registered.


## Generate and Upload Batch Data

Since we don't actually have a fully staffed clinic with patients from whom to get new data for this course, you'll take a random sample of 100 records from a second diabetes CSV file (*data/diabetes2.csv*), save each record as a separate file, and use those files to test the pipeline. Then you'll upload the files to a datastore in the Azure Machine Learning workspace and register a dataset for them.

In [5]:
diabetes = pd.read_csv('data/diabetes2.csv')
sample = diabetes[
    [
        'Pregnancies', 'PlasmaGlucose', 'DiastolicBloodPressure',
        'TricepsThickness', 'SerumInsulin', 'BMI', 'DiabetesPedigree',
        'Age'
    ]
].sample(n=100).to_numpy()

batch_folder = './batch-data'
os.makedirs(batch_folder, exist_ok=True)  # tofile() fails if the folder is missing

print('Saving files...')
for i in range(100):
    file_name = str(i+1) + '.csv'
    sample[i].tofile(os.path.join(batch_folder, file_name), sep=',')
print('files saved!')

print('Uploading files to datastore...')
default_ds = ws.get_default_datastore()
default_ds.upload(
    src_dir='batch-data', target_path='batch-data', overwrite=True,
    show_progress=True
)

batch_data_set = core.Dataset.File.from_files((default_ds, 'batch-data/'))
batch_data_set = batch_data_set.register(
    workspace=ws,
    name='batch-data',
    description='batch data',
    create_new_version=True
)

print("Done!")


Saving files...
files saved!
Uploading files to datastore...
Uploading an estimated of 100 files
Uploading batch-data/1.csv
Uploading batch-data/10.csv
Uploading batch-data/100.csv
Uploading batch-data/11.csv
Uploading batch-data/12.csv
Uploading batch-data/13.csv
Uploading batch-data/14.csv
Uploading batch-data/15.csv
Uploading batch-data/16.csv
Uploading batch-data/17.csv
Uploading batch-data/18.csv
Uploading batch-data/19.csv
Uploading batch-data/2.csv
Uploading batch-data/20.csv
Uploading batch-data/21.csv
Uploading batch-data/22.csv
Uploading batch-data/23.csv
Uploading batch-data/24.csv
Uploading batch-data/25.csv
Uploading batch-data/26.csv
Uploading batch-data/27.csv
Uploading batch-data/28.csv
Uploading batch-data/29.csv
Uploading batch-data/3.csv
Uploading batch-data/30.csv
Uploading batch-data/31.csv
Uploading batch-data/32.csv
Uploading batch-data/33.csv
Uploading batch-data/34.csv
Uploading batch-data/35.csv
Uploading batch-data/36.csv
Uploading batch-data/37.csv
Uploaded 
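Each generated file holds a single patient record. Calling `tofile` with `sep=','` writes the eight feature values as one comma-separated line, with no header and no trailing newline. The sketch below uses placeholder values rather than real patient data. It checks that such a file can be read back into the same eight numbers with `np.genfromtxt`, which is one way a scoring script could read it.

```python
import os
import tempfile

import numpy as np

# Placeholder values standing in for the eight feature columns of one record
row = np.arange(8, dtype=float) + 0.5

with tempfile.TemporaryDirectory() as folder:
    path = os.path.join(folder, '1.csv')
    row.tofile(path, sep=',')  # one comma-separated line, no header
    with open(path) as f:
        raw = f.read()
    restored = np.genfromtxt(path, delimiter=',')  # back to a 1-D array of 8 floats

print(raw)
print(restored.shape)
```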

## Create Compute

We'll need a compute context for the pipeline, so we'll use an Azure Machine Learning compute cluster in your workspace. The cell below looks up a cluster that you created previously. If no cluster with that name exists, `ComputeTarget` raises a `ComputeTargetException`.

> **Important**: Change *your-compute-cluster* to the unique name for your compute cluster in the code below before running it!

In [7]:
cluster_name = "your-compute-cluster"  # change to your cluster's name

inference_cluster = compute.ComputeTarget(ws, cluster_name)
print('Found existing cluster, use it.')

inference_cluster.wait_for_completion(show_output=True)


Found existing cluster, use it.
Succeeded
AmlCompute wait for completion finished

Minimum number of nodes requested have been provisioned
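If you don't have a cluster yet, you can wrap the lookup in a create-if-missing helper. The minimal sketch below uses the SDK's `AmlCompute.provisioning_configuration` and `ComputeTarget.create`. The helper name, the `STANDARD_D2_V2` VM size, and `max_nodes=2` are example assumptions, so choose values that your subscription quota allows.

```python
def get_or_create_cluster(ws, cluster_name, vm_size='STANDARD_D2_V2', max_nodes=2):
    """Return the named AmlCompute cluster, provisioning it first if it is missing."""
    # Imported lazily so the SDK is only needed when the helper is called
    from azureml.core.compute import AmlCompute, ComputeTarget
    from azureml.core.compute_target import ComputeTargetException

    try:
        cluster = ComputeTarget(workspace=ws, name=cluster_name)
        print('Found existing cluster, use it.')
    except ComputeTargetException:
        # vm_size and max_nodes are example values, not requirements
        config = AmlCompute.provisioning_configuration(vm_size=vm_size, max_nodes=max_nodes)
        cluster = ComputeTarget.create(ws, cluster_name, config)

    cluster.wait_for_completion(show_output=True)
    return cluster
```

Usage: `inference_cluster = get_or_create_cluster(ws, cluster_name)`.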


## Create a Pipeline for Batch Inferencing

Next we'll define an environment that includes the dependencies required by the script.


In [11]:
cd = conda_dependencies.CondaDependencies.create(
    pip_packages=[
        'scikit-learn', 'azureml-defaults', 'azureml-core',
        'azureml-dataprep[fuse]',
    ]
)

batch_env = core.Environment('batch_environment')
batch_env.python.conda_dependencies = cd
batch_env.docker.enabled = True
batch_env.docker.base_image = runconfig.DEFAULT_CPU_IMAGE
print('Configuration ready.')


Configuration ready.


You're going to use a pipeline to run the batch prediction script, generate predictions from the input data, and save the results as a text file in the output folder. To do this, you can use a **ParallelRunStep**, which enables the batch data to be processed in parallel and the results collated in a single output file named *parallel_run_step.txt*.

In [17]:
default_ds = ws.get_default_datastore()

output_dir = pipeline.core.PipelineData(
    'inferences',
    datastore=default_ds,
    output_path_on_compute='diabetes/results'
)

experiment_folder = 'batch-pipeline'
parallel_run_config = steps.ParallelRunConfig(
    environment=batch_env,
    entry_script='batch_diabetes.py',
    error_threshold=10,
    output_action='append_row',
    compute_target=inference_cluster,
    node_count=1,
    source_directory=experiment_folder,
)

parallel_run_step = steps.ParallelRunStep(
    'batch-score-diabetes',
    parallel_run_config,
    [batch_data_set.as_named_input('diabetes_batch')],
    output=output_dir,
    arguments=[],
)

print('Steps defined')


Steps defined
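The configuration above doesn't set `mini_batch_size`, so the default applies. For FileDataset input the SDK documentation gives this default as 10 files, but check it against your SDK version. ParallelRunStep performs the grouping itself. The sketch below only models it, to show roughly how many `run()` calls the 100 batch files produce. The helper name `split_into_mini_batches` is hypothetical, and the real service may group and order files differently.

```python
def split_into_mini_batches(file_names, mini_batch_size=10):
    """Group file names into lists of at most mini_batch_size items (conceptual model)."""
    if mini_batch_size < 1:
        raise ValueError('mini_batch_size must be at least 1')
    return [
        file_names[start:start + mini_batch_size]
        for start in range(0, len(file_names), mini_batch_size)
    ]


files = [f'{i}.csv' for i in range(1, 101)]  # the 100 generated batch files
batches = split_into_mini_batches(files)
print(len(batches), len(batches[0]))
```

Under this model, `error_threshold=10` means the step tolerates up to 10 failed files across all mini-batches before the job is aborted.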


Now it's time to put the step into a pipeline and run it.

> **Note**: This may take some time!

In [19]:
p = pipeline.core.Pipeline(ws, [parallel_run_step])
pipeline_run = core.Experiment(ws, 'batch_prediction_pipeline').submit(p)
print('Running pipeline...')
pipeline_run.wait_for_completion(show_output=True)


Created step batch-score-diabetes [675c03cc][3d93909b-34e6-44f1-8447-e3da061d573f], (This step will run and generate new outputs)
Created data reference diabetes_batch_0 for StepId [39cc58fa][ee94ac98-61e2-4067-b87f-c8564f9c0b9a], (Consumers of this data will generate new runs.)
Submitted PipelineRun 35a034bd-1ec8-41de-9f4b-fb8cdc067bc0
Link to Azure Machine Learning Portal: https://ml.azure.com/experiments/batch_prediction_pipeline/runs/35a034bd-1ec8-41de-9f4b-fb8cdc067bc0?wsid=/subscriptions/84170def-2683-47c0-91ed-1f34057afd69/resourcegroups/resources/workspaces/workspace
Running pipeline...
PipelineRunId: 35a034bd-1ec8-41de-9f4b-fb8cdc067bc0
Link to Azure Machine Learning Portal: https://ml.azure.com/experiments/batch_prediction_pipeline/runs/35a034bd-1ec8-41de-9f4b-fb8cdc067bc0?wsid=/subscriptions/84170def-2683-47c0-91ed-1f34057afd69/resourcegroups/resources/workspaces/workspace
PipelineRun Status: NotStarted
PipelineRun Status: Running


StepRunId: 50eb4267-4f7b-415f-9a22-dea8aab

'Finished'

When the pipeline has finished running, the resulting predictions are saved in the output of the first (and only) step in the pipeline run. You can retrieve them as follows:

In [21]:
shutil.rmtree('diabetes-results', ignore_errors=True)

prediction_run = next(pipeline_run.get_children())
prediction_output = prediction_run.get_output_data('inferences')
prediction_output.download(local_path='diabetes-results')

result_file = None
for root, dirs, files in os.walk('diabetes-results'):
    for file in files:
        if file.endswith('parallel_run_step.txt'):
            result_file = os.path.join(root,file)

df = pd.read_csv(result_file, delimiter=":", header=None)
df.columns = ["File", "Prediction"]

df.head(20)


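|   | File | Prediction |
|---|------|------------|
| 0 | 45.csv | 0 |
| 1 | 46.csv | 1 |
| 2 | 47.csv | 1 |
| 3 | 48.csv | 1 |
| 4 | 49.csv | 0 |
| 5 | 5.csv | 0 |
| 6 | 50.csv | 1 |
| 7 | 51.csv | 0 |
| 8 | 52.csv | 0 |
| 9 | 53.csv | 1 |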
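Two details of the results are worth handling explicitly. First, depending on how the entry script formats its lines, there may be a space after the `:`. Second, in the output above the rows appear in text order of the file names (45, 46, ..., 5, 50) rather than in patient order. The sketch below parses the `'<file>: <prediction>'` lines with `skipinitialspace=True`, adds a numeric `Patient` key for sorting, and lists the files predicted as at risk so the clinic knows whom to follow up with. The `load_predictions` helper name is hypothetical, and the sample text reuses four rows from the output above.

```python
import io

import pandas as pd


def load_predictions(source):
    """Parse '<file>: <prediction>' lines from parallel_run_step.txt."""
    df = pd.read_csv(
        source,
        delimiter=':',
        header=None,
        names=['File', 'Prediction'],
        skipinitialspace=True,  # tolerate a space after ':'
    )
    # File names sort as text; add a numeric key so rows follow patient order
    df['Patient'] = df['File'].str.replace('.csv', '', regex=False).astype(int)
    return df.sort_values('Patient').reset_index(drop=True)


text = '45.csv: 0\n46.csv: 1\n5.csv: 0\n50.csv: 1\n'
predictions = load_predictions(io.StringIO(text))
at_risk = predictions.loc[predictions['Prediction'] == 1, 'File'].tolist()
print(at_risk)
```

To use it on the downloaded results, pass `result_file` instead of the `StringIO` sample.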


## Publish the Pipeline and use its REST Interface

Now that you have a working pipeline for batch inferencing, you can publish it and use a REST endpoint to run it from an application.

In [22]:
published_pipeline = pipeline_run.publish_pipeline(
    name='Diabetes_Parallel_Batch_Pipeline', 
    description='Batch scoring of diabetes data', version='1.0'
)

published_pipeline


| Name | Id | Status | Endpoint |
|------|----|--------|----------|
| Diabetes_Parallel_Batch_Pipeline | 5810608f-0e6c-4de6-ad61-1fb4c5ff4550 | Active | REST Endpoint |


Note that the published pipeline has an endpoint, which you can see in Azure Machine Learning studio. You can also find it as a property of the published pipeline object:

In [23]:
rest_endpoint = published_pipeline.endpoint
print(rest_endpoint)

https://brazilsouth.api.azureml.ms/pipelines/v1.0/subscriptions/84170def-2683-47c0-91ed-1f34057afd69/resourceGroups/resources/providers/Microsoft.MachineLearningServices/workspaces/workspace/PipelineRuns/PipelineSubmit/5810608f-0e6c-4de6-ad61-1fb4c5ff4550


To use the endpoint, client applications need to make a REST call over HTTP. This request must be authenticated, so an authorization header is required. To test this out, we'll use the authorization header from your current connection to your Azure workspace, which you can get using the following code:

> **Note**: A real application would typically authenticate with a service principal rather than an interactive login.
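For the service principal route, the SDK provides `ServicePrincipalAuthentication`, which exposes the same `get_authentication_header()` method used below. The following is a minimal sketch. The helper name and the environment variable names (`AZURE_TENANT_ID`, `AZURE_CLIENT_ID`, `AZURE_CLIENT_SECRET`) are hypothetical choices, and the service principal must also be granted access to the workspace.

```python
import os


def service_principal_auth_header():
    """Return an Authorization header obtained with a service principal (sketch)."""
    # Hypothetical environment variable names; keep the secret out of the notebook
    tenant_id = os.environ['AZURE_TENANT_ID']
    client_id = os.environ['AZURE_CLIENT_ID']
    client_secret = os.environ['AZURE_CLIENT_SECRET']

    # Imported after the settings are read, so missing settings fail fast
    from azureml.core.authentication import ServicePrincipalAuthentication

    sp_auth = ServicePrincipalAuthentication(
        tenant_id=tenant_id,
        service_principal_id=client_id,
        service_principal_password=client_secret,
    )
    return sp_auth.get_authentication_header()
```

Usage: `auth_header = service_principal_auth_header()`.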

In [25]:
interactive_auth = authentication.InteractiveLoginAuthentication()
auth_header = interactive_auth.get_authentication_header()
print('Authentication header ready.')


Authentication header ready.


Now we're ready to call the REST interface. The pipeline runs asynchronously, so we'll get an identifier back, which we can use to track the pipeline experiment as it runs:

In [28]:
rest_endpoint = published_pipeline.endpoint
response = requests.post(
    rest_endpoint,
    json={"ExperimentName": "Batch_Pipeline_via_REST"},
    headers=auth_header,
)
run_id = response.json()["Id"]
run_id


'b26daa70-a736-4098-a070-2e913b60d0aa'
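The call above reads `"Id"` from the response without checking the HTTP status. If the header has expired or the endpoint is wrong, that surfaces as a confusing `KeyError`. The minimal sketch below wraps the same `requests.post` call with a timeout and `raise_for_status()`. The helper name and the 60-second timeout are assumptions, not part of the pipeline API.

```python
import requests


def submit_pipeline_run(rest_endpoint, auth_header, experiment_name, timeout=60):
    """POST to a published pipeline endpoint and return the new run's ID."""
    response = requests.post(
        rest_endpoint,
        json={'ExperimentName': experiment_name},
        headers=auth_header,
        timeout=timeout,  # seconds; avoids waiting forever on a stalled connection
    )
    response.raise_for_status()  # raise on 4xx/5xx instead of failing on a missing 'Id'
    return response.json()['Id']
```

Usage: `run_id = submit_pipeline_run(rest_endpoint, auth_header, 'Batch_Pipeline_via_REST')`.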

Since we have the run ID, we can view the experiment as it runs:

In [29]:
published_pipeline_run = pipeline.core.run.PipelineRun(
    ws.experiments["Batch_Pipeline_via_REST"], run_id
)
print('Running pipeline...')
published_pipeline_run.wait_for_completion(show_output=True)


Running pipeline...
PipelineRunId: b26daa70-a736-4098-a070-2e913b60d0aa
Link to Azure Machine Learning Portal: https://ml.azure.com/experiments/Batch_Pipeline_via_REST/runs/b26daa70-a736-4098-a070-2e913b60d0aa?wsid=/subscriptions/84170def-2683-47c0-91ed-1f34057afd69/resourcegroups/resources/workspaces/workspace
PipelineRun Status: Running


StepRunId: 40f90548-d872-409e-8eae-21b50b1c90ff
Link to Azure Machine Learning Portal: https://ml.azure.com/experiments/Batch_Pipeline_via_REST/runs/40f90548-d872-409e-8eae-21b50b1c90ff?wsid=/subscriptions/84170def-2683-47c0-91ed-1f34057afd69/resourcegroups/resources/workspaces/workspace
StepRun( batch-score-diabetes ) Status: Running

Streaming azureml-logs/55_azureml-execution-tvmps_8410546b7adf966d2a3b8d06f26b312ab538e8546a1ee55bc59596eba9084c19_d.txt
2020-08-17T22:32:13Z Starting output-watcher...
2020-08-17T22:32:13Z IsDedicatedCompute == True, won't poll for Low Pri Preemption
2020-08-17T22:32:15Z Executing 'Copy ACR Details file' on 10.0.0.4


'Finished'

As before, the results are in the output of the first pipeline step:

In [30]:
shutil.rmtree("diabetes-results", ignore_errors=True)

prediction_run = next(published_pipeline_run.get_children())
prediction_output = prediction_run.get_output_data("inferences")
prediction_output.download(local_path="diabetes-results")

result_file = None  # reset so a stale path from the earlier cell isn't reused
for root, dirs, files in os.walk("diabetes-results"):
    for file in files:
        if file.endswith('parallel_run_step.txt'):
            result_file = os.path.join(root,file)

df = pd.read_csv(result_file, delimiter=":", header=None)
df.columns = ["File", "Prediction"]

df.head(20)


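|   | File | Prediction |
|---|------|------------|
| 0 | 18.csv | 0 |
| 1 | 19.csv | 0 |
| 2 | 2.csv | 0 |
| 3 | 20.csv | 1 |
| 4 | 21.csv | 1 |
| 5 | 22.csv | 1 |
| 6 | 23.csv | 0 |
| 7 | 24.csv | 0 |
| 8 | 25.csv | 1 |
| 9 | 26.csv | 1 |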


Now you have a pipeline that can be used to batch process daily patient data.

**More Information**: For more details about using pipelines for batch inferencing, see [How to Run Batch Predictions](https://docs.microsoft.com/azure/machine-learning/how-to-run-batch-predictions) in the Azure Machine Learning documentation.