# Creating a Pipeline

In the previous labs, you used the Azure Machine Learning SDK to explore the entire model training process from accessing data through to running training experiments and registering machine learning models. Up until now, you have performed the various steps required to create a machine learning solution interactively. In this lab, you'll explore automation of these steps using *pipelines*.

## Connect to Your Workspace

The first thing you need to do is to connect to your workspace using the Azure ML SDK.

> **Note**: If the authenticated session with your Azure subscription has expired since you completed the previous exercise, you'll be prompted to reauthenticate.

In [44]:
# For this example, the training folder should contain your training script (train.py) as well as the training data
training_folder="Scripts"

In [45]:
import azureml.core
from azureml.core import Workspace

ws = Workspace.from_config()
print(ws.name, ws.resource_group, ws.location, ws.subscription_id, sep = '\n')

# Explicit Workspace config:
# workspace_name=""
# subscription_id=""
# resource_group=""

# ws = Workspace.get(
#    name=workspace_name,
#    subscription_id=subscription_id,
#    resource_group=resource_group)

print('Ready to use Azure ML {} to work with {}'.format(azureml.core.VERSION, ws.name))

cesardl-automl-ncentralus-demo-ws
cesardl-automl-ncentralus-demo-ws-resgrp
northcentralus
381b38e9-9840-4719-a5a0-61d9585e1e91
Ready to use Azure ML 1.0.83 to work with cesardl-automl-ncentralus-demo-ws


## Create Scripts for Pipeline Steps

Pipelines consist of one or more *steps*, which can be Python scripts, or specialized steps like an Auto ML training estimator or a data transfer step that copies data from one location to another. Each step can run in its own compute context.

In this exercise, you'll build a simple pipeline that contains an estimator step (to train a model) and a Python script step (to register the trained model).

In [46]:
import os
import shutil

os.makedirs(training_folder, exist_ok=True)
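
The training script for the first step (`train.py`) is not reproduced in this lab. As a rough sketch of the part that matters to the pipeline, the script needs to accept the `--output_folder` argument that the pipeline definition passes to it and write a file named `model.pkl` into that folder, because the registration script looks for that file name. The helper names `parse_args` and `save_model`, the `outputs` default, and the placeholder dictionary standing in for a trained model are assumptions for illustration only. A real script would most likely call `joblib.dump` so that it matches the `joblib.load` call in the registration script; `pickle` is used here only to keep the sketch dependency-free.

```python
import argparse
import os
import pickle
import tempfile


def parse_args(argv=None):
    """Parse the folder argument that the pipeline passes to the training step."""
    parser = argparse.ArgumentParser()
    parser.add_argument('--output_folder', type=str, dest='output_folder',
                        default='outputs')  # hypothetical default for local runs
    return parser.parse_args(argv)


def save_model(model, output_folder):
    """Write the model to <output_folder>/model.pkl and return the file path."""
    # The folder may not exist yet on the remote compute, so create it first
    os.makedirs(output_folder, exist_ok=True)
    model_file = os.path.join(output_folder, 'model.pkl')
    with open(model_file, 'wb') as f:
        pickle.dump(model, f)
    return model_file


# Local demonstration with a temporary folder and a placeholder "model"
demo_root = tempfile.mkdtemp()
args = parse_args(['--output_folder', os.path.join(demo_root, 'model_folder')])
saved_path = save_model({'placeholder': 'model'}, args.output_folder)
print('Model saved as', os.path.basename(saved_path))
```

Keeping the file name in one place matters here: if the training script saved the model under a different name, the second step would fail when it tries to load `model.pkl`.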

The script for the second step of the pipeline will load the model from where it was saved, and then register it in the workspace. It includes a single **model_folder** parameter that contains the path where the model was saved.

In [47]:
%%writefile $training_folder/register_model.py
# Import libraries
import argparse
import joblib
from azureml.core import Workspace, Model, Run

# Get parameters
parser = argparse.ArgumentParser()
parser.add_argument('--model_folder', type=str, dest='model_folder')
args = parser.parse_args()
model_folder = args.model_folder

# Get the experiment run context
run = Run.get_context()

# Load the model to confirm the file exists and can be deserialized
print("Loading model from " + model_folder)
model_file = model_folder + "/model.pkl"
model = joblib.load(model_file)

# Register the model in the workspace
Model.register(workspace=run.experiment.workspace,
               model_path = model_file,
               model_name = 'model',
               tags={'Training context':'Pipeline'})

run.complete()

Overwriting Scripts/register_model.py


## Prepare a Compute Environment for the Pipeline Steps

In this exercise, you'll use the same compute for both steps, but it's important to realize that each step is run independently; so you could specify different compute contexts for each step if appropriate.

First, get the compute target.

In [48]:
from azureml.core.compute import ComputeTarget, AmlCompute
from azureml.core.compute_target import ComputeTargetException

cluster_name = "aml-cluster"

# Verify that cluster exists
try:
    pipeline_cluster = ComputeTarget(workspace=ws, name=cluster_name)
    print('Found existing cluster, use it.')
except ComputeTargetException:
    # If not, create it
    compute_config = AmlCompute.provisioning_configuration(vm_size='STANDARD_D2_V2', 
                                                           vm_priority='lowpriority', 
                                                           max_nodes=4)
    pipeline_cluster = ComputeTarget.create(ws, cluster_name, compute_config)

pipeline_cluster.wait_for_completion(show_output=True)

Found existing cluster, use it.
Succeeded
AmlCompute wait for completion finished
Minimum number of nodes requested have been provisioned


The compute will require a Python environment with the necessary package dependencies installed, so we'll create a run configuration.

In [49]:
from azureml.core import Environment
from azureml.core.conda_dependencies import CondaDependencies
from azureml.core.runconfig import RunConfiguration

# Create a Python environment for the experiment
env = Environment("pipeline-experiment-env")
env.python.user_managed_dependencies = False # Let Azure ML manage dependencies
env.docker.enabled = True # Use a docker container

# Create a set of package dependencies
dependencies = CondaDependencies.create(conda_packages=['scikit-learn', 'pandas'],
                                        pip_packages=['azureml-sdk', 'lightgbm'])

# Add the dependencies to the environment
env.python.conda_dependencies = dependencies

# Register the environment (just in case previous lab wasn't completed)
env.register(workspace=ws)
registered_env = Environment.get(ws, 'pipeline-experiment-env')

# Create a new runconfig object for the pipeline
pipeline_run_config = RunConfiguration()

# Use the compute you created above. 
pipeline_run_config.target = pipeline_cluster

# Assign the environment to the run configuration
pipeline_run_config.environment = registered_env

print ("Run configuration created.")

Run configuration created.


## Create and Run a Pipeline

Now we're ready to create and run a pipeline.

First, we need to define the steps for the pipeline and any data references that need to be passed between them. In this case, the first step must write the model to a folder that the second step can read. Since the steps will be run on remote compute (and in fact, could each be run on different compute), the folder path must be passed as a data reference to a location in a datastore within the workspace. The **PipelineData** object is a special kind of data reference that is used for interim storage locations that can be passed between pipeline steps, so we'll create one and use it as the output for the first step and the input for the second step. Note that we also need to pass it as a script argument so our code can access the datastore location referenced by the data reference.

In [50]:
from azureml.pipeline.core import PipelineData
from azureml.pipeline.steps import PythonScriptStep, EstimatorStep
from azureml.train.estimator import Estimator

# Create a PipelineData (temporary Data Reference) for the model folder
model_folder = PipelineData("model_folder", datastore=ws.get_default_datastore())

# Step 1, run the training script using an estimator
estimator = Estimator(source_directory=training_folder,
                        compute_target = pipeline_cluster,
                        environment_definition=pipeline_run_config.environment,
                        entry_script='train.py')

train_step = EstimatorStep(name = "Train Model",
                           estimator=estimator, 
                           estimator_entry_script_arguments=['--output_folder', model_folder],
                           outputs=[model_folder],
                           compute_target = pipeline_cluster,
                           allow_reuse = True)

# Step 2, run the model registration script
register_step = PythonScriptStep(name = "Register Model",
                                source_directory = training_folder,
                                script_name = "register_model.py",
                                arguments = ['--model_folder', model_folder],
                                inputs=[model_folder],
                                compute_target = pipeline_cluster,
                                runconfig = pipeline_run_config,
                                allow_reuse = True)

register_step.run_after(train_step)

print("Pipeline steps defined")

Pipeline steps defined


OK, we're ready to go. Let's build the pipeline from the steps we've defined and run it as an experiment.

In [51]:
from azureml.core import Experiment
from azureml.pipeline.core import Pipeline
from azureml.widgets import RunDetails

# Construct the pipeline
pipeline_steps = [train_step, register_step]
pipeline = Pipeline(workspace = ws, steps=pipeline_steps)
print("Pipeline is built.")

published_pipeline = pipeline.publish(
    name="Training_Pipeline")

# Submit the pipeline as an experiment right away to test it
experiment = Experiment(workspace = ws, name = 'training-pipeline')
pipeline_run = experiment.submit(pipeline, regenerate_outputs=True)
print("Pipeline submitted for execution.")

RunDetails(pipeline_run).show()
pipeline_run.wait_for_completion()

Pipeline is built.
Created step Train Model [ecccd0be][57efcc96-0836-433c-b690-32e903625f3f], (This step will run and generate new outputs)
Created step Register Model [37997c00][ae9f9f79-d68b-49fb-bd47-1752b565914b], (This step will run and generate new outputs)

Created step Train Model [ecccd0be][85d10e85-0604-4d48-8673-5900f6cf4c69], (This step will run and generate new outputs)
Created step Register Model [37997c00][4ab5111f-5933-496e-9507-5b9356d51725], (This step will run and generate new outputs)
Submitted PipelineRun 182537d0-76e1-457f-8bc5-946be4203d12
Link to Azure Machine Learning studio: https://ml.azure.com/experiments/training-pipeline/runs/182537d0-76e1-457f-8bc5-946be4203d12?wsid=/subscriptions/381b38e9-9840-4719-a5a0-61d9585e1e91/resourcegroups/cesardl-automl-ncentralus-demo-ws-resgrp/workspaces/cesardl-automl-ncentralus-demo-ws
Pipeline submitted for execution.


_PipelineWidget(widget_settings={'childWidgetDisplay': 'popup', 'send_telemetry': False, 'log_level': 'INFO', …

PipelineRunId: 182537d0-76e1-457f-8bc5-946be4203d12
Link to Portal: https://ml.azure.com/experiments/training-pipeline/runs/182537d0-76e1-457f-8bc5-946be4203d12?wsid=/subscriptions/381b38e9-9840-4719-a5a0-61d9585e1e91/resourcegroups/cesardl-automl-ncentralus-demo-ws-resgrp/workspaces/cesardl-automl-ncentralus-demo-ws
PipelineRun Status: NotStarted
PipelineRun Status: Running


StepRunId: 95f2c341-21d2-4dc5-837e-5613c465f285
Link to Portal: https://ml.azure.com/experiments/training-pipeline/runs/95f2c341-21d2-4dc5-837e-5613c465f285?wsid=/subscriptions/381b38e9-9840-4719-a5a0-61d9585e1e91/resourcegroups/cesardl-automl-ncentralus-demo-ws-resgrp/workspaces/cesardl-automl-ncentralus-demo-ws
StepRun( Train Model ) Status: NotStarted
StepRun( Train Model ) Status: Running

Streaming azureml-logs/55_azureml-execution-tvmps_c1da901410239e55c40c6c5f877d10417e227c13bbdcdf22a3cd6d97e300df91_p.txt
2020-03-01T21:03:37Z Starting output-watcher...
2020-03-01T21:03:37Z IsDedicatedCompute == False, star

ActivityFailedException: ActivityFailedException:
	Message: Activity Failed:
{
    "error": {
        "code": "UserError",
        "message": "User program failed with AttributeError: 'DataFrame' object has no attribute 'ix'",
        "detailsUri": "https://aka.ms/azureml-known-errors",
        "details": [],
        "debugInfo": {
            "type": "AttributeError",
            "message": "'DataFrame' object has no attribute 'ix'",
            "stackTrace": "  File \"/mnt/batch/tasks/shared/LS_root/jobs/cesardl-automl-ncentralus-demo-ws/azureml/95f2c341-21d2-4dc5-837e-5613c465f285/mounts/workspaceblobstore/azureml/95f2c341-21d2-4dc5-837e-5613c465f285/azureml-setup/context_manager_injector.py\", line 127, in execute_with_context\n    runpy.run_path(sys.argv[0], globals(), run_name=\"__main__\")\n  File \"/azureml-envs/azureml_a07b3f6b915508b9807c3a8738c8bf38/lib/python3.6/runpy.py\", line 263, in run_path\n    pkg_name=pkg_name, script_name=fname)\n  File \"/azureml-envs/azureml_a07b3f6b915508b9807c3a8738c8bf38/lib/python3.6/runpy.py\", line 96, in _run_module_code\n    mod_name, mod_spec, pkg_name, script_name)\n  File \"/azureml-envs/azureml_a07b3f6b915508b9807c3a8738c8bf38/lib/python3.6/runpy.py\", line 85, in _run_code\n    exec(code, run_globals)\n  File \"train.py\", line 118, in <module>\n    X_train, X_test = X.ix[train_index], X.ix[test_index]\n  File \"/azureml-envs/azureml_a07b3f6b915508b9807c3a8738c8bf38/lib/python3.6/site-packages/pandas/core/generic.py\", line 5274, in __getattr__\n    return object.__getattribute__(self, name)\n"
        }
    },
    "time": "0001-01-01T00:00:00.000Z"
}
	InnerException None
	ErrorResponse 
{
    "error": {
        "message": "Activity Failed:\n{\n    \"error\": {\n        \"code\": \"UserError\",\n        \"message\": \"User program failed with AttributeError: 'DataFrame' object has no attribute 'ix'\",\n        \"detailsUri\": \"https://aka.ms/azureml-known-errors\",\n        \"details\": [],\n        \"debugInfo\": {\n            \"type\": \"AttributeError\",\n            \"message\": \"'DataFrame' object has no attribute 'ix'\",\n            \"stackTrace\": \"  File \\\"/mnt/batch/tasks/shared/LS_root/jobs/cesardl-automl-ncentralus-demo-ws/azureml/95f2c341-21d2-4dc5-837e-5613c465f285/mounts/workspaceblobstore/azureml/95f2c341-21d2-4dc5-837e-5613c465f285/azureml-setup/context_manager_injector.py\\\", line 127, in execute_with_context\\n    runpy.run_path(sys.argv[0], globals(), run_name=\\\"__main__\\\")\\n  File \\\"/azureml-envs/azureml_a07b3f6b915508b9807c3a8738c8bf38/lib/python3.6/runpy.py\\\", line 263, in run_path\\n    pkg_name=pkg_name, script_name=fname)\\n  File \\\"/azureml-envs/azureml_a07b3f6b915508b9807c3a8738c8bf38/lib/python3.6/runpy.py\\\", line 96, in _run_module_code\\n    mod_name, mod_spec, pkg_name, script_name)\\n  File \\\"/azureml-envs/azureml_a07b3f6b915508b9807c3a8738c8bf38/lib/python3.6/runpy.py\\\", line 85, in _run_code\\n    exec(code, run_globals)\\n  File \\\"train.py\\\", line 118, in <module>\\n    X_train, X_test = X.ix[train_index], X.ix[test_index]\\n  File \\\"/azureml-envs/azureml_a07b3f6b915508b9807c3a8738c8bf38/lib/python3.6/site-packages/pandas/core/generic.py\\\", line 5274, in __getattr__\\n    return object.__getattribute__(self, name)\\n\"\n        }\n    },\n    \"time\": \"0001-01-01T00:00:00.000Z\"\n}"
    }
}
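
The run above failed inside `train.py` at the line `X_train, X_test = X.ix[train_index], X.ix[test_index]`. The `.ix` indexer was deprecated in pandas 0.20 and removed in pandas 1.0, and the environment created earlier does not pin a pandas version, so a release without `.ix` was installed. The following is a minimal sketch of one possible fix, assuming that `train_index` and `test_index` are arrays of row positions (as a scikit-learn splitter would produce); in that case `.iloc` is the positional replacement. The small DataFrame and index arrays below are made up for illustration and are not taken from the training script.

```python
import numpy as np
import pandas as pd

# Hypothetical stand-in for the feature DataFrame built in train.py
X = pd.DataFrame({"f1": [10, 20, 30, 40], "f2": [1.0, 2.0, 3.0, 4.0]},
                 index=["a", "b", "c", "d"])

# Row positions for each fold (assumed to come from a cross-validation splitter)
train_index = np.array([0, 1, 2])
test_index = np.array([3])

# .iloc selects rows by position, independent of the index labels
X_train, X_test = X.iloc[train_index], X.iloc[test_index]
print(X_train.shape, X_test.shape)
```

If the indices were index labels rather than positions, `.loc` would be the appropriate replacement instead. Pinning an older pandas release in the environment's package list would also avoid the error, although updating the script is likely the more durable option.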

This is a simple example, designed to demonstrate the principle. In reality, you could build more sophisticated logic into the pipeline steps - for example, evaluating the model against some test data to calculate a performance metric like AUC or accuracy, comparing the metric to that of any previously registered versions of the model, and only registering the new model if it performs better.
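
As a rough illustration of that "register only if better" logic, the decision itself can be isolated in a small function. This is a sketch under stated assumptions: the function name `should_register`, the `min_improvement` parameter, and the example metric values are hypothetical, and how the metrics of previously registered models are retrieved (for example, from tags or properties stored at registration time) is left out.

```python
def should_register(new_metric, previous_metrics, min_improvement=0.0):
    """Decide whether a newly trained model should be registered.

    new_metric: performance of the new model (higher is better, e.g. AUC).
    previous_metrics: metrics of previously registered versions (may be empty).
    min_improvement: margin by which the new model must beat the best old one.
    """
    if not previous_metrics:
        # Nothing registered yet, so the first model is always accepted
        return True
    return new_metric > max(previous_metrics) + min_improvement


print(should_register(0.91, [0.88, 0.90]))  # beats the best previous value
print(should_register(0.89, [0.88, 0.90]))  # does not beat 0.90
print(should_register(0.75, []))            # no earlier versions exist
```

In a registration step, a check like this would sit just before the call to `Model.register`, so that the step completes without registering anything when the new model does not improve on the existing ones.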

You can use the [Azure Machine Learning extension for Azure DevOps](https://marketplace.visualstudio.com/items?itemName=ms-air-aiagility.vss-services-azureml) to combine Azure ML pipelines with Azure DevOps pipelines (yes, it *is* confusing that they have the same name!) and integrate model retraining into a *continuous integration/continuous deployment (CI/CD)* process. For example, you could use an Azure DevOps *build* pipeline to trigger an Azure ML pipeline that trains and registers a model, and when the model is registered, it could trigger an Azure DevOps *release* pipeline that deploys the model as a web service, along with the application or service that consumes the model.

## Create a score.py File

In [None]:
import os

folder_name = 'porto-service'

# Create a folder for the web service files
os.makedirs(folder_name, exist_ok=True)

print(folder_name, 'folder created.')

In [None]:
%%writefile $folder_name/score.py

"""
Copyright (C) Microsoft Corporation. All rights reserved.

Microsoft Corporation (“Microsoft”) grants you a nonexclusive, perpetual,
royalty-free right to use, copy, and modify the software code provided by us
("Software Code"). You may not sublicense the Software Code or any use of it
(except to your affiliates and to vendors to perform work on your behalf)
through distribution, network access, service agreement, lease, rental, or
otherwise. This license does not purport to express any claim of ownership over
data you may have shared with Microsoft in the creation of the Software Code.
Unless applicable law gives you more rights, Microsoft reserves all other
rights not expressly granted herein, whether by implication, estoppel or
otherwise.

THE SOFTWARE CODE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS
OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
MICROSOFT OR ITS LICENSORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER
IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
ARISING IN ANY WAY OUT OF THE USE OF THE SOFTWARE CODE, EVEN IF ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.
"""
import json
import numpy
from azureml.core.model import Model
import joblib  # sklearn.externals.joblib was removed in scikit-learn 0.23


def init():
    global LGBM_MODEL
    # load the model from file into a global object
    model_path = Model.get_model_path(
        model_name="lgbm_binary_model.pkl")
    LGBM_MODEL = joblib.load(model_path)


def run(raw_data, request_headers):
    data = json.loads(raw_data)["data"]
    data = numpy.array(data)
    result = LGBM_MODEL.predict(data)

    # Demonstrate how we can log custom data into the Application Insights
    # traces collection.
    # The 'X-Ms-Request-id' value is generated internally and can be used to
    # correlate a log entry with the Application Insights requests collection.
    # The HTTP 'traceparent' header may be set by the caller to implement
    # distributed tracing (per the W3C Trace Context proposed specification)
    # and can be used to correlate the request to external systems.
    print(('{{"RequestId":"{0}", '
           '"TraceParent":"{1}", '
           '"NumberOfPredictions":{2}}}'
           ).format(
               request_headers.get("X-Ms-Request-Id", ""),
               request_headers.get("Traceparent", ""),
               len(result)
    ))

    return {"result": result.tolist()}


if __name__ == "__main__":
    # Test scoring
    init()
    TEST_ROW = '{"data":[[0,1,8,1,0,0,1,0,0,0,0,0,0,0,12,1,0,0,0.5,0.3,0.610327781,7,1,-1,0,-1,1,1,1,2,1,65,1,0.316227766,0.669556409,0.352136337,3.464101615,0.1,0.8,0.6,1,1,6,3,6,2,9,1,1,1,12,0,1,1,0,0,1],[4,2,5,1,0,0,0,0,1,0,0,0,0,0,5,1,0,0,0.9,0.5,0.771362431,4,1,-1,0,0,11,1,1,0,1,103,1,0.316227766,0.60632002,0.358329457,2.828427125,0.4,0.5,0.4,3,3,8,4,10,2,7,2,0,3,10,0,0,1,1,0,1]]}'  # NOQA: E501
    PREDICTION = run(TEST_ROW, {})
    print("Test result: ", PREDICTION)
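
The `run` function above expects the request body to be a JSON document with a `data` key that holds a list of feature rows. As a small sketch of how a client might prepare such a body, the helper below serializes rows into that shape and checks that every row has the same number of features. The name `build_scoring_payload` and the three-value example rows are hypothetical; real requests would need the full feature vector used during training.

```python
import json


def build_scoring_payload(rows):
    """Serialize feature rows into the {"data": [...]} body that run() parses."""
    if not rows:
        raise ValueError("at least one row is required")
    width = len(rows[0])
    if any(len(row) != width for row in rows):
        raise ValueError("all rows must have the same number of features")
    return json.dumps({"data": rows})


payload = build_scoring_payload([[0, 1, 8], [4, 2, 5]])

# Round-trip the payload the same way run() reads it
parsed_rows = json.loads(payload)["data"]
print(len(parsed_rows), "rows in payload")
```

Validating the row widths on the client side is a design choice made for this sketch; it surfaces malformed input before the request reaches the service, where a ragged array would otherwise fail inside the model's `predict` call.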