In [None]:
# Install the packages
! pip3 install --upgrade google-cloud-aiplatform \
                        google-cloud-storage \
                        kfp \
                        google-cloud-pipeline-components

In [1]:
import kfp
import os

ModuleNotFoundError: No module named 'kfp'

In [None]:
! python3 -c "import kfp; print('KFP SDK version: {}'.format(kfp.__version__))"


In [None]:
PROJECT_ID = os.getenv("PROJECT_ID")
REGION = os.getenv("REGION")
BUCKET_URI = os.getenv("BUCKET_URI")  # @param {type:"string"}
SERVICE_ACCOUNT = os.getenv("SERVICE_ACCOUNT")

In [None]:
# to make bucket in specifed region
! gsutil mb -l $REGION -p $PROJECT_ID $BUCKET_URI

In [None]:

! gsutil iam ch serviceAccount:{SERVICE_ACCOUNT}:roles/storage.objectCreator $BUCKET_URI

! gsutil iam ch serviceAccount:{SERVICE_ACCOUNT}:roles/storage.objectViewer $BUCKET_URI

In [None]:
from typing import NamedTuple

import google.cloud.aiplatform as aip
from kfp import dsl
from kfp.v2 import compiler
from kfp.v2.dsl import component

In [None]:
# API service endpoint
API_ENDPOINT = "{}-aiplatform.googleapis.com".format(REGION)

In [None]:
PIPELINE_ROOT = "{}/pipeline_root/intro".format(BUCKET_URI)


In [None]:
aip.init(project=PROJECT_ID, staging_bucket=BUCKET_URI)

In [None]:
@component(output_component_file="hw.yaml", base_image="python:3.9")
def hello_world(text: str) -> str:
    print(text)
    return text

In [None]:
@component(packages_to_install=["google-cloud-storage"])
def two_outputs(
    text: str,
) -> NamedTuple(
    "Outputs",
    [
        ("output_one", str),  # Return parameters
        ("output_two", str),
    ],
):
    # the import is not actually used for this simple example, but the import
    # is successful, as it was included in the `packages_to_install` list.
    from google.cloud import storage  # noqa: F401

    o1 = f"output one from text: {text}"
    o2 = f"output two from text: {text}"
    print("output one: {}; output_two: {}".format(o1, o2))
    return (o1, o2)

In [None]:
@component
def consumer(text1: str, text2: str, text3: str):
    print(f"text1: {text1}; text2: {text2}; text3: {text3}")

In [None]:
@dsl.pipeline(
    name="intro-pipeline-unique",
    description="A simple intro pipeline",
    pipeline_root=PIPELINE_ROOT,
)
def pipeline(text: str = "hi there"):
    hw_task = (hello_world(text).
        set_cpu_limit('1').
        set_memory_limit('3G')
    )
    
    
    two_outputs_task = (two_outputs(text).
        set_cpu_limit('1').
        set_memory_limit('3G')
    )
    
    consumer_task = (consumer(  # noqa: F841
        hw_task.output,
        two_outputs_task.outputs["output_one"],
        two_outputs_task.outputs["output_two"],
    ).
    set_cpu_limit('1').
    set_memory_limit('3G')
    )
    

In [None]:
from kfp.v2 import compiler  # noqa: F811

compiler.Compiler().compile(pipeline_func=pipeline, package_path="intro_pipeline.json")

In [None]:
# This works

DISPLAY_NAME = "intro_pipeline_job_unique"

job = aip.PipelineJob(
    display_name=DISPLAY_NAME,
    template_path="intro_pipeline.json",
    pipeline_root=PIPELINE_ROOT,
)

job.run()