# Create a Batch Inferencing Service

Imagine a health clinic takes patient measurements all day, saving the details for each patient in a separate file. Then overnight, the diabetes prediction model can be used to process all of the day's patient data as a batch, generating predictions that will be waiting the following morning so that the clinic can follow up with patients who are predicted to be at risk of diabetes. With Azure Machine Learning, you can accomplish this by creating a *batch inferencing pipeline*; and that's what you'll implement in this exercise.

## Connect to your workspace

To get started, connect to your workspace.

> **Note**: If you haven't already established an authenticated session with your Azure subscription, you'll be prompted to authenticate by clicking a link, entering an authentication code, and signing into Azure.

In [1]:
import azureml.core
from azureml.core import Workspace
# Load the workspace from the saved config file
ws = Workspace.from_config()
print('Ready to use Azure ML {} to work with {}'.format(azureml.core.VERSION, ws.name))

Ready to use Azure ML 1.40.0 to work with amldemo220512


## Train and register a model

Now let's train and register a model to deploy in a batch inferencing pipeline.

In [2]:
from azureml.core import Experiment
from azureml.core import Model
import pandas as pd
import numpy as np
import joblib
from sklearn.model_selection import train_test_split
from sklearn.tree import DecisionTreeClassifier
from sklearn.metrics import roc_auc_score
from sklearn.metrics import roc_curve

# Create an Azure ML experiment in your workspace
experiment = Experiment(workspace=ws, name='mslearn-train-diabetes-batch')
run = experiment.start_logging()
print("Starting experiment:", experiment.name)

# load the diabetes dataset
print("Loading Data...")
diabetes = pd.read_csv('data/diabetes-batch.csv')

# Separate features and labels
X, y = diabetes[['Pregnancies','PlasmaGlucose','DiastolicBloodPressure','TricepsThickness','SerumInsulin','BMI','DiabetesPedigree','Age']].values, diabetes['Diabetic'].values

# Split data into training set and test set
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.30, random_state=0)

# Train a decision tree model
print('Training a decision tree model')
model = DecisionTreeClassifier().fit(X_train, y_train)

# calculate accuracy
y_hat = model.predict(X_test)
acc = np.average(y_hat == y_test)
print('Accuracy:', acc)
run.log('Accuracy', float(acc))

# calculate AUC
y_scores = model.predict_proba(X_test)
auc = roc_auc_score(y_test,y_scores[:,1])
print('AUC: ' + str(auc))
run.log('AUC', float(auc))

# Save the trained model
model_file = 'diabetes_model-batch.pkl'
joblib.dump(value=model, filename=model_file)
run.upload_file(name = 'outputs/' + model_file, path_or_stream = './' + model_file)

# Complete the run
run.complete()

# Register the model
run.register_model(model_path='outputs/diabetes_model-batch.pkl', model_name='diabetes_model-batch',
                   tags={'Training context':'Inline Training'},
                   properties={'AUC': run.get_metrics()['AUC'], 'Accuracy': run.get_metrics()['Accuracy']})

print('Model trained and registered.')

Starting experiment: mslearn-train-diabetes-batch
Loading Data...
Training a decision tree model
Accuracy: 0.89
AUC: 0.8771033384745509
Model trained and registered.
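
The two metrics logged above can be reproduced by hand. The following is a minimal sketch in plain Python, using made-up labels and scores rather than the diabetes data, and assuming AUC is read as the share of positive/negative pairs that the model ranks correctly (ties counting as half). The function names are illustrative and not part of the lab code.

```python
def accuracy(y_true, y_pred):
    """Fraction of predictions that match the true labels."""
    matches = sum(1 for t, p in zip(y_true, y_pred) if t == p)
    return matches / len(y_true)


def auc(y_true, y_score):
    """Pairwise AUC: share of (positive, negative) pairs where the positive scores higher."""
    positives = [s for t, s in zip(y_true, y_score) if t == 1]
    negatives = [s for t, s in zip(y_true, y_score) if t == 0]
    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5  # ties count as half a win
    return wins / (len(positives) * len(negatives))


# Hypothetical labels and scores, for illustration only
y_true = [0, 0, 1, 1]
y_score = [0.1, 0.4, 0.35, 0.8]
y_pred = [1 if s >= 0.5 else 0 for s in y_score]

print('Accuracy:', accuracy(y_true, y_pred))
print('AUC:', auc(y_true, y_score))
```

Accuracy depends on the 0.5 threshold used to turn scores into labels, whereas AUC depends only on how the scores are ordered, which is why the notebook logs both.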


## Generate and upload batch data

Since we don't actually have a fully staffed clinic with patients from whom to get new data for this exercise, you'll generate a random sample from our diabetes CSV file, upload that data to a datastore in the Azure Machine Learning workspace, and register a dataset for it.

In [3]:
from azureml.core import Datastore, Dataset
import pandas as pd
import os

# Set default data store
ws.set_default_datastore('workspaceblobstore')
default_ds = ws.get_default_datastore()

# Enumerate all datastores, indicating which is the default
for ds_name in ws.datastores:
    print(ds_name, "- Default =", ds_name == default_ds.name)

# Load the diabetes data
diabetes = pd.read_csv('data/diabetes2-batch.csv')
# Get a 100-item sample of the feature columns (not the diabetic label)
sample = diabetes[['Pregnancies','PlasmaGlucose','DiastolicBloodPressure','TricepsThickness','SerumInsulin','BMI','DiabetesPedigree','Age']].sample(n=100).values

# Create a folder
batch_folder = './batch-data'
os.makedirs(batch_folder, exist_ok=True)
print("Folder created!")

# Save each sample as a separate file
print("Saving files...")
for i in range(100):
    fname = str(i+1) + '.csv'
    sample[i].tofile(os.path.join(batch_folder, fname), sep=",")
print("files saved!")

# Upload the files to the default datastore
print("Uploading files to datastore...")
default_ds = ws.get_default_datastore()
default_ds.upload(src_dir="batch-data", target_path="batch-data", overwrite=True, show_progress=True)

# Register a dataset for the input data
batch_data_set = Dataset.File.from_files(path=(default_ds, 'batch-data/'), validate=False)
try:
    batch_data_set = batch_data_set.register(workspace=ws, 
                                             name='batch-data',
                                             description='batch data',
                                             create_new_version=True)
except Exception as ex:
    print(ex)

print("Done!")

iris_model_datastore - Default = False
iris_datastore_data - Default = False
destinatdb - Default = False
advworks - Default = False
azureml_globaldatasets - Default = False
workspaceworkingdirectory - Default = False
workspaceartifactstore - Default = False
workspacefilestore - Default = False
workspaceblobstore - Default = True
Folder created!
Saving files...
files saved!
Uploading files to datastore...
Uploading an estimated of 101 files
Uploading batch-data/.amlignore
Uploaded batch-data/.amlignore, 1 files out of an estimated total of 101
Uploading batch-data/1.csv
Uploaded batch-data/1.csv, 2 files out of an estimated total of 101
Uploading batch-data/10.csv
Uploaded batch-data/10.csv, 3 files out of an estimated total of 101
Uploading batch-data/100.csv
Uploaded batch-data/100.csv, 4 files out of an estimated total of 101
Uploading batch-data/11.csv
Uploaded batch-data/11.csv, 5 files out of an estimated total of 101
Uploading batch-data/12.csv
Uploaded batch-data/12.csv, 6 file

"Datastore.upload" is deprecated after version 1.0.69. Please use "Dataset.File.upload_directory" to upload your files             from a local directory and create FileDataset in single method call. See Dataset API change notice at https://aka.ms/dataset-deprecation.


## Create compute

We'll need a compute context for the pipeline, so we'll use the following code to specify an Azure Machine Learning compute cluster (it will be created if it doesn't already exist).

> **Important**: Change the value of `cluster_name` (*cpu-cluster* in the code below) to the name of your compute cluster before running it! Cluster names must be globally unique and between 2 and 16 characters in length. Valid characters are letters, digits, and the - character.

In [4]:
from azureml.core.compute import ComputeTarget, AmlCompute
from azureml.core.compute_target import ComputeTargetException

cluster_name = "cpu-cluster"

try:
    # Check for existing compute target
    inference_cluster = ComputeTarget(workspace=ws, name=cluster_name)
    print('Found existing cluster, use it.')
except ComputeTargetException:
    # If it doesn't already exist, create it
    try:
        compute_config = AmlCompute.provisioning_configuration(vm_size='STANDARD_DS11_V2', max_nodes=2)
        inference_cluster = ComputeTarget.create(ws, cluster_name, compute_config)
        inference_cluster.wait_for_completion(show_output=True)
    except Exception as ex:
        print(ex)
    

Found existing cluster, use it.
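
The naming rule in the note above is easy to check before calling Azure. This is a small sketch that encodes only what the note states (length and allowed characters); the helper name is hypothetical, and Azure may enforce additional constraints that are not captured here.

```python
import re

# Rule as stated in the note above: 2 to 16 characters; letters, digits, and '-'
CLUSTER_NAME_PATTERN = re.compile(r'[A-Za-z0-9-]{2,16}')


def is_valid_cluster_name(name):
    """Return True if `name` satisfies the length and character rule."""
    return CLUSTER_NAME_PATTERN.fullmatch(name) is not None


for candidate in ['cpu-cluster', 'c', 'my_cluster', 'a-very-long-cluster-name']:
    print(candidate, '->', is_valid_cluster_name(candidate))
```

Checking the name locally gives a clearer message than the exception printed by the provisioning call when the name is rejected.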


> **Note**: Compute instances and clusters are based on standard Azure virtual machine images. For this exercise, the *Standard_DS11_v2* image is recommended to achieve the optimal balance of cost and performance. If your subscription has a quota that does not include this image, choose an alternative image; but bear in mind that a larger image may incur higher cost and a smaller image may not be sufficient to complete the tasks. Alternatively, ask your Azure administrator to extend your quota.

## Create a pipeline for batch inferencing

Now we're ready to define the pipeline we'll use for batch inferencing. Our pipeline will need Python code to perform the batch inferencing, so let's create a folder where we can keep all the files used by the pipeline:

In [5]:
import os
# Create a folder for the experiment files
experiment_folder = 'batch_pipeline'
os.makedirs(experiment_folder, exist_ok=True)

print(experiment_folder)

batch_pipeline


Now we'll create a Python script to do the actual work, and save it in the pipeline folder:

In [6]:
%%writefile $experiment_folder/batch_diabetes.py
import os
import numpy as np
from azureml.core import Model
import joblib


def init():
    # Runs when the pipeline step is initialized
    global model

    # load the model
    model_path = Model.get_model_path('diabetes_model-batch')
    model = joblib.load(model_path)


def run(mini_batch):
    # This runs for each batch
    resultList = []

    # process each file in the batch
    for f in mini_batch:
        # Read the comma-delimited data into an array
        data = np.genfromtxt(f, delimiter=',')
        # Reshape into a 2-dimensional array for prediction (model expects multiple items)
        prediction = model.predict(data.reshape(1, -1))
        # Append prediction to results
        resultList.append("{}: {}".format(os.path.basename(f), prediction[0]))
    return resultList

Overwriting batch_pipeline/batch_diabetes.py
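
Before submitting the pipeline, it can help to exercise the `init()` / `run(mini_batch)` contract locally. The sketch below mirrors the shape of the script above but makes two assumptions so that it runs without Azure: a hypothetical `StubModel` replaces the registered model, and the files are parsed with plain Python instead of NumPy.

```python
import os
import tempfile


class StubModel:
    """Hypothetical stand-in for the registered model: flags glucose above 140."""

    def predict(self, rows):
        return [1 if row[1] > 140 else 0 for row in rows]


def init():
    # Called once per worker process; the real script loads the registered model here
    global model
    model = StubModel()


def run(mini_batch):
    # Called once per mini-batch with a list of file paths; returns one result per file
    results = []
    for path in mini_batch:
        with open(path) as f:
            row = [float(v) for v in f.read().split(',')]
        prediction = model.predict([row])
        results.append('{}: {}'.format(os.path.basename(path), prediction[0]))
    return results


with tempfile.TemporaryDirectory() as folder:
    paths = []
    # Two single-line files in the same layout as the generated batch data
    for name, glucose in [('1.csv', 180.0), ('2.csv', 95.0)]:
        path = os.path.join(folder, name)
        with open(path, 'w') as f:
            f.write('2.0,{},74.0,24.0,21.0,23.9,1.49,22.0'.format(glucose))
        paths.append(path)

    init()
    smoke_results = run(paths)

print(smoke_results)
```

The point of the exercise is the contract rather than the predictions: `run` receives file paths, not file contents, and must return one item per input file.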


The pipeline will need an environment in which to run, so we'll create a Conda specification that includes the packages that the code uses.

In [7]:
%%writefile $experiment_folder/batch_environment.yml
name: batch_environment
dependencies:
- python=3.6.2
- scikit-learn
- pip
- pip:
  - azureml-defaults

Overwriting batch_pipeline/batch_environment.yml


Next we'll define a run context that includes the Conda environment.

In [8]:
from azureml.core import Environment
from azureml.core.runconfig import DEFAULT_CPU_IMAGE

# Create an Environment for the experiment
batch_env = Environment.from_conda_specification("experiment_env", experiment_folder + "/batch_environment.yml")
batch_env.docker.base_image = DEFAULT_CPU_IMAGE
print('Configuration ready.')

Configuration ready.


You're going to use a pipeline to run the batch prediction script, generate predictions from the input data, and save the results as a text file in the output folder. To do this, you can use a **ParallelRunStep**, which enables the batch data to be processed in parallel and the results collated in a single output file named *parallel_run_step.txt*.

In [9]:
from azureml.pipeline.steps import ParallelRunConfig, ParallelRunStep
from azureml.data import OutputFileDatasetConfig

output_dir = OutputFileDatasetConfig(name='inferences')

parallel_run_config = ParallelRunConfig(
    source_directory=experiment_folder,
    entry_script="batch_diabetes.py",
    mini_batch_size="5",
    error_threshold=10,
    output_action="append_row",
    environment=batch_env,
    compute_target=inference_cluster,
    node_count=2)

parallelrun_step = ParallelRunStep(
    name='batch-score-diabetes',
    parallel_run_config=parallel_run_config,
    inputs=[batch_data_set.as_named_input('diabetes_batch')],
    output=output_dir,
    arguments=[],
    allow_reuse=True
)

print('Steps defined')

Steps defined
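
To make the `mini_batch_size` and `append_row` settings concrete, here is a conceptual sketch of how a list of files is grouped into mini-batches and how the per-batch results end up as rows of one output. It is an illustration only: the function names are hypothetical, and the real **ParallelRunStep** distributes mini-batches across nodes and processes, so the order of rows in its output is not guaranteed.

```python
def split_into_mini_batches(files, mini_batch_size):
    """Group the file list into consecutive chunks of at most `mini_batch_size` files."""
    return [files[i:i + mini_batch_size] for i in range(0, len(files), mini_batch_size)]


def score(mini_batch):
    # Hypothetical stand-in for the entry script's run(): one result string per file
    return ['{}: 0'.format(name) for name in mini_batch]


files = ['{}.csv'.format(i + 1) for i in range(12)]
mini_batches = split_into_mini_batches(files, 5)

# "append_row" behaviour, conceptually: every returned item becomes one line of output
collated = []
for mini_batch in mini_batches:
    collated.extend(score(mini_batch))

print([len(b) for b in mini_batches])
print(len(collated))
```

With 100 input files and a mini-batch size of 5, the same arithmetic gives 20 calls to `run`, shared between the two nodes.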


Now it's time to put the step into a pipeline, and run it.

> **Note**: This may take some time!

In [39]:
from azureml.core import Experiment
from azureml.pipeline.core import Pipeline, PipelineEndpoint

pipeline = Pipeline(workspace=ws, steps=[parallelrun_step])
pipeline_run = Experiment(ws, 'mslearn-diabetes-batch').submit(pipeline)
pipeline_run.wait_for_completion(show_output=True)


pipelineName = "diabetes-batch-pipeline-versioned"
pipelineDescription = "Batch scoring of diabetes data"



try:
    # Endpoint already exists: publish a new version and make it the default
    pipeline_endpoint = PipelineEndpoint.get(workspace=ws, name=pipelineName)
    published = pipeline.publish(name=pipelineName)
    pipeline_endpoint.add_default(published)
except Exception:
    # Endpoint does not exist yet: publish the pipeline and create the endpoint
    published = pipeline.publish(name=pipelineName)
    pipeline_endpoint = PipelineEndpoint.publish(
        workspace=ws,
        name=pipelineName,
        pipeline=published,
        description=pipelineDescription)


Created step batch-score-diabetes [4585f3b3][d9136802-0258-43ce-b649-2b2a75ace2d6], (This step is eligible to reuse a previous run's output)
Submitted PipelineRun a3e16295-fa04-4208-a818-d04aafe0dd2f
Link to Azure Machine Learning Portal: https://ml.azure.com/runs/a3e16295-fa04-4208-a818-d04aafe0dd2f?wsid=/subscriptions/624b21b8-a8d9-4d63-9da0-053d47bb64e4/resourcegroups/wrkshp_machinelearning_demo_220512/workspaces/amldemo220512&tid=72f988bf-86f1-41af-91ab-2d7cd011db47
PipelineRunId: a3e16295-fa04-4208-a818-d04aafe0dd2f
Link to Azure Machine Learning Portal: https://ml.azure.com/runs/a3e16295-fa04-4208-a818-d04aafe0dd2f?wsid=/subscriptions/624b21b8-a8d9-4d63-9da0-053d47bb64e4/resourcegroups/wrkshp_machinelearning_demo_220512/workspaces/amldemo220512&tid=72f988bf-86f1-41af-91ab-2d7cd011db47
PipelineRun Status: NotStarted
PipelineRun Status: Running

PipelineRun Execution Summary
PipelineRun Status: Finished
{'runId': 'a3e16295-fa04-4208-a818-d04aafe0dd2f', 'status': 'Completed', 'start

When the pipeline has finished running, the resulting predictions will have been saved in the outputs of the experiment associated with the first (and only) step in the pipeline. You can retrieve them as follows:

In [40]:
import pandas as pd
import shutil

# Remove the local results folder if left over from a previous run
shutil.rmtree('diabetes-results', ignore_errors=True)

# Get the run for the first step and download its output
prediction_run = next(pipeline_run.get_children())
prediction_output = prediction_run.get_output_data('inferences')
prediction_output.download(local_path='diabetes-results')

# Traverse the folder hierarchy and find the results file
for root, dirs, files in os.walk('diabetes-results'):
    for file in files:
        if file.endswith('parallel_run_step.txt'):
            result_file = os.path.join(root,file)

# cleanup output format
df = pd.read_csv(result_file, delimiter=":", header=None)
df.columns = ["File", "Prediction"]

# Display the first 20 results
df.head(20)

Unnamed: 0,File,Prediction
0,1.csv,0
1,10.csv,0
2,100.csv,0
3,11.csv,0
4,12.csv,0
5,13.csv,0
6,14.csv,0
7,15.csv,0
8,16.csv,0
9,17.csv,0
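
Note that the rows come back in text order (1, 10, 100, 11, ...) rather than numeric order. A minimal sketch of parsing the `file: prediction` lines and sorting them by patient number follows; the sample lines and helper names are hypothetical.

```python
import re

# Hypothetical lines in the "file: prediction" format written by the scoring script
lines = ['1.csv: 0', '10.csv: 0', '100.csv: 0', '11.csv: 0', '2.csv: 1']


def parse_line(line):
    """Split 'name.csv: label' into (name, int(label))."""
    name, label = line.split(':', 1)
    return name.strip(), int(label)


def numeric_key(record):
    """Sort key: the leading integer in the file name."""
    return int(re.match(r'\d+', record[0]).group())


records = sorted((parse_line(line) for line in lines), key=numeric_key)
print(records)
```

The same key could be applied to the DataFrame's `File` column if a numerically ordered report is needed.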


## Publish the Pipeline and use its REST Interface

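Now that you have a working pipeline for batch inferencing, you can use the REST endpoint of its published version to run it from an application. The pipeline was already published in the cell that ran it; the commented-out code below shows the alternative of publishing directly from the pipeline run.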

In [12]:
#published_pipeline = pipeline_run.publish_pipeline(
#    name='diabetes-batch-pipeline', description='Batch scoring of diabetes data', version='1.0')

#published_pipeline

Name,Id,Status,Endpoint
diabetes-batch-pipeline,5d1873d0-9f27-4438-9ad9-a2739f360162,Active,REST Endpoint


Note that the published pipeline has an endpoint, which you can see in the Azure portal. You can also find it as a property of the published pipeline object:

In [43]:
rest_endpoint = published.endpoint
print(rest_endpoint)

https://eastus.api.azureml.ms/pipelines/v1.0/subscriptions/624b21b8-a8d9-4d63-9da0-053d47bb64e4/resourceGroups/wrkshp_machinelearning_demo_220512/providers/Microsoft.MachineLearningServices/workspaces/amldemo220512/PipelineRuns/PipelineSubmit/16d5f82d-1305-4f55-9b96-48d49971e4c6


To use the endpoint, client applications need to make a REST call over HTTP. This request must be authenticated, so an authorization header is required. To test this out, we'll use the authorization header from your current connection to your Azure workspace, which you can get using the following code:

> **Note**: A real application would require a service principal with which to be authenticated.

In [45]:
from azureml.core.authentication import InteractiveLoginAuthentication

interactive_auth = InteractiveLoginAuthentication()
auth_header = interactive_auth.get_authentication_header()
print('Authentication header ready.')

Authentication header ready.


Now we're ready to call the REST interface. The pipeline runs asynchronously, so we'll get an identifier back, which we can use to track the pipeline experiment as it runs:

In [46]:
import requests

rest_endpoint = published.endpoint
response = requests.post(rest_endpoint, 
                         headers=auth_header, 
                         json={"ExperimentName": "mslearn-diabetes-batch"})
run_id = response.json()["Id"]
run_id

'addf3eaf-fbb6-46ca-b117-3fefe22f33e6'
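
For a client that does not use the `requests` package, the same call can be assembled with the standard library. The sketch below only builds the request and does not send it. The endpoint URL and token are placeholders, and the `Authorization: Bearer <token>` shape of the header is an assumption about what the authentication object returns.

```python
import json
import urllib.request

# Hypothetical placeholders: substitute the published endpoint URL and a real token
rest_endpoint = 'https://example.invalid/pipelines/v1.0/PipelineSubmit/placeholder-id'
auth_header = {'Authorization': 'Bearer placeholder-token'}

body = json.dumps({'ExperimentName': 'mslearn-diabetes-batch'}).encode('utf-8')
headers = dict(auth_header)
headers['Content-Type'] = 'application/json'

# Build the request without sending it; urllib.request.urlopen(request) would submit it
request = urllib.request.Request(rest_endpoint, data=body, headers=headers, method='POST')

print(request.get_method())
print(request.get_header('Authorization'))
print(json.loads(request.data))
```

Whichever HTTP client is used, the essentials are the same: a POST to the pipeline endpoint, the authorization header, and a JSON body naming the experiment.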

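Since we have the run ID, we can retrieve the corresponding **PipelineRun** and wait for it to complete (the **RunDetails** widget imported here could be used instead to monitor the run interactively):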

In [47]:
from azureml.pipeline.core.run import PipelineRun
from azureml.widgets import RunDetails

published_pipeline_run = PipelineRun(ws.experiments['mslearn-diabetes-batch'], run_id)

# Block until the run completes
published_pipeline_run.wait_for_completion(show_output=True)

PipelineRunId: addf3eaf-fbb6-46ca-b117-3fefe22f33e6
Link to Azure Machine Learning Portal: https://ml.azure.com/runs/addf3eaf-fbb6-46ca-b117-3fefe22f33e6?wsid=/subscriptions/624b21b8-a8d9-4d63-9da0-053d47bb64e4/resourcegroups/wrkshp_machinelearning_demo_220512/workspaces/amldemo220512&tid=72f988bf-86f1-41af-91ab-2d7cd011db47

PipelineRun Execution Summary
PipelineRun Status: Finished
{'runId': 'addf3eaf-fbb6-46ca-b117-3fefe22f33e6', 'status': 'Completed', 'startTimeUtc': '2022-05-19T19:35:37.19589Z', 'endTimeUtc': '2022-05-19T19:35:38.52293Z', 'services': {}, 'properties': {'azureml.runsource': 'azureml.PipelineRun', 'runSource': 'Unavailable', 'runType': 'HTTP', 'azureml.parameters': '{}', 'azureml.continue_on_step_failure': 'False', 'azureml.pipelineid': '16d5f82d-1305-4f55-9b96-48d49971e4c6', 'azureml.pipelineComponent': 'pipelinerun'}, 'inputDatasets': [], 'outputDatasets': [], 'logFiles': {'logs/azureml/executionlogs.txt': 'https://strgamldemo220512.blob.core.windows.net/azureml/E

'Finished'

Wait for the pipeline run to complete, and then run the following cell to see the results.

As before, the results are in the output of the first pipeline step:

In [48]:
import pandas as pd
import shutil

# Remove the local results folder if left over from a previous run
shutil.rmtree('diabetes-results', ignore_errors=True)

# Get the run for the first step of the REST-initiated pipeline run and download its output
prediction_run = next(published_pipeline_run.get_children())
prediction_output = prediction_run.get_output_data('inferences')
prediction_output.download(local_path='diabetes-results')

# Traverse the folder hierarchy and find the results file
for root, dirs, files in os.walk('diabetes-results'):
    for file in files:
        if file.endswith('parallel_run_step.txt'):
            result_file = os.path.join(root,file)

# cleanup output format
df = pd.read_csv(result_file, delimiter=":", header=None)
df.columns = ["File", "Prediction"]

# Display the first 20 results
df.head(20)

Unnamed: 0,File,Prediction
0,1.csv,0
1,10.csv,0
2,100.csv,0
3,11.csv,0
4,12.csv,0
5,13.csv,0
6,14.csv,0
7,15.csv,0
8,16.csv,0
9,17.csv,0


## Write the results file to the default datastore in the predictions folder

In [49]:
# write to storage

print("Workspace: " + ws.name, "Region: " + ws.location, sep = '\n')

# Default datastore
default_store = ws.get_default_datastore() 

default_store.upload_files([result_file], 
                           target_path = 'predictions', 
                           overwrite = True, 
                           show_progress = True)



print("Upload calls completed.")


Workspace: amldemo220512
Region: eastus
Uploading an estimated of 1 files
Uploading diabetes-results/dataset/aebf8186-b546-44ad-a2bb-542305d0aed3/inferences/parallel_run_step.txt
Uploaded diabetes-results/dataset/aebf8186-b546-44ad-a2bb-542305d0aed3/inferences/parallel_run_step.txt, 1 files out of an estimated total of 1
Uploaded 1 files
Upload calls completed.


## Write the DataFrame to Azure SQL Server

- The DataFrame was filled a couple of cells earlier with the data from the local results file.
- The password for the Azure SQL database is read from the workspace's default key vault.
- The whole connection string could be stored as a secret instead; see the later code cell for that example.
- Running this cell more than once will duplicate the data.

In [53]:
#write to sql

import pyodbc
import pandas as pd
from azureml.core import Keyvault

keyvault = ws.get_default_keyvault()
secret_value = keyvault.get_secret(name="put password secret name here")

# Some other example server values are
# server = 'localhost\sqlexpress' # for a named instance
# server = 'myserver,port' # to specify an alternate port
server = 'put server name here' 
database = 'put database name here' 
username = 'put username here' 
password = secret_value
driver= '{ODBC Driver 17 for SQL Server}'

with pyodbc.connect('DRIVER='+driver+';SERVER=tcp:'+server+';PORT=1433;DATABASE='+database+';UID='+username+';PWD='+ password) as conn:
    with conn.cursor() as cursor:
        # Insert each DataFrame row into SQL Server
        for index, row in df.iterrows():
            cursor.execute("INSERT INTO dbo.Diagnosis ([File],[Prediction]) values(?,?)", row.File, row.Prediction)
        conn.commit()
    cursor.close()
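
Because re-running the insert duplicates rows, it may be worth making the load idempotent. The sketch below illustrates the idea with the standard-library `sqlite3` module standing in for Azure SQL; the rows are hypothetical, and `INSERT OR REPLACE` is SQLite syntax, so on SQL Server a different approach (for example a `MERGE` statement, or deleting the day's rows before inserting) would be needed.

```python
import sqlite3

# Hypothetical rows standing in for the prediction DataFrame
rows = [('1.csv', 0), ('2.csv', 1), ('3.csv', 1)]

conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE Diagnosis ("File" TEXT PRIMARY KEY, "Prediction" INTEGER)')


def load_predictions(connection, records):
    # Parameterized statement; the primary key makes a repeated load overwrite, not duplicate
    connection.executemany(
        'INSERT OR REPLACE INTO Diagnosis ("File", "Prediction") VALUES (?, ?)', records)
    connection.commit()


load_predictions(conn, rows)
load_predictions(conn, rows)  # second run leaves the row count unchanged

row_count = conn.execute('SELECT COUNT(*) FROM Diagnosis').fetchone()[0]
print(row_count)
conn.close()
```

Keying the table on the file name is itself an assumption; a real clinic would more likely key on a patient identifier and a measurement date.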

## Put the whole connection string in a secret

Test it by querying the table we just wrote to.

In [54]:
secret_connection_string = keyvault.get_secret(name="put connection string secret name here")

driver= '{ODBC Driver 17 for SQL Server}'

with pyodbc.connect('DRIVER='+driver+';'+secret_connection_string) as conn:
    with conn.cursor() as cursor:
        # Query the table and print each row
        cursor.execute("SELECT * FROM dbo.Diagnosis")
        row = cursor.fetchone()
        while row:
            print (str(row[0]) + " " + str(row[1]))
            row = cursor.fetchone()

1.csv 0
10.csv 0
100.csv 0
11.csv 0
12.csv 0
13.csv 0
14.csv 0
15.csv 0
16.csv 0
17.csv 0
18.csv 0
19.csv 1
2.csv 1
20.csv 0
21.csv 0
22.csv 1
23.csv 1
24.csv 1
25.csv 0
26.csv 0
27.csv 0
28.csv 1
29.csv 0
3.csv 1
30.csv 1
31.csv 0
32.csv 1
33.csv 1
34.csv 1
35.csv 0
36.csv 1
37.csv 1
38.csv 0
39.csv 0
4.csv 0
40.csv 1
41.csv 0
42.csv 0
43.csv 0
44.csv 0
45.csv 0
46.csv 0
47.csv 0
48.csv 1
49.csv 0
5.csv 0
50.csv 0
51.csv 1
52.csv 1
53.csv 0
54.csv 1
55.csv 1
56.csv 1
57.csv 0
58.csv 0
59.csv 1
6.csv 0
60.csv 0
61.csv 0
62.csv 0
63.csv 1
64.csv 0
65.csv 0
66.csv 1
67.csv 0
68.csv 0
69.csv 0
7.csv 0
70.csv 0
71.csv 1
72.csv 0
73.csv 0
74.csv 0
75.csv 0
76.csv 0
77.csv 0
78.csv 0
79.csv 1
8.csv 1
80.csv 0
81.csv 1
82.csv 0
83.csv 1
84.csv 0
85.csv 0
86.csv 1
87.csv 0
88.csv 0
89.csv 1
9.csv 0
90.csv 0
91.csv 0
92.csv 1
93.csv 0
94.csv 0
95.csv 1
96.csv 0
97.csv 0
98.csv 0
99.csv 0
