# Azure ML Pipeline
This notebook demonstrates how to create and run an Azure ML pipeline that queries data from an attached Azure SQL datastore (using dynamic arguments passed as a pipeline parameter), processes that data, and exports the resulting dataset to ADLS Gen2 and the attached Azure SQL DB.

### Import required packages

In [None]:
from azureml.core import Workspace, Experiment, Datastore, Environment, Dataset
from azureml.core.compute import ComputeTarget, AmlCompute, DataFactoryCompute
from azureml.core.compute_target import ComputeTargetException
from azureml.core.runconfig import RunConfiguration
from azureml.core.conda_dependencies import CondaDependencies
from azureml.core.runconfig import DEFAULT_CPU_IMAGE
from azureml.pipeline.core import Pipeline, PipelineParameter, PipelineData
from azureml.pipeline.steps import PythonScriptStep
from azureml.data.output_dataset_config import OutputTabularDatasetConfig, OutputDatasetConfig, OutputFileDatasetConfig
from azureml.data.datapath import DataPath
from azureml.data.data_reference import DataReference
from azureml.data.sql_data_reference import SqlDataReference
from azureml.pipeline.steps import DataTransferStep
import logging

### Connect to Azure ML Workspace, Provision Compute Resources, and get References to Datastores

Connect to the workspace using its associated config file. Get a reference to your pre-existing AML compute cluster, or provision a new cluster for processing. To insert data into Azure SQL DB we use a `DataTransferStep` in the pipeline; this step requires a `DataFactoryCompute` target, which is provisioned below. Finally, get references to your ADLS Gen2, Azure SQL, and default blob datastores.

In [None]:
#Connect to AML Workspace
ws = Workspace.from_config()

#Select AML Compute Cluster
cpu_cluster_name = 'cpucluster'

# Verify that cluster does not exist already
try:
    cpu_cluster = ComputeTarget(workspace=ws, name=cpu_cluster_name)
    print('Found an existing cluster, using it instead.')
except ComputeTargetException:
    compute_config = AmlCompute.provisioning_configuration(vm_size='STANDARD_D13_V2',
                                                           min_nodes=0,
                                                           max_nodes=10)
    cpu_cluster = ComputeTarget.create(ws, cpu_cluster_name, compute_config)
    cpu_cluster.wait_for_completion(show_output=True)
    
#Create Data Factory Compute for DataTransferStep
def get_or_create_data_factory(workspace, factory_name):
    try:
        return DataFactoryCompute(workspace, factory_name)
    except ComputeTargetException as e:
        if 'ComputeTargetNotFound' in e.message:
            print('Data factory not found, creating...')
            provisioning_config = DataFactoryCompute.provisioning_configuration()
            data_factory = ComputeTarget.create(workspace, factory_name, provisioning_config)
            data_factory.wait_for_completion()
            return data_factory
        else:
            raise e

data_factory_name = 'adfcompute'
data_factory_compute = get_or_create_data_factory(ws, data_factory_name)

#Get Default, Azure SQL, and ADLS Gen2 Datastores
default_ds = ws.get_default_datastore()
azsql_ds = Datastore.get(ws, 'azsql_ds')
adlsgen2_ds = Datastore.get(ws, 'azadlsgen2_ds')

### Create Run Configuration
The `RunConfiguration` defines the environment shared by all Python script steps. You can optionally add conda or pip packages to this environment, as in the snippet below. [More details here](https://docs.microsoft.com/en-us/python/api/azureml-core/azureml.core.conda_dependencies.condadependencies?view=azure-ml-py).
~~~~
run_config.environment.python.conda_dependencies = CondaDependencies.create(conda_packages=['requests'])
run_config.environment.python.conda_dependencies.add_pip_package('azureml-opendatasets')
~~~~


In [None]:
run_config = RunConfiguration()
run_config.environment.docker.enabled = True
run_config.environment.docker.base_image = DEFAULT_CPU_IMAGE

### Define Output Datasets
Below we define the configuration for the datasets that are passed between steps in our pipeline. For each `OutputFileDatasetConfig` we specify the datastore that should hold the dataset and register the dataset when the step completes; registration can be disabled by removing the `register_on_complete()` call. We also use a `PipelineData` object as an intermediary before saving the processed data to ADLS Gen2 and Azure SQL DB. `PipelineData` objects are compatible inputs to a `DataTransferStep`, which makes exporting data to Azure SQL DB easier. The `SqlDataReference` identifies the target Azure SQL DB and the table we insert data into.

In [None]:
profile_dataset = OutputFileDatasetConfig(name='profile_data', destination=(default_ds, 'profile_data/{run-id}')).read_delimited_files().register_on_complete(name='profile_data')
filter_dataset = OutputFileDatasetConfig(name='filter_data', destination=(default_ds, 'filter_data/{run-id}')).read_delimited_files().register_on_complete(name='filter_data')
processed_dataset_interim = PipelineData('processed_data_interim', datastore=default_ds)
processed_dataset = OutputFileDatasetConfig(name='processed_data', destination=(adlsgen2_ds, '{run-id}')).read_delimited_files().register_on_complete(name='processed_data')
sql_data_ref = SqlDataReference(datastore=azsql_ds, sql_table="Results", data_reference_name='azure_sql_reference')

### Define Pipeline Parameters
`PipelineParameter` objects serve as variable inputs to an Azure ML pipeline and can be specified at runtime. Below we specify a single `filter_param` pipeline parameter which will be used as an argument in a SQL query, and to create a unique filename when writing data to ADLS Gen 2. Multiple pipeline parameters can be created.

In [None]:
filter_param = PipelineParameter(name='filter_parameter', default_value=101)
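The step scripts themselves are not shown in this notebook, so the following is only a minimal sketch of how a script might consume the parameter. It assumes the value arrives as the `--query_param` command-line argument (as configured in the step definitions) and binds it into the query as a parameter rather than formatting it into the SQL string. The in-memory `sqlite3` database, the `filter_table` table, and its columns are hypothetical stand-ins for the Azure SQL source; the placeholder syntax may differ for other database drivers.

```python
import argparse
import sqlite3


def parse_args(argv=None):
    """Parse the pipeline parameter passed on the command line."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--query_param", type=int, required=True)
    return parser.parse_args(argv)


def fetch_filtered_rows(conn, query_param):
    """Run the query with the parameter bound by the driver, not string-formatted."""
    cursor = conn.execute(
        "SELECT id, label FROM filter_table WHERE id = ?", (query_param,)
    )
    return cursor.fetchall()


def demo(argv):
    """Build a throwaway in-memory table (hypothetical schema) and query it."""
    args = parse_args(argv)
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE filter_table (id INTEGER, label TEXT)")
    conn.executemany(
        "INSERT INTO filter_table VALUES (?, ?)", [(101, "a"), (102, "b")]
    )
    rows = fetch_filtered_rows(conn, args.query_param)
    conn.close()
    return rows


print(demo(["--query_param", "101"]))
```

Binding the value through the driver keeps a runtime-supplied parameter from being interpreted as SQL.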

### Define Pipeline Steps
The pipeline below consists of five steps: two queries to Azure SQL DB, a processing step where ML code should go, an export to ADLS Gen2, and a data transfer to Azure SQL DB. Each `PythonScriptStep` has a corresponding `*.py` file which is referenced in the step arguments. Any `PipelineParameter` defined above can be passed to and consumed within these steps. The final `DataTransferStep` moves data from blob storage to Azure SQL DB. <b>NOTE:</b> for the `DataTransferStep` to work, your Azure SQL DB datastore must be connected via a service principal. [See the documentation here for more details](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-steps/azureml.pipeline.steps.datatransferstep?view=azure-ml-py#remarks) and [here for details on configuring service principal authentication](https://docs.microsoft.com/en-us/azure/data-factory/connector-azure-sql-database#service-principal-authentication).

In [None]:
#SQL query to get 'filter' data. Note: filter_param is passed as an argument here and used to modify the SQL query.
#Output data is collected in the filter_dataset, registered upon step completion, and passed to subsequent steps.
get_filter_data_step = PythonScriptStep(
    name='get-filter-data',
    script_name='get_sql_filter_data.py',
    arguments =['--query_param', filter_param,
               '--filter_dataset', filter_dataset],
    outputs=[filter_dataset],
    compute_target=cpu_cluster,
    source_directory='.',
    allow_reuse=False,
    runconfig=run_config
)

#SQL query to get 'profile' data.
#Data is collected in the profile_dataset, registered upon step completion, and passed to subsequent steps.
get_profile_data_step = PythonScriptStep(
    name='get-profile-data',
    script_name='get_sql_profile_data.py',
    arguments =['--profile_dataset', profile_dataset],
    outputs=[profile_dataset],
    compute_target=cpu_cluster,
    source_directory='.',
    allow_reuse=False,
    runconfig=run_config
)

#This step should contain all of the ML goodness.
#filter_dataset & profile_dataset are passed in and parsed into pandas dataframes.
#Output data is collected in the processed_dataset_interim PipelineData object
#which is subsequently exported to ADLS Gen2 and Azure SQL DB
process_data_step = PythonScriptStep(
    name='process-data',
    script_name='process_data.py',
    arguments =['--processed_dataset_interim', processed_dataset_interim],
    inputs=[filter_dataset.as_input('filter_data'), profile_dataset.as_input('profile_data')],
    outputs=[processed_dataset_interim],
    compute_target=cpu_cluster,
    source_directory='.',
    allow_reuse=False,
    runconfig=run_config
)

#Write processed results to ADLS Gen2.
#processed_dataset_interim is passed in and parsed into a pandas dataframe.
#Output data is collected and registered in the processed_dataset which is written to ADLS Gen2.
#Output data is registered upon step completion.
save_data_adls_step = PythonScriptStep(
    name='save-data-adls',
    script_name='save_data_adls_gen2.py',
    arguments =['--processed_dataset', processed_dataset, '--query_param', filter_param, '--processed_dataset_interim', processed_dataset_interim],
    inputs=[processed_dataset_interim.as_input('processed_data_interim')],
    outputs=[processed_dataset],
    compute_target=cpu_cluster,
    source_directory='.',
    allow_reuse=False,
    runconfig=run_config
)

#Transfer processed results from blob storage to Azure SQL DB.
#processed_data_interim is passed as a reference and inserted into Azure SQL DB.
#Note: data is inserted into the table specified in the SqlDataReference above.
transfer_adlsgen2_to_sql = DataTransferStep(
    name='save-data-sql',
    source_data_reference=processed_dataset_interim,
    destination_data_reference=sql_data_ref,
    compute_target=data_factory_compute)
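The `*.py` files referenced by the steps above are not included here. As a rough sketch of the output side of such a script, the code below assumes the output location reaches the script as a path in the `--processed_dataset_interim` argument and that the script is responsible for creating that location before writing. The file name `processed.csv`, the column names, and the sample rows are hypothetical.

```python
import argparse
import csv
import os
import tempfile


def parse_args(argv=None):
    """Read the output location that the pipeline passes to the script."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--processed_dataset_interim", required=True)
    return parser.parse_args(argv)


def write_results(output_dir, rows, filename="processed.csv"):
    """Create the output directory if needed and write rows as a delimited file."""
    os.makedirs(output_dir, exist_ok=True)
    path = os.path.join(output_dir, filename)
    with open(path, "w", newline="") as handle:
        writer = csv.writer(handle)
        writer.writerow(["id", "score"])  # hypothetical columns
        writer.writerows(rows)
    return path


# Local demonstration: a temporary directory stands in for the mounted output path.
with tempfile.TemporaryDirectory() as tmp:
    args = parse_args(["--processed_dataset_interim", os.path.join(tmp, "out")])
    written = write_results(args.processed_dataset_interim, [(101, 0.5), (102, 0.75)])
    with open(written, newline="") as handle:
        print(handle.read())
```

Writing a delimited file matches the `read_delimited_files()` call used on the output dataset configurations, so downstream steps could parse the result as tabular data.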

### Create Pipeline
Create an Azure ML Pipeline by specifying the steps to be executed. Note: execution order is derived from the dataset dependencies between steps, so no step runs until all of its input datasets have been generated.

In [None]:
pipeline = Pipeline(workspace=ws, steps=[get_filter_data_step, get_profile_data_step, process_data_step, save_data_adls_step, transfer_adlsgen2_to_sql])
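To illustrate how data dependencies imply an execution order, here is a small standalone sketch that models the five steps with the standard library's `graphlib` (Python 3.9+). It is not how Azure ML schedules steps internally; the `steps` dictionary simply restates the inputs and outputs declared in the step definitions, and `execution_order` is a hypothetical helper.

```python
from graphlib import TopologicalSorter

# Step name -> (datasets consumed, datasets produced), restating the step definitions.
steps = {
    "get-filter-data": (set(), {"filter_data"}),
    "get-profile-data": (set(), {"profile_data"}),
    "process-data": ({"filter_data", "profile_data"}, {"processed_data_interim"}),
    "save-data-adls": ({"processed_data_interim"}, {"processed_data"}),
    "save-data-sql": ({"processed_data_interim"}, set()),
}


def execution_order(steps):
    """Return one valid ordering in which every step follows its data producers."""
    # Map each dataset to the step that produces it.
    producer = {
        dataset: name
        for name, (_, outputs) in steps.items()
        for dataset in outputs
    }
    # Map each step to the set of steps it must wait for.
    graph = {
        name: {producer[dataset] for dataset in inputs}
        for name, (inputs, _) in steps.items()
    }
    return list(TopologicalSorter(graph).static_order())


print(execution_order(steps))
```

In this model the two query steps have no prerequisites, and the two export steps both depend only on `process-data`, so neither pair is ordered relative to each other.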

### Create Experiment and Run Pipeline
Define a new experiment (logical container for pipeline runs) and execute the pipeline. You can modify the values of pipeline parameters here when submitting a new run.

In [None]:
experiment = Experiment(ws, 'pipeline-development')
run = experiment.submit(pipeline, pipeline_parameters={'filter_parameter': 102})
run.wait_for_completion(show_output=True)

### Publish Pipeline
Create a published version of your pipeline that can be triggered via a REST API call.

In [None]:
published_pipeline = pipeline.publish(name = 'my_pipeline',
                                     description = 'Sample pipeline that queries Azure SQL DB, processes data, and exports results to ADLS Gen2/Azure SQL DB',
                                     continue_on_step_failure = False)

### Remote Pipeline Trigger
Trigger the published pipeline using a call to the REST API. Note: you must use service principal credentials to authenticate this call. The Client ID, Client Secret, Tenant ID, and pipeline endpoint are stored as environment variables prior to execution.
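Because `os.environ.get` returns `None` for an unset variable, a missing credential would only surface later as an authentication error. One possible safeguard is to validate the variables up front. This is a sketch: `load_settings` is a hypothetical helper, and the variable names mirror the ones read in the cell below.

```python
import os

# Environment variable names read by the trigger cell.
REQUIRED_VARS = (
    "client_id",
    "tenant_id",
    "service_principal_password",
    "pipeline_endpoint",
)


def load_settings(env=os.environ, required=REQUIRED_VARS):
    """Return the required settings, or fail listing every missing or empty name."""
    missing = [name for name in required if not env.get(name)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return {name: env[name] for name in required}


# Demonstration with a plain dict instead of the real environment.
try:
    load_settings({"client_id": "example-id"})
except RuntimeError as error:
    print(error)
```

Passing a mapping explicitly, as in the demonstration, also makes the check easy to exercise without touching real credentials.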

In [None]:
#Pipeline execution via REST endpoint requires AAD Token (obtained here from service principal)
#Relevant docs:
#https://docs.microsoft.com/en-us/azure/machine-learning/how-to-deploy-pipelines
#https://github.com/Azure/MachineLearningNotebooks/blob/master/how-to-use-azureml/manage-azureml-service/authentication-in-azureml/authentication-in-azureml.ipynb

import requests
import os
from azureml.core.authentication import ServicePrincipalAuthentication

#Service principal creds stored as environment vars
client_id = os.environ.get('client_id')
tenant_id = os.environ.get('tenant_id')
service_principal_password = os.environ.get('service_principal_password')
pipeline_endpoint = os.environ.get('pipeline_endpoint')

#Leverage ADAL library for obtaining token
from adal import AuthenticationContext

client_secret = service_principal_password
#Authority is the AAD login host followed by the tenant ID
authority_host = "https://login.microsoftonline.com"
authority = "{}/{}".format(authority_host, tenant_id)

auth_context = AuthenticationContext(authority)
token_response = auth_context.acquire_token_with_client_credentials("https://management.azure.com/", client_id, client_secret)

#Format token response for API request to pipeline
headers = {'Authorization': 'Bearer {}'.format(token_response['accessToken'])}

#Trigger remote pipeline run
#Pipeline endpoint can be obtained from AML portal as well
response = requests.post(pipeline_endpoint,
                         headers=headers,
                         json={"ExperimentName": "REST_Pipeline_Trigger_Test", "ParameterAssignments": {"filter_parameter": 105}})
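The cell above does not inspect the response. A minimal sketch of one way to check it follows, assuming the response object behaves like a `requests.Response` and that the submitted run's ID is returned under the `Id` key, as in the how-to-deploy-pipelines guide linked above. `build_payload`, `submitted_run_id`, and `FakeResponse` are hypothetical names; `FakeResponse` exists only so the example runs without a network call.

```python
def build_payload(experiment_name, parameters):
    """Assemble the JSON body expected by the published pipeline endpoint."""
    return {
        "ExperimentName": experiment_name,
        "ParameterAssignments": dict(parameters),
    }


def submitted_run_id(response):
    """Raise on an HTTP error status, otherwise return the run ID (or None)."""
    response.raise_for_status()
    return response.json().get("Id")


class FakeResponse:
    """Stand-in for requests.Response, used only for this offline demonstration."""

    def __init__(self, body, ok=True):
        self._body = body
        self._ok = ok

    def raise_for_status(self):
        if not self._ok:
            raise RuntimeError("HTTP error")

    def json(self):
        return self._body


payload = build_payload("REST_Pipeline_Trigger_Test", {"filter_parameter": 105})
print(payload)
print(submitted_run_id(FakeResponse({"Id": "example-run-id"})))
```

With the real call, `submitted_run_id(response)` could be applied directly to the object returned by `requests.post`.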