## Install required libraries

Install the libraries only once. It is recommended to restart the notebook kernel after the libraries are installed.

In [1]:
!pip install -U azureml-sdk==1.26.0


# Azure Machine Learning Pipelines

This notebook shows how Contoso Auto can benefit from creating reusable Azure Machine Learning pipelines.

The goal is to build a pipeline that demonstrates the basic data science workflow of data preparation, model training, and prediction. With Azure Machine Learning, you define distinct pipeline steps that can be reused, and as you tweak and test your workflow, only the steps that need it are rerun.

A subset of data is collected from Contoso Auto's fleet management program and enriched with holiday and weather data. The goal is to train a regression model that predicts taxi fares in New York City based on input features such as number of passengers, trip distance, datetime, holiday information, and weather information.

The machine learning pipeline in this notebook is organized into three steps:

- **Preprocess Training and Input Data:** Preprocess the data to better represent the datetime features, such as hour of the day and day of the week, capturing the cyclical nature of these features.

- **Model Training:** Use the GradientBoostingRegressor algorithm from the scikit-learn library to train a regression model. The trained model is saved for making predictions later.

- **Model Inference:** Bulk predictions. Each time an input file is uploaded to a datastore, bulk model predictions are run on the input data. Before making predictions, the preprocess data step is reused to process the input data.
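The cyclical encoding mentioned in the preprocessing step can be sketched with the standard library as follows. It mirrors the `get_sin_cosine` helper used in the real-time inference section at the end of this notebook; the name `cyclical_encode` is hypothetical, and the actual `preprocess.py` may differ.

```python
import math


def cyclical_encode(value, period, zero_based=True):
    """Map a cyclical value onto the unit circle as (sine, cosine).

    Set zero_based=False for 1-based values (e.g. months 1..12) so the cycle starts at 0.
    """
    if not zero_based:
        value -= 1
    angle = value * 2.0 * math.pi / period
    return math.sin(angle), math.cos(angle)


# Hours 23 and 0 end up next to each other on the circle,
# whereas a raw integer feature would place them 23 units apart.
late = cyclical_encode(23, 24)
midnight = cyclical_encode(0, 24)
noon = cyclical_encode(12, 24)
print(math.dist(late, midnight) < math.dist(noon, midnight))  # True
```

Encoding each value as a point on the unit circle keeps hour 23 close to hour 0 and Saturday close to Sunday, which a single integer column cannot express.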

Each of these steps has implicit data dependencies, and AML makes it possible to reuse previously completed steps and run only the modified or new steps in your pipeline.

The pipelines run on an Azure Machine Learning compute cluster.

## Azure Machine Learning and Pipeline SDK-specific Imports

In [2]:
import os
import numpy as np
import pandas as pd
import azureml.core
from azureml.core import Workspace, Experiment, Datastore
from azureml.data.azure_storage_datastore import AzureBlobDatastore
from azureml.core.compute import AmlCompute
from azureml.core.compute import ComputeTarget
from azureml.widgets import RunDetails

# Check core SDK version number
print("SDK version:", azureml.core.VERSION)

from azureml.data.data_reference import DataReference
from azureml.pipeline.core import Pipeline, PipelineData, PipelineRun, StepRun
from azureml.pipeline.steps import PythonScriptStep
print("Pipeline SDK-specific imports completed")

SDK version: 1.31.0
Pipeline SDK-specific imports completed


## Setup
Set the values for `subscription_id`, `resource_group`, `workspace_name` and `workspace_region` as directed by the comments (*these values can be acquired from the Azure Portal*).

To get these values, do the following:
1.  Navigate to the Azure Portal and log in with the credentials provided.
2.  From the left-hand menu, under Favorites, select `Resource Groups`.
3.  In the list, select the resource group with a name similar to `XXXXX`.
4.  From the Overview tab, copy the values you need.


In [3]:
#Provide the Subscription ID of your existing Azure subscription
subscription_id = "8c12de75-5827-4795-b948-6c427fc4b7ab" # <- needs to be the subscription with the Quick-Starts resource group

#Provide values for the existing Resource Group 
resource_group = "Shelly.Xiao.RG" # <- replace XXXXX with your unique identifier

#Provide the Workspace Name and Azure Region of the Azure Machine Learning Workspace
workspace_name = "shelly-demo" # <- replace XXXXX with your unique identifier (should be lowercase)
workspace_region = "westus2" # <- region of your Quick-Starts resource group

aml_compute_target = "gpucluster" # <- the name of your GPU-enabled cluster

experiment_name = 'fleet-pipeline'

# project folder for the script files
project_folder = 'aml-pipelines-scripts'
data_location = 'aml-pipelines-data'
test_data_location = 'aml-pipelines-test-data'

In [4]:
data_reference_name = 'nyc_taxi_raw_features'
feature_data = 'nyc-taxi-raw-features/nyc-taxi-sample-data.csv'

## Create and connect to an Azure Machine Learning Workspace

Run the following cell to access your Azure Machine Learning **Workspace**.

**Important Note**: You may be prompted to log in via the output below the cell. If so, navigate to the URL displayed and enter the code provided. Once you have entered the code, return to this notebook and wait for the output to read `Workspace configuration succeeded`.

In [5]:
# With exist_ok=True, if the workspace already exists you get a reference to it,
# which lets you rerun this cell as often as needed (common in notebooks).
ws = Workspace.create(
    name = workspace_name,
    subscription_id = subscription_id,
    resource_group = resource_group, 
    location = workspace_region,
    exist_ok = True)

ws.write_config()
print('Workspace configuration succeeded')

Workspace configuration succeeded


## Create AML Compute Cluster

Azure Machine Learning Compute is a service for provisioning and managing clusters of Azure virtual machines for running machine learning workloads. The following cell creates a new AmlCompute cluster in the current workspace if one with that name doesn't already exist. All the pipelines in this notebook run on this compute target.

In [6]:
from azureml.core.compute_target import ComputeTargetException

try:
    aml_compute = AmlCompute(ws, aml_compute_target)
    print("found existing compute target.")
except ComputeTargetException:
    print("creating new compute target")
    
    provisioning_config = AmlCompute.provisioning_configuration(vm_size = "STANDARD_NC12",
                                                                min_nodes = 1, 
                                                                max_nodes = 1)    
    aml_compute = ComputeTarget.create(ws, aml_compute_target, provisioning_config)
    aml_compute.wait_for_completion(show_output=True, min_node_count=None, timeout_in_minutes=20)
    
print("Aml Compute attached")

creating new compute target
Creating......
SucceededProvisioning operation finished, operation "Succeeded"
Succeeded...........................
AmlCompute wait for completion finished

Minimum number of nodes requested have been provisioned
Aml Compute attached


## Create the Run Configuration

In [7]:
from azureml.core.runconfig import RunConfiguration, DockerConfiguration
from azureml.core.conda_dependencies import CondaDependencies
from azureml.core.runconfig import DEFAULT_CPU_IMAGE,  DEFAULT_GPU_IMAGE

# Create a new runconfig object
run_amlcompute = RunConfiguration()

# Use the cluster you created above. 
run_amlcompute.target = aml_compute_target

# Enable Docker (DockerConfiguration replaces the deprecated environment.docker.enabled setting)
run_amlcompute.docker = DockerConfiguration(use_docker=True)

# Set the Docker base image to the default GPU-based image
run_amlcompute.environment.docker.base_image = DEFAULT_GPU_IMAGE

# Let Azure ML build a conda environment in the Docker image for execution
run_amlcompute.environment.python.user_managed_dependencies = False

# Specify a CondaDependencies object with the necessary packages
run_amlcompute.environment.python.conda_dependencies = CondaDependencies.create(pip_packages=[
    'numpy',
    'pandas',
    'joblib',
    'scikit-learn',
    'sklearn-pandas==2.0.0'
])


## Pipeline Step 1 - Process Training Data 

The process training data step in the pipeline takes raw training data as input. This data can be a data source that lives in one of the accessible data locations, or intermediate data produced by a previous step in the pipeline. In this example, the raw training data *csv* file is stored in the workspace's default blob store at the path given by `feature_data`. Run the following two cells; the second one creates a **DataReference** object that points to that file.

In [8]:
# Default datastore of the workspace (the output below shows it is the workspace blob store)
def_file_store = ws.get_default_datastore() 
print("Default datastore's name: {}".format(def_file_store.name))
def_blob_store = Datastore(ws, "workspaceblobstore")
print("Blobstore's name: {}".format(def_blob_store.name))

Default datastore's name: workspaceblobstore
Blobstore's name: workspaceblobstore


In [9]:
raw_train_data = DataReference(datastore=def_blob_store,
                               data_reference_name=data_reference_name,
                               path_on_datastore=feature_data)
print("DataReference object created")

DataReference object created


### Create the Process Training Data Pipeline Step

Within a pipeline, any intermediate data (e.g., the output of a previous step) is represented by a PipelineData object. A PipelineData object is produced by one step and consumed by others: pass it as an output of the producing step and as an input of one or more consuming steps.

In the following cell, an instance of the PythonScriptStep is created. It takes a reference to input data, a script to run against the input data and a destination to write the output data that results.

Specifically, the step takes the `raw_train_data` DataReference object as input, and it will output an intermediate PipelineData object represented by `processed_train_data` that holds the processed training data. 

This output data will have new engineered features for the datetime components: hour of the day, and day of the week, as scripted in `preprocess.py`. 
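At run time, AML passes the `DataReference` and `PipelineData` arguments to the script as paths on the compute target. The following is a minimal, hypothetical sketch of how `preprocess.py` could read these arguments with `argparse`; the script itself is not shown in this notebook, and the example paths and the `processed.csv` file name are made up.

```python
import argparse
import os


def parse_args(argv=None):
    """Parse the arguments passed by the PythonScriptStep (hypothetical sketch)."""
    parser = argparse.ArgumentParser(description="Preprocess NYC taxi fare data")
    parser.add_argument("--process_mode", choices=["train", "inference"], required=True,
                        help="'train' for training data, 'inference' for data to score")
    parser.add_argument("--input", required=True, help="Path to the raw input CSV file")
    parser.add_argument("--output", required=True, help="Directory for the processed data")
    return parser.parse_args(argv)


# Made-up paths standing in for the locations AML resolves at run time
args = parse_args(["--process_mode", "train",
                   "--input", "/tmp/raw/nyc-taxi-sample-data.csv",
                   "--output", "/tmp/processed_train_data"])
output_file = os.path.join(args.output, "processed.csv")  # hypothetical output file name
print(args.process_mode, output_file)
```

Restricting `--process_mode` to `train` and `inference` makes a typo in the step definition fail fast instead of silently processing data the wrong way.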


In [10]:
processed_train_data = PipelineData('processed_train_data', datastore=def_blob_store)
print("PipelineData object created")

processTrainDataStep = PythonScriptStep(
    name="process_train_data",
    script_name="preprocess.py", 
    arguments=["--process_mode", 'train',
               "--input", raw_train_data,
               "--output", processed_train_data],
    inputs=[raw_train_data],
    outputs=[processed_train_data],
    compute_target=aml_compute,
    runconfig=run_amlcompute,
    source_directory=project_folder
)
print("preprocessStep created")

PipelineData object created
preprocessStep created


## Pipeline Step 2 - Train Pipeline Step

The train step takes the `processed_train_data` created in the previous step as input and writes the resulting model to another PipelineData object, `trained_model`, as its output. This shows how machine learning pipelines can have many steps, and how these steps can use (or reuse) data sources and intermediate data.
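The hand-off between the train step and later steps is a file inside the `trained_model` directory. The sketch below assumes the model is serialized with Python's `pickle` (the real `train.py` may use `joblib`, which is in the run configuration's package list) and uses the file name `nyc-taxi-fare.pkl` that appears in the model path later in this notebook. A plain dict stands in for the fitted GradientBoostingRegressor, and the helper names are hypothetical.

```python
import os
import pickle
import tempfile

MODEL_FILE_NAME = "nyc-taxi-fare.pkl"  # file name seen in the registered model path


def save_model(model, output_dir):
    """Write the fitted model into the directory backing the trained_model PipelineData."""
    os.makedirs(output_dir, exist_ok=True)
    model_path = os.path.join(output_dir, MODEL_FILE_NAME)
    with open(model_path, "wb") as f:
        pickle.dump(model, f)
    return model_path


def load_model(model_dir):
    """Read the model back, as a downstream step would from its --model argument."""
    with open(os.path.join(model_dir, MODEL_FILE_NAME), "rb") as f:
        return pickle.load(f)


with tempfile.TemporaryDirectory() as tmp:
    model_dir = os.path.join(tmp, "trained_model")
    saved_path = save_model({"stand_in": "fitted regressor"}, model_dir)
    restored = load_model(model_dir)

print(os.path.basename(saved_path), restored)
```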


In [11]:
trained_model = PipelineData('trained_model', datastore=def_blob_store)
print("PipelineData object created")

trainStep = PythonScriptStep(
    name="train",
    script_name="train.py", 
    arguments=["--input", processed_train_data, "--output", trained_model],
    inputs=[processed_train_data],
    outputs=[trained_model],
    compute_target=aml_compute,
    runconfig=run_amlcompute,
    source_directory=project_folder
)
print("trainStep created")

PipelineData object created
trainStep created


### Create and Validate the Pipeline
Now that the core steps have been defined, it is time to define the actual Pipeline object.

The `trainStep` has an implicit data dependency on the `processTrainDataStep`, so you need to include only the `trainStep` in your Pipeline object.

When you run the pipeline, you will observe that it first runs the **processTrainDataStep** and then the **trainStep**.

Run the following cell to create the Pipeline and validate it is configured correctly (e.g., check for issues like disconnected inputs).
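How the step order follows from the data can be illustrated with a toy model: map each data name to the step that produces it, walk backwards from the requested steps, and sort the result topologically. This is a simplified illustration using Python's `graphlib` (3.9+), not AML's actual implementation; the step and data names mirror the objects defined above.

```python
from graphlib import TopologicalSorter

# Toy description of the steps defined above: the data each step consumes and produces
steps = {
    "process_train_data": {"inputs": ["nyc_taxi_raw_features"],
                           "outputs": ["processed_train_data"]},
    "train": {"inputs": ["processed_train_data"], "outputs": ["trained_model"]},
}


def resolve_order(requested, steps):
    """Collect the requested steps plus everything they depend on, in run order."""
    producer = {out: name for name, step in steps.items() for out in step["outputs"]}
    graph = {}
    pending = list(requested)
    while pending:
        name = pending.pop()
        if name in graph:
            continue
        # Inputs with no producing step (e.g. raw DataReferences) add no dependency
        deps = {producer[i] for i in steps[name]["inputs"] if i in producer}
        graph[name] = deps
        pending.extend(deps)
    return list(TopologicalSorter(graph).static_order())


print(resolve_order(["train"], steps))  # ['process_train_data', 'train']
```

Requesting only `train`, as the pipeline below does, is enough to pull in `process_train_data` ahead of it.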

In [12]:
pipeline = Pipeline(workspace=ws, steps=[trainStep])
print ("Pipeline is built")

pipeline.validate()
print("Simple validation complete")

Pipeline is built
Step train is ready to be created [609f90f6]Step process_train_data is ready to be created [3c47be96]

Simple validation complete


### Submit the Pipeline

Next, submit the pipeline for execution.

In [13]:
pipeline_run = Experiment(ws, experiment_name).submit(pipeline)
print("Pipeline is submitted for execution")

Created step train [609f90f6][e63e9316-fba3-40e1-abef-a2fbeebabc50], (This step will run and generate new outputs)
Created step process_train_data [3c47be96][873bdeb9-f288-46a7-b952-29d9e42dfbb0], (This step will run and generate new outputs)
Using data reference nyc_taxi_raw_features for StepId [2cad3ef2][1471eda0-0460-46e8-9d99-b6bba7aee8b2], (Consumers of this data are eligible to reuse prior runs.)
Submitted PipelineRun 1378a21d-6b63-48c9-9ad4-65f187f1e210
Link to Azure Machine Learning Portal: https://ml.azure.com/runs/1378a21d-6b63-48c9-9ad4-65f187f1e210?wsid=/subscriptions/8c12de75-5827-4795-b948-6c427fc4b7ab/resourcegroups/Shelly.Xiao.RG/workspaces/shelly-demo&tid=570057f4-73ef-41c8-bcbb-08db2fc15c2b
Pipeline is submitted for execution


### Monitor the Run Details

The pipeline run will take **about 8 minutes to complete.**

You can monitor the execution of a pipeline in real time both from a notebook and in the Azure Portal.

Run the following cell and observe the order in which the pipeline steps are executed: **processTrainDataStep** followed by **trainStep**.

Wait until both pipeline steps finish running. The cell below periodically auto-refreshes; you can also rerun the cell to force a refresh.

At the bottom of the cell's output there is a link you can click to see the status of the run in the Azure Portal.

In [14]:
RunDetails(pipeline_run).show()


## Pipeline Step 3 - Inferencing
At this point you have a trained model you can use to make predictions. Next, you will create a new pipeline step for inferencing (scoring).

### Create a DataReference Object to represent the Test data

Run the following cells to define the test data path and create the DataReference object (`raw_bulk_test_data`) that references it. The test data itself is uploaded later, when the inference pipeline is tested.

In [15]:
test_data_reference_name = 'raw_bulk_test_data'
test_data_path = 'bulk-test-data/raw-test-data.csv'

In [16]:
# Create a DataReference object referencing the 'raw-test-data.csv' file
raw_bulk_test_data = DataReference(datastore=def_blob_store,
                                   data_reference_name=test_data_reference_name,
                                   path_on_datastore=test_data_path)
print("DataReference object created")

DataReference object created


### Create the Process Test Data Pipeline Step

Similar to the process train data pipeline step, create a new step for processing the test data. Note that the same script file, `preprocess.py`, processes both the train and test data. The key difference is that the `process_mode` argument is set to *inference*, which `preprocess.py` uses to process the test data in a slightly different way than the train data. This step also sets `allow_reuse=False`, so it always reruns instead of reusing a previous run's output; because new test data is uploaded to the same path, a reused result would be stale.


In [17]:
processed_test_data = PipelineData('processed_test_data', datastore=def_blob_store)
print("PipelineData object created")

processTestDataStep = PythonScriptStep(
    name="process_test_data",
    script_name="preprocess.py", 
    arguments=["--process_mode", 'inference',
               "--input", raw_bulk_test_data,
               "--output", processed_test_data],
    inputs=[raw_bulk_test_data],
    outputs=[processed_test_data],
    allow_reuse = False,
    compute_target=aml_compute,
    runconfig=run_amlcompute,
    source_directory=project_folder
)
print("preprocessStep created")

PipelineData object created
preprocessStep created


### Create the Inference Pipeline Step to Make Predictions

The inference step takes two inputs, the `processed_test_data` created in the previous step and the `trained_model` created in the train step, and generates `inference_output` as its output. This is another example of how pipeline steps can use or reuse data sources and intermediate data.
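The inference output is later downloaded as `results.txt`, with one prediction per line. The following is a hypothetical, standard-library sketch of that final stage of `inference.py`: rows are read with `csv.DictReader`, and `StandInModel` is a made-up placeholder for the trained regressor (the real script presumably loads the model from its `--model` directory and scores with pandas and scikit-learn).

```python
import csv
import os
import tempfile


def write_predictions(model, input_csv, output_dir):
    """Score each row of input_csv and write results.txt with one prediction per line."""
    with open(input_csv, newline="") as f:
        rows = list(csv.DictReader(f))
    predictions = model.predict(rows)
    os.makedirs(output_dir, exist_ok=True)
    results_path = os.path.join(output_dir, "results.txt")
    with open(results_path, "w") as f:
        for prediction in predictions:
            f.write(f"{prediction}\n")
    return results_path


class StandInModel:
    """Hypothetical placeholder for the trained regressor: fare = 2.5 * distance."""

    def predict(self, rows):
        return [2.5 * float(row["tripDistance"]) for row in rows]


with tempfile.TemporaryDirectory() as tmp:
    input_csv = os.path.join(tmp, "processed_test_data.csv")
    with open(input_csv, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(["passengerCount", "tripDistance"])
        writer.writerows([[4, 10], [1, 5]])
    results_path = write_predictions(StandInModel(), input_csv,
                                     os.path.join(tmp, "inference_output"))
    with open(results_path) as f:
        lines = f.read().splitlines()

print(lines)  # ['25.0', '12.5']
```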


In [18]:
inference_output = PipelineData('inference_output', datastore=def_blob_store)
print("PipelineData object created")

inferenceStep = PythonScriptStep(
    name="inference",
    script_name="inference.py", 
    arguments=["--input", processed_test_data,
               "--model", trained_model,
               "--output", inference_output],
    inputs=[processed_test_data, trained_model],
    outputs=[inference_output],
    compute_target=aml_compute,
    runconfig=run_amlcompute,
    source_directory=project_folder
)
print("inferenceStep created")

PipelineData object created
inferenceStep created


### Create and Validate the Pipeline

The `inferenceStep` has an implicit data dependency on **ALL** of the previous pipeline steps. So when we create a Pipeline object that lists only `inferenceStep` in the steps array, we implicitly include `process_test_data` as well as `train` and, through it, `process_train_data`, which together produce the model.

Run the following cell to create and validate the inferencing pipeline.

In [19]:
inference_pipeline = Pipeline(workspace=ws, steps=[inferenceStep])
print ("Inference Pipeline is built")

inference_pipeline.validate()
print("Simple validation complete")

Inference Pipeline is built
Step inference is ready to be created [afcac014]
Step process_test_data is ready to be created [a94cc92b]
Simple validation complete


### Publish the Inference Pipeline

Publish the inferencing pipeline so that it can be run to score data supplied in batches.

Note that we are not submitting the pipeline to run; instead, we are publishing it. A published pipeline can be submitted for a run by invoking its REST endpoint.
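A minimal sketch of such a REST call using only the standard library. It assumes, as in the Azure Machine Learning documentation's examples, a JSON body with an `ExperimentName` field; in this notebook the endpoint would be `published_pipeline.endpoint`, and the header could come from `InteractiveLoginAuthentication().get_authentication_header()`. The endpoint and token below are placeholders, and the request is built but not sent.

```python
import json
import urllib.request


def build_pipeline_submit_request(endpoint, auth_header, experiment_name):
    """Build (but do not send) a POST request that starts a run of a published pipeline."""
    body = json.dumps({"ExperimentName": experiment_name}).encode("utf-8")
    headers = {"Content-Type": "application/json", **auth_header}
    return urllib.request.Request(endpoint, data=body, headers=headers, method="POST")


request = build_pipeline_submit_request(
    "https://example.com/placeholder-pipeline-endpoint",  # placeholder for published_pipeline.endpoint
    {"Authorization": "Bearer <token>"},                  # placeholder Azure AD header
    "fleet-pipeline")
print(request.get_method(), request.full_url)
# To actually submit, pass a request built with a real endpoint and token to urllib.request.urlopen
```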


In [20]:
pipeline_name = 'Inference Pipeline'
published_pipeline = inference_pipeline.publish(name = pipeline_name)

Created step inference [afcac014][21b795b4-5170-439f-8132-101095357fde], (This step will run and generate new outputs)
Created step process_test_data [a94cc92b][e44d884a-7cee-4085-80b3-71bb38446673], (This step will run and generate new outputs)
Created step train [f19e7e98][e63e9316-fba3-40e1-abef-a2fbeebabc50], (This step is eligible to reuse a previous run's output)
Created step process_train_data [e30c5bfd][873bdeb9-f288-46a7-b952-29d9e42dfbb0], (This step is eligible to reuse a previous run's output)
Using data reference raw_bulk_test_data for StepId [c02c64d9][14cb2364-0210-4353-a339-ea6bbdfebbc4], (Consumers of this data are eligible to reuse prior runs.)Using data reference nyc_taxi_raw_features for StepId [ca2bfe1e][1471eda0-0460-46e8-9d99-b6bba7aee8b2], (Consumers of this data are eligible to reuse prior runs.)



### Schedule the Inference Pipeline

We want to run the Inference Pipeline whenever new data is uploaded to the location referenced by the `raw_bulk_test_data` DataReference object. The next cell creates a Schedule that monitors the `bulk-test-data` path on the datastore for changes and runs the `Inference Pipeline` when it detects a newly uploaded file.
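Conceptually, a datastore-based schedule compares what is under the monitored path at each polling interval with what it saw on the previous poll. The toy sketch below illustrates that idea on a local folder using `os.walk` and file modification times; it is not how the Azure Machine Learning Schedule service is implemented, and a local directory stands in for the blob container path.

```python
import os
import tempfile


def snapshot(folder):
    """Map every file under folder to its last-modified time."""
    state = {}
    for root, _dirs, files in os.walk(folder):
        for name in files:
            path = os.path.join(root, name)
            state[path] = os.path.getmtime(path)
    return state


def detect_changes(previous, current):
    """Return the files that are new or modified since the previous poll."""
    return sorted(path for path, mtime in current.items() if previous.get(path) != mtime)


with tempfile.TemporaryDirectory() as folder:
    before = snapshot(folder)  # first poll: nothing uploaded yet
    with open(os.path.join(folder, "raw-test-data.csv"), "w") as f:
        f.write("vendorID,passengerCount\n1,4\n")
    changed = detect_changes(before, snapshot(folder))  # second poll sees the upload
    unchanged = detect_changes(snapshot(folder), snapshot(folder))  # nothing uploaded in between

print([os.path.basename(path) for path in changed], unchanged)
```

In this toy model, a non-empty change list is what would trigger a pipeline run.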

In [21]:
from azureml.pipeline.core.schedule import Schedule

schedule = Schedule.create(workspace=ws, name=pipeline_name + "_sch",
                           pipeline_id=published_pipeline.id, 
                           experiment_name=experiment_name,
                           datastore=def_blob_store,
                           wait_for_provisioning=True,
                           description="Datastore scheduler for Pipeline: " + pipeline_name,
                           path_on_datastore='bulk-test-data',
                           polling_interval=1 # in minutes
                           )

print("Created schedule with id: {}".format(schedule.id))

Provisioning status: Completed
Created schedule with id: 182991c3-f237-465e-a873-6b3dca5996fa


In [49]:
# Check the status of the Schedule and confirm it's Active
print('Schedule status: ', schedule.status)

Schedule status:  Active


### Test the Inference Pipeline

In the following cell, some test data is created and uploaded to the `bulk-test-data` folder of the default blob store to make bulk predictions.

In [22]:
# Create the raw test data
columns = ['vendorID', 'passengerCount', 'tripDistance', 'hour_of_day', 'day_of_week', 'day_of_month', 
           'month_num', 'normalizeHolidayName', 'isPaidTimeOff', 'snowDepth', 'precipTime', 
           'precipDepth', 'temperature']

data = [[1, 4, 10, 15, 4, 5, 7, 'None', False, 0, 0.0, 0.0, 80], 
        [1, 1, 5, 6, 0, 20, 1, 'Martin Luther King, Jr. Day', True, 0, 2.0, 3.0, 35]]

data_df = pd.DataFrame(data, columns = columns)

os.makedirs(test_data_location, exist_ok=True)
data_df.to_csv(os.path.join(test_data_location, 'raw-test-data.csv'), header=True, index=False)

from datetime import datetime
data_upload_time = datetime.utcnow()
print('Data upload time in UTC: ', data_upload_time)

# Upload the raw test data to the blob storage
def_blob_store.upload(src_dir=test_data_location, 
                      target_path='bulk-test-data', 
                      overwrite=True, 
                      show_progress=True)

# Wait for 65 seconds...
import time
print('Please wait...')
time.sleep(65)
print('Done!')

Data upload time in UTC:  2021-07-14 17:14:03.847666
Uploading an estimated of 3 files
Uploading aml-pipelines-test-data/.amlignore
Uploaded aml-pipelines-test-data/.amlignore, 1 files out of an estimated total of 3
Uploading aml-pipelines-test-data/.amlignore.amltmp
Uploaded aml-pipelines-test-data/.amlignore.amltmp, 2 files out of an estimated total of 3
Uploading aml-pipelines-test-data/raw-test-data.csv
Uploaded aml-pipelines-test-data/raw-test-data.csv, 3 files out of an estimated total of 3
Uploaded 3 files
Please wait...
Done!


### Wait for Schedule to Trigger

The Schedule polling interval is 1 minute. You can also log in to the Azure Portal and navigate to `resource group -> workspace -> experiment` to see whether the `Inference Pipeline` has started executing.

**If the inference_pipeline_run object in the cell below is None, the Schedule has not triggered yet!**

**If the Schedule does not trigger within 2 minutes, try rerunning the data upload cell!**

In [23]:
# Confirm that the inference_pipeline_run object is NOT None
inference_pipeline_run = schedule.get_last_pipeline_run()
print(inference_pipeline_run)

Run(Experiment: fleet-pipeline,
Id: d4393d5a-4b6a-4c38-bf12-8b3b5e996599,
Type: azureml.PipelineRun,
Status: Completed)


*If you upload the test data file more than once, confirm that you have the latest pipeline run object by comparing the pipeline start time with the time you uploaded the test data file.*

In [24]:
# Confirm the start time
import dateutil.parser

details = inference_pipeline_run.get_details()
if details['status'] != 'NotStarted':
    pipeline_starttime = dateutil.parser.parse(details['startTimeUtc'], ignoretz=True)
else:
    pipeline_starttime = datetime.utcnow()

if pipeline_starttime > data_upload_time:
    print('We have the correct inference pipeline run! Proceed to next cell.')
else:
    print('Rerun the above cell to get the latest inference pipeline run!')

We have the correct inference pipeline run! Proceed to next cell.


### Monitor the Run Details

Observe the order in which the pipeline steps are executed based on their implicit data dependencies.

Wait until all steps finish running. The cell below periodically auto-refreshes; you can also rerun the cell to force a refresh.

**This example demonstrates how AML makes it possible to reuse previously completed steps and run only the modified or new steps in your pipeline.**
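Step reuse can be pictured as a cache keyed by a fingerprint of everything that determines a step's result. The toy sketch below hashes a step's script, arguments, and input identifiers with SHA-256 and returns the cached output when the fingerprint matches; AML's actual reuse logic is more involved, so treat this purely as an illustration. The `allow_reuse` flag mimics the parameter set on `processTestDataStep`, and identifiers such as `processed_train_data@run1` are hypothetical.

```python
import hashlib
import json


def step_fingerprint(script_source, arguments, input_ids):
    """Hash everything that determines a step's output in this toy model."""
    payload = json.dumps({"script": script_source, "args": arguments,
                          "inputs": sorted(input_ids)}, sort_keys=True)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


class ToyStepCache:
    def __init__(self):
        self._outputs = {}

    def run(self, script_source, arguments, input_ids, execute, allow_reuse=True):
        """Return (output, reused); call execute() only when no cached output applies."""
        key = step_fingerprint(script_source, arguments, input_ids)
        if allow_reuse and key in self._outputs:
            return self._outputs[key], True
        output = execute()
        self._outputs[key] = output
        return output, False


cache = ToyStepCache()
train_args = ["--input", "processed_train_data", "--output", "trained_model"]
first = cache.run("train.py v1", train_args, ["processed_train_data@run1"], lambda: "model-1")
second = cache.run("train.py v1", train_args, ["processed_train_data@run1"], lambda: "model-2")
forced = cache.run("train.py v1", train_args, ["processed_train_data@run1"], lambda: "model-3",
                   allow_reuse=False)
print(first, second, forced)
```

The second call returns the first call's output without executing, while `allow_reuse=False` forces a fresh run even though nothing changed.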

In [25]:
RunDetails(inference_pipeline_run).show()


### Download and Observe the Predictions

In [26]:
print("Get StepRun for inference step...")
pipeline_run_id = inference_pipeline_run.id
step_run_id = inference_pipeline_run.find_step_run('inference')[0].id
node_id = inference_pipeline_run.get_graph().node_name_dict['inference'][0].node_id
print('Pipeline Run ID: {} Step Run ID: {}, Step Run Node ID: {}'.format(pipeline_run_id, 
                                                                         step_run_id, 
                                                                         node_id))
step_run = StepRun(inference_pipeline_run.experiment, 
                   step_run_id, 
                   pipeline_run_id, 
                   node_id)
print(step_run)

print("Downloading evaluation results...")
# Access the inference step's output
data = step_run.get_output_data('inference_output')
# download the predictions to local path
data.download('.', show_progress=True)

Get StepRun for inference step...
Pipeline Run ID: d4393d5a-4b6a-4c38-bf12-8b3b5e996599 Step Run ID: 6e477a36-762e-4242-81c3-d831684c8621, Step Run Node ID: afcac014
Run(Experiment: fleet-pipeline,
Id: 6e477a36-762e-4242-81c3-d831684c8621,
Type: azureml.StepRun,
Status: Completed)
Downloading evaluation results...
Downloading azureml/6e477a36-762e-4242-81c3-d831684c8621/inference_output/results.txt
Downloaded azureml/6e477a36-762e-4242-81c3-d831684c8621/inference_output/results.txt, 1 files out of an estimated total of 1


1

In [27]:
with open(os.path.join('./', data.path_on_datastore, 'results.txt')) as f:
    results = f.read()
print("Printing evaluation results...")
print(results)

Printing evaluation results...
40.83369223705172
18.003962863409335



### Cleanup Resources

If you are done experimenting, run the following cell to disable the schedule so that it stops monitoring the datastore.

In [28]:
schedule.disable()

# Real-time inference

In [29]:
model_location = 'aml-pipelines-models'

os.makedirs(model_location, exist_ok=True)

In [30]:
training_run = pipeline_run.find_step_run("train")[0]
step_run_output = training_run.get_output("trained_model")

port_data_reference = step_run_output.get_port_data_reference()
port_data_reference.download(local_path="./aml-pipelines-models")
model_path = f'./aml-pipelines-models/azureml/{training_run.id}/trained_model/nyc-taxi-fare.pkl'
print(model_path)

./aml-pipelines-models/azureml/d796b3ae-f106-499c-943e-32bff16f9451/trained_model/nyc-taxi-fare.pkl


In [32]:
# register the model for deployment
from azureml.core.model import Model
model = Model.register(model_path = model_path,
                       model_name = "predict-nyc-taxi-fare",
                       tags = {'area': "auto", 'type': "regression"},
                       description = "Contoso Auto model to predict taxi fares in New York City",
                       workspace = ws)

print(f'model name: {model.name}, model description: {model.description}, model version: {model.version}')

Registering model predict-nyc-taxi-fare
model name: predict-nyc-taxi-fare, model description: Contoso Auto model to predict taxi fares in New York City, model version: 2


In [37]:
from azureml.core import Environment
testEnv = Environment.from_conda_specification('testEnv', './aml-pipeline-config/aml_pipeline_dependencies.yml')
testEnv.register(workspace=ws)


In [40]:
from azureml.core.model import InferenceConfig
from azureml.core.webservice import AciWebservice

inference_config = InferenceConfig(entry_script='./aml-pipelines-scripts/scoring_service.py', environment=testEnv)
aci_config = AciWebservice.deploy_configuration(
    cpu_cores = 1, 
    memory_gb = 1, 
    tags = {'name': 'aci-cluster'}, 
    description = 'Scoring web service.')

from azureml.core import Webservice

service_name = 'predict-nyc-taxi-fare'

webservice = Model.deploy(workspace=ws,
                       name=service_name,
                       models=[model],
                       inference_config=inference_config,
                       deployment_config=aci_config, 
                       overwrite=True)
webservice.wait_for_deployment(show_output=True)
print(webservice.state)

Tips: You can try get_logs(): https://aka.ms/debugimage#dockerlog or local deployment: https://aka.ms/debugimage#debug-locally to debug if deployment takes longer than 10 minutes.
Running
2021-07-14 18:05:40+00:00 Creating Container Registry if not exists.
2021-07-14 18:05:40+00:00 Registering the environment.
2021-07-14 18:05:41+00:00 Use the existing image.
2021-07-14 18:05:42+00:00 Generating deployment configuration.
2021-07-14 18:05:42+00:00 Submitting deployment to compute.
2021-07-14 18:05:45+00:00 Checking the status of deployment predict-nyc-taxi-fare..
2021-07-14 18:10:54+00:00 Checking the status of inference endpoint predict-nyc-taxi-fare.
Succeeded
ACI service creation operation finished, operation "Succeeded"
Healthy


### Test the deployed web service

With the deployed web service ready, we can test calling the service to see the scored results. There are three ways to approach this: 
1. You could use the Webservice object that you acquired in the previous cell to call the service directly. 
2. You could use the Webservice class to get a reference to a deployed web service by name. 
3. You could use any client capable of making a REST call.
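For the third approach, a client posts JSON to the service's scoring URI (available as `webservice.scoring_uri`). The sketch below builds such a request with the standard library, laying out rows in the structure that pandas' `to_json(orient="split")` produces (`columns`, `index`, `data`). It assumes `scoring_service.py` accepts that layout, as it does for the string sent by `webservice.run` later in this notebook, and that key-based authentication is off unless an API key is given. The URI is a placeholder, the column list is shortened for illustration (the real service expects the full feature list), and the request is not sent.

```python
import json
import urllib.request


def build_scoring_request(scoring_uri, columns, rows, api_key=None):
    """Build (but do not send) a POST with rows in pandas' 'split' JSON layout."""
    payload = {"columns": columns, "index": list(range(len(rows))), "data": rows}
    headers = {"Content-Type": "application/json"}
    if api_key:  # only needed if key-based authentication is enabled on the service
        headers["Authorization"] = f"Bearer {api_key}"
    body = json.dumps(payload).encode("utf-8")
    return urllib.request.Request(scoring_uri, data=body, headers=headers, method="POST")


scoring_request = build_scoring_request(
    "http://example.com/score",                      # placeholder for webservice.scoring_uri
    ["vendorID", "passengerCount", "tripDistance"],  # shortened column list, illustration only
    [[1, 4, 10], [1, 1, 5]])
print(scoring_request.get_method(), json.loads(scoring_request.data)["index"])
```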

In this notebook, we take the first approach. Run the following cells to load and prepare the test data and then invoke the service with it.

The output of the final cell is a JSON object whose `result` array contains one expected taxi fare per input row.

In [44]:
%%time
# load some test vehicle data that the model has not seen
test_data = pd.read_csv(os.path.join(test_data_location, 'raw-test-data.csv'))

CPU times: user 3.02 ms, sys: 3.67 ms, total: 6.69 ms
Wall time: 44.1 ms


In [45]:

# Recreate the cyclical datetime features the model was trained on (see the preprocessing step)
def get_sin_cosine(value, max_value, is_zero_base=False):
    # Shift 1-based values so the cycle starts at 0
    if not is_zero_base:
        value = value - 1
    sine = np.sin(value * (2. * np.pi / max_value))
    cosine = np.cos(value * (2. * np.pi / max_value))
    return (sine, cosine)

# Hour of day is a cyclical feature ranging from 0 to 23
test_data[['hour_sine', 'hour_cosine']] = test_data['hour_of_day'].apply(
    lambda x: pd.Series(get_sin_cosine(x, 24, True)))

# Day of week is a cyclical feature ranging from 0 to 6
test_data[['day_of_week_sine', 'day_of_week_cosine']] = test_data['day_of_week'].apply(
    lambda x: pd.Series(get_sin_cosine(x, 7, True)))
columns = ['vendorID', 'passengerCount', 'tripDistance', 'hour_sine', 'hour_cosine', 
           'day_of_week_sine', 'day_of_week_cosine', 'day_of_month', 'month_num', 
           'normalizeHolidayName', 'isPaidTimeOff', 'snowDepth', 'precipTime', 
           'precipDepth', 'temperature']

test_data = test_data[columns]


In [46]:
# Serialize the prepared test data (two trips) as split-oriented JSON and score it
test_data_json = test_data.to_json(orient="split")
prediction = webservice.run(input_data = test_data_json)
print(prediction)

{"result": [40.83369223705172, 18.003962863409335]}
