In [1]:
import kfp
from kfp import dsl
from kfp import components as comp

In [2]:
def cifa10_preprocessing(img_path: str, f: comp.OutputTextFile()):
    """Load an image, scale it to [0, 1] floats and write it to ``f`` as JSON.

    The image is rearranged into a tensor of shape (N, 32, 32, 3); N is 1
    when the input is a single 32x32 image.

    Args:
        img_path: Path of the image file to read.
        f: Output text file that receives the tensor as a JSON nested list.

    Raises:
        ValueError: If the image at ``img_path`` cannot be read.
    """
    # Imports live inside the function because it is packaged as a
    # standalone container component.
    import json

    import cv2
    import numpy as np
    import tensorflow as tf

    img = cv2.imread(img_path)
    if img is None:
        # cv2.imread reports a missing/unreadable file by returning None
        # rather than raising, which would otherwise surface later as an
        # obscure cvtColor error.
        raise ValueError(f'Could not read image: {img_path!r}')

    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    # Flatten channel-major (all R values, then all G, then all B) ...
    img = img.reshape(-1, 3).T.reshape(-1)
    img = tf.convert_to_tensor([img], np.uint8)
    # ... then rebuild as (N, 3, 32, 32) and move channels last.
    img = tf.reshape(img, (-1, 3, 32, 32))
    img = tf.transpose(img, (0, 2, 3, 1))
    img = tf.cast(img, tf.float32)
    img = img / 255

    # str(tensor) is a display repr that elides values for large arrays and
    # cannot be parsed back; JSON keeps every value and stays plain text.
    f.write(json.dumps(img.numpy().tolist()))

In [3]:
def cifa10_model_run(f: comp.InputTextFile()):
    # Placeholder model-run step: reads the upstream text payload and wraps
    # it in a tensor. No inference is performed yet.
    import tensorflow as tf

    payload = f.read()
    # NOTE(review): `payload` is the raw text read from `f`, so this
    # presumably yields a string tensor rather than numeric pixel data --
    # confirm once the upstream serialization format is settled.
    tensor = tf.convert_to_tensor(payload)
    # TODO: call the KFServing inference service here.
    return tensor

In [11]:
# Both component functions import TensorFlow inside their bodies, so both
# containers must be built from an image that ships it. Previously only the
# preprocessing component set `base_image`; the model-run component fell back
# to the SDK's default image.
TF_BASE_IMAGE = 'tensorflow/tensorflow:2.1.0-py3'

# NOTE(review): `opencv-python` may need system GL libraries that this image
# does not provide -- confirm, or consider `opencv-python-headless`.
cifa10_prep_op = comp.func_to_container_op(
    cifa10_preprocessing,
    base_image=TF_BASE_IMAGE,
    output_component_file='cifa10_prep_component.yaml',
    packages_to_install=['opencv-python==4.5.1.48'],
)
cifa10_run_op = comp.func_to_container_op(
    cifa10_model_run,
    base_image=TF_BASE_IMAGE,
    output_component_file='cifa10_model_run_component.yaml',
)

In [12]:
# Two-step pipeline: preprocess one image, then pass the result to the
# model-run component.
@dsl.pipeline(name='cifa10-run', description='cifa10 model run pipeline')
def cifa10_pipeline(img_path):
    prep_step = cifa10_prep_op(img_path)
    prep_result = prep_step.output
    # Prints the pipeline-parameter placeholder for the preprocessing output
    # (not image data) when the pipeline function is evaluated.
    print(prep_result)
    cifa10_run_op(prep_result)

In [13]:
# Compile the CIFAR-10 pipeline definition into a workflow YAML file.
kfp.compiler.Compiler().compile(
    pipeline_func=cifa10_pipeline,
    package_path='cifa10_pipeline2.yaml',
)

{{pipelineparam:op=cifa10-preprocessing;name=f}}


In [7]:
import kfp
from kfp import dsl
from kfp import components as comp


def add(a: float, b: float, f: comp.OutputTextFile()):
    '''Calculates sum of two arguments'''
    # The total is written to `f` as text and also returned to the caller.
    total = a + b
    as_text = str(total)
    f.write(as_text)
    return total


def multiply(c: float, d: float, f: comp.InputTextFile()):
    '''Calculates the product'''
    # `f` supplies a third factor as text; parse it to a float first.
    file_factor = float(f.read())
    result = c * d * file_factor
    print(result)
    return result


# Wrap the two plain Python functions as pipeline components, saving each
# component definition to a YAML file alongside the notebook.
add_op = comp.func_to_container_op(
    add,
    output_component_file='add_component.yaml',
)
product_op = comp.create_component_from_func(
    multiply,
    output_component_file='multiple_component.yaml',
)


# Example pipeline: add `a` and `b`, then feed that sum (via the add step's
# output file) into the multiply step together with `c` and `d`.
@dsl.pipeline(
    description='An example pipeline that performs addition calculations.',
    name='Addition-pipeline',
)
def my_pipeline(a, b='7', c='4', d='1'):
    sum_step = add_op(a, b)
    product_op(c, d, sum_step.output)

In [57]:
# Compile the example addition pipeline into a workflow YAML file.
kfp.compiler.Compiler().compile(
    pipeline_func=my_pipeline,
    package_path='test.yaml',
)

In [None]:
# NOTE(review): scratch cell holding a bare path literal; presumably a sample
# value for the `img_path` pipeline parameter -- confirm or remove.
'/home/jovyan/data-vol-1/team5-models/cifa10/test_images/3991.jpeg'

In [65]:
!pip list

Package                  Version
------------------------ ----------
absl-py                  0.9.0
adal                     1.2.5
ansiwrap                 0.8.4
anyio                    3.5.0
appdirs                  1.4.4
argon2-cffi              20.1.0
asgiref                  3.4.1
asn1crypto               1.4.0
astor                    0.8.1
async-generator          1.10
attrs                    19.3.0
autopep8                 1.5.4
azure-common             1.1.26
azure-core               1.10.0
azure-mgmt-core          1.2.2
azure-mgmt-storage       16.0.0
azure-storage-blob       2.1.0
azure-storage-common     2.1.0
azure-storage-file       2.1.0
backcall                 0.1.0
black                    20.8b1
bleach                   3.1.0
boto3                    1.15.18
botocore                 1.18.18
bump2version             1.0.1
bumpversion              0.6.0
cachetools               4.0.0
certifi                  2019.11.28
cffi                     1.14.4
chardet          