# Machine Learning Pipelines

A Machine Learning (ML) pipeline is a set of independent steps, each of which can have its own compute, data, and environment dependencies. Each step is treated as a separate run, and all four aspects of a run (metrics, logs, snapshot and outputs) are stored independently. Once the pipeline is created, it can be published as an API endpoint for later reuse.

This gives you fine-grained control: you can define each step of the pipeline in the way that best serves that particular step.

ML Pipelines are a type of workflow orchestration tool. The main difference between ML Pipelines and general-purpose workflow engines (such as Azure Data Factory or Airflow) is that ML Pipelines are designed around ML needs. Therefore, for workflows that aren't ML-related, a general-purpose workflow engine is the recommended choice. You can also call an ML Pipeline as a step from a general-purpose pipeline.

In this tutorial, we want to treat the MNIST model we trained in the previous example like a serious ML problem. We'd like to break the task into four parts:
1. Download the dataset from Yann LeCun's website, unzip and normalize it, and save it to shared storage
2. Train our two-layer neural net on the normalized data
3. Register the model only if the new model outperforms the latest version of the model in the Model Registry
4. Publish the bundle of the above steps as a pipeline API endpoint

By the end of this notebook, you'll have built the following pipeline:
![ML Pipeline](assets/MLOps_Pipeline.jpg)

### 1. Defining an ML Pipeline

In [5]:
import azureml.core
from azureml.core import Workspace, Experiment, Datastore
from azureml.core.compute import AmlCompute
from azureml.core.compute import ComputeTarget
from azureml.core.authentication import InteractiveLoginAuthentication
from azureml.widgets import RunDetails

# Check core SDK version number
print("SDK version:", azureml.core.VERSION)

from azureml.data.data_reference import DataReference
from azureml.pipeline.core import Pipeline, PipelineData
from azureml.pipeline.steps import PythonScriptStep

print("Pipeline SDK-specific imports completed")

SDK version: 1.9.0
Pipeline SDK-specific imports completed


In [6]:
# Your subscription ID will be different; replace the string with yours
subscription_id = "b198933e-f055-498f-958d-0726ab11eddb"
resource_group = "MLOps_Template"
workspace_name = "MLOps_template_ML"
workspace_region = "West US 2"

interactive_auth = InteractiveLoginAuthentication(tenant_id="e65922f9-4bd4-4f11-b1a3-48e89d75674e")



Performing interactive authentication. Please follow the instructions on the terminal.




Interactive authentication successfully completed.


In [9]:
# Connect to the existing workspace, reusing the interactive authentication from the previous cell.
# (Workspace() expects the workspace to exist already.)

from azureml.core import Workspace

ws = Workspace(workspace_name = workspace_name,
               subscription_id = subscription_id,
               resource_group = resource_group,
               auth = interactive_auth)

# persist the subscription id, resource group name, and workspace name in .azureml/config.json so Workspace.from_config() can find them
ws.write_config()

In [10]:

ws = Workspace.from_config()
print('Workspace name: ' + ws.name, 
      'Azure region: ' + ws.location, 
      'Subscription id: ' + ws.subscription_id, 
      'Resource group: ' + ws.resource_group, sep = '\n')


Workspace name: MLOps_template_ML
Azure region: westus2
Subscription id: b198933e-f055-498f-958d-0726ab11eddb
Resource group: MLOps_Template


### Data Stores

As described in the previous section, Datastores are attached to workspaces and store the connection information for Azure storage services, so you can refer to a storage service by name instead of remembering its connection information and the secret used to connect to it.

https://docs.microsoft.com/en-us/python/api/azureml-core/azureml.core.datastore.datastore?view=azure-ml-py

To explore the registered Datastores in your Workspace, log in to **ml.azure.com** and, in the left pane under the **Manage** section, click **Datastores**. By default, two datastores are registered: **workspaceblobstore** and **workspacefilestore**.

We skip **workspacefilestore** for now and use only **workspaceblobstore** in this exercise. **workspaceblobstore** refers to the default Blob storage that is created along with the Workspace. **Blob storage** is a storage service that can hold any type of data, from binary files (such as images) to CSV or Parquet files (it's similar to **AWS S3**). You can register additional Azure storage accounts with your workspace at any time.

In [11]:
# Retrieve the pointer to the default Blob storage.

def_blob_store = Datastore(ws, "workspaceblobstore")

## The code below also yields the same result:
# def_blob_store = ws.get_default_datastore()

print("Blobstore's name: {}".format(def_blob_store.name))

Blobstore's name: workspaceblobstore


An object of the DataReference class represents a path within a Datastore. In the example below, you specify that the MNIST data should be available under the **mnist_datainput** path of the **workspaceblobstore** datastore in the Azure storage account.
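To make the idea concrete, here is a plain-Python sketch of what a DataReference boils down to: a datastore name plus a relative path, which together resolve to a location in the storage account. The `DatastoreInfo` class, the `DATASTORES` registry, and the account and container names are hypothetical placeholders, not the azureml API; only the Blob Storage URL layout is the standard Azure one.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class DatastoreInfo:
    """Hypothetical record of what a registered datastore keeps for you (minus secrets)."""
    account_name: str
    container_name: str


# Hypothetical registry keyed by datastore name; account and container are placeholders.
DATASTORES = {
    "workspaceblobstore": DatastoreInfo(account_name="mystorageaccount",
                                        container_name="my-container"),
}


def resolve_blob_url(datastore_name: str, path_on_datastore: str) -> str:
    """Turn (datastore name, relative path) into an Azure Blob Storage URL.

    Uses the standard layout https://<account>.blob.core.windows.net/<container>/<path>.
    """
    info = DATASTORES[datastore_name]
    path = path_on_datastore.strip("/")
    return f"https://{info.account_name}.blob.core.windows.net/{info.container_name}/{path}"


print(resolve_blob_url("workspaceblobstore", "mnist_datainput"))
```

Code only ever mentions the datastore name and the relative path; the registry (in Azure ML, the workspace) supplies the account details.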

In [None]:
# blob_input_data = DataReference(
#     datastore=def_blob_store,
#     data_reference_name="mnist_datainput",
#     path_on_datastore="mnist_datainput")
# 
# print("DataReference object created")

Next, make sure the compute targets are created.

In this example, we want two types of compute: a CPU cluster and a GPU cluster, each with 1 node.

In [12]:
# Create a CPU cluster of type STANDARD_D2_V2 with 1 node (due to the subscription's limitations we stick to 1 node)

from azureml.core.compute import ComputeTarget, AmlCompute
from azureml.core.compute_target import ComputeTargetException

# choose a name for your cluster
cluster_name = "cpucluster"

try:
    compute_target_cpu = ComputeTarget(workspace=ws, name=cluster_name)
    print('Found existing compute target.')
except ComputeTargetException:
    print('Creating a new compute target...')
    # CPU example: STANDARD_D2_V2 (used here)
    # GPU example: Standard_NV6
    compute_config = AmlCompute.provisioning_configuration(vm_size='STANDARD_D2_V2', 
                                                           max_nodes=1,
                                                           min_nodes=1)

    # create the cluster
    compute_target_cpu = ComputeTarget.create(ws, cluster_name, compute_config)

    compute_target_cpu.wait_for_completion(show_output=True)

# use get_status() to get a detailed status for the current cluster. 
print(compute_target_cpu.get_status().serialize())

Creating a new compute target...
Creating
Succeeded...............
AmlCompute wait for completion finished

Minimum number of nodes requested have been provisioned
{'currentNodeCount': 1, 'targetNodeCount': 1, 'nodeStateCounts': {'preparingNodeCount': 1, 'runningNodeCount': 0, 'idleNodeCount': 0, 'unusableNodeCount': 0, 'leavingNodeCount': 0, 'preemptedNodeCount': 0}, 'allocationState': 'Steady', 'allocationStateTransitionTime': '2020-07-22T21:34:48.305000+00:00', 'errors': None, 'creationTime': '2020-07-22T21:32:51.796259+00:00', 'modifiedTime': '2020-07-22T21:33:37.668392+00:00', 'provisioningState': 'Succeeded', 'provisioningStateTransitionTime': None, 'scaleSettings': {'minNodeCount': 1, 'maxNodeCount': 1, 'nodeIdleTimeBeforeScaleDown': ''}, 'vmPriority': 'Dedicated', 'vmSize': 'STANDARD_D2_V2'}
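The try/except in the cell above is a get-or-create idiom: the first run provisions the cluster (as the output shows), and re-running the cell attaches to the existing one instead of failing. A generic, hypothetical version of the idiom, with an in-memory dictionary standing in for the workspace and `NotFound` standing in for `ComputeTargetException`:

```python
from typing import Callable, Dict, TypeVar

T = TypeVar("T")


class NotFound(Exception):
    """Hypothetical stand-in for ComputeTargetException raised on a missing target."""


def get_or_create(name: str, lookup: Callable[[str], T], create: Callable[[str], T]) -> T:
    # Attach to an existing resource first; provision only when it's missing.
    try:
        return lookup(name)
    except NotFound:
        return create(name)


# Illustrative in-memory "workspace"; nothing here talks to Azure.
clusters: Dict[str, str] = {}


def lookup(name: str) -> str:
    if name not in clusters:
        raise NotFound(name)
    return clusters[name]


def create(name: str) -> str:
    clusters[name] = f"{name}:STANDARD_D2_V2"
    return clusters[name]


first = get_or_create("cpucluster", lookup, create)   # provisions
second = get_or_create("cpucluster", lookup, create)  # reuses the existing entry
```

This idempotence is what lets you re-run the whole notebook without creating duplicate clusters.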


In [None]:
## Commented out because we only use one compute target to keep things fast. Uncomment it later if you'd like to train on a GPU.

# # choose a name for your cluster
# cluster_name = "gpucluster"
# 
# try:
#     compute_target_gpu = ComputeTarget(workspace=ws, name=cluster_name)
#     print('Found existing compute target.')
# except ComputeTargetException:
#     print('Creating a new compute target...')
#     # CPU: Standard_D3_v2
#     # GPU: Standard_NV6
#     compute_config = AmlCompute.provisioning_configuration(vm_size='Standard_NV6', 
#                                                            max_nodes=1,
#                                                            min_nodes=1)
# 
#     # create the cluster
#     compute_target_gpu = ComputeTarget.create(ws, cluster_name, compute_config)
# 
#     compute_target_gpu.wait_for_completion(show_output=True)
# 
# # use get_status() to get a detailed status for the current cluster. 
# print(compute_target_gpu.get_status().serialize())

In [13]:
cts = ws.compute_targets
for ct in cts:
    print(ct)


cpucluster


PipelineData is a way to define data dependencies in an ML Pipeline. In this example, we first download the MNIST data into a directory called raw_data and then save the processed and normalized numpy arrays into a subdirectory called Processed. This PipelineData object is used as the output of the first step, Data Extraction.

In [14]:
processed_mnist_data = PipelineData("processed_mnist_data", datastore=def_blob_store)
processed_mnist_data

$AZUREML_DATAREFERENCE_processed_mnist_data
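The printed value is a placeholder rather than a real path; at run time the step receives an actual mounted location instead (the step log further down reports the data being saved under a mounted path ending in `processed_mnist_data`). The sketch below illustrates that substitution idea only. `resolve_arguments` and the mount path are hypothetical, and the real resolution is performed by Azure ML, not by your code.

```python
from typing import Dict, List

PREFIX = "$AZUREML_DATAREFERENCE_"


def resolve_arguments(args: List[object], mounts: Dict[str, str]) -> List[str]:
    """Replace $AZUREML_DATAREFERENCE_<name> tokens with mounted paths (illustrative only)."""
    resolved = []
    for arg in args:
        text = str(arg)  # non-string arguments (ints, floats) reach the script as strings
        if text.startswith(PREFIX):
            text = mounts[text[len(PREFIX):]]
        resolved.append(text)
    return resolved


args = ["--output_extract", "$AZUREML_DATAREFERENCE_processed_mnist_data"]
mounts = {"processed_mnist_data": "/mnt/example/processed_mnist_data"}  # hypothetical path
print(resolve_arguments(args, mounts))
```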

As the first step is a regular Python script that can run on a CPU node without a prepackaged ML environment, we stick to the default configuration.

The configuration below first deploys a CPU-based Linux Docker image on the VM and then installs the 'azureml-sdk' and 'numpy' packages.

In [15]:
from azureml.core.runconfig import RunConfiguration
from azureml.core.conda_dependencies import CondaDependencies
from azureml.core.runconfig import DEFAULT_CPU_IMAGE

# create a new runconfig object
run_config = RunConfiguration()

# enable Docker 
run_config.environment.docker.enabled = True

# set Docker base image to the default CPU-based image
run_config.environment.docker.base_image = DEFAULT_CPU_IMAGE

# use conda_dependencies.yml to create a conda environment in the Docker image for execution
run_config.environment.python.user_managed_dependencies = False

# specify CondaDependencies obj
run_config.environment.python.conda_dependencies = CondaDependencies.create(pip_packages=['azureml-sdk',
                                                                                          'numpy'])

Here we define the first step as a PythonScriptStep object. Under the hood, it runs extract.py as the entry script and passes the **processed_mnist_data** object to the script as an argument.

The **outputs** parameter defines the step's output data dependencies; in this case, the PipelineData object **processed_mnist_data** is the output dependency.

As the script can run on a CPU node, there's no reason to pay for a GPU node, so we select **compute_target_cpu** as the compute target.

In [16]:
# source directory
source_directory = 'DataExtraction'

extractDataStep = PythonScriptStep(
    script_name="extract.py", 
    arguments=["--output_extract", processed_mnist_data],
    outputs=[processed_mnist_data],
    compute_target=compute_target_cpu, 
    source_directory=source_directory,
    runconfig=run_config)

print("Data Extraction Step created")

Data Extraction Step created
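The contents of extract.py aren't shown in this notebook. A minimal hypothetical sketch of how such a script could receive the `--output_extract` argument and prepare the output directory is below; the function names are illustrative, and the actual download and numpy saving are left as a comment. Creating the directory up front is a common precaution, not something the source confirms extract.py does.

```python
import argparse
import os
from typing import List, Optional, Sequence


def parse_args(argv: Optional[Sequence[str]] = None) -> argparse.Namespace:
    # Mirrors the arguments extractDataStep passes: ["--output_extract", processed_mnist_data]
    parser = argparse.ArgumentParser(description="Hypothetical sketch of extract.py")
    parser.add_argument("--output_extract", type=str, required=True,
                        help="Directory backing the processed_mnist_data output")
    return parser.parse_args(argv)


def normalize(pixels: Sequence[int]) -> List[float]:
    # MNIST pixel values are bytes in 0..255; scale them into [0.0, 1.0].
    return [p / 255.0 for p in pixels]


def main(argv: Optional[Sequence[str]] = None) -> str:
    args = parse_args(argv)
    # Create the output directory in case it doesn't exist yet.
    os.makedirs(args.output_extract, exist_ok=True)
    # Download, unzip and normalize MNIST here, then save the arrays
    # (the real script saves numpy binary files) into args.output_extract.
    return args.output_extract

# In extract.py itself, main() would be invoked under `if __name__ == "__main__":`.
```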


The next step runs our TensorFlow job to train our MNIST classifier. As this is a TF job, we can leverage Estimator classes such as **azureml.train.dnn.TensorFlow**. The **inputs** argument tells the Pipeline which data this step depends on. Because the **processed_mnist_data** PipelineData object is the output of the step above and the input of this step, the Pipeline engine executes the Training step after the Data Extraction step.

A TF job could leverage a GPU node to speed up training, in which case you'd pass the GPU cluster as the compute target. Since we skipped creating the GPU cluster above, the code below uses **compute_target_cpu** with use_gpu=False.

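In this notebook, the TF estimator is pinned to TF 1.13 (framework_version='1.13'). If your script is based on TF 2 and the estimator doesn't offer the version you need, you can use a **PythonScriptStep** instead and either provide your own custom Docker image or pip install TF 2 on a base GPU image.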

We provide two arguments, **release_id** and **model_name**. **release_id** lets us logically tag the run with a number that can be retrieved later; think of it as a version number. On day 3, you'll use the release pipeline to populate release_id. **model_name**, as its name suggests, tells the code which name to use when saving the model in the run's outputs section.


In [18]:
from azureml.train.dnn import TensorFlow

source_directory = 'Training'
est = TensorFlow(source_directory=source_directory,
                 compute_target=compute_target_cpu,
                 entry_script='train.py', 
                 use_gpu=False, 
                 framework_version='1.13')

In [19]:
from azureml.pipeline.steps import EstimatorStep

trainingStep = EstimatorStep(name="Training-Step",
                             estimator=est,
                             estimator_entry_script_arguments=["--input_data_location", processed_mnist_data,
                                                               '--batch-size', 50,
                                                               '--first-layer-neurons', 300,
                                                               '--second-layer-neurons', 100,
                                                               '--learning-rate', 0.01,
                                                               "--release_id", 0,
                                                               '--model_name', 'tf_mnist_pipeline.model'],
                             runconfig_pipeline_params=None,
                             inputs=[processed_mnist_data],
                             compute_target=compute_target_cpu)

print("Model Training Step is Created")

Model Training Step is Created
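train.py itself isn't shown here, but the flags passed through `estimator_entry_script_arguments` imply its command-line interface. A hypothetical argparse sketch of that interface: every pipeline argument reaches the script as a string, so the `type=` conversions matter, and argparse exposes a dashed flag like `--batch-size` as `args.batch_size`. Keeping `release_id` as a string (a tag value) is a design choice of this sketch.

```python
import argparse
from typing import Optional, Sequence


def parse_train_args(argv: Optional[Sequence[str]] = None) -> argparse.Namespace:
    """Hypothetical parser for train.py mirroring the flags passed by trainingStep."""
    parser = argparse.ArgumentParser(description="Hypothetical sketch of train.py arguments")
    parser.add_argument("--input_data_location", type=str, required=True)
    parser.add_argument("--batch-size", type=int, default=50)            # -> args.batch_size
    parser.add_argument("--first-layer-neurons", type=int, default=300)  # -> args.first_layer_neurons
    parser.add_argument("--second-layer-neurons", type=int, default=100)
    parser.add_argument("--learning-rate", type=float, default=0.01)
    parser.add_argument("--release_id", type=str, default="0")           # treated as a tag value
    parser.add_argument("--model_name", type=str, default="tf_mnist_pipeline.model")
    return parser.parse_args(argv)
```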


Finally, we want to evaluate and register the model. As with the first step, a CPU node is enough because we're running a regular Python script with no ML framework dependencies, so we instantiate a **PythonScriptStep** with **evaluate_model.py** as the entry script.

The two arguments passed to the training step are used here to retrieve the model saved in the outputs section of the training run. Using the release ID, we can retrieve all previously registered models and check whether this model outperforms them. If it doesn't, we don't register it in the model registry.
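The core decision in evaluate_model.py can be sketched as a pure function. This is a hypothetical simplification: the metric (for example validation accuracy) and the strict "beats every previous model" rule are assumptions, and retrieving models and metrics from the workspace is left out.

```python
from typing import Iterable


def should_register(new_accuracy: float, previous_accuracies: Iterable[float]) -> bool:
    """Register only if the new model beats every previously registered model.

    With no previous models (the first release), register unconditionally.
    """
    best_previous = max(previous_accuracies, default=None)
    return best_previous is None or new_accuracy > best_previous
```

A tie counts as "not better" here, so re-running the pipeline on unchanged data doesn't register a duplicate model.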


In [20]:
# source directory
source_directory = 'RegisterModel'

modelEvalReg = PythonScriptStep(
    name="Evaluate and Register Model",
    script_name="evaluate_model.py", 
    arguments=["--release_id", 0,
               '--model_name', 'tf_mnist_pipeline.model'],
    compute_target=compute_target_cpu, 
    source_directory=source_directory,
    runconfig=run_config)
print("Model Evaluation and Registration Step is Created")

Model Evaluation and Registration Step is Created


As this step doesn't have any input or output data dependencies, the Pipeline engine would execute it in parallel with the previous two steps. Logically, however, it must run after the training step, so we use the following command to tell the Pipeline engine:

In [21]:
modelEvalReg.run_after(trainingStep)
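Conceptually, the execution order comes from two kinds of edges: data edges (a PipelineData output consumed as another step's input) and explicit `run_after` edges. The sketch below resolves such a graph with Kahn's topological sort to show why the evaluation step needs `run_after`. It's an illustration of dependency resolution under that assumption, not the Azure ML scheduler, and the step names are shorthand.

```python
from collections import deque
from typing import Dict, List, Set


def execution_order(outputs: Dict[str, Set[str]],
                    inputs: Dict[str, Set[str]],
                    run_after: Dict[str, Set[str]]) -> List[str]:
    """Return one valid execution order for pipeline steps (Kahn's topological sort).

    outputs:   step -> names of data it produces (like PipelineData outputs)
    inputs:    step -> names of data it consumes (like PipelineData inputs)
    run_after: step -> steps it must follow (like step.run_after(other))
    """
    steps = set(outputs) | set(inputs) | set(run_after)
    steps |= {s for before in run_after.values() for s in before}
    producer = {data: step for step, produced in outputs.items() for data in produced}

    # Each step depends on the producers of its inputs plus its explicit run_after steps.
    deps: Dict[str, Set[str]] = {s: set() for s in steps}
    for step, consumed in inputs.items():
        deps[step] |= {producer[data] for data in consumed}
    for step, before in run_after.items():
        deps[step] |= before

    # Reverse the edges so finishing a step can unblock its dependents.
    dependents: Dict[str, Set[str]] = {s: set() for s in steps}
    for step, ds in deps.items():
        for d in ds:
            dependents[d].add(step)

    pending = {s: len(ds) for s, ds in deps.items()}
    ready = deque(sorted(s for s, n in pending.items() if n == 0))
    order: List[str] = []
    while ready:
        step = ready.popleft()
        order.append(step)
        for nxt in sorted(dependents[step]):
            pending[nxt] -= 1
            if pending[nxt] == 0:
                ready.append(nxt)
    if len(order) != len(steps):
        raise ValueError("dependency cycle between steps")
    return order


# This notebook's pipeline: data edge extract -> train, explicit edge train -> evaluate.
print(execution_order(outputs={"extract": {"processed_mnist_data"}},
                      inputs={"train": {"processed_mnist_data"}},
                      run_after={"evaluate": {"train"}}))
```

Without the `run_after` entry, "evaluate" has no predecessors and is ready immediately, which is exactly the parallel execution the explicit edge prevents.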

Now that all of the pipeline steps are defined, we can build the Pipeline object, which defines how the pipeline is executed.

Pipelines are loosely coupled with Experiments: at run time, you choose which Experiment a pipeline execution is attached to. Here we create or connect to the **MNIST-Model-Manual-Pipeline** experiment:

In [22]:
from azureml.pipeline.core import Pipeline
from azureml.core import Experiment
pipeline = Pipeline(workspace=ws, steps=[extractDataStep, trainingStep, modelEvalReg])
pipeline_run = Experiment(ws, 'MNIST-Model-Manual-Pipeline').submit(pipeline)



Created step extract.py [c11c26ce][b7d35166-f8b3-40ce-b825-51f34cb68afe], (This step will run and generate new outputs)
Created step Training-Step [d276fba9][114fa064-33a8-44ed-a9bc-2ecd8056a940], (This step will run and generate new outputs)
Created step Evaluate and Register Model [354d46ec][a393d26f-8711-440c-b088-5964de8bce40], (This step will run and generate new outputs)
Submitted PipelineRun e953c74d-177e-4fb3-88de-fe5e9263b978
Link to Azure Machine Learning Portal: https://ml.azure.com/experiments/MNIST-Model-Manual-Pipeline/runs/e953c74d-177e-4fb3-88de-fe5e9263b978?wsid=/subscriptions/b198933e-f055-498f-958d-0726ab11eddb/resourcegroups/MLOps_Template/workspaces/MLOps_template_ML


In [23]:
RunDetails(pipeline_run).show()

_PipelineWidget(widget_settings={'childWidgetDisplay': 'popup', 'send_telemetry': False, 'log_level': 'INFO', …

As with any run, Pipeline runs are non-blocking. However, if you automate this job, you'll want your code to wait until the pipeline has finished executing. Use the following command to make the run blocking:

In [24]:
pipeline_run.wait_for_completion(show_output=True, raise_on_error=True)

PipelineRunId: e953c74d-177e-4fb3-88de-fe5e9263b978
Link to Azure Machine Learning Portal: https://ml.azure.com/experiments/MNIST-Model-Manual-Pipeline/runs/e953c74d-177e-4fb3-88de-fe5e9263b978?wsid=/subscriptions/b198933e-f055-498f-958d-0726ab11eddb/resourcegroups/MLOps_Template/workspaces/MLOps_template_ML
PipelineRun Status: Running


StepRunId: 329b2954-ed21-493f-8ee9-d2776f451dc0
Link to Azure Machine Learning Portal: https://ml.azure.com/experiments/MNIST-Model-Manual-Pipeline/runs/329b2954-ed21-493f-8ee9-d2776f451dc0?wsid=/subscriptions/b198933e-f055-498f-958d-0726ab11eddb/resourcegroups/MLOps_Template/workspaces/MLOps_template_ML
StepRun( extract.py ) Status: Running

Streaming azureml-logs/20_image_build_log.txt
2020/07/22 22:21:47 Downloading source code...
2020/07/22 22:21:49 Finished downloading source code
2020/07/22 22:21:50 Creating Docker network: acb_default_network, driver: 'bridge'
2020/07/22 22:21:51 Successfully set up Docker network: acb_default_network
2020/07/22


  current version: 4.7.12
  latest version: 4.8.3

Please update conda by running

    $ conda update -n base -c defaults conda


#
# To activate this environment, use
#
#     $ conda activate /azureml-envs/azureml_87321baca1fc0bacb2b64d6c9ca03b94
#
# To deactivate an active environment, use
#
#     $ conda deactivate

Removing intermediate container 085a844e7ab2
 ---> d30646392485
Step 9/15 : ENV PATH /azureml-envs/azureml_87321baca1fc0bacb2b64d6c9ca03b94/bin:$PATH
 ---> Running in 1c1ae12bfa17
Removing intermediate container 1c1ae12bfa17
 ---> 940c6373bd01
Step 10/15 : ENV AZUREML_CONDA_ENVIRONMENT_PATH /azureml-envs/azureml_87321baca1fc0bacb2b64d6c9ca03b94
 ---> Running in 0d4d98da47c5
Removing intermediate container 0d4d98da47c5
 ---> dfae9c20af80
Step 11/15 : ENV LD_LIBRARY_PATH /azureml-envs/azureml_87321baca1fc0bacb2b64d6c9ca03b94/lib:$LD_LIBRARY_PATH
 ---> Running in 81d2a6ed2618
Removing intermediate container 81d2a6ed2618
 ---> e1103bbf1787
Step 12/15 : COPY azure

Normalized numpy binary files are saved at: /mnt/batch/tasks/shared/LS_root/jobs/mlops_template_ml/azureml/329b2954-ed21-493f-8ee9-d2776f451dc0/mounts/workspaceblobstore/azureml/329b2954-ed21-493f-8ee9-d2776f451dc0/processed_mnist_data
Starting the daemon thread to refresh tokens in background for process with pid = 111


The experiment completed successfully. Finalizing run...
Cleaning up all outstanding Run operations, waiting 300.0 seconds
2 items cleaning up...
Cleanup took 0.7004928588867188 seconds
2020/07/22 22:28:27 Process Exiting with Code:  0

Streaming azureml-logs/75_job_post-tvmps_1f626ef4e87a5e3b695a9c76c6f8c9788cf2c638d1386b9b8218a6adf8189b7b_d.txt
Entering job release. Current time:2020-07-22T22:28:28.152422
Starting job release. Current time:2020-07-22T22:28:29.825933
Logging experiment finalizing status in history service.
Starting the daemon thread to refresh tokens in background for process with pid = 156
[2020-07-22T22:28:29.844390] Entering context manager inject




StepRunId: 3805a880-2419-494a-9b02-80e6d6f00f9d
Link to Azure Machine Learning Portal: https://ml.azure.com/experiments/MNIST-Model-Manual-Pipeline/runs/3805a880-2419-494a-9b02-80e6d6f00f9d?wsid=/subscriptions/b198933e-f055-498f-958d-0726ab11eddb/resourcegroups/MLOps_Template/workspaces/MLOps_template_ML
StepRun( Training-Step ) Status: NotStarted
StepRun( Training-Step ) Status: Running

Streaming azureml-logs/55_azureml-execution-tvmps_1f626ef4e87a5e3b695a9c76c6f8c9788cf2c638d1386b9b8218a6adf8189b7b_d.txt
1.13-cpu: Pulling from tensorflow
16c48d79e9cc: Pulling fs layer
3c654ad3ed7d: Pulling fs layer
6276f4f9c29d: Pulling fs layer
a4bd43ad48ce: Pulling fs layer
e8d5220518a8: Pulling fs layer
b89bc95dc4e6: Pulling fs layer
874f36ef0b3a: Pulling fs layer
20592f072a44: Pulling fs layer
d5a9f64d93c8: Pulling fs layer
bf16c69a1526: Pulling fs layer
b47bfd667bdb: Pulling fs layer
6a140fc11307: Pulling fs layer
874f36ef0b3a: Waiting
20592f072a44: Waiting
d5a9f64d93c8: Waiting
bf16c69a1526

13 -- Training accuracy: 1.0 Validation accuracy: 0.9674
14 -- Training accuracy: 1.0 Validation accuracy: 0.9668
15 -- Training accuracy: 0.96 Validation accuracy: 0.967
16 -- Training accuracy: 0.98 Validation accuracy: 0.9692
17 -- Training accuracy: 1.0 Validation accuracy: 0.969

Streaming azureml-logs/75_job_post-tvmps_1f626ef4e87a5e3b695a9c76c6f8c9788cf2c638d1386b9b8218a6adf8189b7b_d.txt
Entering job release. Current time:2020-07-22T22:31:39.346335
Starting job release. Current time:2020-07-22T22:31:40.290844
Logging experiment finalizing status in history service.
Starting the daemon thread to refresh tokens in background for process with pid = 263
[2020-07-22T22:31:40.304010] Entering context manager injector.
Job release is complete. Current time:2020-07-22T22:31:41.266148

StepRun(Training-Step) Execution Summary
StepRun( Training-Step ) Status: Finished
{'runId': '3805a880-2419-494a-9b02-80e6d6f00f9d', 'target': 'cpucluster', 'status': 'Completed', 'startTimeUtc': '2020-07-

StepRun( Evaluate and Register Model ) Status: NotStarted
StepRun( Evaluate and Register Model ) Status: Running

Streaming azureml-logs/55_azureml-execution-tvmps_1f626ef4e87a5e3b695a9c76c6f8c9788cf2c638d1386b9b8218a6adf8189b7b_d.txt
Using default tag: latest
latest: Pulling from azureml/azureml_2e1aa647468113323a439077734657c5
Digest: sha256:5ad4c6cc73968982b57bceab23f46f13cfabb51a13e0184bea30c22104c89a53
Status: Image is up to date for mlopstemplat9c2637c6.azurecr.io/azureml/azureml_2e1aa647468113323a439077734657c5:latest
2020-07-22T22:32:05Z IsDedicatedCompute == True, won't poll for Low Pri Preemption
2020-07-22T22:32:05Z Starting output-watcher...
2020-07-22T22:32:06Z a854f37e198c76f84b80fbbdf0c54f27337079746eb02a508e974e2772ec7ac6
2020-07-22T22:32:06Z 
2020/07/22 22:32:06 Starting App Insight Logger for task:  containerSetup
2020/07/22 22:32:06 Version: 3.0.01291.0001 Branch: hotfixChrisChange Commit: 03c8130a
2020/07/22 22:32:06 /dev/infiniband/uverbs0 found (implying presence 


Streaming azureml-logs/75_job_post-tvmps_1f626ef4e87a5e3b695a9c76c6f8c9788cf2c638d1386b9b8218a6adf8189b7b_d.txt
Entering job release. Current time:2020-07-22T22:32:22.369639
Starting job release. Current time:2020-07-22T22:32:24.061691
Logging experiment finalizing status in history service.
Starting the daemon thread to refresh tokens in background for process with pid = 151
[2020-07-22T22:32:24.075012] Entering context manager injector.
Job release is complete. Current time:2020-07-22T22:32:25.417774

StepRun(Evaluate and Register Model) Execution Summary
StepRun( Evaluate and Register Model ) Status: Finished
{'runId': 'f230bd88-914b-41fb-9981-b073d9c91168', 'target': 'cpucluster', 'status': 'Completed', 'startTimeUtc': '2020-07-22T22:32:07.189085Z', 'endTimeUtc': '2020-07-22T22:32:31.305839Z', 'properties': {'azureml.runsource': 'azureml.StepRun', 'ContentSnapshotId': 'eb703a41-9367-4922-97f9-f359b4295631', 'StepType': 'PythonScriptStep', 'ComputeTargetType': 'AmlCompute', 'azurem

'Finished'
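`wait_for_completion` blocks until the run ends, with no upper bound on how long that takes. If an automated job needs a hard timeout, one option is a small polling loop around a status function such as `pipeline_run.get_status`. The helper below is hypothetical, and the set of terminal status strings is an assumption based on the statuses seen in the log above ("Finished", "Completed") plus common failure states.

```python
import time
from typing import Callable

# Assumed terminal status strings; verify against the statuses your runs report.
TERMINAL_STATES = {"Finished", "Completed", "Failed", "Canceled"}


def wait_until_done(get_status: Callable[[], str],
                    poll_seconds: float = 30.0,
                    timeout_seconds: float = 3600.0,
                    sleep: Callable[[float], None] = time.sleep) -> str:
    """Poll get_status() until a terminal state is reached or the timeout expires."""
    waited = 0.0
    while True:
        status = get_status()
        if status in TERMINAL_STATES:
            return status
        if waited >= timeout_seconds:
            raise TimeoutError(f"run still {status!r} after {waited:.0f}s")
        sleep(poll_seconds)
        waited += poll_seconds
```

In the notebook this would be called as `wait_until_done(pipeline_run.get_status)`; injecting `sleep` keeps the helper easy to test without real waiting.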

### 2. Publish and trigger a pipeline

This Pipeline was executed once under an experiment. For later use, you may want to publish the Pipeline as an endpoint. The publish_pipeline method publishes the pipeline under the Pipelines section of the Workspace. The published pipeline can then be called from anywhere, inside or outside of Azure.

One use case is to call the Pipeline from within a Data Engineering pipeline, so the Data Engineering team can trigger the published pipeline using only its endpoint URI.

In [25]:
published_pipeline = pipeline_run.publish_pipeline(name="MNIST-Pipeline-Manually-Built-Manulife", 
                                                   description="Steps are: data preparation, training, model validation and model registration", 
                                                   version="0.1", 
                                                   continue_on_step_failure=False)

In [26]:
from azureml.pipeline.core import PublishedPipeline

pipeline_id = published_pipeline.id # use your published pipeline id
published_pipeline = PublishedPipeline.get(ws, pipeline_id)
published_pipeline


| Name | Id | Status | Endpoint |
|---|---|---|---|
| MNIST-Pipeline-Manually-Built-Manulife | b29d20d0-04e9-4c29-bc44-fb8bc3eb43c3 | Active | REST Endpoint |


Here is the callable REST endpoint:

In [27]:
rest_endpoint = published_pipeline.endpoint
rest_endpoint

'https://westus2.api.azureml.ms/pipelines/v1.0/subscriptions/b198933e-f055-498f-958d-0726ab11eddb/resourceGroups/MLOps_Template/providers/Microsoft.MachineLearningServices/workspaces/MLOps_template_ML/PipelineRuns/PipelineSubmit/b29d20d0-04e9-4c29-bc44-fb8bc3eb43c3'

Now you can call the endpoint from anywhere. To call it, you need to authenticate. There are two ways to do that:

1. InteractiveLoginAuthentication
2. ServicePrincipalAuthentication

The first requires you to authenticate interactively or to be already authenticated. Since we're already authenticated, we use the first approach. The second approach is described in the next section.

The InteractiveLoginAuthentication class generates the authentication header required to call the ML Pipeline endpoint through its **get_authentication_header** method:

In [28]:
from azureml.core.authentication import InteractiveLoginAuthentication
import requests

auth = InteractiveLoginAuthentication()

In [29]:
aad_token = auth.get_authentication_header()
aad_token

{'Authorization': 'Bearer <token redacted>'}

Finally, you can trigger the pipeline with a simple POST request. Every modern programming language can send POST requests, so you can trigger the pipeline from anywhere.
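To show that nothing Azure-specific is needed on the caller's side, here is a sketch that builds the same POST using only Python's standard library. The endpoint URL and token are placeholders, and `build_trigger_request` is a hypothetical helper; the JSON body mirrors the one used in the next cell.

```python
import json
import urllib.request
from typing import Dict


def build_trigger_request(endpoint: str, auth_header: Dict[str, str],
                          experiment_name: str) -> urllib.request.Request:
    """Build (but don't send) the POST that triggers a published pipeline."""
    body = json.dumps({"ExperimentName": experiment_name, "RunSource": "SDK"}).encode("utf-8")
    headers = {"Content-Type": "application/json", **auth_header}
    return urllib.request.Request(endpoint, data=body, headers=headers, method="POST")


req = build_trigger_request(
    "https://example.invalid/pipelines/PipelineSubmit/PIPELINE_ID",  # placeholder endpoint
    {"Authorization": "Bearer <token>"},                             # placeholder token
    "Kicked_MNist_Pipeline_Remotely",
)
# Sending it would be urllib.request.urlopen(req); the run id is then in the JSON response's "Id" field.
```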

In [30]:
# trigger the pipeline; the JSON body names the experiment to run it under
response = requests.post(rest_endpoint, 
                         headers=aad_token, 
                         json={"ExperimentName": "Kicked_MNist_Pipeline_Remotely",
                               "RunSource": "SDK"})

In [31]:
run_id = response.json()["Id"]

print(run_id)

6fcf3a40-643e-4354-925c-f347fc11a6e4


In [32]:
response.json()

{'Description': None,
 'Status': {'StatusCode': 0,
  'StatusDetail': None,
  'CreationTime': '2020-07-23T15:10:10.7650029Z',
  'EndTime': None},
 'GraphId': '7cdc9871-ed0b-4cf0-8530-ec64b8d49652',
 'IsSubmitted': False,
 'HasErrors': False,
 'UploadState': 0,
 'ParameterAssignments': {},
 'DataPathAssignments': {},
 'DataSetDefinitionValueAssignments': {},
 'RunHistoryExperimentName': 'Kicked_MNist_Pipeline_Remotely',
 'PipelineId': 'b29d20d0-04e9-4c29-bc44-fb8bc3eb43c3',
 'RunSource': 'SDK',
 'RunType': 0,
 'TotalRunSteps': 3,
 'ScheduleId': None,
 'RunUrl': 'https://ml.azure.com/experiments/Kicked_MNist_Pipeline_Remotely/runs/6fcf3a40-643e-4354-925c-f347fc11a6e4?tid=e65922f9-4bd4-4f11-b1a3-48e89d75674e&wsid=/subscriptions/b198933e-f055-498f-958d-0726ab11eddb/resourcegroups/MLOps_Template/workspaces/MLOps_template_ML',
 'tags': {},
 'StepTags': {},
 'Properties': {},
 'StepProperties': {},
 'CreatedBy': {'UserObjectId': '2b62077d-60ed-4d79-87f4-3ff740f34234',
  'UserTenantId': 'e65922

In [33]:
# Retrieving the Pipeline Run:

from azureml.pipeline.core import PipelineRun

exp = Experiment(name="Kicked_MNist_Pipeline_Remotely", workspace=ws)
pipeline_run = PipelineRun(experiment=exp, run_id=run_id)

### 3. Schedule the Pipeline

Another way of using a Pipeline is to schedule it. In the example below, the pipeline is scheduled to be triggered weekly on Fridays at 15:30 UTC:

In [None]:
# from azureml.pipeline.core import Schedule, ScheduleRecurrence
# 
# 
# recurrence = ScheduleRecurrence(frequency="Week", interval=1, week_days=["Friday"], time_of_day="15:30")
# schedule = Schedule.create(ws, name="ScheduledPipeline", pipeline_id=pipeline_id,
#                            experiment_name="MNIST-Pipeline-Weekly-Scheduled", recurrence=recurrence)
# 
## Get the list of scheduled Pipelines
# Schedule.list(ws)
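Before enabling a schedule, it can help to sanity-check when a weekly recurrence like "Fridays at 15:30 UTC" would next fire. The stdlib sketch below computes that locally; `next_weekly_run` is a hypothetical helper, and the actual trigger times are decided by the Azure ML scheduler, not by this code.

```python
from datetime import datetime, timedelta, timezone


def next_weekly_run(after: datetime, weekday: int = 4, hour: int = 15, minute: int = 30) -> datetime:
    """Next occurrence of `weekday` (Monday=0 ... Friday=4) at hour:minute UTC, strictly after `after`.

    `after` must be timezone-aware.
    """
    after = after.astimezone(timezone.utc)
    candidate = after.replace(hour=hour, minute=minute, second=0, microsecond=0)
    candidate += timedelta(days=(weekday - candidate.weekday()) % 7)
    if candidate <= after:
        # Same weekday but the time has already passed: move to next week.
        candidate += timedelta(days=7)
    return candidate


print(next_weekly_run(datetime(2020, 7, 22, 22, 0, tzinfo=timezone.utc)))
```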

In [43]:
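# Initialize a local Git repository (re-running it reinitializes the existing one)
! git init
## Optional: generate an SSH key for GitHub (replace the email with your own)
# ! ssh-keygen -t rsa -C "your.email@example.com"

Reinitialized existing Git repository in C:/Users/niloofar.nayebi/PROJECT/10-AltaGas-MLops/build-release-ci-cd-master/.git/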


In [39]:
# replace user/MLOpsPython with your own GitHub repository
! git remote add origin git@github.com:user/MLOpsPython.git
! git add . && git commit -m "Pipeline published and scheduled"

[master 182fdbe] Pipeline published and scheduled
 1 file changed, 4 insertions(+), 70 deletions(-)


The file will have its original line endings in your working directory


In [41]:
! git push origin master

Host key verification failed.
fatal: Could not read from remote repository.

Please make sure you have the correct access rights
and the repository exists.


### Delete compute resources

Once this exercise is complete, you can delete disposable resources to avoid unnecessary charges.

In [None]:
compute_target_cpu.delete()