This is a modified version of https://github.com/Azure/MachineLearningNotebooks/blob/master/how-to-use-azureml/machine-learning-pipelines/parallel-run/tabular-dataset-inference-iris.ipynb
Valerie Carey modified this script to demonstrate issues with Azure when Parquet files are used in a ParallelRunStep

Original Link Copyright (c) Microsoft Corporation. All rights reserved.
Licensed under the MIT License.



<h1> Batch Inference Parquet Issues - Summary </h1>


Here I show that an input dataset based on a Parquet file is NOT split up for batching, making a parallel run for that dataset type nonsensical.

This is a problem for large jobs where you want to batch. I tried repeatedly with different mini batch sizes and run invocation timeouts, and with Parquet I always got 1 mini batch (as shown on the experiment summary page), regardless of file size.  However, I saw multiple processes try to run, presumably because the first would not finish.  I was computing Shapley explanations, so most often the run would time out and my experiment would fail.  Shapley explanations are a prime candidate for parallelization because they are time consuming.

I found this issue on Stack Overflow, where the response was that you have to split your data into files: https://stackoverflow.com/questions/64869372/azure-ml-python-sdk-mini-batch-size-not-working-as-expected-on-parallelrunconfig
However, this is NOT true for CSV datasets, which are split up just fine.  Also, official Microsoft documentation implies that Parquet files are supported, for instance https://techcommunity.microsoft.com/t5/azure-ai/batch-inference-in-azure-machine-learning/ba-p/1417010
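
The workaround suggested in that Stack Overflow answer, splitting the data into several files, could be sketched roughly as follows. This is a minimal illustration and not part of the original example: `chunk_bounds` and `rows_per_file` are hypothetical names, and the commented-out usage assumes a pandas DataFrame such as the `iris_df` created later in this notebook.

```python
def chunk_bounds(n_rows, rows_per_file):
    """Return (start, stop) row ranges that cover n_rows in order."""
    if rows_per_file <= 0:
        raise ValueError("rows_per_file must be positive")
    return [(start, min(start + rows_per_file, n_rows))
            for start in range(0, n_rows, rows_per_file)]


# 150 rows (the size of the iris data) in files of at most 50 rows each
bounds = chunk_bounds(150, 50)
print(bounds)

# Hypothetical usage with a pandas DataFrame named iris_df (an assumption):
# for i, (start, stop) in enumerate(bounds):
#     iris_df.iloc[start:stop].to_parquet(f"iris_part_{i}.parquet")
```

Whether the resulting files are then distributed across mini batches as desired is not tested in this notebook.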

The documentation for ParallelRunStep and ParallelRunConfig does not make it clear that mini batching doesn't work for Parquet.  For example:
* https://docs.microsoft.com/en-us/python/api/azureml-contrib-pipeline-steps/azureml.contrib.pipeline.steps.parallel_run_config.parallelrunconfig?view=azure-ml-py
* https://docs.microsoft.com/en-us/python/api/azureml-contrib-pipeline-steps/azureml.contrib.pipeline.steps.parallelrunstep?view=azure-ml-py


<h4> Prerequisites </h4>

I have deleted the parts of the example code that connect to a workspace, get blob storage, create a compute instance, etc.  I assume you have these resources already.  You can modify the code below to connect to your own resources.

<h2> Connect to workspace </h2>
Modify the code below to connect to your own workspace

In [1]:
# Check core SDK version number
import azureml.core

print("SDK version:", azureml.core.VERSION)

SDK version: 1.20.0


In [2]:
WORKSPACE_NAME = 'YOUR-INFO-HERE'
WORKSPACE_SUBSCRIPTION_ID = "YOUR-INFO-HERE"
WORKSPACE_RESOURCE_GROUP = "YOUR-INFO-HERE"

from azureml.core.workspace import Workspace

ws = Workspace.get(name=WORKSPACE_NAME,
               subscription_id=WORKSPACE_SUBSCRIPTION_ID,
               resource_group=WORKSPACE_RESOURCE_GROUP)

<h2> Set your datastore and compute </h2>
Modify the code below to specify your own datastore and blob storage locations

In [3]:
from azureml.core import Datastore

# input datastore
iris_data = Datastore.get(ws, 'YOUR-INFO-HERE')

# path on the datastore where you will store the model object and data files
iris_data_path = 'YOUR-INFO-HERE/.../'

In [4]:
# Name of the compute target which should be provisioned already, and have at least 2 nodes
COMPUTE_TARGET_NAME = 'YOUR-INFO-HERE'

<h2> Get the model object and data </h2>
I get data from the sklearn iris dataset, and the pretrained model object from:
https://github.com/Azure-Samples/Machine-Learning-Operationalization/blob/master/samples/python/code/iris/model.pkl

In [5]:
# Get a temporary location for storage of downloaded items
import tempfile
iris_data_tmpdir = tempfile.mkdtemp()
print(iris_data_tmpdir)

/tmp/tmpdym8_3se


In [6]:
# Get the model object from the Github repo

import os        # needed for os.path.join and os.listdir below
import requests

model_url = 'https://github.com/Azure-Samples/Machine-Learning-Operationalization/raw/master/samples/python/code/iris/model.pkl'
r = requests.get(model_url, allow_redirects=True)
model_data_local = os.path.join(iris_data_tmpdir, 'model.pkl')

open(model_data_local, 'wb').write(r.content)

924

In [7]:
# Get the iris dataset, as CSV and Parquet

from sklearn import datasets

iris = datasets.load_iris()

import pandas as pd
iris_df = pd.DataFrame(data=iris['data'], columns = iris['feature_names'])
print(len(iris_df))

# Save as CSV
iris_data_local_csv =  os.path.join(iris_data_tmpdir, 'iris.csv')
iris_df.to_csv(iris_data_local_csv, sep = ',', index = False)

# Save as Parquet
iris_data_local_parquet =  os.path.join(iris_data_tmpdir, 'iris.parquet')
iris_df.to_parquet(iris_data_local_parquet)

150


In [8]:
# list temporary directory contents
os.listdir(iris_data_tmpdir)

['iris.csv', 'iris.parquet', 'model.pkl']

<h2> Create tabular datasets </h2>
Move the iris data to blob storage and create datasets for both CSV and Parquet formats.

In [9]:
# Move files to blob storage
iris_data.upload_files(files=[iris_data_local_csv, iris_data_local_parquet],
                              target_path=iris_data_path, overwrite=True)

Uploading an estimated of 2 files
Uploading /tmp/tmpdym8_3se/iris.csv
Uploaded /tmp/tmpdym8_3se/iris.csv, 1 files out of an estimated total of 2
Uploading /tmp/tmpdym8_3se/iris.parquet
Uploaded /tmp/tmpdym8_3se/iris.parquet, 2 files out of an estimated total of 2
Uploaded 2 files


$AZUREML_DATAREFERENCE_71b63019836e4276a9d9a6baabc98afa

In [11]:
# Create datasets
from azureml.core.dataset import Dataset

iris_ds_csv = Dataset.Tabular.from_delimited_files(path=[(iris_data,  
                                                           '/'.join([iris_data_path, 'iris.csv']))], 
                                                           validate=False)
iris_ds_parquet = Dataset.Tabular.from_parquet_files(path=[(iris_data,  
                                                           '/'.join([iris_data_path, 'iris.parquet']))], 
                                                           validate=False)

<h2> Set output folder </h2>
I use the same datastore for outputs as inputs.  You could put in your own information

In [12]:
from azureml.pipeline.core import PipelineData
output_folder = PipelineData(name='inferences', datastore=iris_data)

<h2> Register model with workspace </h2>

In [13]:
from azureml.core.model import Model

# register downloaded model
model = Model.register(model_path = model_data_local,
                       model_name = "iris-prs", # this is the name the model is registered as
                       tags = {'pretrained': "iris"},
                       workspace = ws)

Registering model iris-prs


<h2> Create experiment script </h2>
I use this notebook to output a .py file

In [14]:
# Get a temporary location for the generated scoring script
import tempfile
py_tmpdir = tempfile.mkdtemp()
print(py_tmpdir)

py_outfile = os.path.join(py_tmpdir, 'iris_score.py')

/tmp/tmpjs3ml8_g


In [15]:
%%writefile $py_outfile
import io
import pickle
import argparse
import numpy as np

from azureml.core.model import Model
from sklearn.linear_model import LogisticRegression
from azureml_user.parallel_run import EntryScript

def init():
    global iris_model

    logger = EntryScript().logger
    logger.info("init() is called.")

    parser = argparse.ArgumentParser(description="Iris model serving")
    parser.add_argument('--model_name', dest="model_name", required=True)
    args, unknown_args = parser.parse_known_args()

    model_path = Model.get_model_path(args.model_name)
    with open(model_path, 'rb') as model_file:
        iris_model = pickle.load(model_file)


def run(input_data):
    logger = EntryScript().logger
    logger.info("run() is called with: {}.".format(input_data))

    # make inference
    num_rows, num_cols = input_data.shape
    logger.info("num rows: {}.".format(num_rows))
    pred = iris_model.predict(input_data).reshape((num_rows, 1))

    # cleanup output
    result = input_data.drop(input_data.columns[4:], axis=1)
    result['variety'] = pred

    return result


Writing /tmp/tmpjs3ml8_g/iris_score.py


In [16]:
os.listdir(py_tmpdir)

['iris_score.py']

<h2> Set up for batch run </h2>

In [17]:
# Set the environment
from azureml.core import Environment
from azureml.core.runconfig import CondaDependencies

predict_conda_deps = CondaDependencies.create(pip_packages=["scikit-learn==0.20.3",
                                                            "azureml-core", "azureml-dataset-runtime[pandas,fuse]"])

predict_env = Environment(name="predict_environment")
predict_env.python.conda_dependencies = predict_conda_deps
predict_env.docker.enabled = True
predict_env.spark.precache_packages = False

In [18]:
# Configure the parallel run

from azureml.pipeline.steps import ParallelRunStep, ParallelRunConfig

# In a real-world scenario, you'll want to shape your process per node and nodes to fit your problem domain.
parallel_run_config = ParallelRunConfig(
    source_directory=py_tmpdir,
    entry_script='iris_score.py',  # the user script to run against each input
    mini_batch_size='1KB',
    error_threshold=5,
    output_action='append_row',
    append_row_file_name="iris_outputs.txt",
    environment=predict_env,
    compute_target=COMPUTE_TARGET_NAME, 
    node_count=2,
    run_invocation_timeout=600
)

<h2> Run the pipeline for the CSV input </h2>
Note that this works fine, and uses 2-3 mini batches/processes

In [19]:
# Create the pipeline step
distributed_csv_iris_step = ParallelRunStep(
    name='example-iris-csv',
    inputs=[iris_ds_csv.as_named_input('iris_data')],
    output=output_folder,
    parallel_run_config=parallel_run_config,
    arguments=['--model_name', 'iris-prs'],
    allow_reuse=False
)

In [20]:
# Run the pipeline
from azureml.core import Experiment
from azureml.pipeline.core import Pipeline

pipeline = Pipeline(workspace=ws, steps=[distributed_csv_iris_step])

pipeline_run = Experiment(ws, 'iris-prs').submit(pipeline)

Created step example-iris-csv [3c691518][34ab4066-185e-40d9-b7cc-6470f9ce65fc], (This step will run and generate new outputs)
Submitted PipelineRun f3f94f2d-1084-42e6-b84b-ad04608f539b
Link to Azure Machine Learning Portal: https://ml.azure.com/experiments/iris-prs/runs/f3f94f2d-1084-42e6-b84b-ad04608f539b?wsid=/subscriptions/47a09743-a743-494e-945d-4022653e134e/resourcegroups/rg-flex-flightrisk-data-001/workspaces/mlw-flightrisk-dev


In [21]:
# Wait for the run to complete
pipeline_run.wait_for_completion(show_output=False)

PipelineRunId: f3f94f2d-1084-42e6-b84b-ad04608f539b
Link to Azure Machine Learning Portal: https://ml.azure.com/experiments/iris-prs/runs/f3f94f2d-1084-42e6-b84b-ad04608f539b?wsid=/subscriptions/47a09743-a743-494e-945d-4022653e134e/resourcegroups/rg-flex-flightrisk-data-001/workspaces/mlw-flightrisk-dev
{'runId': 'f3f94f2d-1084-42e6-b84b-ad04608f539b', 'status': 'Completed', 'startTimeUtc': '2021-02-10T16:07:51.00528Z', 'endTimeUtc': '2021-02-10T16:10:54.227702Z', 'properties': {'azureml.runsource': 'azureml.PipelineRun', 'runSource': 'SDK', 'runType': 'SDK', 'azureml.parameters': '{}'}, 'inputDatasets': [], 'outputDatasets': [], 'logFiles': {'logs/azureml/executionlogs.txt': 'https://mlwflightriskd8499840071.blob.core.windows.net/azureml/ExperimentRun/dcid.f3f94f2d-1084-42e6-b84b-ad04608f539b/logs/azureml/executionlogs.txt?sv=2019-02-02&sr=b&sig=29n5q%2BaHRjjuSm%2BtfyJVYcxgQBfQ11axoh2hgHpL1fw%3D&st=2021-02-10T15%3A59%3A42Z&se=2021-02-11T00%3A09%3A42Z&sp=r', 'logs/azureml/stderrlogs.tx

'Finished'

At this point you can view the experiment, and it should be successful.  Note that more than 1 mini batch is used!  I get 3 mini batches.
The size of the CSV data on disk (for me) is 2.412 KiB.

<h2> Now, try the Parquet pipeline </h2>
Note that this works but only uses 1 mini batch!  It doesn't matter what the batch size is, the Parquet file is always just 1 batch.

In [22]:
# Create the pipeline step
distributed_parquet_iris_step = ParallelRunStep(
    name='example-iris-parquet',
    inputs=[iris_ds_parquet.as_named_input('iris_data')],
    output=output_folder,
    parallel_run_config=parallel_run_config,
    arguments=['--model_name', 'iris-prs'],
    allow_reuse=False
)

In [23]:
# Run the pipeline
from azureml.core import Experiment
from azureml.pipeline.core import Pipeline

pipeline = Pipeline(workspace=ws, steps=[distributed_parquet_iris_step])

pipeline_run = Experiment(ws, 'iris-prs').submit(pipeline)

Created step example-iris-parquet [79769832][6f63348f-456b-4698-81fc-cf4a8775b953], (This step will run and generate new outputs)
Submitted PipelineRun 2d13965f-4ee3-47d4-b64f-4a497e9c23cf
Link to Azure Machine Learning Portal: https://ml.azure.com/experiments/iris-prs/runs/2d13965f-4ee3-47d4-b64f-4a497e9c23cf?wsid=/subscriptions/47a09743-a743-494e-945d-4022653e134e/resourcegroups/rg-flex-flightrisk-data-001/workspaces/mlw-flightrisk-dev


In [24]:
# Wait for the run to complete
pipeline_run.wait_for_completion(show_output=False)

PipelineRunId: 2d13965f-4ee3-47d4-b64f-4a497e9c23cf
Link to Azure Machine Learning Portal: https://ml.azure.com/experiments/iris-prs/runs/2d13965f-4ee3-47d4-b64f-4a497e9c23cf?wsid=/subscriptions/47a09743-a743-494e-945d-4022653e134e/resourcegroups/rg-flex-flightrisk-data-001/workspaces/mlw-flightrisk-dev
{'runId': '2d13965f-4ee3-47d4-b64f-4a497e9c23cf', 'status': 'Completed', 'startTimeUtc': '2021-02-10T16:11:11.827901Z', 'endTimeUtc': '2021-02-10T16:13:45.428686Z', 'properties': {'azureml.runsource': 'azureml.PipelineRun', 'runSource': 'SDK', 'runType': 'SDK', 'azureml.parameters': '{}'}, 'inputDatasets': [], 'outputDatasets': [], 'logFiles': {'logs/azureml/executionlogs.txt': 'https://mlwflightriskd8499840071.blob.core.windows.net/azureml/ExperimentRun/dcid.2d13965f-4ee3-47d4-b64f-4a497e9c23cf/logs/azureml/executionlogs.txt?sv=2019-02-02&sr=b&sig=9Szyl9eu8UwOG%2BWwTwdPhwo8yyF20kBt2ieRMDycCmc%3D&st=2021-02-10T16%3A01%3A41Z&se=2021-02-11T00%3A11%3A41Z&sp=r', 'logs/azureml/stderrlogs.txt

'Finished'

Note that for this run, you only get 1 mini batch.  The size of the Parquet data on disk is 4.874 KiB (larger than the CSV, and yet only 1 mini batch is used for Parquet as opposed to 3 with CSV!)
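
To put the two results side by side, the arithmetic below estimates how many mini batches one might expect if the data were split purely by size. This is only a rough sketch: it assumes that `mini_batch_size='1KB'` means 1 KiB and that splitting is based on bytes on disk, neither of which is confirmed here, and `expected_mini_batches` is a hypothetical helper, not part of the Azure ML SDK.

```python
import math


def expected_mini_batches(size_kib, mini_batch_kib=1.0):
    """Naive estimate: data size divided by mini batch size, rounded up."""
    return math.ceil(size_kib / mini_batch_kib)


csv_estimate = expected_mini_batches(2.412)      # CSV size on disk, in KiB
parquet_estimate = expected_mini_batches(4.874)  # Parquet size on disk, in KiB
print(csv_estimate, parquet_estimate)
```

Under these assumptions the CSV estimate (3) matches the 3 mini batches observed, while the Parquet estimate (5) is far from the single mini batch actually used.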