In [1]:
!pip install kfp --upgrade --user

Requirement already up-to-date: kfp in /home/jovyan/.local/lib/python3.6/site-packages (1.0.4)
You should consider upgrading via the 'pip install --upgrade pip' command.


In [2]:
import kfp
from kfp import compiler
import kfp.dsl as dsl
import kfp.notebook
import kfp.components as comp

In [3]:
# Define a Python function
def add(a: float, b: float) -> float:
    '''Calculates sum of two arguments'''
    # Plain `+` on the two operands; the result is returned unchanged.
    total = a + b
    return total

In [4]:
# Turn the plain `add` function into a pipeline operation factory.
add_op = comp.func_to_container_op(func=add)

In [5]:
from typing import NamedTuple
def my_divmod(dividend: float, divisor: float) -> \
    NamedTuple('MyDivmodOutput', [('quotient', float), ('remainder', float)]):
    '''Divides two numbers and calculate the quotient and remainder'''
    # Everything this component needs is imported inside the function body.
    import numpy as np
    from collections import namedtuple

    # Nested helper: shows that a component function may define inner functions.
    def _numpy_divmod(numerator, denominator):
        return np.divmod(numerator, denominator)

    # np.divmod yields a (quotient, remainder) pair.
    pair = _numpy_divmod(dividend, divisor)

    # Package both values under the field names declared in the return annotation.
    MyDivmodOutput = namedtuple('MyDivmodOutput', ['quotient', 'remainder'])
    return MyDivmodOutput(quotient=pair[0], remainder=pair[1])

In [6]:
# Call the function directly in the notebook to check its result before
# wrapping it as a pipeline operation.
my_divmod(dividend=100, divisor=7)

MyDivmodOutput(quotient=14, remainder=2)

In [8]:
# Wrap `my_divmod` as a pipeline operation that runs in the given base image
# (the function body imports numpy, so the image has to provide it).
# NOTE(review): the image tag is a floating `latest` GPU tag — consider pinning
# a specific version so repeated runs use the same environment; confirm which
# tag is compatible with this kfp version before changing it.
divmod_op = comp.func_to_container_op(
    func=my_divmod,
    base_image='tensorflow/tensorflow:latest-gpu-py3',
)

In [9]:
# Toy pipeline: (a + 4) is divided by b, and c is added to the quotient.
# Parameter defaults are numeric strings because every value ends up in a
# float input of add_op / divmod_op; a non-numeric default for `a` would make
# a run that relies on the defaults fail.
@dsl.pipeline(
    name='Calculation pipeline',
    description='A toy pipeline that performs arithmetic calculations.'
)
def calc_pipeline(
    a='1',
    b='7',
    c='17',
):
    # Passing a pipeline parameter and a constant value as operation arguments
    add_task = add_op(a, 4)  # Returns a dsl.ContainerOp class instance.

    # Passing a task output reference as an operation argument.
    # For an operation with a single return value, the output
    # reference can be accessed using `task.output`
    # or `task.outputs['output_name']` syntax
    divmod_task = divmod_op(add_task.output, b)

    # For an operation with multiple return values, the output references
    # can be accessed using `task.outputs['output_name']` syntax
    result_task = add_op(divmod_task.outputs['quotient'], c)

In [10]:
# Connect to the Kubeflow Pipelines API through its cluster-internal address.
client = kfp.Client(
    host='pipelines-api.kubeflow.svc.cluster.local:8888',
)
# Pipeline argument values for this run. `c` is not listed, so its default
# declared on calc_pipeline is presumably used — adjust the values as needed.
arguments = dict(a='7', b='8')
# Submit a pipeline run; the returned result object is displayed as the cell output.
client.create_run_from_pipeline_func(
    pipeline_func=calc_pipeline,
    arguments=arguments,
)

RunPipelineResult(run_id=41c68fd2-e3f2-4e44-8078-a3142292e9e0)