## Vertex AI Pipelines with Kubeflow Pipelines (Basic components)
- Written by Takashi Nakamura
- Vertex AI documentation: https://cloud.google.com/vertex-ai/docs
- Kubeflow documentation: https://www.kubeflow.org/docs/

### Import relevant package

In [None]:
import os
import requests
from datetime import datetime

import kfp

from kfp.v2.dsl import pipeline
from kfp.v2 import compiler

import google.cloud.aiplatform as aiplatform

### -1. Prerequisites
- Create a GCP account
- Enable the relevant APIs and services (e.g., Kubernetes Engine, Vertex AI)

### 0. Parameters for GCP and Kubeflow

In [None]:
# NOTE: hard-coding project settings in the notebook is not best practice;
# consider reading them from environment variables or a config file instead.
PROJECT_ID = ""  # Your project id
SERVICE_ACCOUNT = ""  # assume XXXX@YYYYY.iam.gserviceaccount.com
# GCS location for pipeline artifacts; it is passed to
# aiplatform.init(staging_bucket=...) and must be defined before that call.
PIPELINE_ROOT = ""  # assume gs://YOUR_BUCKET/pipeline_root

In [None]:
# Configure the Vertex AI SDK once for the whole notebook. PIPELINE_ROOT (a
# gs:// URI) must already be defined when this cell runs. It is used as the
# staging bucket, which PipelineJob also falls back to as its pipeline root.
aiplatform.init(
    project=PROJECT_ID,
    staging_bucket=PIPELINE_ROOT,
)

# Display name shared by every PipelineJob created in this notebook.
PIPELINE_NAME = "vertex-ai-kfp-gcp-demo-notebook"

In [None]:
# Container image for components declared with base_image=PYTHON_BASE.
PYTHON_BASE: str = "python:3.10"

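### 1. Basic: simple example
- Change the components' display names
- Provide machine specs (CPU and memory limits) to run on Vertex AI
- Define the execution order of the components
- Components without an explicit `base_image` run on the KFP SDK's default image (presumably Python 3.7 — verify for your SDK version); the others use `PYTHON_BASE`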

In [None]:
@kfp.v2.dsl.component()
def produce_msg_op(my_name: str) -> str:
    """Return a greeting addressed to ``my_name``.

    No ``base_image`` is given, so this runs on the SDK's default image.
    """
    greeting = f"Hello, {my_name}!"
    return greeting

@kfp.v2.dsl.component(base_image=PYTHON_BASE)
def print_msg_op(my_msg: str):
    """Print ``my_msg`` to the component's standard output."""
    text = my_msg
    print(text)
    
@kfp.v2.dsl.component()
def check_nvidia_smi_op():
    """Run ``nvidia-smi`` inside the component container.

    The command goes through the shell and its exit status is not checked,
    so the task does not raise if the command is missing or fails; only the
    shell's error output is produced.
    """
    import subprocess as sp

    sp.run("nvidia-smi", shell=True)

@kfp.v2.dsl.pipeline(name="pipeline1")
def my_pipeline_1():
    """Demonstrate display names, resource limits and task ordering.

    Two chains of tasks are built:

    * component0 -> component1 -> component2: component1 depends on
      component0 through its input, and component2 is ordered with ``.after()``.
    * component3 -> component4 -> component5: the same pattern, with the
      dependencies also declared explicitly with ``.after()``.
    """
    # 1st component: build the greeting.
    msg_task_0 = produce_msg_op("GCP Vertex AI Learning Team").set_display_name("component0")
    # 2nd component: print it. Consuming msg_task_0.output creates an
    # implicit dependency on component0.
    print_msg_task_0 = (
        print_msg_op(msg_task_0.output)
        .set_display_name("component1")
        .set_cpu_limit("5")
    )
    # 3rd component: no data dependency, so ordering is forced with .after().
    # Memory limits need a unit suffix; a bare number is read as bytes.
    (
        print_msg_op("This is component2!")
        .after(print_msg_task_0)
        .set_display_name("component2")
        .set_memory_limit("10G")
    )

    # Different way to create the pipeline relationship
    msg_task_1 = produce_msg_op("GCP Vertex AI Learning Team").set_display_name("component3")
    print_msg_task_1 = print_msg_op(msg_task_1.output).set_display_name("component4")
    print_msg_task_2 = print_msg_op("This is component5!").set_display_name("component5")
    # Explicitly declare the relationship. The first .after() duplicates the
    # data dependency on msg_task_1.output but makes it visible.
    print_msg_task_1.after(msg_task_1)
    print_msg_task_2.after(print_msg_task_1)

    # Optional GPU example. NOTE(review): the add_node_selector_constraint
    # signature differs between KFP SDK versions; verify before enabling.
    # check_nvidia_smi_op().set_display_name("component6").add_node_selector_constraint("cloud.google.com/gke-accelerator", "NVIDIA_TESLA_T4").set_gpu_limit(1)


In [None]:
# Compile my_pipeline_1 into a pipeline spec file.
pipeline_detail_json = "pipeline_1.json"
compiler.Compiler().compile(
    pipeline_func=my_pipeline_1,
    package_path=pipeline_detail_json,
)

# A timestamp suffix (second resolution) keeps job IDs distinct between runs.
TIMESTAMP = f"{datetime.now():%Y%m%d%H%M%S}"
JOBID = f"training-pipeline-{TIMESTAMP}"

# Build the Vertex AI job from the compiled spec. Caching is disabled so
# every task re-executes on each submission.
pipeline_ = aiplatform.pipeline_jobs.PipelineJob(
    display_name=PIPELINE_NAME,
    template_path=pipeline_detail_json,
    job_id=JOBID,
    parameter_values={},
    enable_caching=False,
)

# submit() returns without waiting for the run to finish (unlike run()).
pipeline_.submit(service_account=SERVICE_ACCOUNT)

### 2. Run a component after the others complete
- ```kfp.v2.dsl.ExitHandler``` runs an exit task once all tasks inside its block have finished, whether they succeeded or failed
- ```kfp.v2.dsl.PipelineTaskFinalStatus``` lets that exit task read the final status of those tasks (e.g., to react when a previous task failed)
    - NB: this feature may still have bugs; see
    - https://github.com/kubeflow/pipelines/issues/8649

In [None]:
from kfp.v2.dsl import PipelineTaskFinalStatus

In [None]:
@kfp.v2.dsl.component(base_image=PYTHON_BASE)
def exit_func(my_msg: str, my_status: PipelineTaskFinalStatus):
    """Exit-handler task: print a message followed by the received final status.

    Callers pass only ``my_msg``. ``my_status`` is presumably injected by KFP
    at run time from the tasks the ExitHandler wraps.
    """
    print(my_msg)
    status_fields = (
        ("my_status.state", my_status.state),
        ("my_status.error_code", my_status.error_code),
        ("my_status.error_message", my_status.error_message),
    )
    for label, value in status_fields:
        print(label, value)

@kfp.v2.dsl.component(base_image=PYTHON_BASE)
def raise_value_error():
    """Fail unconditionally; used to give the exit handler a failed task."""
    message = "Raised ValueError"
    raise ValueError(message)

@kfp.v2.dsl.pipeline(name="pipeline2")
def my_pipeline_2():
    """Run an always-failing task under an ExitHandler.

    ``exit_task_0`` runs after the tasks inside the ``with`` block finish,
    whether they succeed or fail, and receives their final status.
    """
    exit_task_0 = exit_func("Run exit_op for Value Error")

    # Value error: this task always fails, so the exit task sees a failure.
    # The task handle is not needed afterwards, so it is not bound to a name.
    with kfp.v2.dsl.ExitHandler(exit_op=exit_task_0):
        raise_value_error()

In [None]:
# Compile my_pipeline_2 into a pipeline spec file.
pipeline_detail_json = "pipeline_2.json"
compiler.Compiler().compile(
    pipeline_func=my_pipeline_2,
    package_path=pipeline_detail_json,
)

# A timestamp suffix (second resolution) keeps job IDs distinct between runs.
TIMESTAMP = f"{datetime.now():%Y%m%d%H%M%S}"
JOBID = f"training-pipeline-{TIMESTAMP}"

# Build the Vertex AI job from the compiled spec, with caching disabled.
pipeline_ = aiplatform.pipeline_jobs.PipelineJob(
    display_name=PIPELINE_NAME,
    template_path=pipeline_detail_json,
    job_id=JOBID,
    parameter_values={},
    enable_caching=False,
)

# submit() returns without waiting for the run to finish (unlike run()).
pipeline_.submit(service_account=SERVICE_ACCOUNT)

### 3. Condition
- If/else conditions for components using ```kfp.v2.dsl.Condition```

In [None]:
@kfp.v2.dsl.component(base_image=PYTHON_BASE)
def produce_random_num_func() -> float:
    """Return a pseudo-random float in [0.0, 1.0)."""
    # Imports live inside the function body because only this body is
    # shipped to the component container.
    from random import random as draw

    return draw()

@kfp.v2.dsl.component(base_image=PYTHON_BASE)
def print_val_op(val: float):
    """Print ``val`` to the component's standard output."""
    value = val
    print(value)

@kfp.v2.dsl.component(base_image=PYTHON_BASE)
def raise_small_value_error(val: float):
    """Fail the task, reporting that ``val`` is not greater than 0.5.

    This deliberately does not reuse the name of the zero-argument
    ``raise_value_error`` component used by pipeline2. Rebinding that name
    would make re-compiling pipeline2 fail with a missing ``val`` argument.
    """
    raise ValueError(f"The provided value, {val}, must be greater than 0.5")


@kfp.v2.dsl.pipeline(name="pipeline3")
def my_pipeline_3():
    """Branch on a random number using two mutually exclusive conditions."""
    produce_task_0 = produce_random_num_func()
    # Condition 1: value > 0.5 -> print it.
    with kfp.v2.dsl.Condition(produce_task_0.output > 0.5, name="Provided value greater than"):
        print_val_op(produce_task_0.output)
    # Condition 2: value <= 0.5 -> fail with a ValueError.
    with kfp.v2.dsl.Condition(produce_task_0.output <= 0.5, name="Provided value smaller than"):
        raise_small_value_error(produce_task_0.output)

In [None]:
# Compile my_pipeline_3 into a pipeline spec file.
pipeline_detail_json = "pipeline_3.json"
compiler.Compiler().compile(
    pipeline_func=my_pipeline_3,
    package_path=pipeline_detail_json,
)

# A timestamp suffix (second resolution) keeps job IDs distinct between runs.
TIMESTAMP = f"{datetime.now():%Y%m%d%H%M%S}"
JOBID = f"training-pipeline-{TIMESTAMP}"

# Build the Vertex AI job from the compiled spec, with caching disabled.
pipeline_ = aiplatform.pipeline_jobs.PipelineJob(
    display_name=PIPELINE_NAME,
    template_path=pipeline_detail_json,
    job_id=JOBID,
    parameter_values={},
    enable_caching=False,
)

# submit() returns without waiting for the run to finish (unlike run()).
pipeline_.submit(service_account=SERVICE_ACCOUNT)

### 4. Run jobs in parallel
- a) ```kfp.v2.dsl.ParallelFor```
- b) A normal Python ```for``` loop (one task is created per element)

In [None]:
@kfp.v2.dsl.component(base_image=PYTHON_BASE)
def produce_num_func(val: int) -> float:
    """Return ``val`` scaled by a pseudo-random factor in [0.0, 1.0)."""
    import random

    factor = random.random()
    return val * factor

@kfp.v2.dsl.pipeline(name="pipeline4")
def my_pipeline_4():
    """Fan ``produce_num_func`` out over 0..4 in two different ways.

    a) ``ParallelFor`` declares one loop in the compiled pipeline, whose
       iterations can run in parallel.
    b) A plain Python ``for`` loop runs while the pipeline is compiled and
       adds one separate task per element.
    """
    num_ls = list(range(5))

    # a) Parallel loop handled by the pipeline backend.
    with kfp.v2.dsl.ParallelFor(num_ls) as v:
        produce_num_func(v).set_display_name("Produce number")

    # b) NB: a normal for loop also works; each iteration creates its own task.
    for v in num_ls:
        produce_num_func(v)

In [None]:
# Compile my_pipeline_4 into a pipeline spec file.
pipeline_detail_json = "pipeline_4.json"
compiler.Compiler().compile(
    pipeline_func=my_pipeline_4,
    package_path=pipeline_detail_json,
)

# A timestamp suffix (second resolution) keeps job IDs distinct between runs.
TIMESTAMP = f"{datetime.now():%Y%m%d%H%M%S}"
JOBID = f"training-pipeline-{TIMESTAMP}"

# Build the Vertex AI job from the compiled spec, with caching disabled.
pipeline_ = aiplatform.pipeline_jobs.PipelineJob(
    display_name=PIPELINE_NAME,
    template_path=pipeline_detail_json,
    job_id=JOBID,
    parameter_values={},
    enable_caching=False,
)

# submit() returns without waiting for the run to finish (unlike run()).
pipeline_.submit(service_account=SERVICE_ACCOUNT)