# Azure Machine Learning Pipelines with Data Dependency

This notebook shows how to build a pipeline whose steps are connected implicitly through the data they pass to each other.

In [1]:
# Core Azure ML SDK: workspace, experiment, datastore, and compute handles.
import azureml.core
from azureml.core import Workspace, Experiment, Datastore
from azureml.core.compute import AmlCompute
# NOTE(review): ComputeTarget and ComputeInstance are not referenced in the visible cells.
from azureml.core.compute import ComputeTarget, ComputeInstance
# Notebook widget used further down to monitor the submitted pipeline run.
from azureml.widgets import RunDetails

# Check core SDK version number
print("SDK version:", azureml.core.VERSION)

# Pipeline building blocks: data references, intermediate data, and script steps.
from azureml.data.data_reference import DataReference
from azureml.pipeline.core import Pipeline, PipelineData
from azureml.pipeline.steps import PythonScriptStep
print("Pipeline SDK-specific imports completed")

SDK version: 1.51.0
Pipeline SDK-specific imports completed


In [2]:
# Local folder that holds the script for the training step.
source_directory = '_run_train'

notice_template = 'Sample scripts will be created in {} directory.'
print(notice_template.format(source_directory))

Sample scripts will be created in _run_train directory.


In [None]:
# Load the workspace via Workspace.from_config() and show its identifying
# details, one per line.
ws = Workspace.from_config()
print("\n".join(str(detail) for detail in (ws.name, ws.resource_group, ws.location, ws.subscription_id)))

# Look the blob datastore up by its explicit name; ws.get_default_datastore()
# is the alternative when the workspace default should be used instead.
def_blob_store = Datastore(workspace=ws, name="workspaceblobstore")
print("Blobstore's name: {}".format(def_blob_store.name))

In [4]:
# Reference the Titanic CSV uploaded to blob storage through a DataReference.
# Only the datastore, reference name, and path are given; the remaining
# DataReference arguments (mode, path_on_compute, overwrite) are left at
# their defaults.
input_reference_name = "test_data"
input_path_on_datastore = "titanic/Titanic.csv"

blob_input_data = DataReference(
    def_blob_store,
    data_reference_name=input_reference_name,
    path_on_datastore=input_path_on_datastore,
)
print("Step 1: DataReference object created")

Step 1: DataReference object created


In [5]:
# Name of the compute target that every pipeline step below runs on.
aml_compute_target = "CPU-MD"

# Look up the compute target with that name in the workspace.
# NOTE(review): there is no fallback here, so the target presumably has to
# exist already — confirm.
aml_compute = AmlCompute(workspace=ws, name=aml_compute_target)
print("Step 2: AML Compute target created.")

Step 2: AML Compute target created.


### Intermediate/Output Data

Intermediate data (or the output of a step) is represented by a PipelineData object. PipelineData can be produced by one step and consumed in another step by providing the PipelineData object as an output of one step and the input of one or more steps.

In [6]:
# Intermediate data named "processed_data1", kept as a directory on the blob
# datastore. The PipelineData arguments not given here (output_name,
# output_mode, output_path_on_compute, output_overwrite, data_type) are left
# at their defaults.
processed_data1 = PipelineData(
    name="processed_data1",
    datastore=def_blob_store,
    is_directory=True,
)
print("Step 3: PipelineData object created")

Step 3: PipelineData object created



### Pipeline steps using datasources and intermediate data

Machine learning pipelines can have many steps and these steps could use or reuse datasources and intermediate data. Here's how we construct such a pipeline:

##### Define a Step that consumes a datasource and produces intermediate data.

In this step, we define a step that consumes a datasource and produces intermediate data.

Open train.py in the local machine and examine the arguments, inputs, and outputs for the script. That will give you a good sense of why the script argument names used below are important.
##### Specify conda dependencies and a base docker image through a RunConfiguration

This step uses a Docker image and scikit-learn. Use a RunConfiguration to specify these requirements, and pass it when creating the PythonScriptStep.


In [7]:
from azureml.core.conda_dependencies import CondaDependencies
from azureml.core.runconfig import DEFAULT_CPU_IMAGE, DockerConfiguration, RunConfiguration

# Fresh run configuration with Docker enabled.
run_config = RunConfiguration()
run_config.docker = DockerConfiguration(use_docker=True)

# Environment settings: the default CPU base image, plus a conda environment
# that is not user-managed and lists scikit-learn as a conda package.
step_environment = run_config.environment
step_environment.docker.base_image = DEFAULT_CPU_IMAGE
step_environment.python.user_managed_dependencies = False
step_environment.python.conda_dependencies = CondaDependencies.create(conda_packages=['scikit-learn'])

##### Create the step that consumes the datasource and produces intermediate data

Using the run configuration defined above, we now create the training step. It consumes the datasource and produces the intermediate data `processed_data1`.

Open train.py in the local machine and examine the arguments, inputs, and outputs for the script. That will give you a good sense of why the script argument names used below are important.


In [8]:
# Step 4: the training step takes the DataReference created earlier as its
# input and writes its result to processed_data1.
train_arguments = [
    "--input_data", blob_input_data,
    "--output_train", processed_data1,
]

trainStep = PythonScriptStep(
    script_name="train.py",
    source_directory=source_directory,
    arguments=train_arguments,
    inputs=[blob_input_data],
    outputs=[processed_data1],
    compute_target=aml_compute,
    runconfig=run_config,
)
print("Step 4: trainStep created")

Step 4: trainStep created


##### Define a Step that consumes intermediate data and produces intermediate data

In this step, we define a step that consumes the intermediate data produced by the training step and produces new intermediate data.

Open extract.py in the local machine and examine the arguments, inputs, and outputs for the script. That will give you a good sense of why the script argument names used below are important.


In [9]:
# Step 5: the extract step consumes processed_data1 (the output of the
# training step) and produces a second intermediate output, processed_data2.
processed_data2 = PipelineData("processed_data2", datastore=def_blob_store, is_directory=True)

# Keep this step's script folder in its own variable. Reassigning the shared
# `source_directory` here would make the training-step cell pick up
# "_run_extract" if that cell were re-run after this one.
extract_source_directory = "_run_extract"

extractStep = PythonScriptStep(
    script_name="extract.py",
    arguments=["--input_extract", processed_data1, "--output_extract", processed_data2],
    inputs=[processed_data1],
    outputs=[processed_data2],
    compute_target=aml_compute,
    source_directory=extract_source_directory)
print("Step 5: extractStep created")

Step 5: extractStep created


##### Define a Step that consumes intermediate data and existing data and produces intermediate data

In this step, we define a step that consumes multiple data types and produces intermediate data.

This step uses the output generated from the previous step as well as existing data on a DataStore. The location of the existing data is specified using a PipelineParameter and a DataPath. Using a PipelineParameter enables easy modification of the data location when the Pipeline is published and resubmitted.

Open compare.py in the local machine and examine the arguments, inputs, and outputs for the script. That will give you a good sense of why the script argument names used below are important.


In [10]:
from azureml.data.datapath import DataPath, DataPathComputeBinding
from azureml.pipeline.core import PipelineParameter

# Existing data on the blob datastore, wrapped in a PipelineParameter whose
# default value is the Titanic CSV path.
datapath = DataPath(datastore=def_blob_store, path_on_datastore='titanic/Titanic.csv')
datapath_param = PipelineParameter(name="compare_data", default_value=datapath)

# Pair the parameter with a 'mount' compute binding; this pair is what the
# compare step receives as its input.
mount_binding = DataPathComputeBinding(mode='mount')
data_parameter1 = (datapath_param, mount_binding)

In [11]:
# Step 6: the compare step takes two inputs (the parameterised data path and
# processed_data2) and produces processed_data3.
processed_data3 = PipelineData("processed_data3", datastore=def_blob_store, is_directory=True)

# Keep this step's script folder in its own variable. Reassigning the shared
# `source_directory` here would make the training-step cell pick up
# "_run_compare" if that cell were re-run after this one.
compare_source_directory = "_run_compare"

compareStep = PythonScriptStep(
    script_name="compare.py",
    arguments=["--compare_data1", data_parameter1, "--compare_data2", processed_data2, "--output_compare", processed_data3],
    inputs=[data_parameter1, processed_data2],
    outputs=[processed_data3],
    compute_target=aml_compute,
    source_directory=compare_source_directory)
print("Step 6: compareStep created")

Step 6: compareStep created


#### Build Pipeline

In [12]:
# Only the final step is listed here.
# NOTE(review): the train and extract steps are presumably pulled in through
# the data dependencies between the steps — confirm.
pipeline1 = Pipeline(steps=[compareStep], workspace=ws)
print("Pipeline is built")

Pipeline is built


In [None]:
# Submit the pipeline under the 'Data_dependency_sample' experiment.
experiment = Experiment(workspace=ws, name='Data_dependency_sample')
pipeline_run1 = experiment.submit(pipeline1)
print("Pipeline is submitted for execution")

In [14]:
# Show the run-details widget for the submitted pipeline run.
run_widget = RunDetails(pipeline_run1)
run_widget.show()

_PipelineWidget(widget_settings={'childWidgetDisplay': 'popup', 'send_telemetry': False, 'log_level': 'INFO', …

In [None]:
# Wait for the pipeline run to complete, with output shown in the notebook.
pipeline_run1.wait_for_completion(show_output=True)

In [None]:
# For every step run of the pipeline, list each named output together with
# the datastore and path where it was written.
for step_run in pipeline_run1.get_steps():
    print("Outputs of step " + step_run.name)

    # get_outputs() yields a mapping keyed by output name.
    for output_name, step_output in step_run.get_outputs().items():
        port_reference = step_output.get_port_data_reference()
        print("\tname: " + output_name)
        print("\tdatastore: " + port_reference.datastore_name)
        print("\tpath on datastore: " + port_reference.path_on_datastore)

In [None]:
# Find the step runs named 'train.py'.
train_step = pipeline_run1.find_step_run('train.py')

if train_step:
    # Only one step carries the name 'train.py', so take the first match.
    train_step_obj = train_step[0]
    # Download the step's 'processed_data1' output into ./outputs.
    train_output = train_step_obj.get_output_data('processed_data1')
    train_output.download("./outputs")