# Simple KubeFlow Pipeline

Lightweight Python components do not require you to build a new container image for every code change.
They are intended for fast iteration in a notebook environment.

#### Building a lightweight Python component
To build a component, define a stand-alone Python function and then call `kfp.components.func_to_container_op(func)` to convert it to a component that can be used in a pipeline.

There are several requirements for the function:
* The function should be stand-alone. It should not use any code declared outside of the function definition. Any imports should be added inside the main function. Any helper functions should also be defined inside the main function.
* The function can only import packages that are available in the base image. If you need a package that is not available, try to find a container image that already includes it. (As a workaround, you can use the `subprocess` module to run `pip install` for the required package.)
* If the function operates on numbers, the parameters need to have type hints. Supported types are ```[int, float, bool]```. Everything else is passed as a string.
* To build a component with multiple output values, use the typing.NamedTuple type hint syntax: ```NamedTuple('MyFunctionOutputs', [('output_name_1', type), ('output_name_2', float)])```
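
The requirements above can be illustrated with a small stand-alone function. This is only a sketch: the names `normalize_text` and `ensure_package` are hypothetical, and `unicodedata` (a standard-library module) stands in for a package that might be missing from the base image, so the `pip install` fallback is never actually triggered here.

```python
def normalize_text(text: str) -> str:
    '''Hypothetical component function: normalizes Unicode and trims whitespace.'''
    # All imports live inside the function body so that the function is stand-alone.
    import importlib
    import subprocess
    import sys

    # Helper functions are also defined inside the main function.
    def ensure_package(module_name, pip_name=None):
        '''Import a module, installing it with pip first if it is missing.'''
        try:
            return importlib.import_module(module_name)
        except ImportError:
            # Workaround for packages that are absent from the base image.
            subprocess.check_call(
                [sys.executable, '-m', 'pip', 'install', pip_name or module_name])
            return importlib.import_module(module_name)

    unicodedata = ensure_package('unicodedata')
    return unicodedata.normalize('NFKC', text).strip()
```

Because `text` is a string, no numeric type hint is needed; a numeric parameter would have to be annotated with `int`, `float` or `bool`.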

In [1]:
# Install the KubeFlow Pipeline SDK
!pip3 install https://storage.googleapis.com/ml-pipeline/release/0.1.16/kfp.tar.gz --upgrade

A simple function that just adds two numbers:

In [2]:
#Define a Python function
def add_fn(a: float, b: float) -> float:
   '''Calculates sum of two arguments'''
   return a + b

Convert the function to a pipeline operation

In [3]:
import kfp.components as comp

add_op = comp.func_to_container_op(add_fn)

A slightly more advanced function that demonstrates how to use imports and helper functions, and how to produce multiple outputs.

In [4]:
from typing import NamedTuple
def div_fn(dividend: float, divisor: float, output_dir: str = './') -> NamedTuple('DivOutput', [('quotient', float), ('remainder', float)]):
    '''Divides two numbers and calculates the quotient and remainder'''
    #Imports inside a component function:
    import numpy as np

    #This function demonstrates how to use nested functions inside a component function:
    def nested_div_helper(dividend, divisor):
        return np.divmod(dividend, divisor)

    (quotient, remainder) = nested_div_helper(dividend, divisor)

    from tensorflow.python.lib.io import file_io
    import json
    
    # Exports two sample metrics:
    metrics = {
      'metrics': [{
          'name': 'quotient',
          'numberValue':  float(quotient),
        },{
          'name': 'remainder',
          'numberValue':  float(remainder),
        }]}

    with file_io.FileIO(output_dir + 'mlpipeline-metrics.json', 'w') as f:
        json.dump(metrics, f)

    from collections import namedtuple
    output = namedtuple('DivOutput', ['quotient', 'remainder'])
    return output(quotient, remainder)

Test the Python function by running it directly:

In [5]:
div_fn(100, 7)

DivOutput(quotient=14, remainder=2)
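
If NumPy and TensorFlow are not installed in the notebook environment, the same logic can still be checked locally. The following is a minimal sketch that uses only the standard library; `div_fn_local` is a hypothetical name, and the built-in `divmod` and `open` are swapped in for `np.divmod` and `file_io.FileIO`. Note that the built-in `open` only handles local paths, whereas `file_io.FileIO` can also write to remote file systems such as Google Cloud Storage.

```python
import json
import os
import tempfile
from collections import namedtuple


def div_fn_local(dividend: float, divisor: float, output_dir: str = './'):
    '''Stdlib-only variant of div_fn for local checks (hypothetical helper).'''
    quotient, remainder = divmod(dividend, divisor)

    # Same metrics structure as in div_fn.
    metrics = {'metrics': [
        {'name': 'quotient', 'numberValue': float(quotient)},
        {'name': 'remainder', 'numberValue': float(remainder)},
    ]}
    with open(os.path.join(output_dir, 'mlpipeline-metrics.json'), 'w') as f:
        json.dump(metrics, f)

    output = namedtuple('DivOutput', ['quotient', 'remainder'])
    return output(quotient, remainder)


# Write the metrics file to a temporary directory and read it back.
with tempfile.TemporaryDirectory() as tmp_dir:
    result = div_fn_local(100, 7, tmp_dir)
    with open(os.path.join(tmp_dir, 'mlpipeline-metrics.json')) as f:
        written = json.load(f)

print(result)
print(written['metrics'])
```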

#### Convert the function to a pipeline operation

You can specify an alternative base container image (the image needs to have Python 3.5+ installed).

In [6]:
div_op = comp.func_to_container_op(div_fn, base_image='tensorflow/tensorflow:1.11.0-py3')

#### Define the pipeline
The pipeline function has to be decorated with the `@dsl.pipeline` decorator.

In [7]:
import kfp.dsl as dsl
@dsl.pipeline(
   name='Calculation pipeline',
   description='A toy pipeline that performs arithmetic calculations.'
)
def add_div_pipeline(
   a='a',
   b='7',
   c='17',
):
    #Passing pipeline parameter and a constant value as operation arguments
    add_task = add_op(a, 4) #Returns a dsl.ContainerOp class instance. 
    
    #Passing a task output reference as operation arguments
    #For an operation with a single return value, the output reference can be accessed using `task.output` or `task.outputs['output_name']` syntax
    div_task = div_op(add_task.output, b, '/')

    #For an operation with multiple return values, the output references can be accessed using `task.outputs['output_name']` syntax
    result_task = add_op(div_task.outputs['quotient'], c)

#### Compile the pipeline

In [8]:
pipeline_func = add_div_pipeline
pipeline_filename = pipeline_func.__name__ + '.pipeline.tar.gz'
import kfp.compiler as compiler
compiler.Compiler().compile(pipeline_func, pipeline_filename)
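
The compiled package is a gzip-compressed tar archive that contains the workflow definition in `pipeline.yaml`. As an alternative to shell commands, the archive can be inspected from Python without extracting it to disk. This is a sketch under that assumption; `read_pipeline_yaml` is a hypothetical helper, not part of the SDK.

```python
import tarfile


def read_pipeline_yaml(archive_path, member='pipeline.yaml'):
    '''Return the text of the workflow file stored in a compiled pipeline package.'''
    with tarfile.open(archive_path, 'r:gz') as archive:
        # extractfile raises KeyError if the member does not exist
        # and returns None if it is not a regular file.
        extracted = archive.extractfile(member)
        if extracted is None:
            raise ValueError(member + ' is not a regular file in ' + archive_path)
        return extracted.read().decode('utf-8')
```

For the package compiled above, `print(read_pipeline_yaml(pipeline_filename))` should show the same workflow definition as extracting the archive and printing the file.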

In [10]:
!tar -xvzf add_div_pipeline.pipeline.tar.gz
!cat pipeline.yaml

pipeline.yaml
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: calculation-pipeline-
spec:
  arguments:
    parameters:
    - name: a
      value: a
    - name: b
      value: '7'
    - name: c
      value: '17'
  entrypoint: calculation-pipeline
  serviceAccountName: pipeline-runner
  templates:
  - container:
      args:
      - '{{inputs.parameters.a}}'
      - '4'
      - /outputs/Output/data
      command:
      - python3
      - -c
      - "def add_fn(a: float, b: float) -> float:\n   '''Calculates sum of two arguments'''\n\
        \   return a + b\n\nimport sys\n_args = {\n    'a': float(sys.argv[1]),\n\
        \    'b': float(sys.argv[2]),\n}\n_output_files = [\n    sys.argv[3],\n]\n\
        \n_outputs = add_fn(**_args)\n\nif not hasattr(_outputs, '__getitem__') or\
        \ isinstance(_outputs, str):\n    _outputs = [_outputs]\n\nfrom pathlib import\
        \ Path\nfor idx, filename in enumerate(_output_files):\n    _output_path =\
        \ Path(
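
The visible part of the generated command shows how the type hints are used: every argument arrives as a command-line string, is converted with `float(...)`, and the return value is written to the output file passed as the last argument. The sketch below imitates that behavior so it can be tried locally. It is not the code generated by the SDK: `run_component` is a hypothetical name, and everything after the `Path(` call (creating the parent directory and writing `str(value)`) is an assumption, because the output above is cut off at that point.

```python
from pathlib import Path


def add_fn(a: float, b: float) -> float:
    '''Calculates sum of two arguments'''
    return a + b


def run_component(argv):
    '''Imitates the generated wrapper; argv has the layout [program, a, b, output_file].'''
    # Arguments arrive as strings and are converted according to the type hints.
    args = {'a': float(argv[1]), 'b': float(argv[2])}
    output_files = [argv[3]]

    outputs = add_fn(**args)
    # A single return value is wrapped in a list so it can be indexed like a tuple.
    if not hasattr(outputs, '__getitem__') or isinstance(outputs, str):
        outputs = [outputs]

    for idx, filename in enumerate(output_files):
        output_path = Path(filename)
        # Assumption: the output is stored as plain text in the given file.
        output_path.parent.mkdir(parents=True, exist_ok=True)
        output_path.write_text(str(outputs[idx]))
```

Calling `run_component(['prog', '7', '4', 'outputs/Output/data'])` writes the text `11.0` to `outputs/Output/data`.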

#### Submit the pipeline for execution

In [9]:
#Specify pipeline argument values
arguments = {'a': '7', 'b': '8'}

#Get or create an experiment
import kfp
client = kfp.Client()
experiment = client.create_experiment('simple_add_div_pipeline')

#Submit a pipeline run
run_name = pipeline_func.__name__ + ' run'
run_result = client.run_pipeline(experiment.id, run_name, pipeline_filename, arguments)
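
To know what result to expect from the run, the three pipeline steps can be replayed in plain Python with the same argument values. This is a local sketch only: `expected_result` is a hypothetical helper, the built-in `divmod` stands in for `np.divmod`, and `c` falls back to the pipeline default of `'17'` because it is not overridden in `arguments`.

```python
def add_fn(a: float, b: float) -> float:
    '''Calculates sum of two arguments'''
    return a + b


def expected_result(a: str, b: str, c: str = '17') -> float:
    '''Replays the three pipeline steps locally (hypothetical helper).'''
    # Step 1: add the pipeline parameter a and the constant 4.
    add_out = add_fn(float(a), 4.0)
    # Step 2: divide the sum by b; only the quotient is passed on.
    quotient, remainder = divmod(add_out, float(b))
    # Step 3: add the quotient and c.
    return add_fn(quotient, float(c))


arguments = {'a': '7', 'b': '8'}
print(expected_result(**arguments))  # 18.0
```

With these values the first step yields 11, the division yields a quotient of 1 and a remainder of 3, and the final sum is 18.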