# Exercise 6 - Automating ML Workflows with Pipelines

In the previous exercises, you explored the entire machine learning process from accessing data through to training and deploying machine learning models. Up until now, you have performed the various steps required to create a machine learning solution interactively. In this exercise, you'll explore automation of these steps using *pipelines*.

> **Important**: This exercise assumes you have completed the previous exercises in this series - specifically, you must have:
>
> - Created an Azure ML Workspace.
> - Uploaded the diabetes.csv data file to the workspace's default datastore.
> - Created a dataset called **Diabetes Dataset**.
> - Created an Azure ML compute resource named **cpu-compute**.
>
> If you haven't done that, there's no time like the present!

> **More Information**: For more information about Azure ML Pipelines, see the [documentation](https://docs.microsoft.com/en-us/azure/machine-learning/service/concept-ml-pipelines).

## Task 1: Connect to Your Workspace

The first thing you need to do is connect to your workspace using the Azure ML SDK. Let's start by ensuring you still have the latest version installed (if you ended and restarted your Azure Notebooks session, the environment may have been reset).

In [None]:
!pip install --upgrade azureml-sdk[notebooks]

import azureml.core
print("Ready to use Azure ML", azureml.core.VERSION)

Now you're ready to connect to your workspace. When you created it in the previous exercise, you saved its configuration; so now you can simply load the workspace from its configuration file.

> **Note**: If the authenticated session with your Azure subscription has expired since you completed the previous exercise, you'll be prompted to reauthenticate.

In [None]:
from azureml.core import Workspace

# Load the workspace from the saved config file
ws = Workspace.from_config()
print('Ready to work with', ws.name)

## Task 2: Prepare the Pipeline Environment

Pipelines consist of one or more *steps*, which can be Python scripts, or specialized steps like an Auto ML training estimator or a data transfer step that copies data from one location to another. Each step can run in its own compute context.

In this exercise, you'll build a simple pipeline that contains two Python script steps (one to train a model, and another to register the trained model). Before creating the pipeline, however, you'll need to prepare the environment by creating the scripts for each step and defining the compute and run configuration for the steps.

Let's start by creating a folder for our pipeline step scripts.

In [None]:
import os
# Create a folder for the experiment files
experiment_name = 'diabetes_pipeline'
experiment_folder = './' + experiment_name
os.makedirs(experiment_folder, exist_ok=True)

print(experiment_folder)

## Python script creation

When defining scripts to run in our pipelines, we can choose between two primary ways to ingest Azure Machine Learning [Datasets](https://docs.microsoft.com/en-us/python/api/azureml-core/azureml.core.dataset.dataset?view=azure-ml-py). 

- One is from the run context of our script when it is executed on the compute target.

- Alternatively, the use of a [FileDataset](https://docs.microsoft.com/en-us/python/api/azureml-core/azureml.data.filedataset?view=azure-ml-py) can allow us to mount and pass in the path to files represented by a Dataset at runtime.

The former pattern suits teams that keep everything in the cloud and change their source code and AML pipelines together: the scripts only ever need to run on compute targets, where the run context always has an assigned Workspace.

The latter pattern smooths the transition from developing scripts locally to consuming them in pipelines, because an environment variable can point to the dataset files in either context. We can therefore remove any reference to the run context for data access from our scripts and run them locally to iterate quickly, while expecting the same behaviour remotely when the pipeline is submitted for execution.
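As a minimal sketch of the second pattern, the helper below reads the dataset path from an environment variable and falls back to a local file when the variable is not set. The function name `resolve_dataset_path`, the fallback path `data/diabetes.csv`, and the example mount path are assumptions made for illustration; only the input name `training_dataset` comes from this exercise. Locally you could equally set the variable yourself instead of using a fallback.

```python
import os


def resolve_dataset_path(input_name, local_fallback, environ=None):
    """Return the path to a dataset file.

    On remote compute, the mounted dataset path is expected in an environment
    variable named after the input. Locally that variable is usually absent,
    so a local file path is returned instead.
    (Hypothetical helper, not part of the Azure ML SDK.)
    """
    env = os.environ if environ is None else environ
    return env.get(input_name, local_fallback)


# Simulate a local run: the variable is not set, so the fallback is used
local_path = resolve_dataset_path("training_dataset", "data/diabetes.csv", environ={})

# Simulate a pipeline run: the variable holds the (made-up) mount path
remote_path = resolve_dataset_path(
    "training_dataset",
    "data/diabetes.csv",
    environ={"training_dataset": "/mnt/inputs/diabetes.csv"},
)

print(local_path)
print(remote_path)
```

Passing `environ` explicitly is only there to make the two cases easy to demonstrate; a real script would normally rely on `os.environ`.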

The difference can be seen in the script definitions below.

### Option 1 - Retrieving datasets from the Workspace via the Run context

This will create the source code file `train_diabetes_no_pipeline_input.py`.

The script includes the following parameters:

- **dataset_name**: The name of the dataset in your workspace to be used as source data for training.
- **regularization**: The regularization rate to be used when training a logistic regression model.
- **output_folder**: The folder where the trained model should be saved.
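To see how these parameters reach the script, here is a small standalone sketch of the same `argparse` setup parsing an explicit argument list rather than the command line. The values `0.1` and `outputs/model` are made up for illustration. Note that `dest='reg_rate'` means the `--regularization` flag is read back as `args.reg_rate`.

```python
import argparse

# Same three parameters as the training script
parser = argparse.ArgumentParser()
parser.add_argument('--dataset_name', type=str, dest='dataset_name', default='Diabetes Dataset')
parser.add_argument('--regularization', type=float, dest='reg_rate', default=0.01)
parser.add_argument('--output_folder', type=str, dest='output_folder', default='diabetes_model')

# Parse an explicit list instead of sys.argv; --dataset_name is omitted on purpose
args = parser.parse_args(['--regularization', '0.1', '--output_folder', 'outputs/model'])

print(args.dataset_name)   # falls back to the default
print(args.reg_rate)       # converted to float by type=float
print(args.output_folder)
```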

In [None]:
%%writefile $experiment_folder/train_diabetes_no_pipeline_input.py
# Import libraries
import os
import argparse
import joblib
from azureml.core import Workspace, Dataset, Experiment, Run
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression


# Get parameters
parser = argparse.ArgumentParser()
parser.add_argument('--dataset_name', type=str, dest='dataset_name', default="Diabetes Dataset", help='source dataset')
parser.add_argument('--regularization', type=float, dest='reg_rate', default=0.01, help='regularization rate')
parser.add_argument('--output_folder', type=str, dest='output_folder', default="diabetes_model", help='output folder')
args = parser.parse_args()
reg = args.reg_rate
dataset_name = args.dataset_name
output_folder = args.output_folder

# Get the experiment run context
run = Run.get_context()

# load the diabetes dataset
print("Loading data from " + dataset_name)
diabetes = Dataset.get_by_name(workspace=run.experiment.workspace, name=dataset_name).to_pandas_dataframe()

# Separate features and labels
X, y = diabetes[['Pregnancies','PlasmaGlucose','DiastolicBloodPressure','TricepsThickness','SerumInsulin','BMI','DiabetesPedigree','Age']].values, diabetes['Diabetic'].values

# Train a logistic regression model
print('Training a logistic regression model with regularization rate of', reg)
run.log('Regularization Rate', float(reg))
model = LogisticRegression(C=1/reg, solver="liblinear").fit(X, y)

# Save the trained model
os.makedirs(output_folder, exist_ok=True)
output_path = output_folder + "/model.pkl"
joblib.dump(value=model, filename=output_path)

run.complete()

### Option 2 - Reading datasets from mounted input files

First, we require a FileDataset in order to take advantage of the mounting functionality.

In [None]:
from azureml.core import Dataset

# Retrieve diabetes dataset as a FileDataset
diabetes_file_dataset = Dataset.File.from_files(
        path=[(ws.get_default_datastore(), "diabetes-data/diabetes.csv")], validate=True
)

print("Sourced Dataset:\n{}".format(diabetes_file_dataset))

# And ready for direct input to a pipeline step...

# This creates a named input configuration that tells the pipeline to mount
# the file on the compute target at runtime.
diabetes_file_dataset = diabetes_file_dataset.as_named_input('training_dataset').as_mount()

We use this mounted FileDataset in one of our [later pipeline examples](#Task-3:-Create-and-Run-a-Pipeline).

Next, we create `train_diabetes_with_pipeline_input.py`: a script which retrieves the mounted input without relying on knowledge or existence of a run context. This can be run and iterated on locally like any other Python script.

The script includes the following parameters:

- **regularization**: The regularization rate to be used when training a logistic regression model.
- **output_folder**: The folder where the trained model should be saved.

In [None]:
%%writefile $experiment_folder/train_diabetes_with_pipeline_input.py
# Import libraries
import os
import argparse
import joblib
from azureml.core import Run
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression


# Get parameters
parser = argparse.ArgumentParser()

# Notice no dataset name is required in this version
parser.add_argument('--regularization', type=float, dest='reg_rate', default=0.01, help='regularization rate')
parser.add_argument('--output_folder', type=str, dest='output_folder', default="diabetes_model", help='output folder')
args = parser.parse_args()
reg = args.reg_rate
output_folder = args.output_folder

dataset_path = os.environ['training_dataset'] # The key here is the name of the input created from the Dataset

# Get the experiment run context
# We no longer need this for I/O, but can continue to use it for logging etc., even locally.
run = Run.get_context()

# load the diabetes dataset
print("Loading data from " + dataset_path)
# Simply use pandas as with a local file
diabetes = pd.read_csv(dataset_path)

print("Data retrieved: {}\n".format(diabetes))

# Separate features and labels
X, y = diabetes[['Pregnancies','PlasmaGlucose','DiastolicBloodPressure','TricepsThickness','SerumInsulin','BMI','DiabetesPedigree','Age']].values, diabetes['Diabetic'].values

# Train a logistic regression model
print('Training a logistic regression model with regularization rate of', reg)
run.log('Regularization Rate', float(reg))
model = LogisticRegression(C=1/reg, solver="liblinear").fit(X, y)

# Save the trained model
os.makedirs(output_folder, exist_ok=True)
output_path = output_folder + "/model.pkl"
joblib.dump(value=model, filename=output_path)

run.complete()

## Model registration

The script for the second step of the pipeline will load the model from where it was saved, and then register it in the workspace. It includes a single **model_folder** parameter that contains the path of the folder where the model was saved.

In [None]:
%%writefile $experiment_folder/register_diabetes.py
# Import libraries
import argparse
import joblib
from azureml.core import Workspace, Model, Run

# Get parameters
parser = argparse.ArgumentParser()
parser.add_argument('--model_folder', type=str, dest='model_folder', default="diabetes_model", help='model location')
args = parser.parse_args()
model_folder = args.model_folder

# Get the experiment run context
run = Run.get_context()

# load the model
print("Loading model from " + model_folder)
model_file = model_folder + "/model.pkl"
model = joblib.load(model_file)

Model.register(workspace=run.experiment.workspace,
               model_path = model_file,
               model_name = 'diabetes_model',
               tags={'Training context':'Pipeline'})

run.complete()

In this exercise, you'll use the same compute for both steps, but it's important to realize that each step is run independently; so you could specify different compute contexts for each step if appropriate.

In [None]:
from azureml.core.compute import ComputeTarget, AmlCompute
from azureml.core.compute_target import ComputeTargetException

# Choose a name for your CPU cluster
cpu_cluster_name = "cpu-cluster"

# Use the cluster if it already exists; otherwise create it
try:
    cpu_cluster = ComputeTarget(workspace=ws, name=cpu_cluster_name)
    print('Found existing cluster, use it.')
except ComputeTargetException:
    # Create an AzureMl Compute resource (a container cluster)
    compute_config = AmlCompute.provisioning_configuration(vm_size='STANDARD_D2_V2', 
                                                           vm_priority='lowpriority', 
                                                           max_nodes=4)
    cpu_cluster = ComputeTarget.create(ws, cpu_cluster_name, compute_config)

cpu_cluster.wait_for_completion(show_output=True)

The compute will require a Python environment with the necessary package dependencies installed, so we'll create a run configuration.

In [None]:
from azureml.core.runconfig import RunConfiguration
from azureml.core.conda_dependencies import CondaDependencies

# Create a new runconfig object
pipeline_run_config = RunConfiguration()

# Use the compute you created above. 
pipeline_run_config.target = cpu_cluster

# Enable Docker
pipeline_run_config.environment.docker.enabled = True

# Specify CondaDependencies obj, add necessary packages
pipeline_run_config.environment.python.user_managed_dependencies = False
pipeline_run_config.environment.python.conda_dependencies = CondaDependencies.create(
    conda_packages=['pandas','scikit-learn'], 
    pip_packages=['azureml-sdk', 'pyarrow'])

print ("Run configuration created.")

## Task 3: Create and Run a Pipeline

Now we're ready to create and run a pipeline.

First we need to define the steps for the pipeline, and any data references that need to be passed between them. In this case, the first step must write the model to a folder that can be read by the second step. Since the steps will be run on remote compute (and in fact, could each be run on different compute), the folder path must be passed as a data reference to a location in a datastore within the workspace. The **PipelineData** object is just that, so we'll create one and use it as the output for the first step and the input for the second step. Note that we also need to pass it as a script argument so our code can access the datastore location referenced by the data reference.
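The handoff can be pictured with a purely local sketch, in which a temporary directory stands in for the datastore location and two plain functions stand in for the step scripts. The names `fake_train` and `fake_register`, and the placeholder dictionary used as a "model", are assumptions for illustration only; nothing here calls the Azure ML SDK.

```python
import os
import pickle
import tempfile


def fake_train(output_folder):
    """Stand-in for the training script: save a 'model' into the folder."""
    os.makedirs(output_folder, exist_ok=True)
    model = {"coefficients": [0.1, 0.2]}  # placeholder, not a real model
    with open(os.path.join(output_folder, "model.pkl"), "wb") as f:
        pickle.dump(model, f)


def fake_register(model_folder):
    """Stand-in for the registration script: load the model from the folder."""
    with open(os.path.join(model_folder, "model.pkl"), "rb") as f:
        return pickle.load(f)


# A temporary directory plays the role of the shared datastore location
with tempfile.TemporaryDirectory() as shared_root:
    shared_folder = os.path.join(shared_root, "model_folder")
    fake_train(shared_folder)               # the first step receives the path as its output
    loaded = fake_register(shared_folder)   # the second step receives the same path as its input

print(loaded)
```

The point of the sketch is that neither function hard-codes a location: both receive the same folder path from outside, which is the role the PipelineData object plays when it is passed as a script argument.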

### Pipeline for option 1 - Retrieving input dataset from run context

In [None]:
from azureml.pipeline.core import PipelineData
from azureml.pipeline.steps import PythonScriptStep

# Create a PipelineData (Data Reference) for the model folder
model_folder = PipelineData("model_folder", datastore=ws.get_default_datastore())

# Step 1, run the training script
train_step = PythonScriptStep(name = "Train Model",
                                source_directory = experiment_folder,
                                script_name = "train_diabetes_no_pipeline_input.py",
                                arguments = ['--dataset_name', 'Diabetes Dataset',
                                             '--regularization', 0.1,
                                             '--output_folder', model_folder],
                                outputs=[model_folder],
                                compute_target = cpu_cluster,
                                runconfig = pipeline_run_config,
                                allow_reuse = False)

# Step 2, run the model registration script
register_step = PythonScriptStep(name = "Register Model",
                                source_directory = experiment_folder,
                                script_name = "register_diabetes.py",
                                arguments = ['--model_folder', model_folder],
                                inputs=[model_folder],
                                compute_target = cpu_cluster,
                                runconfig = pipeline_run_config,
                                allow_reuse = False)

print("Pipeline steps defined")

### Pipeline for option 2 - Retrieving input dataset from mount

Notice here the use of the `inputs` parameter to PythonScriptStep. This is where we pass in our mounted FileDataset.

In [None]:
from azureml.pipeline.core import PipelineData
from azureml.pipeline.steps import PythonScriptStep

# Create a PipelineData (Data Reference) for the model folder
model_folder = PipelineData("model_folder", datastore=ws.get_default_datastore())

# Step 1, run the training script

# Here we define a list of inputs and provide our FileDataset as a mounted input named 'training_dataset'.
# At runtime, this name is the key of the environment variable whose value is the path to the file.
train_step = PythonScriptStep(name = "Train Model",
                                source_directory = experiment_folder,
                                script_name = "train_diabetes_with_pipeline_input.py",
                                arguments = ['--regularization', 0.1,
                                             '--output_folder', model_folder],
                                inputs=[diabetes_file_dataset],
                                outputs=[model_folder],
                                compute_target = cpu_cluster,
                                runconfig = pipeline_run_config,
                                allow_reuse = False)

# Step 2, run the model registration script
register_step = PythonScriptStep(name = "Register Model",
                                source_directory = experiment_folder,
                                script_name = "register_diabetes.py",
                                arguments = ['--model_folder', model_folder],
                                inputs=[model_folder],
                                compute_target = cpu_cluster,
                                runconfig = pipeline_run_config,
                                allow_reuse = False)

print("Pipeline steps defined")

OK, we're ready to go. Let's build the pipeline from the steps we've defined and run it in an experiment.

In [None]:
from azureml.core import Experiment
from azureml.pipeline.core import Pipeline
from azureml.widgets import RunDetails

# Construct the pipeline
pipeline_steps = [train_step, register_step]
pipeline = Pipeline(workspace = ws, steps=pipeline_steps)
print("Pipeline is built.")

# Create an experiment and run the pipeline
experiment = Experiment(workspace = ws, name = experiment_name)
pipeline_run = experiment.submit(pipeline, regenerate_outputs=True)
print("Pipeline submitted for execution.")

RunDetails(pipeline_run).show()

When the pipeline has finished, a new model should be registered with a *Training context* tag indicating it was trained in a pipeline. Run the following code to verify this.

In [None]:
from azureml.core import Model

for model in Model.list(ws):
    print(model.name, 'version:', model.version)
    for tag_name in model.tags:
        tag = model.tags[tag_name]
        print ('\t',tag_name, ':', tag)
    for prop_name in model.properties:
        prop = model.properties[prop_name]
        print ('\t',prop_name, ':', prop)
    print('\n')

This is a simple example, designed to demonstrate the principle. In reality, you could build more sophisticated logic into the pipeline steps - for example, evaluating the model against some test data to calculate a performance metric like AUC or accuracy, comparing the metric to that of any previously registered versions of the model, and only registering the new model if it performs better.
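A minimal sketch of that "only register if better" decision might look like the following. The function name `should_register` and the AUC values are made up for illustration, and how the metrics of earlier versions are stored and retrieved (for example, in model tags) is left as an assumption outside this sketch.

```python
def should_register(new_auc, previous_aucs):
    """Return True when the new model beats every previously registered version.

    Hypothetical helper: `previous_aucs` is a list of AUC values recorded for
    earlier versions of the model. With no earlier versions, always register.
    """
    if not previous_aucs:
        return True
    return new_auc > max(previous_aucs)


# Made-up metrics for illustration
print(should_register(0.85, [0.80, 0.83]))  # True: better than all earlier versions
print(should_register(0.82, [0.80, 0.83]))  # False: 0.83 is still the best
print(should_register(0.70, []))            # True: nothing to compare against
```

In a registration step, a check like this would sit just before the call that registers the model, so that a worse model is skipped rather than registered as a new version.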

You can use the [Azure Machine Learning extension for Azure DevOps](https://marketplace.visualstudio.com/items?itemName=ms-air-aiagility.vss-services-azureml) to combine Azure ML pipelines with Azure DevOps pipelines (yes, it *is* confusing that they have the same name!) and integrate model retraining into a *continuous integration/continuous deployment (CI/CD)* process. For example, you could use an Azure DevOps *build* pipeline to trigger an Azure ML pipeline that trains and registers a model, and when the model is registered it could trigger an Azure DevOps *release* pipeline that deploys the model as a web service, along with the application or service that consumes the model.