In [1]:
from typing import NamedTuple

import kfp
from kfp import dsl
from kfp.v2 import compiler
from kfp.v2.dsl import (Artifact, Dataset, Input, InputPath, Model, Output,
                        OutputPath, ClassificationMetrics, Metrics, component)
from kfp.v2.google.client import AIPlatformClient

from google.cloud import aiplatform
from google_cloud_pipeline_components import aiplatform as gcc_aip

In [1]:
# Report SDK versions from the running kernel. A `!python3 -c ...` subprocess
# resolves `python3` through PATH and may therefore inspect a different
# environment than the one this notebook actually imports from.
# The top-level package is imported here because the imports cell only binds
# its `aiplatform` submodule (as `gcc_aip`).
import google_cloud_pipeline_components

print("KFP SDK version: {}".format(kfp.__version__))
print(
    "google_cloud_pipeline_components version: {}".format(
        google_cloud_pipeline_components.__version__
    )
)

KFP SDK version: 1.8.22
google_cloud_pipeline_components version: 0.1.1


In [3]:
# Cloud Storage location used by this notebook; the value already carries
# the gs:// scheme.
BUCKET_NAME = "gs://bucket-test1-pipeline"

# Prefix inside the bucket that is handed to the pipeline definition and to
# the PipelineJob as `pipeline_root`.
PIPELINE_ROOT = BUCKET_NAME + "/pipeline_root/"

In [4]:
# Set the default project and region for subsequent aiplatform calls
# in this notebook.
aiplatform.init(
    project="annular-weaver-428312-s3",
    location="us-central1",
)

@component(
    base_image="python:3.11",
    output_component_file="first-component.yaml",
)
def product_name(text: str) -> str:
    """Pass the given product name through unchanged.

    Args:
        text: Product name supplied by the pipeline.

    Returns:
        The same string that was passed in.
    """
    name = text
    return name

In [5]:
@component(
    base_image="python:3.11",
    output_component_file="second-component.yaml",
    packages_to_install=["emoji"],
)
def emoji(
    text: str,
) -> NamedTuple(
    "Outputs",
    [
        ("emoji_text", str),
        ("emoji", str),
    ],
):
    """Look up an emoji by its alias and return it together with the alias.

    Args:
        text: Emoji alias without the surrounding colons (the pipeline
            default is "sparkles").

    Returns:
        A two-element tuple matching the declared outputs: ``emoji_text`` is
        the input ``text`` unchanged, ``emoji`` is whatever
        ``emoji.emojize`` returns for ``":<text>:"``.
    """
    # Imported inside the body because the package is installed into the
    # component's container via `packages_to_install`. Within this function
    # the name `emoji` refers to the module, not to the component itself.
    import emoji

    alias = ":" + text + ":"
    rendered = emoji.emojize(alias, language="alias")
    print("output one: {}; output_two: {}".format(text, rendered))
    return (text, rendered)

In [6]:
@component(
    base_image="python:3.11",
    output_component_file="third-component.yaml",
)
def build_sentence(
    product: str,
    emoji: str,
    emojitext: str
) -> str:
    """Build the sentence "<product> is <emoji>".

    Args:
        product: Text placed before " is ".
        emoji: Preferred ending of the sentence; used whenever it is
            non-empty.
        emojitext: Fallback ending, used only when ``emoji`` is empty.

    Returns:
        ``product + " is "`` followed by ``emoji`` or, if that is empty,
        ``emojitext``.
    """
    print("We completed the pipeline, hooray!")
    # Fall back to the textual form only when no emoji string was produced.
    ending = emojitext if len(emoji) == 0 else emoji
    return product + " is " + ending

In [7]:
@dsl.pipeline(
    name="hello-world",
    description="An intro pipeline",
    pipeline_root=PIPELINE_ROOT,
)
def intro_pipeline(text: str = "Vertex Pipelines", emoji_str: str = "sparkles"):
    """Wire the three components into a single pipeline.

    ``product_name`` and ``emoji`` each receive one pipeline parameter, and
    ``build_sentence`` consumes the outputs of both.

    Args:
        text: Forwarded to ``product_name`` as its ``text`` input.
        emoji_str: Forwarded to ``emoji`` as its ``text`` input.
    """
    # Components are called with keyword arguments so the wiring does not
    # depend on parameter order and stays valid on SDK versions that reject
    # positional arguments for component calls.
    product_task = product_name(text=text)
    emoji_task = emoji(text=emoji_str)
    build_sentence(
        product=product_task.output,
        emoji=emoji_task.outputs["emoji"],
        emojitext=emoji_task.outputs["emoji_text"],
    )

In [8]:
# Compile the pipeline function into a job spec file; the same path is
# passed as `template_path` when the PipelineJob is created.
compiler.Compiler().compile(
    package_path="intro_pipeline_job.json",
    pipeline_func=intro_pipeline,
)



In [10]:
# Create a PipelineJob from the compiled spec, then start it with run().
job = aiplatform.PipelineJob(
    template_path="intro_pipeline_job.json",
    display_name="intro_pipeline",
    pipeline_root=PIPELINE_ROOT,
)
job.run()

Creating PipelineJob
PipelineJob created. Resource name: projects/370200532108/locations/us-central1/pipelineJobs/hello-world-20240722190808
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/370200532108/locations/us-central1/pipelineJobs/hello-world-20240722190808')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/hello-world-20240722190808?project=370200532108
PipelineJob run completed. Resource name: projects/370200532108/locations/us-central1/pipelineJobs/hello-world-20240722190808


In [11]:
# Clean up: delete the PipelineJob resource created by the run above.
job.delete()

Deleting PipelineJob : projects/370200532108/locations/us-central1/pipelineJobs/hello-world-20240722190808
PipelineJob deleted. . Resource name: projects/370200532108/locations/us-central1/pipelineJobs/hello-world-20240722190808
Deleting PipelineJob resource: projects/370200532108/locations/us-central1/pipelineJobs/hello-world-20240722190808
Delete PipelineJob backing LRO: projects/370200532108/locations/us-central1/operations/339137523999571968
PipelineJob resource projects/370200532108/locations/us-central1/pipelineJobs/hello-world-20240722190808 deleted.
