# Copyright (c) Microsoft. All rights reserved.
# Licensed under the MIT license.

# Get ML Workspace

In [1]:
import azureml.core
from azureml.core import Workspace, Experiment, Datastore, Environment, Dataset
from azureml.core.compute import AmlCompute
from azureml.core.compute import ComputeTarget
from azureml.core.compute_target import ComputeTargetException
from azureml.widgets import RunDetails
from azureml.core.authentication import InteractiveLoginAuthentication

import os
from dotenv import load_dotenv

In [2]:
load_dotenv() 

True

In [3]:
# Initilize Workspace

ws_name = os.environ['AML_WORKSPACE_NAME']
subscription_id = os.environ['AML_SUBSCRIPTION_ID']
resource_group = os.environ['AML_RESOURCE_GROUP']
tenant_id = os.environ['AML_TENANT_ID']

interactive_auth = InteractiveLoginAuthentication(tenant_id=tenant_id)

ws = Workspace(workspace_name=ws_name,
                   subscription_id=subscription_id,
                   resource_group=resource_group,
                   auth=interactive_auth)

print(ws.get_details())

{'id': '/subscriptions/0b3f04a9-6375-4341-a513-dd53731a99a4/resourceGroups/dstoolkit-route-optimization/providers/Microsoft.MachineLearningServices/workspaces/amlrouteoptimization', 'name': 'amlrouteoptimization', 'identity': {'principal_id': '6a398b87-cd4d-4ff3-b66f-344edbdd9bd5', 'tenant_id': '72f988bf-86f1-41af-91ab-2d7cd011db47', 'type': 'SystemAssigned'}, 'location': 'australiaeast', 'type': 'Microsoft.MachineLearningServices/workspaces', 'tags': {}, 'sku': 'Basic', 'workspaceid': 'bb87308f-2811-4bdc-8c73-339822d22a39', 'sdkTelemetryAppInsightsKey': 'bd22fe07-ca7e-4337-bd98-3d5116261c1f', 'description': '', 'friendlyName': 'amlrouteoptimization', 'keyVault': '/subscriptions/0b3f04a9-6375-4341-a513-dd53731a99a4/resourceGroups/dstoolkit-route-optimization/providers/Microsoft.Keyvault/vaults/amlrouteoptimi7471905268', 'applicationInsights': '/subscriptions/0b3f04a9-6375-4341-a513-dd53731a99a4/resourceGroups/dstoolkit-route-optimization/providers/Microsoft.insights/components/amlroute

# Get Compute Cluster

In [4]:
# Retrieve or create an Aml compute
min_nodes = int(os.environ['AML_MIN_NODES'])
max_nodes = int(os.environ['AML_MAX_NODES'])

aml_compute_target = os.environ['AML_COMPUTE_NAME']
try:
    aml_compute = AmlCompute(ws, aml_compute_target)
    print("found existing compute target.")
except ComputeTargetException:
    print("creating new compute target")
    
    provisioning_config = AmlCompute.provisioning_configuration(vm_size = "STANDARD_D2_V2",
                                                                min_nodes = min_nodes, 
                                                                max_nodes = max_nodes)    
    aml_compute = ComputeTarget.create(ws, aml_compute_target, provisioning_config)
    aml_compute.wait_for_completion(show_output=True, min_node_count=None, timeout_in_minutes=20)

creating new compute target
InProgress.
SucceededProvisioning operation finished, operation "Succeeded"
Succeeded
AmlCompute wait for completion finished

Minimum number of nodes requested have been provisioned


# Create Run Configuration

In [5]:
# Default datastore (Azure blob storage)
def_blob_store = ws.get_default_datastore()

# source directory
source_directory = '../src'
    
print(f'Source code is in {source_directory} directory.')

Source code is in ../src directory.


In [6]:
from azureml.core.runconfig import RunConfiguration
from azureml.core.conda_dependencies import CondaDependencies
from azureml.core.runconfig import DEFAULT_CPU_IMAGE

# create a new runconfig object
run_config = RunConfiguration()

# environment
env = Environment('op-env')

# enable Docker 
env.docker.enabled = True
# set Docker base image to the default CPU-based image
env.docker.base_image = DEFAULT_CPU_IMAGE
# use conda_dependencies.yml to create a conda environment in the Docker image for execution
env.python.user_managed_dependencies = False
# specify CondaDependencies obj
env.python.conda_dependencies = CondaDependencies.create(conda_packages=['or-tools'])

# set environment
run_config.environment = env

'enabled' is deprecated. Please use the azureml.core.runconfig.DockerConfiguration object with the 'use_docker' param instead.


# Step 1: Reduce the search space of the problem

In [10]:
from azureml.pipeline.core import Pipeline, PipelineParameter, PipelineData
from azureml.pipeline.steps import PythonScriptStep

from azureml.data.data_reference import DataReference

# Model input
order_file = os.environ['MODEL_INPUT_ORDER_FILE']
distance_file = os.environ['MODEL_INPUT_DISTANCE_FILE']

# List of packages to be delivered
order = DataReference(
    datastore=def_blob_store,
    data_reference_name="order",
    path_on_datastore=order_file)

# The pair-wise distance between cities
distance = DataReference(
    datastore=def_blob_store,
    data_reference_name="distance",
    path_on_datastore=distance_file)

# Naming the intermediate data 
model_result_partial = PipelineData("model_result_partial",datastore=def_blob_store)
model_input_reduced = PipelineData("model_input_reduced",datastore=def_blob_store)

reduce_step = PythonScriptStep(
    script_name="reduce.py", 
    arguments=["--order", order, "--distance", distance, "--model_result_partial", model_result_partial, "--model_input_reduced", model_input_reduced],
    inputs=[order, distance],
    outputs=[model_result_partial, model_input_reduced],
    compute_target=aml_compute, 
    source_directory=source_directory,
    runconfig=run_config
)

# Step 2: Partition the problem

In [11]:
# Naming the intermediate data 
model_input_list = PipelineData("model_input_list",datastore=def_blob_store)

parition_step = PythonScriptStep(
    script_name="partiition.py", 
    arguments=["--model_input_reduced", model_input_reduced, "--model_input_list", model_input_list],
    inputs=[model_input_reduced],
    outputs=[model_input_list],
    compute_target=aml_compute, 
    source_directory=source_directory,
    runconfig=run_config
)

# Step 3: Solve individual problem

In [12]:
from azureml.contrib.pipeline.steps import ParallelRunStep, ParallelRunConfig

# Naming the intermediate data 
model_result_list = PipelineData("model_result_list",datastore=def_blob_store)

parallel_run_config = ParallelRunConfig(
    source_directory=source_directory,
    entry_script='solve.py',
    mini_batch_size="1",
    error_threshold=1,
    output_action="append_row",
    environment=env,
    compute_target=aml_compute,
    node_count=max_nodes)

solve_step = ParallelRunStep(
    name="solve",
    parallel_run_config=parallel_run_config,
    inputs=[model_input_list],
    output=model_result_list,
    allow_reuse=True
)

Exception: Step input must be of any type: dict_keys([<class 'azureml.data.tabular_dataset.TabularDataset'>, <class 'azureml.pipeline.core.pipeline_output_dataset.PipelineOutputTabularDataset'>, <class 'azureml.data.file_dataset.FileDataset'>, <class 'azureml.pipeline.core.pipeline_output_dataset.PipelineOutputFileDataset'>]), found <class 'azureml.pipeline.core.builder.PipelineData'>

# Step 4: Merge the result

In [None]:
# Naming the intermediate data 
model_result_final = PipelineData("model_result_final",datastore=def_blob_store)

merge_step = PythonScriptStep(
    script_name="merge.py", 
    arguments=["--model_input", model_input, "--model_result_partial", model_result_partial, "--model_result_list", model_result_list, "--model_result_final", model_result_final],
    inputs=[model_input, model_result_partial, model_result_list],
    outputs=[model_result_final],
    compute_target=aml_compute, 
    source_directory=source_directory,
    runconfig=run_config
)

# Create the Pipeline

In [None]:
pipeline = Pipeline(workspace=ws, steps=[reduce_step, parition_step, solve_step, merge_step])
print("Pipeline is built")

pipeline_run = Experiment(ws, 'optimization_example').submit(pipeline)
print("Pipeline is submitted for execution")

RunDetails(pipeline_run).show()

pipeline_run.wait_for_completion(show_output=True)