Copyright (c) Microsoft Corporation. All rights reserved.  
Licensed under the MIT License.


# Azure Machine Learning Pipelines: Getting Started

## Overview


A common scenario when using machine learning components is to have a data workflow that includes the following steps:

- Preparing/preprocessing a given dataset for training, followed by
- Training a machine learning model on this data, and then
- Deploying this trained model in a separate environment, and finally
- Running a batch scoring task on another data set, using the trained model.

Azure's Machine Learning pipelines give you a way to combine multiple steps like these into one configurable workflow, so that multiple agents/users can share and/or reuse this workflow. Machine learning pipelines thus provide a consistent, reproducible mechanism for building, evaluating, deploying, and running ML systems.

To get more information about Azure machine learning pipelines, please read our [Azure Machine Learning Pipelines](https://aka.ms/pl-concept) overview, or the [readme article](https://aka.ms/pl-readme).

In this notebook, we provide a gentle introduction to Azure Machine Learning pipelines. We build a pipeline that runs jobs unattended on different compute targets (an Azure Machine Learning Compute cluster and an Azure Databricks cluster), and along the way you'll see how to use the basic Azure ML SDK APIs for constructing it.
 

## Prerequisites and Azure Machine Learning Basics
If you are using an Azure Machine Learning Notebook VM, you are all set. Otherwise, go through the [configuration notebook](https://aka.ms/pl-config) first. It sets you up with a working config file that contains information about your workspace, such as its name, subscription ID, and resource group.
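`Workspace.from_config()` reads this config file. To check which file would be picked up, here is a minimal plain-Python sketch that walks from a starting folder up to the filesystem root and looks for `config.json` or `.azureml/config.json` at each level. `find_workspace_config` is a hypothetical helper, not part of the SDK; it only approximates the SDK's lookup, whose exact search order may differ.

```python
import json
from pathlib import Path

REQUIRED_KEYS = {"subscription_id", "resource_group", "workspace_name"}


def find_workspace_config(start_dir="."):
    """Hypothetical helper: return the first workspace config found walking upward.

    At each level it checks <dir>/config.json, then <dir>/.azureml/config.json.
    Returns the parsed dict, or None if no config file is found.
    """
    current = Path(start_dir).resolve()
    for directory in [current, *current.parents]:
        for candidate in (directory / "config.json", directory / ".azureml" / "config.json"):
            if candidate.is_file():
                with open(candidate) as f:
                    config = json.load(f)
                # Fail early if the file lacks the keys a workspace config carries.
                missing = REQUIRED_KEYS - set(config)
                if missing:
                    raise ValueError(f"{candidate} is missing keys: {sorted(missing)}")
                return config
    return None
```

Calling `find_workspace_config()` from the notebook folder shows which subscription, resource group, and workspace the notebook will connect to before any Azure call is made.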


### Azure Machine Learning Imports

In this first code cell, we import key Azure Machine Learning modules that we will use below. 

In [1]:
import os
import azureml.core
from azureml.core import Workspace, Experiment, Datastore
from azureml.widgets import RunDetails

# Check core SDK version number
print("SDK version:", azureml.core.VERSION)

SDK version: 1.0.72


### Pipeline-specific SDK imports

Here, we import key pipeline modules, whose use will be illustrated in the examples below.

In [2]:
from azureml.pipeline.core import Pipeline
from azureml.pipeline.steps import PythonScriptStep

print("Pipeline SDK-specific imports completed")

Pipeline SDK-specific imports completed


### Initialize Workspace

Initialize a [workspace](https://docs.microsoft.com/en-us/python/api/azureml-core/azureml.core.workspace(class%29) object from persisted configuration.

In [3]:
ws = Workspace.from_config()
print(ws.name, ws.resource_group, ws.location, ws.subscription_id, sep = '\n')

# Default datastore
def_blob_store = ws.get_default_datastore() 
# The following call GETS the Azure Blob Store associated with your workspace.
# Note that workspaceblobstore is **the name of this store and CANNOT BE CHANGED and must be used as is** 
def_blob_store = Datastore(ws, "workspaceblobstore")
print("Blobstore's name: {}".format(def_blob_store.name))

mtcs-dev-aml
mtcs-aml-rg
westus2
256c7222-4083-4ba7-8714-baa0df54bfe6
Blobstore's name: workspaceblobstore


### Required data and script files for the tutorial
The sample files required to finish this tutorial are already copied to the corresponding `source_directory` locations. The .py files provided in the samples do very little actual "ML work"; in your own projects, this is where most of your effort as a data scientist will go. To complete this tutorial, the contents of these files are not important: the one-line files are for demonstration purposes only.

### Datastore concepts
A [Datastore](https://docs.microsoft.com/en-us/python/api/azureml-core/azureml.core.datastore.datastore?view=azure-ml-py) is a place where data can be stored and then made accessible to a compute target, either by mounting the data or by copying it to the compute target.

A Datastore can either be backed by an Azure File Storage (default) or by an Azure Blob Storage.

In the next step, we upload a small sample CSV file to the workspace's Blob storage. When to use [Azure Blobs](https://docs.microsoft.com/en-us/azure/storage/blobs/storage-blobs-introduction), [Azure Files](https://docs.microsoft.com/en-us/azure/storage/files/storage-files-introduction), or [Azure Disks](https://docs.microsoft.com/en-us/azure/virtual-machines/linux/managed-disks-overview) is [detailed here](https://docs.microsoft.com/en-us/azure/storage/common/storage-decide-blobs-files-disks).

**Make sure you understand the datastore concept; the pipeline steps below read and write their data through datastores.**

#### Upload data to default datastore
The workspace has both an Azure File storage and an Azure Blob storage associated with it. Here we upload a single file to the Blob storage (`workspaceblobstore`).

In [4]:
from azureml.data.data_reference import DataReference
from azureml.pipeline.core import PipelineData

In [5]:
%%writefile ./amlpipedata.csv
idx,num,msg
1,1,init

Overwriting ./amlpipedata.csv


In [45]:
# get_default_datastore() gets the default Azure Blob Store associated with your workspace.
# Here we are reusing the def_blob_store object we obtained earlier

filename = "amlpipedata.csv"
file_path = os.path.join(os.getcwd(),filename)
# print(file_path)

def_blob_store.upload_files(["./amlpipedata.csv"], target_path="sampledata", overwrite=True)
# print("Upload call completed")

blob_input_data = DataReference(
    datastore=def_blob_store,
    data_reference_name="test_data",
    path_on_datastore="sampledata/amlpipedata.csv")

# print("DataReference object created")

Uploading an estimated of 1 files
Uploading ./amlpipedata.csv
Uploaded ./amlpipedata.csv, 1 files out of an estimated total of 1
Uploaded 1 files


In [47]:
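# "output" must already be registered as a datastore in this workspace
# (for example, with Datastore.register_azure_blob_container).
out_blob_store = Datastore(ws, "output")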

blob_output_data = DataReference(
    datastore=out_blob_store,
    data_reference_name="interm_data",
    path_on_datastore="interm"
)

interm_pipelineData = PipelineData(
    name="interm_pipelinedata", 
    datastore=out_blob_store, 
    output_name="amlpipedata", 
    output_overwrite=True)

### Compute Targets
A compute target specifies where your program runs, such as a Docker container on a remote VM, or a cluster. A compute target needs to be addressable and accessible by you.

**You need at least one compute target to send your payload to. In this tutorial, we use Azure Machine Learning Compute for the training step and Azure Databricks for the Databricks step: a single pipeline can run some steps on one compute target, such as Azure Machine Learning Compute, and other steps on a different compute target.**

*The example below shows how to retrieve or create an Azure Machine Learning Compute target.*

#### List of Compute Targets on the workspace

In [None]:
# cts = ws.compute_targets
# for ct in cts:
#     print(ct)

#### Retrieve or create an Azure Machine Learning compute
Azure Machine Learning Compute is a service for provisioning and managing clusters of Azure virtual machines for running machine learning workloads. Let's create a new Azure Machine Learning Compute in the current workspace, if it doesn't already exist. We will then run the training script on this compute target.

If no compute target with the given name exists, the cell below creates one: an Azure Machine Learning Compute cluster of **STANDARD_D2_V2 CPU VMs**. This process is broken down into the following steps:

1. Create the configuration
2. Create the Azure Machine Learning compute

**Provisioning takes about 3 minutes and produces only sparse output. Please wait until the call returns before moving to the next cell.**

In [8]:
from azureml.core.compute import ComputeTarget, AmlCompute
from azureml.core.compute_target import ComputeTargetException

aml_compute_target = "aml-cpu-2"
try:
    # Look up an existing compute target by name.
    aml_compute = AmlCompute(ws, aml_compute_target)
    print("found existing compute target.")
except ComputeTargetException:
    # Not found: provision a new cluster of STANDARD_D2_V2 VMs (1 to 4 nodes).
    print("creating new compute target")
    provisioning_config = AmlCompute.provisioning_configuration(vm_size="STANDARD_D2_V2",
                                                                min_nodes=1,
                                                                max_nodes=4)
    aml_compute = ComputeTarget.create(ws, aml_compute_target, provisioning_config)
    aml_compute.wait_for_completion(show_output=True, min_node_count=None, timeout_in_minutes=20)

print("Azure Machine Learning Compute attached")


found existing compute target.
Azure Machine Learning Compute attached
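The cell above follows a get-or-create pattern: look the target up by name and provision it only when the lookup fails, which makes the cell safe to re-run. As a plain-Python sketch of that pattern, a dictionary stands in for the workspace's compute registry and `ComputeNotFound` stands in for `ComputeTargetException`; both are hypothetical and no Azure calls are made.

```python
class ComputeNotFound(Exception):
    """Hypothetical stand-in for ComputeTargetException in this sketch."""


def lookup(registry, name):
    """Return the target registered under `name`, or raise ComputeNotFound."""
    try:
        return registry[name]
    except KeyError:
        raise ComputeNotFound(name) from None


def get_or_create(registry, name, provision):
    """Return the existing target called `name`, or create and register it via `provision()`."""
    try:
        target = lookup(registry, name)
        print("found existing compute target.")
    except ComputeNotFound:
        print("creating new compute target")
        target = provision()
        registry[name] = target
    return target
```

Running `get_or_create` twice with the same name provisions only once; the second call finds the registered target, just as re-running the notebook cell finds `aml-cpu-2`.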


In [9]:
# For a more detailed view of the current Azure Machine Learning Compute status,
# use get_status(); for example, un-comment the following line.
# print(aml_compute.get_status().serialize())
print(aml_compute)

AmlCompute(workspace=Workspace.create(name='mtcs-dev-aml', subscription_id='256c7222-4083-4ba7-8714-baa0df54bfe6', resource_group='mtcs-aml-rg'), name=aml-cpu-2, id=/subscriptions/256c7222-4083-4ba7-8714-baa0df54bfe6/resourceGroups/mtcs-aml-rg/providers/Microsoft.MachineLearningServices/workspaces/mtcs-dev-aml/computes/aml-cpu-2, type=AmlCompute, provisioning_state=Succeeded, location=westus2, tags=None)


**Wait for this call to finish before proceeding (you will see the asterisk turning to a number).**

Now that the compute target exists, you can un-comment the earlier cell that lists the workspace's compute targets (`ws.compute_targets`, a dictionary keyed by target name). You should see an entry named 'aml-cpu-2' of type AmlCompute.

**Now that we have completed learning the basics of Azure Machine Learning (AML), let's go ahead and start understanding the Pipeline concepts.**

## Creating a Step in a Pipeline
A step is a unit of execution. It typically needs a compute target to run on and a script to execute; it may also take script arguments and inputs, produce outputs, and accept a number of other parameters. Azure Machine Learning Pipelines provides the following built-in steps:

- [**PythonScriptStep**](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-steps/azureml.pipeline.steps.python_script_step.pythonscriptstep?view=azure-ml-py): Adds a step to run a Python script in a Pipeline.
- [**AdlaStep**](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-steps/azureml.pipeline.steps.adla_step.adlastep?view=azure-ml-py): Adds a step to run a U-SQL script using Azure Data Lake Analytics.
- [**DataTransferStep**](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-steps/azureml.pipeline.steps.data_transfer_step.datatransferstep?view=azure-ml-py): Transfers data between Azure Blob and Data Lake accounts.
- [**DatabricksStep**](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-steps/azureml.pipeline.steps.databricks_step.databricksstep?view=azure-ml-py): Adds a Databricks notebook as a step in a Pipeline.
- [**HyperDriveStep**](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-steps/azureml.pipeline.steps.hyper_drive_step.hyperdrivestep?view=azure-ml-py): Creates a HyperDrive step for hyperparameter tuning in a Pipeline.
- [**AzureBatchStep**](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-steps/azureml.pipeline.steps.azurebatch_step.azurebatchstep?view=azure-ml-py): Creates a step for submitting jobs to Azure Batch.
- [**EstimatorStep**](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-steps/azureml.pipeline.steps.estimator_step.estimatorstep?view=azure-ml-py): Adds a step to run an Estimator in a Pipeline.
- [**MpiStep**](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-steps/azureml.pipeline.steps.mpi_step.mpistep?view=azure-ml-py): Adds a step to run an MPI job in a Pipeline.
- [**AutoMLStep**](https://docs.microsoft.com/en-us/python/api/azureml-train-automl/azureml.train.automl.automlstep?view=azure-ml-py): Creates an AutoML step in a Pipeline.

The following code will create a PythonScriptStep to be executed in the Azure Machine Learning Compute we created above using train.py, one of the files already made available in the `source_directory`.

A **PythonScriptStep** is a basic, built-in step to run a Python script on a compute target. It takes a script name and, optionally, other parameters such as arguments for the script, a compute target, inputs, and outputs. If no compute target is specified, the workspace's default compute target is used. You can also use a [**RunConfiguration**](https://docs.microsoft.com/en-us/python/api/azureml-core/azureml.core.runconfiguration?view=azure-ml-py) to specify requirements for the PythonScriptStep, such as conda dependencies and a Docker image.
> The best practice is to use a separate folder for each step's script and its dependent files, and to specify that folder as the step's `source_directory`. This keeps the snapshot created for the step small, because only that folder is snapshotted. Since a change to any file in the `source_directory` triggers a re-upload of the snapshot, giving each step its own folder also preserves reuse for steps whose folder has not changed.
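To see why a per-step folder helps, here is a sketch that fingerprints a folder by hashing every file's relative path and contents. Any edit inside the folder changes its fingerprint, while edits to files in other folders do not. `directory_fingerprint` is a hypothetical helper that only illustrates the idea; the SDK's actual snapshot and change-detection logic is not shown here and may differ.

```python
import hashlib
import os


def directory_fingerprint(source_directory):
    """Return a SHA-256 hex digest over all file paths and contents under a folder."""
    digest = hashlib.sha256()
    for root, dirs, files in os.walk(source_directory):
        dirs.sort()  # visit subdirectories in a stable order
        for filename in sorted(files):
            path = os.path.join(root, filename)
            # Use a portable relative path so the digest does not depend on the OS.
            rel = os.path.relpath(path, source_directory).replace(os.sep, "/")
            digest.update(rel.encode("utf-8"))
            digest.update(b"\0")
            with open(path, "rb") as f:
                digest.update(f.read())
            digest.update(b"\0")
    return digest.hexdigest()
```

With `./train` and, say, a separate `./score` folder, editing a file under `./score` leaves `directory_fingerprint("./train")` unchanged, so the train step's snapshot would not need to change.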

In [10]:
# Train step: runs train.py from ./train on the AmlCompute target,
# reading the uploaded CSV (blob_input_data) and writing to interm_pipelineData.

source_directory = os.path.join(os.getcwd(),"train")

PythonStep = PythonScriptStep(name="train_step",
                         script_name="train.py", 
                         compute_target=aml_compute, 
                         arguments=["--input_data", blob_input_data, "--output_train", interm_pipelineData],
                         inputs=[blob_input_data],
                         outputs=[interm_pipelineData],
                         source_directory=source_directory,
                         allow_reuse=True)
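The contents of `train.py` are not shown in this notebook. As a hypothetical sketch only, a script compatible with the `arguments` list above could parse `--input_data` and `--output_train` with `argparse`, read the CSV, and write its result into the output folder. At run time the DataReference and PipelineData arguments are resolved to paths on the compute target; the sketch assumes the input is a path to the CSV file and the output is a folder that may not exist yet. The appended row is a placeholder for real training work.

```python
import argparse
import csv
import os


def main(argv=None):
    parser = argparse.ArgumentParser(description="Hypothetical train.py for train_step")
    parser.add_argument("--input_data", required=True, help="path to the input CSV")
    parser.add_argument("--output_train", required=True, help="folder for this step's output")
    args = parser.parse_args(argv)

    with open(args.input_data, newline="") as f:
        rows = list(csv.DictReader(f))

    # Placeholder for real training: append one row so downstream steps see a change.
    next_idx = len(rows) + 1
    rows.append({"idx": str(next_idx), "num": str(next_idx), "msg": "train_step"})

    # The output folder may not exist yet, so create it before writing.
    os.makedirs(args.output_train, exist_ok=True)
    out_path = os.path.join(args.output_train, "amlpipedata.csv")
    with open(out_path, "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=["idx", "num", "msg"])
        writer.writeheader()
        writer.writerows(rows)
    return out_path
```

In an actual `train.py`, you would call `main()` under an `if __name__ == "__main__":` guard so that the step's command-line arguments are parsed when the script runs.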

**Note:** In the above call to PythonScriptStep(), the flag *allow_reuse* determines whether the step should reuse previous results when run with the same settings/inputs. Its default value is *True*, because when inputs and parameters have not changed, we typically do not want to re-run a given pipeline step.

If *allow_reuse* is set to *False*, a new run is always generated for this step during pipeline execution. Setting it to *False* is useful when you want a step to run every time, regardless of whether its inputs have changed.
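Conceptually, reuse behaves like memoization: if a step's script snapshot, arguments, and inputs are unchanged, the previous output is returned instead of running the step again. The sketch below models only that simplified rule; `StepCache` and its key fields are hypothetical, and the service's actual reuse criteria may take more into account than shown here.

```python
import hashlib
import json


class StepCache:
    """Toy model of step reuse, keyed on what defines a step run."""

    def __init__(self):
        self._results = {}
        self.executions = 0

    @staticmethod
    def _key(script_fingerprint, arguments, inputs):
        payload = json.dumps(
            {"script": script_fingerprint, "arguments": arguments, "inputs": inputs},
            sort_keys=True,
        )
        return hashlib.sha256(payload.encode("utf-8")).hexdigest()

    def run(self, step_fn, script_fingerprint, arguments, inputs, allow_reuse=True):
        key = self._key(script_fingerprint, arguments, inputs)
        if allow_reuse and key in self._results:
            return self._results[key]  # reuse: the step is not executed again
        self.executions += 1
        result = step_fn(arguments, inputs)
        self._results[key] = result
        return result
```

With `allow_reuse=True`, a second call with identical arguments and inputs returns the cached result; with `allow_reuse=False`, or after the script fingerprint changes, the step function runs again.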

## Configure input and output references for the ADB (Azure Databricks) step

In [27]:
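# Reference the "interm" folder on the output datastore through a DataPath,
# wrapped in a PipelineParameter so the path can be overridden at submission time.
from azureml.pipeline.core import PipelineParameter
from azureml.data.datapath import DataPath, DataPathComputeBinding

datapath = DataPath(datastore=out_blob_store, path_on_datastore='interm')
datapath_param = PipelineParameter(name="intrem_data", default_value=datapath)
# Note: this rebinds interm_pipelineData (previously the train step's PipelineData output)
# to a (parameter, mount binding) tuple that is passed to the Databricks step below.
interm_pipelineData = (datapath_param, DataPathComputeBinding(mode='mount'))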
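A PipelineParameter pairs a name with a default value; when the pipeline is submitted, the default is used unless an override is supplied (typically through a `pipeline_parameters` dictionary at submission). The sketch below models only that resolution rule in plain Python. `Param` and `resolve_parameters` are hypothetical, and rejecting unknown names is a design choice of this sketch that catches misspellings such as `interm_data` versus `intrem_data`.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Param:
    """Hypothetical stand-in for a pipeline parameter: a name plus a default value."""
    name: str
    default_value: object


def resolve_parameters(params, overrides=None):
    """Return {name: value}, using an override where given and the default otherwise."""
    overrides = dict(overrides or {})
    known = {p.name for p in params}
    unknown = set(overrides) - known
    if unknown:
        raise KeyError(f"unknown pipeline parameter(s): {sorted(unknown)}")
    return {p.name: overrides.get(p.name, p.default_value) for p in params}
```

For example, `resolve_parameters([Param("intrem_data", "output:interm")])` keeps the default, while passing `{"intrem_data": "output:interm_v2"}` points the same pipeline at a different folder without rebuilding it.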

In [44]:
datapath.create_data_reference

<bound method DataPath.create_data_reference of <azureml.data.datapath.DataPath object at 0x7f88f830ecc0>>

In [12]:
# from azureml.pipeline.core import PipelineParameter

# pipeline_param = PipelineParameter(name="my_pipeline_param", default_value="pipeline_param1")

# Output for the ADB step (not currently wired in; see the commented-out outputs in the DatabricksStep cell)
# processed_data1 = PipelineData("processed_data1",datastore=def_blob_store)
adb_output = PipelineData("adb_output", datastore=out_blob_store)

In [None]:
# interm_pipelineData.as_input("interm_input_for_ADB")
# interm_pipelineData.as_mount(input_name="amlpipe_adb_step")
# interm_pipelineData.as_download()

In [None]:
# from azureml.data import dbfs_datastore
# Datastore.register_dbfs(workspace=ws, datastore_name="aml_adb_dbfs")

## Databricks as Compute Target 

For background, see the [how-to guide on creating your first pipeline](https://docs.microsoft.com/en-us/azure/machine-learning/service/how-to-create-your-first-pipeline).


In [13]:
from azureml.core.compute import DatabricksCompute

# Replace with your account info before running, or set the corresponding environment variables.

db_compute_name=os.getenv("DATABRICKS_COMPUTE_NAME", "adb-dataprep") # Databricks compute name
db_resource_group=os.getenv("DATABRICKS_RESOURCE_GROUP", "mtcs-dev-databrick") # Databricks resource group
db_workspace_name=os.getenv("DATABRICKS_WORKSPACE_NAME", "mtcs-dev") # Databricks workspace name
# Never hard-code the access token in a notebook; supply it through the environment.
db_access_token=os.getenv("DATABRICKS_ACCESS_TOKEN", "<your-databricks-access-token>") # Databricks access token

try:
    databricks_compute = DatabricksCompute(workspace=ws, name=db_compute_name)
    print('Compute target {} already exists'.format(db_compute_name))
except ComputeTargetException:
    print('Compute not found, will use below parameters to attach new one')
    print('db_compute_name {}'.format(db_compute_name))
    print('db_resource_group {}'.format(db_resource_group))
    print('db_workspace_name {}'.format(db_workspace_name))
    # The access token is a secret, so it is deliberately not printed.
 
    config = DatabricksCompute.attach_configuration(
        resource_group = db_resource_group,
        workspace_name = db_workspace_name,
        access_token= db_access_token)
    databricks_compute=ComputeTarget.attach(ws, db_compute_name, config)
    databricks_compute.wait_for_completion(True)

databricks_compute

Compute target adb-dataprep already exists


DatabricksCompute(workspace=Workspace.create(name='mtcs-dev-aml', subscription_id='256c7222-4083-4ba7-8714-baa0df54bfe6', resource_group='mtcs-aml-rg'), name=adb-dataprep, id=/subscriptions/256c7222-4083-4ba7-8714-baa0df54bfe6/resourceGroups/mtcs-aml-rg/providers/Microsoft.MachineLearningServices/workspaces/mtcs-dev-aml/computes/adb-dataprep, type=Databricks, provisioning_state=Succeeded, location=westus2, tags=None)

## Databricks Step

In [28]:
from azureml.core.databricks import PyPiLibrary
from azureml.pipeline.steps import DatabricksStep

notebook_path=os.getenv("DATABRICKS_NOTEBOOK_PATH", "/aml-pipelines/aml-pipeline-adb-step") # Databricks notebook path
amlLib = PyPiLibrary(package="azureml-sdk[automl_databricks]")
# amlLib = PyPiLibrary(package="azureml-sdk[databricks]")

ADBStep = DatabricksStep(
    name="AML_Pipeline_ADB_Step",
#     inputs=[interm_pipelineData],
#     outputs=[adb_output],
    num_workers=1,
    notebook_path=notebook_path,
    notebook_params={'interm_pipelineData': interm_pipelineData},
    run_name='AML_Pipeline_Run',
    compute_target=databricks_compute,
    existing_cluster_id = "0727-170559-robin360",  # replace with the ID of your interactive cluster
#     spark_version= "5.5.x-cpu-ml-scala2.11",
    pypi_libraries=[amlLib],
    allow_reuse=False
)
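According to the DatabricksStep documentation, values passed through `notebook_params` are exposed to the Databricks notebook as widgets and can be read with `dbutils.widgets.get(...)`. The sketch below shows what the top of the notebook at `notebook_path` might look like; the notebook's real contents are not shown here. `get_notebook_param` is a hypothetical helper, and its fallback for running outside Databricks (where `dbutils` does not exist) is an assumption added so the snippet can be tried locally.

```python
def get_notebook_param(name, widgets=None, default=None):
    """Hypothetical helper: read a notebook parameter, falling back to `default`.

    `widgets` is normally `dbutils.widgets`, which only exists inside Databricks.
    """
    if widgets is None:
        try:
            widgets = dbutils.widgets  # noqa: F821 - provided by the Databricks runtime
        except NameError:
            return default  # not running inside Databricks
    try:
        return widgets.get(name)
    except Exception:
        # Treat a widget that was not defined for this run as "no value".
        return default


# The key matches notebook_params={'interm_pipelineData': ...} in the DatabricksStep.
interm_path = get_notebook_param("interm_pipelineData")
print("interm_pipelineData =", interm_path)
```

Inside Databricks this prints whatever value the pipeline passed for `interm_pipelineData`; run locally, it prints `None`.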

### Build the pipeline
Once we have the steps (or a steps collection), we can build the [pipeline](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-core/azureml.pipeline.core.pipeline.pipeline?view=azure-ml-py). By default, steps run in **parallel** once the pipeline is submitted, unless a data dependency (one step consuming another step's output) or an explicit `run_after` call forces an order.
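The rule "parallel unless something forces an order" can be modelled as a topological sort: a step waits only for the steps that produce its inputs or that it was told to `run_after`, and all steps that are ready at the same time form one parallel wave. Below is a sketch using the standard-library `graphlib` (Python 3.9+). The step names mirror this notebook, but the `produces`/`consumes` fields and the `execution_waves` helper are hypothetical, not the service's scheduler.

```python
from graphlib import TopologicalSorter


def execution_waves(steps, run_after=None):
    """Group steps into waves that may run in parallel.

    steps: {step_name: {"produces": set_of_data_names, "consumes": set_of_data_names}}
    run_after: {step_name: set_of_step_names} for explicit ordering.
    """
    producers = {}
    for name, spec in steps.items():
        for data in spec.get("produces", ()):
            producers[data] = name

    # graph maps each step to the set of steps it must wait for.
    graph = {name: set() for name in steps}
    for name, spec in steps.items():
        for data in spec.get("consumes", ()):
            if data in producers:  # consuming another step's output creates a dependency
                graph[name].add(producers[data])
    for name, predecessors in (run_after or {}).items():
        graph.setdefault(name, set()).update(predecessors)

    sorter = TopologicalSorter(graph)
    sorter.prepare()
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        waves.append(ready)
        sorter.done(*ready)
    return waves
```

Without `run_after`, the train step and the Databricks step land in the same wave; adding `run_after={"AML_Pipeline_ADB_Step": {"train_step"}}` moves the Databricks step into a second wave, which is what `ADBStep.run_after(PythonStep)` does in the cell below.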

A pipeline is created with a list of steps and a workspace. Submit a pipeline using [submit](https://docs.microsoft.com/en-us/python/api/azureml-core/azureml.core.experiment(class)?view=azure-ml-py#submit-config--tags-none----kwargs-). When submit is called, a [PipelineRun](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-core/azureml.pipeline.core.pipelinerun?view=azure-ml-py) is created which in turn creates [StepRun](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-core/azureml.pipeline.core.steprun?view=azure-ml-py) objects for each step in the workflow.

In [29]:
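# Make the Databricks step wait for the train step. train_step is pulled into the
# pipeline through this dependency, even though only ADBStep is listed in steps.
ADBStep.run_after(PythonStep)

pipeline = Pipeline(workspace=ws, steps=[ADBStep])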



### Submit the pipeline
[Submitting](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-core/azureml.pipeline.core.pipeline.pipeline?view=azure-ml-py#submit) the pipeline involves creating an [Experiment](https://docs.microsoft.com/en-us/python/api/azureml-core/azureml.core.experiment?view=azure-ml-py) object and providing the built pipeline for submission. 

In [30]:
pipeline_run = Experiment(ws, 'Hello_World2').submit(pipeline)
print("Pipeline is submitted for execution")

Created step AML_Pipeline_ADB_Step [35abaa6d][11578713-702e-4b8b-9c73-025c12b92070], (This step will run and generate new outputs)
Created step train_step [62f04315][4cbe5771-707c-4d2c-937e-5890f9b72191], (This step is eligible to reuse a previous run's output)

Created data reference output_61c636ef for StepId [dd440a74][f3a67e8a-331a-49d9-bebd-f37973c890b8], (Consumers of this data will generate new runs.)
Using data reference test_data for StepId [0b04a04e][53e84b3f-2fc3-4636-9e3f-6fbdad53b56d], (Consumers of this data are eligible to reuse prior runs.)
Submitted PipelineRun 2275167d-1ae7-4dd4-afbc-2dad13ac404f
Link to Azure Machine Learning studio: https://ml.azure.com/experiments/Hello_World2/runs/2275167d-1ae7-4dd4-afbc-2dad13ac404f?wsid=/subscriptions/256c7222-4083-4ba7-8714-baa0df54bfe6/resourcegroups/mtcs-aml-rg/workspaces/mtcs-dev-aml
Pipeline is submitted for execution


### Examine the pipeline run

#### Use RunDetails Widget
We are going to use the RunDetails widget to examine the run of the pipeline. You can click each row below to get more details on the step runs.

In [31]:
RunDetails(pipeline_run).show()


In [18]:
# Get Steps
for step in pipeline_run.get_steps():
    print("Outputs of step " + step.name)
    
    # Get a dictionary of StepRunOutputs with the output name as the key 
    output_dict = step.get_outputs()
    
    for name, output in output_dict.items():
        
        output_reference = output.get_port_data_reference() # Get output port data reference
        print("\tname: " + name)
        print("\tdatastore: " + output_reference.datastore_name)
        print("\tpath on datastore: " + output_reference.path_on_datastore)

In [None]:
print("End of Notebook")