## Azure Machine Learning - Data Processing Pipeline
This notebook demonstrates how to create and run an Azure ML pipeline that loads and processes data from an AML-linked blob storage account. The data is moved into that account from SharePoint via a Logic App.


### Import Required Packages

In [None]:
# Import required packages
from azureml.core import Workspace, Experiment, Datastore, Environment, Dataset
from azureml.core.compute import ComputeTarget, AmlCompute, DataFactoryCompute
from azureml.core.compute_target import ComputeTargetException
from azureml.core.runconfig import RunConfiguration
from azureml.core.conda_dependencies import CondaDependencies
from azureml.core.runconfig import DEFAULT_CPU_IMAGE
from azureml.pipeline.core import Pipeline, PipelineParameter, PipelineData
from azureml.pipeline.steps import PythonScriptStep
from azureml.data.output_dataset_config import OutputTabularDatasetConfig, OutputDatasetConfig, OutputFileDatasetConfig
from azureml.data.datapath import DataPath
from azureml.data.data_reference import DataReference
from azureml.data.sql_data_reference import SqlDataReference
from azureml.pipeline.steps import DataTransferStep
import logging

### Connect to Azure ML Workspace, Provision Compute Resources, and get References to Datastores
Connect to the workspace using its associated config file. Get a reference to your pre-existing AML compute cluster, or provision a new cluster to handle processing. Finally, get a reference to your default blob datastore.

In [None]:
# Connect to AML Workspace
ws = Workspace.from_config()

# Select AML compute cluster
cpu_cluster_name = 'cpucluster'

# Reuse the cluster if it already exists; otherwise provision a new one
try:
    cpu_cluster = ComputeTarget(workspace=ws, name=cpu_cluster_name)
    print('Found an existing cluster, using it instead.')
except ComputeTargetException:
    compute_config = AmlCompute.provisioning_configuration(vm_size='STANDARD_D3_V2',
                                                           min_nodes=0,
                                                           max_nodes=1)
    cpu_cluster = ComputeTarget.create(ws, cpu_cluster_name, compute_config)
    cpu_cluster.wait_for_completion(show_output=True)
    
#Get default datastore
default_ds = ws.get_default_datastore()

### Create Run Configuration
The `RunConfiguration` defines the environment used across all Python steps. You can optionally add extra conda or pip packages to the environment. [More details here](https://docs.microsoft.com/en-us/python/api/azureml-core/azureml.core.conda_dependencies.condadependencies?view=azure-ml-py).

Here, we also register the environment to the AML workspace so that it can be used for future retraining and inferencing operations.

In [None]:
run_config = RunConfiguration()
run_config.docker.use_docker = True
run_config.environment = Environment(name='sample_env')
run_config.environment.docker.base_image = DEFAULT_CPU_IMAGE
run_config.environment.python.conda_dependencies = CondaDependencies.create()
run_config.environment.python.conda_dependencies.set_pip_requirements([
    'requests',
    'scikit-learn==0.22.1',
    'pandas',
    'joblib',
    'openpyxl==3.0.9',
    'azureml-core',
    'azureml-automl-core',
    'azureml-train-automl',
    'azureml-train-core',
    'azureml-automl-runtime',
    'azureml-train-automl-client',
    'azureml-train-automl-runtime'
])
# run_config.environment.python.conda_dependencies.set_python_version('3.8.10')

#Register environment for reuse 
run_config.environment.register(ws)


### Define Output Datasets
Below we define the configuration for datasets that will be passed between steps in our pipeline. In each case we specify the datastore that should hold the dataset and register it once the step completes. Registration is optional and can be disabled by removing the `register_on_complete()` call.

In [None]:
from azureml.data import DataType
column_dictionary = {
    'WeekStarting':DataType.to_datetime(),
    'Store':DataType.to_string(),
    'Brand':DataType.to_string(),
    'Quantity':DataType.to_long(),
    'Advert':DataType.to_bool(),
    'Price':DataType.to_float(),
    'Revenue':DataType.to_float()
}

input_data = OutputFileDatasetConfig(name='Input_Data', destination=(default_ds, 'input_data/{run-id}')).read_delimited_files(set_column_types=column_dictionary).register_on_complete(name='Input_Data')
training_data = OutputFileDatasetConfig(name='Training_Data', destination=(default_ds, 'training_data/{run-id}')).read_delimited_files(set_column_types=column_dictionary).register_on_complete(name='Training_Data')
forecasting_data = OutputFileDatasetConfig(name='Forecasting_Data', destination=(default_ds, 'forecasting_data/{run-id}')).read_delimited_files(set_column_types=column_dictionary).register_on_complete(name='Forecasting_Data')
forecast_results_data = OutputFileDatasetConfig(name='Forecast_Results_Data', destination=(default_ds, 'forecast_results_data/{run-id}')).read_delimited_files(set_column_types=column_dictionary).register_on_complete(name='Forecast_Results_Data')


### Define Pipeline Parameters
`PipelineParameter` objects serve as variable inputs to an Azure ML pipeline and can be specified at runtime. Below we specify the path to the data added to the default datastore via a Logic App.

In [None]:
file_path = PipelineParameter(name='file_path', default_value='sharepoint_data/OJData.csv')
model_name = PipelineParameter(name='model_name', default_value='FORECASTING_MODEL')
target_column = PipelineParameter(name='target_column', default_value='Quantity')
timestamp_column = PipelineParameter(name='timestamp_column', default_value='WeekStarting')
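
When a `PipelineParameter` is placed in a step's `arguments` list, the step script receives its resolved value as an ordinary command-line string. The sketch below shows one way a script might read the four parameters defined above with `argparse`. The helper name `parse_pipeline_parameters` is hypothetical, and the defaults simply mirror the values above; the actual step scripts are not shown in this notebook and may parse their arguments differently.

```python
import argparse


def parse_pipeline_parameters(argv=None):
    """Read pipeline parameter values passed to a step script as CLI strings.

    Hypothetical helper: the names and defaults mirror the PipelineParameter
    objects defined in this notebook.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--file_path', type=str, default='sharepoint_data/OJData.csv')
    parser.add_argument('--model_name', type=str, default='FORECASTING_MODEL')
    parser.add_argument('--target_column', type=str, default='Quantity')
    parser.add_argument('--timestamp_column', type=str, default='WeekStarting')
    # parse_known_args tolerates arguments this helper does not declare,
    # so unrelated step arguments do not cause a parsing error.
    args, _unknown = parser.parse_known_args(argv)
    return args


# Simulate a run in which only file_path is overridden.
args = parse_pipeline_parameters(['--file_path', 'sharepoint_data/new_data.csv'])
print(args.file_path, args.timestamp_column)
```

Using `parse_known_args` lets the same helper be shared by steps that also receive other arguments, such as output dataset paths.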

### Define Pipeline Steps
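The pipeline below consists of three steps. The first loads data from the default datastore, registers it as an input dataset, and splits it into training and forecasting datasets. The second trains a forecasting model on the training data using AutoML. The third registers the model and generates a forecast, which is registered as an output dataset.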

In [None]:
# Get and process raw data from AML-linked datastore
# Register tabular dataset after retrieval
get_and_split_step = PythonScriptStep(
    name='Get Data and Split',
    script_name='get_and_split_data.py',
    arguments =['--input_data', input_data,
                '--training_data', training_data,
                '--forecasting_data', forecasting_data,
                '--timestamp_column', timestamp_column,
                '--file_path', file_path],
    outputs=[training_data, forecasting_data],
    compute_target=cpu_cluster,
    source_directory='./pipeline_step_scripts',
    allow_reuse=True,
    runconfig=run_config
)

from azureml.train.automl import AutoMLConfig
from azureml.pipeline.steps import AutoMLStep
automl_settings = {
    "iteration_timeout_minutes" : 90,
    "iterations" : 5,
    "experiment_timeout_hours" : 6,
    "primary_metric" : 'normalized_root_mean_squared_error',
}

automl_config = AutoMLConfig(task = 'forecasting',
                             path = '.',
                             debug_log = 'automated_ml_errors.log',
                             compute_target = cpu_cluster,
                             run_configuration = run_config,
                             featurization = 'auto',
                             training_data = training_data,
                             label_column_name = 'Quantity',
                             max_cores_per_iteration = -1,
                             max_concurrent_iterations = 3,
                             time_column_name = 'WeekStarting',
                             max_horizon = 10,
                             **automl_settings)

train_model_step = AutoMLStep(name='Train Forecasting Model (AutoML)',
    automl_config=automl_config,
    passthru_automl_config=False,
    enable_default_model_output=False,
    enable_default_metrics_output=False,
    allow_reuse=True)

train_model_step.run_after(get_and_split_step)

generate_forecast_step = PythonScriptStep(
    name='Register Model and Generate Forecast',
    script_name='generate_forecast.py',
    arguments =['--model_name', model_name,
                '--target_column', target_column,
                '--forecast_results_data', forecast_results_data],
    inputs=[forecasting_data.as_input(name='Forecast_Data')],
    outputs=[forecast_results_data],
    compute_target=cpu_cluster,
    source_directory='./pipeline_step_scripts',
    allow_reuse=False,
    runconfig=run_config
)

generate_forecast_step.run_after(train_model_step)
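
The script behind the first step, `get_and_split_data.py`, is not shown in this notebook. As a rough illustration of what a time-based split could look like, the sketch below holds out the most recent timestamps as forecasting data and keeps the rest for training. The function `split_by_time`, the `holdout_periods` argument, and the sample rows are all made up for this example, and it assumes ISO-formatted dates; the real script may split the data differently.

```python
import csv
import io
from datetime import datetime


def split_by_time(rows, timestamp_column, holdout_periods):
    """Split rows into (training, forecasting) by holding out the latest timestamps.

    Hypothetical helper: assumes the timestamp column holds ISO dates (YYYY-MM-DD).
    """
    if holdout_periods < 1:
        raise ValueError('holdout_periods must be at least 1')
    parsed = [(datetime.fromisoformat(row[timestamp_column]), row) for row in rows]
    # Distinct timestamps sorted oldest to newest; the last ones are held out.
    stamps = sorted({stamp for stamp, _ in parsed})
    holdout = set(stamps[-holdout_periods:])
    training = [row for stamp, row in parsed if stamp not in holdout]
    forecasting = [row for stamp, row in parsed if stamp in holdout]
    return training, forecasting


# Made-up sample rows that use a subset of the columns declared earlier.
sample_csv = """WeekStarting,Store,Brand,Quantity
1990-06-14,2,brand_a,100
1990-06-21,2,brand_a,120
1990-06-28,2,brand_a,90
1990-06-14,5,brand_b,60
1990-06-21,5,brand_b,75
1990-06-28,5,brand_b,80
"""
rows = list(csv.DictReader(io.StringIO(sample_csv)))
training_rows, forecasting_rows = split_by_time(rows, 'WeekStarting', holdout_periods=1)
print(len(training_rows), len(forecasting_rows))
```

Splitting on distinct timestamps rather than on row counts keeps every store and brand series aligned to the same cutoff date.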

### Create Pipeline
Create an Azure ML Pipeline by specifying the steps to be executed. 

In [None]:
pipeline = Pipeline(workspace=ws, steps=[get_and_split_step, train_model_step, generate_forecast_step])

### Optional: Trigger a Pipeline Execution from the Notebook
You can create an Experiment (a logical collection of runs) and submit a pipeline run directly from this notebook by running the commands below.

In [None]:
# experiment = Experiment(ws, 'sample-pipeline-run')
# run = experiment.submit(pipeline)
# run.wait_for_completion(show_output=True)

### Create a Published PipelineEndpoint
Once the pipeline is created, we will want to retrain the model periodically as new data becomes available. Publishing the pipeline to a `PipelineEndpoint` lets us iterate on the pipeline definition while keeping a consistent REST API endpoint.

In [None]:
from azureml.pipeline.core import PipelineEndpoint

def published_pipeline_to_pipeline_endpoint(
    workspace,
    published_pipeline,
    pipeline_endpoint_name,
    pipeline_endpoint_description="Endpoint to my pipeline",
):
    try:
        pipeline_endpoint = PipelineEndpoint.get(
            workspace=workspace, name=pipeline_endpoint_name
        )
    except Exception:
        # No endpoint with this name could be retrieved, so publish a new one
        print("PipelineEndpoint does not exist, creating one for you...")
        pipeline_endpoint = PipelineEndpoint.publish(
            workspace=workspace,
            name=pipeline_endpoint_name,
            pipeline=published_pipeline,
            description=pipeline_endpoint_description
        )
    else:
        # Endpoint exists: add the new pipeline version and make it the default
        print("Using existing PipelineEndpoint...")
        pipeline_endpoint.add_default(published_pipeline)
    return pipeline_endpoint


pipeline_endpoint_name = 'AML Pipeline Endpoint'
pipeline_endpoint_description = 'Sample pipeline for loading data from an AML datastore, processing, and storing results'

published_pipeline = pipeline.publish(name=pipeline_endpoint_name,
                                     description=pipeline_endpoint_description,
                                     continue_on_step_failure=False)

published_pipeline_to_pipeline_endpoint(
    workspace=ws,
    published_pipeline=published_pipeline,
    pipeline_endpoint_name=pipeline_endpoint_name,
    pipeline_endpoint_description=pipeline_endpoint_description
)

### Sample Pipeline Trigger Request (REST API)
You can trigger your published pipeline remotely by making an authenticated call to the PipelineEndpoint's REST API. The sample request code below requires creating a service principal and assigning that SP to your AML workspace as a Contributor ([more details here](https://docs.microsoft.com/en-us/azure/machine-learning/how-to-setup-authentication)). When triggering a pipeline through a REST API call, you must provide an experiment name and can optionally override the default pipeline parameter values.

In [None]:
import requests
import os
from azureml.core.authentication import ServicePrincipalAuthentication

#Service principal creds stored as environment vars
client_id = os.environ.get('client_id')
tenant_id = os.environ.get('tenant_id')
service_principal_password = os.environ.get('service_principal_password')
pipeline_endpoint = os.environ.get('pipeline_endpoint')

# Use the ADAL library to obtain a token
from adal import AuthenticationContext

client_secret = service_principal_password
authority_host = "https://login.microsoftonline.com"
authority = "{}/{}".format(authority_host, tenant_id)

auth_context = AuthenticationContext(authority)
token_response = auth_context.acquire_token_with_client_credentials("https://management.azure.com/", client_id, client_secret)

#Format token response for API request to pipeline
headers = {'Authorization': 'Bearer {}'.format(token_response['accessToken'])}

# Trigger remote pipeline run
# The pipeline endpoint URL can also be obtained from the AML portal
# ParameterAssignments keys must match PipelineParameter names defined in the pipeline
response = requests.post(pipeline_endpoint,
                         headers=headers,
                         json={"ExperimentName": "REST_Pipeline_Trigger_Test", "ParameterAssignments": {"file_path": "sharepoint_data/OJData.csv"}})
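
The request body has two parts: a required `ExperimentName` and an optional `ParameterAssignments` mapping. As a small sketch, the body could be assembled by a helper so that a missing experiment name is caught before the request is sent. The function name `build_trigger_payload` is hypothetical, and the parameter names in the example are taken from the `PipelineParameter` objects defined earlier in this notebook.

```python
import json


def build_trigger_payload(experiment_name, parameter_assignments=None):
    """Build the JSON body for a pipeline trigger request (hypothetical helper)."""
    if not experiment_name:
        raise ValueError('experiment_name is required')
    payload = {'ExperimentName': experiment_name}
    # Only include overrides when some are given; otherwise the pipeline
    # runs with its default parameter values.
    if parameter_assignments:
        payload['ParameterAssignments'] = dict(parameter_assignments)
    return payload


payload = build_trigger_payload(
    'REST_Pipeline_Trigger_Test',
    {'file_path': 'sharepoint_data/OJData.csv', 'target_column': 'Quantity'},
)
print(json.dumps(payload, indent=2))
```

The resulting dictionary could be passed as `json=payload` to `requests.post`. Calling `response.raise_for_status()` afterwards surfaces an HTTP error instead of letting a failed trigger pass silently.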