Copyright (c) Microsoft Corporation. All rights reserved.  
Licensed under the MIT License.

# Azure Machine Learning Pipelines: LearnAI Demo

This notebook shows the basic construction of a **pipeline** and some of its key features. 

Be sure to read the [Azure Machine Learning Pipelines](https://docs.microsoft.com/en-us/azure/machine-learning/service/concept-ml-pipelines) overview, and try following along with the article [creating your first pipeline](https://docs.microsoft.com/en-us/azure/machine-learning/service/how-to-create-your-first-pipeline).

In this demo, we create an AML pipeline with three steps: a data preparation step, a training step, and a compare step that compares the newly built model with the model currently in production. **All scripts are for demonstration purposes only; replace them with your own code when you reuse this sample.**

For a more real-life sample using AML Pipelines, please refer to the official git repo at https://aka.ms/aml-pipeline-notebooks, especially https://aka.ms/pl-batch-score and https://aka.ms/pl-style-trans.

In [None]:
import azureml.core
# Check core SDK version number
print("SDK version:", azureml.core.VERSION)

## Setting up AML
### Initialize Workspace

Initialize a [workspace](https://docs.microsoft.com/en-us/python/api/azureml-core/azureml.core.workspace(class%29) object from persisted configuration.

In [None]:
from azureml.core import Workspace

ws = Workspace.from_config()
print("Workspace {} is loaded".format(ws.name))

### Upload data to the datastore
We upload the training data and the current production model to the datastore attached to the workspace. The pipeline steps use these files later.

In [None]:
from azureml.core import Datastore

# Get the default blob storage
def_blob_store = Datastore(ws, "workspaceblobstore")
print("Blobstore's name: {}".format(def_blob_store.name))

# target_path is the directory at the destination
# Uploading training file
def_blob_store.upload_files(['./data/traindata.csv'],
                            target_path = 'data',
                            overwrite = True, 
                            show_progress = True)

# Uploading model in production
def_blob_store.upload_files(['./model/model.pkl'],
                            target_path = 'model',
                            overwrite = True, 
                            show_progress = True)

### See your files using Azure Portal
Once the files are uploaded, you can browse to them (or upload more files) using the [Azure Portal](https://portal.azure.com). In the portal, make sure you have selected your subscription (click *Resource Groups* and then select the subscription), then look for your **Machine Learning Workspace**. The workspace has a link to its storage account; click it to open a page where you can see [Blobs](https://docs.microsoft.com/en-us/azure/storage/blobs/storage-blobs-introduction), [Files](https://docs.microsoft.com/en-us/azure/storage/files/storage-files-introduction), [Tables](https://docs.microsoft.com/en-us/azure/storage/tables/table-storage-overview), and [Queues](https://docs.microsoft.com/en-us/azure/storage/queues/storage-queues-introduction). The two files uploaded above are in Blob storage, under the `data` and `model` paths of the container that backs the `workspaceblobstore` datastore. 

### Retrieve or create an Azure Machine Learning compute
Azure Machine Learning Compute is a service for provisioning and managing clusters of Azure virtual machines for running machine learning workloads. Let's create Azure Machine Learning Compute clusters in the current workspace, if they don't already exist. The pipeline steps will then run on these compute targets.

The cell below first looks up each compute target by name and creates it only if it is not found. It uses two clusters: `cpucluster`, made of **STANDARD_D2_V2 CPU VMs**, and `gpucluster`, made of **STANDARD_NC6 GPU VMs**. Creating a cluster is broken down into the following steps:

1. Create the configuration
2. Create the Azure Machine Learning compute

**Creating the clusters takes about 3 minutes and produces only sparse output along the way. Wait until the call returns before moving to the next cell.**

In [None]:
from azureml.core.compute import AmlCompute
from azureml.core.compute_target import ComputeTargetException

# CPU Cluster
cpu_compute_target = "cpucluster"

try:
    cpu_compute = AmlCompute(ws, cpu_compute_target)
    print("Found existing compute target: {}".format(cpu_compute.name))
except ComputeTargetException:
    # The cluster does not exist yet, so provision it
    provisioning_config = AmlCompute.provisioning_configuration(vm_size="STANDARD_D2_V2",
                                                                min_nodes=1,
                                                                max_nodes=4)
    cpu_compute = AmlCompute.create(ws, cpu_compute_target, provisioning_config)
    cpu_compute.wait_for_completion(show_output=True, min_node_count=None, timeout_in_minutes=20)
    print("Created new compute target: {}".format(cpu_compute.name))

# GPU Cluster
gpu_compute_target = "gpucluster"

try:
    gpu_compute = AmlCompute(ws, gpu_compute_target)
    print("Found existing compute target: {}".format(gpu_compute.name))
except ComputeTargetException:
    # The cluster does not exist yet, so provision it
    provisioning_config = AmlCompute.provisioning_configuration(vm_size="STANDARD_NC6",
                                                                min_nodes=1,
                                                                max_nodes=4)
    gpu_compute = AmlCompute.create(ws, gpu_compute_target, provisioning_config)
    gpu_compute.wait_for_completion(show_output=True, min_node_count=None, timeout_in_minutes=20)
    print("Created new compute target: {}".format(gpu_compute.name))

## Create a Pipeline
A step is a unit of execution. A step typically needs a target of execution (a compute target) and a script to execute; it may also take script arguments and inputs, produce outputs, and accept a number of other parameters. Azure Machine Learning Pipelines currently has these built-in steps: [PythonScriptStep](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-steps/azureml.pipeline.steps.python_script_step.pythonscriptstep?view=azure-ml-py), [EstimatorStep](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-steps/azureml.pipeline.steps.estimator_step.estimatorstep?view=azure-ml-py), [MpiStep](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-steps/azureml.pipeline.steps.mpi_step.mpistep?view=azure-ml-py), [AdlaStep](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-steps/azureml.pipeline.steps.adla_step.adlastep?view=azure-ml-py), [DataTransferStep](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-steps/azureml.pipeline.steps.data_transfer_step.datatransferstep?view=azure-ml-py), [DatabricksStep](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-steps/azureml.pipeline.steps.databricks_step.databricksstep?view=azure-ml-py), and [HyperDriveStep](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-steps/azureml.pipeline.steps.hyper_drive_step.hyperdrivestep?view=azure-ml-py).

### Define a Pipeline Step with inputs and outputs 
#### Modeling input data
A step in the pipeline can take data as input. This data can be a data source that lives in one of the accessible data locations, or intermediate data produced by a previous step in the pipeline. Existing data is typically called a data source and is represented by a [DataReference](https://docs.microsoft.com/en-us/python/api/azureml-core/azureml.data.data_reference.datareference?view=azure-ml-py) object, which can point to either a file or a directory.

In [None]:
from azureml.data.data_reference import DataReference

file_input_data = DataReference(
    datastore=def_blob_store,
    data_reference_name="raw_train_data",
    path_on_datastore="data/traindata.csv")
print("DataReference object created")

#### Modeling output or intermediate data
Intermediate data (the output of a step) is represented by a [PipelineData](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-core/azureml.pipeline.core.pipelinedata?view=azure-ml-py) object. To pass data from one step to another, list the same PipelineData object as an output of the producing step and as an input of one or more consuming steps.
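Conceptually, these input and output declarations are what connect the steps: a step that consumes a PipelineData object depends on the step that produces it. The following is a minimal pure-Python sketch of that idea, assuming plain strings stand in for the DataReference and PipelineData objects used in this notebook. `StepSpec` and `infer_dependencies` are hypothetical names for illustration, not part of the Azure ML SDK.

```python
from dataclasses import dataclass, field


@dataclass
class StepSpec:
    # Hypothetical stand-in for a pipeline step: a name plus the
    # names of the data objects it reads and writes.
    name: str
    inputs: list = field(default_factory=list)
    outputs: list = field(default_factory=list)


def infer_dependencies(steps):
    """Return {step_name: set of upstream step names}."""
    # Map each output name to the step that produces it.
    producer = {}
    for step in steps:
        for out in step.outputs:
            producer[out] = step.name
    deps = {}
    for step in steps:
        # Inputs with no producing step (e.g. files already on a datastore)
        # add no edge to the graph.
        deps[step.name] = {producer[i] for i in step.inputs if i in producer}
    return deps


steps = [
    StepSpec("dataprep_step", ["raw_train_data"], ["processed_train_data"]),
    StepSpec("train_step", ["processed_train_data"], ["output_model_location"]),
    StepSpec("compare_step", ["output_model_location", "prod_model_location"], ["compare_result"]),
]
print(infer_dependencies(steps))
```

Note that `prod_model_location` contributes no dependency: like `raw_train_data`, it already exists on the datastore rather than being produced by another step.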

In [None]:
from azureml.pipeline.core import PipelineData

processed_train_data = PipelineData("processed_train_data", datastore=def_blob_store)
print("PipelineData object created")

#### Define the Dataprep step, which consumes the training data (a data source) and produces intermediate data
Here we use a **PythonScriptStep**, a basic built-in step that runs a Python script on a compute target. The following code creates a PythonScriptStep that runs `dataprep.py` (in the `scripts/dataprep` folder) on the CPU cluster created above.

If you open `dataprep.py` on your local machine and examine the script's arguments, inputs, and outputs, you will see why the argument names used below matter: they must match the names the script parses. 
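The real `dataprep.py` is not reproduced in this notebook, so here is a hypothetical sketch of how a script like it might read the two arguments. At run time the DataReference and PipelineData objects in `arguments` arrive as plain path strings, and the output location is a directory that the script should create if needed. The cleaning rule (dropping rows with an empty field) and the `processed.csv` file name are assumptions for illustration only.

```python
import argparse
import csv
import os


def parse_args(argv=None):
    parser = argparse.ArgumentParser(description="dataprep step (hypothetical sketch)")
    # These names must match the "--input_data" / "--processed_data" strings
    # passed in the PythonScriptStep arguments list.
    parser.add_argument("--input_data", required=True, help="path to traindata.csv")
    parser.add_argument("--processed_data", required=True, help="output directory")
    return parser.parse_args(argv)


def main(argv=None):
    args = parse_args(argv)
    # The output location may not exist yet, so create it.
    os.makedirs(args.processed_data, exist_ok=True)
    out_path = os.path.join(args.processed_data, "processed.csv")  # assumed file name
    with open(args.input_data, newline="") as src, open(out_path, "w", newline="") as dst:
        writer = csv.writer(dst)
        for row in csv.reader(src):
            # Placeholder cleaning rule: keep only rows with no empty fields.
            if all(cell.strip() for cell in row):
                writer.writerow(row)
    return out_path

# In a real script, call main() under an `if __name__ == "__main__":` guard.
```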

In [None]:
from azureml.pipeline.steps import PythonScriptStep

project_folder = "scripts/dataprep"

dataprepStep = PythonScriptStep(name="dataprep_step",
                         script_name="dataprep.py",
                         arguments=["--input_data", file_input_data, "--processed_data", processed_train_data],
                         inputs=[file_input_data],
                         outputs=[processed_train_data],
                         compute_target=cpu_compute,
                         source_directory=project_folder,
                         allow_reuse=False)

print("dataprepStep step created")

**Note:** In the call to PythonScriptStep() above, the *allow_reuse* flag determines whether the step may reuse the results of a previous run with the same settings and inputs. The default is *True*, because when inputs and parameters have not changed we typically do not want to re-run a given pipeline step. 

If *allow_reuse* is set to *False*, a new run is always generated for this step during pipeline execution. Keep the default when an unchanged step can safely be skipped; set it to *False*, as this notebook does, when the step should execute on every pipeline run.
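To make the reuse behavior concrete, here is a toy model in plain Python. It assumes a step's result can be cached under a fingerprint of the step name, its script contents, its inputs, and its parameters. `run_step` and the in-memory cache are hypothetical; how Azure Machine Learning actually fingerprints and stores reusable results is internal to the service.

```python
import hashlib
import json

_results_cache = {}  # fingerprint -> previously produced result
executions = []      # names of steps whose script actually ran


def run_step(name, script_source, inputs, params, allow_reuse=True):
    # Fingerprint everything that should invalidate a cached result.
    key = hashlib.sha256(
        json.dumps([name, script_source, inputs, params], sort_keys=True).encode()
    ).hexdigest()
    if allow_reuse and key in _results_cache:
        return _results_cache[key]  # reuse: the script does not execute
    executions.append(name)
    result = f"{name}-output-{len(executions)}"
    _results_cache[key] = result
    return result
```

Calling `run_step` twice with identical arguments and `allow_reuse=True` executes the script once and returns the cached result the second time; passing `allow_reuse=False` forces a new execution, and changing `script_source`, `inputs`, or `params` produces a new fingerprint.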

#### Define the Train step, which consumes the intermediate data produced by the Dataprep step and produces intermediate data
In this step, we define a step that consumes intermediate data (the prepared training data) and produces intermediate data (the trained model). It runs on the GPU cluster.

Make sure you open `train.py` on your local machine and examine the script's arguments, inputs, and outputs. 
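As with the Dataprep step, the real `train.py` is not shown here. A hypothetical sketch of its input/output handling: it reads from the directory passed as `--input_data` and writes the model into the directory passed as `--output_train`. The `processed.csv` file name, the numeric `y` column, and the placeholder "model" (just the mean of `y`) are assumptions for illustration.

```python
import argparse
import csv
import os
import pickle


def main(argv=None):
    parser = argparse.ArgumentParser(description="train step (hypothetical sketch)")
    parser.add_argument("--input_data", required=True, help="directory produced by dataprep")
    parser.add_argument("--output_train", required=True, help="directory for the trained model")
    args = parser.parse_args(argv)

    # Assumes dataprep wrote processed.csv with a header row and a numeric "y" column.
    with open(os.path.join(args.input_data, "processed.csv"), newline="") as f:
        ys = [float(row["y"]) for row in csv.DictReader(f)]
    model = {"mean_y": sum(ys) / len(ys)}  # placeholder for real training

    # Create the output directory before writing the model into it.
    os.makedirs(args.output_train, exist_ok=True)
    model_path = os.path.join(args.output_train, "model.pkl")
    with open(model_path, "wb") as f:
        pickle.dump(model, f)
    return model_path
```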

In [None]:
project_folder = "scripts/train"

output_model_location = PipelineData("output_model_location", datastore=def_blob_store)

trainStep = PythonScriptStep(name="train_step",
                         script_name="train.py",
                         arguments=["--input_data", processed_train_data, "--output_train", output_model_location],
                         inputs=[processed_train_data],
                         outputs=[output_model_location],
                         compute_target=gpu_compute,
                         source_directory=project_folder,
                         allow_reuse=False)

print("trainStep step created")

#### Define the Compare step, which consumes a data source and intermediate data and produces intermediate data
In this step, we define a step that consumes a data source (the production model) and intermediate data (the newly trained model), and produces intermediate data (the comparison result).

In [None]:
# Location of the model to be compared
prod_model_location = DataReference(
    datastore=def_blob_store,
    data_reference_name="prod_model_location",
    path_on_datastore="models")
print("DataReference object created")

We pass in the version of the existing model to compare against as a [PipelineParameter](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-core/azureml.pipeline.core.graph.pipelineparameter?view=azure-ml-py). Because it is a pipeline parameter, a different value can be supplied when the published pipeline is invoked through its REST endpoint later.

In [None]:
from azureml.pipeline.core.graph import PipelineParameter
model_version_number = PipelineParameter(name="model_version", default_value=1.0)
print("Pipeline parameter created")

Make sure you open `comparemodels.py` (in the `scripts/compare` folder) on your local machine and examine the script's arguments, inputs, and outputs. 
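One detail worth checking in the compare script: pipeline arguments reach the script on the command line, so the `model_version` PipelineParameter arrives as a string and must be converted. The following hypothetical sketch handles the four arguments used below; the `result.txt` file name and the "higher score wins" rule are placeholders, and the scores are passed in directly rather than computed from the models.

```python
import argparse
import os


def parse_args(argv=None):
    parser = argparse.ArgumentParser(description="compare step (hypothetical sketch)")
    parser.add_argument("--new_model_location", required=True)
    parser.add_argument("--prod_model_location", required=True)
    # Command-line values are strings; type=float converts e.g. "2.0" to 2.0.
    parser.add_argument("--model_version", type=float, default=1.0)
    parser.add_argument("--compare_result", required=True)
    return parser.parse_args(argv)


def write_result(args, new_score, prod_score):
    # Placeholder decision rule: the higher score wins.
    winner = "new" if new_score > prod_score else "production"
    os.makedirs(args.compare_result, exist_ok=True)
    path = os.path.join(args.compare_result, "result.txt")
    with open(path, "w") as f:
        f.write(f"compared against production model v{args.model_version}: {winner}\n")
    return path
```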

In [None]:
project_folder = "scripts/compare"

compare_result = PipelineData("compare_result", datastore=def_blob_store)

compareStep = PythonScriptStep(name="compare_step",
                         script_name="comparemodels.py",
                         arguments=["--new_model_location", output_model_location,
                                    "--prod_model_location", prod_model_location,
                                    "--model_version", model_version_number,
                                    "--compare_result", compare_result],
                         inputs=[output_model_location, prod_model_location],
                         outputs=[compare_result],
                         compute_target=cpu_compute,
                         source_directory=project_folder,
                         allow_reuse=False)

print("compareStep step created")

## Build the pipeline
Once we have the steps (or a steps collection), we can build the [pipeline](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-core/azureml.pipeline.core.pipeline.pipeline?view=azure-ml-py). The order of the steps in the list does not determine execution order; by default, each step starts as soon as all of its dependencies are fulfilled. 
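A simplified model of "each step starts as soon as its dependencies are fulfilled" can be written with Python's standard `graphlib` module (Python 3.9+). It repeatedly hands out every step whose upstream steps have finished, so steps in the same wave could run in parallel. This is an illustration of dependency-driven scheduling, not how the Azure ML service schedules steps internally; `execution_waves` is a hypothetical helper.

```python
from graphlib import TopologicalSorter


def execution_waves(dependencies):
    """dependencies maps each step to the set of steps it waits for."""
    sorter = TopologicalSorter(dependencies)
    sorter.prepare()
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        waves.append(ready)
        # Pretend every ready step finished successfully.
        sorter.done(*ready)
    return waves


deps = {
    "dataprep_step": set(),
    "train_step": {"dataprep_step"},
    "compare_step": {"train_step"},
}
print(execution_waves(deps))
```

For this notebook's pipeline each wave holds a single step, because every step depends on the previous one. Two steps that depend only on the same upstream step would land in the same wave.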

Submit a pipeline using [submit](https://docs.microsoft.com/en-us/python/api/azureml-core/azureml.core.experiment%28class%29?view=azure-ml-py#submit). When submit is called, a [PipelineRun](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-core/azureml.pipeline.core.pipelinerun?view=azure-ml-py) is created which in turn creates [StepRun](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-core/azureml.pipeline.core.steprun?view=azure-ml-py) objects for each step in the workflow.

You can optionally [validate](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-core/azureml.pipeline.core.pipeline.pipeline?view=azure-ml-py#validate) the pipeline before submitting it. The platform runs validation checks, such as checking for circular dependencies and validating parameters, even if you do not call the validate method explicitly.
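One of these checks, rejecting circular dependencies, can be illustrated with the standard `graphlib` module, whose `prepare()` raises `CycleError` when the graph contains a cycle. This sketch only mirrors the idea of the check; it is not the validation code Azure ML runs, and `find_cycle` is a hypothetical helper.

```python
from graphlib import CycleError, TopologicalSorter


def find_cycle(dependencies):
    """Return a list of step names forming a cycle, or None if the graph is acyclic."""
    try:
        TopologicalSorter(dependencies).prepare()
    except CycleError as err:
        # err.args[1] holds the cycle; its first and last items are the same node.
        return err.args[1]
    return None
```

For example, `find_cycle({"train_step": {"compare_step"}, "compare_step": {"train_step"}})` returns a list such as `["train_step", "compare_step", "train_step"]`, while the acyclic dataprep, train, compare chain returns `None`.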

In [None]:
from azureml.pipeline.core import Pipeline

steps = [dataprepStep, trainStep, compareStep]

training_pipeline = Pipeline(workspace=ws, steps=steps)
print("Pipeline is built")

training_pipeline.validate()
print("Pipeline validation complete")

## Submit the pipeline
[Submitting](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-core/azureml.pipeline.core.pipeline.pipeline?view=azure-ml-py#submit) the pipeline involves creating an [Experiment](https://docs.microsoft.com/en-us/python/api/azureml-core/azureml.core.experiment?view=azure-ml-py) object and providing the built pipeline for submission. 

In [None]:
from azureml.core import Experiment

training_pipeline_run = Experiment(ws, 'Compare_Models_Experiment').submit(training_pipeline, regenerate_outputs=True)
print("Pipeline is submitted for execution")

**Note:** If regenerate_outputs is set to True, the submission always regenerates all step outputs and disallows data reuse for every step of this run. Once this run is complete, however, subsequent runs may reuse its results.


## Examine the pipeline run
We are going to use the RunDetails widget to examine the run of the pipeline. You can click each row below to get more details on the step runs.

In [None]:
from azureml.widgets import RunDetails
RunDetails(training_pipeline_run).show()

## Publish the pipeline
Once you are satisfied with the results of your experiment, you may want to publish the pipeline to get a REST endpoint so the pipeline can be invoked later.

In [None]:
published_training_pipeline = training_pipeline.publish(name="Compare_Models_Pipeline",
                                                        description="This pipeline compares models")
print("The published pipeline ID is {}".format(published_training_pipeline.id))

## Run published pipeline using its REST endpoint
To invoke the published pipeline, you need an Azure Active Directory authentication header. This example obtains one with the [AzureCliAuthentication](https://docs.microsoft.com/en-us/python/api/azureml-core/azureml.core.authentication.azurecliauthentication?view=azure-ml-py) class, which uses the credentials of your Azure CLI session, so sign in with `az login` first.

In [None]:
from azureml.core.authentication import AzureCliAuthentication
import requests

cli_auth = AzureCliAuthentication()
aad_token = cli_auth.get_authentication_header()

training_pipeline_rest_ep = published_training_pipeline.endpoint

print("The published pipeline REST endpoint is {}".format(training_pipeline_rest_ep))

# Override the model_version pipeline parameter for this run
response = requests.post(training_pipeline_rest_ep,
                         headers=aad_token,
                         json={"ExperimentName": "Compare_v2_Models_Experiment",
                               "RunSource": "SDK",
                               "ParameterAssignments": {"model_version": 2.0}})
response.raise_for_status()  # fail early on an authentication or request error
pipeline_run_id = response.json()["Id"]

print("The run ID is {}".format(pipeline_run_id))

## Examine the run
We can examine the run of the pipeline that we just invoked via the REST endpoint.

In [None]:
from azureml.pipeline.core import PipelineRun
pub_pipeline_run = PipelineRun(Experiment(ws, "Compare_v2_Models_Experiment"), pipeline_run_id)
RunDetails(pub_pipeline_run).show()

### Thank you!
That's it. Thank you for watching! 