In [None]:
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Track artifacts and metrics across Vertex AI Pipelines runs

Learn more about [Vertex ML Metadata](https://cloud.google.com/vertex-ai/docs/ml-metadata) and [Vertex AI Pipelines](https://cloud.google.com/vertex-ai/docs/pipelines/introduction).

*   [Open this notebook in Colab](https://colab.sandbox.google.com/github/GoogleCloudPlatform/vertex-ai-samples/blob/main/notebooks/official/ml_metadata/vertex-pipelines-ml-metadata.ipynb)


### Dataset

This notebook uses scikit-learn to train a model and classify bean types using the [Dry Beans Dataset](https://archive.ics.uci.edu/ml/datasets/Dry+Bean+Dataset) from UCI Machine Learning. This is a tabular dataset that includes measurements and characteristics of seven different types of beans taken from images.


## Install Vertex AI SDK

In [1]:
! pip3 install --upgrade --quiet google-cloud-aiplatform \
                                 kfp

[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/343.6 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[90m╺[0m [32m337.9/343.6 kB[0m [31m11.0 MB/s[0m eta [36m0:00:01[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m343.6/343.6 kB[0m [31m6.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m84.0/84.0 kB[0m [31m2.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m50.1/50.1 kB[0m [31m1.7 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m6.5/6.5 MB[0m [31m40.8 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.7/1.7 MB[0m [31m39.5 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━

In [2]:
# Print the installed KFP SDK version by importing kfp in a separate python3 process.
! python3 -c "import kfp; print('KFP SDK version: {}'.format(kfp.__version__))"

KFP SDK version: 2.10.1


## Configuration

### Authenticate your notebook environment

In [3]:
import sys
from IPython.display import Markdown, display

PROJECT_ID="ai-hangsik"
LOCATION="us-central1"

# For only colab user, no need this process for Colab Enterprise in Vertex AI.
if "google.colab" in sys.modules:
    from google.colab import auth
    auth.authenticate_user(project_id=PROJECT_ID)

# set project.
!gcloud config set project {PROJECT_ID}

Updated property [core/project].


### Initialize Vertex AI SDK

In [4]:

from google.cloud import aiplatform

# Initialize the Vertex AI SDK with the project and region chosen above.
aiplatform.init(
    project=PROJECT_ID,
    location=LOCATION,
)

### Create a bucket

In [5]:
# Create a bucket.
BUCKET_URI = f"gs://mlops-{PROJECT_ID}-1209"
! gsutil mb -l {LOCATION} -p {PROJECT_ID} {BUCKET_URI}

Creating gs://mlops-ai-hangsik-1209/...
ServiceException: 409 A Cloud Storage bucket named 'mlops-ai-hangsik-1209' already exists. Try another name. Bucket names must be globally unique across all Google Cloud projects, including those outside of your organization.


In [6]:
shell_output = ! gcloud projects describe  $PROJECT_ID
project_number = shell_output[-1].split(":")[1].strip().replace("'", "")

SERVICE_ACCOUNT = f"{project_number}-compute@developer.gserviceaccount.com"

print(f"SERVICE_ACCOUNT: {SERVICE_ACCOUNT}")

SERVICE_ACCOUNT: 721521243942-compute@developer.gserviceaccount.com


### Grant the service account access to the bucket

In [7]:
! gsutil iam ch serviceAccount:{SERVICE_ACCOUNT}:roles/storage.objectCreator $BUCKET_URI
! gsutil iam ch serviceAccount:{SERVICE_ACCOUNT}:roles/storage.objectViewer $BUCKET_URI

No changes made to gs://mlops-ai-hangsik-1209/
No changes made to gs://mlops-ai-hangsik-1209/


### Import libraries and define constants

Import required libraries.

In [8]:
import matplotlib.pyplot as plt
import pandas as pd

from google.cloud import aiplatform, aiplatform_v1beta1
from google.cloud.aiplatform import pipeline_jobs
from kfp import compiler, dsl
from kfp.dsl import (Artifact, Dataset, Input, Metrics, Model, Output,
                     OutputPath, component)

Define some constants.

In [9]:
# Cloud Storage location passed to the pipeline as its pipeline root.
PIPELINE_ROOT = f"{BUCKET_URI}/pipeline/custom/metadata"

# Display the resolved path as the cell output.
PIPELINE_ROOT

'gs://mlops-ai-hangsik-1209/pipeline/custom/metadata'

## Create a pipeline


### Define Python function-based components

In [10]:
@component(
    packages_to_install=["google-cloud-bigquery[pandas]", "pyarrow"],
    base_image="python:3.10",
    output_component_file="get_dataframe.yaml",
)
def get_dataframe(
    project_id: str, bq_table: str, output_data_path: OutputPath("Dataset")
):
    """Export a BigQuery table to a shuffled CSV file.

    Args:
        project_id: Google Cloud project used to create the BigQuery client.
        bq_table: Table to read, in a form accepted by
            ``bigquery.TableReference.from_string``.
        output_data_path: Path the CSV is written to.
    """
    from google.cloud import bigquery

    bqclient = bigquery.Client(project=project_id)
    table = bigquery.TableReference.from_string(bq_table)
    rows = bqclient.list_rows(table)
    dataframe = rows.to_dataframe(
        create_bqstorage_client=True,
    )
    # Shuffle all rows with a fixed seed so the row order is reproducible.
    dataframe = dataframe.sample(frac=1, random_state=2)
    # Do not write the shuffled row index: the training component treats every
    # non-label column of the CSV as a feature, so an index column would be
    # fed to the model as a spurious feature.
    dataframe.to_csv(output_data_path, index=False)

  @component(
  def get_dataframe(


Next, create a component to train a scikit-learn model. This component does the following:
* Imports a CSV as a pandas DataFrame.
* Splits the DataFrame into train and test sets.
* Trains a scikit-learn model.
* Logs metrics from the model.
* Saves the model artifacts as a local `model.joblib` file.

In [11]:
@component(
    packages_to_install=["scikit-learn==1.2", "pandas", "joblib", "numpy==1.26.4"],
    base_image="python:3.10",
    output_component_file="sklearn_train.yaml",
)
def sklearn_train(
    dataset: Input[Dataset], metrics: Output[Metrics], model: Output[Model]
):
    """Train a decision tree classifier on a CSV dataset and record the results.

    Args:
        dataset: Input CSV artifact; it must contain a ``Class`` label column,
            and every other column is used as a feature.
        metrics: Output metrics artifact that receives the logged values.
        model: Output model artifact; the fitted estimator is dumped to
            ``model.path + ".joblib"``.
    """
    import pandas as pd
    from joblib import dump
    from sklearn.model_selection import train_test_split
    from sklearn.tree import DecisionTreeClassifier

    df = pd.read_csv(dataset.path)
    labels = df.pop("Class").tolist()
    data = df.values.tolist()
    x_train, x_test, y_train, y_test = train_test_split(data, labels)

    skmodel = DecisionTreeClassifier()
    skmodel.fit(x_train, y_train)
    score = skmodel.score(x_test, y_test)
    print("accuracy is:", score)

    # `Metrics` provides `log_metric` and the artifact `metadata` dict, but no
    # `log_params` attribute, so the previous `metrics.log_params[...]` lines
    # raised AttributeError. Record these values in the metadata instead.
    metrics.metadata["units"] = score * 30.0
    metrics.metadata["batch_size"] = 120
    metrics.metadata["note"] = "hyperparameter"

    metrics.log_metric("accuracy", (score * 100.0))
    metrics.log_metric("framework", "Scikit Learn")
    metrics.log_metric("dataset_size", len(df))

    # Saved next to the artifact path as "<path>.joblib"; the deploy component
    # uploads the directory containing this file.
    dump(skmodel, model.path + ".joblib")


  @component(
  def sklearn_train(


Finally, the last component takes the trained model from the previous step, uploads the model to Vertex AI, and deploys it to an endpoint:

In [12]:
@component(
    packages_to_install=["google-cloud-aiplatform"],
    base_image="python:3.10",
    output_component_file="deploy_model.yaml",
)
def deploy_model(
    model: Input[Model],
    project: str,
    region: str,
    vertex_endpoint: Output[Artifact],
    vertex_model: Output[Model],
):
    """Upload the trained model to Vertex AI and deploy it to an endpoint.

    Args:
        model: Input model artifact produced by the training step.
        project: Google Cloud project passed to ``aiplatform.init``.
        region: Location passed to ``aiplatform.init``.
        vertex_endpoint: Output artifact whose URI is set to the endpoint
            resource name.
        vertex_model: Output artifact whose URI is set to the uploaded model
            resource name.
    """
    from google.cloud import aiplatform

    aiplatform.init(project=project, location=region)

    # Upload the directory that contains the model file. Strip only the last
    # path segment of the artifact URI: the previous `replace("model", "")`
    # removed every occurrence of "model" and corrupted the URI whenever the
    # bucket or pipeline root contained that word.
    artifact_dir = model.uri.rpartition("/")[0] + "/"

    deployed_model = aiplatform.Model.upload(
        display_name="tracking_metadata_pipeline",
        artifact_uri=artifact_dir,
        serving_container_image_uri="us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.1-2:latest",
    )
    endpoint = deployed_model.deploy(machine_type="n1-standard-4")

    # Save data to the output params
    vertex_endpoint.uri = endpoint.resource_name
    vertex_model.uri = deployed_model.resource_name

  @component(
  def deploy_model(


### Define and compile the pipeline

In [13]:
@dsl.pipeline(
    # Pipeline name.
    name="tracking_metadata_pipeline",
    # Default pipeline root; it can be overridden when the run is submitted.
    pipeline_root=PIPELINE_ROOT,
)
def pipeline(
    bq_table: str,
    output_data_path: str,
    project: str,
    region: str,
):
    # Step 1: export the BigQuery table to a CSV dataset.
    fetch_step = get_dataframe(project_id=project, bq_table=bq_table)

    # Step 2: train on that dataset.
    train_step = sklearn_train(dataset=fetch_step.output)
    trained_model = train_step.outputs["model"]

    # Step 3: upload and deploy the trained model.
    deploy_model(model=trained_model, project=project, region=region)

The following generates a JSON file that is then used to run the pipeline:

In [14]:
# Compile the pipeline function into the JSON template referenced by the runs.
compiler.Compiler().compile(
    pipeline_func=pipeline,
    package_path="tracking_metadata_pipeline.json",
)

### Initiate pipeline runs

Create a pipeline run using a small version of the same dataset.

In [15]:
from datetime import datetime

# Timestamp suffix used in the job IDs of the runs.
TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")

# Parameters for the run against the small table.
small_run_parameters = {
    "bq_table": "sara-vertex-demos.beans_demo.small_dataset",
    "output_data_path": "data.csv",
    "project": PROJECT_ID,
    "region": LOCATION,
}

run1 = pipeline_jobs.PipelineJob(
    job_id="tracking-metadata-pipeline-small-{}".format(TIMESTAMP),
    display_name="tracking_metadata_pipeline",
    template_path="tracking_metadata_pipeline.json",
    parameter_values=small_run_parameters,
    enable_caching=True,
)

Next, create another pipeline run using a larger version of the same dataset.

In [16]:
# Parameters for the run against the large table.
large_run_parameters = {
    "bq_table": "sara-vertex-demos.beans_demo.large_dataset",
    "output_data_path": "data.csv",
    "project": PROJECT_ID,
    "region": LOCATION,
}

run2 = pipeline_jobs.PipelineJob(
    job_id="tracking-metadata-pipeline-large-{}".format(TIMESTAMP),
    display_name="tracking_metadata_pipeline",
    template_path="tracking_metadata_pipeline.json",
    parameter_values=large_run_parameters,
    enable_caching=True,
)

Finally, kick off pipeline executions for both runs. It's best to do this in two separate notebook cells so you can see the output for each run.

In [17]:
# Submit the small-dataset pipeline run to Vertex AI Pipelines.
run1.submit()

INFO:google.cloud.aiplatform.pipeline_jobs:Creating PipelineJob
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob created. Resource name: projects/721521243942/locations/us-central1/pipelineJobs/tracking-metadata-pipeline-small-20241208213405
INFO:google.cloud.aiplatform.pipeline_jobs:To use this PipelineJob in another session:
INFO:google.cloud.aiplatform.pipeline_jobs:pipeline_job = aiplatform.PipelineJob.get('projects/721521243942/locations/us-central1/pipelineJobs/tracking-metadata-pipeline-small-20241208213405')
INFO:google.cloud.aiplatform.pipeline_jobs:View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/tracking-metadata-pipeline-small-20241208213405?project=721521243942


Then, kick off the second run:

In [18]:
# Submit the large-dataset pipeline run to Vertex AI Pipelines.
run2.submit()

INFO:google.cloud.aiplatform.pipeline_jobs:Creating PipelineJob
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob created. Resource name: projects/721521243942/locations/us-central1/pipelineJobs/tracking-metadata-pipeline-large-20241208213405
INFO:google.cloud.aiplatform.pipeline_jobs:To use this PipelineJob in another session:
INFO:google.cloud.aiplatform.pipeline_jobs:pipeline_job = aiplatform.PipelineJob.get('projects/721521243942/locations/us-central1/pipelineJobs/tracking-metadata-pipeline-large-20241208213405')
INFO:google.cloud.aiplatform.pipeline_jobs:View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/tracking-metadata-pipeline-large-20241208213405?project=721521243942


## Cleaning up

To clean up all Google Cloud resources used in this project, you can [delete the Google Cloud
project](https://cloud.google.com/resource-manager/docs/creating-managing-projects#shutting_down_projects) you used for the tutorial.

If you don't want to delete the project, do the following to clean up the resources you used:

* If you used Vertex AI Workbench notebooks to run this, stop or delete the notebook instance.

* The pipeline runs you executed deployed endpoints in Vertex AI. Navigate to the [Google Cloud console](https://console.cloud.google.com/vertex-ai/endpoints) to delete those endpoints.

* Delete the [Cloud Storage bucket](https://console.cloud.google.com/storage/browser/) you created.

Alternatively, you can execute the below cell to clean up the resources used in this notebook.

In [None]:
# delete pipelines
try:
    run1.delete()
    run2.delete()
except Exception as e:
    print(e)

# undeploy model from endpoints
endpoints = aiplatform.Endpoint.list(
    filter='display_name="beans-model-pipeline_endpoint"'
)
for endpoint in endpoints:
    deployed_models = endpoint.list_models()
    for deployed_model in deployed_models:
        endpoint.undeploy(deployed_model_id=deployed_model.id)
    # delete endpoint
    endpoint.delete()

# delete model
model_ids = aiplatform.Model.list(filter='display_name="beans-model-pipeline"')
for model_id in model_ids:
    model = aiplatform.Model(model_name=model_id.resource_name)
    model.delete()

# delete locally generated files
! rm -rf beans_deploy_component.yaml beans_model_component.yaml create_dataset.yaml mlmd_pipeline.json

# delete cloud storage bucket
delete_bucket = False  # set True for deletion
if delete_bucket:
    ! gsutil rm -rf {BUCKET_URI}