# Parallel Batch Scoring pipeline example

In this example, we'll build a pipeline that batch scores data in parallel on one or more nodes. The same approach can be used either to score large amounts of data or to train many models in parallel.

In [1]:
import os
import azureml.core
from azureml.core import Workspace, Experiment, Dataset, RunConfiguration
from azureml.pipeline.core import Pipeline
from azureml.pipeline.steps import ParallelRunStep, ParallelRunConfig
from azureml.data import OutputFileDatasetConfig
from azureml.data.dataset_consumption_config import DatasetConsumptionConfig

print("Azure ML SDK version:", azureml.core.VERSION)


Azure ML SDK version: 1.28.0


First, we will connect to the workspace. The call `Workspace.from_config()` will either:
* Read the workspace reference from the local `config.json`, if it exists, or
* Use the `az` CLI to connect to the workspace that was attached to the folder via `az ml folder attach -g <resource group> -w <workspace name>`
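
For reference, the `config.json` read in the first case is a small JSON file with three keys. The sketch below writes one with placeholder values; the angle-bracket values and the temporary folder are assumptions for illustration, not values from this workspace.

```python
import json
import tempfile
from pathlib import Path

# Placeholder values (assumptions for illustration); replace with your own.
workspace_config = {
    "subscription_id": "<subscription id>",
    "resource_group": "<resource group>",
    "workspace_name": "<workspace name>",
}

# Written to a temporary folder so this sketch has no side effects.
# For real use, the file would sit in (or above) the notebook's working
# directory, where Workspace.from_config() starts looking by default.
config_path = Path(tempfile.mkdtemp()) / "config.json"
config_path.write_text(json.dumps(workspace_config, indent=2))

print(config_path.read_text())
```

A ready-made `config.json` can also be downloaded from the workspace's overview page in the Azure portal.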

In [2]:
ws = Workspace.from_config()
print(f'WS name: {ws.name}\nRegion: {ws.location}\nSubscription id: {ws.subscription_id}\nResource group: {ws.resource_group}')

WS name: demo-ent-ws
Region: westeurope
Subscription id: bcbf34a7-1936-4783-8840-8f324c37f354
Resource group: demo


# Preparation

Let's register the provided `model.pkl` as a model in our workspace. We'll use this model for batch scoring in the pipeline:

In [3]:
from azureml.core.model import Model
Model.register(model_path="model.pkl",
               model_name="credit_model_tutorial",
               description="Example model for batch scoring tutorial",
               workspace=ws)

Registering model credit_model_tutorial


Model(workspace=Workspace.create(name='demo-ent-ws', subscription_id='bcbf34a7-1936-4783-8840-8f324c37f354', resource_group='demo'), name=credit_model_tutorial, id=credit_model_tutorial:4, version=4, tags={}, properties={})

Let's also register a dataset with the data that we want to use for batch scoring. This dataset is different from the one already registered for training: it consists of multiple files, so it needs to be registered separately:

In [4]:
from azureml.core import Dataset

datastore = ws.get_default_datastore()

datastore.upload(src_dir='../data-batch-scoring', target_path='german-credit-batch-tutorial', overwrite=True)

ds = Dataset.File.from_files(path=[(datastore, 'german-credit-batch-tutorial')])
ds.register(ws, name='german-credit-batch-tutorial', description='Dataset for batch scoring tutorial', create_new_version=True)

Uploading an estimated of 4 files
Uploading ../data-batch-scoring/german_credit_data_batch_test_00.csv
Uploaded ../data-batch-scoring/german_credit_data_batch_test_00.csv, 1 files out of an estimated total of 4
Uploading ../data-batch-scoring/german_credit_data_batch_test_01.csv
Uploaded ../data-batch-scoring/german_credit_data_batch_test_01.csv, 2 files out of an estimated total of 4
Uploading ../data-batch-scoring/german_credit_data_batch_test_02.csv
Uploaded ../data-batch-scoring/german_credit_data_batch_test_02.csv, 3 files out of an estimated total of 4
Uploading ../data-batch-scoring/german_credit_data_batch_test_03.csv
Uploaded ../data-batch-scoring/german_credit_data_batch_test_03.csv, 4 files out of an estimated total of 4
Uploaded 4 files


{
  "source": [
    "('workspaceblobstore', 'german-credit-batch-tutorial')"
  ],
  "definition": [
    "GetDatastoreFiles"
  ],
  "registration": {
    "id": "e2fb1a77-e1c7-48bf-b897-9816ea287564",
    "name": "german-credit-batch-tutorial",
    "version": 1,
    "description": "Dataset for batch scoring tutorial",
    "workspace": "Workspace.create(name='demo-ent-ws', subscription_id='bcbf34a7-1936-4783-8840-8f324c37f354', resource_group='demo')"
  }
}

Next, let's reference our newly created batch scoring dataset, so that we can use it as the pipeline input:

In [5]:
batch_dataset = Dataset.get_by_name(ws, "german-credit-batch-tutorial")
batch_dataset_consumption = DatasetConsumptionConfig("batch_dataset", batch_dataset).as_download()

Now let's create an output dataset that will contain our predictions. This gives us complete freedom over where the predictions are stored on the datastore:

In [6]:
datastore = ws.get_default_datastore()

# This will put the output results into a pre-defined folder on our datastore and
# optionally register it as a dataset (not required).
# The output file name is defined in the parallel_runconfig.yml file.
output_dataset = OutputFileDatasetConfig(name='batch_results',
                                         destination=(datastore, 'batch-scoring-results/{run-id}')).register_on_complete(name='batch-scoring-results')


Next, we can create a [`ParallelRunStep`](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-steps/azureml.pipeline.steps.parallelrunstep?view=azure-ml-py) that runs our batch scoring code in parallel on one or more nodes. In this case, we load a [`ParallelRunConfig`](https://docs.microsoft.com/en-us/python/api/azureml-contrib-pipeline-steps/azureml.contrib.pipeline.steps.parallel_run_config.parallelrunconfig?view=azure-ml-py) from a YAML file that defines our batch scoring job (source script, environment, parallelization, target cluster, etc.).

`ParallelRunStep` is used for processing large amounts of data in parallel. It works by breaking up your data into batches that are processed in parallel. The batch size, node count, and other tunable parameters that affect the speed of parallel processing can be controlled with the `ParallelRunConfig` class.

For example, `ParallelRunStep` is used in the [Many Models Solution Accelerator](https://github.com/microsoft/solution-accelerator-many-models/tree/6af9070d785a9c1d6d326d0d8c0c06c45b37b7e5) when training with a [custom script](https://github.com/microsoft/solution-accelerator-many-models/blob/6af9070d785a9c1d6d326d0d8c0c06c45b37b7e5/Custom_Script/02_CustomScript_Training_Pipeline.ipynb).

More details about Azure ML Batch Inference using the `ParallelRunStep` are available [here](https://github.com/Azure/MachineLearningNotebooks/tree/master/how-to-use-azureml/machine-learning-pipelines/parallel-run).
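
To make the batching idea concrete, here is a minimal local sketch that splits a list of input files into mini-batches and processes them concurrently. It only illustrates the concept: `make_mini_batches`, `score_batch`, and the thread pool are hypothetical stand-ins, not how `ParallelRunStep` is implemented. The file names mirror the four CSV files uploaded earlier.

```python
from concurrent.futures import ThreadPoolExecutor

def make_mini_batches(items, mini_batch_size):
    """Split items into consecutive chunks of at most mini_batch_size."""
    return [items[i:i + mini_batch_size]
            for i in range(0, len(items), mini_batch_size)]

def score_batch(mini_batch):
    """Hypothetical stand-in for real scoring: one result per input file."""
    return [f"scored {name}" for name in mini_batch]

files = [f"german_credit_data_batch_test_{i:02d}.csv" for i in range(4)]
mini_batches = make_mini_batches(files, mini_batch_size=3)

# Each worker handles one mini-batch; map() keeps results in input order.
with ThreadPoolExecutor(max_workers=2) as pool:
    results = [row
               for batch in pool.map(score_batch, mini_batches)
               for row in batch]

print(mini_batches)
print(results)
```

With a mini-batch size of 3, the four files end up in two mini-batches, which is the kind of trade-off (fewer, larger batches versus more, smaller ones) that the batch size setting controls.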

In [7]:
parallel_run_config = ParallelRunConfig.load_yaml(workspace=ws, path="parallel_runconfig.yml")

batch_step = ParallelRunStep(
    name="batch-inference-step",
    parallel_run_config=parallel_run_config,
    arguments=['--model_name', 'credit_model_tutorial'],
    inputs=[batch_dataset_consumption],
    side_inputs=[],
    output=output_dataset,
    allow_reuse=False
)

steps = [batch_step]
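
The script that this step runs is referenced from `parallel_runconfig.yml` and is not shown here. As a rough sketch of its general shape: a `ParallelRunStep` entry script exposes an `init()` function, called once per worker process, and a `run(mini_batch)` function, called once per mini-batch. Everything else below is an assumption for illustration: the optional `argv` parameter (added so the sketch can run locally), the `_DummyModel` stand-in, and the format of the returned rows.

```python
import argparse

model = None  # populated once per worker process by init()

class _DummyModel:
    """Hypothetical stand-in for the registered model."""
    def __init__(self, name):
        self.name = name

    def predict(self, path):
        return f"{self.name} scored {path}"

def init(argv=None):
    """Parse the step's `arguments` and load the model once."""
    global model
    parser = argparse.ArgumentParser()
    parser.add_argument("--model_name", required=True)
    # parse_known_args ignores any extra arguments passed to the script.
    args, _ = parser.parse_known_args(argv)
    model = _DummyModel(args.model_name)

def run(mini_batch):
    """Return one result per item in the mini-batch."""
    return [model.predict(path) for path in mini_batch]

# Local usage example with a made-up file name:
init(["--model_name", "credit_model_tutorial"])
print(run(["example_00.csv"]))
```

A real script would typically load the registered model inside `init()`, for example by resolving its path with `Model.get_model_path`, instead of constructing a dummy object.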

Finally, we can create our pipeline object and validate it. Validation checks that the inputs and outputs are properly linked and that the pipeline graph is acyclic:

In [8]:
pipeline = Pipeline(workspace=ws, steps=steps)
pipeline.validate()

Step batch-inference-step is ready to be created [e71189e3]


[]
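
What it means for a pipeline graph to be acyclic can be shown with a small standalone check. This is a sketch using Kahn's algorithm on a plain dictionary of step dependencies; the step names other than `batch-inference-step` are hypothetical, and this is not the SDK's validation code.

```python
from collections import deque

def is_acyclic(dependencies):
    """Return True if the graph {step: [steps it depends on]} has no cycle."""
    steps = set(dependencies)
    for upstream in dependencies.values():
        steps.update(upstream)

    # Count unmet dependencies per step and record who waits on whom.
    pending = {step: 0 for step in steps}
    dependents = {step: [] for step in steps}
    for step, upstream in dependencies.items():
        for dep in upstream:
            pending[step] += 1
            dependents[dep].append(step)

    ready = deque(step for step, count in pending.items() if count == 0)
    visited = 0
    while ready:
        current = ready.popleft()
        visited += 1
        for nxt in dependents[current]:
            pending[nxt] -= 1
            if pending[nxt] == 0:
                ready.append(nxt)

    # Every step gets visited only if no cycle blocks the ordering.
    return visited == len(steps)

# Our single-step pipeline has no dependencies, so it is trivially acyclic.
print(is_acyclic({"batch-inference-step": []}))
# Two hypothetical steps that depend on each other form a cycle.
print(is_acyclic({"prepare": ["publish"], "publish": ["prepare"]}))
```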

Now we can submit the pipeline to an experiment:

In [9]:
pipeline_run = Experiment(ws, 'mlops-workshop-pipelines-20210524').submit(pipeline)
pipeline_run.wait_for_completion()

Created step batch-inference-step [e71189e3][d7a3e6da-4940-447c-98a4-aa56db819f2b], (This step will run and generate new outputs)
Submitted PipelineRun b4fa542f-32f9-402a-8234-76cd97ecf9c9
Link to Azure Machine Learning Portal: https://ml.azure.com/runs/b4fa542f-32f9-402a-8234-76cd97ecf9c9?wsid=/subscriptions/bcbf34a7-1936-4783-8840-8f324c37f354/resourcegroups/demo/workspaces/demo-ent-ws&tid=1f053027-5c7a-4f10-8444-ca55e5715f27
PipelineRunId: b4fa542f-32f9-402a-8234-76cd97ecf9c9
Link to Azure Machine Learning Portal: https://ml.azure.com/runs/b4fa542f-32f9-402a-8234-76cd97ecf9c9?wsid=/subscriptions/bcbf34a7-1936-4783-8840-8f324c37f354/resourcegroups/demo/workspaces/demo-ent-ws&tid=1f053027-5c7a-4f10-8444-ca55e5715f27
PipelineRun Status: Running


StepRunId: fe18e68e-b8d3-4684-b642-8529226556c8
Link to Azure Machine Learning Portal: https://ml.azure.com/runs/fe18e68e-b8d3-4684-b642-8529226556c8?wsid=/subscriptions/bcbf34a7-1936-4783-8840-8f324c37f354/resourcegroups/demo/workspaces/demo-


Streaming azureml-logs/75_job_post-tvmps_21892ccbf5da86c1d0c990ba7db029e600adb6221dbeb7a780514fb2a737bcc1_d.txt
[2021-05-25T09:24:21.732647] Entering job release
[2021-05-25T09:24:23.478714] Starting job release
[2021-05-25T09:24:23.479494] Logging experiment finalizing status in history service.
Starting the daemon thread to refresh tokens in background for process with pid = 386
[2021-05-25T09:24:23.479824] job release stage : upload_datastore starting...

[2021-05-25T09:24:23.480645] job release stage : start importing azureml.history._tracking in run_history_release.
[2021-05-25T09:24:23.480920] job release stage : execute_job_release starting...
[2021-05-25T09:24:23.483646] job release stage : copy_batchai_cached_logs starting...
[2021-05-25T09:24:23.483912] job release stage : copy_batchai_cached_logs completed...
[2021-05-25T09:24:23.561877] Entering context manager injector.
[2021-05-25T09:24:23.610376] job release stage : send_run_telemetry starting...
[2021-05-25T09:24:23.626



PipelineRun Execution Summary
PipelineRun Status: Finished
{'runId': 'b4fa542f-32f9-402a-8234-76cd97ecf9c9', 'status': 'Completed', 'startTimeUtc': '2021-05-25T09:18:31.107195Z', 'endTimeUtc': '2021-05-25T09:32:16.152438Z', 'properties': {'azureml.runsource': 'azureml.PipelineRun', 'runSource': 'SDK', 'runType': 'SDK', 'azureml.parameters': '{}'}, 'inputDatasets': [], 'outputDatasets': [], 'logFiles': {'logs/azureml/executionlogs.txt': 'https://demoentws5367325393.blob.core.windows.net/azureml/ExperimentRun/dcid.b4fa542f-32f9-402a-8234-76cd97ecf9c9/logs/azureml/executionlogs.txt?sv=2019-02-02&sr=b&sig=SCqdEcrpjjjHWw838PLhPRXoCatQ%2FO0mP%2Fs%2BN55X2ho%3D&st=2021-05-25T09%3A11%3A58Z&se=2021-05-25T17%3A21%3A58Z&sp=r', 'logs/azureml/stderrlogs.txt': 'https://demoentws5367325393.blob.core.windows.net/azureml/ExperimentRun/dcid.b4fa542f-32f9-402a-8234-76cd97ecf9c9/logs/azureml/stderrlogs.txt?sv=2019-02-02&sr=b&sig=CeETTgxzj5XimtpxN3AzLOppUi2XRvi0Tm116Dl1eLA%3D&st=2021-05-25T09%3A11%3A58Z&s

'Finished'

Last but not least, we can now download the resulting dataset and have a look at our predictions. For ease of use, we'll just download it to a local folder named `temp`:

In [10]:
Dataset.get_by_name(ws, "batch-scoring-results").download(target_path="temp/", overwrite=True)

# Read the file content
with open('temp/batch-predictions.txt','r') as f:
    print(f.read())

0 0.06820236095865362 0.9317976390413464
1 0.6843944764926586 0.31560552350734145
2 0.14786576475019952 0.8521342352498005
3 0.6406113601900081 0.3593886398099919
4 0.48906412482859263 0.5109358751714074
5 0.2699403411724228 0.7300596588275772
6 0.07079286684323505 0.929207133156765
7 0.6106057088728849 0.38939429112711516
8 0.018350514598001078 0.9816494854019989
9 0.5025497165991059 0.49745028340089414
10 0.545672695374676 0.454327304625324
11 0.8555835633596343 0.14441643664036577
12 0.3589346747081955 0.6410653252918045
13 0.39338672905056804 0.606613270949432
14 0.6314221266671478 0.36857787333285225
15 0.4375982497304032 0.5624017502695968
16 0.032246317122612056 0.9677536828773879
17 0.43043651877220335 0.5695634812277967
18 0.5802322548178415 0.41976774518215854
19 0.08642474551715895 0.913575254482841
20 0.07493746803499945 0.9250625319650005
21 0.23053073655002376 0.7694692634499762
22 0.42507834477789685 0.5749216552221031
23 0.2294804827823228 0.7705195172176772
24 0.044238
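
To work with the predictions programmatically, the file can be parsed line by line. The sketch below assumes that each line holds a row index followed by two space-separated scores, as in the output above. Treating them as the scores for class 0 and class 1 is an assumption, since the scoring script is not shown here. The three sample lines are copied from the output above.

```python
import io

sample = io.StringIO(
    "0 0.06820236095865362 0.9317976390413464\n"
    "1 0.6843944764926586 0.31560552350734145\n"
    "2 0.14786576475019952 0.8521342352498005\n"
)

def parse_predictions(lines):
    """Yield (row_index, score_0, score_1, predicted_class) per line."""
    for line in lines:
        parts = line.split()
        if len(parts) != 3:
            continue  # skip blank or truncated lines
        index = int(parts[0])
        score_0, score_1 = float(parts[1]), float(parts[2])
        # Assumption: the larger score marks the predicted class.
        yield index, score_0, score_1, int(score_1 > score_0)

rows = list(parse_predictions(sample))
for row in rows:
    print(row)
```

To parse the downloaded file instead of the sample, pass an open file handle for `temp/batch-predictions.txt` to `parse_predictions`.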