## Introduction

Author: Yingding Wang\
Created on: 17.11.2023

Example KFP v2 pipeline that creates, mounts, and deletes a PVC using the `kfp-kubernetes` package: https://pypi.org/project/kfp-kubernetes/

In [1]:
import sys

In [2]:
# Alternative (not run): !{sys.executable} -m pip install --upgrade --user kfp[kubernetes]==2.6.0
# The command below installs the same set of packages with every version pinned explicitly.
!{sys.executable} -m pip install --upgrade --user kfp==2.6.0 kfp-kubernetes==1.1.0 kfp-pipeline-spec==0.3.0 kfp-server-api==2.0.5



In [3]:
!{sys.executable} -m pip list | grep kfp

kfp                           2.6.0
kfp-kubernetes                1.1.0
kfp-pipeline-spec             0.3.0
kfp-server-api                2.0.5


In [4]:
#!{sys.executable} -m pip install --user --upgrade kfp[kubernetes]==2.4.0 kfp-server-api==2.0.5

In [5]:
from kfp.dsl.pipeline_task import PipelineTask

def set_res_limit(task: PipelineTask, mem_req="200Mi", cpu_req="2000m", mem_lim="4000Mi", cpu_lim='4000m') -> PipelineTask:
    """Set the CPU and memory requests and limits on a KFP pipeline task.

    If the memory limit is set too small, the task pod would be stopped by
    Kubernetes with an OOMKilled status.

    Args:
        task(PipelineTask): the KFP PipelineTask whose resources are configured
        mem_req(str): the str representation of the memory request, e.g. '200Mi'
        cpu_req(str): the str representation of the cpu request, e.g. '500m'
        mem_lim(str): the str representation of the memory limit, e.g. '500M' for 500MB RAM
        cpu_lim(str): the str representation of the cpu limit, e.g. '1' as one cpu time, '0.5' as 1/2 cpu time

    Return:
        (PipelineTask): the result of the last chained setter call, i.e. the
            task with the desired requests and limits set
    """
    # CPU settings are applied first, then memory, each as request followed by limit.
    cpu_configured = task.set_cpu_request(cpu_req).set_cpu_limit(cpu_lim)
    return cpu_configured.set_memory_request(mem_req).set_memory_limit(mem_lim)

In [6]:
from kfp import dsl
from kfp import kubernetes

@dsl.component(base_image='python:3.10.13-bullseye')
def make_data():
    """Write the text 'my data' to /data/file.txt."""
    with open('/data/file.txt', mode='w') as out_file:
        out_file.write('my data')

@dsl.component(base_image='python:3.10.13-bullseye')
def read_data():
    """Print the contents of /reused_data/file.txt."""
    with open('/reused_data/file.txt') as in_file:
        contents = in_file.read()
    print(contents)

@dsl.pipeline
def my_pipeline():
    """Create a PVC, write a file to it, read the file back, then delete the PVC."""
    volume = kubernetes.CreatePVC(
        # can also use pvc_name instead of pvc_name_suffix to use a pre-existing PVC
        pvc_name_suffix='-my-pvc',
        access_modes=['ReadWriteOnce'],
        size='1Gi',
        storage_class_name='kubeflow-nfs-csi',  # use the name of your available storage class
    )
    volume_name = volume.outputs['name']

    writer = set_res_limit(task=make_data(), cpu_req='500m', mem_req='200M')
    # Task sequencing is normally derived from component inputs/outputs; here the
    # data is exchanged through the volume, so the ordering is declared with .after().
    reader = set_res_limit(
        task=read_data().after(writer),
        cpu_req='500m',
        mem_req='200M',
    )

    # The same PVC is mounted into both tasks, each at its own path.
    for consumer, mount_point in ((writer, '/data'), (reader, '/reused_data')):
        kubernetes.mount_pvc(
            consumer,
            pvc_name=volume_name,
            mount_path=mount_point,
        )

    # wait to delete the PVC until after the reading task completes
    kubernetes.DeletePVC(pvc_name=volume_name).after(reader)

In [7]:
from kfp import compiler
import os

# File name (without extension) and output directory of the compiled pipeline.
my_pipeline_file_name = "pvc_pipeline"
pipeline_path_dir = "./compiled"

# exist_ok=True creates the directory when it is missing and does nothing when
# it is already present, which avoids the check-then-create race of a separate
# os.path.exists() test.
os.makedirs(pipeline_path_dir, exist_ok=True)

# Compile the pipeline function and write the result to the .yaml package path.
compiler.Compiler().compile(
    pipeline_func=my_pipeline,
    package_path=f"{pipeline_path_dir}/{my_pipeline_file_name}.yaml"
)

In [8]:
from kfp.client import Client
import warnings

# Suppress the kfp v2 client FutureWarning while the client is constructed.
# https://stackoverflow.com/questions/14463277/how-to-disable-python-warnings/14463362#14463362
# catch_warnings() followed by simplefilter() is used because the
# catch_warnings(action=..., category=...) keyword form requires Python 3.11+.
with warnings.catch_warnings():
    warnings.simplefilter("ignore", category=FutureWarning)
    # kubeflow pipeline poddefault is passing the credential to the client
    client = Client()

# Namespace of the current user, used when submitting the run.
NAMESPACE = client.get_user_namespace()
print(NAMESPACE)

kubeflow-kindfor


## Disabling caching (`enable_caching=False`) causes an issue with PVC creation
* https://github.com/kubeflow/pipelines/issues/10188

Note:
* This error will crash the KFP backend.

If that happens, run `kubectl -n kubeflow rollout restart deployment` to restart the backend containers.

In [9]:
# Run configuration. Caching stays enabled; switch ENABLE_CACHING to False to
# submit the run with caching disabled.
EXPERIMENT_NAME = "demo"
ENABLE_CACHING = True

# Keyword arguments for the run submission, gathered in one place.
run_settings = {
    "pipeline_func": my_pipeline,
    "arguments": {},
    "run_name": "my pvc test",
    "experiment_name": EXPERIMENT_NAME,
    "namespace": NAMESPACE,
    "enable_caching": ENABLE_CACHING,
}

run = client.create_run_from_pipeline_func(**run_settings)
run

RunPipelineResult(run_id=bbd662c1-d98e-45f5-8a31-8b18bf162ad1)