# 02. Azure ML Pipeline Creation
This notebook demonstrates the creation of an Azure ML pipeline that receives two tabular datasets as inputs, applies some transformations, and exports the results to a CSV file in an Azure ML-linked datastore. The pipeline consists of a single Python step whose custom code can be adapted to support numerous operations, including model scoring.

### Import required packages

In [None]:
from azureml.core import Workspace, Experiment, Datastore, Environment, Dataset, Model
from azureml.core.compute import ComputeTarget, AmlCompute, DataFactoryCompute, DatabricksCompute
from azureml.core.compute_target import ComputeTargetException
from azureml.core.runconfig import RunConfiguration
from azureml.core.conda_dependencies import CondaDependencies
from azureml.core.runconfig import DEFAULT_CPU_IMAGE
from azureml.pipeline.core import Pipeline, PipelineParameter, PipelineData, PipelineDataset
from azureml.pipeline.steps import PythonScriptStep
from azureml.data.output_dataset_config import OutputTabularDatasetConfig, OutputDatasetConfig, OutputFileDatasetConfig
from azureml.data.datapath import DataPath
from azureml.data.data_reference import DataReference
from azureml.data.sql_data_reference import SqlDataReference
from azureml.pipeline.steps import DataTransferStep
from azureml.pipeline.steps import DatabricksStep
from datetime import datetime
import pandas as pd

import warnings
warnings.filterwarnings('ignore')

### Connect to the AML workspace and get references to the compute cluster and default datastore

In [None]:
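# Connect to the AML workspace (reads config.json from the current directory or a parent directory)
ws = Workspace.from_config()

# Get a reference to the existing compute cluster
compute_target = ComputeTarget(workspace=ws, name='cpucluster')

# Get the default datastore
default_ds = ws.get_default_datastore()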

### Create RunConfiguration for pipeline execution
A `RunConfiguration` defines the settings, most importantly the software environment, under which a pipeline step's script executes. Here we use a curated Azure ML environment, though custom environments can also be configured.

In [None]:
run_config = RunConfiguration()
run_config.docker.use_docker = True
run_config.environment = Environment.get(ws, name='AzureML-sklearn-1.0-ubuntu20.04-py38-cpu', version="1")

### Define Input and Output Datasets
Retrieve references to two registered datasets to be consumed as inputs by the pipeline, and configure an `OutputFileDatasetConfig` to point to a location in blob storage where the pipeline output will be written. 

In [None]:
scored_data = OutputFileDatasetConfig(name='Scored_Data', destination=(default_ds, 'scored_data'))
dataset_one = Dataset.get_by_name(ws, 'Dataset_One')
dataset_two = Dataset.get_by_name(ws, 'Dataset_Two')

### Define Pipeline Parameters
Pipeline parameters are dynamic arguments that can be passed to a pipeline at runtime. Here we pass a single argument, `filename`, which is the name of the file written to blob storage. To avoid overwriting earlier outputs, files are named with a `<timestamp>.csv` convention (`YYYYMMDDHHMMSS.csv`, as in the default value `20220506000001.csv`).

In [None]:
filename = PipelineParameter(name='filename', default_value='20220506000001.csv')
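Because the file name follows the `YYYYMMDDHHMMSS.csv` convention, a fresh value can be generated for each run instead of relying on the fixed default. The following is a minimal sketch using only the standard library; the helper name `timestamped_filename` is hypothetical, and using UTC is an assumption made so that runs triggered from different machines sort consistently.

```python
from datetime import datetime, timezone


def timestamped_filename(now=None):
    """Return a filename such as '20220506000001.csv' (YYYYMMDDHHMMSS.csv)."""
    # Assumption: default to the current UTC time when no timestamp is supplied
    if now is None:
        now = datetime.now(timezone.utc)
    return now.strftime('%Y%m%d%H%M%S') + '.csv'
```

The result can be supplied when a run is submitted, for example via `pipeline_parameters={'filename': timestamped_filename()}`. Note that two runs started within the same second would still receive the same name.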

### Define Pipeline Steps
Pipeline steps are the individual units of work that make up a pipeline. Here we have a single `PythonScriptStep`, which executes a Python script containing the logic for reading the two input datasets, performing some transformations, and exporting the results to a CSV file. The script is located at `./pipeline_step_scripts/score_data.py`; it receives the output location through the `--scored_data` argument and the `filename` pipeline parameter through the `--filename` argument.
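The contents of `score_data.py` are not shown in this notebook. A minimal sketch of what such a script could look like, assuming the two inputs are read through `Run.get_context().input_datasets` under the names given to `as_named_input`, and that the environment provides `azureml-core` and `pandas`. The helpers `parse_args` and `output_path` are hypothetical, and the transformation (stacking the two tables) is only a placeholder for the real logic.

```python
# score_data.py: hypothetical sketch of the step script
import argparse
import os
import sys


def parse_args(argv=None):
    """Parse the arguments passed by the PythonScriptStep."""
    parser = argparse.ArgumentParser()
    # Local path resolved from the OutputFileDatasetConfig
    parser.add_argument('--scored_data', type=str, required=True)
    # Value of the 'filename' PipelineParameter
    parser.add_argument('--filename', type=str, required=True)
    return parser.parse_args(argv)


def output_path(args):
    """Full path of the CSV file to write inside the output location."""
    return os.path.join(args.scored_data, args.filename)


def main(argv=None):
    # Imported here so the helpers above can be used without the AzureML SDK
    from azureml.core import Run
    import pandas as pd

    args = parse_args(argv)
    run = Run.get_context()
    # Inputs registered in the notebook via as_named_input('Dataset_One'/'Dataset_Two')
    df_one = run.input_datasets['Dataset_One'].to_pandas_dataframe()
    df_two = run.input_datasets['Dataset_Two'].to_pandas_dataframe()

    # Placeholder transformation (assumption): stack the two tables
    result = pd.concat([df_one, df_two], ignore_index=True)

    os.makedirs(args.scored_data, exist_ok=True)
    result.to_csv(output_path(args), index=False)


# The pipeline always passes --scored_data and --filename; skipping main() when
# no arguments are given keeps the helpers importable for local testing
if __name__ == '__main__' and len(sys.argv) > 1:
    main()
```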

In [None]:
score_data_step = PythonScriptStep(
    name='Score Data',
    script_name='score_data.py',
    arguments=['--scored_data', scored_data, '--filename', filename],
    inputs=[dataset_one.as_named_input('Dataset_One'), dataset_two.as_named_input('Dataset_Two')],
    outputs=[scored_data],
    compute_target=compute_target,
    source_directory='./pipeline_step_scripts',
    allow_reuse=False,
    runconfig=run_config
)


### Define Pipeline
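An Azure ML pipeline is a collection of steps. Steps run in an order determined by their data dependencies, so steps that do not depend on each other can run in parallel; with a single step, as here, the ordering is trivial. The syntax for assembling individual steps into a pipeline can be seen below.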

In [None]:
pipeline = Pipeline(workspace=ws, steps=[score_data_step])

### Create Published PipelineEndpoint
A `PipelineEndpoint` holds multiple versions of a published pipeline behind a single, consistent endpoint. The endpoint URL can be triggered remotely by submitting an authenticated request, and updates to the underlying pipeline are tracked as new versions in the AML workspace.

In [None]:
from azureml.pipeline.core import PipelineEndpoint

def published_pipeline_to_pipeline_endpoint(
    workspace,
    published_pipeline,
    pipeline_endpoint_name,
    pipeline_endpoint_description="Endpoint to my pipeline",
):
    """Add the published pipeline as the default version of an existing
    PipelineEndpoint, or create the endpoint if it does not exist yet."""
    try:
        pipeline_endpoint = PipelineEndpoint.get(
            workspace=workspace, name=pipeline_endpoint_name
        )
    except Exception as ex:
        print(ex)
        # Create the PipelineEndpoint if it doesn't exist
        print("PipelineEndpoint does not exist, creating one...")
        pipeline_endpoint = PipelineEndpoint.publish(
            workspace=workspace,
            name=pipeline_endpoint_name,
            pipeline=published_pipeline,
            description=pipeline_endpoint_description
        )
    else:
        # Only reached when the endpoint exists, so a failure here is not
        # mistaken for a missing endpoint
        print("Using existing PipelineEndpoint...")
        pipeline_endpoint.add_default(published_pipeline)
    return pipeline_endpoint


pipeline_endpoint_name = 'Sample Scoring Pipeline'
pipeline_endpoint_description = ''

# Publish the pipeline, then attach it to the endpoint as the default version
published_pipeline = pipeline.publish(name=pipeline_endpoint_name,
                                     description=pipeline_endpoint_description,
                                     continue_on_step_failure=False)

pipeline_endpoint = published_pipeline_to_pipeline_endpoint(
    workspace=ws,
    published_pipeline=published_pipeline,
    pipeline_endpoint_name=pipeline_endpoint_name,
    pipeline_endpoint_description=pipeline_endpoint_description
)
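Once the endpoint exists, a run can be started with an authenticated HTTP POST to its URL. A minimal sketch that builds (but does not send) such a request with the standard library is shown below. The body fields `ExperimentName` and `ParameterAssignments` follow the Azure ML REST examples for published pipelines; the function name `build_trigger_request` and the experiment name used with it are hypothetical.

```python
import json
import urllib.request


def build_trigger_request(endpoint_url, auth_header, experiment_name, filename):
    """Build (without sending) a POST request that starts a pipeline run.

    endpoint_url: REST URL of the PipelineEndpoint.
    auth_header:  dict such as {'Authorization': 'Bearer <token>'}.
    """
    body = {
        # Experiment under which the triggered run is recorded
        "ExperimentName": experiment_name,
        # Overrides the 'filename' PipelineParameter for this run only
        "ParameterAssignments": {"filename": filename},
    }
    headers = {"Content-Type": "application/json", **auth_header}
    return urllib.request.Request(
        endpoint_url,
        data=json.dumps(body).encode("utf-8"),
        headers=headers,
        method="POST",
    )
```

In the workspace, the URL is available from the `endpoint` property of a `PipelineEndpoint` (for example one retrieved with `PipelineEndpoint.get(workspace=ws, name=pipeline_endpoint_name)`), and an interactive auth header can be obtained with `InteractiveLoginAuthentication().get_authentication_header()` from `azureml.core.authentication`; unattended callers would typically use `ServicePrincipalAuthentication` instead. Passing the request to `urllib.request.urlopen` then submits the run.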