In enterprise environment, it is common to encapsulate the sequence of discrete steps to build a ml solution that could be run on one or more compute targets. It is an automated build process.

## Connect to your workspace

In [1]:
import azureml.core
from azureml.core import workspace
#Load the workspace from the saved config file
ws=workspace.from_config()
print('Ready to use Azure ML {} to work with {}'.format(azureml.core.VERSION, ws.name)'

SyntaxError: EOL while scanning string literal (<ipython-input-1-3f5e623606c7>, line 5)

## Prepare the training data
Use the data stored in the cloud. It is an Azure Storage Blob Container. In practice, you will register a datastore that reference the cloud location where you store your data. Then create a tabular dataset that represent the csv files you uploaded. 

In [3]:
from azureml.core import Dataset
default_ds=ws.get_default_datastore()

if 'diabetes dataset' not in ws.datasets:
    default_ds.upload_files(files=['./data/diabetes.csv', './data/diabetes2.csv'], # Upload the diabetes csv files in /data
                        target_path='diabetes-data/', # Put it in a folder path in the datastore
                        overwrite=True, # Replace existing files of the same name
                        show_progress=True)
#Create a tabular dataset from a path on the datastore
tab_data_set=Dataset.Tabular.from_delimited_files(path=(default_ds, 'diabetes-data/*.csv'))

#Register the tabular dataset
try:
        tab_data_set = tab_data_set.register(workspace=ws, 
                                name='diabetes dataset',
                                description='diabetes data',
                                tags = {'format':'CSV'},
                                create_new_version=True)
        print('Dataset registered.')
    except Exception as ex:
        print(ex)
else:
    print('Dataset already registered.')

IndentationError: unindent does not match any outer indentation level (<tokenize>, line 20)

## Pipeline Steps
Pipeline steps contains one or more steps, which can be python scripts, or specialized steps like Auto ML traning estimator or a data transfer step that copies data from one location to another. Each step can run on its own compute context.
We will build a a simple pipeline which contains an estimator(to train a model) and a python script(to register the tranined model). Start by creating a folder to contain all scripts. 

In [6]:
#Create a folder for the pipeline step files 
import os 
experiment_folder='diabetes_pipeline'
os.mkdir(experiment_folder, exist_ok=True)
print(experiment_folder)

TypeError: 'exist_ok' is an invalid keyword argument for mkdir()

step1: use estimator to run a training script. 

In [5]:
%%writefile $experiment_folder/train_diabetes.py
# Import libraries
from azureml.core import Run
import argparse
import pandas as pd
import numpy as np
import joblib
from sklearn.model_selection import train_test_split
from sklearn.tree import DecisionTreeClassifier
from sklearn.metrics import roc_auc_score

# Get parameters
parser = argparse.ArgumentParser()
parser.add_argument('--output_folder', type=str, dest='output_folder', default="diabetes_model", help='output folder')
args = parser.parse_args()
output_folder = args.output_folder

# Get the experiment run context
run = Run.get_context()

# load the diabetes data (passed as an input dataset)
print("Loading Data...")
diabetes = run.input_datasets['diabetes_train'].to_pandas_dataframe()

# Separate features and labels
X, y = diabetes[['Pregnancies','PlasmaGlucose','DiastolicBloodPressure','TricepsThickness','SerumInsulin','BMI','DiabetesPedigree','Age']].values, diabetes['Diabetic'].values

# Split data into training set and test set
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.30, random_state=0)

# Train adecision tree model
print('Training a decision tree model')
model = DecisionTreeClassifier().fit(X_train, y_train)

# calculate accuracy
y_hat = model.predict(X_test)
acc = np.average(y_hat == y_test)
print('Accuracy:', acc)
run.log('Accuracy', np.float(acc))

# calculate AUC
y_scores = model.predict_proba(X_test)
auc = roc_auc_score(y_test,y_scores[:,1])
print('AUC: ' + str(auc))
run.log('AUC', np.float(auc))

# Save the trained model
os.makedirs(output_folder, exist_ok=True)
output_path = output_folder + "/model.pkl"
joblib.dump(value=model, filename=output_path)

run.complete()

Writing diabetes_pipeline/train_diabetes.py


FileNotFoundError: [Errno 2] No such file or directory: 'diabetes_pipeline/train_diabetes.py'

Step 2: load the model from where its saved and register it into the workspace. model_folder paramter contains the path to the folder where the model is saved from previous step. 

In [7]:
%%writefile $experiment_folder/register_diabetes.py
# Import libraries
import argparse
import joblib
from azureml.core import Workspace, Model, Run

# Get parameters
parser = argparse.ArgumentParser()
parser.add_argument('--model_folder', type=str, dest='model_folder', default="diabetes_model", help='model location')
args = parser.parse_args()
model_folder = args.model_folder

# Get the experiment run context
run = Run.get_context()

# load the model
print("Loading model from " + model_folder)
model_file = model_folder + "/model.pkl"
model = joblib.load(model_file)

Model.register(workspace=run.experiment.workspace,
               model_path = model_file,
               model_name = 'diabetes_model',
               tags={'Training context':'Pipeline'})

run.complete()

Writing diabetes_pipeline/register_diabetes.py


FileNotFoundError: [Errno 2] No such file or directory: 'diabetes_pipeline/register_diabetes.py'

## Specify compute context for each steps
First, set up a compute target, which is a compute cluster in your workspace. 

In [8]:
from azureml.core.compute import ComputeTarget, AmlCompute
from azureml.core.compute_target import ComputeTargetException

cluster_name='your_compute_cluster'
try:
    # Check for existing compute target
    pipeline_cluster = ComputeTarget(workspace=ws, name=cluster_name)
    print('Found existing cluster, use it.')
except ComputeTargetException:
    # If it doesn't already exist, create it
    try:
        compute_config = AmlCompute.provisioning_configuration(vm_size='STANDARD_DS11_V2', max_nodes=2)
        pipeline_cluster = ComputeTarget.create(ws, cluster_name, compute_config)
        pipeline_cluster.wait_for_completion(show_output=True)
    except Exception as ex:
        print(ex)

Failure while loading azureml_run_type_providers. Failed to load entrypoint automl = azureml.train.automl.run:AutoMLRun._from_run_dto with exception unknown locale: UTF-8.


NameError: name 'ws' is not defined

Secondly, set up a python environment and do a run configuration.

In [9]:
from azureml.core import environment
from azureml.core.conda_dependencies import CondaDependencies
from azureml.core.runconfig import RunConfiguration

#Creata a python environment for the experiment
diabetes_env=Environment('diabetes-pipeline-env')

from azureml.core import Environment
from azureml.core.conda_dependencies import CondaDependencies
from azureml.core.runconfig import RunConfiguration

# Create a Python environment for the experiment
diabetes_env = Environment("diabetes-pipeline-env")
diabetes_env.python.user_managed_dependencies = False # Let Azure ML manage dependencies
diabetes_env.docker.enabled = True # Use a docker container

# Create a set of package dependencies
diabetes_packages = CondaDependencies.create(conda_packages=['scikit-learn','pandas'],
                                             pip_packages=['azureml-defaults','azureml-dataprep[pandas]'])

# Add the dependencies to the environment
diabetes_env.python.conda_dependencies = diabetes_packages

# Register the environment (just in case you want to use it again)
diabetes_env.register(workspace=ws)
registered_env = Environment.get(ws, 'diabetes-pipeline-env')

#Creata a new run_config object for the pipeline
pipeline_run_config = RunConfiguration()

# Use the compute you created above. 
pipeline_run_config.target = pipeline_cluster

# Assign the environment to the run configuration
pipeline_run_config.environment = registered_env

print ("Run configuration created.")

NameError: name 'Environment' is not defined

## Run a pipeline on remote computer
1. write the model to the folder, the folder path must be passed as a data reference to a location in the datastore within the workspace. 
2. PiplelineData object is a special kind of data referenece which is used to pass data from one output of pipeline step to another input of pipeline step. Create one and also pass it as as script argument. 

In [10]:
from azureml.pipeline.core import PipelineData
from azureml.pipeline.steps import PythonScriptStep, EstimatorStep
from azureml.train.estimator import Estimator
# Get the training dataset
diabetes_ds = ws.datasets.get("diabetes dataset")

# Create a PipelineData (temporary Data Reference) for the model folder
model_folder = PipelineData("model_folder", datastore=ws.get_default_datastore())

estimator = Estimator(source_directory=experiment_folder,
                        compute_target = pipeline_cluster,
                        environment_definition=pipeline_run_config.environment,
                        entry_script='train_diabetes.py')

# Step 1, run the estimator to train the model
train_step = EstimatorStep(name = "Train Model",
                           estimator=estimator, 
                           estimator_entry_script_arguments=['--output_folder', model_folder],
                           inputs=[diabetes_ds.as_named_input('diabetes_train')],
                           outputs=[model_folder],
                           compute_target = pipeline_cluster,
                           allow_reuse = True)

# Step 2, run the model registration script
register_step = PythonScriptStep(name = "Register Model",
                                source_directory = experiment_folder,
                                script_name = "register_diabetes.py",
                                arguments = ['--model_folder', model_folder],
                                inputs=[model_folder],
                                compute_target = pipeline_cluster,
                                runconfig = pipeline_run_config,
                                allow_reuse = True)

print("Pipeline steps defined")

ValueError: unknown locale: UTF-8

In [11]:
from azureml.core import Experiment
from azureml.pipeline.core import Pipeline

# Construct the pipeline
pipeline_steps = [train_step, register_step]
pipeline = Pipeline(workspace = ws, steps=pipeline_steps)
print("Pipeline is built.")

# Create an experiment and run the pipeline
experiment = Experiment(workspace = ws, name = 'diabetes-training-pipeline')
pipeline_run = experiment.submit(pipeline, regenerate_outputs=True)
print("Pipeline submitted for execution.")
pipeline_run.wait_for_completion(show_output=True)

NameError: name 'train_step' is not defined

Pipeline runs can be monitored from experiment page on AzureML studio. 

When the pipeline has finished, a new model should be registered with a Training context tag indicating it was trained in a pipeline. Run the following code to verify this.

In [12]:
from azureml.core import Model

for model in Model.list(ws):
    print(model.name, 'version:', model.version)
    for tag_name in model.tags:
        tag = model.tags[tag_name]
        print ('\t',tag_name, ':', tag)
    for prop_name in model.properties:
        prop = model.properties[prop_name]
        print ('\t',prop_name, ':', prop)
    print('\n')

NameError: name 'ws' is not defined

## Publish the pipeline
Publish it as a REST service.

In [13]:
published_pipeline = pipeline.publish(name="Diabetes_Training_Pipeline",
                                      description="Trains diabetes model",
                                      version="1.0")
rest_endpoint = published_pipeline.endpoint
print(rest_endpoint)

NameError: name 'pipeline' is not defined

To use the pipeline, client application should make a REST call over HTTP. This request must be authenticated, an authorization head is required. 
A real application would require a service principal with which to be authenticated, but to test this out, we'll use the authorization header from your current connection to your Azure workspace, which you can get using the following code:

In [None]:
from azureml.core.authentication import InteractiveLoginAuthentication

interactive_auth = InteractiveLoginAuthentication()
auth_header = interactive_auth.get_authentication_header()

import requests
experiment_name = 'Run-diabetes-pipeline'

response = requests.post(rest_endpoint, 
                         headers=auth_header, 
                         json={"ExperimentName": experiment_name})
run_id = response.json()["Id"]
run_id

#Note: The pipeline should complete quickly, because each step was configured to allow output reuse. This was done primarily for convenience and to save time in this example. In reality, you'd likely want the first step to run every time in case the data has changed, 
#and trigger the subsequent steps only if the output from step one changes

Note, we have launched a browser for you to login. For old experience with device code, use "az login --use-device-code"


Performing interactive authentication. Please follow the instructions on the terminal.


In [None]:
from azureml.pipeline.core.run import PipelineRun

published_pipeline_run = PipelineRun(ws.experiments[experiment_name], run_id)
pipeline_run.wait_for_completion(show_output=True)

In reality, you could build more sophisticated logic into the pipeline steps - for example, evaluating the model against some test data to calculate a performance metric like AUC or accuracy, comparing the metric to that of any previously registered versions of the model, and only registering the new model if it performs better