In [None]:
! pip3 install kfp --upgrade

In [None]:
! pip3 list

In [1]:
import kfp
import kfp.components as comp
import kfp.compiler as compiler
import kfp.dsl as dsl

# Init an experiment

In [2]:
EXPERIMENT_NAME = 'SLEQ_BASICS'
PIPELINE_FILENAME = 'complex_pipeline.tar.gz'
PIPELINE_RUN_NAME = '[basic] complex pipeline'

In [3]:
client = kfp.Client()

existing_experiments = client.list_experiments().experiments

if existing_experiments is not None:
    experiment = next(iter([exp for exp in existing_experiments if exp.name == EXPERIMENT_NAME]), None)
else:
    experiment = None
    
if experiment is None:
    experiment = client.create_experiment(EXPERIMENT_NAME)
    print('Experiment %s created with ID %s' % (experiment.name, experiment.id))
else:
    print('Experiment already exists with id %s' % experiment.id)

Experiment already exists with id 3215e00f-775a-4f1f-adcc-a56aef71131a


# Defines complex pipeline

In [4]:
class ExitOp(dsl.ContainerOp):
    def __init__(self):
        super(ExitOp, self).__init__(
            name='Exit operator',
            image='library/bash:4.4.23',
            command=['echo', 'exit!']
        )

In [14]:
def random_print_or_error(name: str) -> str:
    import random
    import sys
    
    exit_code = random.choice([0, 1, 3, 4, 5])
    if exit_code == 0:
        return '[%s] Ok, could continue' % name
    else:
        sys.exit(exit_code)

random_print_or_error_operation = comp.func_to_container_op(random_print_or_error)

In [6]:
class MergerPrinter(dsl.ContainerOp):
    def __init__(self, input_1, input_2):
        super(MergerPrinter, self).__init__(
            name='Exit operator',
            image='library/bash:4.4.23',
            command=['echo', '%s | %s' % (input_1, input_2)]
        )

In [15]:
@dsl.pipeline(
    name='complex_pipeline',
    description='Exit Handler, retry and merge flows'
)
def pipeline():

    with dsl.ExitHandler(ExitOp()):
        ope_1 = random_print_or_error_operation(name='ope_1').set_retry(10)
        ope_2 = random_print_or_error_operation(name='ope_2').set_retry(5)
        MergerPrinter(ope_1.output, ope_2.output)

pipeline_func = compiler.Compiler().compile(pipeline, PIPELINE_FILENAME)

# Running the pipeline

In [16]:
client.run_pipeline(experiment.id, PIPELINE_RUN_NAME, PIPELINE_FILENAME)

{'created_at': datetime.datetime(2019, 3, 12, 23, 53, 55, tzinfo=tzlocal()),
 'description': None,
 'error': None,
 'id': '18858074-4522-11e9-9694-0800273ba42c',
 'metrics': None,
 'name': '[basic] complex pipeline',
 'pipeline_spec': {'parameters': None,
                   'pipeline_id': None,
                   'pipeline_manifest': None,
                   'workflow_manifest': '{"apiVersion": '
                                        '"argoproj.io/v1alpha1", "kind": '
                                        '"Workflow", "metadata": '
                                        '{"generateName": '
                                        '"complex-pipeline-"}, "spec": '
                                        '{"arguments": {"parameters": []}, '
                                        '"entrypoint": "complex-pipeline", '
                                        '"onExit": "exit-operator", '
                                        '"serviceAccountName": '
                                    