In [1]:
import json
from datetime import datetime
import azureml.core
from azureml.core import Workspace, Datastore, Dataset, Environment, Experiment
from azureml.data import FileDataset
from azureml.data.dataset_consumption_config import DatasetConsumptionConfig
from azureml.pipeline.core import Pipeline, PipelineDraft, PipelineData, PipelineParameter
from azureml.core.compute import ComputeTarget, AmlCompute, DataFactoryCompute
from azureml.pipeline.steps import DataTransferStep
from azureml.data.datapath import DataPath, DataPathComputeBinding
from azureml.data.data_reference import DataReference

# Report the installed SDK version and split it into labelled parts so that
# later cells can branch on it.
sdk_version = azureml.core.VERSION
print(sdk_version)
version = {
    label: part
    for label, part in zip(('major', 'minor', 'patch'), sdk_version.split('.'))
}

# Workspace handle loaded from the local workspace configuration.
ws = Workspace.from_config()

1.10.0


In [2]:
# Choose where ParallelRunConfig / ParallelRunStep are imported from, based on
# the SDK version parsed in the first cell.
#
# The (major, minor) pair is compared as a tuple so that every release from
# 1.10 onward (1.11, 1.12, 2.x, ...) uses the core package; only older SDKs
# fall back to the contrib package. Comparing the minor number for equality
# would send every release after 1.10 back to contrib.
sdk_release = (int(version['major']), int(version['minor']))
if sdk_release >= (1, 10):
    from azureml.pipeline.steps import ParallelRunConfig, ParallelRunStep
else:
    from azureml.contrib.pipeline.steps import ParallelRunConfig, ParallelRunStep

In [3]:
# Resolve the AmlCompute cluster used by the parallel run step.
compute_name = "aml-compute1"
vm_size = "STANDARD_DS1_v2"  # not referenced by the cells shown in this notebook

# Fail fast: later cells pass `compute_target` to ParallelRunConfig, so a
# missing or wrong-typed target must stop the notebook here instead of
# surfacing as a NameError several cells further down.
compute_target = ws.compute_targets.get(compute_name)
if not compute_target:
    raise LookupError(
        'compute target not found, refer to 02_create_compute_cluster to create compute target...')
if not isinstance(compute_target, AmlCompute):
    raise TypeError(
        f"compute target '{compute_name}' is a {type(compute_target).__name__}, expected AmlCompute")
print('Found compute target: ' + compute_name)

Found compute target: aml-compute1


In [4]:
# Resolve the attached Azure Data Factory compute used by the data transfer step.
adf_name = 'datafactory1'

# Fail fast: the DataTransferStep cell needs `adf_target`, so a missing or
# wrong-typed target must stop the notebook here rather than leave the name
# undefined for later cells.
adf_target = ws.compute_targets.get(adf_name)
if not adf_target:
    raise LookupError(
        'compute target not found, refer to 02a_create_compute_ADF to attach ADF target...')
if not isinstance(adf_target, DataFactoryCompute):
    raise TypeError(
        f"compute target '{adf_name}' is a {type(adf_target).__name__}, expected DataFactoryCompute")
print('Found adf target: ' + adf_name)

Found adf target: datafactory1


In [5]:
# Resolve the registered datastore that holds the input images and will
# receive the final output.
datastore_name = 'godzilla'

# An exact `type(...) is Datastore` test is not used here: the recorded run of
# this cell never printed the "Found" message, presumably because the
# workspace returns a concrete datastore subclass -- TODO confirm.
# A missing datastore raises immediately because the lines below dereference it.
datastore = ws.datastores.get(datastore_name)
if datastore is None:
    raise LookupError('datastore not found...')
print('Found datastore: ' + datastore_name)

images_dataset_name = 'images_partition'  # not referenced by the cells shown in this notebook
# Input: every file under 'images' on the datastore (validation is skipped).
path_on_datastore = datastore.path('images')
input_images_dataset = Dataset.File.from_files(path=path_on_datastore, validate=False)
# Output: the 'output' folder on the same datastore.
output_data_path = DataPath(datastore=datastore, path_on_datastore='output')

In [8]:
# Get the side-input dataset (image metadata) and wrap it as a mounted named
# input for the parallel run step.

metadata_dataset_name = 'metadata_ds'

metadata_ds = Dataset.get_by_name(workspace=ws, name=metadata_dataset_name)
# Stop here if the registered dataset is not a FileDataset: the lines below
# rely on `to_path()` and on mounting the dataset.
if not isinstance(metadata_ds, FileDataset):
    raise TypeError(
        f"dataset '{metadata_dataset_name}' is a {type(metadata_ds).__name__}, expected FileDataset; "
        'refer to 01a_register_metadata_dataset.ipynb to create and register metadata dataset')
print('Found metadata: ' + ', '.join(metadata_ds.to_path()))

metadata_config = metadata_ds.as_named_input('metadata_input').as_mount()

Found metadata: /20200810_images.csv


In [9]:
# Sanity check: print the consumption-config object built for the metadata side input.
print(metadata_config)

<azureml.data.dataset_consumption_config.DatasetConsumptionConfig object at 0x000001EE9CBA0DF0>


In [10]:
# Intermediate output named "scores", placed on the workspace's default
# datastore (the named `datastore` from the earlier cell is the alternative).
scores_datastore = ws.get_default_datastore()
step_output_dir = PipelineData(
    name="scores",
    datastore=scores_datastore,
    output_path_on_compute="batchscoring/results",
)

In [11]:
# Display the PipelineData object created in the previous cell.
step_output_dir

$AZUREML_DATAREFERENCE_scores

# pipeline parameters

In [13]:
# Key Vault check kept for reference (disabled):
# kv = ws.get_default_keyvault()
# print(len(kv.get_secret(pipeline_kv_readapi)))


def _pipeline_param(param_name, default):
    """Build a PipelineParameter with the given name and default value."""
    return PipelineParameter(name=param_name, default_value=default)


# String-valued parameters forwarded to the entry script as arguments.
pipeline_inpart = _pipeline_param("pipeline_inpart", '2020/08/10')
pipeline_metadata = _pipeline_param("pipeline_metadata", '20200810_images.csv')
pipeline_kv_customimg = _pipeline_param("pipeline_kv_customimg", 'api-custom-vision')
pipeline_kv_readapi = _pipeline_param("pipeline_kv_readapi", 'api-readapi')

# Parameters whose defaults are the input dataset and the output data path.
pipeline_dataset_param = _pipeline_param('pipeline_dataset_id', input_images_dataset)
pipeline_dataset_output = _pipeline_param('pipeline_output_dataset_id', output_data_path)


# reference (pipeline_dataset_output, DataPathComputeBinding())

In [14]:
# Show how a string parameter and the dataset parameter render.
for shown_param in (pipeline_inpart, pipeline_dataset_param):
    print(shown_param)

PipelineParameter_Name:pipeline_inpart_Default:2020/08/10
PipelineParameter_Name:pipeline_dataset_id_Default:FileDataset
{
  "source": [
    "('godzilla', 'images')"
  ],
  "definition": [
    "GetDatastoreFiles"
  ]
}


# Python environment configuration

In [15]:
# Resolve the registered environment (conda dependencies) used by the
# parallel run step.
env_name = 'MAG-ParallelRunEnv'

# Read the workspace's environments once and reuse the result for both the
# membership print and the lookup.
registered_envs = ws.environments
print(env_name in registered_envs)

# Fail fast: ParallelRunConfig needs `env`, so a missing or wrong-typed
# environment must stop the notebook here instead of leaving the name
# undefined for later cells.
env = registered_envs.get(env_name)
if not env:
    raise LookupError(
        'environment not found, refer to 01_config_notebook to register environment...')
if not isinstance(env, Environment):
    raise TypeError(
        f"environment '{env_name}' is a {type(env).__name__}, expected Environment")
print('Found environment: ' + env_name)


True
Found environment: MAG-ParallelRunEnv


# parallel run config and step

In [17]:
# Settings for the parallel (mini-batch) scoring run, grouped by concern and
# then expanded into ParallelRunConfig.
script_settings = {
    'source_directory': 'scripts',
    'entry_script': 'minibatch_process.py',
    'environment': env,
}
batching_settings = {
    'mini_batch_size': '2',
    'error_threshold': 1,
    'output_action': 'append_row',
}
cluster_settings = {
    'compute_target': compute_target,
    'node_count': 1,
    'process_count_per_node': 2,
}

parallelrunconfig = ParallelRunConfig(
    description='description of batch step config',
    logging_level='INFO',
    **script_settings,
    **batching_settings,
    **cluster_settings,
)


In [18]:


# Main input: the dataset pipeline parameter, mounted on the compute.
# (Earlier alternative, kept for reference:
#  Dataset.File.from_files((godzilla_datastore, 'images')).as_named_input('anpr_images').as_mount())
images_input = DatasetConsumptionConfig('dataset_param_config', pipeline_dataset_param).as_mount()

# Command-line arguments handed to the entry script; the values are pipeline
# parameters plus the mounted metadata side input.
script_arguments = [
    '--input_partition', pipeline_inpart,
    '--metadata_config', metadata_config,
    '--metadata', pipeline_metadata,
    '--kv_customimage', pipeline_kv_customimg,
    '--kv_readapi', pipeline_kv_readapi,
]

parallelrunstep = ParallelRunStep(
    name='cv-detection-batch-dataset-step',
    parallel_run_config=parallelrunconfig,
    inputs=[images_input],
    side_inputs=[metadata_config],
    output=step_output_dir,
    arguments=script_arguments,
    allow_reuse=False,
)


# ('config', 'inputs as mount', 'arguments passing in pipeline args')

# data transfer step 

Move the score data from the temporary output storage to blob storage.

https://github.com/Azure/MachineLearningNotebooks/blob/master/how-to-use-azureml/machine-learning-pipelines/intro-to-pipelines/aml-pipelines-data-transfer.ipynb


https://github.com/Azure/MachineLearningNotebooks/blob/master/how-to-use-azureml/machine-learning-pipelines/intro-to-pipelines/aml-pipelines-showcasing-datapath-and-pipelineparameter.ipynb 

In [15]:
# Display the Data Factory compute target resolved earlier in the notebook.
adf_target

DataFactoryCompute(workspace=Workspace.create(name='magaml', subscription_id='907c8efc-c2c8-4c49-a4e1-aeb880e10c88', resource_group='aml'), name=datafactory1, id=/subscriptions/907c8efc-c2c8-4c49-a4e1-aeb880e10c88/resourceGroups/aml/providers/Microsoft.MachineLearningServices/workspaces/magaml/computes/datafactory1, type=DataFactory, provisioning_state=Succeeded, location=australiaeast, tags=None)

In [16]:


# Destination is the output-path pipeline parameter paired with a default
# compute binding.
blob_destination = (pipeline_dataset_output, DataPathComputeBinding())

# Copy the scoring step's output directory to the destination directory,
# running on the Data Factory compute.
transfer_output_toblob = DataTransferStep(
    name="transfer output to blob",
    compute_target=adf_target,
    source_data_reference=step_output_dir,
    source_reference_type='directory',
    destination_data_reference=blob_destination,
    destination_reference_type='directory',
)

# Explicitly order the transfer after the scoring step.
transfer_output_toblob.run_after(parallelrunstep)

In [17]:
# Display the DataTransferStep object built in the previous cell.
transfer_output_toblob

<azureml.pipeline.steps.data_transfer_step.DataTransferStep at 0x22d52f3bbe0>

# prepare pipeline 

1. run pipeline
2. create a draft pipeline: a draft pipeline is a mutable pipeline that can be used to submit runs and to create published pipelines. Draft pipelines are created and iterated on with `PipelineDraft`.

3. publish the pipeline and print its endpoint



In [18]:
# Assemble the pipeline from the scoring step followed by the transfer step.
pipeline_steps = [parallelrunstep, transfer_output_toblob]
pipeline = Pipeline(workspace=ws, steps=pipeline_steps)





In [20]:

# Disabled PipelineDraft examples, kept for reference only.
# NOTE: the first example uses strDate / strTime, which are only assigned in a
# later cell of this notebook; run that cell first before re-enabling it.
# pipelinedraft = PipelineDraft.create(
#     workspace=ws, 
#     pipeline=pipeline,
#     name= f'ANPR Batch Scoring {strDate} {strTime}', 
#     experiment_name='ANPR Batch Scoring', 
#     continue_on_step_failure=True, 
#     tags={'dev': 'true'}, 
#     properties={'date': strDate, 'time': strTime}
# )
# pipeline_draft = PipelineDraft.create(workspace=ws,
#                                          name="TestPipelineDraft",
#                                          description="draft description",
#                                          experiment_name="helloworld",
#                                          pipeline=pipeline,
#                                          continue_on_step_failure=True,
#                                          tags={'dev': 'true'},
#                                          properties={'train': 'value'})

In [21]:
# Submit the pipeline under the 'MAG-batch-paramdataset' experiment, then
# block until the run finishes while streaming its output.
batch_experiment = Experiment(ws, 'MAG-batch-paramdataset')
pipeline_run = batch_experiment.submit(pipeline)
pipeline_run.wait_for_completion(show_output=True)

4250-a688-b2299980f5e1 as folder.
Processing 'metadata_input'
Execution failed with rslex.
Fallback to clex.
Processing dataset FileDataset
{
  "source": [
    "('godzilla', 'metadata')"
  ],
  "definition": [
    "GetDatastoreFiles"
  ],
  "registration": {
    "id": "c2a04b5d-a5e1-4adb-92ad-5055edce627c",
    "name": "metadata_ds",
    "version": 1,
    "description": "anpr input images metadata",
    "workspace": "Workspace.create(name='magaml', subscription_id='907c8efc-c2c8-4c49-a4e1-aeb880e10c88', resource_group='aml')"
  }
}
Mounted metadata_input to /mnt/batch/tasks/shared/LS_root/jobs/magaml/azureml/385c9097-e94a-400a-942e-0abfba1ec97b/mounts/godzilla/metadata
Exit __enter__ of DatasetContextManager
Entering Run History Context Manager.
Preparing to call script [ driver/amlbi_main.py ] with arguments: ['--client_sdk_version', '1.10.0', '--scoring_module_name', 'minibatch_process.py', '--mini_batch_size', '2', '--error_threshold', '1', '--output_action', 'append_row', '--loggin

'Finished'

In [24]:
# Capture one timestamp and derive the date (YYYY/MM/DD) and time (HH:MM)
# strings from it, so both describe the same instant.
dtnow = datetime.now()
strDate, strTime = (dtnow.strftime(pattern) for pattern in ('%Y/%m/%d', '%H:%M'))

In [25]:
# Publish the pipeline from the completed run, stamping the description with
# the date/time strings computed in the previous cell, then print its endpoint.
publish_description = f'published pipeline {strDate} {strTime} '
published_pipeline = pipeline_run.publish_pipeline(
    name='MAG-batchscore-dataset',
    description=publish_description,
    version='2.0',
    continue_on_step_failure=True,
)
endpoint_url = published_pipeline.endpoint
print(endpoint_url)

https://australiaeast.api.azureml.ms/pipelines/v1.0/subscriptions/907c8efc-c2c8-4c49-a4e1-aeb880e10c88/resourceGroups/aml/providers/Microsoft.MachineLearningServices/workspaces/magaml/PipelineRuns/PipelineSubmit/01bb90bc-0b7a-4d38-b7ec-aae9808df60d
