# Orchestrating machine learning with pipelines

In Azure Machine Learning, you run workloads as experiments that leverage data assets and compute resources. In an enterprise data science process, you'll generally want to separate the overall process into individual tasks, and orchestrate these tasks as *pipelines* of connected steps.

## Learning objectives
In this module, you'll learn how to:
- Create an Azure Machine Learning pipeline.
  - Define steps.
  - Define inputs and outputs.
  - Manage step reuse.
- Publish an Azure Machine Learning pipeline.
- Schedule an Azure Machine Learning pipeline.

## Pipeline Introduction
A *pipeline* is a workflow of machine learning tasks in which each task is implemented as a step.

Steps can be arranged sequentially or in parallel, enabling you to build sophisticated flow logic to orchestrate machine learning operations. Each step can be run on a specific compute target, making it possible to combine different types of processing as required to achieve an overall goal. 
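
To make the sequential and parallel arrangement concrete, here is a minimal sketch in plain Python that groups steps into stages based on their dependencies. It is an illustration only and does not use the Azure Machine Learning SDK. The step names and the `execution_stages` helper are hypothetical, and the sketch assumes Python 3.9 or later for `graphlib`.

```python
from graphlib import TopologicalSorter

# Hypothetical step names, each mapped to the steps whose output it consumes.
dependencies = {
    "prepare data": set(),
    "engineer features": {"prepare data"},
    "profile data": {"prepare data"},
    "train model": {"engineer features"},
}

def execution_stages(deps):
    """Group steps into stages; steps in the same stage have no
    dependency on each other and could run in parallel."""
    sorter = TopologicalSorter(deps)
    sorter.prepare()
    stages = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        stages.append(ready)
        sorter.done(*ready)
    return stages

stages = execution_stages(dependencies)
for number, stage in enumerate(stages, start=1):
    print(number, stage)
```

In this sketch, the two steps that depend only on `prepare data` land in the same stage, while `train model` waits for its own input.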

You can publish a pipeline as a REST endpoint, enabling client applications to initiate a pipeline run. You can also define a schedule for a pipeline, and have it run automatically at periodic intervals.

## Pipeline steps
An Azure Machine Learning pipeline consists of one or more *steps* that perform tasks. 
### Types of steps
Common kinds of steps in an Azure Machine Learning pipeline include:
- `PythonScriptStep`: Runs a specified Python script.
- `EstimatorStep`: Runs an estimator.
- `DataTransferStep`: Uses Azure Data Factory to copy data between data stores.
- `DatabricksStep`: Runs a notebook, script, or compiled JAR on a Databricks cluster.
- `AdlaStep`: Runs a U-SQL job in Azure Data Lake Analytics.

`Note`: For a full list, see the [azureml.pipeline.steps package documentation](https://aka.ms/AA70rrh).

## Defining steps in a pipeline
To create a pipeline, you must first define each step and then create a pipeline that includes the steps. The specific configuration of each step varies by step type, as the following example shows. The example assumes that `run_config` and `sk_estimator` have already been defined.

In [None]:
from azureml.pipeline.steps import PythonScriptStep, EstimatorStep

# Step to run a Python script
step1 = PythonScriptStep(name = 'prepare data',
                         source_directory = 'scripts',
                         script_name = 'data_prep.py',
                         compute_target = 'aml-cluster',
                         runconfig=run_config)

# Step to run an estimator
step2 = EstimatorStep(name = 'train model',
                      estimator = sk_estimator,
                      compute_target = 'aml-cluster')

After defining the steps, you can assign them to a pipeline and run it as an experiment:

In [None]:
from azureml.pipeline.core import Pipeline
from azureml.core import Experiment

# Construct the pipeline
train_pipeline = Pipeline(workspace = ws, steps = [step1, step2])

# Create an experiment and run the pipeline
experiment = Experiment(workspace = ws, name = 'training-pipeline')
pipeline_run=experiment.submit(train_pipeline)

## Pass data between pipeline steps
Often a pipeline includes at least one step that depends on the output of a preceding step.

### The PipelineData object
The `PipelineData` object is a special kind of `DataReference` that:
- References a location in a datastore.
- Creates a data dependency between pipeline steps.

You can view a `PipelineData` object as an intermediary store for data that must be passed from a step to a subsequent step.

<img src =https://docs.microsoft.com/en-us/learn/wwl-data-ai/create-pipelines-in-aml/media/06-01-pipelinedata.jpg>

### PipelineData step inputs and outputs
To use a `PipelineData` object to pass data between steps, you must:
1. Define a named `PipelineData` object that references a location in a datastore.
2. Specify the `PipelineData` object as an *input* or *output* for the steps that use it.
3. Pass the `PipelineData` object as a script parameter in steps that run scripts (and include code in those scripts to read or write data).

For example, the following code defines a `PipelineData` object for the preprocessed data that must be passed between the steps.

In [None]:
from azureml.core import Dataset
from azureml.pipeline.core import PipelineData
from azureml.pipeline.steps import PythonScriptStep, EstimatorStep

# Get a dataset for the initial data
raw_ds = Dataset.get_by_name(ws, 'raw_dataset')

# Define a PipelineData object to pass data between steps
data_store = ws.get_default_datastore()
prepped_data=PipelineData('prepped', datastore=data_store)

# Step to run a Python script
step1 = PythonScriptStep(name = 'prepare data',
                         source_directory = 'scripts',
                         script_name = 'data_prep.py',
                         compute_target = 'aml-cluster',
                         runconfig = run_config,
                         # Specify Dataset as initial input
                         inputs = [raw_ds.as_named_input('raw_data')],
                         # Specify PipelineData as output
                         outputs = [prepped_data],
                         # Also pass as data reference to script
                         arguments = ['--folder', prepped_data])

# Step to run an estimator
step2 = EstimatorStep(name = 'train model',
                      estimator = sk_estimator,
                      compute_target = 'aml-cluster',
                      # Specify PipelineData as input
                      inputs = [prepped_data],
                      # Pass as data reference to estimator script
                      estimator_entry_script_arguments=['--folder', prepped_data])

In the scripts themselves, you can obtain a reference to the `PipelineData` object from the script argument and use it like a local folder.

In [None]:
from azureml.core import Run
import argparse
import os

# Get the experiment run context
run = Run.get_context()

# Get input dataset as dataframe
raw_df = run.input_datasets['raw_data'].to_pandas_dataframe()

# Get PipelineData argument
parser = argparse.ArgumentParser()
parser.add_argument('--folder', type=str, dest='folder')
args = parser.parse_args()
output_folder = args.folder

# Code to prep data (in this case, just select specific columns)
prepped_df = raw_df[['col1', 'col2', 'col3']]

# Save prepped data to the PipelineData location
os.makedirs(output_folder, exist_ok=True)
output_path = os.path.join(output_folder, 'prepped_data.csv')
prepped_df.to_csv(output_path)
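
A script in a subsequent step can read the file back through the same `--folder` argument. The following is a minimal sketch that uses only the Python standard library and simulates the `PipelineData` location with a temporary folder, so it can run outside a pipeline. The helper names `parse_folder` and `load_prepped_rows` are hypothetical, and the sample row is made up for illustration.

```python
import argparse
import csv
import os
import tempfile

def parse_folder(argv):
    """Read the --folder argument the same way the preparation script does."""
    parser = argparse.ArgumentParser()
    parser.add_argument('--folder', type=str, dest='folder')
    return parser.parse_args(argv).folder

def load_prepped_rows(folder):
    """Load prepped_data.csv from the folder as a list of dictionaries."""
    path = os.path.join(folder, 'prepped_data.csv')
    with open(path, newline='') as csv_file:
        return list(csv.DictReader(csv_file))

# Simulate the shared location locally with a temporary folder.
with tempfile.TemporaryDirectory() as tmp:
    with open(os.path.join(tmp, 'prepped_data.csv'), 'w', newline='') as csv_file:
        writer = csv.writer(csv_file)
        writer.writerow(['col1', 'col2', 'col3'])
        writer.writerow([1, 2, 3])
    folder = parse_folder(['--folder', tmp])
    rows = load_prepped_rows(folder)

print(rows)
```

Note that pandas `to_csv` writes the DataFrame index as an extra first column by default, so a file produced by the script above would contain one more column than this simulated file unless `index=False` is passed.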

## Reuse pipeline steps
Pipelines with multiple long-running steps can take a significant time to complete. Azure Machine Learning includes some caching and reuse features to reduce this time.

### Managing step output reuse
By default, the step output from a previous pipeline run is reused without rerunning the step, provided the script, source directory, and other parameters for the step have not changed. This can lead to stale results when changes to downstream data sources have not been accounted for.

To control this behavior, you can set the `allow_reuse` parameter in the step configuration, like this:

In [None]:
step1 = PythonScriptStep(name = 'prepare data',
                         source_directory = 'scripts',
                         script_name = 'data_prep.py',
                         compute_target = 'aml-cluster',
                         runconfig = run_config,
                         inputs=[raw_ds.as_named_input('raw_data')],
                         outputs=[prepped_data],
                         arguments = ['--folder', prepped_data],
                         # Disable step reuse
                         allow_reuse = False)
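
One way to picture the default reuse rule is as a cache keyed on a fingerprint of the step definition. The sketch below is a conceptual illustration in plain Python, not how Azure Machine Learning implements reuse. The `step_fingerprint` function and `StepCache` class are hypothetical. It also shows why results can go stale: the fingerprint covers the script and parameters, but not the contents of the data being read.

```python
import hashlib
import json

def step_fingerprint(script_text, parameters):
    """Hash the script contents and parameters that define a step."""
    payload = json.dumps({"script": script_text, "parameters": parameters},
                         sort_keys=True)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()

class StepCache:
    """Keep the last output for each step fingerprint."""

    def __init__(self):
        self._outputs = {}

    def run(self, script_text, parameters, compute, allow_reuse=True):
        """Return (output, reused). Recompute when reuse is disabled
        or when no output exists for this fingerprint."""
        key = step_fingerprint(script_text, parameters)
        if allow_reuse and key in self._outputs:
            return self._outputs[key], True
        output = compute()
        self._outputs[key] = output
        return output, False

calls = []

def prepare():
    calls.append("ran")
    return "prepped"

cache = StepCache()
first = cache.run("print('prep')", {"folder": "out"}, prepare)
second = cache.run("print('prep')", {"folder": "out"}, prepare)
third = cache.run("print('prep')", {"folder": "out"}, prepare, allow_reuse=False)
print(first, second, third, len(calls))
```

The second call reuses the cached output, while the third call runs again because reuse is disabled, which mirrors the effect of `allow_reuse = False`.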

When you have multiple steps, you can force all of them to run regardless of individual reuse configuration by setting the `regenerate_outputs` parameter when submitting the pipeline experiment:

In [None]:
pipeline_run = experiment.submit(train_pipeline, regenerate_outputs=True)

## Publish pipelines
After you have created a pipeline, you can publish it to create a REST endpoint through which the pipeline can be run on demand.
### Publishing a pipeline
To publish a pipeline, you can call its `publish` method, as shown here:

In [None]:
published_pipeline = train_pipeline.publish(name='training_pipeline',
                                            description='Model training pipeline',
                                            version='1.0')

Alternatively, you can call the `publish_pipeline` method on a successful run of the pipeline:

In [None]:
# Get the most recent run of the pipeline
pipeline_experiment = ws.experiments.get('training-pipeline')
run = list(pipeline_experiment.get_runs())[0]

# Publish the pipeline from the run
published_pipeline = run.publish_pipeline(name='training_pipeline',
                                          description='Model training pipeline',
                                          version='1.0')

After the pipeline has been published, you can view it in Azure Machine Learning Studio. You can also determine the URI of its endpoint like this:

In [None]:
rest_endpoint = published_pipeline.endpoint
print(rest_endpoint)

### Using a published pipeline
To initiate a published pipeline, you make an HTTP request to its REST endpoint, passing an authorization header with a token for a service principal that has permission to run the pipeline, and a JSON payload specifying the experiment name. The pipeline runs asynchronously, so the response from a successful REST call includes the run ID. You can use this ID to track the run in Azure Machine Learning studio.

For example, the following Python code makes a REST request to run a pipeline and displays the returned run ID.

In [None]:
import requests

response = requests.post(rest_endpoint,
                         headers=auth_header,
                         json={"ExperimentName": "run_training_pipeline"})
run_id = response.json()["Id"]
print(run_id)
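
The `auth_header` value used above is not defined in this section. As a minimal sketch, assuming it is a dictionary that carries a bearer token, the request could be assembled as follows. The sketch uses the standard library's `urllib` only to build the request without sending it. The endpoint URL and token are placeholders, and `build_run_request` is a hypothetical helper.

```python
import json
import urllib.request

def build_run_request(endpoint, token, experiment_name):
    """Build (but do not send) a POST request that starts a pipeline run."""
    headers = {
        "Authorization": "Bearer " + token,   # assumed bearer-token scheme
        "Content-Type": "application/json",
    }
    body = json.dumps({"ExperimentName": experiment_name}).encode("utf-8")
    return urllib.request.Request(endpoint, data=body,
                                  headers=headers, method="POST")

# Placeholder endpoint and token, for illustration only.
request = build_run_request("https://example.invalid/pipelines/placeholder",
                            "<token>",
                            "run_training_pipeline")

print(request.get_method(), request.full_url)
print(json.loads(request.data))
```

Building the request separately from sending it makes it easy to inspect the headers and payload before any call reaches the service.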

## Use pipeline parameters
You can increase the flexibility of a pipeline by defining parameters.

### Defining parameters for a pipeline

To define parameters for a pipeline, create a `PipelineParameter` object for each parameter, and specify each parameter in at least one step.

For example, the following code defines a parameter for the regularization rate and passes it to the training script:

In [None]:
from azureml.pipeline.core.graph import PipelineParameter

reg_param = PipelineParameter(name='reg_rate', default_value=0.01)

...

step2 = EstimatorStep(name = 'train model',
                      estimator = sk_estimator,
                      compute_target = 'aml-cluster',
                      inputs=[prepped_data],
                      estimator_entry_script_arguments=['--folder', prepped_data,
                                                        '--reg', reg_param])
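
On the receiving side, the training script reads the parameter like any other command-line argument. The sketch below shows one way this might look, together with a REST payload that overrides the default value. The `parse_training_args` helper is hypothetical, and the `ParameterAssignments` key reflects the payload format as I understand it, so treat it as an assumption to verify against the service documentation.

```python
import argparse
import json

def parse_training_args(argv):
    """Parse the arguments passed to the training script by the step."""
    parser = argparse.ArgumentParser()
    parser.add_argument('--folder', type=str, dest='folder')
    # Fall back to the same default as the PipelineParameter definition.
    parser.add_argument('--reg', type=float, dest='reg_rate', default=0.01)
    return parser.parse_args(argv)

# Simulate the arguments a pipeline run would pass to the script.
args = parse_training_args(['--folder', 'prepped', '--reg', '0.1'])
print(args.folder, args.reg_rate)

# Assumed payload shape for overriding the parameter in a REST request.
payload = {"ExperimentName": "run_training_pipeline",
           "ParameterAssignments": {"reg_rate": 0.1}}
print(json.dumps(payload))
```

Keeping the script default equal to the parameter's `default_value` means the script behaves the same whether it is run inside or outside the pipeline.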

## Schedule pipelines
After publishing, you can initiate the pipeline on demand through its REST endpoint, or you can have the pipeline run automatically based on a periodic schedule or in response to data updates.

### Scheduling a pipeline for periodic intervals
To schedule a pipeline to run at periodic intervals, you must define a `ScheduleRecurrence` that determines the run frequency, and use it to create a `Schedule`, like this:

In [None]:
from azureml.pipeline.core import ScheduleRecurrence, Schedule

daily = ScheduleRecurrence(frequency='Day', interval=1)
pipeline_schedule = Schedule.create(ws, name='Daily Training',
                                    description='trains model every day',
                                    pipeline_id=published_pipeline.id,
                                    experiment_name='Training_Pipeline',
                                    recurrence=daily)
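
To illustrate what a frequency and interval pair means in practice, the following plain-Python sketch computes the next few run times for a recurrence. It is not the SDK's scheduling logic. The `next_runs` helper is hypothetical, the unit names other than `'Day'` are assumptions, and month-based recurrence is left out because calendar months vary in length.

```python
from datetime import datetime, timedelta

# Fixed-length units only; the names other than 'Day' are assumptions.
UNITS = {
    "Minute": timedelta(minutes=1),
    "Hour": timedelta(hours=1),
    "Day": timedelta(days=1),
    "Week": timedelta(weeks=1),
}

def next_runs(start, frequency, interval, count):
    """Return the next `count` run times after `start`."""
    step = UNITS[frequency] * interval
    return [start + step * n for n in range(1, count + 1)]

start = datetime(2024, 1, 1, 9, 0)
daily_runs = next_runs(start, "Day", 1, 3)
for run_time in daily_runs:
    print(run_time.isoformat())
```

With `frequency='Day'` and `interval=1`, each run falls exactly one day after the previous one. An interval of 2 would skip every other day.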

### Triggering a pipeline run on data changes

To schedule a pipeline to run whenever data changes, you must create a `Schedule` that monitors a specified path on a datastore like this:

In [None]:
from azureml.core import Datastore
from azureml.pipeline.core import Schedule

training_datastore = Datastore(workspace=ws, name='blob_data')
pipeline_schedule = Schedule.create(ws, name='Reactive Training',
                                    description='trains model on data change',
                                    pipeline_id=published_pipeline.id,
                                    experiment_name='Training_Pipeline',
                                    datastore=training_datastore,
                                    path_on_datastore='data/training')
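
Conceptually, a data-change trigger compares the current state of the monitored path with the state seen at the previous check. The sketch below illustrates that idea on a local folder by using content hashes. It is an analogy only and says nothing about how the service itself detects changes. The `snapshot` and `changed_files` helpers are hypothetical.

```python
import hashlib
import os
import tempfile

def snapshot(folder):
    """Map each file's relative path to a hash of its contents."""
    state = {}
    for root, _, files in os.walk(folder):
        for name in files:
            path = os.path.join(root, name)
            with open(path, 'rb') as data_file:
                digest = hashlib.sha256(data_file.read()).hexdigest()
            state[os.path.relpath(path, folder)] = digest
    return state

def changed_files(before, after):
    """Return files that were added or modified between two snapshots."""
    return sorted(name for name in after if before.get(name) != after[name])

with tempfile.TemporaryDirectory() as folder:
    with open(os.path.join(folder, 'a.csv'), 'w') as data_file:
        data_file.write('x\n1\n')
    before = snapshot(folder)

    # Modify one file and add another, then take a second snapshot.
    with open(os.path.join(folder, 'a.csv'), 'a') as data_file:
        data_file.write('2\n')
    with open(os.path.join(folder, 'b.csv'), 'w') as data_file:
        data_file.write('y\n3\n')
    after = snapshot(folder)

changed = changed_files(before, after)
print(changed)
```

A non-empty result corresponds to the situation in which the reactive schedule would start a new pipeline run.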