In [13]:
from azureml.core import Workspace, Run, Dataset
from azureml.pipeline.wrapper import Pipeline, dsl, Module

# Look up the Azure ML workspace that every later cell works against.
ws = Workspace.get(
    name='kubeflow_ws_1',
    subscription_id='74eccef0-4b8d-4f83-b5f9-fa100d155b22',
    resource_group='kubeflow-demo',
)

# Modules loaded from local YAML specs. The pipelines below wire them through
# `input1`, and read `output1`/`output2` (one2two) or `output1` (one2one).
# NOTE(review): presumably no-op modules, judging by the ./noop/ path -- confirm.
one2two = Module.from_yaml(ws, yaml_file='./noop/1in2out.spec.yaml')
one2one = Module.from_yaml(ws, yaml_file='./noop/1in1out.spec.yaml')

# Dataset fed into the root node of cell_division().
data = Dataset.get_by_name(ws, 'training_data')

In [14]:
@dsl.pipeline(
    name='A pipeline composed with nodes 1 in 2 outs',
    description='A sample',
    default_compute_target='cpu-lowpri'
)
def cell_division():
    """Build a four-layer binary tree of ``one2two`` nodes rooted at ``data``.

    The root node consumes the module-level ``data`` dataset. Each following
    layer creates two child nodes per parent, one wired to the parent's
    ``output1`` and one to its ``output2``, giving layers of 1, 2, 4 and 8
    nodes.

    Returns:
        dict: ``'output1'`` .. ``'output16'`` mapped to the two outputs of
        each node in the final layer, in the order the nodes were created.
    """
    depth = 4
    all_nodes = [one2two(input1=data)]
    frontier = []
    for level in range(depth - 1):
        print('i=', level, ' nodes len=', len(all_nodes))
        frontier = []
        # The previous layer is the trailing 2**level entries of all_nodes;
        # it is walked from the back, so parents are visited newest-first.
        for offset in range(2 ** level):
            print('j=', offset)
            parent = all_nodes[-(offset + 1)]
            frontier += [
                one2two(input1=parent.outputs.output1),
                one2two(input1=parent.outputs.output2),
            ]
        all_nodes.extend(frontier)

    # Number the leaf outputs consecutively: leaf k contributes 2k+1 and 2k+2.
    exposed = {}
    for position, leaf in enumerate(frontier):
        exposed['output{}'.format(2 * position + 1)] = leaf.outputs.output1
        exposed['output{}'.format(2 * position + 2)] = leaf.outputs.output2
    return exposed

In [15]:
@dsl.pipeline(
    name='A pipeline composed with nodes 1 in 1 outs',
    description='A sample',
    default_compute_target='cpu-lowpri'
)
def chain(input):
    """Build a linear chain of ten ``one2one`` nodes.

    The first node consumes ``input``; every later node consumes the
    ``output1`` of the node created just before it.

    Args:
        input: Value wired into ``input1`` of the first node. The name
            shadows the builtin but is kept because it is the pipeline's
            public parameter name.

    Returns:
        dict: A copy of the outputs mapping of the last node in the chain.
    """
    length = 10
    last = None
    for _ in range(length):
        # Identity check: `last` is either the None sentinel or a node object,
        # and `==` would go through whatever __eq__ that object defines.
        upstream = input if last is None else last.outputs.output1
        last = one2one(input1=upstream)

    return {**last.outputs}

In [16]:
@dsl.pipeline(
    name='A pipeline composed with split and chain',
    description='A sample',
    default_compute_target='cpu-lowpri'
)
def waterfall():
    """Fan out with ``cell_division`` and hang one ``chain`` off each output.

    Returns:
        dict: The outputs of every chain, keyed ``'chain<k>_<name>'`` where
        ``k`` is the 1-based position of the ``cell_division`` output the
        chain was attached to and ``name`` is the chain's own output name.
    """
    part1 = cell_division()
    x = {}
    for index, o in enumerate(part1.outputs.values(), start=1):
        part2 = chain(o)
        # Every chain() call returns its outputs under the same names, so
        # merging them unprefixed would leave only the last chain's entries.
        # NOTE(review): assumes a sub-pipeline's `.outputs` is keyed by the
        # names its function returned -- confirm against the SDK.
        for name, port in part2.outputs.items():
            x['chain{}_{}'.format(index, name)] = port

    return x

In [17]:
# Build the full pipeline graph. The i=/j= lines in the output below are
# printed by cell_division() while the graph is being built.
pipeline = waterfall()

# Left as the bare last expression so the validation result is displayed
# as the cell output.
pipeline.validate()

i= 0  nodes len= 1
j= 0
i= 1  nodes len= 3
j= 0
j= 1
i= 2  nodes len= 7
j= 0
j= 1
j= 2
j= 3


{'result': 'validation passed', 'errors': []}

In [18]:
# Submit the pipeline under the 'samples' experiment; the returned run is
# left as the bare last expression so its summary is displayed below.
pipeline.submit(experiment_name='samples')

Submitted PipelineRun cc620ddf-10bd-4ef5-880c-2b2b8ad8e860
Link to Azure Machine Learning Portal: https://ml.azure.com/experiments/samples/runs/cc620ddf-10bd-4ef5-880c-2b2b8ad8e860?wsid=/subscriptions/74eccef0-4b8d-4f83-b5f9-fa100d155b22/resourcegroups/kubeflow-demo/workspaces/kubeflow_ws_1


Experiment,Id,Type,Status,Details Page,Docs Page
samples,cc620ddf-10bd-4ef5-880c-2b2b8ad8e860,azureml.PipelineRun,NotStarted,Link to Azure Machine Learning studio,Link to Documentation
