In [1]:
# import required libraries
from azure.identity import DefaultAzureCredential, InteractiveBrowserCredential
from azure.ai.ml import MLClient, Input, Output, load_component
from azure.ai.ml.dsl import pipeline
from azure.ai.ml.entities import Environment, ResourceConfiguration
from azure.ai.ml.constants import AssetTypes, InputOutputModes
from azure.ai.ml.parallel import parallel_run_function, RunFunction

In [2]:
# Choose a credential: try DefaultAzureCredential first and fall back to an
# interactive browser login if it cannot obtain a token in this environment.
try:
    credential = DefaultAzureCredential()
    # Request a management-plane token now so an unusable credential is detected here.
    credential.get_token("https://management.azure.com/.default")
except Exception as ex:
    # Deliberate best-effort fallback, but say why instead of swallowing the error silently.
    print(f"DefaultAzureCredential unavailable ({type(ex).__name__}); falling back to InteractiveBrowserCredential.")
    credential = InteractiveBrowserCredential()

In [3]:
# Name of the already-attached Azure Machine Learning compute cluster used by this notebook.
cpu_compute_target = "akacomputenode2"

# Connect to the workspace described by the local config.json file.
ml_client = MLClient.from_config(credential=credential)

# Look the cluster up in the workspace and print its configuration.
print(ml_client.compute.get(cpu_compute_target))

Found the config file in: /config.json


enable_node_public_ip: true
id: /subscriptions/1a1e86ad-fe04-43a8-9dc4-0c907e547b71/resourceGroups/rg-dp100-labs/providers/Microsoft.MachineLearningServices/workspaces/mlw-dp100-labs/computes/akacomputenode2
idle_time_before_scale_down: 120
location: eastus2
max_instances: 2
min_instances: 0
name: akacomputenode2
provisioning_state: Succeeded
size: Standard_F4s_v2
ssh_public_access_enabled: false
tier: dedicated
type: amlcompute



In [4]:
# Declare parallel job with run_function task.
# The resulting component is invoked inside the pipeline cell below, where some of
# these run-level settings are overridden per invocation.
batch_inferencing_with_mini_batch_size = parallel_run_function(
    name="batch_inferencing_with_mini_batch_size",
    display_name="Batch Inferencing with mini_batch_size",
    description="parallel job to do batch inferencing with mini_batch_size on mltable tabular input",
    tags={
        "azureml_parallel_example": "2a_sdk",
    },
    inputs=dict(
        input_data=Input(
            type=AssetTypes.MLTABLE,
            description="Input tabular mltable data.",
            mode=InputOutputModes.DIRECT,  # [Important] To use mltable tabular data, it is required to use 'direct' mode.
        ),
        score_model=Input(
            type=AssetTypes.URI_FOLDER,
            description="Folder contains the model file.",
            mode=InputOutputModes.DOWNLOAD,
        ),
    ),
    outputs=dict(
        job_output_file=Output(
            type=AssetTypes.URI_FILE,
            mode=InputOutputModes.RW_MOUNT,
        ),
    ),
    input_data="${{inputs.input_data}}",  # The input that will be split into mini-batches.
    mini_batch_size="10kb",  # Use 'mini_batch_size' as the data division method. For tabular input, data is split by physical size.
    instance_count=2,  # Use 2 nodes from compute cluster to run this parallel job.
    max_concurrency_per_instance=1,  # Run 1 worker process on each compute node to execute mini-batches (overridable per pipeline invocation).
    error_threshold=5,  # Item-level failure limit, measured as the gap between each mini-batch's input count and the items it returns. 'Batch inferencing' scenario should return a list, dataframe, or tuple with the successful items to try to meet this threshold.
    mini_batch_error_threshold=5,  # Limit on mini-batches that fail by exception, time out, or null return. When the failed mini-batch count is higher than this setting, the parallel job will be marked as 'failed'.
    retry_settings=dict(
        max_retries=2,  # Define how many retries when mini-batch execution is failed by exception, time out, or null return.
        timeout=60,  # Define the timeout in second for each mini-batch execution.
    ),
    logging_level="DEBUG",
    environment_variables={
        "AZUREML_PARALLEL_EXAMPLE": "2a_sdk",
    },
    task=RunFunction(
        code="./script",
        entry_script="iris_prediction.py",
        environment=Environment(
            image="mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04",
            conda_file="./environment/environment_parallel.yml",
        ),
        program_arguments="--model ${{inputs.score_model}} "  # Passthrough input model folder path into script.
        "--allowed_failed_percent 30 "  # Advanced parallel setting in arguments. Visit https://learn.microsoft.com/en-us/azure/machine-learning/how-to-use-parallel-job-in-pipeline?tabs=cliv2#consider-automation-settings for more details.
        "--task_overhead_timeout 1200 "
        "--progress_update_timeout 600 "
        "--resource_monitor_interval 20 ",
        append_row_to="${{outputs.job_output_file}}",  # Where to append the aggregated returns from all mini-batches.
    ),
)

In [5]:
# Job inputs, both pointing at local folders relative to this notebook.
# The MLTable input uses 'direct' mode (required for mltable tabular data);
# the model folder is used in 'download' mode.
input_iris_data = Input(
    type=AssetTypes.MLTABLE,
    path="./neural-iris-mltable",
    mode=InputOutputModes.DIRECT,
)
input_model_folder = Input(
    type=AssetTypes.URI_FOLDER,
    path="./iris-model",
    mode=InputOutputModes.DOWNLOAD,
)

# Declare pipeline structure.
# The @pipeline decorator turns this function into a pipeline builder: calling it
# returns a pipeline job whose steps are the components invoked in the body.
# It reads the module-level `batch_inferencing_with_mini_batch_size`,
# `input_iris_data` and `input_model_folder` defined in earlier cells.
# NOTE(review): no docstring is added on purpose — the DSL may use it as the
# pipeline description; confirm before adding one.
@pipeline(
    display_name="parallel job for iris batch inferencing",
)
def parallel_job_in_pipeline():
    # Declare parallel inferencing job.
    # NOTE(review): despite its name, `parallel_train` is a batch-inference step, not training.
    # It is not renamed here because the DSL may derive node names from local variable names — confirm first.
    parallel_train = batch_inferencing_with_mini_batch_size(
        input_data=input_iris_data,
        score_model=input_model_folder,
    )

    # Override run-level properties of the parallel component for this invocation.
    # These replace the values passed to parallel_run_function (the printed pipeline
    # shows max_concurrency_per_instance=2 and mini_batch_error_threshold=10).
    parallel_train.resources.instance_count = 2
    parallel_train.max_concurrency_per_instance = 2
    parallel_train.mini_batch_error_threshold = 10
    # `${{default_datastore}}` and `${{name}}` stay as placeholders in the definition;
    # presumably Azure ML resolves them at run time (default datastore / job name) — confirm.
    parallel_train.outputs.job_output_file.path = "azureml://datastores/${{default_datastore}}/paths/${{name}}/aggregated_returns.csv"


# Create pipeline instance.
my_job = parallel_job_in_pipeline()

# Set pipeline-level compute. Reuse the cluster name looked up earlier rather than
# repeating the literal, so the two cannot drift apart.
# (The former bare `my_job.tags.update` expression was a no-op — the method was never called.)
my_job.settings.default_compute = cpu_compute_target

In [6]:
# Show the assembled pipeline definition (printed as YAML) for review before submitting it.
print(my_job)


Class AutoDeleteSettingSchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class AutoDeleteConditionSchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class BaseAutoDeleteSettingSchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class IntellectualPropertySchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class ProtectionLevelSchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class BaseIntellectualPropertySchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.


display_name: parallel job for iris batch inferencing
type: pipeline
jobs:
  batch_inferencing_with_mini_batch_size:
    type: parallel
    inputs:
      input_data:
        mode: direct
        type: mltable
        path: azureml:./neural-iris-mltable
      score_model:
        mode: download
        type: uri_folder
        path: azureml:./iris-model
    outputs:
      job_output_file:
        mode: rw_mount
        type: uri_file
        path: azureml://datastores/${{default_datastore}}/paths/${{name}}/aggregated_returns.csv
    resources:
      instance_count: 2
    error_threshold: 5
    input_data: ${{inputs.input_data}}
    logging_level: DEBUG
    max_concurrency_per_instance: 2
    mini_batch_error_threshold: 10
    mini_batch_size: '10240'
    retry_settings:
      timeout: 60
      max_retries: 2
    task:
      type: run_function
      code: /mnt/batch/tasks/shared/LS_root/mounts/clusters/shanazure311/code/batch-inference/script
      entry_script: iris_prediction.py
      

In [7]:
# Submit the pipeline to the workspace under the "hello-world-parallel-job" experiment.
pipeline_job = ml_client.jobs.create_or_update(my_job, experiment_name="hello-world-parallel-job")

# Display the submitted job (renders as a summary table with a studio link).
pipeline_job

pathOnCompute is not a known attribute of class <class 'azure.ai.ml._restclient.v2023_04_01_preview.models._models_py3.UriFileJobOutput'> and will be ignored


Experiment,Name,Type,Status,Details Page
hello-world-parallel-job,careful_star_17zy9sfpgt,pipeline,NotStarted,Link to Azure Machine Learning studio
