MIT License

Copyright (c) Microsoft Corporation. All rights reserved.

This notebook is adapted from Microsoft Learning mslearn-dp100 

Copyright (c) 2021 PyLadies Amsterdam, Alyona Galyeva

# Create batch pipeline

## Connect to your workspace

In [2]:
from azureml.core import Workspace
ws = Workspace.from_config()

## Provision inference compute

We'll need a compute context for the pipeline, so we'll use the following code to specify an Azure Machine Learning compute cluster (it will be created if it doesn't already exist).

Important: Cluster names must be globally unique and between 2 and 16 characters long. Valid characters are letters, digits, and the - character.

In [3]:
from azureml.core.compute import ComputeTarget, AmlCompute
from azureml.core.compute_target import ComputeTargetException

cluster_name = "mlopsbootcamp"

try:
    # Check for existing compute target
    inference_cluster = ComputeTarget(workspace=ws, name=cluster_name)
    print('Found existing cluster, use it.')
except ComputeTargetException:
    # If it doesn't already exist, create it
    try:
        compute_config = AmlCompute.provisioning_configuration(vm_size='Standard_DS2_v2', max_nodes=2)
        inference_cluster = ComputeTarget.create(ws, cluster_name, compute_config)
        inference_cluster.wait_for_completion(show_output=True)
    except Exception as ex:
        print(ex)

Found existing cluster, use it.


Note: Compute instances and clusters are based on standard Azure virtual machine images. For this exercise, the Standard_DS11_v2 image is recommended as a balance of cost and performance (the code above requests Standard_DS2_v2 instead). If your subscription's quota does not include this image, choose an alternative, bearing in mind that a larger image may cost more and a smaller one may not be sufficient to complete the tasks. Alternatively, ask your Azure administrator to extend your quota.
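The naming rule quoted earlier in this section can be checked locally before any call to Azure is made. The snippet below is a minimal sketch that encodes only that stated rule (2 to 16 characters; letters, digits, and `-`). The helper name `is_valid_cluster_name` is our own, and Azure may enforce additional constraints that this check does not cover.

```python
import re

# Encodes only the rule quoted in this notebook: 2 to 16 characters,
# limited to letters, digits, and '-'. Azure may enforce further rules.
_CLUSTER_NAME_PATTERN = re.compile(r"[A-Za-z0-9-]{2,16}")


def is_valid_cluster_name(name: str) -> bool:
    """Return True if `name` satisfies the length and character rule above."""
    return _CLUSTER_NAME_PATTERN.fullmatch(name) is not None


for candidate in ["mlopsbootcamp", "a", "my_cluster", "a-very-long-cluster-name"]:
    print(candidate, is_valid_cluster_name(candidate))
```

Only the first candidate passes: the others are too short, contain an underscore, or exceed 16 characters.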

## Create a pipeline for batch inferencing

Now we're ready to define the pipeline we'll use for batch inferencing. Our pipeline will need Python code to perform the batch inferencing, so let's create a folder where we can keep all the files used by the pipeline:

In [4]:
import os
# Create a folder for the experiment files
experiment_folder = 'batch_pipeline'
os.makedirs(experiment_folder, exist_ok=True)

print(experiment_folder)

batch_pipeline


In [8]:
# let's check what models are registered in our workspace and get a path to our model of choice
from azureml.core import Model
model_list = Model.list(ws)
#model_list
model_path = Model.get_model_path('linear_regression', _workspace=ws)
print(model_list, model_path)

[Model(workspace=Workspace.create(name='mlops', subscription_id='1e7eebf7-7dfc-4e94-9219-13ee1fc677f1', resource_group='mlops_bootcamp'), name=linear_regression, id=linear_regression:1, version=1, tags={}, properties={})] azureml-models/linear_regression/1/linear_regression.pkl


In [9]:
# let's load our model and take a look at what's inside
import joblib
model = joblib.load(model_path)
model


Pipeline(steps=[('preprocessor',
                 ColumnTransformer(remainder='passthrough',
                                   transformers=[('encoder',
                                                  OneHotEncoder(sparse=False),
                                                  [2, 3, 4])])),
                ('rfecv',
                 RFECV(cv=TimeSeriesSplit(gap=0, max_train_size=None, n_splits=3, test_size=None),
                       estimator=LinearRegression(), n_jobs=-1,
                       scoring='neg_mean_squared_error', verbose=2))])

In [11]:
mini_batch = list()
for (dirpath, dirnames, filenames) in os.walk("batch-data"):
    mini_batch += [os.path.join(dirpath, file) for file in filenames]
for elem in mini_batch:
    print(elem)

batch-data/155.csv
batch-data/141.csv
batch-data/97.csv
batch-data/83.csv
batch-data/68.csv
batch-data/6.csv
batch-data/54.csv
batch-data/40.csv
batch-data/41.csv
batch-data/7.csv
batch-data/55.csv
batch-data/69.csv
batch-data/82.csv
batch-data/168.csv
batch-data/96.csv
batch-data/140.csv
batch-data/154.csv
batch-data/142.csv
batch-data/156.csv
batch-data/80.csv
batch-data/94.csv
batch-data/43.csv
batch-data/57.csv
batch-data/5.csv
batch-data/56.csv
batch-data/4.csv
batch-data/42.csv
batch-data/95.csv
batch-data/81.csv
batch-data/157.csv
batch-data/143.csv
batch-data/85.csv
batch-data/91.csv
batch-data/147.csv
batch-data/153.csv
batch-data/46.csv
batch-data/52.csv
batch-data/1.csv
batch-data/53.csv
batch-data/47.csv
batch-data/152.csv
batch-data/146.csv
batch-data/90.csv
batch-data/84.csv
batch-data/92.csv
batch-data/86.csv
batch-data/150.csv
batch-data/144.csv
batch-data/51.csv
batch-data/3.csv
batch-data/45.csv
batch-data/79.csv
batch-data/78.csv
batch-data/44.csv
batch-data/50.csv
...

In [12]:
import numpy as np
def run(mini_batch):
    # This runs for each batch
    resultList = []

    # process each file in the batch
    for f in mini_batch:
        # Read comma-delimited data into an array
        data = np.genfromtxt(f, delimiter=',')
        # Reshape into a 2-dimensional array for model input
        prediction = model.predict(data.reshape(1, -1))
        # Append prediction to results
        resultList.append("{}: {}".format(os.path.basename(f), prediction[0]))
    return resultList

In [13]:
result = run(mini_batch)
result

['155.csv: 5660.9060375773115',
 '141.csv: 6455.407695342503',
 '97.csv: 6926.847248283706',
 '83.csv: 8170.295673985493',
 '68.csv: 7785.008321210013',
 '6.csv: 5343.761325931411',
 '54.csv: 6458.818014213481',
 '40.csv: 9275.43496363313',
 '41.csv: 9406.726810703602',
 '7.csv: 5743.964904506988',
 '55.csv: 6951.0167699319845',
 '69.csv: 7565.301060930669',
 '82.csv: 8008.092516591842',
 '168.csv: 5752.95602152778',
 '96.csv: 7375.132570083166',
 '140.csv: 6841.574950767086',
 '154.csv: 5571.863855103756',
 '142.csv: 6411.625913311922',
 '156.csv: 5977.161408001057',
 '80.csv: 6895.069375203993',
 '94.csv: 7973.407965752167',
 '43.csv: 9173.935429410356',
 '57.csv: 8080.800869954099',
 '5.csv: 5105.718918331259',
 '56.csv: 7466.198743375799',
 '4.csv: 5156.107293054456',
 '42.csv: 9451.966950263522',
 '95.csv: 7831.381331744058',
 '81.csv: 7558.893642050116',
 '157.csv: 6053.814717196756',
 '143.csv: 6259.699538812536',
 '85.csv: 8688.557910378553',
 '91.csv: 8713.076706982223',
 ...

Now we'll create a Python script to do the actual work, and save it in the pipeline folder:

In [14]:
%%writefile $experiment_folder/score.py
# windows users
# %%writefile $experiment_folder\score.py

import os
import numpy as np
from azureml.core import Model
import joblib

def init():
    # Runs when the pipeline step is initialized
    global model

    # load the model
    model_path = Model.get_model_path('linear_regression')
    model = joblib.load(model_path)

def run(mini_batch):
    # This runs for each batch
    resultList = []

    # process each file in the batch
    for f in mini_batch:
        # Read comma-delimited data into an array
        data = np.genfromtxt(f, delimiter=',')
        # Reshape into a 2-dimensional array for model input
        prediction = model.predict(data.reshape(1, -1))
        # Append prediction to results
        resultList.append("{}: {}".format(os.path.basename(f), prediction[0]))
    return resultList

Writing batch_pipeline/score.py
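Before submitting the pipeline, it can help to exercise the `init()` / `run()` contract on its own: `init()` runs once to load the model, and `run()` receives a list of file paths and returns one result per file. The sketch below mirrors the structure of `score.py`, but it is not the script itself: `StubModel` is a hypothetical stand-in for the registered model, and the standard `csv` module replaces NumPy so that the example runs without Azure ML or a trained model. It assumes each file holds a single row of numeric features.

```python
import csv
import os
import tempfile


class StubModel:
    """Hypothetical stand-in for the registered model: predicts the row sum."""

    def predict(self, rows):
        return [sum(row) for row in rows]


def init():
    # Runs once per worker process; the real script loads the model here.
    global model
    model = StubModel()


def run(mini_batch):
    # Runs once per mini-batch; returns one result line per input file.
    results = []
    for path in mini_batch:
        with open(path, newline="") as handle:
            row = [float(value) for value in next(csv.reader(handle))]
        prediction = model.predict([row])
        results.append("{}: {}".format(os.path.basename(path), prediction[0]))
    return results


# Write two tiny single-row CSV files and score them as one mini-batch.
with tempfile.TemporaryDirectory() as folder:
    paths = []
    for name, values in [("1.csv", [1.0, 2.0]), ("2.csv", [3.0, 4.5])]:
        path = os.path.join(folder, name)
        with open(path, "w", newline="") as handle:
            csv.writer(handle).writerow(values)
        paths.append(path)
    init()
    results = run(paths)

print(results)
```

The returned list has exactly one entry per input file, which is what lets the pipeline step collate the results afterwards.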


The pipeline will need an environment in which to run, so we'll create a Conda specification that includes the packages that the code uses.

In [15]:
%%writefile $experiment_folder/batch_environment.yml
# windows users
# %%writefile $experiment_folder\batch_environment.yml
name: batch_environment
dependencies:
- python=3.8
- numpy
- pandas
- scikit-learn
- pip:
  - azureml-core

Writing batch_pipeline/batch_environment.yml


Next we'll define a run context that includes the Conda environment.

In [16]:
from azureml.core import Environment
from azureml.core.runconfig import DEFAULT_CPU_IMAGE

# Create an Environment for the experiment
batch_env = Environment.from_conda_specification("experiment_env", experiment_folder + "/batch_environment.yml")
batch_env.docker.base_image = DEFAULT_CPU_IMAGE
print('Configuration ready.')

Configuration ready.


You're going to use a pipeline to run the batch prediction script, generate predictions from the input data, and save the results as a text file in the output folder. To do this, you can use a ParallelRunStep, which enables the batch data to be processed in parallel and the results collated in a single output file named parallel_run_step.txt.

In [18]:
from datetime import datetime

from azureml.pipeline.steps import ParallelRunConfig, ParallelRunStep
from azureml.data import OutputFileDatasetConfig

# Get the batch dataset for input
batch_data_set = ws.datasets['batch-data']

# Set the output location
default_ds = ws.get_default_datastore()
output_dir = OutputFileDatasetConfig(name='inferences')

# Define the parallel run step configuration
parallel_run_config = ParallelRunConfig(
    source_directory=experiment_folder,
    entry_script="score.py",
    mini_batch_size="5",
    error_threshold=10,
    output_action="append_row",
    environment=batch_env,
    compute_target=inference_cluster,
    node_count=2)

parallel_step_name = "batchscoring-" + datetime.now().strftime("%Y%m%d%H%M")

# Create the parallel run step
parallelrun_step = ParallelRunStep(
    name=parallel_step_name,
    parallel_run_config=parallel_run_config,
    inputs=[batch_data_set.as_named_input('batch_data')],
    output=output_dir,
    arguments=[],
    allow_reuse=True
)

print('Steps defined')

Steps defined
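To build some intuition for `mini_batch_size="5"` and `output_action="append_row"`, here is a pure-Python sketch of the idea for file-based input: the files are split into groups of at most five, each group is passed to `run()`, and every returned line ends up in one collated output. This only illustrates the concept; it is not how Azure ML implements or schedules the work, and `split_into_mini_batches` and `fake_run` are hypothetical names.

```python
def split_into_mini_batches(files, mini_batch_size):
    """Yield consecutive groups of at most `mini_batch_size` file paths."""
    for start in range(0, len(files), mini_batch_size):
        yield files[start:start + mini_batch_size]


def fake_run(mini_batch):
    # Hypothetical stand-in for score.py's run(): one result line per file.
    return ["{}: scored".format(name) for name in mini_batch]


files = ["{}.csv".format(i) for i in range(1, 13)]  # 12 made-up file names
batches = list(split_into_mini_batches(files, 5))

# "append_row" behaviour, conceptually: concatenate every returned line.
collated = [line for batch in batches for line in fake_run(batch)]

print([len(batch) for batch in batches])
print(len(collated))
```

With 12 files and a mini-batch size of 5, the sketch produces batches of 5, 5, and 2 files and 12 collated lines in total.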


Now it's time to put the step into a pipeline, and run it.

In [19]:
from azureml.core import Experiment
from azureml.pipeline.core import Pipeline

# Create the pipeline
pipeline = Pipeline(workspace=ws, steps=[parallelrun_step])

# Run the pipeline as an experiment
pipeline_run = Experiment(ws, 'nyc-energy-demand-batch').submit(pipeline)
pipeline_run.wait_for_completion(show_output=True)

arting to exit user context managers...
[2021-07-18T11:51:07.890191] Running Sidecar release cmd...
[2021-07-18T11:51:07.913284] INFO azureml.sidecar.sidecar: Received task: exit_contexts. Running on Linux at /mnt/hostfs/mnt/batch/tasks/shared/LS_root/jobs/mlops/azureml/25e0c3d3-cb8b-4c82-8f8a-418d67c5a868/wd/azureml/25e0c3d3-cb8b-4c82-8f8a-418d67c5a868
Enter __exit__ of DatasetContextManager
Unmounting /mnt/hostfs/mnt/batch/tasks/shared/LS_root/jobs/mlops/azureml/25e0c3d3-cb8b-4c82-8f8a-418d67c5a868/wd/batch_data_7d1b86d0-fc2b-4d23-a7d8-a46d268d9cb0.
Finishing unmounting /mnt/hostfs/mnt/batch/tasks/shared/LS_root/jobs/mlops/azureml/25e0c3d3-cb8b-4c82-8f8a-418d67c5a868/wd/batch_data_7d1b86d0-fc2b-4d23-a7d8-a46d268d9cb0.
Unmounting /mnt/hostfs/mnt/batch/tasks/shared/LS_root/jobs/mlops/azureml/25e0c3d3-cb8b-4c82-8f8a-418d67c5a868/wd/inferences_workspaceblobstore.
Finishing unmounting /mnt/hostfs/mnt/batch/tasks/shared/LS_root/jobs/mlops/azureml/25e0c3d3-cb8b-4c82-8f8a-418d67c5a868/wd/inf

'Finished'

When the pipeline has finished running, the resulting predictions will have been saved in the outputs of the experiment associated with the first (and only) step in the pipeline. You can retrieve them as follows:

In [20]:
import pandas as pd
import shutil

# Remove the local results folder if left over from a previous run
shutil.rmtree('batch-results', ignore_errors=True)

# Get the run for the first step and download its output
prediction_run = next(pipeline_run.get_children())
prediction_output = prediction_run.get_output_data('inferences')
prediction_output.download(local_path='batch-results')

# Traverse the folder hierarchy and find the results file
for root, dirs, files in os.walk('batch-results'):
    for file in files:
        if file.endswith('parallel_run_step.txt'):
            result_file = os.path.join(root,file)

# Load the results and label the columns
df = pd.read_csv(result_file, delimiter=":", header=None)
df.columns = ["File", "Prediction"]

# Display the first 20 results
df.head(20)

,File,Prediction
0,1.csv,5760.163854
1,10.csv,7322.598573
2,100.csv,6114.918067
3,101.csv,6157.048048
4,102.csv,6294.140237
5,103.csv,6784.098353
6,104.csv,7327.244487
7,105.csv,7730.343989
8,106.csv,7640.45729
9,107.csv,8247.744176
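The table above lists the files in text order (`1.csv`, `10.csv`, `100.csv`, ...) rather than numeric order. If numeric order matters, the result lines can be parsed and sorted by file number. The sketch below uses plain Python and a few lines taken from the outputs shown in this notebook; the helper names are our own, and it assumes every file name has the form `<integer>.csv`.

```python
def parse_result_line(line):
    """Split 'name.csv: value' into (file name, float prediction)."""
    name, _, value = line.partition(":")
    return name.strip(), float(value)


def file_number(name):
    # Assumes file names look like '<integer>.csv', as in this dataset.
    return int(name.rsplit(".", 1)[0])


# Result lines copied from outputs earlier in this notebook.
sample_lines = [
    "10.csv: 7322.598573",
    "100.csv: 6114.918067",
    "4.csv: 5156.107293054456",
    "5.csv: 5105.718918331259",
]

parsed = [parse_result_line(line) for line in sample_lines]
ordered = sorted(parsed, key=lambda item: file_number(item[0]))

for name, prediction in ordered:
    print(name, prediction)
```

The same idea works on the DataFrame: derive an integer column from `File` and sort by it.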


## Publish the Pipeline and use its REST Interface

Now that you have a working pipeline for batch inferencing, you can publish it and use a REST endpoint to run it from an application.

In [21]:
published_pipeline = pipeline_run.publish_pipeline(name='Linear_regression_batch_prediction_pipeline',
                                                   description='Batch scoring using linear regression model',
                                                   version='1.0')

published_pipeline

Name,Id,Status,Endpoint
Linear_regression_batch_prediction_pipeline,c258f025-db37-4a12-9d23-0342999cd467,Active,REST Endpoint


Note that the published pipeline has an endpoint, which you can see in the Azure portal. You can also find it as a property of the published pipeline object:

In [22]:
rest_endpoint = published_pipeline.endpoint
print(rest_endpoint)

https://westeurope.api.azureml.ms/pipelines/v1.0/subscriptions/1e7eebf7-7dfc-4e94-9219-13ee1fc677f1/resourceGroups/mlops_bootcamp/providers/Microsoft.MachineLearningServices/workspaces/mlops/PipelineRuns/PipelineSubmit/c258f025-db37-4a12-9d23-0342999cd467


To use the endpoint, client applications need to make a REST call over HTTP. This request must be authenticated, so an authorization header is required. To test this out, we'll use the authorization header from your current connection to your Azure workspace, which you can get using the following code:

Note: A real application would require a service principal with which to be authenticated.

In [23]:
from azureml.core.authentication import InteractiveLoginAuthentication

interactive_auth = InteractiveLoginAuthentication()
auth_header = interactive_auth.get_authentication_header()
print('Authentication header ready.')

Authentication header ready.
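For the service principal route mentioned in the note above, a rough sketch might look like the following. The environment variable names are hypothetical, and the function that builds the header needs `azureml-core` plus a real service principal with access to the workspace, so only the settings check is exercised here. Treat this as a starting point under those assumptions, not a complete authentication setup.

```python
import os

# Hypothetical environment variable names; use whatever your deployment defines.
REQUIRED_SETTINGS = ("AML_TENANT_ID", "AML_CLIENT_ID", "AML_CLIENT_SECRET")


def load_sp_settings(env=os.environ):
    """Collect service principal settings, failing early if any are missing."""
    missing = [key for key in REQUIRED_SETTINGS if not env.get(key)]
    if missing:
        raise KeyError("Missing settings: " + ", ".join(missing))
    return {key: env[key] for key in REQUIRED_SETTINGS}


def service_principal_header(settings):
    """Build an authorization header; needs azureml-core and valid credentials."""
    # Imported lazily so the settings check above runs without azureml installed.
    from azureml.core.authentication import ServicePrincipalAuthentication

    auth = ServicePrincipalAuthentication(
        tenant_id=settings["AML_TENANT_ID"],
        service_principal_id=settings["AML_CLIENT_ID"],
        service_principal_password=settings["AML_CLIENT_SECRET"],
    )
    return auth.get_authentication_header()


# Offline demonstration with placeholder values (not real credentials).
demo = load_sp_settings(
    {"AML_TENANT_ID": "tenant", "AML_CLIENT_ID": "client", "AML_CLIENT_SECRET": "secret"}
)
print(sorted(demo))
```

In an application, `service_principal_header(load_sp_settings())` would take the place of the interactive login used in this notebook.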


Once published, you can use this endpoint to initiate a batch inferencing job, as shown in the following example code:

In [24]:
import requests

rest_endpoint = published_pipeline.endpoint
response = requests.post(rest_endpoint, 
                         headers=auth_header, 
                         json={"ExperimentName": "nyc-energy-demand-batch"})
run_id = response.json()["Id"]
run_id

'54c61326-d4bb-4aa4-bc45-4da6abfb2501'
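The call above assumes the request succeeds. In a client application you would probably want to check the HTTP status before reading the `Id` field. The sketch below wraps the same request in a hypothetical helper, `submit_pipeline`, and lets the caller inject the `post` function so the logic can be tried offline; `FakeResponse` and `fake_post` are stand-ins for illustration only, and the 30-second timeout is an arbitrary choice.

```python
def submit_pipeline(endpoint, headers, experiment_name, post=None):
    """POST to a published pipeline endpoint and return the new run's Id."""
    if post is None:
        import requests  # imported lazily; only needed for real calls
        post = requests.post
    response = post(endpoint, headers=headers,
                    json={"ExperimentName": experiment_name}, timeout=30)
    # Surface HTTP errors (for example an expired token) before parsing.
    response.raise_for_status()
    body = response.json()
    if "Id" not in body:
        raise ValueError("Response did not contain a run Id: {!r}".format(body))
    return body["Id"]


class FakeResponse:
    """Hypothetical stand-in for requests.Response, for an offline demo."""

    def raise_for_status(self):
        pass

    def json(self):
        return {"Id": "demo-run-id"}


def fake_post(url, headers=None, json=None, timeout=None):
    return FakeResponse()


demo_id = submit_pipeline("https://example.invalid/pipeline", {},
                          "nyc-energy-demand-batch", post=fake_post)
print(demo_id)
```

In this notebook the real call would be `submit_pipeline(rest_endpoint, auth_header, "nyc-energy-demand-batch")`.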

You can also schedule the published pipeline to have it run automatically, as shown in the following example code:

In [None]:
from azureml.pipeline.core import ScheduleRecurrence, Schedule

weekly = ScheduleRecurrence(frequency='Week', interval=1)
pipeline_schedule = Schedule.create(ws, name='Weekly Predictions',
                                    description='batch inferencing',
                                    pipeline_id=published_pipeline.id,
                                    experiment_name='Batch_Prediction',
                                    recurrence=weekly)

Important: Delete the inference cluster if you do not plan to continue with the exercises right away.