In [None]:
from datetime import datetime
from typing import Iterable, Optional

import toml
from azureml.core import Dataset, Datastore, Workspace
from azureml.core.authentication import InteractiveLoginAuthentication
from azureml.core.environment import Environment
from azureml.core.experiment import Experiment
from azureml.core.runconfig import RunConfiguration
from azureml.data import OutputFileDatasetConfig
from azureml.pipeline.core import Pipeline
from azureml.pipeline.steps import ParallelRunConfig, ParallelRunStep, PythonScriptStep

## Load configuration (settings from `config.toml`, merged with credentials from `secrets.toml`)

In [None]:
# TOML files are UTF-8 by specification, so decode them explicitly instead of relying on
# the platform's default locale encoding (which is not UTF-8 everywhere, e.g. on Windows).
with open('config.toml', 'r', encoding='utf-8') as f:
    config = toml.load(f)
with open('secrets.toml', 'r', encoding='utf-8') as f:
    secrets = toml.load(f)
# Secrets are unpacked last, so a key present in both files takes its value from secrets.toml.
config = {**config, **secrets}

## Connect to workspace

In [None]:
def connect_to_workspace(subscription_id: str, resource_group: str, aml_workspace_name: str,
                         tenant_id: Optional[str] = None) -> Workspace:
    """Return a handle to an Azure ML workspace, authenticated via interactive login.

    Args:
        subscription_id: Azure subscription that contains the workspace.
        resource_group: Resource group that contains the workspace.
        aml_workspace_name: Name of the Azure ML workspace.
        tenant_id: Tenant passed to ``InteractiveLoginAuthentication``; ``None`` is
            passed through unchanged.

    Returns:
        The ``Workspace`` object built with the interactive credentials.
    """
    interactive_auth = InteractiveLoginAuthentication(tenant_id=tenant_id)
    return Workspace(subscription_id, resource_group, aml_workspace_name, auth=interactive_auth)

In [None]:
# tenant_id is optional for connect_to_workspace, so a config without it falls back to None
# instead of failing with a KeyError.
ws = connect_to_workspace(
    subscription_id=config['subscription_id'],
    resource_group=config['resource_group'],
    aml_workspace_name=config['aml_workspace'],
    tenant_id=config.get('tenant_id'),
)

## Define experiment and compute settings

In [None]:
experiment_name = 'parallelization_tutorial'
# The CPU cluster name can be overridden with a 'compute_target_cpu' key in config.toml;
# without it, the tutorial's default cluster is used.
compute_target_cpu = ws.compute_targets[config.get('compute_target_cpu', 'kmi-cmpclstr-cpu')]
# Passed as process_count_per_node to the ParallelRunConfig of the processing step.
jobs_per_node_cpu = 8

## Create an execution environment (only necessary once)

In [None]:
def create_environment(workspace, name: str, base_environment: str, conda_dependencies: Iterable[str],
                       pip_dependencies: Iterable[str], wait_for_completion: bool = True):
    """Clone a base environment, add packages to it, register it, and optionally build it.

    Args:
        workspace: The ``Workspace`` the base environment is read from and the new one registered in.
        name: Name under which the cloned environment is registered.
        base_environment: Name of the existing environment to clone.
        conda_dependencies: Packages added with ``add_conda_package``.
        pip_dependencies: Packages added with ``add_pip_package``.
        wait_for_completion: If true, build the environment in the workspace and block until
            the build finishes.

    Returns:
        The object returned by ``Environment.register`` (the registered environment).
    """
    env = Environment.get(workspace=workspace, name=base_environment).clone(name)
    conda_dep = env.python.conda_dependencies
    for dep in conda_dependencies:
        conda_dep.add_conda_package(dep)
    for dep in pip_dependencies:
        conda_dep.add_pip_package(dep)
    env.python.conda_dependencies = conda_dep
    registered_env = env.register(workspace=workspace)
    if wait_for_completion:
        env.build(workspace).wait_for_completion()
    return registered_env


# Set to True the first time this notebook runs against a workspace; afterwards the
# environment is already registered and the next cell only fetches it.
CREATE_ENVIRONMENT = False

if CREATE_ENVIRONMENT:
    create_environment(
        ws,
        name='tutorial-environment',
        base_environment='AzureML-Minimal',
        conda_dependencies=['numpy', 'pandas'],
        pip_dependencies=['opencensus-ext-azure', 'parse', 'tqdm'],
    )

## Get a handle to the existing execution environment

In [None]:
# Look up the registered environment by name; no version is given (presumably the latest is returned).
environment = Environment.get(ws, 'tutorial-environment')

## Configure environment variables

In [None]:
# Merge into, rather than replace, any variables already defined on the registered environment.
# This only changes the local Environment object; nothing here re-registers it.
# NOTE(review): the connection string becomes part of the run's environment definition — confirm
# that exposure is acceptable.
environment.environment_variables = {
    **(environment.environment_variables or {}),
    'APPLICATIONINSIGHTS_CONNECTION_STRING': config['app_insights_connection_string'],
}

## Get a handle to the datastore

In [None]:
# Fetch the already-registered datastore that all step outputs are written to.
datastore_name = 'tutorial_parallelization'
datastore = Datastore.get(ws, datastore_name)

## Define the partitioning step

In [None]:
# define the run configuration
run_configuration = RunConfiguration()
run_configuration.environment = environment

# define the input dataset
ds_raw = Dataset.get_by_name(workspace=ws, name='tutorial-parallelization-raw')

# define the output dataset
ds_partitioned = OutputFileDatasetConfig(destination=(datastore, 'partitioned/{run-id}')) \
    .register_on_complete(name='tutorial-parallelization-partitioned')

# define the step
partition_step = PythonScriptStep(
    name='partition-step',
    source_directory='.',
    script_name='partition_step.py',
    compute_target=compute_target_cpu,
    arguments=['--output-dir', ds_partitioned.as_mount()],
    inputs=[ds_raw.as_named_input('ds_raw').as_mount()],
    runconfig=run_configuration,
    allow_reuse=True
)

## Define the parallelized processing step

In [None]:
# define the output dataset
ds_processed = OutputFileDatasetConfig(destination=(datastore, 'processed/{run-id}'))\
    .register_on_complete(name='tutorial-parallelization-processed')

# define the run configuration
parallel_run_config = ParallelRunConfig(
    source_directory='.',
    entry_script='processing_step.py',
    mini_batch_size=1,
    error_threshold=0,
    output_action='summary_only',
    environment=environment,
    compute_target=compute_target_cpu,
    process_count_per_node=jobs_per_node_cpu,
    node_count=compute_target_cpu.get_status().scale_settings.maximum_node_count,
    run_invocation_timeout=300,
    run_max_try=1
)

# define the step
processing_step = ParallelRunStep(
    name='processing-step',
    parallel_run_config=parallel_run_config,
    arguments=['--output-dir', ds_processed.as_mount()],
    inputs=[ds_partitioned.as_input('ds_partitioned').as_mount()],
    allow_reuse=True
)

## Define the aggregation step

In [None]:
# define the run configuration
run_configuration = RunConfiguration()
run_configuration.environment = environment

# define the output dataset
ds_aggregated = OutputFileDatasetConfig(destination=(datastore, 'aggregated/{run-id}'))\
    .register_on_complete(name='tutorial-parallelization-aggregated')

# define the step
aggregation_step = PythonScriptStep(
    name='aggregation-step',
    source_directory='.',
    script_name='aggregation_step.py',
    compute_target=compute_target_cpu,
    arguments=['--output-dir', ds_aggregated.as_mount()],
    inputs=[ds_processed.as_input('ds_processed').as_mount()],
    runconfig=run_configuration,
    allow_reuse=True
)

## Define the final pipeline and experiment

In [None]:
# The steps are chained through their intermediate datasets (ds_partitioned -> ds_processed).
all_steps = [partition_step, processing_step, aggregation_step]
pipeline = Pipeline(ws, all_steps)
experiment = Experiment(workspace=ws, name=experiment_name)

## Submit the experiment and wait for completion

In [None]:
print(f'Experiment submitted at {datetime.now():%Y-%m-%d %H:%M:%S}')
run = experiment.submit(pipeline)
# Block until the pipeline run finishes, without streaming its logs into the notebook.
run.wait_for_completion(show_output=False)
print(f'Experiment terminated at {datetime.now():%Y-%m-%d %H:%M:%S}')