# Lab03 - Implementing and monitoring a batch scoring solution

Azure Machine Learning batch scoring targets large inference jobs that are not time-sensitive. It is optimized for high-throughput, fire-and-forget inference over large collections of data.

In this lab, we will use [Azure Machine Learning Pipelines](https://docs.microsoft.com/en-us/azure/machine-learning/concept-ml-pipelines) to run a batch scoring task on a large dataset using a trained model. We will use the `NYC taxi fare predictor` model from Lab 01 to make batch predictions of taxi fares for customers based on a set of input features. The AML pipeline consists of two steps:

1. **Preprocess data step**: In this step we will simulate generation of a large dataset, approximately 250k rows of data. The data generated in this step is passed as an input to the next step.
1. **Inference step**: In this step we will use a special type of step called `ParallelRunStep` to do batch scoring on the input data. The `ParallelRunStep` uses a configuration called `ParallelRunConfig` that lets you control how the scoring job is split into mini-batches across the available compute nodes and their processors.

Finally, we will also review how to monitor the batch scoring pipeline runs from within Azure Machine Learning Studio.

**Import required libraries**

In [None]:
import os
import numpy as np
import pandas as pd
import azureml
from azureml.core import Workspace, Experiment, Run
from azureml.core.model import Model
from azureml.core.dataset import Dataset
from azureml.core.datastore import Datastore
from azureml.data.datapath import DataPath
from azureml.data.dataset_factory import TabularDatasetFactory
from azureml.core.compute import AmlCompute, ComputeTarget
from azureml.core.compute_target import ComputeTargetException
from azureml.core import Environment
from azureml.core.runconfig import RunConfiguration
from azureml.widgets import RunDetails

# Check core SDK version number
print("SDK version:", azureml.core.VERSION)

from azureml.data.data_reference import DataReference
from azureml.pipeline.core import Pipeline, PipelineData, PipelineRun, StepRun
from azureml.pipeline.steps import PythonScriptStep, ParallelRunStep, ParallelRunConfig
from azureml.pipeline.core import PipelineParameter, PublishedPipeline
from azureml.pipeline.core.pipeline_output_dataset import PipelineOutputFileDataset
print("Pipeline SDK-specific imports completed")

## Connect to the Azure Machine Learning Workspace

The AML Python SDK is required for leveraging the experimentation (including pipelines), model management and model deployment capabilities of Azure Machine Learning services. Run the following cell to connect to the AML **Workspace**.

In [None]:
ws = Workspace.from_config()
print(ws)

### Upload the sample inference data to the blob store

The sample dataset has 11,734 rows of input data. We will use this dataset to create a larger input dataset of size around 250k rows in the `preprocess` pipeline step.
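As a quick sanity check on the "around 250k rows" figure: the `preprocess` script keeps the original rows and appends `n` further copies, with `n` defaulting to 20, so its output holds `n + 1` copies of the input. The helper name `expanded_row_count` is hypothetical and used only for illustration.

```python
def expanded_row_count(base_rows: int, n: int = 20) -> int:
    """Rows produced by keeping the original data plus n appended copies."""
    return base_rows * (n + 1)

print(expanded_row_count(11_734))  # 246414
```

With the default expansion factor, 11,734 × 21 = 246,414 rows, which is the "approximately 250k" used throughout this lab.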

In [None]:
input_location = "./data"
target_path = "inference-data"
datastore = ws.get_default_datastore()
datastore.upload(input_location, 
                 target_path = target_path, 
                 overwrite = True, 
                 show_progress = True)

### Register the sample inference dataset

In [None]:
inference_data_path = DataPath(datastore=datastore, 
                               path_on_datastore=os.path.join(target_path, "nyc-taxi-data.csv"),
                               name="inference-data")
inference_ds = Dataset.Tabular.from_delimited_files(path=inference_data_path)
dataset_name = "nyc-taxi-inference-dataset"
description = "Inference dataset to predict NYC taxi fares."
registered_dataset = inference_ds.register(ws, dataset_name, description=description, create_new_version=True)
print('Registered dataset name {} and version {}'.format(registered_dataset.name, registered_dataset.version))

### Register the trained NYC taxi fare predictor model

The model file is located in the local folder named `model`. We will register this model in the AML model registry. The registered model will be used in the batch inferencing pipeline step to score the input data.

In [None]:
model_file_path = "./model/nyc-taxi-fare.pkl"
model_name = 'nyc-taxi-fare-predictor'
model_description = 'Model to predict taxi fares in NYC.'
model = Model.register(model_path=model_file_path,
                       model_name=model_name,
                       description=model_description,
                       workspace=ws)
model_version = model.version
print("Model registered: {} \nModel Description: {} \nModel Version: {}".format(model.name, 
                                                                                model.description, 
                                                                                model.version))

## Create New Compute Cluster

AML Compute is a service for provisioning and managing clusters of Azure virtual machines for running machine learning workloads. Run the cell below to reuse the compute cluster if it already exists, or otherwise create a new one that scales from 0 up to 2 `Standard_DS12_v2` nodes, each with 4 cores. We will use this compute cluster to run the pipeline jobs.

In [None]:
cluster_name = "amlcompute-ad"

try:
    compute_target = ComputeTarget(workspace=ws, name=cluster_name)
    print('Found existing compute target.')
except ComputeTargetException:
    print('Creating a new compute target...')
    compute_config = AmlCompute.provisioning_configuration(vm_size='Standard_DS12_v2', min_nodes=0, max_nodes=2)
    # create the cluster
    compute_target = ComputeTarget.create(ws, cluster_name, compute_config)
    compute_target.wait_for_completion(show_output=True)

# Use the 'status' property to get a detailed status for the current AmlCompute. 
print(compute_target.status.serialize())

## Create Pipeline Steps

A pipeline step is a unit of execution. A step typically needs a target of execution (a compute target) and a script to execute; it may also require script arguments and inputs, can produce outputs, and accepts a number of other parameters. In this lab we will use the following built-in steps:

- [PythonScriptStep](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-steps/azureml.pipeline.steps.python_script_step.pythonscriptstep?view=azure-ml-py): A step to run a Python script in a Pipeline. In this lab we will use this type for preprocessing the input data.
- [ParallelRunStep](https://docs.microsoft.com/en-us/python/api/azureml-contrib-pipeline-steps/azureml.contrib.pipeline.steps.parallelrunstep?view=azure-ml-py): A step to process large amounts of data asynchronously and in parallel. In this lab we will use this type for batch inferencing on a large input dataset.

### Generate the pipeline step scripts

**Run the cell below to create the folder, `scripts`, where we will save the script files**

In [None]:
source_folder = './scripts'
preprocess_filename = 'preprocess.py'
inference_filename = 'inference.py'
preprocess_file_full_path = os.path.join(source_folder, preprocess_filename)
inference_file_full_path = os.path.join(source_folder, inference_filename)
os.makedirs(source_folder, exist_ok=True)

**Run the cell below to generate the script for preprocessing the sample inference data**

Review the script to see how we take the registered inference dataset and append `n` copies of it (20 by default) to simulate a large dataset that can then be fed to the inference step for batch scoring.

In [None]:
%%writefile $preprocess_file_full_path
import argparse
import os
import pandas as pd
import numpy as np

import azureml.core
from azureml.core import Workspace, Experiment, Run
from azureml.core import Dataset

print("In preprocess.py")

parser = argparse.ArgumentParser("preprocess")

parser.add_argument("--dataset_name", type=str, help="dataset name", dest="dataset_name", required=True)
parser.add_argument("--n", type=int, help="expansion factor", dest="n", required=False, default=20)
parser.add_argument("--output", type=str, help="output directory for processed data", dest="output", required=True)

args = parser.parse_args()

print("Argument 1: %s" % args.dataset_name)
print("Argument 2: %s" % args.n)
print("Argument 3: %s" % args.output)

run = Run.get_context()
ws = run.experiment.workspace

input_dataset = ws.datasets[args.dataset_name]
print('Dataset name {} and version {}'.format(args.dataset_name, input_dataset.version))
data = input_dataset.to_pandas_dataframe()
print('Inference data loaded!')

columns = ['vendorID', 'passengerCount', 'tripDistance', 'hour_of_day', 'day_of_week', 
            'day_of_month', 'month_num', 'snowDepth', 'precipTime', 'precipDepth', 
            'temperature', 'normalizeHolidayName', 'isPaidTimeOff']

data = data[columns]

df = data.copy()
print("Size of the initial dataframe: {}".format(len(df)))

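# Append n copies of the original data (n + 1 copies in total).
# pd.concat is used because DataFrame.append was removed in pandas 2.0.
df = pd.concat([data] * (args.n + 1), ignore_index=True)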

print("Size of the final dataframe: {}".format(len(df)))

import random
import string
N = 8
print('Adding customer_id')
df.loc[:, 'customer_id'] = [''.join(random.choices(string.ascii_uppercase + string.digits, k=N)) 
                            for i in range(len(df))]

from random import randrange
print('Adding partition col')
df.loc[:, 'partition_id'] = [randrange(10) for i in range(len(df))]

print("Saving processed file...")
if not (args.output is None):
    os.makedirs(args.output, exist_ok=True)
    df.to_parquet(args.output, partition_cols=['partition_id'])
    print("Saving processed file done!")
else:
    print("Output directory not found. File not saved!")

print("Done!")
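The expansion logic in `preprocess.py` can be sketched without pandas or Azure ML. This is a simplified, standard-library-only illustration that assumes rows are plain dicts; the function name `expand_rows` and the toy rows are hypothetical. Like the script, it repeats the data `n + 1` times, tags each row with a random 8-character `customer_id`, and assigns a random `partition_id` from 0 to 9, the column the script uses to partition its Parquet output.

```python
import random
import string
from collections import Counter

def expand_rows(rows, n=20, id_length=8, num_partitions=10, seed=None):
    """Repeat `rows` n + 1 times and add customer_id and partition_id fields."""
    rng = random.Random(seed)  # seeded generator for reproducible output
    alphabet = string.ascii_uppercase + string.digits
    expanded = []
    for _ in range(n + 1):  # the original copy plus n additional copies
        for row in rows:
            new_row = dict(row)
            new_row["customer_id"] = "".join(rng.choices(alphabet, k=id_length))
            new_row["partition_id"] = rng.randrange(num_partitions)
            expanded.append(new_row)
    return expanded

toy_rows = [{"tripDistance": 1.2}, {"tripDistance": 3.4}, {"tripDistance": 0.8}]
result = expand_rows(toy_rows, n=4, seed=0)
print(len(result))  # 15
print(sorted(Counter(r["partition_id"] for r in result)))  # partition ids that occur
```

Because `partition_id` is drawn uniformly, each of the 10 partitions ends up with roughly a tenth of the rows when the data is large.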

**Run the cell below to generate the script for batch inferencing**

Just like a real-time inferencing service, batch inferencing requires a scoring script that loads the model and uses it to predict new values. The script must include two functions:

- `init()`: Called once at the beginning of each worker process.
- `run(mini_batch)`: Called for each mini-batch of data to be processed.

Typically, you use the init function to load the model from the model registry, and use the run function to generate predictions from each batch of data and return the results.
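To make the `init()`/`run()` contract concrete, the sketch below imitates, in a heavily simplified way, what a worker process does with an entry script: call `init()` once, then call `run()` for each mini-batch and append every returned row to a single output, roughly what `output_action='append_row'` does with the output file. This is not the Azure ML implementation; the `FakeModel` class, the `drive_entry_script` function, and the use of lists of dicts instead of pandas DataFrames are assumptions for illustration.

```python
class FakeModel:
    """Hypothetical stand-in for the trained regressor: fare = 2.5 + 1.75 * tripDistance."""
    def predict(self, rows):
        return [2.5 + 1.75 * r["tripDistance"] for r in rows]

model = None

def init():
    # Runs once per worker process: load expensive resources such as the model.
    global model
    model = FakeModel()

def run(mini_batch):
    # Runs once per mini-batch: score the rows and return one result per input row.
    fares = model.predict(mini_batch)
    return [{"customer_id": r["customer_id"], "taxi_fare": f}
            for r, f in zip(mini_batch, fares)]

def drive_entry_script(mini_batches):
    """Hypothetical driver: init() once, run() per mini-batch, append all rows."""
    init()
    output = []
    for batch in mini_batches:
        output.extend(run(batch))  # analogous to output_action='append_row'
    return output

batches = [
    [{"customer_id": "A1", "tripDistance": 2.0}, {"customer_id": "B2", "tripDistance": 0.0}],
    [{"customer_id": "C3", "tripDistance": 4.0}],
]
for row in drive_entry_script(batches):
    print(row)
```

Keeping the model load in `init()` means it happens once per process rather than once per mini-batch, which matters when a process handles several mini-batches.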

In [None]:
%%writefile $inference_file_full_path
import argparse
import os
import pandas as pd
import numpy as np
import joblib
from sklearn.ensemble import GradientBoostingRegressor
import azureml.core
from azureml.core.model import Model
from azureml_user.parallel_run import EntryScript

columns = ['vendorID', 'passengerCount', 'tripDistance', 'hour_of_day', 'day_of_week', 
            'day_of_month', 'month_num', 'snowDepth', 'precipTime', 'precipDepth', 
            'temperature', 'normalizeHolidayName', 'isPaidTimeOff']

def init():
    global nyc_taxi_model

    logger = EntryScript().logger
    logger.info("init() is called.")
    print("SDK version:", azureml.core.VERSION)
    
    parser = argparse.ArgumentParser("inference")
    parser.add_argument("--model_name", dest="model_name", required=True, type=str, help="model name")
    parser.add_argument("--model_version", dest="model_version", required=True, type=int, help="model version")
    args, unknown_args = parser.parse_known_args()

    model_path = Model.get_model_path(model_name=args.model_name, version=args.model_version)
    logger.info("model path: " + model_path)
    nyc_taxi_model = joblib.load(model_path)
    logger.info("Model loaded.")

def run(input_data):
    logger = EntryScript().logger
    logger.info("run() is called with number of rows: {}.".format(len(input_data)))
    print("Number of rows: {}.".format(len(input_data)))
    
    inference_result = nyc_taxi_model.predict(input_data[columns]) # return type is ndarray
    
    results = pd.DataFrame()
    results['customer_id'] = input_data['customer_id']
    results['taxi_fare'] = inference_result

    # run() must return a pandas DataFrame or a list; with append_row, each row is appended to the output file
    return results

### Create the RunConfiguration for Pipeline Steps

A `RunConfiguration` specifies requirements for the pipeline steps, such as conda dependencies and the Docker image. Run the cell below to create and register a custom inferencing environment and use it in the `RunConfiguration`.

In [None]:
inference_env = Environment.get(workspace=ws, name='AzureML-Minimal').clone("Custom-Batch-Inference-Env")
cd = inference_env.python.conda_dependencies
cd.add_pip_package("inference-schema")
cd.add_pip_package("azureml-dataset-runtime[fuse,pandas]")
cd.add_pip_package("numpy")
cd.add_pip_package("pandas")
cd.add_pip_package("joblib")
cd.add_pip_package("scikit-learn==0.24.1")
cd.add_pip_package("sklearn-pandas==2.2.0")
inference_env.register(workspace=ws)
print('Registered inferencing env.')

# Create a new runconfig object
run_amlcompute = RunConfiguration()
run_amlcompute.environment = inference_env
print('Created new run configuration.')

### Create a Preprocess step of type PythonScriptStep

A `PythonScriptStep` is a basic, built-in step that runs a Python script on a compute target. It takes a script name and, optionally, other parameters such as script arguments, the compute target, inputs and outputs, and a run configuration. In this case, we wrap a `PipelineData` object in a `PipelineOutputFileDataset` to store the output of the preprocess step.

In [None]:
datastore = ws.get_default_datastore()
processed_batch_data = PipelineOutputFileDataset(PipelineData('processed_batch_data', datastore=datastore))
print("PipelineData object created")

preprocessStep = PythonScriptStep(
    name="process_data",
    script_name=preprocess_filename, 
    arguments=["--dataset_name", dataset_name,
               "--output", processed_batch_data],
    outputs=[processed_batch_data],
    allow_reuse = False,
    compute_target=compute_target,
    runconfig=run_amlcompute,
    source_directory=source_folder
)
print("preprocessStep created")
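Each entry in the step's `arguments` list is passed to `preprocess.py` on its command line, and at run time the `processed_batch_data` placeholder is replaced with a path on the compute node. The sketch below replays that parsing locally with the same `argparse` definitions as the script (using `type=int` for `--n`, since the value is used for arithmetic). The dataset name comes from this lab; the output path is a made-up placeholder standing in for the resolved pipeline data path.

```python
import argparse

def build_parser():
    # Same arguments as preprocess.py; --n is an int because it controls the copy count.
    parser = argparse.ArgumentParser("preprocess")
    parser.add_argument("--dataset_name", type=str, dest="dataset_name", required=True)
    parser.add_argument("--n", type=int, dest="n", required=False, default=20)
    parser.add_argument("--output", type=str, dest="output", required=True)
    return parser

# Hypothetical command line as the step might receive it; the output path is a placeholder.
argv = ["--dataset_name", "nyc-taxi-inference-dataset",
        "--output", "/mnt/placeholder/processed_batch_data"]
args = build_parser().parse_args(argv)
print(args.dataset_name, args.n, args.output)
```

Since the step does not pass `--n`, the script falls back to the default of 20; adding `"--n", 5` to `arguments` would change the expansion factor.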

### Create an inference step of type ParallelRunStep

The `ParallelRunConfig` configures how the input data is split into mini-batches and how the scoring script runs in parallel on the available compute cores. The compute cluster provisioned for this lab has a total of 8 cores across two nodes. In this lab we use both nodes of the cluster, but we limit each node to 2 worker processes, that is, 2 of its 4 cores. This restriction is for demonstration purposes only: it lets us see the scoring job spread across both nodes of the compute cluster. The `output_action` setting for the step is `"append_row"`, which makes all instances of the step running in parallel collate their results into a single output file named `nyc-taxi-fares.txt`.

The `ParallelRunStep` accepts two script arguments: the model name and the model version. It also takes the output of the preprocess step as its input, parsed from Parquet files into a tabular dataset, and writes its own output to another pipeline data object.

In [None]:
datastore = ws.get_default_datastore()
inference_output = PipelineData('inference_output', datastore=datastore)
print("PipelineData object created")

parallel_run_config = ParallelRunConfig(
    source_directory=source_folder,
    entry_script=inference_filename, 
    mini_batch_size='10MB', #approximate size of mini batch data
    error_threshold=10, #acceptable number of record failures
    output_action='append_row',
    append_row_file_name="nyc-taxi-fares.txt",
    environment=inference_env,
    compute_target=compute_target, 
    #Number of nodes to use from the compute target
    node_count=2, 
    #Typically you will not set process_count_per_node and it will default to number of cores on node
    #Here we limit to use only 2 of the 4 cores available on each node
    process_count_per_node = 2,
    run_invocation_timeout=240, #Timeout in seconds for each invocation of the run() method
    run_max_try = 3 #The number of maximum tries for a failed or timeout mini batch
)

inferenceStep = ParallelRunStep(
    name="inference", 
    #Convert PipelineOutputFileDataset to PipelineOutputTabularDataset
    inputs=[processed_batch_data.parse_parquet_files()],
    output=inference_output,
    parallel_run_config=parallel_run_config,
    arguments=['--model_name', model_name, '--model_version', model_version],
    allow_reuse=False
)

print("inferenceStep created")
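The effect of `node_count` and `process_count_per_node` can be explored with a toy schedule. With 2 nodes and 2 processes per node there are 4 worker processes, using 4 of the cluster's 8 cores. The simulation below assumes, as a simplification, that an idle process takes the next mini-batch from a shared queue and that every mini-batch takes the same time; real scheduling also depends on mini-batch sizes, retries (`run_max_try`), and timing, so the per-process counts are illustrative only. The function `simulate_schedule` and the worker names are hypothetical.

```python
import heapq
from collections import Counter

def simulate_schedule(num_batches, node_count=2, process_count_per_node=2, batch_time=1.0):
    """Assign mini-batches to worker processes that pick up work when they become idle."""
    workers = [f"node{n}/process{p}"
               for n in range(node_count) for p in range(process_count_per_node)]
    # Min-heap of (time the worker becomes free, worker name).
    free_at = [(0.0, w) for w in workers]
    heapq.heapify(free_at)
    assignment = Counter({w: 0 for w in workers})
    for _ in range(num_batches):
        t, worker = heapq.heappop(free_at)  # the earliest idle worker takes the next batch
        assignment[worker] += 1
        heapq.heappush(free_at, (t + batch_time, worker))
    return assignment

counts = simulate_schedule(num_batches=10)
print(len(counts), "worker processes")
for worker, n in sorted(counts.items()):
    print(worker, n)
```

With 10 equal mini-batches and 4 processes, two processes handle 3 mini-batches and two handle 2, since 10 does not divide evenly by 4.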

## Create the Batch Scoring Pipeline

Because `inferenceStep` consumes the output of `preprocessStep`, there is a data dependency between the two steps, so we only need to specify the final step in the pipeline. The data dependency ensures that `preprocessStep` runs before `inferenceStep`. Run the following cell to create and validate the pipeline.

In [None]:
inference_pipeline = Pipeline(workspace=ws, steps=[inferenceStep])
print("Inference Pipeline is built")

inference_pipeline.validate()
print("Simple validation complete")
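Conceptually, the execution order follows from which step produces each step's inputs. The idea can be sketched with the standard library's `graphlib.TopologicalSorter` (Python 3.9+). The `steps` dictionary below is a hypothetical description of this lab's two steps, not an Azure ML data structure, and the sketch illustrates why listing only `inferenceStep` is enough: the producer of its input is pulled in automatically.

```python
from graphlib import TopologicalSorter

# Hypothetical description of the two steps: which data each step reads and writes.
steps = {
    "preprocessStep": {"inputs": [], "outputs": ["processed_batch_data"]},
    "inferenceStep": {"inputs": ["processed_batch_data"], "outputs": ["inference_output"]},
}

def execution_order(requested, steps):
    """Collect every step needed by `requested` and return them in dependency order."""
    producer = {out: name for name, s in steps.items() for out in s["outputs"]}
    graph, pending = {}, list(requested)
    while pending:
        name = pending.pop()
        if name in graph:
            continue
        # A step depends on whichever steps produce its inputs.
        deps = {producer[i] for i in steps[name]["inputs"] if i in producer}
        graph[name] = deps
        pending.extend(deps)
    return list(TopologicalSorter(graph).static_order())

print(execution_order(["inferenceStep"], steps))  # ['preprocessStep', 'inferenceStep']
```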

## Submit the pipeline run

The code pattern to submit a run to Azure Machine Learning compute targets is always:

- Create an experiment to run.
- Submit the experiment.
- Wait for the run to complete.

In [None]:
experiment_name = 'lab03-exp'
pipeline_run = Experiment(ws, experiment_name).submit(inference_pipeline)
print("Pipeline is submitted for execution")

### Monitor the Run

Using the azureml Jupyter widget, you can monitor the pipeline run. Run the cell below to monitor the experiment run, and wait until the run status is **Completed** before proceeding beyond the next cell.

In [None]:
RunDetails(pipeline_run).show()

## Download and review output

Run the cell below to download and review the output of the batch scoring pipeline. The results are downloaded into a local folder named `azureml`. The cell prints every 25,000th line, where each line contains a customer ID and the predicted taxi fare, and finally prints the total number of rows (customer fares) that were scored.

In [None]:
data = pipeline_run.find_step_run('inference')[0].get_output_data('inference_output')
data.download('.', show_progress=True)
data_path = os.path.join('./', data.path_on_datastore, 'nyc-taxi-fares.txt')
cnt = 0
with open(data_path) as fp:
    # enumerate from 1 so that cnt equals the number of lines read so far
    for cnt, line in enumerate(fp, start=1):
        if cnt % 25000 == 0:
            print("Line {}: {}".format(cnt, line.strip()))
print("Total number of rows processed: {}".format(cnt))
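Beyond counting lines, you may want summary statistics for the predicted fares. The sketch below assumes that each line of `nyc-taxi-fares.txt` holds a customer ID followed by a fare, separated by whitespace; check a few of the printed lines to confirm the actual separator before relying on it. The function `summarize_fares` and the sample lines are hypothetical.

```python
import statistics

def summarize_fares(lines):
    """Parse 'customer_id fare' lines and return the count, mean and max fare."""
    fares = []
    for line in lines:
        parts = line.split()
        if len(parts) != 2:
            continue  # skip blank or unexpected lines
        fares.append(float(parts[1]))
    if not fares:
        return {"count": 0, "mean": None, "max": None}
    return {"count": len(fares), "mean": statistics.mean(fares), "max": max(fares)}

sample = ["AB12CD34 12.5\n", "ZX98QW76 7.5\n", "\n"]
print(summarize_fares(sample))
```

Because the function accepts any iterable of lines, an open file object works too, for example `with open(data_path) as fp: print(summarize_fares(fp))`.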

## Monitor Batch Scoring Experiment Run

You can monitor your batch inferencing pipeline runs from within Azure Machine Learning Studio. From the pipeline run details, you can navigate to the details of each step run and review its log files. The steps below show how to access the **EntryScript** logs that we generated within our inference script’s **init** and **run** methods.

Run the cell below and then **right-click** on **Link to Azure Machine Learning studio** link below to open the `AML Experiment Run Details` page in a **new browser tab**.

In [None]:
pipeline_run

- From the **Pipeline Run Details** page select **Steps**

![Pipeline Run Details](./images/monitor-logs01.png 'Pipeline Run Details')

- From the **Steps** tab, select the step: **inference**
    
![Pipeline Run Details - Pipeline Steps](./images/monitor-logs02.png 'Pipeline Steps')

- In the **Outputs + logs** tab, navigate to **logs > user > entry_script_logs**, open the first folder, and then open the first log file.
    
![Inference Step Run Details - Outputs + logs](./images/monitor-logs03.png 'Inference Step Outputs + Logs')

Recall that our compute cluster has a maximum of 2 nodes, and each node has 4 cores. However, in **parallel_run_config** we restricted the step to **2 processes per node**. As a result, there are two log files per node (one for each process), organized by folder. In this entry script log you can observe that `run()` was called twice by this particular process, and that each mini-batch contained approximately **25k** rows. For some of the other processes, `run()` was called three times. Overall, `run()` was called 10 times across all 4 processes, processing approximately **250k** rows of data.
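These figures can be cross-checked with simple arithmetic. Assuming the default expansion factor (`n = 20`) and the 10 `run()` calls observed in the logs, each call scored roughly a tenth of the expanded data, and the 4 worker processes averaged between two and three calls each.

```python
total_rows = 11_734 * (20 + 1)        # original rows plus 20 appended copies
run_calls = 10                        # run() invocations observed in the logs
processes = 2 * 2                     # node_count * process_count_per_node
print(total_rows)                     # 246414
print(round(total_rows / run_calls))  # 24641
print(run_calls / processes)          # 2.5
```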