## Create an End-to-End Pipeline using Azure Machine Learning

### Connect to an Azure Machine Learning Workspace

In [1]:
import azureml.core
from azureml.core import Workspace

# Load the workspace from the saved config file
ws = Workspace.from_config()
print('Ready to use Azure ML {} to work with {}'.format(azureml.core.VERSION, ws.name))

Ready to use Azure ML 1.20.0 to work with mlops-aml-ws


### Prepare the Training Data
Local data files can be used to train a model, but when running training workloads on cloud-based compute, it makes more sense to store the data centrally in the cloud and then ingest it wherever the training script happens to be running.

Here the training data is uploaded to a *datastore* and then a *dataset* is defined. For simplicity, the data is uploaded to the *default* datastore for your Azure Machine Learning workspace. In production, a datastore that references an existing cloud data storage location would be registered (e.g., a Data Lake). A *tabular* dataset is then created using the existing CSV files.

In [2]:
from azureml.core import Dataset

default_ds = ws.get_default_datastore()

if 'diabetes dataset' not in ws.datasets:
    default_ds.upload_files(files=['./data/diabetes.csv', './data/diabetes2.csv'],
                            target_path='diabetes-data/', overwrite=True, show_progress=True)

    # Create a tabular dataset from the path on the datastore
    tab_ds = Dataset.Tabular.from_delimited_files(path=(default_ds, 'diabetes-data/*.csv'))

    # Register the tabular dataset
    try:
        tab_ds = tab_ds.register(workspace=ws, name='diabetes dataset',
                                 description='diabetes data',
                                 tags = {'format':'CSV'}, create_new_version=True)
        print('Dataset registered.')
    except Exception as ex:
        print(ex)
else:
    print('Dataset already registered.')

Dataset already registered.


### Create Scripts for Pipeline Steps
- Create a folder dedicated to holding the scripts for each pipeline step
- For the first pipeline step, generate a script that trains the machine learning model
- For the second pipeline step, generate a script that registers the machine learning model

In [3]:
# Create a folder for the pipeline step files
import os

experiment_folder = 'diabetes_pipeline'
os.makedirs(experiment_folder, exist_ok=True)

print(experiment_folder)

diabetes_pipeline


In [4]:
%%writefile $experiment_folder/train_diabetes.py
# Import libraries
from azureml.core import Run
import argparse
import pandas as pd
import numpy as np
import joblib
from sklearn.model_selection import train_test_split
from sklearn.tree import DecisionTreeClassifier
from sklearn.metrics import roc_auc_score

# Get parameters
parser = argparse.ArgumentParser()
parser.add_argument('--output_folder', type=str, dest='output_folder', default="diabetes_model", help='output folder')
args = parser.parse_args()
output_folder = args.output_folder

# Get the experiment run context
run = Run.get_context()

# load the diabetes data (passed as an input dataset)
print("Loading Data...")
diabetes = run.input_datasets['diabetes_train'].to_pandas_dataframe()

# Separate features and labels
X, y = diabetes[['Pregnancies','PlasmaGlucose','DiastolicBloodPressure','TricepsThickness','SerumInsulin','BMI','DiabetesPedigree','Age']].values, diabetes['Diabetic'].values

# Split data into training set and test set
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.30, random_state=0)

# Train adecision tree model
print('Training a decision tree model')
model = DecisionTreeClassifier().fit(X_train, y_train)

# calculate accuracy
y_hat = model.predict(X_test)
acc = np.average(y_hat == y_test)
print('Accuracy:', acc)
run.log('Accuracy', np.float(acc))

# calculate AUC
y_scores = model.predict_proba(X_test)
auc = roc_auc_score(y_test,y_scores[:,1])
print('AUC: ' + str(auc))
run.log('AUC', np.float(auc))

# Save the trained model
os.makedirs(output_folder, exist_ok=True)
output_path = output_folder + "/model.pkl"
joblib.dump(value=model, filename=output_path)

run.complete()

Overwriting diabetes_pipeline/train_diabetes.py


In [5]:
%%writefile $experiment_folder/register_diabetes.py
# Import libraries
import argparse
import joblib
from azureml.core import Workspace, Model, Run

# Get parameters
parser = argparse.ArgumentParser()
parser.add_argument('--model_folder', type=str, dest='model_folder', default="diabetes_model", help='model location')
args = parser.parse_args()
model_folder = args.model_folder

# Get the experiment run context
run = Run.get_context()

# load the model
print("Loading model from " + model_folder)
model_file = model_folder + "/model.pkl"
model = joblib.load(model_file)

Model.register(workspace=run.experiment.workspace,
               model_path = model_file,
               model_name = 'diabetes_model',
               tags={'Training context':'Pipeline'})

run.complete()

Overwriting diabetes_pipeline/register_diabetes.py


### Prepare a Compute Environment

In [6]:
from azureml.core.compute import ComputeTarget, AmlCompute
from azureml.core.compute_target import ComputeTargetException

cluster_name = "train-cluster"

try:
    # Check for existing compute target
    pipeline_cluster = ComputeTarget(workspace=ws, name=cluster_name)
    print('Found existing cluster, use it.')
    
except ComputeTargetException:
    # If it doesn't already exist, create it
    try:
        compute_config = AmlCompute.provisioning_configuration(vm_size='STANDARD_DS11_V2'
                                                               , max_nodes=2)
        pipeline_cluster = ComputeTarget.create(ws, cluster_name, compute_config)
        pipeline_cluster.wait_for_completion(show_output=True)
        
    except Exception as ex:
        print(ex)

Found existing cluster, use it.


### Define a Run Configuration
The compute requires a Python environment with the necessary package dependencies installed

In [7]:
from azureml.core import Environment
from azureml.core.conda_dependencies import CondaDependencies
from azureml.core.runconfig import RunConfiguration

# Create a Python environment for the experiment
diabetes_env = Environment("diabetes-pipeline-env")
diabetes_env.python.user_managed_dependencies = False # Let Azure ML manage dependencies
diabetes_env.docker.enabled = True # Use a docker container

# Create a set of package dependencies
diabetes_packages = CondaDependencies.create(conda_packages=['scikit-learn','pandas'],
                                             pip_packages=['azureml-defaults','azureml-dataprep[pandas]'])

# Add the dependencies to the environment
diabetes_env.python.conda_dependencies = diabetes_packages

# Register the environment (just in case you want to use it again)
diabetes_env.register(workspace=ws)
registered_env = Environment.get(ws, 'diabetes-pipeline-env')

# Create a new runconfig object for the pipeline
pipeline_run_config = RunConfiguration()

# Use the compute you created above. 
pipeline_run_config.target = pipeline_cluster

# Assign the environment to the run configuration
pipeline_run_config.environment = registered_env

print ("Run configuration created.")

Run configuration created.


## Create and Run a Pipeline

First you need to define the steps for the pipeline, and any data references that need to passed between them. In this case, the first step must write the model to a folder that can be read from by the second step. Since the steps will be run on remote compute (and in fact, could each be run on different compute), the folder path must be passed as a data reference to a location in a datastore within the workspace. The **PipelineData** object is a special kind of data reference that is used to pass data from the output of one pipeline step to the input of another, creating a dependency between them. You'll create one and use it as the output for the first step and the input for the second step. Note that you also need to pass it as a script argument so your code can access the datastore location referenced by the data reference.

In [8]:
from azureml.pipeline.core import PipelineData
from azureml.pipeline.steps import PythonScriptStep, EstimatorStep
from azureml.train.estimator import Estimator

# Get the training dataset
diabetes_ds = ws.datasets.get("diabetes dataset")

# Create a PipelineData (temporary Data Reference) for the model folder
model_folder = PipelineData("model_folder", datastore=ws.get_default_datastore())

estimator = Estimator(source_directory=experiment_folder,
                        compute_target = pipeline_cluster,
                        environment_definition=pipeline_run_config.environment,
                        entry_script='train_diabetes.py')

# Step 1, run the estimator to train the model
train_step = EstimatorStep(name = "Train Model",
                           estimator=estimator, 
                           estimator_entry_script_arguments=['--output_folder', model_folder],
                           inputs=[diabetes_ds.as_named_input('diabetes_train')],
                           outputs=[model_folder],
                           compute_target = pipeline_cluster,
                           allow_reuse = True)

# Step 2, run the model registration script
register_step = PythonScriptStep(name = "Register Model",
                                source_directory = experiment_folder,
                                script_name = "register_diabetes.py",
                                arguments = ['--model_folder', model_folder],
                                inputs=[model_folder],
                                compute_target = pipeline_cluster,
                                runconfig = pipeline_run_config,
                                allow_reuse = True)

print("Pipeline steps defined")

'Estimator' is deprecated. Please use 'ScriptRunConfig' from 'azureml.core.script_run_config' with your own defined environment or an Azure ML curated environment.


Pipeline steps defined


### Build the Pipeline from the Steps, and then Run it as an AML Experiment
> **Note**: This will take a while. The training cluster must be started and configured with the Python environment before the scripts can be run. This is a good time for a coffee break!

In [9]:
from azureml.core import Experiment
from azureml.pipeline.core import Pipeline

# Construct the pipeline
pipeline_steps = [train_step, register_step]
pipeline = Pipeline(workspace = ws, steps=pipeline_steps)
print("Pipeline is built.")

# Create an experiment and run the pipeline
experiment = Experiment(workspace = ws, name = 'diabetes-training-pipeline')
pipeline_run = experiment.submit(pipeline, regenerate_outputs=True)
print("Pipeline submitted for execution.")
pipeline_run.wait_for_completion(show_output=True)

Pipeline is built.
Created step Train Model [0ef6bb17][6ef40495-9929-4f1c-abec-b3dcef47bd94], (This step will run and generate new outputs)
Created step Register Model [1c29e350][684f2a55-bea4-471c-b5fd-c98d4aaca80b], (This step will run and generate new outputs)
Submitted PipelineRun 139fbc05-bf77-4cc6-9edd-24f76c3d08e5
Link to Azure Machine Learning Portal: https://ml.azure.com/experiments/diabetes-training-pipeline/runs/139fbc05-bf77-4cc6-9edd-24f76c3d08e5?wsid=/subscriptions/49c8fb06-83d0-410a-81dd-c687e23bd829/resourcegroups/mlops-rg/workspaces/mlops-aml-ws
Pipeline submitted for execution.
PipelineRunId: 139fbc05-bf77-4cc6-9edd-24f76c3d08e5
Link to Azure Machine Learning Portal: https://ml.azure.com/experiments/diabetes-training-pipeline/runs/139fbc05-bf77-4cc6-9edd-24f76c3d08e5?wsid=/subscriptions/49c8fb06-83d0-410a-81dd-c687e23bd829/resourcegroups/mlops-rg/workspaces/mlops-aml-ws
PipelineRun Status: Running


StepRunId: a659116e-d4b6-4522-afd8-cfd77f667bbf
Link to Azure Machine

Training a decision tree model
Accuracy: 0.9006666666666666
AUC: 0.886078007363292


[2021-04-07T20:32:59.597517] The experiment completed successfully. Finalizing run...
Cleaning up all outstanding Run operations, waiting 900.0 seconds
2 items cleaning up...
Cleanup took 0.15256738662719727 seconds
[2021-04-07T20:32:59.980115] Finished context manager injector.
2021/04/07 20:33:05 Attempt 1 of http call to http://10.0.0.4:16384/sendlogstoartifacts/status
2021/04/07 20:33:05 Not exporting to RunHistory as the exporter is either stopped or there is no data.
Stopped: false
OriginalData: 2
FilteredData: 0.
2021/04/07 20:33:05 Process Exiting with Code:  0

Streaming azureml-logs/75_job_post-tvmps_9d59c075601fa673d7bb6b8a9e5d44a7505ed8b23f738d007c500f119c525b9a_p.txt
[2021-04-07T20:33:06.168884] Entering job release
[2021-04-07T20:33:07.311707] Starting job release
[2021-04-07T20:33:07.312610] Logging experiment finalizing status in history service.
Starting the daemon thread to refresh to




StepRunId: cd539bad-c452-48d9-b34d-30a0f08ff739
Link to Azure Machine Learning Portal: https://ml.azure.com/experiments/diabetes-training-pipeline/runs/cd539bad-c452-48d9-b34d-30a0f08ff739?wsid=/subscriptions/49c8fb06-83d0-410a-81dd-c687e23bd829/resourcegroups/mlops-rg/workspaces/mlops-aml-ws
StepRun( Register Model ) Status: Queued
StepRun( Register Model ) Status: Running

Streaming azureml-logs/55_azureml-execution-tvmps_9d59c075601fa673d7bb6b8a9e5d44a7505ed8b23f738d007c500f119c525b9a_p.txt
2021-04-07T20:33:33Z Successfully mounted a/an Blobfuse File System at /mnt/batch/tasks/shared/LS_root/jobs/mlops-aml-ws/azureml/cd539bad-c452-48d9-b34d-30a0f08ff739/mounts/workspaceblobstore
2021-04-07T20:33:34Z Starting output-watcher...
2021-04-07T20:33:34Z IsDedicatedCompute == False, starting polling for Low-Pri Preemption
2021-04-07T20:33:38Z Executing 'Copy ACR Details file' on 10.0.0.4
2021-04-07T20:33:38Z Copy ACR Details file succeeded on 10.0.0.4. Output: 
>>>   
>>>   
Login Succee


Streaming azureml-logs/75_job_post-tvmps_9d59c075601fa673d7bb6b8a9e5d44a7505ed8b23f738d007c500f119c525b9a_p.txt
[2021-04-07T20:33:57.662055] Entering job release
[2021-04-07T20:33:58.688111] Starting job release
[2021-04-07T20:33:58.693175] Logging experiment finalizing status in history service.
Starting the daemon thread to refresh tokens in background for process with pid = 146
[2021-04-07T20:33:58.693594] job release stage : upload_datastore starting...
[2021-04-07T20:33:58.703091] job release stage : start importing azureml.history._tracking in run_history_release.
[2021-04-07T20:33:58.703439] job release stage : copy_batchai_cached_logs starting...
[2021-04-07T20:33:58.704063] job release stage : execute_job_release starting...
[2021-04-07T20:33:58.704232] job release stage : copy_batchai_cached_logs completed...
[2021-04-07T20:33:58.704835] Entering context manager injector.
[2021-04-07T20:33:58.723013] job release stage : upload_datastore completed...
[2021-04-07T20:33:58.8632



PipelineRun Execution Summary
PipelineRun Status: Finished
{'runId': '139fbc05-bf77-4cc6-9edd-24f76c3d08e5', 'status': 'Completed', 'startTimeUtc': '2021-04-07T20:26:50.174683Z', 'endTimeUtc': '2021-04-07T20:34:13.790882Z', 'properties': {'azureml.runsource': 'azureml.PipelineRun', 'runSource': 'SDK', 'runType': 'SDK', 'azureml.parameters': '{}'}, 'inputDatasets': [], 'outputDatasets': [], 'logFiles': {'logs/azureml/executionlogs.txt': 'https://jtupitzaamlsa.blob.core.windows.net/azureml/ExperimentRun/dcid.139fbc05-bf77-4cc6-9edd-24f76c3d08e5/logs/azureml/executionlogs.txt?sv=2019-02-02&sr=b&sig=a9UprFA8Ds6TUBBruot4yMhhRJzblwjs7Bvr1357iVk%3D&st=2021-04-07T20%3A24%3A15Z&se=2021-04-08T04%3A34%3A15Z&sp=r', 'logs/azureml/stderrlogs.txt': 'https://jtupitzaamlsa.blob.core.windows.net/azureml/ExperimentRun/dcid.139fbc05-bf77-4cc6-9edd-24f76c3d08e5/logs/azureml/stderrlogs.txt?sv=2019-02-02&sr=b&sig=3zXKrLtHi5%2BWc8KqBqzVHQX5zxrvUiC9gswb6SI0l%2Bs%3D&st=2021-04-07T20%3A24%3A15Z&se=2021-04-08T0

'Finished'

### Verify that the Newly Trained Model Exists
A new model should be registered with a Training context tag indicating it was trained in a pipeline.

In [10]:
from azureml.core import Model

for model in Model.list(ws):
    print(model.name, 'version:', model.version)
    for tag_name in model.tags:
        tag = model.tags[tag_name]
        print ('\t',tag_name, ':', tag)
    for prop_name in model.properties:
        prop = model.properties[prop_name]
        print ('\t',prop_name, ':', prop)
    print('\n')

diabetes_model version: 2
	 Training context : Pipeline


diabetes_model version: 1
	 Training context : Pipeline


porto_seguro_safe_driver_model version: 24


porto_seguro_safe_driver_model version: 23


porto_seguro_safe_driver_model.pkl version: 6


porto_seguro_safe_driver_model version: 22


porto_seguro_safe_driver_model version: 21


diabetes_regression_model.pkl version: 1
	 area : diabetes_regression
	 run_id : 6ea0258d-ebb1-451b-aec0-187aef0a20f3
	 experiment_name : mlopspython
	 mse : 3295.741064355809
	 BuildId : 26
	 BuildUri : https://dev.azure.com/jtupitza-msft/python-mlops/_build/results?buildId=26




### Publish the Pipeline as a REST Service

In [11]:
published_pipeline = pipeline.publish(name="Diabetes_Training_Pipeline",
                                      description="Trains diabetes model",
                                      version="1.0")
rest_endpoint = published_pipeline.endpoint
print(rest_endpoint)

https://eastus.api.azureml.ms/pipelines/v1.0/subscriptions/49c8fb06-83d0-410a-81dd-c687e23bd829/resourceGroups/mlops-rg/providers/Microsoft.MachineLearningServices/workspaces/mlops-aml-ws/PipelineRuns/PipelineSubmit/b3f8f29c-5222-4944-9d9d-f41a2124fe3e


### Test the New REST Service Endpoint
- Use the authorization header from the current Azure workspace connection to authenticate the call to the REST Service.
- Since the pipeline runs asynchronously, an identifier is returned that can be used to track the experiment at runtime.

In [12]:
from azureml.core.authentication import InteractiveLoginAuthentication

interactive_auth = InteractiveLoginAuthentication()
auth_header = interactive_auth.get_authentication_header()

In [13]:
import requests
from azureml.pipeline.core.run import PipelineRun

experiment_name = 'Run-diabetes-pipeline'

response = requests.post(rest_endpoint, headers=auth_header, 
                         json={"ExperimentName": experiment_name})
run_id = response.json()["Id"]
print("Tracking Run: ", run_id)

published_pipeline_run = PipelineRun(ws.experiments[experiment_name], run_id)
published_pipeline_run.wait_for_completion(show_output=True)

PipelineRunId: fa02c5cd-75bc-4aa7-b593-12561d977630
Link to Azure Machine Learning Portal: https://ml.azure.com/experiments/Run-diabetes-pipeline/runs/fa02c5cd-75bc-4aa7-b593-12561d977630?wsid=/subscriptions/49c8fb06-83d0-410a-81dd-c687e23bd829/resourcegroups/mlops-rg/workspaces/mlops-aml-ws
PipelineRun Status: NotStarted
PipelineRun Status: Running


StepRunId: 0335f780-42e8-4e66-8095-d4e35d858cbf
Link to Azure Machine Learning Portal: https://ml.azure.com/experiments/Run-diabetes-pipeline/runs/0335f780-42e8-4e66-8095-d4e35d858cbf?wsid=/subscriptions/49c8fb06-83d0-410a-81dd-c687e23bd829/resourcegroups/mlops-rg/workspaces/mlops-aml-ws

StepRun(Train Model) Execution Summary
StepRun( Train Model ) Status: Finished
{'runId': '0335f780-42e8-4e66-8095-d4e35d858cbf', 'target': 'train-cluster', 'status': 'Completed', 'startTimeUtc': '2021-04-07T20:36:32.279888Z', 'endTimeUtc': '2021-04-07T20:36:32.713888Z', 'properties': {'azureml.reusedrunid': 'a659116e-d4b6-4522-afd8-cfd77f667bbf', 'azurem




StepRunId: 393c267e-9825-4d11-bc4b-112623780d8c
Link to Azure Machine Learning Portal: https://ml.azure.com/experiments/Run-diabetes-pipeline/runs/393c267e-9825-4d11-bc4b-112623780d8c?wsid=/subscriptions/49c8fb06-83d0-410a-81dd-c687e23bd829/resourcegroups/mlops-rg/workspaces/mlops-aml-ws

StepRun(Register Model) Execution Summary
StepRun( Register Model ) Status: Finished
{'runId': '393c267e-9825-4d11-bc4b-112623780d8c', 'target': 'train-cluster', 'status': 'Completed', 'startTimeUtc': '2021-04-07T20:36:35.198617Z', 'endTimeUtc': '2021-04-07T20:36:35.704468Z', 'properties': {'azureml.reusedrunid': 'cd539bad-c452-48d9-b34d-30a0f08ff739', 'azureml.reusednodeid': '1c29e350', 'azureml.reusedpipeline': '139fbc05-bf77-4cc6-9edd-24f76c3d08e5', 'azureml.reusedpipelinerunid': '139fbc05-bf77-4cc6-9edd-24f76c3d08e5', 'azureml.runsource': 'azureml.StepRun', 'azureml.nodeid': '1c29e350', 'ContentSnapshotId': 'f182ef6a-ffbf-4b49-83e3-f2cf7d0f662d', 'StepType': 'PythonScriptStep', 'ComputeTargetTy



PipelineRun Execution Summary
PipelineRun Status: Finished
{'runId': 'fa02c5cd-75bc-4aa7-b593-12561d977630', 'status': 'Completed', 'startTimeUtc': '2021-04-07T20:36:29.514418Z', 'endTimeUtc': '2021-04-07T20:36:37.918434Z', 'properties': {'azureml.runsource': 'azureml.PipelineRun', 'runSource': 'Unavailable', 'runType': 'HTTP', 'azureml.parameters': '{}', 'azureml.pipelineid': 'b3f8f29c-5222-4944-9d9d-f41a2124fe3e'}, 'inputDatasets': [], 'outputDatasets': [], 'logFiles': {'logs/azureml/executionlogs.txt': 'https://jtupitzaamlsa.blob.core.windows.net/azureml/ExperimentRun/dcid.fa02c5cd-75bc-4aa7-b593-12561d977630/logs/azureml/executionlogs.txt?sv=2019-02-02&sr=b&sig=6IvRhhLByUgIO9JSkW8uzz%2FDgRlqGfrZskiURRg%2FfNk%3D&st=2021-04-07T20%3A26%3A40Z&se=2021-04-08T04%3A36%3A40Z&sp=r', 'logs/azureml/stderrlogs.txt': 'https://jtupitzaamlsa.blob.core.windows.net/azureml/ExperimentRun/dcid.fa02c5cd-75bc-4aa7-b593-12561d977630/logs/azureml/stderrlogs.txt?sv=2019-02-02&sr=b&sig=1asAlgH99CcJtxV53c5