In [None]:
# !pip install kfp==1.6.3

In [None]:
from typing import NamedTuple

import kfp
from kfp.components import InputPath, InputTextFile, OutputPath, OutputTextFile
from kfp.components import func_to_container_op

from datetime import datetime

import sys
sys.path.insert(0, "..")
from constants import NAMESPACE, HOST
from utils.auth import get_session_cookie
from utils import helpers

### Define several constants

In [None]:
# Name of the experiment the runs in this notebook are submitted under.
EXPERIMENT_NAME = "tutorial"

# Name and description used when registering the pipeline.
PIPELINE_NAME = "tutorial"
PIPELINE_DESCRIPTION = "This is a tutorial pipeline"

# Remember to change this on every run.
PIPELINE_VERSION = "0.0.1"

### Create components from functions

In [None]:
@func_to_container_op
def produce_one_small_output() -> str:
    """Produce a single small string output."""
    greeting = 'Hello world'
    return greeting

@func_to_container_op
def produce_two_small_outputs() -> NamedTuple('Outputs', [('text', str), ('number', int)]):
    """Produce two small outputs: a text value and a number.

    The returned tuple is positional and follows the field order declared in
    the ``Outputs`` annotation: ``text`` first, then ``number``.
    """
    text_output = "data 1"
    number_output = 42
    return (text_output, number_output)

@func_to_container_op
def consume_two_arguments(text: str, number: int):
    """Print the two received arguments, one per line.

    Args:
        text: Text value to print.
        number: Number to print.
    """
    text_line = 'Text={}'.format(text)
    number_line = 'Number={}'.format(str(number))
    print(text_line)
    print(number_line)

### Create pipelines by connecting components

In [None]:
def producers_to_consumers_pipeline(text: str = "Hello world"):
    """Pipeline that passes data from producer to consumer"""
    # Producer steps: one with a single output, one with two named outputs.
    greeting_task = produce_one_small_output()
    pair_task = produce_two_small_outputs()
    pair_outputs = pair_task.outputs

    # Consumer 1: a producer output combined with a constant.
    consume_two_arguments(text=greeting_task.output, number=42)
    # Consumer 2: the pipeline parameter combined with a producer output.
    consume_two_arguments(text=text, number=pair_outputs['number'])
    # Consumer 3: both arguments taken from the two-output producer.
    consume_two_arguments(text=pair_outputs['text'], number=pair_outputs['number'])

### Run pipelines

1. First, we define the client used to interact with the Kubeflow Pipelines API. In this case, we authenticate with a session cookie.

In [None]:
# Obtain the session cookie from the project helper; it is handed to the
# client as the `authservice_session` cookie.
session_cookie = get_session_cookie()

client_kwargs = dict(
    host=f"{HOST}/pipeline",
    cookies=f"authservice_session={session_cookie}",
    namespace=NAMESPACE,
)
client = kfp.Client(**client_kwargs)

2. Next, compile the pipeline into YAML, upload it to the pipeline store, and run it.

In [None]:
# Compile the pipeline function into a YAML package whose file name carries
# the pipeline version.
pipeline_package_path = f"pipeline_{PIPELINE_VERSION}.yaml"
compiler = kfp.compiler.Compiler()
compiler.compile(
    pipeline_func=producers_to_consumers_pipeline,
    package_path=pipeline_package_path,
)

# Look up (or create) the experiment and the pipeline through the project
# helpers; their `.id` attributes are needed to submit the run below.
experiment = helpers.get_or_create_experiment(client, name=EXPERIMENT_NAME)
pipeline = helpers.get_or_create_pipeline(
    client,
    pipeline_name=PIPELINE_NAME,
    version=PIPELINE_VERSION,
    pipeline_description=PIPELINE_DESCRIPTION,
)

# Submit a run whose name ends with a second-resolution timestamp.
# NOTE(review): `pipeline.id` is passed as `version_id`, so the helper
# presumably returns a pipeline *version* object -- confirm in utils.helpers.
now = datetime.now().strftime("%Y%m%d%H%M%S")
run_name = f"{PIPELINE_NAME} {PIPELINE_VERSION} {now}"
client.run_pipeline(
    experiment_id=experiment.id,
    job_name=run_name,
    version_id=pipeline.id,
)

3. Alternatively, run the pipeline directly from the notebook (not recommended for production).

In [None]:
# Submit the pipeline function directly, without going through the pipeline
# store. The empty `arguments` dict overrides no pipeline parameter.
client.create_run_from_pipeline_func(
    pipeline_func=producers_to_consumers_pipeline,
    arguments={},
    experiment_name=EXPERIMENT_NAME,
)

4. Create a recurring run with a single command

In [None]:
# Don't forget to disable the recurring run once you no longer need it.
#
# The timestamp is built here instead of reusing `now` from the earlier cell,
# so the job name reflects when the recurring run was actually created and
# this cell does not depend on a stale notebook variable.
recurring_timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
client.create_recurring_run(
    experiment_id=experiment.id,
    job_name=f"{PIPELINE_NAME} {PIPELINE_VERSION} {recurring_timestamp}",
    # Six-field cron expression (seconds first): second 0, minute 0 of every hour.
    cron_expression="0 0 * * * *",  # hourly
    version_id=pipeline.id,
)