# Creating and Scheduling the AML Pipeline

In [None]:
import json
from datetime import datetime

from azureml.core import Workspace
from azureml.core.authentication import InteractiveLoginAuthentication, ServicePrincipalAuthentication, \
    AzureCliAuthentication
from azureml.core.compute import AmlCompute
from azureml.core.datastore import Datastore
from azureml.core.runconfig import CondaDependencies, RunConfiguration
from azureml.data.data_reference import DataReference
from azureml.exceptions import AuthenticationException
from azureml.pipeline.core import Pipeline, PipelineData
from azureml.pipeline.core.schedule import ScheduleRecurrence, Schedule
from azureml.pipeline.steps import PythonScriptStep

Read the pipeline configuration parameters that were created in the previous notebook:

In [None]:
from dotenv import find_dotenv, get_key
# Name of the JSON settings file written by the previous notebook.
pipeline_config = "pipeline_config.json"

# Parse the settings into `j`, the dictionary the later cells read from.
with open(pipeline_config) as f:
    j = json.load(f)

Retrieve the AML workspace and compute target that were created previously:

In [None]:
def get_auth(env_path):
    """Choose an Azure ML authentication object based on the .env file.

    When the ``password`` entry of the .env file holds a real value (it is
    set, non-empty, and not the template placeholder), service principal
    authentication is built from the ``tenant_id``, ``username`` and
    ``password`` entries. Otherwise Azure CLI credentials are tried, and
    interactive login is used if they raise ``AuthenticationException``.

    Args:
        env_path: Path to the .env file queried with ``get_key``.

    Returns:
        A ``ServicePrincipalAuthentication``, ``AzureCliAuthentication`` or
        ``InteractiveLoginAuthentication`` instance.
    """
    placeholder = "YOUR_SERVICE_PRINCIPAL_PASSWORD"
    aml_sp_password = get_key(env_path, 'password')

    # A missing or empty entry means no service principal was configured;
    # treat it like the untouched placeholder instead of passing empty
    # credentials to ServicePrincipalAuthentication.
    if aml_sp_password and aml_sp_password != placeholder:
        # NOTE(review): newer azureml-core releases may name these keyword
        # arguments service_principal_id / service_principal_password -
        # confirm against the installed SDK version.
        return ServicePrincipalAuthentication(
            tenant_id=get_key(env_path, 'tenant_id'),
            username=get_key(env_path, 'username'),
            password=aml_sp_password,
        )

    try:
        auth = AzureCliAuthentication()
        # Request a header right away so unusable CLI credentials are
        # detected here and the interactive fallback can take over.
        auth.get_authentication_header()
    except AuthenticationException:
        auth = InteractiveLoginAuthentication()

    return auth

# Get AML workspace
env_path = find_dotenv(raise_error_if_not_found=True)
ws = Workspace.from_config(auth=get_auth(env_path))
print(ws.name, ws.resource_group, ws.location, sep="\n")

# Storage account behind the workspace's default datastore. The datastore
# object is kept in its own variable: rebinding `blob_account` to the account
# name before reading the key would make the `.account_key` lookup hit a
# plain string and raise AttributeError.
default_datastore = ws.get_default_datastore()
blob_account = default_datastore.account_name
blob_key = default_datastore.account_key

# AML compute target
compute_target = AmlCompute(ws, j["cluster_name"])

Create the scoring pipeline's input and output by registering the Azure blob containers that were created:

In [None]:
# Pipeline input and output
def _register_container(datastore_name, container_name):
    """Register one blob container as a datastore in the workspace.

    Uses the shared `blob_account` / `blob_key` credentials for every container.
    """
    return Datastore.register_azure_blob_container(
        ws,
        datastore_name=datastore_name,
        container_name=container_name,
        account_name=blob_account,
        account_key=blob_key,
    )


# Inputs: the data and models containers, exposed as data references.
data_ds = _register_container("data_ds", j["data_blob_container"])
data_dir = DataReference(datastore=data_ds, data_reference_name="data")

models_ds = _register_container("models_ds", j["models_blob_container"])
models_dir = DataReference(datastore=models_ds, data_reference_name="models")

# Output: a directory-type PipelineData on the predictions datastore.
preds_ds = _register_container("preds_ds", j["preds_blob_container"])
preds_dir = PipelineData(name="preds", datastore=preds_ds, is_directory=True)

Define the run configuration of the scoring pipeline by specifying the Python dependencies and enabling a Docker containerized execution environment:

In [None]:
# Run config
# Conda environment built from the pip packages and Python version in the config.
conda_dependencies = CondaDependencies.create(
    pip_packages=j["pip_packages"],
    python_version=j["python_version"],
)
run_config = RunConfiguration(conda_dependencies=conda_dependencies)

# Turn on Docker for the run's execution environment.
docker_settings = run_config.environment.docker
docker_settings.enabled = True

Create a scoring pipeline with multiple steps. Each step will execute the scoring Python script with different arguments, corresponding to a specific (device, sensor) pair. The steps will run independently in parallel and their execution will be managed by AML.

In [None]:
# Create a pipeline step for each (device, sensor) pair
steps = []
for device_id in j["device_ids"]:
    for sensor in j["sensors"]:
        # A fresh output object is created for every step.
        preds_dir = PipelineData(name="preds", datastore=preds_ds, is_directory=True)

        step_name = "{}_{}".format(device_id, sensor)
        # Passed to the script in this order.
        script_arguments = [
            device_id,
            sensor,
            models_dir,
            data_dir,
            j["data_blob"],
            preds_dir,
        ]

        step = PythonScriptStep(
            name=step_name,
            script_name=j["python_script_name"],
            arguments=script_arguments,
            inputs=[models_dir, data_dir],
            outputs=[preds_dir],
            source_directory=j["python_script_directory"],
            compute_target=compute_target,
            runconfig=run_config,
            allow_reuse=False,
        )
        steps.append(step)

# Assemble the steps into one pipeline and validate it.
pipeline = Pipeline(workspace=ws, steps=steps)
pipeline.validate()

Publish the pipeline so that it becomes available for scheduling. Publishing a pipeline also allows deploying it as a web service.

In [None]:
# Publish pipeline
# The name carries a minute-resolution timestamp (yymmddHHMM).
publish_timestamp = datetime.now().strftime("%y%m%d%H%M")
pipeline_name = "scoring_pipeline_{}".format(publish_timestamp)

# The same string serves as both the name and the description.
published_pipeline = pipeline.publish(
    name=pipeline_name,
    description=pipeline_name,
)

Schedule the pipeline to run at the frequency and interval specified in the configuration. Because we don't specify a start time for the schedule, this will also submit the initial pipeline run.

In [None]:
# Schedule pipeline
# Scheduled runs are grouped under an experiment named with a second-resolution timestamp.
experiment_name = "exp_" + datetime.now().strftime("%y%m%d%H%M%S")

# Frequency and interval both come from the pipeline configuration.
recurrence = ScheduleRecurrence(
    frequency=j['sched_frequency'],
    interval=j['sched_interval'],
)

# The schedule's name and description are the same string.
schedule_label = "{}_sched".format(j["resource_group_name"])
schedule = Schedule.create(
    workspace=ws,
    name=schedule_label,
    pipeline_id=published_pipeline.id,
    experiment_name=experiment_name,
    recurrence=recurrence,
    description=schedule_label,
)