# Creating and Scheduling the AML Pipeline

In [None]:
import os
import json
import sys
from datetime import datetime
from azureml.core.compute import AmlCompute
from azureml.core.datastore import Datastore
from azureml.data.data_reference import DataReference
from azureml.pipeline.core import Pipeline, PipelineData
from azureml.pipeline.steps import PythonScriptStep
from azureml.core.runconfig import CondaDependencies, RunConfiguration
from azureml.core import Workspace, Run, Experiment
from azureml.core.authentication import InteractiveLoginAuthentication
from azureml.pipeline.core.schedule import ScheduleRecurrence, Schedule

Read the pipeline configuration parameters that were created in the previous notebook:

In [None]:
# Load the pipeline settings from the JSON configuration file; the cells
# below read all of their parameters from the resulting mapping `j`.
pipeline_config = "pipeline_config.json"
# Decode explicitly as UTF-8 so that parsing does not depend on the
# platform's default text encoding.
with open(pipeline_config, encoding="utf-8") as f:
    j = json.load(f)

Retrieve the AML workspace and compute target that were created previously:

In [None]:
# Credentials object passed to the workspace lookup below.
auth = InteractiveLoginAuthentication()

# Identify the AML workspace from the pipeline configuration. The
# subscription id is converted to a string before it is passed on.
workspace_lookup = {
    "name": j["aml_work_space"],
    "subscription_id": str(j["subscription_id"]),
    "resource_group": j["resource_group_name"],
}
aml_ws = Workspace.get(auth=auth, **workspace_lookup)

# Compute target on which the pipeline steps will run, referenced by the
# cluster name stored in the configuration.
compute_target = AmlCompute(aml_ws, j["cluster_name"])

Create the scoring pipeline's inputs and output by registering the previously created Azure blob containers as AML datastores:

In [None]:
# All three containers are registered with the same storage account name
# and key, so keep those arguments in one place.
storage_credentials = {
    "account_name": j["blob_account"],
    "account_key": j["blob_key"],
}

# Input: the data container, referenced by the steps as "data".
data_ds = Datastore.register_azure_blob_container(
    aml_ws,
    datastore_name="data_ds",
    container_name=j["data_blob_container"],
    **storage_credentials
)
data_dir = DataReference(data_reference_name="data", datastore=data_ds)

# Input: the models container, referenced by the steps as "models".
models_ds = Datastore.register_azure_blob_container(
    aml_ws,
    datastore_name="models_ds",
    container_name=j["models_blob_container"],
    **storage_credentials
)
models_dir = DataReference(data_reference_name="models", datastore=models_ds)

# Output: the predictions container, exposed as a directory named "preds".
preds_ds = Datastore.register_azure_blob_container(
    aml_ws,
    datastore_name="preds_ds",
    container_name=j["preds_blob_container"],
    **storage_credentials
)
preds_dir = PipelineData(datastore=preds_ds, name="preds", is_directory=True)

Define the run configuration of the scoring pipeline by specifying the Python dependencies and enabling a Docker-containerized execution environment:

In [None]:
# Python environment for the scoring script: the interpreter version and the
# pip packages both come from the pipeline configuration.
conda_dependencies = CondaDependencies.create(
    python_version=j["python_version"],
    pip_packages=j["pip_packages"],
)

# Run configuration built from those dependencies, with Docker enabled for
# the execution environment.
run_config = RunConfiguration(conda_dependencies=conda_dependencies)
run_config.environment.docker.enabled = True

Create a scoring pipeline with multiple steps. Each step will execute the scoring Python script with different arguments, corresponding to a specific (device, sensor) pair. The steps will run independently in parallel and their execution will be managed by AML.

In [None]:
# Enumerate every (device, sensor) combination listed in the configuration;
# each combination gets its own pipeline step.
device_sensor_pairs = [
    (dev, sen) for dev in j["device_ids"] for sen in j["sensors"]
]

steps = []
for device_id, sensor in device_sensor_pairs:
    # A separate output directory object is created for each step.
    preds_dir = PipelineData(name="preds", datastore=preds_ds, is_directory=True)

    step_name = "{}_{}".format(device_id, sensor)
    # Arguments handed to the scoring script, in this order.
    script_arguments = [
        device_id,
        sensor,
        models_dir,
        data_dir,
        j["data_blob"],
        preds_dir,
    ]

    step = PythonScriptStep(
        name=step_name,
        script_name=j["python_script_name"],
        source_directory=j["python_script_directory"],
        arguments=script_arguments,
        inputs=[models_dir, data_dir],
        outputs=[preds_dir],
        compute_target=compute_target,
        runconfig=run_config,
        allow_reuse=False,
    )
    steps.append(step)

# Assemble the steps into a single pipeline and validate it.
pipeline = Pipeline(workspace=aml_ws, steps=steps)
pipeline.validate()

Publish the pipeline so that it becomes available for scheduling. Publishing a pipeline also allows deploying it as a web service.

In [None]:
# The published pipeline's name carries a yymmddHHMM timestamp taken at
# publish time; the same string is used as its description.
publish_stamp = datetime.now().strftime("%y%m%d%H%M")
pipeline_name = "scoring_pipeline_" + publish_stamp
published_pipeline = pipeline.publish(
    name=pipeline_name,
    description=pipeline_name,
)

Schedule the pipeline to run on the specified frequency and interval. This will also submit the initial pipeline run, as we don't specify any starting time for the schedule.

In [None]:
# Scheduled runs are grouped under an experiment whose name carries a
# yymmddHHMMSS timestamp.
run_stamp = datetime.now().strftime("%y%m%d%H%M%S")
experiment_name = "exp_" + run_stamp

# Recurrence taken from the pipeline configuration; no start time is given.
recurrence = ScheduleRecurrence(
    frequency=j["sched_frequency"],
    interval=j["sched_interval"],
)

# The schedule's name and description are the same string, derived from the
# resource group name.
schedule_name = "{}_sched".format(j["resource_group_name"])
schedule = Schedule.create(
    workspace=aml_ws,
    name=schedule_name,
    description=schedule_name,
    pipeline_id=published_pipeline.id,
    experiment_name=experiment_name,
    recurrence=recurrence,
)