In [None]:
!pip install kfp_tekton

In [None]:
import kfp
import kfp_tekton

In [None]:
def gather_and_prepare_data() -> str:
  """Download the abalone training CSV and log a preview of it.

  Persisting the dataset to a workspace is not implemented yet (see TODO).

  Returns:
    The literal string 'success' once the download has completed.
  """

  # Imported inside the body: this function is turned into a pipeline
  # component elsewhere in the notebook, so it keeps its own dependencies local.
  import pandas

  print('Downloading dataset...')

  dataset = pandas.read_csv('https://storage.googleapis.com/download.tensorflow.org/data/abalone_train.csv')
  # A bare `dataset.head()` is only rendered when it is the last expression of
  # a notebook cell; inside a function the value is discarded, so print it.
  print(dataset.head())

  print('Saving to workspace...')

  # TODO: persist `dataset` so that later steps can read it

  print('Done!')

  return 'success'

# Wrap the data-gathering function as a pipeline component that uses the
# python:3.9 image and requests pandas through `packages_to_install`.
gather_and_prepare_data_op = kfp.components.create_component_from_func(
  gather_and_prepare_data,
  base_image='python:3.9',
  packages_to_install=['pandas'],
)

In [None]:
def develop_model(data_status: str) -> str:
  """Build the Keras model once the incoming data status is 'success'.

  Training is not implemented yet (see TODO); the model is only constructed.

  Args:
    data_status: Status string to validate; anything other than 'success'
      aborts this step.

  Returns:
    The literal string 'success'.

  Raises:
    Exception: With message 'DataError' when `data_status` is not 'success'.
  """

  print('Checking dataset...')

  data_ready = data_status == 'success'
  if not data_ready:
    raise Exception('DataError')

  # Imported only after the status check, so a failed check aborts before
  # TensorFlow is loaded.
  import tensorflow as tf

  version_banner = 'Tensorflow version: %s' % tf.__version__
  print(version_banner)

  print('Creating model...')

  # A 64-unit ReLU layer followed by a single sigmoid output unit.
  layers = [
    tf.keras.layers.Dense(64, activation='relu'),
    tf.keras.layers.Dense(1, activation='sigmoid'),
  ]
  network = tf.keras.models.Sequential(layers)

  print('Training model...')

  # TODO: compile and fit `network`

  print('Done!')

  return 'success'

# Wrap the model-development function as a pipeline component that uses the
# TensorFlow 2.2.3 image.
develop_model_op = kfp.components.create_component_from_func(
  develop_model,
  base_image='tensorflow/tensorflow:2.2.3-py3',
)

In [None]:
@kfp.components.create_component_from_func
def deploy_model(model_status: str, environment: str) -> str:
  """Announce deployment of the model once the model status is 'success'.

  The actual deployment is not implemented yet (see TODO).

  Args:
    model_status: Status string to validate; anything other than 'success'
      aborts this step.
    environment: Name of the target environment, used in the log message.

  Returns:
    The literal string 'success'.

  Raises:
    Exception: With message 'ModelError' when `model_status` is not 'success'.
  """

  print('Checking model...')

  model_ready = model_status == 'success'
  if not model_ready:
    raise Exception('ModelError')

  deployment_notice = 'Deploying model to %s environment...' % environment
  print(deployment_notice)

  # TODO: perform the deployment

  print('Done!')

  return 'success'

In [None]:
from typing import NamedTuple

@kfp.components.create_component_from_func
def test_model(
  model_deployment_status: str,
  environment: str
) -> NamedTuple('VisualizationOutput', [('mlpipeline_ui_metadata', 'UI_metadata')]):
  """Validate the deployment status and emit ROC visualization metadata.

  Args:
    model_deployment_status: Status string to validate; anything other than
      'success' aborts this step.
    environment: Name of the target environment (accepted, not read in the body).

  Returns:
    A one-field 'VisualizationOutput' named tuple whose
    `mlpipeline_ui_metadata` field holds the JSON-encoded metadata.

  Raises:
    Exception: With message 'ModelDeploymentError' when the status is not
      'success'.
  """

  print('Checking model deployment...')

  deployment_ok = model_deployment_status == 'success'
  if not deployment_ok:
    raise Exception('ModelDeploymentError')

  print('Testing model...')

  import json
  from collections import namedtuple

  # Every column of the referenced CSV is declared with the NUMBER type.
  roc_columns = ('fpr', 'tpr', 'thresholds')
  roc_view = {
    'type': 'roc',
    'format': 'csv',
    'schema': [{'name': column, 'type': 'NUMBER'} for column in roc_columns],
    'source': 'https://raw.githubusercontent.com/kubeflow/pipelines/master/samples/core/visualization/roc.csv',
  }
  metadata = {'outputs': [roc_view]}

  VisualizationOutput = namedtuple('VisualizationOutput', ['mlpipeline_ui_metadata'])
  result = VisualizationOutput(json.dumps(metadata))

  print('Done!')

  return result

In [None]:
@kfp.dsl.pipeline(
  name='TDC Pipeline',
  description='A very nice Pipeline for TDC',
)
def tdc_pipeline(environment: str):
  """Chain the four steps: gather data, develop model, deploy, test.

  Each step consumes the output of the step before it, which fixes the order
  in which they run.

  Args:
    environment: Target environment name forwarded to the deploy and test steps.
  """

  data_step = gather_and_prepare_data_op()
  model_step = develop_model_op(data_status=data_step.output)
  deploy_step = deploy_model(
    model_status=model_step.output,
    environment=environment,
  )
  test_model(
    model_deployment_status=deploy_step.output,
    environment=environment,
  )

In [None]:
# Connection settings for the Kubeflow Pipelines endpoint.
# `kubeflow_url` is a placeholder; replace `<KubeFlow URL>` with the real host.
pipeline_config = dict(
  kubeflow_url='<KubeFlow URL>/pipeline',
  kubeflow_namespace=None,
)

# Arguments supplied to the pipeline when a run is created.
pipeline_arguments = dict(environment='development')

In [None]:
# Connect to the Kubeflow Pipelines endpoint configured in `pipeline_config`.
client = kfp_tekton.TektonClient(host=pipeline_config['kubeflow_url'])

# Submit `tdc_pipeline` as a run; the call is left as the cell's last
# expression so that its result is displayed.
client.create_run_from_pipeline_func(
  pipeline_func=tdc_pipeline,
  arguments=pipeline_arguments,
  namespace=pipeline_config['kubeflow_namespace'],
)