# Introduction to Azure Machine Learning Pipelines

<img src='https://cdn.thenewstack.io/media/2018/10/2e4f0988-az-ml-0.png'>

> https://docs.microsoft.com/en-us/azure/machine-learning/service/concept-ml-pipelines

An Azure Machine Learning pipeline is an independently executable workflow of a complete machine learning task. Subtasks are encapsulated as a series of steps within the pipeline. A pipeline can be as simple as a single step that calls a Python script, so it can do just about anything. Pipelines should nevertheless focus on machine learning tasks such as:

- Data preparation, including importing, validating and cleaning, munging and transformation, normalization, and staging
- Training configuration, including parameterizing arguments, filepaths, and logging / reporting configurations
- Training and validating efficiently and repeatably, which might include specifying specific data subsets, different hardware compute resources, distributed processing, and progress monitoring
- Deployment, including versioning, scaling, provisioning, and access control

In [1]:
import sys
sys.version

'3.6.2 |Anaconda, Inc.| (default, Sep 30 2017, 18:42:57) \n[GCC 7.2.0]'

In [2]:
import datetime
now = datetime.datetime.now()
print(now)

2019-10-24 15:40:57.615841


In [3]:
import os
import azureml.core
from azureml.core import Workspace, Experiment, Datastore
from azureml.widgets import RunDetails

# Check core SDK version number
print("Version Azure ML service : ", azureml.core.VERSION)

Version Azure ML service :  1.0.69


### Pipeline-specific SDK imports

Here, we import key pipeline modules, whose use will be illustrated in the examples below.

In [4]:
from azureml.pipeline.core import Pipeline
from azureml.pipeline.steps import PythonScriptStep

print("Pipeline SDK-specific imports completed")

Pipeline SDK-specific imports completed


In [5]:
ws = Workspace.from_config()
print(ws.name, ws.resource_group, ws.location, ws.subscription_id, sep = '\n')

# Get the workspace's default datastore
def_blob_store = ws.get_default_datastore()
# The following call gets the Azure Blob store associated with your workspace by name.
# Note that "workspaceblobstore" is the name of this store; it cannot be changed and must be used as is.
def_blob_store = Datastore(ws, "workspaceblobstore")
print("Blobstore's name: {}".format(def_blob_store.name))

azuremlservice
azuremlserviceresourcegroup
westeurope
70b8f39e-8863-49f7-b6ba-34a80799550c
Blobstore's name: workspaceblobstore


### Datastore concepts
A [Datastore](https://docs.microsoft.com/en-us/python/api/azureml-core/azureml.core.datastore.datastore?view=azure-ml-py) is a place where data can be stored and then made accessible to a compute target, either by mounting it or by copying the data to the compute target.

A Datastore can be backed by either Azure File Storage or Azure Blob Storage. This notebook uses the Blob-backed datastore `workspaceblobstore` retrieved above.

In this next step, we upload a data file (`20news.pkl`) to that datastore. When to use [Azure Blobs](https://docs.microsoft.com/en-us/azure/storage/blobs/storage-blobs-introduction), [Azure Files](https://docs.microsoft.com/en-us/azure/storage/files/storage-files-introduction), or [Azure Disks](https://docs.microsoft.com/en-us/azure/virtual-machines/linux/managed-disks-overview) is [detailed here](https://docs.microsoft.com/en-us/azure/storage/common/storage-decide-blobs-files-disks).

**Please take good note of the concept of the datastore.**

In [6]:
# get_default_datastore() gets the default Azure Blob Store associated with your workspace.
# Here we are reusing the def_blob_store object we obtained earlier
def_blob_store.upload_files(["./20news.pkl"], target_path="20newsgroups", overwrite=True)
print("Upload call completed")

Uploading an estimated of 1 files
Uploading ./20news.pkl
Uploaded ./20news.pkl, 1 files out of an estimated total of 1
Uploaded 1 files
Upload call completed


### Compute Targets
A compute target specifies where to execute your program, such as a remote Docker container on a VM, or a cluster. A compute target needs to be addressable and accessible by you.

**You need at least one compute target to send your payload to. This tutorial uses Azure Machine Learning Compute exclusively, for all steps. However, in some cases you may require multiple compute targets: some steps may run on one compute target, such as Azure Machine Learning Compute, while other steps in the same pipeline run on a different one.**

*The examples below show creating, retrieving, and attaching to an Azure Machine Learning Compute instance.*

#### List of Compute Targets on the workspace

In [7]:
cts = ws.compute_targets
for ct in cts:
    print(ct)

aks-aml-visual
VMDS3V2
aks-cluster01
vmDS15V2
StandardDS4V2
automlD2V2
automlD2v2
monclusterDS2V2
cpu-cluster


#### Retrieve or create an Azure Machine Learning compute
Azure Machine Learning Compute is a service for provisioning and managing clusters of Azure virtual machines for running machine learning workloads. Let's create a new Azure Machine Learning Compute in the current workspace, if it doesn't already exist. We will then run the training script on this compute target.

If no compute with the given name is found, we create a new one. We will create an Azure Machine Learning Compute containing **STANDARD_D2_V2 CPU VMs**. This process is broken down into the following steps:

1. Create the configuration
2. Create the Azure Machine Learning compute

**This process takes about 3 minutes and produces only sparse output along the way. Please wait until the call returns before moving to the next cell.**

In [8]:
from azureml.core.compute import ComputeTarget, AmlCompute
from azureml.core.compute_target import ComputeTargetException

aml_compute_target = "cpu-cluster"
try:
    aml_compute = AmlCompute(ws, aml_compute_target)
    print("found existing compute target.")
except ComputeTargetException:
    print("creating new compute target")
    
    provisioning_config = AmlCompute.provisioning_configuration(vm_size = "STANDARD_D2_V2",
                                                                min_nodes = 1, 
                                                                max_nodes = 4)    
    aml_compute = ComputeTarget.create(ws, aml_compute_target, provisioning_config)
    aml_compute.wait_for_completion(show_output=True, min_node_count=None, timeout_in_minutes=20)
    
print("Azure Machine Learning Compute attached")


found existing compute target.
Azure Machine Learning Compute attached


In [9]:
# For a more detailed view of the current Azure Machine Learning Compute status, use get_status().
print(aml_compute.get_status().serialize())

{'currentNodeCount': 1, 'targetNodeCount': 1, 'nodeStateCounts': {'preparingNodeCount': 0, 'runningNodeCount': 0, 'idleNodeCount': 1, 'unusableNodeCount': 0, 'leavingNodeCount': 0, 'preemptedNodeCount': 0}, 'allocationState': 'Steady', 'allocationStateTransitionTime': '2019-10-24T15:33:20.184000+00:00', 'errors': None, 'creationTime': '2019-10-24T15:32:06.172914+00:00', 'modifiedTime': '2019-10-24T15:32:21.736844+00:00', 'provisioningState': 'Succeeded', 'provisioningStateTransitionTime': None, 'scaleSettings': {'minNodeCount': 1, 'maxNodeCount': 4, 'nodeIdleTimeBeforeScaleDown': 'PT120S'}, 'vmPriority': 'Dedicated', 'vmSize': 'STANDARD_D2_V2'}
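
The serialized status is a plain dictionary, so it can be checked programmatically before submitting work. The sketch below uses a trimmed copy of the output above; `is_ready` and `free_capacity` are hypothetical helpers written for this illustration, not part of the SDK, and the readiness criteria are an assumption about what "ready" should mean.

```python
# Trimmed copy of the status dictionary printed above.
status = {
    "currentNodeCount": 1,
    "targetNodeCount": 1,
    "nodeStateCounts": {
        "preparingNodeCount": 0,
        "runningNodeCount": 0,
        "idleNodeCount": 1,
        "unusableNodeCount": 0,
        "leavingNodeCount": 0,
        "preemptedNodeCount": 0,
    },
    "allocationState": "Steady",
    "errors": None,
    "provisioningState": "Succeeded",
    "scaleSettings": {"minNodeCount": 1, "maxNodeCount": 4},
}


def is_ready(status):
    """Hypothetical helper: True when the cluster is provisioned, stable and error-free."""
    return (
        status["provisioningState"] == "Succeeded"
        and status["allocationState"] == "Steady"
        and not status["errors"]
        and status["nodeStateCounts"]["unusableNodeCount"] == 0
    )


def free_capacity(status):
    """Hypothetical helper: nodes the cluster may still add before reaching maxNodeCount."""
    return status["scaleSettings"]["maxNodeCount"] - status["currentNodeCount"]


print(is_ready(status))       # True
print(free_capacity(status))  # 3
```

In the notebook, the same helpers could be applied to `aml_compute.get_status().serialize()` instead of the hard-coded dictionary.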


## Creating a Step in a Pipeline
A step is a unit of execution. A step typically needs a target of execution (compute target) and a script to execute; it may require script arguments and inputs, and it can produce outputs. A step can also take a number of other parameters. Azure Machine Learning Pipelines provides the following built-in steps:

- [**PythonScriptStep**](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-steps/azureml.pipeline.steps.python_script_step.pythonscriptstep?view=azure-ml-py): Adds a step to run a Python script in a Pipeline.
- [**AdlaStep**](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-steps/azureml.pipeline.steps.adla_step.adlastep?view=azure-ml-py): Adds a step to run U-SQL script using Azure Data Lake Analytics.
- [**DataTransferStep**](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-steps/azureml.pipeline.steps.data_transfer_step.datatransferstep?view=azure-ml-py): Transfers data between Azure Blob and Data Lake accounts.
- [**DatabricksStep**](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-steps/azureml.pipeline.steps.databricks_step.databricksstep?view=azure-ml-py): Adds a DataBricks notebook as a step in a Pipeline.
- [**HyperDriveStep**](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-steps/azureml.pipeline.steps.hyper_drive_step.hyperdrivestep?view=azure-ml-py): Creates a Hyper Drive step for Hyper Parameter Tuning in a Pipeline.
- [**AzureBatchStep**](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-steps/azureml.pipeline.steps.azurebatch_step.azurebatchstep?view=azure-ml-py): Creates a step for submitting jobs to Azure Batch.
- [**EstimatorStep**](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-steps/azureml.pipeline.steps.estimator_step.estimatorstep?view=azure-ml-py): Adds a step to run Estimator in a Pipeline.
- [**MpiStep**](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-steps/azureml.pipeline.steps.mpi_step.mpistep?view=azure-ml-py): Adds a step to run an MPI job in a Pipeline.
- [**AutoMLStep**](https://docs.microsoft.com/en-us/python/api/azureml-train-automl/azureml.train.automl.automlstep?view=azure-ml-py): Creates an AutoML step in a Pipeline.

The following code will create a PythonScriptStep to be executed in the Azure Machine Learning Compute we created above using train.py, one of the files already made available in the `source_directory`.

A **PythonScriptStep** is a basic, built-in step to run a Python script on a compute target. It takes a script name and, optionally, other parameters such as arguments for the script, a compute target, inputs, and outputs. If no compute target is specified, the default compute target for the workspace is used. You can also use a [**RunConfiguration**](https://docs.microsoft.com/en-us/python/api/azureml-core/azureml.core.runconfiguration?view=azure-ml-py) to specify requirements for the PythonScriptStep, such as conda dependencies and a Docker image.
> The best practice is to keep each step's script and its dependent files in a separate folder and specify that folder as the step's `source_directory`. This reduces the size of the snapshot created for the step (only that folder is snapshotted). Since a change to any file in the `source_directory` triggers a re-upload of the snapshot, a dedicated folder also preserves step reuse when nothing in that step's `source_directory` has changed.
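
To see why a small, dedicated folder matters, here is a minimal sketch that fingerprints a folder by hashing every file path and file content in it. This is only an illustration of content-based change detection, not how the SDK computes snapshots; `directory_fingerprint` is a hypothetical helper and the files are created in a temporary directory.

```python
import hashlib
import os
import tempfile


def directory_fingerprint(root):
    """Hypothetical helper: hash every file path and file content under root."""
    digest = hashlib.sha256()
    for dirpath, dirnames, filenames in os.walk(root):
        dirnames.sort()  # make the walk order deterministic
        for filename in sorted(filenames):
            path = os.path.join(dirpath, filename)
            digest.update(os.path.relpath(path, root).encode("utf-8"))
            with open(path, "rb") as handle:
                digest.update(handle.read())
    return digest.hexdigest()


with tempfile.TemporaryDirectory() as root:
    with open(os.path.join(root, "train.py"), "w") as handle:
        handle.write("print('training')\n")
    before = directory_fingerprint(root)
    unchanged = directory_fingerprint(root)

    # An unrelated file in the same folder still changes the fingerprint.
    with open(os.path.join(root, "notes.txt"), "w") as handle:
        handle.write("scratch notes\n")
    after = directory_fingerprint(root)

print(before == unchanged)  # True
print(before == after)      # False
```

Under this model, an unrelated file dropped into a shared folder looks like a change to every step that uses the folder, which is the situation the best practice above avoids.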

In [10]:
source_directory = './scriptspipelines'
print('Source directory for the step is {}.'.format(os.path.realpath(source_directory)))

# Syntax
# PythonScriptStep(
#     script_name, 
#     name=None, 
#     arguments=None, 
#     compute_target=None, 
#     runconfig=None, 
#     inputs=None, 
#     outputs=None, 
#     params=None, 
#     source_directory=None, 
#     allow_reuse=True, 
#     version=None, 
#     hash_paths=None)
# This returns a Step
step1 = PythonScriptStep(name="1_train_step",
                         script_name="train.py", 
                         compute_target=aml_compute, 
                         source_directory=source_directory,
                         allow_reuse=True)
print("Step1 created")

Source directory for the step is /mnt/azmnt/code/Users/Notebooks/Workshop/scriptspipelines.
Step1 created


**Note:** In the above call to PythonScriptStep(), the flag *allow_reuse* determines whether the step should reuse previous results when run with the same settings/inputs. Its default value is *True* because, when inputs and parameters have not changed, we typically do not want to re-run a given pipeline step. Leaving *allow_reuse* at *True* is therefore handy in situations where you do *not* want to re-run a pipeline step.

If *allow_reuse* is set to *False*, a new run will always be generated for this step during pipeline execution.
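
The reuse decision can be pictured as a cache lookup keyed on the step's settings. The sketch below is a conceptual model only, under the assumption that a step is identified by a hash of its settings; it is not the SDK's implementation, and `fingerprint` and `run_step` are hypothetical names.

```python
import hashlib
import json


def fingerprint(settings):
    """Stable hash of a step's settings (script name, arguments, ...)."""
    payload = json.dumps(settings, sort_keys=True).encode("utf-8")
    return hashlib.sha256(payload).hexdigest()


def run_step(settings, cache, allow_reuse=True):
    """Return (result, reused). `cache` maps fingerprints to earlier results."""
    key = fingerprint(settings)
    if allow_reuse and key in cache:
        return cache[key], True
    result = "output of {}".format(settings["script_name"])  # stand-in for real work
    cache[key] = result
    return result, False


cache = {}
settings = {"script_name": "train.py", "arguments": []}

first = run_step(settings, cache)                      # nothing cached yet
second = run_step(settings, cache)                     # same settings: reused
forced = run_step(settings, cache, allow_reuse=False)  # always a new run
changed = run_step(dict(settings, arguments=["--epochs", "5"]), cache)

print(first[1], second[1], forced[1], changed[1])  # False True False False
```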

## Running a few steps in parallel
Here we look at a simple scenario in which a few steps (all of them PythonScriptStep objects) run in parallel. Running steps in **parallel** is the default behavior for steps in a pipeline.

We already defined one step earlier. Let's define a few more steps.

In [11]:
# For this step, we reuse the same source_directory as step1
source_directory = './scriptspipelines'
print('Source directory for the step is {}.'.format(os.path.realpath(source_directory)))

# All steps use the same Azure Machine Learning compute target
step2 = PythonScriptStep(name="2_compare_step",
                         script_name="compare.py", 
                         compute_target=aml_compute, 
                         source_directory=source_directory)

# Use a RunConfiguration to specify some additional requirements for this step.
from azureml.core.runconfig import RunConfiguration
from azureml.core.conda_dependencies import CondaDependencies
from azureml.core.runconfig import DEFAULT_CPU_IMAGE

# create a new runconfig object
run_config = RunConfiguration()

# enable Docker 
run_config.environment.docker.enabled = True

# set Docker base image to the default CPU-based image
run_config.environment.docker.base_image = DEFAULT_CPU_IMAGE

# let Azure ML build a conda environment in the Docker image (dependencies are not user-managed)
run_config.environment.python.user_managed_dependencies = False

# specify CondaDependencies obj
run_config.environment.python.conda_dependencies = CondaDependencies.create(conda_packages=['scikit-learn'])

# This step also uses the same source_directory
source_directory = './scriptspipelines'
print('Source directory for the step is {}.'.format(os.path.realpath(source_directory)))

step3 = PythonScriptStep(name="3_extract_step",
                         script_name="extract.py", 
                         compute_target=aml_compute, 
                         source_directory=source_directory,
                         runconfig=run_config)

# list of steps to run
steps = [step1, step2, step3]
print("Step lists created")

Source directory for the step is /mnt/azmnt/code/Users/Notebooks/Workshop/scriptspipelines.
Source directory for the step is /mnt/azmnt/code/Users/Notebooks/Workshop/scriptspipelines.
Step lists created


### Build the pipeline
Once we have the steps (or steps collection), we can build the [pipeline](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-core/azureml.pipeline.core.pipeline.pipeline?view=azure-ml-py). By default, all these steps will run in **parallel** once we submit the pipeline for a run.

A pipeline is created with a list of steps and a workspace. Submit a pipeline using [submit](https://docs.microsoft.com/en-us/python/api/azureml-core/azureml.core.experiment(class)?view=azure-ml-py#submit-config--tags-none----kwargs-). When submit is called, a [PipelineRun](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-core/azureml.pipeline.core.pipelinerun?view=azure-ml-py) is created which in turn creates [StepRun](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-core/azureml.pipeline.core.steprun?view=azure-ml-py) objects for each step in the workflow.

In [12]:
# Syntax
# Pipeline(workspace, 
#          steps, 
#          description=None, 
#          default_datastore_name=None, 
#          default_source_directory=None, 
#          resolve_closure=True, 
#          _workflow_provider=None, 
#          _service_endpoint=None)

pipeline1 = Pipeline(workspace=ws, steps=steps)
print ("Pipeline is built")



Pipeline is built
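
One way to picture "parallel by default" is that every step with no unmet dependency can start at the same time. The sketch below groups step names into such waves. It is a simplified model, not the service's scheduler; `execution_waves` is a hypothetical helper, the dependency dictionaries are written by hand for illustration, and every step is assumed to appear as a key.

```python
def execution_waves(run_after):
    """Group steps into waves; every step in a wave can start at the same time.

    `run_after` maps each step name to the steps it must wait for.
    """
    remaining = {step: set(deps) for step, deps in run_after.items()}
    waves = []
    while remaining:
        ready = sorted(step for step, deps in remaining.items() if not deps)
        if not ready:
            raise ValueError("circular dependency between steps")
        waves.append(ready)
        for step in ready:
            del remaining[step]
        for deps in remaining.values():
            deps.difference_update(ready)
    return waves


# No dependencies declared: all three steps land in a single wave.
independent = {"1_train_step": [], "2_compare_step": [], "3_extract_step": []}
print(execution_waves(independent))
# [['1_train_step', '2_compare_step', '3_extract_step']]

# Hypothetical chain for contrast: one step per wave.
chained = {
    "1_train_step": [],
    "2_compare_step": ["1_train_step"],
    "3_extract_step": ["2_compare_step"],
}
print(execution_waves(chained))
# [['1_train_step'], ['2_compare_step'], ['3_extract_step']]
```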


### Validate the pipeline
You have the option to [validate](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-core/azureml.pipeline.core.pipeline.pipeline?view=azure-ml-py#validate--) the pipeline prior to submitting it for a run. The platform runs validation checks, such as checking for circular dependencies and checking parameters, even if you do not explicitly call the validate method.

In [13]:
pipeline1.validate()
print("Pipeline validation complete")

Step 1_train_step is ready to be created [89369821]
Step 2_compare_step is ready to be created [e77f8631]
Step 3_extract_step is ready to be created [a1db9de7]
Pipeline validation complete
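
Checking for circular dependencies amounts to cycle detection in a directed graph. The sketch below does this with a depth-first search over a dictionary of step names. It illustrates the idea only and is not the platform's validation code; `find_cycle` is a hypothetical helper and the dependency declarations are made up for the example.

```python
def find_cycle(run_after):
    """Return a list of step names forming a cycle, or None if there is none.

    `run_after` maps each step name to the steps it must wait for.
    """
    WHITE, GREY, BLACK = 0, 1, 2  # unvisited, on the current path, finished
    colour = {step: WHITE for step in run_after}
    path = []

    def visit(step):
        colour[step] = GREY
        path.append(step)
        for upstream in run_after.get(step, []):
            if colour.get(upstream, WHITE) == GREY:
                # upstream is already on the current path: we walked in a circle
                return path[path.index(upstream):] + [upstream]
            if colour.get(upstream, WHITE) == WHITE:
                cycle = visit(upstream)
                if cycle:
                    return cycle
        path.pop()
        colour[step] = BLACK
        return None

    for step in list(run_after):
        if colour[step] == WHITE:
            cycle = visit(step)
            if cycle:
                return cycle
    return None


chain = {
    "1_train_step": [],
    "2_compare_step": ["1_train_step"],
    "3_extract_step": ["2_compare_step"],
}
print(find_cycle(chain))  # None

# Hypothetical mistake: step 2 also waits for step 3, which waits for step 2.
looped = dict(chain)
looped["2_compare_step"] = ["1_train_step", "3_extract_step"]
print(find_cycle(looped))
# ['2_compare_step', '3_extract_step', '2_compare_step']
```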


### Submit the pipeline
[Submitting](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-core/azureml.pipeline.core.pipeline.pipeline?view=azure-ml-py#submit) the pipeline involves creating an [Experiment](https://docs.microsoft.com/en-us/python/api/azureml-core/azureml.core.experiment?view=azure-ml-py) object and providing the built pipeline for submission. 

In [14]:
# Submit syntax
# submit(experiment_name, 
#        pipeline_parameters=None, 
#        continue_on_step_failure=False, 
#        regenerate_outputs=False)

pipeline_run1 = Experiment(ws, 'Pipeline1').submit(pipeline1, regenerate_outputs=False)
print("Pipeline is submitted for execution")

Created step 1_train_step [89369821][c6fe9dd6-a828-4eb3-864e-8cdc6322ad1e], (This step will run and generate new outputs)
Created step 2_compare_step [e77f8631][c6e1bf42-fcc1-4196-a61e-d175d4da73cb], (This step will run and generate new outputs)
Created step 3_extract_step [a1db9de7][a12b10c1-4cc7-45fd-bde8-acbf6c36d7be], (This step will run and generate new outputs)
Submitted PipelineRun 17451507-dcba-42bb-889c-70cc00b5de5f
Link to Azure Portal: https://mlworkspace.azure.ai/portal/subscriptions/70b8f39e-8863-49f7-b6ba-34a80799550c/resourceGroups/azuremlserviceresourcegroup/providers/Microsoft.MachineLearningServices/workspaces/azuremlservice/experiments/Pipeline1/runs/17451507-dcba-42bb-889c-70cc00b5de5f
Pipeline is submitted for execution


**Note:** If regenerate_outputs is set to True, the new submission always forces generation of all step outputs and disallows data reuse for any step of this run. Once this run is complete, however, subsequent runs may reuse its results.


### Examine the pipeline run

#### Use RunDetails Widget
We are going to use the RunDetails widget to examine the run of the pipeline. You can click each row below to get more details on the step runs.

In [15]:
RunDetails(pipeline_run1).show()

A Jupyter Widget

#### Use Pipeline SDK objects
You can cycle through the step run objects and examine the job logs, stdout, and stderr of each step.

In [23]:
step_runs = pipeline_run1.get_children()
for step_run in step_runs:
    status = step_run.get_status()
    print('Script:', step_run.name, 'status:', status)
    
    # Change this if you want to see details even if the Step has succeeded.
    if status == "Failed":
        joblog = step_run.get_job_log()
        print('job log:', joblog)

Script: 3_extract_step status: Finished
Script: 1_train_step status: Finished
Script: 2_compare_step status: Finished
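
When a pipeline has many steps, a tally by status can be easier to scan than one line per step. The sketch below works on any objects exposing `name` and `get_status()`, as the step runs in the loop above do. `FakeStepRun` is a stand-in so the example runs without a workspace, `summarize` is a hypothetical helper, and the failure is invented for illustration.

```python
from collections import Counter


class FakeStepRun:
    """Stand-in for a step run, used only so this example runs offline."""

    def __init__(self, name, status):
        self.name = name
        self._status = status

    def get_status(self):
        return self._status


def summarize(step_runs):
    """Return (status counts, names of failed steps)."""
    counts = Counter()
    failed = []
    for step_run in step_runs:
        status = step_run.get_status()
        counts[status] += 1
        if status == "Failed":
            failed.append(step_run.name)
    return dict(counts), failed


runs = [
    FakeStepRun("3_extract_step", "Finished"),
    FakeStepRun("1_train_step", "Finished"),
    FakeStepRun("2_compare_step", "Failed"),  # hypothetical failure for illustration
]
print(summarize(runs))  # ({'Finished': 2, 'Failed': 1}, ['2_compare_step'])
```

In the notebook, `summarize(pipeline_run1.get_children())` would be the corresponding call.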


#### Get additional run details
If you wait until the pipeline_run is finished, you may be able to get additional details on the run. **Since this is a blocking call, the following code is commented out.**

In [21]:
#pipeline_run1.wait_for_completion()
#for step_run in pipeline_run1.get_children():
#    print("{}: {}".format(step_run.name, step_run.get_metrics()))

## Running a few steps in sequence
Now let's see how we run a few steps in sequence. We already have three steps defined earlier. Let's *reuse* those steps for this part.

We will reuse step1, step2, and step3, but build the pipeline so that step3 is chained after step2 and step2 after step1. There is no explicit data dependency between these steps, but they can still be made dependent on one another with the [run_after](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-core/azureml.pipeline.core.builder.pipelinestep?view=azure-ml-py#run-after-step-) construct.

In [22]:
step2.run_after(step1)
step3.run_after(step2)

# Try a loop
#step2.run_after(step3)

# Now, construct the pipeline using the steps.

# We can specify the "final step" in the chain, 
# Pipeline will take care of "transitive closure" and 
# figure out the implicit or explicit dependencies
# https://www.geeksforgeeks.org/transitive-closure-of-a-graph/
pipeline2 = Pipeline(workspace=ws, steps=[step3])
print ("Pipeline is built")

pipeline2.validate()
print("Simple validation complete")



Pipeline is built
Step 3_extract_step is ready to be created [0ab53b5f]
Step 2_compare_step is ready to be created [f3350908]
Simple validation complete
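
Passing only the final step works because the remaining steps can be found by following the run_after links backwards. The sketch below shows that idea on plain step names. It is a conceptual model rather than the Pipeline class's actual logic, `collect_steps` is a hypothetical helper, and it assumes the dependencies contain no cycle.

```python
def collect_steps(final_steps, run_after):
    """Return every step reachable from `final_steps`, upstream steps first."""
    ordered = []
    seen = set()

    def visit(step):
        if step in seen:
            return
        seen.add(step)
        for upstream in run_after.get(step, []):
            visit(upstream)
        ordered.append(step)  # appended only after everything it waits for

    for step in final_steps:
        visit(step)
    return ordered


# Dependencies declared with run_after() in the cell above.
run_after = {
    "2_compare_step": ["1_train_step"],
    "3_extract_step": ["2_compare_step"],
}

print(collect_steps(["3_extract_step"], run_after))
# ['1_train_step', '2_compare_step', '3_extract_step']
```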


In [24]:
pipeline_run2 = Experiment(ws, 'Pipeline2').submit(pipeline2)
print("Pipeline is submitted for execution")

Created step 3_extract_step [0ab53b5f][7a6a84a5-293b-4f0d-be0b-7050a3cc1ad6], (This step will run and generate new outputs)
Created step 2_compare_step [f3350908][3e2a78c7-2d39-4bd0-8e4c-092323695c49], (This step will run and generate new outputs)
Created step 1_train_step [42c5d768][c6fe9dd6-a828-4eb3-864e-8cdc6322ad1e], (This step is eligible to reuse a previous run's output)
Submitted PipelineRun 04ab47ea-02fe-48d8-84d4-53be9c05004c
Link to Azure Portal: https://mlworkspace.azure.ai/portal/subscriptions/70b8f39e-8863-49f7-b6ba-34a80799550c/resourceGroups/azuremlserviceresourcegroup/providers/Microsoft.MachineLearningServices/workspaces/azuremlservice/experiments/Pipeline2/runs/04ab47ea-02fe-48d8-84d4-53be9c05004c
Pipeline is submitted for execution


In [25]:
RunDetails(pipeline_run2).show()

A Jupyter Widget

In [28]:
step_runs = pipeline_run2.get_children()
for step_run in step_runs:
    status = step_run.get_status()
    print('Script:', step_run.name, 'status:', status)
    
    # Change this if you want to see details even if the Step has succeeded.
    if status == "Failed":
        joblog = step_run.get_job_log()
        print('job log:', joblog)

Script: 3_extract_step status: Finished
Script: 2_compare_step status: Finished
Script: 1_train_step status: Finished


> Fin