In [1]:
from azure.ai.ml import MLClient
from azure.ai.ml import command, Input, Output, load_component
from azure.identity import DefaultAzureCredential
from azure.ai.ml.parallel import parallel_run_function, RunFunction
from azure.ai.ml.constants import AssetTypes, InputOutputModes

In [2]:
# Workspace coordinates: subscription id, resource group and workspace name.
sub_id = "b746917e-ceb7-4ae0-81e6-3ccd893cb0de"
rg = "dpv2"
workspace = "dpv2-wks"

# Client handle used by the later cells; authentication is delegated to
# DefaultAzureCredential.
ml_client = MLClient(
    credential=DefaultAzureCredential(),
    subscription_id=sub_id,
    resource_group_name=rg,
    workspace_name=workspace,
)

In [3]:
# Print the client's text form to confirm which subscription, resource group
# and workspace it targets.
print(ml_client)

MLClient(credential=<azure.identity._credentials.default.DefaultAzureCredential object at 0x000001E53A375120>,
         subscription_id=b746917e-ceb7-4ae0-81e6-3ccd893cb0de,
         resource_group_name=dpv2,
         workspace_name=dpv2-wks)


In [4]:
# Load the data-preparation component from its YAML spec; the resulting object
# is called like a function inside the pipeline definition.
prep_data = load_component(
    path="./src/prepare_data.yml",
)

In [5]:
# Print the loaded component as text to inspect its inputs, outputs, command
# and environment.
print(prep_data)

$schema: https://azuremlschemas.azureedge.net/latest/commandComponent.schema.json
name: prepare_file_tabular_data
version: '1'
display_name: Prepare File Tabular Data
type: command
inputs:
  input_data:
    type: uri_folder
outputs:
  file_output_data:
    type: mltable
  tabular_output_data:
    type: mltable
command: python prepare_data.py --input_data ${{inputs.input_data}} --file_output_data
  ${{outputs.file_output_data}} --tabular_output_data ${{outputs.tabular_output_data}}
environment: azureml:AzureML-sklearn-0.24-ubuntu18.04-py37-cpu:5
code: ./
is_deterministic: true
tags: {}



In [6]:
from azure.ai.ml.entities import Environment

# Parallel component that scores the "file" MLTable output of the prepare step.
# TODO(metang): clarify how this MLTable-typed file step differs from the
# tabular batch step defined in the next cell.
file_batch_inf = parallel_run_function(
    name="file_batch_score",
    display_name="display: batch score with file dataset",
    description="description: batch score with file dataset",
    # inputs/outputs are name -> Input/Output mappings. A dict literal and the
    # dict(key=value) form build the same mapping, so either spelling works.
    inputs={
        "job_data_path": Input(
            type=AssetTypes.MLTABLE,
            description="the data to be split and scored in parallel",
        ),
    },
    outputs={
        "job_output_path": Output(type=AssetTypes.MLTABLE),
    },
    # Refers to the declared input above by name.
    input_data="${{inputs.job_data_path}}",
    instance_count=2,
    max_concurrency_per_instance=1,
    mini_batch_size="1",
    mini_batch_error_threshold=1,
    retry_settings={"max_retries": 2, "timeout": 60},
    logging_level="DEBUG",
    # The task is described by a code folder plus an entry script rather than a
    # free-form command line.
    # TODO(metang): confirm why entry_script is used instead of command="python ...".
    task=RunFunction(
        code="./src",
        entry_script="file_batch_inference.py",
        program_arguments="--job_output_path ${{outputs.job_output_path}}",
        environment="azureml:AzureML-sklearn-0.24-ubuntu18.04-py37-cpu:1",
    ),
)

In [7]:


# Parallel component that scores the "tabular" MLTable output of the prepare
# step with a model supplied as a second input.
tabular_batch_inf = parallel_run_function(
    name = "batch_score_with_tabular_input",
    display_name="display: Batch Score with Tabular Dataset",
    description="description: parallel component for batch score",
    inputs=dict(
        job_data_path=Input(
            type=AssetTypes.MLTABLE,
            description="description: the data to be split and scored",
        ),
        # The model is a plain directory: the pipeline is fed
        # Input(path="./model/", type=AssetTypes.URI_FOLDER). Declare the
        # component input as URI_FOLDER so the declared and bound types agree.
        score_model = Input(
            type=AssetTypes.URI_FOLDER,
            description="description: input model"
        ),
    ),
    outputs=dict(job_output_path=Output(type=AssetTypes.MLTABLE)),
    # input_data must reference one of the names declared in `inputs`; pointing
    # it at an undeclared name (e.g. input_data_path) failed with
    # "(UserError) Value cannot be null."
    input_data="${{inputs.job_data_path}}",
    instance_count=2,
    max_concurrency_per_instance=2,
    mini_batch_size="100",
    mini_batch_error_threshold=5,
    logging_level="DEBUG",
    retry_settings=dict(max_retries=2, timeout=60),
    task=RunFunction(
        code = "./src",
        entry_script="tabular_batch_inference.py",
        environment=Environment(
            image="mcr.microsoft.com/azureml/openmpi3.1.2-ubuntu18.04",
            conda_file="./src/environment_parallel.yml",
        ),
        append_row_to="${{outputs.job_output_path}}",
        # Adjacent string literals are concatenated into one argument string;
        # every fragment therefore ends with a space.
        program_arguments="--model ${{inputs.score_model}} "
        "--job_output_path ${{outputs.job_output_path}} "
        # NOTE(review): --error_threshold repeats the value given to
        # mini_batch_error_threshold above; presumably the two must be kept in
        # sync -- confirm which one takes precedence.
        "--error_threshold 5 "
        "--allowed_failed_percent 30 "
        "--task_overhead_timeout 1200 "
        "--progress_update_timeout 600 "
        "--first_task_creation_timeout 600 "
        "--copy_logs_to_parent True "
        # Fixed flag spelling: was "--resource_monitor_interva".
        "--resource_monitor_interval 20 "
    ),
)

In [8]:
from azure.ai.ml.dsl import pipeline

# Pipeline: prepare the data once, then score it with the file-based and the
# tabular parallel components. Returns both scoring outputs as pipeline outputs.
# NOTE(review): documented with comments only -- the dsl decorator may read the
# function's docstring, annotations and local variable names when building the
# job, so those are deliberately left untouched. TODO confirm.
@pipeline()
def parallel_in_pipeline(job_data_path, score_model_path):

    # step 1: prep_data
    prep_data_step = prep_data(input_data=job_data_path)
    # The component YAML already declares both outputs as mltable; the types are
    # set again on the node here.
    # TODO(metang): confirm whether this re-declaration is actually required.
    prep_data_step.outputs.file_output_data.type = AssetTypes.MLTABLE
    prep_data_step.outputs.tabular_output_data.type = AssetTypes.MLTABLE

    # step 2: batch process data (prs)
    batch_inf_file_step = file_batch_inf(
        job_data_path = prep_data_step.outputs.file_output_data
    )
    # The input's delivery mode is set on the step instance here, not in the
    # parallel_run_function call that declared the input.
    batch_inf_file_step.inputs.job_data_path.mode = (
        InputOutputModes.EVAL_MOUNT
    )
    batch_inf_file_step.outputs.job_output_path.type = AssetTypes.MLTABLE

    # step 3: batch inferencing (prs)
    batch_inf_tabular_step = tabular_batch_inf(
        job_data_path = prep_data_step.outputs.tabular_output_data,
        score_model=score_model_path,        
    )
    # NOTE(review): DIRECT presumably passes the data reference through to the
    # step instead of mounting/downloading files -- TODO confirm.
    batch_inf_tabular_step.inputs.job_data_path.mode = (
        InputOutputModes.DIRECT
    )
    
    # Expose the output of each scoring step under a pipeline-level name.
    return {
        "job_output_file": batch_inf_file_step.outputs.job_output_path,
        "job_output_tabular": batch_inf_tabular_step.outputs.job_output_path,
    }

In [9]:
# Data to score: the local ./dataset/ folder, typed as MLTable and bound with
# the RO_MOUNT mode.
job_data_path = Input(
    type=AssetTypes.MLTABLE,
    path="./dataset/",
    mode=InputOutputModes.RO_MOUNT,
)

# Scoring model: the local ./model/ folder, typed as a URI folder and bound
# with the DOWNLOAD mode.
score_model_path = Input(
    type=AssetTypes.URI_FOLDER,
    path="./model/",
    mode=InputOutputModes.DOWNLOAD,
)

In [10]:


# Instantiate the pipeline job by binding the data and model inputs.
job = parallel_in_pipeline(
    score_model_path=score_model_path,
    job_data_path=job_data_path,
)


In [11]:
# Compute target used by every step that does not name its own.
job.settings.default_compute = "cpu-cluster"

# Only the tabular output is re-typed at pipeline level, to a single file.
# NOTE(review): presumably because the tabular step uses append_row_to, which
# aggregates results into one file -- confirm. job_output_file is left as is.
job.outputs.job_output_tabular.type = AssetTypes.URI_FILE

In [12]:
# Submit the pipeline job to the workspace under the "prs_demo" experiment and
# keep the returned job object for inspection.
ret_job = ml_client.jobs.create_or_update(
    job,
    experiment_name="prs_demo",
)



In [13]:
# Display the Studio URL of the submitted job so it can be opened in a browser.
ret_job.services["Studio"].endpoint

'https://ml.azure.com/runs/sleepy_oxygen_dq2jbnb1gz?wsid=/subscriptions/b746917e-ceb7-4ae0-81e6-3ccd893cb0de/resourcegroups/dpv2/workspaces/dpv2-wks&tid=9c37dc6f-e051-496f-832c-e4fcf894c611'

credential ok
