## Azure Machine Learning - Model Training Pipeline
This notebook demonstrates how to create and run an Azure ML pipeline that loads data from an AML-linked blob storage account, splits the data into training and testing subsets, trains a classification model, evaluates and registers the model, and then packages the model into a Docker container and pushes it to a container registry. In the final evaluation step, a champion vs. challenger A/B test is performed on a target metric of interest so that the model registry always reflects the best-performing model.

Note: This notebook builds from the Iris Setosa sample dataset available in Scikit-Learn.

### Import Required Packages

In [10]:
# Import required packages
from azureml.core import Workspace, Experiment, Datastore, Environment, Dataset
from azureml.core.compute import ComputeTarget, AmlCompute, DataFactoryCompute
from azureml.core.compute_target import ComputeTargetException
from azureml.core.runconfig import RunConfiguration
from azureml.core.conda_dependencies import CondaDependencies
from azureml.core.runconfig import DEFAULT_CPU_IMAGE
from azureml.pipeline.core import Pipeline, PipelineParameter, PipelineData
from azureml.pipeline.steps import PythonScriptStep
from azureml.data.output_dataset_config import OutputTabularDatasetConfig, OutputDatasetConfig, OutputFileDatasetConfig
import logging
import os

### Connect to Azure ML Workspace, Provision Compute Resources, and get References to Datastores
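Connect to the workspace using its associated config file. If no config file is found, the code falls back to the `SUBSCRIPTION_ID`, `RESOURCE_GROUP`, and `WORKSPACE_NAME` environment variables. Next, get a reference to your pre-existing AML compute cluster, or provision a new cluster to facilitate processing. Finally, get a reference to your default blob datastore.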

<i>Note:</i> For execution in MLOps CI/CD pipelines, some of the hard-coded values below can be parameterized.

In [11]:
# Connect to AML Workspace
ws = None
try:
    ws = Workspace.from_config()
except Exception:
    ws = Workspace(subscription_id=os.getenv('SUBSCRIPTION_ID'),  resource_group = os.getenv('RESOURCE_GROUP'), workspace_name = os.getenv('WORKSPACE_NAME'))


#Select AML Compute Cluster
cpu_cluster_name = 'cluster002'

# Verify that cluster does not exist already
try:
    cpu_cluster = ComputeTarget(workspace=ws, name=cpu_cluster_name)
    print('Found an existing cluster, using it instead.')
except ComputeTargetException:
    compute_config = AmlCompute.provisioning_configuration(vm_size='STANDARD_D3_V2',
                                                           min_nodes=0,
                                                           max_nodes=1)
    cpu_cluster = ComputeTarget.create(ws, cpu_cluster_name, compute_config)
    cpu_cluster.wait_for_completion(show_output=True)
    
#Get default datastore
default_ds = ws.get_default_datastore()

Found an existing cluster, using it instead.
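
As the note above suggests, the hard-coded cluster settings could be read from the environment when this code runs in a CI/CD pipeline. The following is a minimal sketch using only the standard library. The `load_compute_settings` helper and the `CLUSTER_NAME`, `VM_SIZE`, `MIN_NODES`, and `MAX_NODES` variable names are hypothetical and not part of the original notebook; the defaults mirror the values used in the cell above.

```python
import os


def load_compute_settings(env=None):
    """Read compute settings from environment variables.

    Falls back to the values hard-coded in the notebook when a variable is unset.
    The variable names are assumptions chosen for this illustration.
    """
    env = os.environ if env is None else env
    return {
        'cluster_name': env.get('CLUSTER_NAME', 'cluster002'),
        'vm_size': env.get('VM_SIZE', 'STANDARD_D3_V2'),
        'min_nodes': int(env.get('MIN_NODES', '0')),
        'max_nodes': int(env.get('MAX_NODES', '1')),
    }


settings = load_compute_settings()
print(settings)
```

The resulting values could then be passed to `AmlCompute.provisioning_configuration` in place of the literals.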


### Create Run Configuration
The `RunConfiguration` defines the environment used across all Python steps. You can optionally add conda or pip packages to this environment. [More details here](https://docs.microsoft.com/en-us/python/api/azureml-core/azureml.core.conda_dependencies.condadependencies?view=azure-ml-py).

Here, we also register the environment to the AML workspace so that it can be used for future retraining and inferencing operations.

In [12]:
aml_env = Environment.from_pip_requirements(name='sample_env', file_path='./requirements.txt')

run_config = RunConfiguration()
run_config.docker.use_docker = True
run_config.environment = aml_env
run_config.environment.docker.base_image = DEFAULT_CPU_IMAGE
run_config.docker.arguments = ['-v', '/var/run/docker.sock:/var/run/docker.sock']
run_config.environment.python.conda_dependencies.set_python_version('3.8.10')

#Register environment for reuse 
run_config.environment.register(ws)

{
    "assetId": "azureml://locations/eastus/workspaces/81cbe2c0-7e85-4124-9514-4feff7eb9bdb/environments/sample_env/versions/3",
    "databricks": {
        "eggLibraries": [],
        "jarLibraries": [],
        "mavenLibraries": [],
        "pypiLibraries": [],
        "rcranLibraries": []
    },
    "docker": {
        "arguments": [],
        "baseDockerfile": null,
        "baseImage": "mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04:20221101.v1",
        "baseImageRegistry": {
            "address": null,
            "password": null,
            "registryIdentity": null,
            "username": null
        },
        "buildContext": null,
        "enabled": false,
        "platform": {
            "architecture": "amd64",
            "os": "Linux"
        },
        "sharedVolumes": true,
        "shmSize": null
    },
    "environmentVariables": {
        "EXAMPLE_ENV_VAR": "EXAMPLE_VALUE"
    },
    "inferencingStackVersion": null,
    "name": "sample_env",
    "python": 

### Define Output Datasets
Below we define the configuration for the datasets that will be passed between steps in our pipeline. In each case we specify the datastore and path that should hold the dataset, and we register the dataset once the producing step completes. Registration can be disabled by removing the `register_on_complete()` call.

In [13]:
raw_data = OutputFileDatasetConfig(name='Raw_Data', destination=(default_ds, 'raw_data/{run-id}')).read_delimited_files().register_on_complete(name='Raw_Data')
training_data = OutputFileDatasetConfig(name='Training_Data', destination=(default_ds, 'training_data/{run-id}')).read_delimited_files().register_on_complete(name='Training_Data')
testing_data = OutputFileDatasetConfig(name='Testing_Data', destination=(default_ds, 'testing_data/{run-id}')).read_delimited_files().register_on_complete(name='Testing_Data')
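
The step scripts that write to these outputs are not shown in this notebook. As a rough sketch, assuming the `--raw_data` argument resolves to a writable directory path on the compute target at run time, a script such as `get_data.py` might write a delimited file as follows. The `write_raw_data` helper, the `raw.csv` file name, and the column names are hypothetical.

```python
import csv
import os
import tempfile


def write_raw_data(output_dir, header, rows, file_name='raw.csv'):
    """Write rows as a comma-delimited file inside output_dir and return its path."""
    os.makedirs(output_dir, exist_ok=True)
    path = os.path.join(output_dir, file_name)
    with open(path, 'w', newline='') as f:
        writer = csv.writer(f)
        writer.writerow(header)
        writer.writerows(rows)
    return path


# Local usage example: a temporary directory stands in for the mounted output path.
with tempfile.TemporaryDirectory() as tmp_dir:
    written = write_raw_data(tmp_dir, ['sepal_length', 'target'], [[5.1, 0], [7.0, 1]])
    with open(written, newline='') as f:
        print(f.read())
```

Because the outputs above call `read_delimited_files()`, files written in this form would be read back as tabular data by downstream steps.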

### Define Pipeline Parameters
`PipelineParameter` objects serve as variable inputs to an Azure ML pipeline and can be specified at runtime. Below we define four parameters: the fraction of the data (0.0-1.0) that should be assigned to the testing dataset, the target column name, and the name and description under which the model is registered. Each has a default value that can be overridden when the pipeline is submitted.

In [14]:
testing_size = PipelineParameter(name='testing_size', default_value=0.3)
target_column = PipelineParameter(name='target_column', default_value='target')
model_name = PipelineParameter(name='model_name', default_value='iris-classification')
model_description = PipelineParameter(name='model_description', default_value='Scikit-Learn K-Neighbors Classifier for Iris Dataset')
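
Inside a step script, these parameters arrive as ordinary command-line arguments. The sketch below shows how a script such as `split_data.py` might parse the `--training_data`, `--testing_data`, and `--testing_size` arguments and apply the split fraction. It is an illustration only: the actual script is not shown in this notebook, the `parse_args` and `split_rows` helpers are hypothetical, and a plain seeded shuffle stands in for a library routine such as scikit-learn's `train_test_split`.

```python
import argparse
import random


def parse_args(argv=None):
    """Parse the arguments that the split step passes to the script."""
    parser = argparse.ArgumentParser(description='Split raw data into training and testing sets.')
    parser.add_argument('--training_data', type=str, required=True, help='Output location for training data')
    parser.add_argument('--testing_data', type=str, required=True, help='Output location for testing data')
    parser.add_argument('--testing_size', type=float, default=0.3, help='Fraction of rows used for testing')
    args = parser.parse_args(argv)
    if not 0.0 < args.testing_size < 1.0:
        parser.error('--testing_size must be strictly between 0.0 and 1.0')
    return args


def split_rows(rows, testing_size, seed=0):
    """Shuffle rows with a fixed seed and return (training_rows, testing_rows)."""
    shuffled = list(rows)
    random.Random(seed).shuffle(shuffled)
    n_test = round(len(shuffled) * testing_size)
    return shuffled[n_test:], shuffled[:n_test]


# Local usage example with placeholder output locations.
args = parse_args(['--training_data', 'train_dir', '--testing_data', 'test_dir', '--testing_size', '0.3'])
train_rows, test_rows = split_rows(range(10), args.testing_size)
print(len(train_rows), len(test_rows))
```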

### Define Pipeline Steps
The pipeline below consists of five distinct steps, each of which executes an associated Python script located in the `./pipeline_step_scripts` directory. First, we call `get_data.py` to retrieve data from the registered blob datastore and register this dataset as Raw_Data. Next, we run `split_data.py`, which splits the raw data into test and train datasets according to the `testing_size` parameter; both datasets are subsequently registered. We then pass the test and training datasets into a step that runs `train_model.py`, which trains the iris classifier and computes and registers a set of metrics. Afterwards, `evaluate_and_register.py` loads both the new model (challenger) and the current best model (champion) and evaluates them against the provided test dataset. Based on the `accuracy` metric, if the challenger model performs better, or no model has been registered to date, the challenger is registered in the workspace. Finally, to support deployments to different environments, we package the challenger model (if it is the strongest performer) into a Docker container and add it to the AML-linked container registry.

<i>Note:</i> The first step `add_data_step` is included purely for demonstration purposes. This step serves to move data into an attached blob storage location to be consumed in downstream steps.
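
The champion vs. challenger rule described above reduces to a small decision function. The following is a minimal sketch of that rule only; the `should_register` name is hypothetical, and the actual `evaluate_and_register.py` script is not shown in this notebook.

```python
from typing import Optional


def should_register(challenger_accuracy: float, champion_accuracy: Optional[float]) -> bool:
    """Decide whether the challenger model should be registered.

    champion_accuracy is None when no model has been registered yet.
    """
    if champion_accuracy is None:
        # No champion exists, so the challenger is registered by default.
        return True
    # Register only when the challenger is strictly better than the champion.
    return challenger_accuracy > champion_accuracy


print(should_register(0.95, None))
print(should_register(0.95, 0.93))
print(should_register(0.93, 0.95))
```

A tie leaves the current champion in place here, which avoids registering a new model version without a measurable improvement.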

In [15]:
# Step for demo: Seed datastore with
# Iris Setosa dataset
add_data_step = PythonScriptStep(
    name='Add Sample Data to Blob Storage',
    script_name='add_data.py',
    compute_target=cpu_cluster,
    source_directory='./pipeline_step_scripts',
    allow_reuse=False,
    runconfig=run_config
)

# Get raw data from AML-linked datastore
# Register tabular dataset after retrieval
get_data_step = PythonScriptStep(
    name='Get Data from Blob Storage',
    script_name='get_data.py',
    arguments =['--raw_data', raw_data],
    outputs=[raw_data],
    compute_target=cpu_cluster,
    source_directory='./pipeline_step_scripts',
    allow_reuse=False,
    runconfig=run_config
)
get_data_step.run_after(add_data_step)

# Load raw data and split into test and train
# datasets according to the specified split percentage
split_data_step = PythonScriptStep(
    name='Split Train and Test Data',
    script_name='split_data.py',
    arguments =['--training_data', training_data,
                '--testing_data', testing_data,
                '--testing_size', testing_size],
    inputs=[raw_data.as_input(name='Raw_Data')],
    outputs=[training_data, testing_data],
    compute_target=cpu_cluster,
    source_directory='./pipeline_step_scripts',
    allow_reuse=False,
    runconfig=run_config
)

# Train iris classification model using split
# test and train datasets. Both the scaler and trained model
# will be saved as PipelineData
train_model_step = PythonScriptStep(
    name='Train Model',
    script_name='train_model.py',
    arguments =[
                '--target_column', target_column
    ],
    inputs=[training_data.as_input(name='Training_Data'),
            testing_data.as_input(name='Testing_Data')
           ],
    outputs=[],
    compute_target=cpu_cluster,
    source_directory='./pipeline_step_scripts',
    allow_reuse=False,
    runconfig=run_config
)

#Evaluate and register model here
#Compare metrics from current model and register if better than current
#best model
evaluate_and_register_step = PythonScriptStep(
    name='Evaluate and Register Model',
    script_name='evaluate_and_register.py',
    arguments=[
               '--target_column', target_column,
               '--model_name', model_name,
               '--model_description', model_description],
    inputs=[training_data.as_input(name='Training_Data'),
            testing_data.as_input(name='Testing_Data')],
    compute_target=cpu_cluster,
    source_directory='./pipeline_step_scripts',
    allow_reuse=False,
    runconfig=run_config
)
evaluate_and_register_step.run_after(train_model_step)

# Package model step
# Containerize the registered champion model here for deployment to target
# endpoints
package_model_step = PythonScriptStep(
    name='Package Model',
    script_name='package_model.py',
    arguments=[
               '--model_name', model_name
    ],
    inputs=[testing_data.as_input(name='Testing_Data')],
    compute_target=cpu_cluster,
    source_directory='./pipeline_step_scripts',
    allow_reuse=False,
    runconfig=run_config
)
package_model_step.run_after(evaluate_and_register_step)

# Alternate step for checking connectivity to ACR
# Note: assumes a version '1' of the target model has been added to the container registry
copy_model_step = PythonScriptStep(
    name='Copy Model',
    script_name='copy_model_image.py',
    arguments=['--model_name', model_name, '--model_version', '1'],
    compute_target=cpu_cluster,
    source_directory='./pipeline_step_scripts',
    allow_reuse=False,
    runconfig=run_config
)
copy_model_step.run_after(evaluate_and_register_step)

### Create Pipeline
Create an Azure ML Pipeline by specifying the steps to be executed. Note: execution order follows the dataset dependencies between steps and any explicit `run_after` calls, so no step executes until all of its input datasets have been generated.

In [16]:
# Original pipeline with model package creation
pipeline = Pipeline(workspace=ws, steps=[add_data_step, get_data_step, split_data_step, train_model_step, evaluate_and_register_step, package_model_step])

# Pipeline with ONLY copy_model_step
#pipeline = Pipeline(workspace=ws, steps=[copy_model_step])

# Updated pipeline with copy model step in lieu of the model package creation step
# pipeline = Pipeline(workspace=ws, steps=[add_data_step, get_data_step, split_data_step, train_model_step, evaluate_and_register_step, copy_model_step])
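
To illustrate how an execution order follows from step dependencies, the sketch below models the original pipeline as a dependency graph and sorts it topologically. This is a local illustration, not how Azure ML is implemented. The short step keys are labels chosen for this example, the edges are read off the `inputs` and `run_after` declarations above, and `graphlib` requires Python 3.9 or later.

```python
from graphlib import TopologicalSorter

# Each step maps to the set of steps it must wait for, either because it
# consumes their output datasets or because of an explicit run_after call.
dependencies = {
    'add_data': set(),
    'get_data': {'add_data'},
    'split_data': {'get_data'},
    'train_model': {'split_data'},
    'evaluate_and_register': {'split_data', 'train_model'},
    'package_model': {'split_data', 'evaluate_and_register'},
}

# static_order() yields each step only after all of its predecessors.
execution_order = list(TopologicalSorter(dependencies).static_order())
print(execution_order)
```

Because every step here depends on the one before it, only one valid order exists; in a pipeline with independent branches, several orders would be valid and those branches could run in parallel.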

### Create a Published PipelineEndpoint
Once we have created our pipeline we will look to retrain our model periodically as new data becomes available. By publishing our pipeline to a `PipelineEndpoint` we can iterate on our pipeline definition but maintain a consistent REST API endpoint. 

In [17]:
from azureml.pipeline.core import PipelineEndpoint

def published_pipeline_to_pipeline_endpoint(
    workspace,
    published_pipeline,
    pipeline_endpoint_name,
    pipeline_endpoint_description="Endpoint to my pipeline",
):
    try:
        pipeline_endpoint = PipelineEndpoint.get(
            workspace=workspace, name=pipeline_endpoint_name
        )
        print("using existing PipelineEndpoint...")
        pipeline_endpoint.add_default(published_pipeline)
    except Exception as ex:
        print(ex)
        # create PipelineEndpoint if it doesn't exist
        print("PipelineEndpoint does not exist, creating one for you...")
        pipeline_endpoint = PipelineEndpoint.publish(
            workspace=workspace,
            name=pipeline_endpoint_name,
            pipeline=published_pipeline,
            description=pipeline_endpoint_description
        )


pipeline_endpoint_name = 'Classification Model Training Pipeline'
pipeline_endpoint_description = 'Sample pipeline for training, evaluating, and registering a classification model based on the Iris Setosa dataset'

published_pipeline = pipeline.publish(name=pipeline_endpoint_name,
                                     description=pipeline_endpoint_description,
                                     continue_on_step_failure=False)

published_pipeline_to_pipeline_endpoint(
    workspace=ws,
    published_pipeline=published_pipeline,
    pipeline_endpoint_name=pipeline_endpoint_name,
    pipeline_endpoint_description=pipeline_endpoint_description
)

Created step Add Sample Data to Blob Storage [e0eb6014][555f757b-8a26-4a6a-80b7-74b336a1d118], (This step will run and generate new outputs)
Created step Get Data from Blob Storage [cc3b74b8][50bccb89-ad74-4a6f-915c-e7762e8114a5], (This step will run and generate new outputs)
Created step Split Train and Test Data [7ba39730][ec3be4b4-fb0f-4cc1-9412-98c60d6824d6], (This step will run and generate new outputs)
Created step Train Model [918aa2a4][9add66a8-8532-4282-a060-6a4f38ce3a0b], (This step will run and generate new outputs)
Created step Evaluate and Register Model [68d03298][a261be1a-83b7-4913-a8d0-5b5fcf7929db], (This step will run and generate new outputs)
Created step Package Model [e1a08ea3][7bcc6b10-9e89-4fb1-82a5-70243eadd39a], (This step will run and generate new outputs)
using existing PipelineEndpoint...
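
Once published, the endpoint can be triggered with an authenticated HTTP POST. The sketch below only builds the JSON request body and does not send it. The `build_trigger_payload` helper is hypothetical, and the `ExperimentName` and `ParameterAssignments` keys reflect the request format documented for published Azure ML pipelines; verify them against the SDK version you use.

```python
import json


def build_trigger_payload(experiment_name, **parameter_assignments):
    """Build the request body for triggering a published pipeline run.

    Keyword arguments override the defaults of the matching PipelineParameter objects.
    """
    return {
        'ExperimentName': experiment_name,
        'ParameterAssignments': parameter_assignments,
    }


payload = build_trigger_payload(
    'sample-training-pipeline-run',
    testing_size=0.25,
    target_column='target',
)
print(json.dumps(payload, indent=2))

# Sending the request is left out of this sketch. With an Azure AD bearer token
# in the Authorization header, the body would be POSTed to the endpoint's REST URL.
```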


### Optional: Trigger a Pipeline Execution from the Notebook
You can create an Experiment (a logical collection of runs) and submit a pipeline run directly from this notebook by running the commands below.

In [18]:
experiment_name = os.getenv('EXPERIMENT_NAME', 'sample-training-pipeline-run')
experiment = Experiment(ws, experiment_name)
run = experiment.submit(pipeline)
# run.wait_for_completion(show_output=True)

Submitted PipelineRun ff766874-6a4f-4054-acea-485aafa7835b
Link to Azure Machine Learning Portal: https://ml.azure.com/runs/ff766874-6a4f-4054-acea-485aafa7835b?wsid=/subscriptions/3a72b9f9-cb8d-44de-827e-ade93ff2dab5/resourcegroups/textron-mlops-live-demo/workspaces/aml-dev-ws&tid=16b3c013-d300-468d-ac64-7eda0820b6d3
