# Build the ML Pipeline

In this notebook we provide a template for creating a two-step machine learning pipeline that performs data preparation and training. We then show how to test, publish and schedule the pipeline.

## Convert notebooks to scripts

Below we run a bash script that:

1. converts the data prep and training notebooks to Python scripts (markdown and output cells are stripped from the notebook)
2. formats the scripts (using [yapf](https://github.com/google/yapf))
3. lints the scripts to detect errors

The linting below is set to report errors only. To also see warnings and other messages, remove the `-E` option.

In [None]:
%%bash

# pylint is used below as well, so install it alongside yapf
pip install yapf pylint

INPUTDIR=../notebooks
OUTPUTDIR=scripts

# convert the data_prep notebook to a script, format and lint
jupyter nbconvert --to python $INPUTDIR/01-data-prep/data_prep.ipynb --output-dir $OUTPUTDIR/01-data-prep --template=ipynb_to_py.tpl
yapf -i $OUTPUTDIR/01-data-prep/data_prep.py
pylint -E $OUTPUTDIR/01-data-prep/data_prep.py

# convert the train notebook to a script, format and lint
jupyter nbconvert --to python $INPUTDIR/02-train/train.ipynb --output-dir $OUTPUTDIR/02-train --template=ipynb_to_py.tpl
yapf -i $OUTPUTDIR/02-train/train.py
pylint -E $OUTPUTDIR/02-train/train.py

exit 0
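
For intuition, step 1 above can be approximated in plain Python. The sketch below is not what `nbconvert` or the `ipynb_to_py.tpl` template does internally. It only assumes the standard `.ipynb` JSON layout (a top-level `cells` list whose entries carry `cell_type` and `source`), keeps the code cells, and drops markdown and outputs. The function name `notebook_to_script` is hypothetical, and commenting out lines that start with `%` or `!` is a simplification chosen for this example.

```python
import json


def notebook_to_script(notebook_json: str) -> str:
    """Return the code cells of a notebook as one Python script.

    Markdown cells and cell outputs are dropped. Lines starting with
    '%' or '!' (IPython magics / shell escapes) are commented out so
    that the result is plain Python.
    """
    notebook = json.loads(notebook_json)
    chunks = []
    for cell in notebook.get("cells", []):
        if cell.get("cell_type") != "code":
            continue  # skip markdown and raw cells
        source = cell.get("source", "")
        # The notebook format stores source as a string or a list of lines
        if isinstance(source, list):
            source = "".join(source)
        lines = []
        for line in source.splitlines():
            if line.lstrip().startswith(("%", "!")):
                line = "# " + line
            lines.append(line)
        if lines:
            chunks.append("\n".join(lines))
    return "\n\n".join(chunks) + "\n"


# A tiny in-memory notebook: one markdown cell and two code cells
example = json.dumps({
    "cells": [
        {"cell_type": "markdown", "source": ["# Title"]},
        {"cell_type": "code", "source": ["import os\n", "x = 1"],
         "outputs": [{"text": "ignored"}]},
        {"cell_type": "code", "source": "%matplotlib inline\nprint(x)",
         "outputs": []},
    ]
})
script = notebook_to_script(example)
print(script)
```

In practice `jupyter nbconvert` remains the better tool, since it handles notebook versions and templates that this sketch ignores.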

## Import packages

Below we import the main packages needed to build an ML pipeline.

In [None]:
from azureml.core import Workspace, Environment, RunConfiguration, Dataset, Datastore
from azureml.core.compute import AmlCompute, ComputeTarget
from azureml.core.compute_target import ComputeTargetException
from azureml.pipeline.core import Pipeline, PipelineData, PipelineParameter, ScheduleRecurrence, Schedule
from azureml.pipeline.steps import PythonScriptStep, EstimatorStep
from azureml.train.sklearn import SKLearn
from azureml.train.dnn import PyTorch, TensorFlow
from azureml.data import TabularDataset, FileDataset

In [None]:
ws = Workspace.from_config()

## Create compute and environment for pipeline

Below we create the compute to run the ML pipeline. Feel free to change the `vm_size` parameter to suit your needs.

In [None]:
aml_compute_target = "cpu-cluster"
try:
    aml_compute = AmlCompute(ws, aml_compute_target)
    print("found existing compute target.")
except ComputeTargetException:
    print("creating new compute target")
    
    provisioning_config = AmlCompute.provisioning_configuration(vm_size="STANDARD_D2_V2",
                                                                idle_seconds_before_scaledown=1200,
                                                                min_nodes=0,
                                                                max_nodes=4)
    aml_compute = ComputeTarget.create(ws, aml_compute_target, provisioning_config)
    aml_compute.wait_for_completion(show_output=True, min_node_count=None, timeout_in_minutes=20)
    
print("Azure Machine Learning Compute attached")

#### Update conda_dependencies.yml

This directory contains a `conda_dependencies.yml` file. List any package dependencies your scripts need there.

In [None]:
run_config = RunConfiguration()
my_env = Environment.from_conda_specification("my_pipeline", "./conda_dependencies.yml")
run_config.environment = my_env

### Data prep step

First we define the input data and output folder. Set `DATASET_NAME` and `DATASTORE_NAME` to the names of your dataset and datastore.

In [None]:
DATASET_NAME = ''
DATASTORE_NAME = ''

input_dataset = Dataset.get_by_name(ws, DATASET_NAME)
datastore = Datastore(ws, DATASTORE_NAME)
training_data = PipelineData(name='training_data', datastore=datastore)

Next, we define the step.

In [None]:
my_inputs = None
if isinstance(input_dataset, FileDataset):
    my_inputs = [input_dataset.as_named_input(input_dataset.name).as_mount()]
elif isinstance(input_dataset, TabularDataset):
    my_inputs = [input_dataset.as_named_input(input_dataset.name)]
    

data_prep_step = PythonScriptStep(script_name="data_prep.py",
                                  name="data_prep_step",
                                  arguments=["--input_dataset", input_dataset.name,
                                            "--output_folder", training_data],
                                  compute_target=aml_compute, 
                                  runconfig=run_config,
                                  inputs=my_inputs,
                                  outputs=[training_data],
                                  source_directory="scripts/01-data-prep")
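
The generated `data_prep.py` is not shown in this notebook, so the following is only a sketch of how such a script might consume the two arguments passed by the step. At run time the `PipelineData` object in `arguments` is replaced by a path, so the script sees `--output_folder` as a plain string. The output file name `prepared.csv` and the helper `write_output` are hypothetical, and creating the folder with `os.makedirs` is a defensive assumption rather than something the source prescribes.

```python
import argparse
import os
import tempfile


def parse_args(argv=None):
    """Parse the arguments that data_prep_step passes to the script."""
    parser = argparse.ArgumentParser(description="data prep step (sketch)")
    parser.add_argument("--input_dataset", type=str, required=True,
                        help="name of the dataset passed by the step")
    parser.add_argument("--output_folder", type=str, required=True,
                        help="path substituted for training_data at run time")
    return parser.parse_args(argv)


def write_output(output_folder, rows):
    """Write rows as CSV into output_folder, creating it if needed."""
    os.makedirs(output_folder, exist_ok=True)
    out_path = os.path.join(output_folder, "prepared.csv")  # hypothetical name
    with open(out_path, "w") as f:
        for row in rows:
            f.write(",".join(str(v) for v in row) + "\n")
    return out_path


# Local dry run: a temporary folder stands in for the pipeline-provided path
with tempfile.TemporaryDirectory() as tmp:
    args = parse_args(["--input_dataset", "my_dataset",
                       "--output_folder", os.path.join(tmp, "training_data")])
    written = write_output(args.output_folder, [(1, 2), (3, 4)])
    with open(written) as f:
        contents = f.read()
    print(contents)
```

In an actual run the dataset itself would typically be looked up through the run context (for example via `Run.get_context().input_datasets`), which is left out here because it needs a live workspace.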

### Training step

We run the training script in a `PythonScriptStep`. However, if you are using scikit-learn, TensorFlow or PyTorch, we recommend the equivalent AzureML estimators together with an `EstimatorStep`: these provide a higher-level abstraction, and the backend container is set up to exploit GPUs (if you are training DNNs). Templated code for the different estimators is provided below; just uncomment the relevant one.

In [None]:
param1 = PipelineParameter('param1', 1.0)

In [None]:
train_step = PythonScriptStep(script_name="train.py",
                              name="training_step",
                              arguments=["--training_data", training_data,
                                         "--param1", param1],
                              compute_target=aml_compute,
                              runconfig=run_config,
                              inputs=[training_data],
                              outputs=[],
                              source_directory="scripts/02-train")

# If your training script uses scikit-learn, consider the SKLearn estimator with an estimator step
# est = SKLearn(source_directory="scripts/02-train",
#               compute_target=aml_compute,
#               entry_script="train.py",
#               environment_definition=my_env)
# If your training script uses TensorFlow, consider the TensorFlow estimator with an estimator step (n.b. ensure you are using a GPU machine)
# est = TensorFlow(source_directory="scripts/02-train",
#                  use_gpu=True,
#                  compute_target=aml_compute,
#                  entry_script="train.py",
#                  environment_definition=my_env)
# If your training script uses PyTorch, consider the PyTorch estimator with an estimator step (n.b. ensure you are using a GPU machine)
# est = PyTorch(source_directory="scripts/02-train",
#               use_gpu=True,
#               compute_target=aml_compute,
#               entry_script="train.py",
#               environment_definition=my_env)
# Now use estimator step
# train_step = EstimatorStep(name="training_step", 
#                           estimator=est, 
#                           estimator_entry_script_arguments=["--training_data", training_data, "--param1", param1],
#                           inputs=[training_data], 
#                           outputs=[])

## Define the pipeline

In [None]:
pipeline = Pipeline(workspace=ws, 
                    steps=[data_prep_step, train_step], 
                    description="a 2-step data prep and training pipeline")

## Validate, Test, Publish and Schedule the pipeline

First we check that the pipeline works by validating it and submitting it to the job service.

In [None]:
pipeline.validate()
pipeline.submit(experiment_name="pipeline-test", regenerate_outputs=True)

Next we publish the pipeline.

In [None]:
published_pipeline = pipeline.publish()

Finally, we set up a schedule for the pipeline. Here you have two options:

1. Schedule the pipeline on a recurrence, i.e. a time-based schedule (e.g. every minute, hourly, daily, weekly, etc.)
2. Schedule the pipeline to run when additions or modifications are made to blobs in the datastore. By default, the whole datastore container is monitored for changes. Use the `path_on_datastore` parameter to monitor a specific path instead. Note: `path_on_datastore` is relative to the datastore's container, so the path actually monitored is `container/path_on_datastore`. Changes made to subfolders of the monitored path will not trigger the schedule. Note: only Blob datastores are supported.

#### Option 1: Schedule on recurrence

In [None]:
recurrence = ScheduleRecurrence(frequency="Day", interval=2, hours=[22], minutes=[30]) # Runs every other day at 10:30pm

schedule = Schedule.create(workspace=ws, 
                           name="My_Schedule",
                           pipeline_id=published_pipeline.id, 
                           experiment_name='Schedule_Run',
                           recurrence=recurrence,
                           wait_for_provisioning=True,
                           description="Schedule Run")

# You may want to make sure that the schedule is provisioned properly
# before making any further changes to the schedule

print("Created schedule with id: {}".format(schedule.id))
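
To sanity-check what a recurrence like the one above means, the next few trigger times can be computed locally. This is plain `datetime` arithmetic under the assumption that the first run happens at the first 22:30 at or after the start time and then every `interval` days. It does not call the scheduling service, whose exact handling of start time and time zone may differ. `next_runs` is a hypothetical helper.

```python
from datetime import datetime, timedelta


def next_runs(start, interval_days, hour, minute, count):
    """Return `count` run times, spaced `interval_days` apart at hour:minute."""
    first = start.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if first < start:
        # Today's slot has already passed, so begin tomorrow
        first += timedelta(days=1)
    return [first + timedelta(days=interval_days * i) for i in range(count)]


# Starting at 23:00 on 1 March 2021, today's 22:30 slot is already gone
runs = next_runs(datetime(2021, 3, 1, 23, 0),
                 interval_days=2, hour=22, minute=30, count=3)
for r in runs:
    print(r.isoformat())
```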

#### Option 2: Schedule on datastore change

In [None]:
schedule = Schedule.create(workspace=ws,
                           name="My_Schedule",
                           pipeline_id=published_pipeline.id,
                           experiment_name='Schedule_Run',
                           datastore=datastore,
                           # polling_interval=5,  # how often (in minutes) to poll for blob additions/modifications; default is 5
                           # path_on_datastore="file/path",  # specific folder on the datastore to monitor for changes
                           wait_for_provisioning=True,
                           description="Schedule Run")

# You may want to make sure that the schedule is provisioned properly
# before making any further changes to the schedule

print("Created schedule with id: {}".format(schedule.id))
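
The monitoring rule described for option 2 (only blobs directly under `container/path_on_datastore` count, changes in subfolders do not) can be restated as a small predicate. This is just an illustration of the rule as worded in this notebook, not the service's implementation. The function `would_trigger` and the container name `data` are hypothetical.

```python
import posixpath


def would_trigger(container, path_on_datastore, changed_blob):
    """Return True if a changed blob sits directly in the monitored folder."""
    relative = (path_on_datastore or "").strip("/")
    monitored = posixpath.join(container, relative).rstrip("/")
    parent = posixpath.dirname(changed_blob.lstrip("/"))
    return parent == monitored


# Monitoring container "data" with path_on_datastore="file/path"
direct = would_trigger("data", "file/path", "data/file/path/new.csv")
nested = would_trigger("data", "file/path", "data/file/path/sub/new.csv")
elsewhere = would_trigger("data", "file/path", "data/other/new.csv")
# With no path_on_datastore, the container root itself is monitored
root = would_trigger("data", None, "data/new.csv")
print(direct, nested, elsewhere, root)
```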