参考:   
https://github.com/GoogleCloudPlatform/vertex-ai-samples/blob/main/notebooks/official/pipelines/pipelines_intro_kfp.ipynb

## Define constants

In [None]:
from datetime import datetime

# Compact local-time stamp (YYYYMMDDHHMMSS), taken once when this cell runs.
TIMESTAMP = f"{datetime.now():%Y%m%d%H%M%S}"
# -> '20220305052457'

In [None]:
# NOTE(review): PROJECT_ID and BUCKET_NAME look like placeholders to be filled
# in before running (project ID, and a "gs://<bucket>" URI) - confirm.
PROJECT_ID = ""
REGION = "us-central1"
BUCKET_NAME = "gs://"

# Everything below is derived from the values above.
PIPELINE_ROOT = f"{BUCKET_NAME}/pipeline_root/hello_world"
API_ENDPOINT = f"{REGION}-aiplatform.googleapis.com"
# -> 'us-central1-aiplatform.googleapis.com'
DISPLAY_NAME = f"hello_world_{TIMESTAMP}"

## Define components and a pipeline

In [None]:
# Set the gcloud CLI's active project to PROJECT_ID.
# This is an IPython shell escape (`!`); `$PROJECT_ID` is filled in from the
# notebook's Python variable of that name, so the constants cell must run first.
!gcloud config set project $PROJECT_ID

In [None]:
import google.cloud.aiplatform as aip

In [None]:
from typing import NamedTuple

from kfp import dsl
from kfp.v2 import compiler
from kfp.v2.dsl import component

In [None]:
# Initialise the Vertex AI SDK defaults used by later calls in this notebook.
# `location` is passed explicitly so that editing REGION actually changes
# where jobs run; without it the SDK falls back to its own default region.
aip.init(project=PROJECT_ID, location=REGION, staging_bucket=BUCKET_NAME)

In [None]:
@component(base_image="python:3.9")
def hello_world(text: str) -> str:
    """Print ``text`` and return it unchanged.

    Declared as a pipeline component with ``python:3.9`` as its base image.
    The returned string is the component's single output.
    """
    print(text)
    return text

In [None]:
@component(packages_to_install=["google-cloud-storage"])
def two_outputs(
    text: str,
) -> NamedTuple(
    "Outputs",
    [
        ("output_one", str),  # Return parameters
        ("output_two", str),
    ],
):
    """Build two labelled strings from ``text`` and return them as a pair.

    The return annotation names the two outputs ``output_one`` and
    ``output_two``; the returned tuple supplies them in that order.
    """
    # Deliberately unused: the import only demonstrates that a package listed
    # in `packages_to_install` is importable inside the component.
    from google.cloud import storage  # noqa: F401

    first = f"output one from text: {text}"
    second = f"output two from text: {text}"
    print(f"output one: {first}; output_two: {second}")
    return (first, second)

In [None]:
@component
def consumer(text1: str, text2: str, text3: str):
    """Print the three input strings on one line; produces no output value."""
    print(f"text1: {text1}; text2: {text2}; text3: {text3}")

In [None]:
@dsl.pipeline(
    name="hello-world",
    description="A simple example pipeline",
    pipeline_root=PIPELINE_ROOT,
)
def pipeline(text: str = "hi there"):
    """Wire the three components into a small fan-out / fan-in graph.

    ``text`` is fed to both ``hello_world`` and ``two_outputs``; ``consumer``
    then receives the single output of the former and both named outputs of
    the latter.

    Args:
        text: Pipeline input string passed to the first two components.
    """
    # Components are called with keyword arguments: KFP SDK 2.x raises a
    # TypeError for positional arguments, and keywords work on 1.8.x as well.
    hello_world_task = hello_world(text=text)
    two_outputs_task = two_outputs(text=text)
    # The consumer task is only needed for its place in the graph, so its
    # handle is not kept.
    consumer(
        text1=hello_world_task.output,
        text2=two_outputs_task.outputs["output_one"],
        text3=two_outputs_task.outputs["output_two"],
    )

## Compile the pipeline

In [None]:
from kfp.v2 import compiler  # noqa: F811

# Compile the pipeline function into a JSON job spec in the working directory.
# The file name is a fixed literal without spaces, so no sanitising is needed.
compiler.Compiler().compile(
    pipeline_func=pipeline,
    package_path="pipeline_hello_world.json",
)

## Run the pipeline

In [None]:
# Create a pipeline job from the compiled spec file written by the compile
# step; the path must match the `package_path` used there.
job = aip.PipelineJob(
    display_name=DISPLAY_NAME,
    template_path="pipeline_hello_world.json",
    pipeline_root=PIPELINE_ROOT,
)

# Submit the job.
job.run()