## Azure ML Pipeline - Parameterized Input Dataset
This notebook demonstrates how to create and run an Azure ML pipeline that accepts a parameterized input: the path of a file in the workspace's default Azure ML datastore. The pipeline first registers that file as a tabular dataset and then processes it. The notebook was built as part of a larger solution in which Azure Data Factory moves files from a blob storage container into the default AML datastore.

### Import Required Packages

In [None]:
from azureml.core import Workspace, Experiment, Datastore, Environment, Dataset
from azureml.core.compute import ComputeTarget, AmlCompute, DataFactoryCompute
from azureml.core.compute_target import ComputeTargetException
from azureml.core.runconfig import RunConfiguration
from azureml.core.conda_dependencies import CondaDependencies
from azureml.core.runconfig import DEFAULT_CPU_IMAGE
from azureml.pipeline.core import Pipeline, PipelineParameter, PipelineData
from azureml.pipeline.steps import PythonScriptStep
from azureml.data.output_dataset_config import OutputTabularDatasetConfig, OutputDatasetConfig, OutputFileDatasetConfig
from azureml.data.datapath import DataPath
from azureml.data.data_reference import DataReference
from azureml.data.sql_data_reference import SqlDataReference
from azureml.pipeline.steps import DataTransferStep
import logging

### Connect to Azure ML Workspace, Provision Compute Resources, and Get a Reference to the Default Datastore
Connect to the workspace using its associated `config.json` file. Next, get a reference to your pre-existing AML compute cluster, or provision a new cluster if none exists, to run the pipeline steps. Finally, get a reference to the workspace's default blob datastore.

In [None]:
#Connect to AML Workspace
ws = Workspace.from_config()

#Select AML Compute Cluster
cpu_cluster_name = 'cpucluster'

# Reuse the cluster if it already exists; otherwise provision a new one
try:
    cpu_cluster = ComputeTarget(workspace=ws, name=cpu_cluster_name)
    print('Found an existing cluster, using it instead.')
except ComputeTargetException:
    compute_config = AmlCompute.provisioning_configuration(vm_size='STANDARD_D13_V2',
                                                           min_nodes=0,
                                                           max_nodes=10)
    cpu_cluster = ComputeTarget.create(ws, cpu_cluster_name, compute_config)
    cpu_cluster.wait_for_completion(show_output=True)
    
#Get default datastore
default_ds = ws.get_default_datastore()
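`Workspace.from_config()` reads the workspace details from a `config.json` file, which you can download from the workspace's page in the Azure portal and place in the notebook's directory (or a parent directory). The sketch below writes and re-reads a file of that shape so that a missing key is caught before connecting. The three key names are the ones the SDK expects; the helper functions and placeholder values are hypothetical.

```python
import json
import os
import tempfile

# Keys that Workspace.from_config() reads from config.json
REQUIRED_KEYS = ("subscription_id", "resource_group", "workspace_name")


def write_workspace_config(directory, subscription_id, resource_group, workspace_name):
    """Write a config.json with the shape Workspace.from_config() expects (hypothetical helper)."""
    config = {
        "subscription_id": subscription_id,
        "resource_group": resource_group,
        "workspace_name": workspace_name,
    }
    path = os.path.join(directory, "config.json")
    with open(path, "w") as f:
        json.dump(config, f, indent=2)
    return path


def load_workspace_config(path):
    """Read config.json back and raise ValueError if a required key is missing or empty."""
    with open(path) as f:
        config = json.load(f)
    missing = [key for key in REQUIRED_KEYS if not config.get(key)]
    if missing:
        raise ValueError(f"config.json is missing: {', '.join(missing)}")
    return config


# Placeholder values; substitute your own workspace details
with tempfile.TemporaryDirectory() as tmp:
    config_path = write_workspace_config(tmp, "<SUBSCRIPTION-ID>", "<RESOURCE-GROUP>", "<WORKSPACE-NAME>")
    print(load_workspace_config(config_path)["workspace_name"])
```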

### Create Run Configuration
The `RunConfiguration` defines the environment shared by all Python steps in the pipeline. You can optionally add conda or pip packages to this environment, as shown below. [More details here](https://docs.microsoft.com/en-us/python/api/azureml-core/azureml.core.conda_dependencies.condadependencies?view=azure-ml-py).
~~~~
run_config.environment.python.conda_dependencies = CondaDependencies.create(conda_packages=['requests'])
run_config.environment.python.conda_dependencies.add_pip_package('azureml-opendatasets')
~~~~

In [None]:
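run_config = RunConfiguration()
# Run each step in a Docker container built from the default Azure ML CPU image.
# Newer azureml-core releases may warn that docker.enabled is deprecated in favor of
# run_config.docker = DockerConfiguration(use_docker=True) (from azureml.core.runconfig).
run_config.environment.docker.enabled = True
run_config.environment.docker.base_image = DEFAULT_CPU_IMAGE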

### Define Output Datasets
Below we define the configuration for the datasets passed between the steps of our pipeline. In each case we specify the datastore and path that will hold the data, read the output as a tabular dataset with `read_delimited_files()`, and register it once the step completes with `register_on_complete()`; remove that call if you do not want the dataset registered. `uploaded_file_dataset` holds the data from the uploaded CSV file, and `processed_dataset` holds that data after processing.

In [None]:
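# Output of the register step: read as delimited (tabular) data and registered as 'uploaded_file_data' on completion
uploaded_file_dataset = OutputFileDatasetConfig(name='uploaded_file_data', destination=(default_ds, 'uploaded_file_data/{run-id}')).read_delimited_files().register_on_complete(name='uploaded_file_data')
# Output of the process step: read as delimited (tabular) data and registered as 'processed_data' on completion
processed_dataset = OutputFileDatasetConfig(name='processed_data', destination=(default_ds, 'processed_data/{run-id}')).read_delimited_files().register_on_complete(name='processed_data')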
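Azure ML replaces the `{run-id}` token in each `destination` path at run time, so every run writes to its own folder instead of overwriting earlier output. The snippet below only mimics that substitution locally to show the resulting layout. `resolve_destination` is a hypothetical helper, not an SDK function, and the run IDs are made up.

```python
def resolve_destination(template, run_id):
    """Mimic the {run-id} substitution in an OutputFileDatasetConfig destination path.

    Hypothetical helper for illustration only; Azure ML performs this substitution itself.
    """
    return template.replace("{run-id}", run_id)


# Two runs of the same step land in separate folders on the datastore (made-up run IDs)
for run_id in ("run-a1b2", "run-c3d4"):
    print(resolve_destination("uploaded_file_data/{run-id}", run_id))
```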

### Define Pipeline Parameters
PipelineParameter objects serve as variable inputs to an Azure ML pipeline and can be specified at runtime. Below we define a pipeline parameter, `uploaded_file_path_param`, which holds the path of the uploaded file relative to the root of the default Azure ML blob datastore. You can create and use multiple pipeline parameters.

In [None]:
uploaded_file_path_param = PipelineParameter(name='uploaded_file_path_param', default_value='adf_uploads/<YOUR-FILE-NAME>')
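At run time, Azure ML substitutes each `PipelineParameter` in a step's `arguments` list with its current value, and each output dataset config with a location on the compute target, so `register_file_data.py` receives both as ordinary command-line strings. A minimal sketch of how that script might parse them follows. The flag names match the step's `arguments` list; `parse_step_args` and the sample values are hypothetical.

```python
import argparse


def parse_step_args(argv=None):
    """Parse the command-line arguments passed to register_file_data.py (hypothetical sketch)."""
    parser = argparse.ArgumentParser(description="Register an uploaded file as a tabular dataset")
    # Value of uploaded_file_path_param: a path relative to the default datastore root
    parser.add_argument("--uploaded_file_path_param", type=str, required=True)
    # Location on the compute target that backs the uploaded_file_dataset output
    parser.add_argument("--uploaded_file_dataset", type=str, required=True)
    return parser.parse_args(argv)


# Simulate the command line the step might receive (sample values; Azure ML chooses real paths)
args = parse_step_args([
    "--uploaded_file_path_param", "adf_uploads/sales.csv",
    "--uploaded_file_dataset", "/mnt/outputs/uploaded_file_data",
])
print(args.uploaded_file_path_param)  # adf_uploads/sales.csv
```

In the real script, the parsed path would typically be paired with the default datastore, for example `Dataset.Tabular.from_delimited_files(path=[(datastore, args.uploaded_file_path_param)])`, to read the file; that part requires the Azure ML SDK and is not shown here.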

### Define Pipeline Steps
The pipeline below consists of two steps: the first gathers the uploaded file from the AML datastore and registers it as a dataset, and the second consumes and processes this registered dataset. Any PipelineParameters defined above can be passed to, and consumed within, these steps.

In [None]:
#Get CSV file uploaded to AML datastore via ADF
#Register tabular dataset after retrieval
register_data_step = PythonScriptStep(
    name='register-uploaded-file-data',
    script_name='register_file_data.py',
    arguments=['--uploaded_file_path_param', uploaded_file_path_param,
               '--uploaded_file_dataset', uploaded_file_dataset],
    outputs=[uploaded_file_dataset],
    compute_target=cpu_cluster,
    source_directory='.',
    allow_reuse=False,
    runconfig=run_config
)

# Process the uploaded data: uploaded_file_dataset is consumed as an input, and the
# result is written to processed_dataset, which is registered when the step completes
process_data_step = PythonScriptStep(
    name='process-uploaded-file-data',
    script_name='process_file_data.py',
    arguments=['--processed_dataset', processed_dataset],
    inputs=[uploaded_file_dataset.as_input(name='uploaded_data')],
    outputs=[processed_dataset],
    compute_target=cpu_cluster,
    source_directory='.',
    allow_reuse=False,
    runconfig=run_config
)
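The notebook does not show what `process_file_data.py` does to the data. As a standard-library sketch, assuming the step has the uploaded rows as a list of dictionaries and that "processing" means dropping rows with empty fields (a placeholder transformation), the script could write its result as a CSV file into the folder behind `processed_dataset`. Writing delimited text matters because `processed_dataset` is registered through `read_delimited_files()`. In the real script the rows would come from the step input (for example `Run.get_context().input_datasets['uploaded_data']`) and `output_dir` would be the `--processed_dataset` argument; both function names are hypothetical.

```python
import csv
import os
import tempfile


def drop_incomplete_rows(rows):
    """Placeholder transformation: keep only rows whose fields are all non-empty strings."""
    return [
        row for row in rows
        if all(isinstance(value, str) and value.strip() for value in row.values())
    ]


def write_processed_csv(rows, output_dir, filename="processed_data.csv"):
    """Write rows (a list of dicts sharing the same keys) as a CSV file inside output_dir."""
    os.makedirs(output_dir, exist_ok=True)
    path = os.path.join(output_dir, filename)
    fieldnames = list(rows[0].keys()) if rows else []
    with open(path, "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(rows)
    return path


# Local dry run with made-up rows; in the pipeline, output_dir would be the
# value passed as --processed_dataset
sample_rows = [{"id": "1", "amount": "10.5"}, {"id": "2", "amount": ""}]
with tempfile.TemporaryDirectory() as out_dir:
    output_path = write_processed_csv(drop_incomplete_rows(sample_rows), out_dir)
    with open(output_path, newline="") as f:
        print(f.read())
```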


### Create Pipeline
Create an Azure ML pipeline by specifying the steps to execute. Because `process_data_step` consumes the dataset produced by `register_data_step`, execution follows the data dependencies: no step starts until all of its input datasets have been generated.

In [None]:
pipeline = Pipeline(workspace=ws, steps=[register_data_step, process_data_step])
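Azure ML derives the run order from which step produces a dataset and which step consumes it. The sketch below illustrates that idea with a plain topological sort (Kahn's algorithm) over a hypothetical mapping of step names to the datasets they read and write. It is a conceptual model only, not Azure ML's actual scheduler.

```python
from collections import deque


def execution_order(steps):
    """Order steps so each one runs after the steps that produce its inputs.

    `steps` maps a step name to {"inputs": [...], "outputs": [...]} (dataset names).
    Raises ValueError if the dependencies contain a cycle.
    """
    producer = {}
    for name, spec in steps.items():
        for dataset in spec["outputs"]:
            producer[dataset] = name

    # Edges run producer -> consumer; pending counts each step's unmet dependencies
    dependents = {name: [] for name in steps}
    pending = {name: 0 for name in steps}
    for name, spec in steps.items():
        for dataset in spec["inputs"]:
            if dataset in producer:  # inputs with no producer (pre-existing data) add no edge
                dependents[producer[dataset]].append(name)
                pending[name] += 1

    ready = deque(name for name, count in pending.items() if count == 0)
    order = []
    while ready:
        current = ready.popleft()
        order.append(current)
        for child in dependents[current]:
            pending[child] -= 1
            if pending[child] == 0:
                ready.append(child)

    if len(order) != len(steps):
        raise ValueError("cyclic dependency between steps")
    return order


pipeline_steps = {
    # Listed out of order on purpose; the data dependency decides the order
    "process-uploaded-file-data": {"inputs": ["uploaded_file_data"], "outputs": ["processed_data"]},
    "register-uploaded-file-data": {"inputs": [], "outputs": ["uploaded_file_data"]},
}
print(execution_order(pipeline_steps))
```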

### Create Experiment and Run Pipeline
Get or create an experiment (a logical container for pipeline runs) and execute the pipeline. When submitting a new run, you can override the values of pipeline parameters.

In [None]:
experiment = Experiment(ws, 'sample-pipeline-development')
run = experiment.submit(pipeline)
run.wait_for_completion(show_output=True)
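To run the pipeline against a different file, pass a dictionary keyed by parameter name to `experiment.submit(pipeline, pipeline_parameters=...)`; parameters you omit keep their `default_value`. The helper below is a hypothetical pre-submission check that merges overrides with the defaults and rejects misspelled parameter names before anything is submitted. The file name in the example is made up.

```python
def build_pipeline_parameters(defaults, overrides):
    """Merge runtime overrides with parameter defaults, rejecting unknown names.

    Hypothetical helper: `defaults` mirrors the PipelineParameter names and
    default values defined earlier in this notebook.
    """
    unknown = set(overrides) - set(defaults)
    if unknown:
        raise KeyError(f"unknown pipeline parameter(s): {sorted(unknown)}")
    merged = dict(defaults)
    merged.update(overrides)
    return merged


defaults = {"uploaded_file_path_param": "adf_uploads/<YOUR-FILE-NAME>"}
params = build_pipeline_parameters(defaults, {"uploaded_file_path_param": "adf_uploads/sales_2021.csv"})
print(params)
```

With the SDK objects defined above, the merged dictionary would then be submitted as `run = experiment.submit(pipeline, pipeline_parameters=params)`.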

### Publish Pipeline
Create a published version of your pipeline that can be triggered via a REST API call.

In [None]:
published_pipeline = pipeline.publish(name = 'sample_aml_pipeline',
                                     description = 'Sample pipeline that registers a dataset uploaded manually.',
                                     continue_on_step_failure = False)
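A published pipeline exposes a REST endpoint (`published_pipeline.endpoint`). Triggering it is an authenticated HTTP POST whose JSON body names the experiment and can override pipeline parameters under `ParameterAssignments`. The sketch below builds that request with the standard library without sending it. The endpoint URL and bearer token are placeholders; in a notebook, an authorization header can be obtained with `InteractiveLoginAuthentication().get_authentication_header()` from `azureml.core.authentication`. `build_pipeline_trigger_request` is a hypothetical helper.

```python
import json
import urllib.request


def build_pipeline_trigger_request(endpoint, auth_header, experiment_name, parameter_assignments=None):
    """Build (but do not send) the POST request that triggers a published pipeline run."""
    body = {"ExperimentName": experiment_name}
    if parameter_assignments:
        body["ParameterAssignments"] = parameter_assignments
    headers = {"Content-Type": "application/json"}
    headers.update(auth_header)  # e.g. {"Authorization": "Bearer <TOKEN>"}
    return urllib.request.Request(
        endpoint,
        data=json.dumps(body).encode("utf-8"),
        headers=headers,
        method="POST",
    )


request = build_pipeline_trigger_request(
    endpoint="https://example.com/placeholder-endpoint",  # replace with published_pipeline.endpoint
    auth_header={"Authorization": "Bearer <TOKEN>"},       # placeholder token
    experiment_name="sample-pipeline-development",
    parameter_assignments={"uploaded_file_path_param": "adf_uploads/<YOUR-FILE-NAME>"},
)
# urllib.request.urlopen(request) would send it; the JSON response includes the new run's ID
```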