## Azure ML Pipeline - Parallel Processing for Fuzzy Matching
This notebook demonstrates the creation and execution of an Azure ML pipeline that reads data from an attached Azure Blob datastore (at locations passed in as pipeline parameters), processes that data, and exports the resulting dataset back to Blob storage in both Excel and CSV formats.

### Import Required Packages

In [None]:
from azureml.core import Workspace, Experiment, Datastore, Environment, Dataset
from azureml.core.compute import ComputeTarget, AmlCompute, DataFactoryCompute
from azureml.core.compute_target import ComputeTargetException
from azureml.core.runconfig import RunConfiguration, DEFAULT_CPU_IMAGE
from azureml.core.conda_dependencies import CondaDependencies
from azureml.pipeline.core import Pipeline, PipelineParameter, PipelineData
from azureml.pipeline.steps import PythonScriptStep, DataTransferStep, ParallelRunConfig, ParallelRunStep
from azureml.data.output_dataset_config import OutputTabularDatasetConfig, OutputDatasetConfig, OutputFileDatasetConfig
from azureml.data.datapath import DataPath
from azureml.data.data_reference import DataReference
from azureml.data.sql_data_reference import SqlDataReference
import logging

### Connect to Azure ML Workspace, Provision Compute Resources, and Get a Reference to the Datastore

Connect to the workspace using its associated config file (`config.json`). Get a reference to your pre-existing AML compute cluster, or provision a new cluster if one does not exist yet.

In [None]:
ws = Workspace.from_config()

#Select AML Compute Cluster
cpu_cluster_name = 'cpucluster'

# Reuse the cluster if it already exists; otherwise provision a new one
try:
    cpu_cluster = ComputeTarget(workspace=ws, name=cpu_cluster_name)
    print('Found an existing cluster, using it instead.')
except ComputeTargetException:
    compute_config = AmlCompute.provisioning_configuration(vm_size='STANDARD_D13_V2',
                                                           min_nodes=0,
                                                           max_nodes=10)
    cpu_cluster = ComputeTarget.create(ws, cpu_cluster_name, compute_config)
    cpu_cluster.wait_for_completion(show_output=True)
    

#Get Default Datastore
default_ds = ws.get_default_datastore()

### Create Run Configuration
The `RunConfiguration` defines the environment shared by all Python steps. Additional conda or pip packages can optionally be added to that environment, as in the snippet below ([more details here](https://docs.microsoft.com/en-us/python/api/azureml-core/azureml.core.conda_dependencies.condadependencies?view=azure-ml-py)).
~~~~
run_config.environment.python.conda_dependencies = CondaDependencies.create(conda_packages=['requests'])
run_config.environment.python.conda_dependencies.add_pip_package('azureml-opendatasets')
~~~~

In [None]:
run_config = RunConfiguration()
run_config.environment.docker.enabled = True
run_config.environment.docker.base_image = DEFAULT_CPU_IMAGE
run_config.environment.python.conda_dependencies.add_pip_package('openpyxl==3.0.7')

### Define Output Datasets
Below we define the configuration for the datasets passed between steps in the pipeline. In each case we specify the datastore that will hold the dataset and register the dataset when its step completes; registration can be disabled by removing the `register_on_complete()` call. We also use a `PipelineData` object as an intermediary that holds the parallel step's results before they are saved to well-formatted CSV/Excel files.

In [None]:
excel_dataset = OutputFileDatasetConfig(name='excel_data', destination=(default_ds, 'excel_data/{run-id}')).register_on_complete(name='excel_data')
sql_dataset = OutputFileDatasetConfig(name='sql_data', destination=(default_ds, 'sql_data/{run-id}')).read_delimited_files().register_on_complete(name='sql_data')
processed_dataset_tabular = OutputFileDatasetConfig(name='processed_data_tabular', destination=(default_ds, 'processed_data_tabular/{run-id}')).read_delimited_files().register_on_complete(name='processed_data_tabular')
processed_dataset_file = OutputFileDatasetConfig(name='processed_data_file', destination=(default_ds, 'processed_data_file/{run-id}')).register_on_complete(name='processed_data_file')
processed_dataset_pipeline_data = PipelineData(name='processed_data', datastore=default_ds)
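The `PipelineData` intermediary receives the `append_row` output of the parallel step, which `ParallelRunStep` writes to a single file (named `parallel_run_step.txt` unless `append_row_file_name` is set on `ParallelRunConfig`). The sketch below shows one way a hypothetical `organize_results.py` could turn that file into a CSV with a header. It assumes each appended row is a comma-separated line, and the column names in `COLUMNS` are illustrative only; they must match whatever the entry script actually emits.

```python
import csv
import os

# ParallelRunStep's default append_row output file name
# (configurable through ParallelRunConfig's append_row_file_name).
APPEND_ROW_FILE = "parallel_run_step.txt"

# Hypothetical column layout; it must match whatever the entry script emits.
COLUMNS = ["source_file", "query", "best_match", "score"]


def collect_results(processed_dir, output_csv):
    """Read appended rows from processed_dir and write them to output_csv with a header.

    Returns the number of data rows written.
    """
    in_path = os.path.join(processed_dir, APPEND_ROW_FILE)
    with open(in_path, newline="") as src:
        # Skip blank lines; assumes each appended row is a comma-separated line
        rows = [row for row in csv.reader(src) if row]

    out_dir = os.path.dirname(output_csv)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)
    with open(output_csv, "w", newline="") as dst:
        writer = csv.writer(dst)
        writer.writerow(COLUMNS)
        writer.writerows(rows)
    return len(rows)
```

In the final step, `processed_dir` would be the path received through `--processed_dataset` and `output_csv` a file under the `--processed_dataset_tabular` path. The Excel copy could then be written with `pandas.DataFrame.to_excel`, which can use the `openpyxl` package pinned in the run configuration.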

### Define Pipeline Parameters
`PipelineParameter` objects serve as variable inputs to an Azure ML pipeline and can be specified at runtime. Below we define two path parameters, `excel_path_param` and `sql_path_param`, which specify the locations of the Excel and SQL target data, respectively, inside the default Azure ML Blob datastore. We also define two integer parameters, `min_year_param` and `max_year_param`, although none of the steps below consume them yet. Any number of pipeline parameters can be created.

In [None]:
excel_path_param = PipelineParameter(name='excel_path_parameter', default_value='lalanding/<SAMPLE_RUN_ID>/')
sql_path_param = PipelineParameter(name='sql_path_parameter', default_value='adflanding/<SAMPLE_RUN_ID>')
min_year_param = PipelineParameter(name='min_year', default_value=2019)
max_year_param = PipelineParameter(name='max_year', default_value=2021)
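Inside a step script, each entry in the step's `arguments` list arrives as an ordinary command-line argument: a `PipelineParameter` resolves to its value for the current run, and an `OutputFileDatasetConfig` resolves to a local path the script can write to. The sketch below shows how a hypothetical `register_excel_data.py` might parse the two arguments passed by the Excel registration step and write one CSV per sheet. The helper names, the file-name sanitizing, and the sheet-loading approach are assumptions, not the contents of the actual script.

```python
import argparse
import csv
import os


def parse_args(argv=None):
    """Parse the arguments the register-excel-data step passes to its script."""
    parser = argparse.ArgumentParser(description="Register Excel data (sketch)")
    # Receives the runtime value of excel_path_param, e.g. "lalanding/<SAMPLE_RUN_ID>/"
    parser.add_argument("--excel_path_param", type=str, required=True)
    # Receives the local path where the excel_dataset output should be written
    parser.add_argument("--excel_dataset", type=str, required=True)
    return parser.parse_args(argv)


def write_sheets_as_csv(sheets, output_dir):
    """Write each sheet (name -> list of rows) to <output_dir>/<sheet>.csv."""
    os.makedirs(output_dir, exist_ok=True)
    written = []
    for sheet_name, rows in sheets.items():
        # Replace characters that are awkward in file names
        safe_name = "".join(c if c.isalnum() or c in "-_" else "_" for c in sheet_name)
        path = os.path.join(output_dir, f"{safe_name}.csv")
        with open(path, "w", newline="") as f:
            csv.writer(f).writerows(rows)
        written.append(path)
    return written


def main(argv=None):
    args = parse_args(argv)
    # Hypothetical: locate the workbook under args.excel_path_param and load every
    # sheet, e.g. pandas.read_excel(local_path, sheet_name=None), converting each
    # sheet to a list of rows. Left empty in this sketch.
    sheets = {}
    return write_sheets_as_csv(sheets, args.excel_dataset)
```

Because `min_year` and `max_year` are integers, a script that consumed them would declare `type=int`, since every argument reaches the script as a string.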

### Define Pipeline Steps
The pipeline below consists of four steps: two steps that gather and register the Excel and SQL data, a parallel processing step where the fuzzy matching code goes, and a final step that organizes and exports the results. Each step references a corresponding `*.py` script (via `script_name` for the `PythonScriptStep`s and `entry_script` in the `ParallelRunConfig` for the `ParallelRunStep`). Any of the `PipelineParameter`s defined above can be passed to these steps through their `arguments` and consumed within the scripts.
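A `ParallelRunStep` entry script follows a fixed contract: `init()` runs once per worker process, and `run(mini_batch)` is called for each mini-batch and returns a list (or DataFrame) whose elements become rows of the `append_row` output. Because the Excel data is a file dataset and `mini_batch_size="1"`, each mini-batch is a list holding one file path. The sketch below is a hypothetical `process_data.py` that matches a `name` column against reference names using the standard library's `difflib.SequenceMatcher` ratio. That similarity measure, the `name` column, and the `SQL_REFERENCE_CSV` environment variable used to locate the SQL side input are all assumptions standing in for the real fuzzy matching logic.

```python
import csv
import difflib
import io
import os

# Hypothetical: environment variable pointing at the SQL reference CSV. In the
# real step this path would come from the mounted sql_data side input.
REFERENCE_CSV_ENV = "SQL_REFERENCE_CSV"
reference_names = []


def init():
    """Called once per worker process before any call to run()."""
    global reference_names
    path = os.environ.get(REFERENCE_CSV_ENV, "sql_data.csv")
    with open(path, newline="") as f:
        # Assumes a 'name' column holding the values to match against
        reference_names = [row["name"] for row in csv.DictReader(f)]


def best_match(query, candidates):
    """Return (candidate, similarity) with the highest difflib ratio."""
    best, best_score = "", 0.0
    for candidate in candidates:
        score = difflib.SequenceMatcher(None, query.lower(), candidate.lower()).ratio()
        if score > best_score:
            best, best_score = candidate, score
    return best, best_score


def run(mini_batch):
    """Process one mini-batch of file paths; each returned string becomes one row."""
    results = []
    for file_path in mini_batch:
        with open(file_path, newline="") as f:
            for row in csv.DictReader(f):
                match, score = best_match(row["name"], reference_names)
                # Format via the csv module so names containing commas stay intact
                buf = io.StringIO()
                csv.writer(buf).writerow(
                    [os.path.basename(file_path), row["name"], match, f"{score:.3f}"])
                results.append(buf.getvalue().rstrip("\r\n"))
    return results
```

The brute-force loop compares every query with every reference name, which is simple but grows with the product of the two list sizes; a dedicated fuzzy matching library such as RapidFuzz could replace `difflib` if that becomes the bottleneck.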

In [None]:
#Get Excel file specified by the excel_path_param
#Read individual sheets of data and save as separate CSVs in the excel_dataset location
#Register data upon completion
register_excel_data_step = PythonScriptStep(
    name='register-excel-data',
    script_name='register_excel_data.py',
    arguments=['--excel_path_param', excel_path_param,
               '--excel_dataset', excel_dataset],
    outputs=[excel_dataset],
    compute_target=cpu_cluster,
    source_directory='.',
    allow_reuse=False,
    runconfig=run_config
)

#Get CSV file containing queried SQL data
#Register tabular dataset after retrieval from Blob Storage
register_sql_data_step = PythonScriptStep(
    name='register-sql-data',
    script_name='register_sql_data.py',
    arguments=['--sql_path_param', sql_path_param,
               '--sql_dataset', sql_dataset],
    outputs=[sql_dataset],
    compute_target=cpu_cluster,
    source_directory='.',
    allow_reuse=False,
    runconfig=run_config
)


#Process Data
#Parallel step to perform fuzzy matching
#All results are appended as individual rows to a table that will be converted to a Pandas
#dataframe and exported to CSV/Excel in the final step (organize_results_step)
#Results are captured in a PipelineData object (processed_dataset_pipeline_data) passed to the final step

#The settings below define the number of processes that will run per node, 
#and the number of nodes available for processing.
#Ting - adjust these settings upwards to reduce processing time
processes_per_node = 8
node_count = 1
timeout = 180  # run_invocation_timeout: seconds allowed for each run() call

parallel_run_config = ParallelRunConfig(
    source_directory='.',
    entry_script='process_data.py',
    mini_batch_size="1",
    run_invocation_timeout=timeout,
    error_threshold=10,
    output_action="append_row",
    environment=run_config.environment,
    process_count_per_node=processes_per_node,
    compute_target=cpu_cluster,
    node_count=node_count)

parallel_fuzzy_matching_step = ParallelRunStep(
    name="parallel-fuzzy-matching-step",
    parallel_run_config=parallel_run_config,
    inputs=[excel_dataset.as_input(name='excel_data')],
    side_inputs=[sql_dataset.as_input(name='sql_data')],
    output=processed_dataset_pipeline_data,
    allow_reuse=False
)

#Final step where fuzzy matching results are organized into a well-formatted dataframe
#and exported both to CSV (registered as a Tabular dataset in the AML workspace)
#and saved to Excel (to be consumed by Ting/internal clients)
organize_results_step = PythonScriptStep(
    name='organize_results_step',
    script_name='organize_results.py',
    arguments=['--processed_dataset_tabular', processed_dataset_tabular,
               '--processed_dataset_file', processed_dataset_file,
               '--processed_dataset', processed_dataset_pipeline_data],
    inputs=[processed_dataset_pipeline_data],
    outputs=[processed_dataset_tabular, processed_dataset_file],
    compute_target=cpu_cluster,
    source_directory='.',
    allow_reuse=False,
    runconfig=run_config
)
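The comments above suggest raising `processes_per_node` and `node_count` to cut processing time. A rough back-of-the-envelope model (an assumption that ignores node start-up, scheduling overhead, and uneven file sizes): `processes_per_node * node_count` mini-batches run concurrently, each mini-batch holds `mini_batch_size` files, and wall time scales with the number of rounds needed to drain all mini-batches. Each `run()` call must also finish within `run_invocation_timeout` (180 seconds here). The per-file processing time and file count below are hypothetical inputs you would measure yourself.

```python
import math


def estimate_parallel_run(num_files, processes_per_node, node_count,
                          seconds_per_file, mini_batch_size=1, timeout=180):
    """Rough wall-clock estimate for a ParallelRunStep over a file dataset.

    Ignores node start-up, scheduling overhead and uneven file sizes.
    """
    workers = processes_per_node * node_count          # concurrent run() calls
    mini_batches = math.ceil(num_files / mini_batch_size)
    rounds = math.ceil(mini_batches / workers)          # waves of mini-batches
    seconds_per_batch = seconds_per_file * mini_batch_size
    return {
        "workers": workers,
        "mini_batches": mini_batches,
        "rounds": rounds,
        "est_wall_seconds": rounds * seconds_per_batch,
        # Each run() invocation must finish within run_invocation_timeout
        "within_timeout": seconds_per_batch < timeout,
    }


# Hypothetical workload: 40 files at roughly 30 seconds each
for nodes in (1, 4):
    print(nodes, estimate_parallel_run(40, processes_per_node=8, node_count=nodes,
                                       seconds_per_file=30))
```

With these inputs, one node needs 5 rounds (about 150 seconds) while four nodes need 2 rounds (about 60 seconds). `node_count` cannot exceed the cluster's `max_nodes` (10 here), and each `Standard_D13_v2` node has 8 vCPUs, which is why more than 8 processes per node is unlikely to help CPU-bound matching.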


### Create Pipeline
Create an Azure ML pipeline by specifying the steps to be executed. Note: execution order is derived from the dataset dependencies between steps, so no step runs until all of its input datasets have been generated; steps that do not depend on each other, such as the two registration steps, can run concurrently.

In [None]:
pipeline = Pipeline(workspace=ws, steps=[register_excel_data_step, register_sql_data_step, parallel_fuzzy_matching_step, organize_results_step])
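The dependency-driven ordering described above can be pictured as a topological sort of the step graph. The sketch below applies Python's standard `graphlib.TopologicalSorter` (Python 3.9+) to a hand-written dependency map that mirrors this pipeline's data flow and groups the steps into waves that could run side by side. It is only an illustration: Azure ML derives the same information from each step's declared inputs and outputs, and its internal scheduler is not implemented this way.

```python
from graphlib import TopologicalSorter

# Each step maps to the steps whose outputs it consumes. Written by hand here;
# Azure ML infers these edges from the inputs/outputs declared on each step.
STEP_DEPENDENCIES = {
    "register-excel-data": set(),
    "register-sql-data": set(),
    "parallel-fuzzy-matching-step": {"register-excel-data", "register-sql-data"},
    "organize_results_step": {"parallel-fuzzy-matching-step"},
}


def execution_waves(dependencies):
    """Group steps into waves; every step in a wave has all of its inputs ready."""
    sorter = TopologicalSorter(dependencies)
    sorter.prepare()
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        waves.append(ready)
        sorter.done(*ready)
    return waves


for i, wave in enumerate(execution_waves(STEP_DEPENDENCIES), start=1):
    print(f"wave {i}: {', '.join(wave)}")
```

The two registration steps land in the first wave, the fuzzy matching step in the second, and the organize step in the third.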

### Create Experiment and Run Pipeline
Define a new experiment (a logical container for pipeline runs) and execute the pipeline. Pipeline parameter values can be overridden when submitting a new run by passing a `pipeline_parameters` dictionary, keyed by parameter name, to `experiment.submit()`.

In [None]:
experiment = Experiment(ws, 'pipeline-development')
run = experiment.submit(pipeline)
run.wait_for_completion(show_output=True)
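When overriding parameters with `experiment.submit(pipeline, pipeline_parameters=...)`, the dictionary keys must be the names given to `PipelineParameter(name=...)` (for example `excel_path_parameter`), not the Python variable names such as `excel_path_param`. The sketch below uses a hypothetical helper to catch that mistake before submitting; `PARAMETER_DEFAULTS` is a hand-maintained copy of the defaults defined earlier and would need to be kept in sync with them.

```python
# Names passed to PipelineParameter(name=...) in this notebook, with their defaults.
# Override keys must use these names, not the Python variable names.
PARAMETER_DEFAULTS = {
    "excel_path_parameter": "lalanding/<SAMPLE_RUN_ID>/",
    "sql_path_parameter": "adflanding/<SAMPLE_RUN_ID>",
    "min_year": 2019,
    "max_year": 2021,
}


def build_parameter_overrides(**overrides):
    """Return a pipeline_parameters dict, rejecting names the pipeline doesn't define."""
    unknown = sorted(set(overrides) - set(PARAMETER_DEFAULTS))
    if unknown:
        raise KeyError(f"Unknown pipeline parameter(s): {unknown}")
    return dict(overrides)


def effective_parameters(overrides):
    """Values a run will actually use: the defaults updated by the overrides."""
    return {**PARAMETER_DEFAULTS, **overrides}

# In the notebook (requires the pipeline and experiment objects defined above):
# overrides = build_parameter_overrides(excel_path_parameter="lalanding/<NEW_RUN_ID>/")
# run = experiment.submit(pipeline, pipeline_parameters=overrides)
```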

### Publish Pipeline
Create a published version of your pipeline that can be triggered via a REST API call.

In [None]:
published_pipeline = pipeline.publish(name = 'fuzzy_matching_aml_pipeline',
                                     description = 'Sample pipeline that registers excel/sql datasets',
                                     continue_on_step_failure = False)
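Once published, the pipeline exposes a REST endpoint (`published_pipeline.endpoint`). A run can be triggered by POSTing a JSON body containing `ExperimentName` and, optionally, `ParameterAssignments` keyed by pipeline parameter name, together with an Azure AD bearer token, for example the header returned by `InteractiveLoginAuthentication().get_authentication_header()`. The sketch below builds that request with the standard library only; the helper name is hypothetical, and the endpoint and token in any example are placeholders.

```python
import json
import urllib.request


def build_trigger_request(endpoint, auth_header, experiment_name,
                          parameter_assignments=None):
    """Build a POST request that submits a run of a published pipeline."""
    body = {
        "ExperimentName": experiment_name,
        # Keys are PipelineParameter names, e.g. "excel_path_parameter"
        "ParameterAssignments": parameter_assignments or {},
    }
    headers = {"Content-Type": "application/json", **auth_header}
    return urllib.request.Request(
        endpoint,
        data=json.dumps(body).encode("utf-8"),
        headers=headers,
        method="POST",
    )


# Usage in the notebook (requires azureml-core and the published pipeline above):
# from azureml.core.authentication import InteractiveLoginAuthentication
# auth_header = InteractiveLoginAuthentication().get_authentication_header()
# request = build_trigger_request(
#     published_pipeline.endpoint, auth_header, "pipeline-development",
#     {"excel_path_parameter": "lalanding/<NEW_RUN_ID>/"})
# with urllib.request.urlopen(request) as response:
#     run_id = json.load(response)["Id"]
```

Building the request separately from sending it keeps the payload easy to inspect; the same body can be sent from any HTTP client, such as an Azure Data Factory web activity.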