# Introduction

The first pipeline is based on the tutorial available at: https://docs.microsoft.com/en-us/azure/machine-learning/how-to-create-your-first-pipeline

# Create ML pipelines

## Attach to workspace

In [None]:
import azureml.core
from azureml.core import Datastore, Workspace

# Connect to the Azure ML workspace described by the workspace config file
# (see `Workspace.from_config`); `ws` is shared by every later cell.
ws = Workspace.from_config()

## Set up datastore

In [None]:
# Handles to the workspace's datastores: the default one, followed by the
# blob and file datastores looked up by their registered names.
def_data_store = ws.get_default_datastore()
def_blob_store, def_file_store = (
    Datastore(ws, "workspaceblobstore"),
    Datastore(ws, "workspacefilestore"),
)

## Upload data files to datastore

In [None]:
# Copy the local 20 Newsgroups pickle into the "20newsgroups" folder of the
# blob datastore, replacing any file already stored there (overwrite=True).
# The pickle itself was downloaded from:
# https://github.com/Azure/MachineLearningNotebooks/blob/master/how-to-use-azureml/machine-learning-pipelines/intro-to-pipelines/20news.pkl
def_blob_store.upload_files(
    ["./data/20newsgroups/20news.pkl"], target_path="20newsgroups", overwrite=True
)

## Set up compute target

In [None]:
from azureml.core.compute import ComputeTarget, AmlCompute

compute_name = "aml-compute"
vm_size = "STANDARD_D2_V2"

if compute_name in ws.compute_targets:
    # Reuse the target already registered under this name.
    compute_target = ws.compute_targets[compute_name]
    if isinstance(compute_target, AmlCompute):
        print('Found compute target: ' + compute_name)
    else:
        # The name is taken by a target of another kind. It is still used by the
        # steps below, so make that visible instead of proceeding silently.
        print(f"Warning: compute target '{compute_name}' exists but is a "
              f"{type(compute_target).__name__}, not an AmlCompute cluster.")
else:
    print('Creating a new compute target...')
    # Cluster of 0-4 nodes of `vm_size`, scaling down after 600 s of idleness.
    provisioning_config = AmlCompute.provisioning_configuration(
        vm_size=vm_size,
        min_nodes=0,
        max_nodes=4,
        idle_seconds_before_scaledown=600,
    )
    # create the compute target
    compute_target = ComputeTarget.create(ws, compute_name, provisioning_config)

    # Can poll for a minimum number of nodes and for a specific timeout.
    # If no min node count is provided it will use the scale settings for the cluster
    compute_target.wait_for_completion(show_output=True, min_node_count=None, timeout_in_minutes=20)

    # For a more detailed view of current cluster status, use the 'status' property
    print(compute_target.status.serialize())

In [None]:
# Print the name of every compute target registered in the workspace.
for target_name in ws.compute_targets:
    print(target_name)

## Create data reference

In [None]:
from azureml.data.data_reference import DataReference
from azureml.pipeline.core import PipelineData

### Data sources

In [None]:
# Reference to the uploaded pickle on the blob datastore; the train step
# receives it as its "--input" argument.
blob_input_data = DataReference(
    data_reference_name="test_data",
    datastore=def_blob_store,
    path_on_datastore="20newsgroups/20news.pkl",
)

### Intermediate/Output Data

In [None]:
# Intermediate data: written by the train step, read by the extract step.
processed_data1 = PipelineData(name="processed_data1", datastore=def_blob_store)

In [None]:
# Intermediate data: written by the extract step, read by the compare step.
processed_data2 = PipelineData(name="processed_data2", datastore=def_blob_store)

In [None]:
# Final output: written by the compare step.
processed_data3 = PipelineData(name="processed_data3", datastore=def_blob_store)

## Construct pipeline steps

> The best practice is to use a separate folder for each step's script and its dependent files, and to specify that folder as the step's `source_directory`. This reduces the size of the snapshot created for the step (only that folder is snapshotted). Because a change to any file in the `source_directory` triggers a re-upload of the snapshot, keeping the folders separate also lets a step be reused when nothing in its own `source_directory` has changed.

In [None]:
# `os` was used below without ever being imported (NameError).
import os

from azureml.pipeline.steps import PythonScriptStep

# Step 1: train.py reads `blob_input_data` (--input) and writes `processed_data1` (--output).
source_directory = 'data_dependency_run_train'
print(f"Source directory for the step is {os.path.realpath(source_directory)}.")

trainStep = PythonScriptStep(
    script_name="train.py",
    arguments=["--input", blob_input_data, "--output", processed_data1],
    inputs=[blob_input_data],
    outputs=[processed_data1],
    compute_target=compute_target,
    source_directory=source_directory,
    allow_reuse=True)

In [None]:
# `os` was used below without ever being imported (NameError).
import os

# Step 2: extract.py reads `processed_data1` (--input_extract) and writes
# `processed_data2` (--output_extract).
source_directory = "data_dependency_run_extract"
print(f"Source directory for the step is {os.path.realpath(source_directory)}.")

extractStep = PythonScriptStep(
    script_name="extract.py",
    arguments=["--input_extract", processed_data1, "--output_extract", processed_data2],
    inputs=[processed_data1],
    outputs=[processed_data2],
    compute_target=compute_target,
    source_directory=source_directory)

> The next step is a bit more complex: it consumes both intermediate data and existing data, and produces intermediate data.

In [None]:
from azureml.data.datapath import DataPath, DataPathComputeBinding
from azureml.pipeline.core import PipelineParameter

# Pipeline parameter "compare_data", defaulting to the uploaded pickle.
datapath = DataPath(datastore=def_blob_store, path_on_datastore='20newsgroups/20news.pkl')
datapath_param = PipelineParameter(name="compare_data", default_value=datapath)

# (parameter, binding) pair consumed by the compare step; the binding uses mount mode.
mount_binding = DataPathComputeBinding(mode='mount')
data_parameter1 = (datapath_param, mount_binding)

In [None]:
# `os` was used below without ever being imported (NameError).
import os

# Step 3: compare.py consumes the "compare_data" pipeline parameter and
# `processed_data2`, and writes `processed_data3`.
source_directory = "data_dependency_run_compare"
print(f"Source directory for the step is {os.path.realpath(source_directory)}.")

compareStep = PythonScriptStep(
    script_name="compare.py",
    arguments=["--compare_data1", data_parameter1,
               "--compare_data2", processed_data2,
               "--output_compare", processed_data3],
    inputs=[data_parameter1, processed_data2],
    outputs=[processed_data3],
    compute_target=compute_target,
    source_directory=source_directory)

## Build the pipeline

In [None]:
from azureml.pipeline.core import Pipeline

# list of steps to run; the steps are chained through their PipelineData
# inputs/outputs (processed_data1 -> extract, processed_data2 -> compare).
compareModels = [trainStep, extractStep, compareStep]

# Build the pipeline. `steps` takes the list of steps directly; the original
# `steps=[compareModels]` wrapped it in a redundant extra list.
pipeline1 = Pipeline(workspace=ws, steps=compareModels)

## Validate the pipeline

In [None]:
# Validate the pipeline definition before publishing or submitting it;
# the result is displayed as the cell output.
validation_result = pipeline1.validate()
validation_result

## Run published pipeline

### Publish the pipeline

When the pipeline is published, it appears in the list of Pipelines in the Azure portal. However, it does not appear in the new studio (preview).

In [None]:
# Publish the pipeline (with continue_on_step_failure enabled) and show the
# resulting published-pipeline object.
published_pipeline1 = pipeline1.publish(
    name="My_New_Pipeline",
    description="My Published Pipeline Description",
    continue_on_step_failure=True,
)
published_pipeline1

#### Get published Pipeline

In [None]:
from azureml.pipeline.core import PublishedPipeline

# Look the published pipeline up again by id. To load a pipeline published
# in an earlier session, replace this with your own published pipeline id.
pipeline_id = published_pipeline1.id
published_pipeline = PublishedPipeline.get(ws, pipeline_id)
published_pipeline

### Submit

In [None]:
from azureml.core import Experiment

# Submit a run of the published pipeline under the 'Compare_Models_Exp'
# experiment, without forcing step outputs to be regenerated.
experiment = Experiment(ws, 'Compare_Models_Exp')
pipeline_run = experiment.submit(
    published_pipeline,
    regenerate_outputs=False,
)

## (Alternative) Submit pipeline without publishing

In [None]:
from azureml.core import Experiment

# Submit the in-memory (unpublished) pipeline under the same experiment and
# block until the run finishes.
experiment = Experiment(ws, 'Compare_Models_Exp')
pipeline_run1 = experiment.submit(
    pipeline1,
    regenerate_outputs=False,
)
pipeline_run1.wait_for_completion()

## See Outputs

In [None]:
# For every step of the run, list each named output together with the
# datastore and path where it is stored.
for step in pipeline_run1.get_steps():
    print("Outputs of step " + step.name)

    # get_outputs() returns a dict of StepRunOutputs keyed by output name.
    for name, output in step.get_outputs().items():
        port_ref = output.get_port_data_reference()
        print("\tname: " + name)
        print("\tdatastore: " + port_ref.datastore_name)
        print("\tpath on datastore: " + port_ref.path_on_datastore)

In [None]:
# Print every child (step) run's status and dump the job log of failed ones.
# Relax the "Failed" guard below to see logs of successful steps as well.
for child_run in pipeline_run1.get_children():
    status = child_run.get_status()
    print('Script:', child_run.name, 'status:', status)

    if status != "Failed":
        continue
    print('job log:', child_run.get_job_log())

## Download output from the pipeline

In [None]:
# `os` was used below without ever being imported (NameError).
import os

output_dir = './data/outputs'
os.makedirs(output_dir, exist_ok=True)

# Retrieve the step runs named 'compare.py' (the last step of the pipeline).
last_step = pipeline_run1.find_step_run('compare.py')

if last_step:
    last_step_obj = last_step[0]  # only one step in this pipeline runs 'compare.py'
    # Download the compare step's 'processed_data3' output into output_dir.
    last_step_obj.get_output_data('processed_data3').download(output_dir)
else:
    print("No step run named 'compare.py' was found; nothing was downloaded.")