In [1]:
import os
import subprocess

import kfp
from kfp import dsl, components
from disdat_kfp.caching_wrapper import Caching

In [None]:
# Container image used to build every component in this notebook; feel free to use other Python images.
BASE_IMAGE = 'python:3.8.11-slim'
# Placeholder: replace YOUR_BUCKET with your bucket name as a string, for instance 'hello-bucket'.
# Give the bare bucket name -- the pipeline definition prepends 's3://' when it builds the repo URL.
# Make sure your EKS cluster has access to this bucket!
YOUR_S3_BUCKET = YOUR_BUCKET
# Used both as the KFP pipeline name and as the disdat context.
PIPELINE_NAME = 'simple_versioned_workflow'
# Placeholder: replace YOUR_ROLE with the service account passed to `argo submit`, for instance 'mlp-pipelines'.
SERVICE_ACCOUNT = YOUR_ROLE

### User ContainerOp 1
This component writes a text file containing `char_num` asterisks (`*`).

In [3]:
def file_producer(char_num: int, data: components.OutputTextFile(str)):
    """Write a run of `char_num` asterisks to the `data` output text file."""
    stars = char_num * '*'
    data.write(stars)
# Turn the plain Python function into a KFP component factory.
# KFP generates the component YAML, so none has to be written by hand.
producer_op = components.create_component_from_func(
    file_producer,
    base_image=BASE_IMAGE,
)

### User ContainerOp 2
This component consumes the file produced by component 1 and counts the number of characters in it.

In [4]:
def file_consumer(data_handle: components.InputTextFile(str)) -> int:
    """Read the entire input text file and return its length in characters."""
    return len(data_handle.read())
# Component factory for the consumer, built on the same base image as the producer.
consumer_op = components.create_component_from_func(
    file_consumer,
    base_image=BASE_IMAGE,
)

### User ContainerOp 3
This is an unrelated component, used only to show how `enable_caching` works with forced dependencies.

In [5]:
def hello(msg: str='123') -> str:
    """Print a fixed greeting and hand `msg` back unchanged."""
    greeting = 'hello'
    print(greeting)
    return msg

# Component factory for the greeting step.
hello_op = components.create_component_from_func(
    hello,
    base_image=BASE_IMAGE,
)

### Build the workflow with disdat-kfp

In [17]:

@dsl.pipeline(
    name=PIPELINE_NAME,
    description="file passing pipeline"
)
def pipeline():
    """Wire the demo components together, wrapping two of them with disdat-kfp caching.

    Steps created: a plain ``hello_op`` step, a cached ``producer_op`` step that is
    forced to run after it, a plain ``consumer_op`` step and a cached ``consumer_op``
    step; both consumers read the cached producer's ``data`` output.
    """
    # Accept the bucket either as a bare name ('hello-bucket') or as a full URL
    # ('s3://hello-bucket'), so the scheme is never doubled into 's3://s3://...'.
    if YOUR_S3_BUCKET.startswith('s3://'):
        repo_url = YOUR_S3_BUCKET
    else:
        repo_url = 's3://' + YOUR_S3_BUCKET

    caching = Caching(disdat_context=PIPELINE_NAME,
                      disdat_repo_s3_url=repo_url,
                      force_rerun_pipeline=False,
                      use_verbose=True)

    hello_task = hello_op()

    # enable_caching works like a decorator: pass the component factory first,
    # then the component's own arguments, then the caching options.
    cached_producer = caching.enable_caching(producer_op,
                                             char_num=1000,
                                             # keep False to leave caching enabled for this step
                                             _disdat_force_rerun=False,
                                             # your own bundle name
                                             _disdat_bundle='file_producer_bundle',
                                             # you can force a dependency like this
                                             _after=[hello_task])

    # Pass outputs like you normally would; cached_producer has the same
    # output signature as file_producer.
    # Note that there is both a normal consumer (here) and a cached consumer (below):
    # this shows that a cached component can be treated like a normal one.
    consumer_op(cached_producer.outputs['data'])

    # You can have two cascading cached components as well.
    cached_consumer = caching.enable_caching(consumer_op,
                                             data_handle=cached_producer.outputs['data'],
                                             _disdat_force_rerun=False,
                                             _disdat_bundle='file_consumer_bundle')

### Compile the workflow and extract the YAML

In [14]:
# Compile the pipeline function and write the resulting workflow to pipeline.yaml.
kfp.compiler.Compiler().compile(
    pipeline_func=pipeline,
    package_path='pipeline.yaml',
)

### Run the workflow 
I use Argo to execute the KFP workflow; you are free to choose another platform.

In [16]:
# Submit the compiled workflow to Argo.
# The command is passed as an argument list (no shell), so SERVICE_ACCOUNT is
# never interpreted by a shell, and check=True raises CalledProcessError when
# `argo submit` exits with a non-zero status instead of failing quietly.
subprocess.run(
    ['argo', 'submit', '--serviceaccount', SERVICE_ACCOUNT, 'pipeline.yaml'],
    check=True,
)

### Results
The images below were captured from the Argo GUI; Argo is the default KFP platform at Intuit.

First execution:
![run_1](docs/run_1.png)


Second execution:
![run_2](docs/run_2.png)

If you rerun the pipeline with exactly the same parameters, the cached `file_producer` and `file_consumer` steps will be skipped.