### Pipeline
A pipeline is a workflow of steps that can run sequentially or in parallel. Each step can run on a specific compute target.
Common Azure ML pipeline steps:
<ul>
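<li>PythonScriptStep: Runs a Python script</li>
<li>DataTransferStep: Uses Azure Data Factory to copy data between data stores</li>
<li>DatabricksStep: Runs a notebook, script, or compiled JAR on a Databricks cluster</li>
<li>AdlaStep: Runs a U-SQL job in Azure Data Lake Analytics</li>
<li>ParallelRunStep: Runs a Python script as a distributed task on multiple compute nodes</li>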
</ul>

In [None]:
from azureml.pipeline.steps import PythonScriptStep

# Step to run a Python script
step1 = PythonScriptStep(name = 'prepare data',
                         source_directory = 'scripts',
                         script_name = 'data_prep.py',
                         compute_target = 'aml-cluster')

# Step to train a model
step2 = PythonScriptStep(name = 'train model',
                         source_directory = 'scripts',
                         script_name = 'train_model.py',
                         compute_target = 'aml-cluster')

After defining the steps, you can assign them to a pipeline and run it as an experiment:

In [None]:
from azureml.core import Workspace, Experiment
from azureml.pipeline.core import Pipeline

# Load the workspace from a config.json file
ws = Workspace.from_config()

# Construct the pipeline
train_pipeline = Pipeline(workspace = ws, steps = [step1,step2])

# Create an experiment and run the pipeline
experiment = Experiment(workspace = ws, name = 'training-pipeline')
pipeline_run = experiment.submit(train_pipeline)

#### PipelineData Object
- References a location in a datastore
- Creates a data dependency between pipeline steps
<p>
To use a PipelineData object to pass data between steps, you must:
<ol>
    <li>Define a PipelineData object that references a location in a datastore</li>
    <li>Pass the PipelineData object as a script argument in steps that run scripts</li>
    <li>Specify whether the object is an input or an output of each step</li>
</ol>
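Because a step that consumes a PipelineData object has to wait for the step that produces it, these data dependencies determine the order in which steps run, and steps with no dependency between them can run in parallel. The sketch below illustrates that ordering idea with Python's standard `graphlib` module (Python 3.9+). The step names and the input/output sets are hypothetical, and this is only a model of the idea, not how Azure ML schedules steps internally.

```python
from graphlib import TopologicalSorter

# Hypothetical steps: each lists the data names it reads (inputs) and writes (outputs)
steps = {
    "prepare data": {"inputs": {"raw_data"}, "outputs": {"prepped"}},
    "train model": {"inputs": {"prepped"}, "outputs": {"model"}},
    "profile data": {"inputs": {"raw_data"}, "outputs": {"profile"}},
}

def build_dependencies(steps):
    """Map each step to the set of steps that produce its inputs."""
    producers = {}
    for name, spec in steps.items():
        for out in spec["outputs"]:
            producers[out] = name
    graph = {}
    for name, spec in steps.items():
        # Inputs that no step produces (e.g. a registered dataset) add no dependency
        graph[name] = {producers[i] for i in spec["inputs"] if i in producers}
    return graph

def execution_waves(steps):
    """Group steps into waves; the steps within one wave could run in parallel."""
    sorter = TopologicalSorter(build_dependencies(steps))
    sorter.prepare()
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        waves.append(ready)
        sorter.done(*ready)
    return waves

print(execution_waves(steps))  # [['prepare data', 'profile data'], ['train model']]
```

Here "prepare data" and "profile data" only read the raw dataset, so neither waits for the other, while "train model" must wait for the `prepped` output.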

In [None]:
from azureml.core import Dataset
from azureml.pipeline.core import PipelineData
from azureml.pipeline.steps import PythonScriptStep

# Get a dataset for the initial data
raw_ds = Dataset.get_by_name(ws, 'raw_dataset')

# 1) Define a PipelineData object to pass data between steps
data_store = ws.get_default_datastore()
prepped_data = PipelineData('prepped', datastore=data_store)

# Step to prepare the data
step1 = PythonScriptStep(name = 'prepare data',
                         source_directory = 'scripts',
                         script_name = 'data_prep.py',
                         compute_target = 'aml-cluster',
                         # 2) Script arguments include PipelineData
                         arguments = ['--raw-ds', raw_ds.as_named_input('raw_data'),
                                      '--out_folder', prepped_data],
                         # 3) Specify PipelineData as output
                         outputs=[prepped_data])

# Step to train a model on the prepared data
step2 = PythonScriptStep(name = 'train model',
                         source_directory = 'scripts',
                         script_name = 'train_model.py',
                         compute_target = 'aml-cluster',
                         # 2) Pass PipelineData as a script argument
                         arguments=['--in_folder', prepped_data],
                         # 3) Specify PipelineData as input
                         inputs=[prepped_data])
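At run time, each PipelineData argument is replaced by a folder path on the compute target, so the scripts only need to read that path from the command line. The following is a minimal sketch of the argument handling in `data_prep.py` and `train_model.py`, assuming the prepared data is written as a CSV file. The file name `prepped.csv`, the sample rows, and the function names `prep_main` and `train_main` are hypothetical; a real `data_prep.py` would load the `raw_data` input instead of using literal rows.

```python
import argparse
import csv
import os
import tempfile

def prep_main(argv):
    """Sketch of data_prep.py: write prepared rows into the --out_folder path."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--raw-ds", type=str, dest="raw_ds")  # dataset reference, unused in this sketch
    parser.add_argument("--out_folder", type=str, dest="out_folder")
    args = parser.parse_args(argv)

    # Create the output folder in case it doesn't exist yet
    os.makedirs(args.out_folder, exist_ok=True)
    out_path = os.path.join(args.out_folder, "prepped.csv")
    rows = [["feature", "label"], ["1.5", "0"], ["2.5", "1"]]  # hypothetical prepared data
    with open(out_path, "w", newline="") as f:
        csv.writer(f).writerows(rows)
    return out_path

def train_main(argv):
    """Sketch of train_model.py: read the prepared file from the --in_folder path."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--in_folder", type=str, dest="in_folder")
    args = parser.parse_args(argv)

    with open(os.path.join(args.in_folder, "prepped.csv"), newline="") as f:
        rows = list(csv.reader(f))
    return rows[1:]  # data rows, header skipped

# Local simulation: a temporary folder stands in for the PipelineData path
folder = os.path.join(tempfile.mkdtemp(), "prepped")
prep_main(["--raw-ds", "raw_data", "--out_folder", folder])
print(train_main(["--in_folder", folder]))  # [['1.5', '0'], ['2.5', '1']]
```

In the actual scripts, calling `parser.parse_args()` with no argument reads `sys.argv`, which is where the pipeline places the resolved paths.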

#### Reuse pipeline steps
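Pipelines with multiple steps can take a long time to complete, so Azure ML includes caching and reuse features to reduce run time. By default, if a step's script, dependencies, inputs, and parameters are unchanged since a previous run, the output from that run is reused instead of running the step again. To disable reuse for a step, set its `allow_reuse` parameter to `False`: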

In [None]:
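from azureml.core.runconfig import RunConfiguration
from azureml.pipeline.steps import PythonScriptStep

# Default run configuration; customize its environment as needed
run_config = RunConfiguration()

# Assumes raw_ds and prepped_data are defined as in the PipelineData example
step1 = PythonScriptStep(name = 'prepare data',
                         source_directory = 'scripts',
                         script_name = 'data_prep.py',
                         compute_target = 'aml-cluster',
                         runconfig = run_config,
                         inputs=[raw_ds.as_named_input('raw_data')],
                         outputs=[prepped_data],
                         arguments = ['--folder', prepped_data],
                         # Disable step reuse
                         allow_reuse = False)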

##### Forcing all steps to run
`pipeline_run = experiment.submit(train_pipeline, regenerate_outputs=True)`

Setting `regenerate_outputs=True` forces every step in the pipeline to rerun, regardless of each step's `allow_reuse` setting.
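Conceptually, reuse behaves like a cache keyed on everything that defines a step: if the script, its arguments, and its inputs match a previous run, the earlier output can be returned instead of running the step again. The sketch below models that idea together with the effect of `allow_reuse=False` and `regenerate_outputs=True`. The hashing scheme, `step_key`, `run_step`, and the in-memory cache are all hypothetical; this is an illustration, not how Azure ML implements reuse.

```python
import hashlib
import json

def step_key(script_source, arguments, input_versions):
    """Hash everything that defines a step run into one hypothetical cache key."""
    payload = json.dumps(
        {"script": script_source, "args": arguments, "inputs": input_versions},
        sort_keys=True,
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()

def run_step(key, execute, cache, allow_reuse=True, regenerate_outputs=False):
    """Return (output, reused); reuse only if allowed and outputs aren't being regenerated."""
    if allow_reuse and not regenerate_outputs and key in cache:
        return cache[key], True
    output = execute()   # run the step for real
    cache[key] = output  # remember the output for later runs
    return output, False

cache = {}
key = step_key("print('prep')", ["--folder", "prepped"], {"raw_data": "v1"})

def prepare():
    return "prepped-output"

print(run_step(key, prepare, cache))  # ('prepped-output', False): first run executes the step
print(run_step(key, prepare, cache))  # ('prepped-output', True): same key, output reused
print(run_step(key, prepare, cache, regenerate_outputs=True))  # ('prepped-output', False)
```

Changing any part of the key, for example a new version of the input data, produces a different key and therefore a fresh run.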


#### Publish Pipeline
Publishing a pipeline creates a REST endpoint through which the pipeline can be run on demand. There are two ways to publish a pipeline. The first is to call the pipeline's `publish` method:

In [None]:
published_pipeline = train_pipeline.publish(name='training_pipeline',
                                            description='Model training pipeline',
                                            version='1.0')

The second is to call the `publish_pipeline` method on a <b>successful</b> run of the pipeline:

In [None]:
# Get the most recent run of the pipeline
pipeline_experiment = ws.experiments.get('training-pipeline')
run = list(pipeline_experiment.get_runs())[0]

# Publish the pipeline from the run
published_pipeline = run.publish_pipeline(name='training_pipeline',
                                          description='Model training pipeline',
                                          version='1.0')

After the pipeline is published, you can view its REST endpoint:<br>
`rest_endpoint = published_pipeline.endpoint` <br>
`print(rest_endpoint)`



#### Using a published pipeline
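To run a published pipeline, send an HTTP POST request to its REST endpoint. The request needs an authorization header containing a valid token for an identity with access to the workspace, and a JSON body that specifies the name of the experiment to run under.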
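A minimal sketch of such a request using only the standard library. It assumes you already have an access token; in the Azure ML SDK, `InteractiveLoginAuthentication().get_authentication_header()` (from `azureml.core.authentication`) returns a ready-made `Authorization` header. The endpoint URL, token, and `build_pipeline_request` function below are hypothetical placeholders, and the request is built but not sent.

```python
import json
import urllib.request

def build_pipeline_request(rest_endpoint, access_token, experiment_name):
    """Build (but do not send) a POST request that triggers a published pipeline."""
    body = json.dumps({"ExperimentName": experiment_name}).encode("utf-8")
    return urllib.request.Request(
        rest_endpoint,
        data=body,
        headers={
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "application/json",
        },
        method="POST",
    )

# Hypothetical values; in practice use published_pipeline.endpoint and a real token
req = build_pipeline_request(
    "https://example.com/hypothetical/pipeline/endpoint",
    "TOKEN",
    "run_training_pipeline",
)
print(req.get_method(), req.full_url)
# Sending it would be urllib.request.urlopen(req), followed by parsing the JSON response
```

In Microsoft's examples, the ID of the new run is read from the `Id` field of the JSON response, which you can then use to track the run.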

#### Pipeline Parameters
A pipeline can have parameters. To add one, import `PipelineParameter`, create a parameter with a name and a default value, and pass it to at least one step.

In [None]:
from azureml.pipeline.core.graph import PipelineParameter

reg_param = PipelineParameter(name='reg_rate', default_value=0.01)


step2 = PythonScriptStep(name = 'train model',
                         source_directory = 'scripts',
                         script_name = 'train_model.py',
                         compute_target = 'aml-cluster',
                         # Pass parameter as script argument
                         arguments=['--in_folder', prepped_data,
                                    '--reg', reg_param],
                         inputs=[prepped_data])
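Inside the training script, the parameter arrives as an ordinary command-line string, so the script should convert it to the type it needs. When the published pipeline is run through its REST endpoint, a different value can be supplied under `ParameterAssignments` in the JSON body, keyed by the parameter name (`reg_rate`). A minimal sketch of both sides, assuming `train_model.py` uses `argparse`; `parse_train_args` and `run_body` are hypothetical helpers.

```python
import argparse
import json

def parse_train_args(argv):
    """Sketch of train_model.py's argument parsing; --reg arrives as a string."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--in_folder", type=str, dest="in_folder")
    # type=float converts the command-line string; 0.01 mirrors the PipelineParameter default
    parser.add_argument("--reg", type=float, dest="reg_rate", default=0.01)
    return parser.parse_args(argv)

def run_body(experiment_name, **parameters):
    """JSON body for a REST run of the published pipeline, overriding parameters by name."""
    body = {"ExperimentName": experiment_name}
    if parameters:
        body["ParameterAssignments"] = parameters
    return json.dumps(body)

args = parse_train_args(["--in_folder", "/tmp/prepped", "--reg", "0.1"])
print(args.reg_rate)  # 0.1
print(run_body("run_training_pipeline", reg_rate=0.1))
```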

#### Schedule Pipeline
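You can schedule a published pipeline to run at regular intervals, much like a cron job, by defining a `ScheduleRecurrence` and passing it to `Schedule.create`.<br>
You can also schedule a pipeline to run when data changes. To do this, create a `Schedule` that monitors a specified path on a datastore: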

In [None]:
from azureml.core import Datastore
from azureml.pipeline.core import Schedule

training_datastore = Datastore(workspace=ws, name='blob_data')
pipeline_schedule = Schedule.create(ws, name='Reactive Training',
                                    description='trains model on data change',
                                    pipeline_id=published_pipeline.id,
                                    experiment_name='Training_Pipeline',
                                    # Trigger a run when data under this path changes
                                    datastore=training_datastore,
                                    path_on_datastore='data/training')
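A datastore-based schedule works by polling: at a fixed interval (set with the `polling_interval` argument of `Schedule.create`, in minutes, five by default) the service checks the monitored path for added or modified blobs and starts a run if it finds any. The following sketch models that polling idea on a local folder by comparing modification-time snapshots. The functions `snapshot`, `changed_files`, and `poll_once` are hypothetical and only illustrate the behavior; they are not part of Azure ML.

```python
import os
import tempfile

def snapshot(folder):
    """Record each file's modification time under folder (relative path -> mtime)."""
    state = {}
    for root, _dirs, files in os.walk(folder):
        for name in files:
            path = os.path.join(root, name)
            state[os.path.relpath(path, folder)] = os.path.getmtime(path)
    return state

def changed_files(previous, current):
    """Files that were added or modified since the previous snapshot."""
    return sorted(
        path for path, mtime in current.items()
        if path not in previous or previous[path] != mtime
    )

def poll_once(folder, previous, trigger):
    """One polling cycle: call trigger with the changed files, return the new snapshot."""
    current = snapshot(folder)
    changes = changed_files(previous, current)
    if changes:
        trigger(changes)  # e.g. submit the pipeline
    return current

# Local simulation: a new file appears in the monitored folder between polls
folder = tempfile.mkdtemp()
seen = snapshot(folder)  # the folder starts empty
with open(os.path.join(folder, "new_data.csv"), "w") as f:
    f.write("feature,label\n1.5,0\n")
seen = poll_once(folder, seen, lambda files: print("would submit pipeline for", files))
```

A real service would repeat `poll_once` every polling interval; deleted files are ignored here, mirroring the "added or modified" rule.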

Knowledge check:
- You're creating a pipeline that includes two steps. Step 1 preprocesses some data, and step 2 uses the preprocessed data to train a model. What type of object should you use to pass data from step 1 to step 2 and create a dependency between these steps? `PipelineData`
- You've published a pipeline that you want to run every week. You plan to use the Schedule.create method to create the schedule. What kind of object must you create first to configure how frequently the pipeline runs? `ScheduleRecurrence`