## References
- https://github.com/Azure/MachineLearningNotebooks/blob/master/how-to-use-azureml/machine-learning-pipelines/parallel-run/file-dataset-image-inference-mnist.ipynb

In [1]:
import pandas as pd
import numpy as np
import azureml.core
from azureml.core import Workspace
from azureml.core.authentication import ServicePrincipalAuthentication

from azureml.core.conda_dependencies import CondaDependencies
from azureml.core.image import ContainerImage, Image
from azureml.core.model import Model
from azureml.data.dataset_consumption_config import DatasetConsumptionConfig
from azureml.pipeline.core import PipelineParameter
from azureml.core import Environment, Experiment
from azureml.core import Datastore, Dataset
from azureml.core import Keyvault
from azureml.data.datapath import DataPath, DataPathComputeBinding
from azureml.pipeline.steps import PythonScriptStep

In [2]:
#keyvault = ws.get_default_keyvault()
#key =keyvault.get_secret('adlsgen6key')

In [3]:
ws = Workspace.from_config()
keyvault = ws.get_default_keyvault()

tenant_id = keyvault.get_secret("tenantid")
client_id = keyvault.get_secret("clientid")
client_secret = keyvault.get_secret("clientsecret")
sablobsampleaccountkey = keyvault.get_secret("sablobsampleaccountkey")
'''
sp = ServicePrincipalAuthentication(tenant_id=tenant_id, # tenantID
                                    service_principal_id=client_id, # clientId
                                    service_principal_password=client_secret) # clientSecret

subscription_id = '7e48a1e8-8d3e-4e00-8bc0-098c43f5ace7'

# Azure Machine Learning resource group NOT the managed resource group
resource_group = 'rg-mlops-demo-dev' 

#Azure Machine Learning workspace name, NOT Azure Databricks workspace
workspace_name = 'ws-demo' 
ws = Workspace.get(name=workspace_name,
                   auth=sp,
                   subscription_id=subscription_id
                  ,resource_group=resource_group)
#ws.get_details()
dstore = ws.get_default_datastore()
'''


In [4]:
model_name = "diabetes"
model_version = 5


model_list = Model.list(workspace=ws)
model = [m for m in model_list if m.version == model_version and m.name == model_name]
if len(model) > 0:
    model = model[0]
else:
    print("No model found")
    
print(
    "Model picked: {} \nModel Description: {} \nModel Version: {}".format(
        model.name, model.description, model.version
    )
)

Model picked: diabetes 
Model Description: None 
Model Version: 5


In [5]:
import os
from azureml.core.compute import AmlCompute, ComputeTarget

# choose a name for your cluster
compute_name = "cpu-cluster"
# This example uses CPU VM. For using GPU VM, set SKU to STANDARD_NC6
vm_size =  "STANDARD_D2_V2"


if compute_name in ws.compute_targets:
    compute_target = ws.compute_targets[compute_name]
    if compute_target and type(compute_target) is AmlCompute:
        print('found compute target. just use it. ' + compute_name)
else:
    print('creating a new compute target...')
    provisioning_config = AmlCompute.provisioning_configuration(vm_size = vm_size,
                                                                min_nodes = 0, 
                                                                max_nodes = 4)

    # create the cluster
    compute_target = ComputeTarget.create(ws, compute_name, provisioning_config)
    
    # can poll for a minimum number of nodes and for a specific timeout. 
    # if no min node count is provided it will use the scale settings for the cluster
    compute_target.wait_for_completion(show_output=True, min_node_count=None, timeout_in_minutes=20)
    
     # For a more detailed view of current AmlCompute status, use get_status()
    print(compute_target.get_status().serialize())

found compute target. just use it. cpu-cluster


In [6]:
from azureml.core import Environment
from azureml.core.model import InferenceConfig

#os.chdir("./code/scoring")
env = Environment.from_conda_specification(name = "diabetesenv",
                                             file_path = "./code/scoring/conda_dependencies.yml")


In [7]:
from azureml.core.dataset import Dataset

#datastore_name="dstore_diabetes"
#adls_datastore = ws.datastores[datastore_name]

datastore_name="dstore_blob_diabetes"

if datastore_name in ws.datastores:
    blob_datastore = ws.datastores[datastore_name]
else:
    print("Creating datastore")
    blob_datastore = Datastore.register_azure_blob_container(
       workspace=ws,
       datastore_name=datastore_name,
       container_name="diabetes", # subscription id of ADLS account
       account_name="sablobsample", # ADLS account name
       account_key = sablobsampleaccountkey)
    
#path_on_datastore = adls_datastore.path('diabetes_inference')
input_diabetes_ds = Dataset.File.from_files(path=(blob_datastore, "/diabetes_inference"), validate=False)

In [10]:
datapath = DataPath(datastore=blob_datastore, path_on_datastore='diabetes_inference1')
datapath_param = PipelineParameter(name="output_datapath", default_value=datapath)
#datapath_input = (datapath_param, DataPathComputeBinding(mode='mount'))
datapath_output = (datapath_param, DataPathComputeBinding(mode='mount'))

In [13]:
Dataset.File.from_files(path=datapath_output)

UserErrorException: UserErrorException:
	Message: Invalid tuple for path. Please make sure the tuple consists of a datastore and a path/SQL query
	InnerException None
	ErrorResponse 
{
    "error": {
        "code": "UserError",
        "message": "Invalid tuple for path. Please make sure the tuple consists of a datastore and a path/SQL query"
    }
}

In [9]:
ds_prep_step = PythonScriptStep(
    name='ds_prep_step',
    source_directory="./code",
    script_name="ds_prep_step.py",
    arguments=["--arg1", datapath_output],
    #inputs=[datapath_input],
    outputs=[datapath_output],
    compute_target=compute_target
    )
print("train_step created")

ValueError: Unexpected output type: <class 'azureml.pipeline.core.graph._PipelineIO'>

In [67]:
pipeline_param = PipelineParameter(name="diabetes_param", default_value=input_diabetes_ds)
    input_diabetes_ds_consumption = DatasetConsumptionConfig("diabetes_param_config", pipeline_param).as_mount()

In [68]:
from azureml.pipeline.core import Pipeline, PipelineData

output_dir = PipelineData(name="/diabetes_inference_results", datastore=blob_datastore)

In [69]:
from azureml.pipeline.steps import ParallelRunStep, ParallelRunConfig

parallel_run_config = ParallelRunConfig(
    source_directory="./code",
    entry_script="scoring/score_with_parallelrun.py",
    mini_batch_size=PipelineParameter(name="batch_size_param", default_value="5"),
    error_threshold=10,
    output_action="append_row",
    append_row_file_name="mnist_outputs.txt",
    environment=env,
    compute_target=compute_target,
    process_count_per_node=PipelineParameter(name="process_count_param", default_value=2),
    node_count=2
)

In [70]:
parallelrun_step = ParallelRunStep(
    name="predict-diabetes",
    parallel_run_config=parallel_run_config,
    inputs=[ input_diabetes_ds_consumption ],
    output=output_dir,
    allow_reuse=False
)

## Pass list as dataset to ParallelRunStep

In [77]:
Union(range(100))

NameError: name 'Union' is not defined

In [76]:
pipeline_param_id = PipelineParameter(name="diabetes_id", default_value=list(range(100)))
input_diabetes_id_ds_consumption = DatasetConsumptionConfig("diabetes_id_param_config", pipeline_param_id).as_mount()

parallelrun_step = ParallelRunStep(
    name="predict-diabetes",
    parallel_run_config=parallel_run_config,
    inputs=[ input_diabetes_id_ds_consumption ],
    output=output_dir,
    allow_reuse=False
)

ValueError: Default value is of unsupported type: list

In [71]:
pipeline = Pipeline(workspace=ws, steps=[parallelrun_step])

In [72]:
#path_on_datastore = mnist_data.path('mnist/0.png')
new_diabetets_ds = Dataset.File.from_files(path=(blob_datastore, "/diabetes_inference1"), validate=False)

In [73]:
pipeline_run_2 = Experiment(ws, 'diabetes_pred').submit(pipeline, 
                                   pipeline_parameters={"diabetes_param": new_diabetets_ds, 
                                                        "batch_size_param": "1",
                                                        "process_count_param": 1}
)

Created step predict-diabetes [1012e7eb][685420f5-8cb1-40e1-97e2-9ee772d0ba92], (This step will run and generate new outputs)
Submitted PipelineRun 8c40d80c-02ef-487e-b184-984920ea6df0
Link to Azure Machine Learning Portal: https://ml.azure.com/runs/8c40d80c-02ef-487e-b184-984920ea6df0?wsid=/subscriptions/7e48a1e8-8d3e-4e00-8bc0-098c43f5ace7/resourcegroups/rg-mlops-demo-dev/workspaces/ws-demo&tid=72f988bf-86f1-41af-91ab-2d7cd011db47


In [74]:
published_pipeline1 = pipeline_run_2.publish_pipeline(
     name="Diabetes inference Pipeline",
     description="Diabetes inference Pipeline",
     version="1.0")