Copyright (c) Microsoft Corporation. All rights reserved.  
Licensed under the MIT License.

## Description

This demo notebook shows how to create an AML pipeline that:

1. Copies input data from ADLS to blob
1. Reads input blob data, scores, and writes output to blob
1. Copies output data from blob to ADLS

NOTE: This version uses [Key Vault](https://azure.microsoft.com/en-us/services/key-vault/) to store the secrets for ADLS and blob.

## Prerequisites
If you haven't already, first run the configuration notebook located at https://github.com/Azure/MachineLearningNotebooks. It sets you up with a working config file that contains your workspace name, subscription ID, and related settings.

Next, authenticate your VM to Key Vault using MSI. The following steps are based on the tutorial [Use a Windows VM system-assigned managed identity to access Azure Key Vault](https://docs.microsoft.com/en-us/azure/active-directory/managed-identities-azure-resources/tutorial-windows-vm-access-nonaad).

browse: Azure portal

### Create a Key Vault
Add the following secrets:
- account-key-blob: <your_storage_account_key>
- tenant-id: <your_tenant_id>
- client-id: <your_client_id>
- client-secret: <your_client_secret>

### Turn on MSI for your VM
click: <your vm>
click: identity
click: System assigned: On (off by default)
click: Save

### Grant the VM access to secrets in your Key Vault
click: <your key vault>
click: Access policies > Add new
select: Configure from template: Secret Management
select: Select principal: <your vm>
click: Select
click: OK > Save
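
Once MSI is enabled and the access policy is saved, the VM can request a Key Vault token from the Azure instance metadata endpoint. The sketch below shows how that request URL is composed and how the token could be fetched using only the standard library. The helper names `build_token_url` and `fetch_token` are hypothetical and used here for illustration; `fetch_token` only succeeds when run on an Azure VM with a managed identity.

```python
import json
import urllib.parse
import urllib.request

# Instance metadata endpoint used for managed-identity tokens on Azure VMs.
IMDS_TOKEN_ENDPOINT = "http://169.254.169.254/metadata/identity/oauth2/token"


def build_token_url(resource="https://vault.azure.net", api_version="2018-02-01"):
    """Return the metadata URL that requests a token for the given resource."""
    # urlencode percent-encodes the resource URI so it is safe in a query string.
    query = urllib.parse.urlencode({"api-version": api_version, "resource": resource})
    return f"{IMDS_TOKEN_ENDPOINT}?{query}"


def fetch_token(timeout=5):
    """Request a Key Vault token; assumes an Azure VM with MSI turned on."""
    request = urllib.request.Request(build_token_url(), headers={"Metadata": "true"})
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.load(response)["access_token"]


print(build_token_url())
```

If `fetch_token()` raises an HTTP error or times out, a likely cause is that the system-assigned identity is still off or the access policy has not been saved.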

In [None]:
from azureml.core import Datastore, Experiment
from azureml.core.compute import AmlCompute, ComputeTarget, DataFactoryCompute
from azureml.core.conda_dependencies import CondaDependencies
from azureml.core.runconfig import RunConfiguration
from azureml.data.data_reference import DataReference
from azureml.exceptions import ComputeTargetException
from azureml.pipeline.core import Pipeline, PipelineData
from azureml.pipeline.steps import DataTransferStep, PythonScriptStep

In [None]:
import os
from azureml.core import Workspace, Run, Experiment

ws = Workspace.from_config()
print('Workspace name: ' + ws.name, 
      'Azure region: ' + ws.location, 
      'Subscription id: ' + ws.subscription_id, 
      'Resource group: ' + ws.resource_group, sep = '\n')


## Set up machine learning resources

### Create datastores

ADLS will contain the input and output data. Blob will act as a "staging" area for the data, since it can be mounted from AML.

In [None]:
import requests

key_vault_name = "<your_key_vault_name>"

# request a Key Vault access token from the VM's managed identity (MSI) endpoint
headers = {'Metadata': 'true'}
url = "http://169.254.169.254/metadata/identity/oauth2/token?api-version=2018-02-01&resource=https%3A%2F%2Fvault.azure.net"
response = requests.get(url=url, headers=headers)
response.raise_for_status()  # fail early if MSI is not enabled on this VM
key_vault_access_token = response.json()['access_token']

# read a single secret value from Key Vault using the bearer token
def get_secret(secret_name):
    url = f"https://{key_vault_name}.vault.azure.net/secrets/{secret_name}?api-version=2016-10-01"
    headers = {'Authorization': 'Bearer ' + key_vault_access_token}
    response = requests.get(url=url, headers=headers)
    response.raise_for_status()  # fail early if the access policy is missing
    return response.json()['value']

# get secrets
account_key_blob = get_secret('account-key-blob')
tenant_id = get_secret('tenant-id')
client_id = get_secret('client-id')
client_secret = get_secret('client-secret')

In [None]:
datastore_name_blob="MyBlobDatastore"
account_name_blob = "<your_storage_account_name>"
container_name_blob="batchscoring"

blob_datastore = Datastore.register_azure_blob_container(ws,
                      datastore_name=datastore_name_blob,
                      container_name=container_name_blob,
                      account_name=account_name_blob,
                      account_key=account_key_blob,
                      overwrite=True)
print("registered datastore with name: %s" % datastore_name_blob)

In [None]:
datastore_name_adls="MyAdlsDatastore"
store_name_adls="<your_adls_storage_account_name>"

adls_datastore = Datastore.register_azure_data_lake(
        workspace=ws,
        datastore_name=datastore_name_adls,
        subscription_id=ws.subscription_id, # subscription id of ADLS account
        resource_group=ws.resource_group, # resource group of ADLS account
        store_name=store_name_adls, # ADLS account name
        tenant_id=tenant_id, # tenant id of service principal
        client_id=client_id, # client id of service principal
        client_secret=client_secret) # the secret of service principal
print("registered datastore with name: %s" % datastore_name_adls)

### Configure data references
Next, add references to the data so they can be passed as inputs to the appropriate pipeline steps. A data source in a pipeline is represented by a `DataReference` object, which points to data that lives in, or is accessible from, a datastore.

In [None]:
# https://docs.microsoft.com/en-us/azure/machine-learning/service/reference-azure-machine-learning-cli

input_dir_adls_dataref = DataReference(datastore=adls_datastore,
                          data_reference_name="input_dir_adls",
                          path_on_datastore="input",
                          mode="download")
model_dir_adls_dataref = DataReference(datastore=adls_datastore,
                          data_reference_name="model_dir_adls",
                          path_on_datastore="model",
                          mode="download")
output_dir_adls_dataref = DataReference(datastore=adls_datastore,
                          data_reference_name="output_dir_adls",
                          path_on_datastore="output")

In [None]:
input_dir_blob_dataref = DataReference(datastore=blob_datastore, 
                             data_reference_name="input_dir_blob",
                             path_on_datastore="input_from_adls")
model_dir_blob_dataref = DataReference(datastore=blob_datastore, 
                          data_reference_name="model_dir_blob",
                          path_on_datastore="model_from_adls")
#output_dir_blob_dataref = DataReference(datastore=blob_datastore, 
#                          data_reference_name="output_dir_blob",
#                          path_on_datastore="output_to_adls")
output_dir_blob_pipedata = PipelineData(name="output", # folder path
                          datastore=blob_datastore, 
                          output_path_on_compute="output_to_adls")

### Create and attach Compute targets
Use the code below to create a compute target, or to reuse it if one with the same name already exists in the workspace.
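
The cluster settings in the next cell come from environment variables. Values read from `os.environ` are always strings, so node counts need to be converted to integers before they are passed to the provisioning configuration. A minimal sketch of one way to do this is shown here; the helper names `env_int` and `cluster_node_range` are hypothetical and not part of the AML SDK.

```python
import os


def env_int(name, default, environ=os.environ):
    """Read an integer setting from the environment, falling back to a default."""
    raw = environ.get(name)
    if raw is None or raw.strip() == "":
        return default
    try:
        return int(raw)
    except ValueError:
        raise ValueError(f"{name} must be an integer, got {raw!r}") from None


def cluster_node_range(environ=os.environ):
    """Return (min_nodes, max_nodes) after checking the range is sensible."""
    min_nodes = env_int("AML_COMPUTE_MIN_NODES", 0, environ)
    max_nodes = env_int("AML_COMPUTE_MAX_NODES", 1, environ)
    if min_nodes < 0 or max_nodes < min_nodes:
        raise ValueError(f"invalid node range: min={min_nodes}, max={max_nodes}")
    return min_nodes, max_nodes


# Example with an explicit mapping instead of the real environment.
print(cluster_node_range({"AML_COMPUTE_MAX_NODES": "4"}))
```

Passing a mapping explicitly, as in the last line, makes the helper easy to try out without changing the real environment.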

In [None]:
import os

# choose a name for your cluster
aml_compute_name = os.environ.get("AML_COMPUTE_NAME", "gpu-cluster")
#aml_compute_name = os.environ.get("AML_COMPUTE_NAME", "cpu-cluster")
cluster_min_nodes = int(os.environ.get("AML_COMPUTE_MIN_NODES", 0))
cluster_max_nodes = int(os.environ.get("AML_COMPUTE_MAX_NODES", 1))
vm_size = os.environ.get("AML_COMPUTE_SKU", "STANDARD_NC6")
#vm_size = os.environ.get("AML_COMPUTE_SKU", "STANDARD_D2_V2")


if aml_compute_name in ws.compute_targets:
    compute_target = ws.compute_targets[aml_compute_name]
    if compute_target and type(compute_target) is AmlCompute:
        print('found compute target. just use it. ' + aml_compute_name)
else:
    print('creating a new compute target...')
    provisioning_config = AmlCompute.provisioning_configuration(vm_size = vm_size, # NC6 is GPU-enabled
                                                                vm_priority = 'lowpriority', # optional
                                                                min_nodes = cluster_min_nodes, 
                                                                max_nodes = cluster_max_nodes)

    # create the cluster
    compute_target = ComputeTarget.create(ws, aml_compute_name, provisioning_config)
    
    # can poll for a minimum number of nodes and for a specific timeout. 
    # if no min node count is provided it will use the scale settings for the cluster
    compute_target.wait_for_completion(show_output=True, min_node_count=None, timeout_in_minutes=20)
    
    # For a more detailed view of the current Azure Machine Learning Compute status, use the 'status' property
    print(compute_target.status.serialize())

## Prepare the Model

In [None]:
# create a local model directory
model_dir = 'models'
if not os.path.isdir(model_dir):
    os.mkdir(model_dir)

In [None]:
# download the data to local model directory

#https://docs.microsoft.com/en-us/azure/data-lake-store/data-lake-store-data-operations-python
#https://stackoverflow.com/questions/48208389/python-code-to-access-azure-data-lake-store#48213226
!pip install azure-mgmt-resource
!pip install azure-mgmt-datalake-store
!pip install azure-datalake-store

from azure.datalake.store import core, lib, multithread

# Authenticate with the service principal and download the model folder from ADLS
token = lib.auth(tenant_id=tenant_id, client_secret=client_secret, client_id=client_id)
adlsFileSystemClient = core.AzureDLFileSystem(token, store_name=store_name_adls)
multithread.ADLDownloader(adlsFileSystemClient,
                          lpath=model_dir,  # local directory created above
                          rpath='model',    # folder on ADLS
                          nthreads=64, overwrite=True, buffersize=4194304, blocksize=4194304)

### Register the model with Workspace

In [None]:
import shutil
from azureml.core.model import Model

# register downloaded model 
model = Model.register(model_path = "models/",
                       model_name = "keras", # this is the name the model is registered as
                       tags = {'pretrained': "keras"},
                       description = "Keras model used to score Indian dataset",
                       workspace = ws)
# remove the downloaded dir after registration if you wish
#shutil.rmtree("models")

## Write your scoring script

To do the scoring, we use a batch scoring script, `batch_scoring.py`, located in the same directory as this notebook. Take a look at this script to see how you might modify it for your own batch scoring task.

The Python script `batch_scoring.py` takes input data from blob, performs scoring, and writes the results back out to blob.

The script `batch_scoring.py` takes the following parameters:

- `--input_dir`  : the blob path containing the input data
- `--output_dir` : the blob path where the output data is written
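
The script itself is not reproduced in this notebook. As a rough sketch of how a script with these two parameters might read them, the example below uses `argparse`. The function name `parse_args` and the placeholder paths are assumptions made for illustration; the real `batch_scoring.py` may be organized differently.

```python
import argparse


def parse_args(argv=None):
    """Parse the two arguments that the pipeline step passes to the script."""
    parser = argparse.ArgumentParser(description="Batch scoring (illustrative sketch)")
    parser.add_argument("--input_dir", required=True,
                        help="blob path containing the input data")
    parser.add_argument("--output_dir", required=True,
                        help="blob path where the output data is written")
    return parser.parse_args(argv)


# Placeholder paths for a local try-out; in a pipeline run the step supplies
# the actual locations of the input and output data on the compute target.
args = parse_args(["--input_dir", "/tmp/input", "--output_dir", "/tmp/output"])
print(args.input_dir, args.output_dir)
```

Calling `parse_args()` with no argument reads from `sys.argv`, which is what a script launched by a pipeline step would normally do.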

## Build and run the batch scoring pipeline
You now have everything you need to build the pipeline. Let's put it all together.

### Specify the environment to run the script
Specify the conda dependencies for your script. You will need this object when you create the pipeline step later on.

In [None]:
from azureml.core.runconfig import DEFAULT_GPU_IMAGE

#cd = CondaDependencies.create(pip_packages=["tensorflow-gpu==1.10.0", "azureml-defaults"])
cd = CondaDependencies.create(pip_packages=["tensorflow-gpu==1.10.0", "azureml-defaults", "keras"])

# Runconfig
amlcompute_run_config = RunConfiguration(conda_dependencies=cd)
amlcompute_run_config.environment.docker.enabled = True
amlcompute_run_config.environment.docker.gpu_support = True
amlcompute_run_config.environment.docker.base_image = DEFAULT_GPU_IMAGE
amlcompute_run_config.environment.spark.precache_packages = False

### Create the pipeline steps
Create the pipeline steps using the script, environment configuration, and parameters. The scoring step is a `PythonScriptStep` that runs on the compute target created above. The two copy steps are `DataTransferStep`s that run on an Azure Data Factory compute, which the next cell creates or reuses.

In [None]:
data_factory_name = 'adftest'

def get_or_create_data_factory(workspace, factory_name):
    try:
        return DataFactoryCompute(workspace, factory_name)
    except ComputeTargetException as e:
        if 'ComputeTargetNotFound' in e.message:
            print('Data factory not found, creating...')
            provisioning_config = DataFactoryCompute.provisioning_configuration()
            data_factory = ComputeTarget.create(workspace, factory_name, provisioning_config)
            data_factory.wait_for_completion()
            return data_factory
        else:
            raise  # re-raise unexpected errors with the original traceback
            
data_factory_compute = get_or_create_data_factory(ws, data_factory_name)

print("setup data factory account complete")

# CLI:
# Create: az ml computetarget setup datafactory -n <name>
# BYOC: az ml computetarget attach datafactory -n <name> -i <resource-id>

In [None]:
copy_input_data_from_adls_to_blob = DataTransferStep(
    name="transfer_adls_to_blob",
    source_data_reference=input_dir_adls_dataref,
    destination_data_reference=input_dir_blob_dataref,
    compute_target=data_factory_compute,
    allow_reuse=False)

print("data transfer step created")

In [None]:
batch_score_step_keras_registered = PythonScriptStep(
    name="batch_scoring",
    script_name="batch_scoring.py",
    arguments=["--input_dir", input_dir_blob_dataref,
               "--output_dir", output_dir_blob_pipedata],
    compute_target=compute_target,
    inputs=[input_dir_blob_dataref],
    outputs=[output_dir_blob_pipedata],
    runconfig=amlcompute_run_config,
    allow_reuse=False
)
print("python script step created")

In [None]:
copy_output_data_from_blob_to_adls = DataTransferStep(
    name="transfer_blob_to_adls",
    source_data_reference=output_dir_blob_pipedata,
    destination_data_reference=output_dir_adls_dataref,
    compute_target=data_factory_compute,
    allow_reuse=False,
    source_reference_type="directory")

print("data transfer step created")

### Run the pipeline
At this point you can run the pipeline and examine the output it produced. 

In [None]:
pipeline = Pipeline(workspace=ws, steps=[copy_input_data_from_adls_to_blob,
                                         batch_score_step_keras_registered,
                                         copy_output_data_from_blob_to_adls])
pipeline_run = Experiment(ws, 'batch_scoring_keras').submit(pipeline, pipeline_params={})

In [None]:
pipeline_run.wait_for_completion(show_output=True)