# TABLE OF CONTENTS:
---
* [Setup](#Setup)
    * [Connect to Workspace](#Connect-to-Workspace)
* [Data](#Data)
    * [Create and Register Input Dataset](#Create-and-Register-Input-Dataset)
    * [Create Output PipelineData](#Create-Output-PipelineData)
* [Compute Target](#Compute-Target)
* [Pipeline Run Environment](#Pipeline-Run-Environment)
* [Pipeline Artifacts & Configuration](#Pipeline-Artifacts-&-Configuration)
    * [Scoring Script](#Scoring-Script)
    * [Parallel Run Configuration](#Parallel-Run-Configuration)
    * [Parallel Run Step](#Parallel-Run-Step)
* [Pipeline Run](#Pipeline-Run)
    * [Submit Experiment](#Submit-Experiment)
    * [Download & Inspect Pipeline Output](#Download-&-Inspect-Pipeline-Output)
* [Publish the Pipeline](#Publish-the-Pipeline)
    * [Configure Service Principal Secret](#Configure-Service-Principal-Secret)
    * [Retrieve Authentication Header](#Retrieve-Authentication-Header)
* [Trigger a Published Pipeline Run](#Trigger-a-Published-Pipeline-Run)
* [Resource Clean Up](#Resource-Clean-Up)
---

This notebook builds, publishes, and triggers a batch scoring pipeline that uses the PyTorch model trained in the **02_model_training** notebook to run inference on mini-batches of images in parallel on a compute cluster. In general, machine learning pipelines help optimize a workflow in terms of speed, portability, and reuse. Once the pipeline is built and published, its REST endpoint can be used to trigger it from any HTTP library on any platform.

# Setup

In [None]:
# Import libraries
import azureml.core
import os
import pandas as pd
import requests
import tempfile
from azureml.core import Environment, Experiment, Workspace
from azureml.core.authentication import ServicePrincipalAuthentication
from azureml.core.compute import AmlCompute, ComputeTarget
from azureml.core.compute_target import ComputeTargetException
from azureml.core.dataset import Dataset
from azureml.pipeline.core import Pipeline, PipelineData
from azureml.pipeline.core.run import PipelineRun
from azureml.pipeline.steps import ParallelRunConfig, ParallelRunStep
from datetime import datetime
from dotenv import load_dotenv
from pathlib import Path

In [None]:
# Check core SDK version number
print("SDK version:", azureml.core.VERSION)

### Connect to Workspace

In order to connect and communicate with the Azure Machine Learning (AML) workspace, a workspace object needs to be instantiated using the Azure ML SDK.

In [None]:
# Connect to the AML workspace.
# For alternative connection options (e.g. for automated workloads) see the aml_snippets directory.
ws = Workspace.from_config()

# Data

### Create and Register Input Dataset

Create a Dataset object which will be used to read data from the workspace default datastore. 

**Note**: To transfer intermediate data between pipeline steps, a PipelineData object should be used instead.
The pipeline here consists of a single step only, but a typical flow with multiple steps will:
* Use Dataset objects as inputs to fetch raw data, perform some transformations, then output a PipelineData object.
* Use the previous step's PipelineData output object as an input object, and repeat this for subsequent steps.

In [None]:
# Create a dataset using the default datastore
# Here the validation set will be used for batch scoring (to exemplify the process;
# in reality a folder with new data that you want to score should be used instead -> data/batchscoring)
datastore = ws.get_default_datastore()
input_images = Dataset.File.from_files((datastore, "data/fowl_data/val"))

In [None]:
# Register the dataset to the AML workspace
input_images = input_images.register(workspace=ws, name="fowl-dataset-batch-scoring")

### Create Output PipelineData

Create a PipelineData object which will be used as a pointer to the datastore and will determine where to output the batch scoring results.

In [None]:
output_dir = PipelineData(name="batch_scoring_results", datastore=datastore)

# Compute Target

Create a remote compute target to run experiments on. The code below first checks whether a compute target named `cluster_name` already exists; if it does, that target is reused instead of creating a new one.

AML pipelines cannot be run locally.

In [None]:
# Choose a name for the CPU cluster
cluster_name = "cpu-cluster"

# Reuse the cluster if it already exists; otherwise create a new one
try:
    compute_target = ComputeTarget(workspace=ws, name=cluster_name)
    print("Found existing cluster, use it.")
except ComputeTargetException:
    compute_config = AmlCompute.provisioning_configuration(vm_size="STANDARD_D2_V2", # CPU
                                                           # vm_size='STANDARD_NC6', # GPU
                                                           max_nodes=4,
                                                           idle_seconds_before_scaledown=2400)
    
    compute_target = ComputeTarget.create(ws, cluster_name, compute_config)

compute_target.wait_for_completion(show_output=True)

# Use get_status() to get a detailed status for the current cluster
print(compute_target.get_status().serialize())

# Pipeline Run Environment

Retrieve a registered environment from the AML workspace to run the pipeline in.

In [None]:
env_name = "pytorch-aml-env"
env = Environment.get(workspace=ws, name=env_name)

# Pipeline Artifacts & Configuration

### Scoring Script

In order for the pipeline to use the model, a batch scoring script is created (which is different from the live inference scoring script). 

This script takes a minibatch of input images, applies the classification model, and outputs the predictions to a results file.
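
The contract between the runtime and the scoring script can be illustrated with a minimal local sketch: `init()` is called once per worker process to load expensive resources, and `run(mini_batch)` is then called for each mini-batch and returns one result per processed file. The stub model and the file paths below are made up for illustration; this is not the actual ParallelRunStep runtime.

```python
import os
from typing import Callable, List, Optional

# Module-level state shared between init() and run(), as in the scoring script
_model: Optional[Callable[[str], str]] = None


def init() -> None:
    """Called once per worker process: load expensive resources here."""
    global _model
    # Hypothetical stub "model" that assigns the same label to every file
    _model = lambda file_path: "chicken"


def run(mini_batch: List[str]) -> List[str]:
    """Called once per mini-batch: return one result line per processed file."""
    return [f"{os.path.basename(path)}, {_model(path)}" for path in mini_batch]


# Local smoke test with made-up file paths
init()
print(run(["data/val/img_001.jpg", "data/val/img_002.jpg"]))
```

Running a stub like this locally can help catch errors in the result format before the script is submitted to the cluster.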

The script batch_score.py takes the following parameters, which get passed from the ParallelRunStep that is created later in the notebook:

* --model_name: the name of the model being used

The pipelines infrastructure uses the ArgumentParser class to pass parameters into pipeline steps. For example, in the code below the first argument --model_name is given the property identifier model_name. In the init() function, this property is accessed using Model.get_model_path(args.model_name).

The pipeline here only has one step and writes the output to a file, but for multi-step pipelines, ArgumentParser is also used to define a directory to write output data for input to subsequent steps. An example for such a pattern can be found [here](https://github.com/Azure/MachineLearningNotebooks/blob/master/how-to-use-azureml/machine-learning-pipelines/nyc-taxi-data-regression-model-building/nyc-taxi-data-regression-model-building.ipynb).
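
The scoring script parses its arguments with `parse_known_args()` rather than `parse_args()`, presumably so that any additional arguments supplied by the runtime do not cause a parsing error. A small sketch of that behavior, where `--some_runtime_flag` is a made-up argument used only for illustration:

```python
import argparse

parser = argparse.ArgumentParser(description="Parse scoring script arguments")
parser.add_argument("--model_name", dest="model_name", required=True)

# Simulated command line: the expected argument plus an extra, made-up flag
argv = ["--model_name", "fowl-model", "--some_runtime_flag", "value"]

# Unrecognized arguments are returned separately instead of raising an error
args, unknown = parser.parse_known_args(argv)
print(args.model_name)  # fowl-model
print(unknown)          # ['--some_runtime_flag', 'value']
```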

In [None]:
%%writefile ../src/batch_pipeline_deployment/batch_score.py
# Import libraries
import argparse
import json
import numpy as np
import os
import torch
import torch.nn as nn
from azureml.core.model import Model
from PIL import Image
from torchvision import transforms


def init():
    
    global model
    
    parser = argparse.ArgumentParser(description="Start the PyTorch model serving")
    parser.add_argument("--model_name", dest="model_name", required=True)
    args, _ = parser.parse_known_args()
    
    model_path = Model.get_model_path(args.model_name)
    model = torch.load(model_path, map_location=lambda storage, loc: storage)
    model.eval()
    
    
def run(mini_batch):
    
    result_list = []
    for file_path in mini_batch:
        image = preprocess_image(file_path)
        
        # get prediction
        with torch.no_grad():
            output = model(image)
            classes = ["chicken", "turkey"]
            softmax = nn.Softmax(dim=1)
            pred_probs = softmax(output).numpy()[0]
            # Convert the one-element tensor to a plain int so that indexing returns scalars
            index = torch.argmax(output, 1).item()
            result = os.path.basename(file_path) + ", " + classes[index] + ", " + str(pred_probs[index])
            result_list.append(result)
    
    return result_list


def preprocess_image(image_file):
    """
    Preprocess an input image.
    :param image_file: Path to the input image
    :return image: preprocessed image as torch tensor
    """
    
    data_transforms = transforms.Compose([
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ])

    # Convert to RGB so that grayscale or RGBA images match the 3-channel normalization
    image = Image.open(image_file).convert("RGB")
    image = data_transforms(image)
    image = image[np.newaxis, ...]
    
    return image

### Parallel Run Configuration

Create a parallel run configuration to wrap the inference script. 
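
For a file dataset, `mini_batch_size` is the number of files passed to a single `run()` call. The sketch below shows how a list of files would break down into mini-batches of 10. The file names are made up, and the real service decides how mini-batches are distributed across nodes and processes, so this only illustrates the arithmetic.

```python
from typing import List


def split_into_mini_batches(files: List[str], mini_batch_size: int) -> List[List[str]]:
    """Split file paths into consecutive chunks of at most mini_batch_size files."""
    if mini_batch_size < 1:
        raise ValueError("mini_batch_size must be a positive integer")
    return [files[i:i + mini_batch_size] for i in range(0, len(files), mini_batch_size)]


# 25 hypothetical image files with 10 files per mini-batch
files = [f"image_{i:02d}.jpg" for i in range(25)]
batches = split_into_mini_batches(files, int("10"))
print([len(batch) for batch in batches])  # [10, 10, 5]
```

With this setting, each `run()` call in the scoring script receives at most 10 file paths, and the last mini-batch holds the remainder.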

In [None]:
parallel_run_config = ParallelRunConfig(
    environment=env,
    entry_script="batch_score.py",
    source_directory="../src/batch_pipeline_deployment",
    output_action="append_row",
    append_row_file_name="parallel_run_step.txt",
    mini_batch_size="10",
    error_threshold=1,
    compute_target=compute_target,
    process_count_per_node=2,
    node_count=1
)

### Parallel Run Step

Create the pipeline step. 

A pipeline step is an object that encapsulates everything that is needed to run one stage of a pipeline, including:
* environment and dependency settings
* the compute target to run the step on
* input and output data, and any custom parameters
* a reference to a script or SDK logic to run during the step

There are multiple classes that inherit from the parent class PipelineStep to assist with building a step using certain frameworks and stacks. Here, a ParallelRunStep class is used to define the step logic using a scoring script.

An object reference in the outputs array becomes available as an input for a subsequent pipeline step, for scenarios where there is more than one step.

In [None]:
parallel_step_name = "batchscoring-" + datetime.now().strftime("%Y%m%d%H%M")

batch_score_step = ParallelRunStep(
    name=parallel_step_name,
    inputs=[input_images.as_named_input("input_images")],
    output=output_dir,
    arguments=["--model_name", "fowl-model"],
    parallel_run_config=parallel_run_config,
    allow_reuse=False
)

For a list of all classes for different step types, see the [steps package](https://docs.microsoft.com/en-gb/python/api/azureml-pipeline-steps/azureml.pipeline.steps?view=azure-ml-py).

# Pipeline Run

### Submit Experiment

Create a Pipeline object with all the configured pipeline steps as well as an experiment object to submit the pipeline for execution.

Note: The first pipeline run takes roughly 15 minutes, as all dependencies must be downloaded, a Docker image is created, and the Python environment is provisioned/created. Running it again takes significantly less time as those resources are reused. However, total run time depends on the workload of your scripts and processes running in each pipeline step.

In [None]:
pipeline = Pipeline(workspace=ws, steps=[batch_score_step])
pipeline_run = Experiment(ws, "pytorch-fowl-batch-scoring").submit(pipeline)

In [None]:
# Wait for completion of the run and show output log
pipeline_run.wait_for_completion(show_output=True)

### Download & Inspect Pipeline Output
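
Each line of the results file has the form `filename, class, probability`, as returned by `run()` in the scoring script. Because the fields are joined with a comma followed by a space, a plain comma split leaves a leading space in the later fields. A minimal sketch of parsing such lines with the standard library, using made-up sample values:

```python
import csv
import io

# Hypothetical sample of the results file content (values are made up)
sample = "img_001.jpg, chicken, 0.98\nimg_002.jpg, turkey, 0.87\n"

# skipinitialspace=True drops the space that follows each comma
reader = csv.reader(io.StringIO(sample), skipinitialspace=True)
rows = [
    {"Filename": name, "Prediction": label, "Probability": float(prob)}
    for name, label, prob in reader
]
print(rows[0]["Prediction"])  # chicken
```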

In [None]:
batch_run = pipeline_run.find_step_run(batch_score_step.name)[0]
batch_output = batch_run.get_output_data(output_dir.name)

target_dir = tempfile.mkdtemp()
batch_output.download(local_path=target_dir)
result_file = os.path.join(target_dir, batch_output.path_on_datastore, parallel_run_config.append_row_file_name)

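# skipinitialspace drops the space that follows each comma in the results file
df = pd.read_csv(result_file, delimiter=",", header=None, skipinitialspace=True)
df.columns = ["Filename", "Prediction", "Probability"]
print("Prediction has", df.shape[0], "rows")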

# Show first 30 rows of output file
df.head(30)

In [None]:
# Check path on datastore
batch_output.path_on_datastore

# Publish the Pipeline

Publish the pipeline to create a REST endpoint that allows the pipeline to be rerun from any HTTP library on any platform. The published pipeline can also be run from the AML workspace, where metadata such as run history and duration are tracked as well.

In [None]:
published_pipeline = pipeline_run.publish_pipeline(
    name="fowl-pytorch-scoring",
    description="Batch scoring using fowl pytorch model",
    version="1.0")

published_pipeline

To run the pipeline using the REST endpoint, an OAuth2 Bearer-type authentication header is needed. For an automated workflow in a production scenario, a service principal (App Registration) should be created to retrieve this authentication header. For more information on how to create a service principal from the Azure Portal, see the [Azure documentation](https://docs.microsoft.com/en-us/azure/active-directory/develop/howto-create-service-principal-portal). Afterwards, a secret needs to be created for the service principal, and the service principal needs to be granted role access to the AML workspace. The service principal password is then retrieved from Azure Key Vault.

### Configure Service Principal Secret

The following cells only need to be run once. They will retrieve the service principal secret from the .env configuration file and write it to the workspace keyvault.
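
`os.environ.get()` returns `None` when a variable is missing, so a misconfigured .env file may only surface later as a confusing error when the secret is written. One possible safeguard is a small helper that fails early with a clear message. The helper `require_env` and the variable `EXAMPLE_SECRET` are hypothetical names used for illustration only.

```python
import os


def require_env(name: str) -> str:
    """Return the value of an environment variable or fail with a clear message."""
    value = os.environ.get(name)
    if not value:
        raise RuntimeError(f"Environment variable {name} is not set or empty")
    return value


# Demonstration with a dummy variable; in the notebook the value comes from the .env file
os.environ["EXAMPLE_SECRET"] = "dummy-value-for-illustration"
secret = require_env("EXAMPLE_SECRET")
print(len(secret) > 0)  # True
```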

In [None]:
# Load environment variables from .env file
env_path = Path("../config/") / ".env"
load_dotenv(dotenv_path=env_path)

In [None]:
# Create keyvault object
keyvault = ws.get_default_keyvault()

In [None]:
# Set keyvault secret
svc_pr_pw = os.environ.get("AML_SERVICE_PRINCIPAL_PASSWORD")
keyvault.set_secret(name="svc-pr-pw", value=svc_pr_pw)

### Retrieve Authentication Header

Use the service principal authentication to retrieve an OAuth2 Bearer-type authentication header.

In [None]:
svc_pr_pw = keyvault.get_secret(name="svc-pr-pw")

In [None]:
svc_pr = ServicePrincipalAuthentication(
    tenant_id="461e2020-109b-4c43-ad3f-eb9944f5dc44",
    service_principal_id="8a0b5ebf-55c7-4dfa-a49c-37b0acd9c3ce",
    service_principal_password=svc_pr_pw)

auth_header = svc_pr.get_authentication_header()

# Trigger a Published Pipeline Run

Get the REST URL from the endpoint property of the published pipeline object. The REST URL can also be found in the AML workspace in the portal.

Build an HTTP POST request to the endpoint, specifying the authentication header. Additionally, add a JSON payload object with the experiment name and the configuration parameter assignments. Here, the process_count_per_node is used as an example parameter, which is passed through to ParallelRunStep because it is defined in the step configuration.

Make the request to trigger the run.
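
To make the shape of the request explicit, the sketch below builds the same kind of POST request with the standard library, without sending it. When `requests.post` is called with `json=...`, it serializes the payload and sets the JSON content type in a similar way. The endpoint URL and token are placeholders, and `build_trigger_request` is a hypothetical helper.

```python
import json
import urllib.request


def build_trigger_request(endpoint: str, auth_header: dict,
                          experiment_name: str, parameters: dict) -> urllib.request.Request:
    """Build (but do not send) a POST request that triggers a published pipeline."""
    payload = {
        "ExperimentName": experiment_name,
        "ParameterAssignments": parameters,
    }
    headers = dict(auth_header)
    headers["Content-Type"] = "application/json"
    return urllib.request.Request(
        endpoint,
        data=json.dumps(payload).encode("utf-8"),
        headers=headers,
        method="POST",
    )


# Placeholder endpoint and token, for illustration only
request = build_trigger_request(
    "https://example.invalid/pipelines/placeholder-id",
    {"Authorization": "Bearer <token>"},
    "pytorch-fowl-batch-scoring-request",
    {"process_count_per_node": 6},
)
print(request.get_method())  # POST
print(json.loads(request.data)["ParameterAssignments"])
```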

In [None]:
rest_endpoint = published_pipeline.endpoint
response = requests.post(rest_endpoint, 
                         headers=auth_header, 
                         json={"ExperimentName": "pytorch-fowl-batch-scoring-request",
                               "ParameterAssignments": {"process_count_per_node": 6}})

Check the response for errors, then read the Id key of the JSON response body to get the run ID.

In [None]:
try:
    response.raise_for_status()
except requests.exceptions.HTTPError as err:
    # Re-raise with the response details attached and keep the original error as the cause
    raise Exception("Received bad response from the endpoint: {}\n"
                    "Response Code: {}\n"
                    "Headers: {}\n"
                    "Content: {}".format(rest_endpoint, response.status_code, response.headers, response.content)) from err

run_id = response.json().get("Id")
print("Submitted pipeline run: ", run_id)

If desired, use the run id to monitor the status of the new run.
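
As an alternative to blocking on the full output log, the run status can be polled at intervals. The sketch below is a generic polling loop under stated assumptions: the set of terminal states is assumed, `wait_until_finished` is a hypothetical helper, and the stubbed status sequence stands in for repeated calls to the run object's `get_status()` method.

```python
import time
from typing import Callable

# Assumed terminal states of a pipeline run
TERMINAL_STATES = {"Completed", "Failed", "Canceled"}


def wait_until_finished(get_status: Callable[[], str],
                        poll_seconds: float = 30.0,
                        timeout_seconds: float = 3600.0) -> str:
    """Poll get_status() until a terminal state is reached or the timeout expires."""
    deadline = time.monotonic() + timeout_seconds
    while True:
        status = get_status()
        if status in TERMINAL_STATES:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Run still in state {status!r} after {timeout_seconds} s")
        time.sleep(poll_seconds)


# Stubbed status sequence standing in for the status of a real run
statuses = iter(["Running", "Running", "Completed"])
print(wait_until_finished(lambda: next(statuses), poll_seconds=0.0))  # Completed
```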

In [None]:
published_pipeline_run = PipelineRun(ws.experiments["pytorch-fowl-batch-scoring-request"], run_id)

In [None]:
# Show detailed info about the run
published_pipeline_run.wait_for_completion(show_output=True)

# Resource Clean Up

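Delete the compute target once it is no longer needed. The call below is commented out to avoid accidental deletion; uncomment it to run.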

In [None]:
# compute_target.delete()