In [None]:
import functools
from typing import Union, List

import kfp
from google.cloud import aiplatform
from kfp.dsl import Input, Output, Artifact # https://www.kubeflow.org/docs/components/pipelines/v2/data-types/artifacts/

In [None]:
PROJECT_ID = <your-project-id>
! gcloud config set project {PROJECT_ID}
REGION = "us-central1"
BUCKET_URI = "gs://lh-sandbox"
! gsutil mb -l {REGION} -p {PROJECT_ID} {BUCKET_URI}
shell_output = !gcloud auth list
SERVICE_ACCOUNT = shell_output[2].replace("*", "").strip()
! gsutil iam ch serviceAccount:{SERVICE_ACCOUNT}:roles/storage.objectCreator $BUCKET_URI
! gsutil iam ch serviceAccount:{SERVICE_ACCOUNT}:roles/storage.objectViewer $BUCKET_URI
BASE_IMAGE = f"{REGION}-docker.pkg.dev/{PROJECT_ID}/lh-sandbox/kfp_tutorial:latest"

In [None]:
# Dockerfile text for the component image: a Vertex AI scikit-learn base image with pip
# and the kfp package upgraded/installed on top.
dockerfile = f"""FROM us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.1-3:latest
RUN python -m pip install --upgrade pip kfp"""
# Write the text to disk; IPython substitutes $dockerfile with the Python variable's value.
!echo "$dockerfile" > Dockerfile.tutorial
# Uncomment if docker has not been configured to authenticate against this registry yet.
# !gcloud auth configure-docker us-central1-docker.pkg.dev
# Build the image under the BASE_IMAGE tag, then push it to the registry.
!docker build -t $BASE_IMAGE -f Dockerfile.tutorial .
!docker push $BASE_IMAGE

In [None]:
# decorator review
def run_n_times(n):
    """Build a decorator that makes the decorated function run ``n`` times per call.

    Args:
        n: How many times the wrapped function is invoked on each call. The loop
            is driven by ``range(n)``, so zero or a negative value means the
            function is not invoked at all.

    Returns:
        A decorator. The function it produces forwards its positional and keyword
        arguments to the original on every repetition and returns ``None``; the
        individual return values are discarded.
    """
    def decorator(func):
        # functools.wraps copies __name__, __doc__ and friends from func onto the
        # wrapper, so the decorated function still identifies as the original.
        @functools.wraps(func)
        def wrapper(*args, **kwargs): # wrapper should have same args as func; if unknown use *args, **kwargs
            for _ in range(n):
                func(*args, **kwargs)
        return wrapper
    return decorator

@run_n_times(4)
def print_once(string):
    """Print ``string``; because of the decorator, each call prints it four times."""
    print(string)
# equivalent to
# print_once = run_n_times(4)(print_once)

# Demonstration call: the decorated function prints the text four times.
print_once("hello world")

In [None]:
@kfp.dsl.component(base_image=BASE_IMAGE) #, packages_to_install=['kfp']) # won't work bc not connected to internet
def A(d: dict, f: float, s: str) -> list:
    """Print ``d`` and return ``f`` squared together with ``s``.

    Args:
        d: Dictionary that is printed.
        f: Number to square.
        s: String passed through unchanged.

    Returns:
        The two-element list ``[f ** 2, s]``.
    """
    print(d)
    # Supported return types: int, float, bool, str, dict, list.
    # Other types (e.g. tuple) are not supported.
    return [f ** 2, s]

@kfp.dsl.component(base_image=BASE_IMAGE)
def B(l: list, b: bool, a: Output[Artifact]) -> int:
    """Write ``l[1]`` into the output artifact and return ``l[0]`` as an int.

    Args:
        l: List whose first element is converted with ``int`` and whose second
            element is written out as text.
        b: When true, ``a.path`` is written as a single file; otherwise
            ``a.path`` becomes a directory containing ``f1.txt`` and ``f2.txt``.
        a: Output artifact whose ``path`` receives the data.

    Returns:
        ``int(l[0])``.
    """
    import os

    print(l)

    # The path defaults to blah/blah/<name-of-variable> -> blah/blah/a.
    # It can be used as either a file OR a directory, never both.
    print(a.path)

    if b:
        # File mode: the file ends up named <name-of-variable> -> a
        with open(a.path, 'w') as handle:
            handle.write(l[1])
    else:
        # Directory mode: create the directory, then drop the files inside it.
        os.makedirs(a.path, exist_ok=True)
        for filename in ('f1.txt', 'f2.txt'):
            with open(os.path.join(a.path, filename), 'w') as handle:
                handle.write(l[1])

    return int(l[0])

@kfp.dsl.component(base_image=BASE_IMAGE)
def C(a: Input[Artifact], i: int):
    """Print the ``path``, ``uri``, ``metadata`` and ``name`` of the input artifact.

    Args:
        a: Input artifact to inspect.
        i: Integer input; accepted but not used in the body.
    """
    attribute_names = (
        'path',      # used in code
        'uri',       # hyperlink to gcp ui
        'metadata',
        'name',      # cannot be overwritten
    )
    for attribute_name in attribute_names:
        print(f'{attribute_name}:', getattr(a, attribute_name))

In [None]:
@kfp.dsl.pipeline
def pipeline(d: dict, f: float, s: str):
    """Chain components A -> B -> C and print each task's output references.

    Args:
        d: Dictionary forwarded to component A.
        f: Float forwarded to component A.
        s: String forwarded to component A.
    """
    step_a = A(d=d, f=f, s=s)
    # Note: step_a.output == step_a.outputs['Output']
    print(step_a.output)
    print(step_a.outputs)
    print()

    # Resource limits; see https://cloud.google.com/vertex-ai/docs/pipelines/machine-types
    step_b = B(l=step_a.output, b=False)
    step_b.set_cpu_limit('1')
    step_b.set_memory_limit('4G')
    # GPU variant: .add_node_selector_constraint('NVIDIA_TESLA_V100').set_gpu_limit('1')
    # print(step_b.output) # errors bc component B has > 1 outputs: ('a', 'Output')
    print(step_b.outputs)
    print()

    step_c = C(a=step_b.outputs['a'], i=step_b.outputs['Output'])
    # print(step_c.output) # errors bc component C has 0 outputs
    print(step_c.outputs)

In [None]:
# Compile the pipeline function into the package file tutorial.yaml.
compiler = kfp.compiler.Compiler()
compiler.compile(pipeline_func=pipeline, package_path="tutorial.yaml")

In [None]:
# What to pass into the kfp.dsl.pipeline function's arguments.
pipeline_arguments = {
    'd': {'hello': 'world'},
    'f': 2.0,
    's': 'hi',
}

# Describe the run: compiled template, storage root and the argument values.
job = aiplatform.PipelineJob(
    display_name="tutorial",
    template_path="./tutorial.yaml",
    pipeline_root=f"{BUCKET_URI}/tutorial", # where component outputs are stored during pipeline runs
    parameter_values=pipeline_arguments,
    enable_caching=False, # rerun pipeline tasks each time instead of using cache
)

In [None]:
# Start the pipeline job defined in the previous cell.
# NOTE(review): no service account is passed here, so the SERVICE_ACCOUNT that was granted
# bucket roles earlier is not explicitly used for the run — confirm that is intended.
job.run()