In [None]:
# Copyright 2024 Forusone
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Lightweight KFP Pipelines
* In this tutorial, you learn to use the Kubeflow Pipelines (KFP) SDK to build lightweight Python function-based components, and then to run the resulting pipeline on Vertex AI Pipelines.
* This lab is a simplified version of the original notebook, [Lightweight KFP](https://colab.sandbox.google.com/github/GoogleCloudPlatform/vertex-ai-samples/blob/main/notebooks/official/pipelines/lightweight_functions_component_io_kfp.ipynb).

## Install Vertex AI SDK for Python and other required packages

In [2]:
# Install the packages this lab uses into the user site-packages (--user),
# with reduced log output (--quiet). numpy is constrained to versions below 2.
# NOTE(review): a runtime restart may be needed before freshly installed
# packages can be imported - confirm for your environment.
%pip install --user --quiet google-cloud-aiplatform \
                                 google-cloud-storage \
                                 kfp \
                                 "numpy<2" \
                                 google-cloud-pipeline-components

In [3]:
# @title Authentication to GCP
import sys

# Interactive authentication is only attempted when the `google.colab` module
# has already been loaded; in any other environment this cell does nothing.
running_in_colab = "google.colab" in sys.modules
if running_in_colab:
    from google.colab import auth

    auth.authenticate_user()

In [4]:
# @title Set GCP information
# PROJECT_ID is used for the bucket name, the gcloud project lookup and the
# Vertex AI SDK initialisation; LOCATION is the region the bucket is created in.
# Replace PROJECT_ID with the ID of your own project before running.
PROJECT_ID = "ai-hangsik"  # @param {type:"string"}
LOCATION = "us-central1"  # @param {type:"string"}

In [5]:
# @title Create a bucket.
BUCKET_URI = f"gs://mlops-{PROJECT_ID}-1209"
! gsutil mb -l {LOCATION} -p {PROJECT_ID} {BUCKET_URI}

Creating gs://mlops-ai-hangsik-1209/...
ServiceException: 409 A Cloud Storage bucket named 'mlops-ai-hangsik-1209' already exists. Try another name. Bucket names must be globally unique across all Google Cloud projects, including those outside of your organization.


In [None]:
# @title Service account
shell_output = ! gcloud projects describe  $PROJECT_ID
project_number = shell_output[-1].split(":")[1].strip().replace("'", "")

SERVICE_ACCOUNT = f"{project_number}-compute@developer.gserviceaccount.com"

print(f"SERVICE_ACCOUNT: {SERVICE_ACCOUNT}")

SERVICE_ACCOUNT: 721521243942-compute@developer.gserviceaccount.com


In [None]:
# Grant the service account the storage.objectCreator and storage.objectViewer
# roles on the lab bucket, so it can write objects to the bucket and read
# them back.
! gsutil iam ch serviceAccount:{SERVICE_ACCOUNT}:roles/storage.objectCreator $BUCKET_URI
! gsutil iam ch serviceAccount:{SERVICE_ACCOUNT}:roles/storage.objectViewer $BUCKET_URI

No changes made to gs://mlops-ai-hangsik-1209/
No changes made to gs://mlops-ai-hangsik-1209/


In [None]:
# @title Import libraries
from typing import NamedTuple

import kfp
from google.cloud import aiplatform
from kfp import compiler, dsl
from kfp.dsl import (Artifact, Dataset, Input, InputPath, Model, Output,
                     OutputPath, component)

In [None]:
# @title Pipelines constants
# Folder inside the lab bucket that is handed to the pipeline as its root.
PIPELINE_ROOT = f"{BUCKET_URI}/pipeline_root/shakespeare"

In [None]:
# @title Initialize Vertex AI SDK
# Pass LOCATION explicitly so the SDK targets the same region the bucket was
# created in, instead of silently falling back to the SDK's default region
# when LOCATION is changed in the configuration cell.
aiplatform.init(
    project=PROJECT_ID,
    location=LOCATION,
    staging_bucket=BUCKET_URI,
)

In [None]:
# @title Define Python function-based pipeline components

@component(base_image="python:3.9")
def preprocess(
    # Plain string input parameter.
    message: str,
    # `Output[Dataset]` hands the function an artifact handle for a `Dataset`
    # output; the body uses its `.metadata` and `.path`.
    output_dataset_one: Output[Dataset],
    # `OutputPath("Dataset")` hands the function only a file path for a
    # second `Dataset` output artifact.
    output_dataset_two_path: OutputPath("Dataset"),
    # `OutputPath(str)` hands the function a file path for a string output
    # parameter.
    output_parameter_path: OutputPath(str),
):
    """Mock preprocessing step.

    Writes ``message`` unchanged to both output datasets and to the output
    parameter, and records one sample metadata entry on the first dataset.
    """
    output_dataset_one.metadata["hello"] = "there"

    # All three outputs receive the same text; the only difference is how the
    # destination path is obtained (artifact handle vs. plain path argument).
    destinations = (
        output_dataset_one.path,
        output_dataset_two_path,
        output_parameter_path,
    )
    for destination in destinations:
        with open(destination, "w") as sink:
            sink.write(message)

In [None]:
# @title Define train component

@component(
    base_image="python:3.9",  # Container image the component runs in.
)
def train(
    # Plain string input parameter.
    message: str,
    # `InputPath("Dataset")` supplies only a local file path for a `Dataset`
    # input artifact.
    dataset_one_path: InputPath("Dataset"),
    # `Input[Dataset]` supplies an artifact handle for a `Dataset` input; the
    # body reads the file at its `.path`.
    dataset_two: Input[Dataset],
    # A further `Dataset` input artifact, also accessed through its handle.
    imported_dataset: Input[Dataset],
    # Output artifact of type `Model`.
    model: Output[Model],
    # Integer input parameter with a default value.
    num_steps: int = 3,
    # The NamedTuple return annotation declares two more outputs. For the
    # artifact entry the function returns the artifact's contents, so those
    # contents are assumed to fit in memory.
) -> NamedTuple(
    "Outputs",
    [
        ("output_message", str),  # Returned parameter.
        ("generic_artifact", Artifact),  # Returned generic artifact.
    ],
):
    """Mock training step.

    Reads the three input datasets, writes a placeholder model file and some
    example metadata, and returns ``message`` repeated ``num_steps`` times
    together with the concatenated contents of the first two datasets.
    """

    # `dataset_one_path` is already a local file path and can be opened as is.
    with open(dataset_one_path) as first_file:
        first_contents = first_file.read()

    # `dataset_two` is an artifact handle; `.path` gives its local file path.
    with open(dataset_two.path) as second_file:
        second_contents = second_file.read()

    # Placeholder model payload.
    with open(model.path, "w") as model_file:
        model_file.write("My Model")

    with open(imported_dataset.path) as imported_file:
        imported_contents = imported_file.read()
    print("Imported Dataset:", imported_contents)

    # `model.metadata` is a dictionary for arbitrary metadata that is attached
    # to the output artifact.
    model.metadata.update(
        {
            "accuracy": 0.9,
            "framework": "Tensorflow",
            "time_to_train_in_seconds": 257,
        }
    )

    repeated_message = " ".join([message] * num_steps)
    combined_contents = "{}\n{}".format(first_contents, second_contents)
    return (repeated_message, combined_contents)

In [None]:
# @title Define read_artifact_input component
@component(base_image="python:3.9")
def read_artifact_input(
    # Generic input artifact; only the file at its `.path` is used.
    generic: Input[Artifact],
):
    # Read the artifact's file as text, then echo it to the component log.
    with open(generic.path) as artifact_file:
        contents = artifact_file.read()
    print(f"generic contents: {contents}")

In [None]:
# @title Define a pipeline that uses your components and the Importer
@dsl.pipeline(
    # Name of the pipeline.
    name="metadata-pipeline-v2",
    # Default pipeline root; the run cell passes the same value explicitly.
    pipeline_root=PIPELINE_ROOT,
)
def pipeline(message: str):
    # Wiring: importer + preprocess -> train -> read_artifact_input.

    # Bring an existing text file into the pipeline as a `Dataset` artifact.
    imported_data = dsl.importer(
        artifact_uri="gs://ml-pipeline-playground/shakespeare1.txt",
        artifact_class=Dataset,
        reimport=False,
    )

    preprocessed = preprocess(message=message)

    # `train` consumes all three preprocess outputs plus the imported dataset.
    trained = train(
        message=preprocessed.outputs["output_parameter_path"],
        dataset_one_path=preprocessed.outputs["output_dataset_one"],
        dataset_two=preprocessed.outputs["output_dataset_two_path"],
        imported_dataset=imported_data.output,
        num_steps=5,
    )

    # The returned task is not needed; calling the component is enough to add
    # it to the pipeline.
    read_artifact_input(generic=trained.outputs["generic_artifact"])

In [None]:
# @title Compile the pipeline
# Compile the `pipeline` function into a spec file in the working directory;
# the run cell loads the same file name as its template.
pipeline_compiler = compiler.Compiler()
pipeline_compiler.compile(
    package_path="lightweight_pipeline.yaml",
    pipeline_func=pipeline,
)

In [None]:
# @title Run the pipeline
DISPLAY_NAME = "shakespeare"

# Build the job from the compiled spec file. Caching is switched off for this
# job, and "message" is the pipeline's single input parameter.
job = aiplatform.PipelineJob(
    template_path="lightweight_pipeline.yaml",
    display_name=DISPLAY_NAME,
    parameter_values={"message": "Hello, World"},
    pipeline_root=PIPELINE_ROOT,
    enable_caching=False,
)

# Submit the job; the SDK logs the job's resource name and state.
job.run()

INFO:google.cloud.aiplatform.pipeline_jobs:Creating PipelineJob
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob created. Resource name: projects/721521243942/locations/us-central1/pipelineJobs/metadata-pipeline-v2-20250102224741
INFO:google.cloud.aiplatform.pipeline_jobs:To use this PipelineJob in another session:
INFO:google.cloud.aiplatform.pipeline_jobs:pipeline_job = aiplatform.PipelineJob.get('projects/721521243942/locations/us-central1/pipelineJobs/metadata-pipeline-v2-20250102224741')
INFO:google.cloud.aiplatform.pipeline_jobs:View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/metadata-pipeline-v2-20250102224741?project=721521243942
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob projects/721521243942/locations/us-central1/pipelineJobs/metadata-pipeline-v2-20250102224741 current state:
PipelineState.PIPELINE_STATE_RUNNING
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob projects/721521243942/locations/us-central

In [None]:
# @title Delete the pipeline job
# Delete the PipelineJob resource held in `job` (created by the run cell).
job.delete()

INFO:google.cloud.aiplatform.base:Deleting PipelineJob : projects/721521243942/locations/us-central1/pipelineJobs/metadata-pipeline-v2-20250102224741
INFO:google.cloud.aiplatform.base:PipelineJob deleted. . Resource name: projects/721521243942/locations/us-central1/pipelineJobs/metadata-pipeline-v2-20250102224741
INFO:google.cloud.aiplatform.base:Deleting PipelineJob resource: projects/721521243942/locations/us-central1/pipelineJobs/metadata-pipeline-v2-20250102224741
INFO:google.cloud.aiplatform.base:Delete PipelineJob backing LRO: projects/721521243942/locations/us-central1/operations/1685094924975865856
INFO:google.cloud.aiplatform.base:PipelineJob resource projects/721521243942/locations/us-central1/pipelineJobs/metadata-pipeline-v2-20250102224741 deleted.


In [None]:
# @title Cleaning up
delete_bucket = False

if delete_bucket:
    ! gsutil rm -r $BUCKET_URI

! rm lightweight_pipeline.yaml