# Benchmarking with an example pipeline
This notebook creates and submits a simple pipeline based on the scripts in `./steps`. Each step in this example pipeline uses the `PipelineBenchmarker` class, and the final step shows how to consolidate and save the benchmarks for the whole pipeline.

In [None]:
%load_ext dotenv
%dotenv

import os
import azureml.core
from azureml.core import Workspace, Environment, Datastore, Dataset, Experiment
from azureml.core.authentication import InteractiveLoginAuthentication
from azureml.core.compute import ComputeTarget, AmlCompute
from azureml.exceptions import ComputeTargetException
from azureml.core.runconfig import RunConfiguration
from azureml.core.conda_dependencies import CondaDependencies
from azureml.pipeline.core import Pipeline, PipelineData, PipelineParameter
from azureml.pipeline.steps import PythonScriptStep
from azureml.data.data_reference import DataReference
from msrest.exceptions import HttpOperationError

## 0. Setting environment config
Before running this notebook, create a `.env` file in this directory, using `.env.example` as a reference.

## 1. Get configuration and secrets
Load the configuration and retrieve the secrets needed to access the Azure Machine Learning (AML) workspace.

In [None]:
# Blob storage account credentials, taken from the environment populated by %dotenv.
# Indexing os.environ (rather than .get) makes a missing variable fail fast with KeyError.
BLOB_STORAGE_NAME = os.environ["AML_STORAGE_NAME"]
BLOB_STORAGE_KEY = os.environ["AML_STORAGE_KEY"]

Log in using interactive login authentication.

In [None]:
# Tenant to sign in to, read from the environment populated by %dotenv.
TENANT_ID = os.environ["TENANT_ID"]
interactive_auth = InteractiveLoginAuthentication(tenant_id=TENANT_ID)

Restore the AML workspace from the `config.json` file, which can be downloaded from the Azure portal. You can also use the `config.json.example` file in this directory as a reference.

In [None]:
# Load the workspace described by config.json. Pass the interactive credentials
# created above so the login targets the tenant configured in `.env`.
ws = Workspace.from_config(auth=interactive_auth)
print('Workspace name: ' + ws.name,
      'Azure region: ' + ws.location,
      'Subscription id: ' + ws.subscription_id,
      'Resource group: ' + ws.resource_group, sep='\n')

Store the required secrets in the workspace's default Key Vault.

In [None]:
# Environment variables to copy into the workspace Key Vault as secrets.
required_secrets = [
    'AML_STORAGE_NAME',
    'AML_STORAGE_KEY',
]

keyvault = ws.get_default_keyvault()

if keyvault is None:
    # Nothing below can work without a Key Vault handle. Stop here with a clear
    # error instead of failing later with an AttributeError on `None.set_secret`.
    raise RuntimeError("Keyvault for workspace did not load correctly")

for key in required_secrets:
    value = os.getenv(key)
    # Replace _ with - to change variable name to acceptable format for Keyvault
    name = key.replace("_", "-")
    if value is None:
        # Do not push an empty/None secret for an unset variable.
        print(f"Environment variable \"{key}\" is not set; skipping secret \"{name}\"")
        continue
    try:
        print(f"Attempting to set env: \"{key}\" as secret: \"{name}\" in the workspace keyvault")
        keyvault.set_secret(name=name, value=value)
    except HttpOperationError as err:
        # Best effort: report the failure and continue with the remaining secrets.
        print(f"Issue setting secret in keyvault with name: {name}, Err: {str(err)}")

## 2. Set up the pipeline environment

### 2.1 Set the compute target
Create (or reuse) the compute target that will be used to execute the pipeline.

In [None]:
def get_compute(workspace: Workspace, compute_name: str, vm_size: str, vm_priority: str, min_nodes: int, max_nodes: int,
                scale_down: int):
    """Returns an existing compute or creates a new one.

    If a compute target named `compute_name` is already attached to the workspace, it is
    returned as-is and the provisioning parameters below are ignored. Otherwise a new
    AmlCompute cluster is provisioned, and this call blocks until provisioning completes.

    Parameters
    ----------
    workspace: Workspace
        AzureML workspace
    compute_name: str
        Name of the compute
    vm_size: str
        VM size
    vm_priority: str
        Low priority or dedicated cluster
    min_nodes: int
        Minimum number of nodes
    max_nodes: int
        Maximum number of nodes in the cluster
    scale_down: int
        Number of seconds to wait before scaling down the cluster

    Returns
    ----------
    compute_target : ComputeTarget
        A reference to compute

    Raises
    ----------
    SystemExit
        If looking up or provisioning the compute raises a ComputeTargetException.
    """
    # `sys` is not imported in the notebook's import cell. Import it here so the
    # error path below exits as intended instead of raising NameError.
    import sys

    try:
        if compute_name in workspace.compute_targets:
            compute_target = workspace.compute_targets[compute_name]
            if isinstance(compute_target, AmlCompute):
                print(f"Found existing compute target {compute_name} so using it.")
            else:
                # Still reused, but make the type mismatch visible to the reader.
                print(f"Warning: existing compute target {compute_name} is not an AmlCompute cluster; "
                      f"using it as-is.")
        else:
            compute_config = AmlCompute.provisioning_configuration(vm_size=vm_size,
                                                                   vm_priority=vm_priority,
                                                                   min_nodes=min_nodes,
                                                                   max_nodes=max_nodes,
                                                                   idle_seconds_before_scaledown=scale_down,
                                                                   )

            compute_target = ComputeTarget.create(workspace, compute_name, compute_config)
            compute_target.wait_for_completion(show_output=True)
        return compute_target
    except ComputeTargetException as ex_var:
        print(f'An error occurred trying to provision compute: {str(ex_var)}')
        sys.exit(-1)

In [None]:
# Set compute target
compute_target = get_compute(
    workspace=ws,
    compute_name="benchmark",
    vm_size='STANDARD_D2_V3',
    vm_priority='lowpriority', 
    min_nodes=0,
    max_nodes=4,
    scale_down=120,
)

In [None]:
# Display the compute target so the reader can confirm which cluster the pipeline will use
compute_target

### 2.2 Set up and register Datastore references

In [None]:
def get_blob_datastore(workspace: Workspace, data_store_name: str, storage_name: str, storage_key: str,
                       container_name: str):
    """Returns a reference to a datastore, registering it if it does not exist yet.

    Parameters
    ----------
    workspace : Workspace
        Existing AzureML Workspace object
    data_store_name : string
        Data store name
    storage_name : string
        Blob storage account name
    storage_key : string
        Blob storage account key
    container_name : string
        Container name

    Returns
    ----------
    blob_datastore : Datastore
        A reference to datastore
    """
    try:
        blob_datastore = Datastore.get(workspace, data_store_name)
        print(f"Found Blob Datastore with name: {data_store_name}")
    except HttpOperationError:
        # NOTE(review): assumes Datastore.get raises HttpOperationError for an unknown
        # datastore name. Confirm against the installed azureml-core version.
        blob_datastore = Datastore.register_azure_blob_container(
            workspace=workspace,
            datastore_name=data_store_name,
            account_name=storage_name,  # Storage account name
            container_name=container_name,  # Name of Azure blob container
            account_key=storage_key)  # Storage account key
        # Only report registration when a registration actually happened.
        print(f"Registered blob datastore with name: {data_store_name}")
    return blob_datastore

Set up the datastore reference and the directories used by the pipeline steps.

In [None]:
# Blob datastore backing the pipeline (registered on first use).
pipeline_datastore = get_blob_datastore(
    workspace=ws,
    data_store_name="sandbox_ds",
    storage_name=BLOB_STORAGE_NAME,
    storage_key=BLOB_STORAGE_KEY,
    container_name="sandbox",
)

# Datastore root, consumed by every step in "mount" mode.
root_dir = DataReference(datastore=pipeline_datastore, data_reference_name="root_dir", mode="mount")

# Sub-paths passed to the step scripts as arguments alongside `root_dir`.
EXPERIMENTS_DIR = "experiments"
input_dir = f"{EXPERIMENTS_DIR}/input"
output_dir = f"{EXPERIMENTS_DIR}/output"
results_dir = f"{EXPERIMENTS_DIR}/results"

### 2.3 Set up the task-specific environment

In [None]:
# Create Pipeline run configuration 
run_config = RunConfiguration()
run_config.environment.docker.enabled = False
run_config.environment.python.user_managed_dependencies = False
run_config.environment.python.conda_dependencies = CondaDependencies.create(
    conda_packages=[
        "python==3.8.5"
    ],
    pip_packages=[
        "numpy==1.18.5",
        "pandas==1.1.3",
        "azureml-core==1.22.0",
        "azureml-dataprep==2.5.0",
        "azure-storage-blob==12.5.0",
        "argparse==1.4.0",
        "vyper-config==0.6.2",
    ],
)

## 3. Create the pipeline
Define the steps of the pipeline. In this example pipeline, each step represents a particular Python script in `./steps`.

In [None]:
# Source directory uploaded for every step. The script names below are relative to it.
steps_dir = "../../"


def _make_step(script_name: str, arguments: list) -> PythonScriptStep:
    """Build one step of the example pipeline.

    All steps share the mounted `root_dir` input, the compute target and the run
    configuration. allow_reuse=False makes each step run on every submission
    instead of reusing a previous result.
    """
    return PythonScriptStep(
        script_name=script_name,
        source_directory=steps_dir,
        arguments=arguments,
        inputs=[root_dir],
        outputs=[],
        compute_target=compute_target,
        runconfig=run_config,
        allow_reuse=False,
    )


data_prep_step = _make_step(
    "mlops/example_pipeline/steps/prepare_data.py",
    ["--root_dir", root_dir, "--raw_dir", input_dir, "--prepared_dir", output_dir],
)

data_process_step = _make_step(
    "mlops/example_pipeline/steps/process_data.py",
    ["--root_dir", root_dir, "--prepared_dir", output_dir, "--results_dir", results_dir],
)

benchmark_step = _make_step(
    "mlops/example_pipeline/steps/generate_benchmark_report.py",
    ["--root_dir", root_dir],
)


Order the steps and build the pipeline

In [None]:
# Wait for certain steps to run before starting another
data_process_step.run_after(data_prep_step)
benchmark_step.run_after(data_process_step)

# Build the pipeline
list_of_steps = [data_prep_step, data_process_step, benchmark_step]
example_pipeline = Pipeline(workspace=ws, steps=[list_of_steps])

## 4. Publish and run the pipeline
Let's first publish the pipeline.

In [None]:
# Check if the pipeline is consistent 
example_pipeline.validate()

# Publish pipeline
published_pipeline = example_pipeline.publish(
    name = "example-pipeline",
    description = "Pipeline to demonstrate the PipelineBenchmarker class"
)

Now that the pipeline is published, let's submit the example pipeline as an experiment run and wait for it to finish.

In [None]:
# Submit the pipeline
pipeline_run = Experiment(ws, 'example-pipeline').submit(example_pipeline)
pipeline_run.wait_for_completion()

## 5. Viewing benchmarks
Now that the pipeline has run, the benchmarking results have been saved into the output of the experiment (pipeline run) in the Azure Machine Learning portal.

Benchmarking results have also been written to the mounted blob storage. Under the datastore root, `pipeline_benchmarks/benchmarks.csv` contains a record of all pipeline runs. More detail for each individual run is stored in `pipeline_benchmarks/individual_runs/benchmark_<run ID>.json`.

See the documentation in `../../docs` for more details and example output.