In [None]:
# For testing TPU deployment
from google.cloud.aiplatform.preview import persistent_resource
from google.cloud.aiplatform_v1beta1.types.persistent_resource import ResourcePool
from google.cloud.aiplatform_v1beta1.types.machine_resources import MachineSpec
from google.cloud.aiplatform_v1beta1.types import ReservationAffinity
# `SYNC` controls whether `create` blocks. Setting it to `False` makes the
# call non-blocking; the returned resource object syncs once the underlying
# operation completes. It must be defined before the call and passed through
# as `sync=SYNC`, otherwise the flag has no effect on `create`.
SYNC = False

# Create the persistent resource. This method returns the created resource.
my_example_resource = persistent_resource.PersistentResource.create(
    persistent_resource_id='test-resource-001',
    display_name='TPU Training',
    resource_pools=[
        ResourcePool(
            machine_spec=MachineSpec(
                machine_type='ct5lp-hightpu-1t',
                tpu_topology='1x1'
            ),
            replica_count=1
        )
    ],
    enable_custom_service_account=True,
    sync=SYNC,
)

# A non-blocking create returns before the resource is ready, so wait for it
# explicitly before any later cell relies on the resource.
if not SYNC:
    my_example_resource.wait()

In [1]:
import os

# --- Project and storage settings ---
PROJECT_ID = "sandbox-373102"
REGION = "us-central1"
BUCKET_NAME = "jk-mlops-test"
BUCKET_URI = "gs://" + BUCKET_NAME

# Recipients handed to the pipeline's notification email task.
EMAIL_RECIPIENTS = ["jeehyeok@google.com"]

# --- Pipeline naming and derived locations ---
PIPELINE_NAME = "vertex-pipeline-datatrigger-tutorial"
PIPELINE_ROOT = f"{BUCKET_URI}/pipeline_root/chicago-taxi-pipe"
WORKING_DIR = PIPELINE_ROOT + "/mlops-datatrigger-tutorial"
EXPERIMENT_NAME = f"{PIPELINE_NAME}-experiment"
PIPELINE_FILE = f"{PIPELINE_NAME}.yaml"

# Publish the working directory through the AIP_MODEL_DIR environment variable.
os.environ["AIP_MODEL_DIR"] = WORKING_DIR

In [3]:
# Image reference for the custom TPU training container, composed from the
# region and project configured earlier. The same value is later used as the
# worker pool's `image_uri`.
CONTAINER = f"{REGION}-docker.pkg.dev/{PROJECT_ID}/custom-training-tpu/tensorflow-tpu:latest"
# Build the image from the current directory and tag it with CONTAINER.
!docker build -t {CONTAINER} .

Sending build context to Docker daemon     28MB
Step 1/80 : FROM us-docker.pkg.dev/vertex-ai/training/tf-tpu.2-15-pod-base-cp310:latest AS base
 ---> 9ef2cb1e70ca
Step 2/80 : FROM ghcr.io/astral-sh/uv:0.7 AS uv
 ---> 125e38a566e3
Step 3/80 : FROM base AS builder
 ---> 9ef2cb1e70ca
Step 4/80 : COPY --from=uv /uv /uvx /bin/
 ---> Using cache
 ---> f6efe5988962
Step 5/80 : ENV UV_LINK_MODE=copy
 ---> Using cache
 ---> 24349a8975b1
Step 6/80 : ENV CC=gcc
 ---> Using cache
 ---> 67f198c911c2
Step 7/80 : ENV CXX=g++
 ---> Using cache
 ---> 1063510ee270
Step 8/80 : WORKDIR /tmp
 ---> Using cache
 ---> eea7d0d634be
Step 9/80 : COPY .python-version uv.lock pyproject.toml ./
 ---> Using cache
 ---> f5b0b7c12871
Step 10/80 : RUN uv python install $(cat /tmp/.python-version) --default --preview
 ---> Using cache
 ---> af00713a59c0
Step 11/80 : COPY . .
 ---> b8467affecc8
Step 12/80 : RUN apt-get update &&     apt-get install -y     build-essential git     apt-transport-https ca-certificates gnupg 

In [4]:
!docker push {REGION}-docker.pkg.dev/{PROJECT_ID}/custom-training-tpu/tensorflow-tpu:latest

The push refers to repository [us-central1-docker.pkg.dev/sandbox-373102/custom-training-tpu/tensorflow-tpu]

[1Bd46dffa8: Preparing 
[1B047ed397: Preparing 
[1B318fff58: Preparing 
[1B7feba016: Preparing 
[1B48316823: Preparing 
[1B3621147d: Preparing 
[1B40342b19: Preparing 
[1B1660d1b9: Preparing 
[1Ba22be3b2: Preparing 
[1B03432052: Preparing 
[1Be6aa51dc: Preparing 
[1Bf51b29d0: Preparing 
[1B5b41329e: Preparing 
[1B28138451: Preparing 
[6B03432052: Waiting g 
[1Bcf0b692f: Preparing 
[1B338b6181: Preparing 
[1B700a0de1: Preparing 
[1B6102c542: Preparing 
[1Be0c3f6c1: Preparing 
[1Bb92c236b: Preparing 
[1B5068e4c1: Preparing 
[1Bc6b5ac1a: Preparing 
[1B226e41cf: Preparing 
[1B98eecf2b: Preparing 
[1B4e21a8d7: Preparing 
[1B70b8453f: Preparing 
[1B33b200e6: Preparing 
[1Bcf42832e: Preparing 
[1B0f027732: Preparing 
[26B621147d: Waiting g 
[1Ba6193222: Preparing 
[27B0342b19: Waiting g 
[27B660d1b9: Waiting g 
[1Bf2db5933: Preparing 
[28B22be3b2: W

In [5]:
from google.cloud import aiplatform

# Configure the Vertex AI SDK defaults for this session: target project and
# region, the staging bucket, and the experiment to associate with.
aiplatform.init(
    project=PROJECT_ID,
    location=REGION,
    staging_bucket=BUCKET_URI,
    experiment=EXPERIMENT_NAME,
)

# Enable the SDK's autologging for the session configured above.
aiplatform.autolog()

In [6]:
from kfp import dsl
from kfp.dsl import importer
from kfp.dsl import OneOf
from google_cloud_pipeline_components.v1.custom_job import CustomTrainingJobOp
from google_cloud_pipeline_components.types import artifact_types
from google_cloud_pipeline_components.v1.model import ModelUploadOp
from google_cloud_pipeline_components.v1.batch_predict_job import ModelBatchPredictOp
from google_cloud_pipeline_components.v1.model_evaluation import ModelEvaluationRegressionOp
from google_cloud_pipeline_components.v1.vertex_notification_email import VertexNotificationEmailOp
from google_cloud_pipeline_components.v1.endpoint import ModelDeployOp
from google_cloud_pipeline_components.v1.endpoint import EndpointCreateOp
from google.cloud import aiplatform

from kfp import dsl
from kfp import compiler

  return component_factory.create_component_from_func(


In [7]:
# Training pipeline definition.
#
# The body creates a single custom training job wrapped in an exit handler
# whose exit task is a notification email to EMAIL_RECIPIENTS.
#
# Parameters actually consumed by the body:
#   project, location          -> forwarded to the training job
#   training_job_display_name  -> training job display name
#   worker_pool_specs          -> worker pool specification list for the job
#   base_output_dir            -> training job base output directory
# All remaining parameters (prediction / batch-prediction / evaluation /
# parent-model settings, `existing_model`) are accepted but not referenced by
# the body in its current form.
@dsl.pipeline(name="custom-model-training-pipeline")
def custom_model_training_pipeline(
    project: str,
    location: str,
    training_job_display_name: str,
    worker_pool_specs: list,
    base_output_dir: str,
    prediction_container_uri: str,
    model_display_name: str,
    batch_prediction_job_display_name: str,
    target_field_name: str,
    test_data_gcs_uri: list,
    ground_truth_gcs_source: list,
    batch_predictions_gcs_prefix: str,
    batch_predictions_input_format: str = "csv",
    batch_predictions_output_format: str = "jsonl",
    ground_truth_format: str = "csv",
    parent_model_resource_name: str = None,
    parent_model_artifact_uri: str = None,
    existing_model: bool = False,
):
    # Exit task: notification email to the configured recipients.
    email_on_exit = VertexNotificationEmailOp(recipients=EMAIL_RECIPIENTS)

    # Everything inside this context is guarded by the exit handler above.
    with dsl.ExitHandler(email_on_exit, name='MLOps Continuous Training Pipeline'):
        # Train the model with a custom training job.
        CustomTrainingJobOp(
            project=project,
            location=location,
            display_name=training_job_display_name,
            worker_pool_specs=worker_pool_specs,
            base_output_directory=base_output_dir,
            # Optional settings, currently disabled:
            # persistent_resource_id = "test-resource-001",
            # service_account = "1045259343465-compute@developer.gserviceaccount.com"
        )

In [8]:
# Compile the pipeline function into a YAML template. PIPELINE_FILE is the
# same path the registry upload step reads, so using it here (rather than
# rebuilding the file name from PIPELINE_NAME) keeps compile and upload
# pointing at one file.
compiler.Compiler().compile(
    pipeline_func=custom_model_training_pipeline,
    package_path=PIPELINE_FILE,
)

In [9]:
REPO_NAME = "mlops"
# Create a repo in the artifact registry
! gcloud artifacts repositories create $REPO_NAME --location=$REGION --repository-format=KFP

[1;31mERROR:[0m (gcloud.artifacts.repositories.create) ALREADY_EXISTS: the repository already exists


In [10]:
from kfp.registry import RegistryClient

# Base URL of the KFP repository for this project and region.
host = f"https://{REGION}-kfp.pkg.dev/{PROJECT_ID}/{REPO_NAME}"
client = RegistryClient(host=host)

# Upload the compiled pipeline file, tagging it "v1" and "latest"; the call's
# two return values are unpacked into TEMPLATE_NAME and VERSION_NAME.
TEMPLATE_NAME, VERSION_NAME = client.upload_pipeline(
    file_name=PIPELINE_FILE,
    tags=["v1", "latest"],
    extra_headers={"description": "This is an example pipeline template."},
)

# Reference the uploaded template through its "latest" tag.
TEMPLATE_URI = f"{host}/{TEMPLATE_NAME}/latest"

In [11]:
DATASET_NAME = "mlops"
TABLE_NAME = "chicago"

# Machine shape requested for the single training replica.
_tpu_machine_spec = {
    "machine_type": "ct5lp-hightpu-8t",
    "tpu_topology": "2x4",
    # Reservation pinning, currently disabled:
    # "reservation_affinity": {
    #     "reservation_affinity_type": "SPECIFIC_RESERVATION",
    #     "key": "compute.googleapis.com/reservation-name",
    #     "values": [
    #         "projects/sandbox-373102/zones/us-central1-a/reservations/test-reservation"
    #     ]
    # },
}

# Container to run: the image referenced by CONTAINER, started as
# `python3 main.py` with the project, training directory and table reference
# passed as command-line arguments.
_trainer_container_spec = {
    "image_uri": CONTAINER,
    "command": ["python3", "main.py"],
    "args": [
        "--project-id", PROJECT_ID,
        "--training-dir", f"/gcs/{BUCKET_NAME}",
        "--bq-source", f"{PROJECT_ID}.{DATASET_NAME}.{TABLE_NAME}",
    ],
    "env": [
        {"name": "TPU_SLICE_BUILDER_DUMP_CHIP_FORCE", "value": "true"},
        {"name": "TPU_SLICE_BUILDER_DUMP_ICI", "value": "true"},
    ],
}

# One worker pool with one replica.
worker_pool_specs = [
    {
        "machine_spec": _tpu_machine_spec,
        "replica_count": 1,
        "container_spec": _trainer_container_spec,
    }
]

# Values for the pipeline's parameters, keyed by parameter name.
parameters = {
    # Consumed by the training job
    "project": PROJECT_ID,
    "location": REGION,
    "training_job_display_name": "taxifare-prediction-training-job",
    "worker_pool_specs": worker_pool_specs,
    "base_output_dir": BUCKET_URI,
    # Model / batch prediction / evaluation settings
    "prediction_container_uri": "us-docker.pkg.dev/vertex-ai/training/tf-tpu.2-15-pod-base-cp310:latest",
    "model_display_name": "taxifare-prediction-model",
    "batch_prediction_job_display_name": "taxifare-prediction-batch-job",
    "target_field_name": "fare",
    "test_data_gcs_uri": [f"{BUCKET_URI}/test_no_target.csv"],
    "ground_truth_gcs_source": [f"{BUCKET_URI}/test.csv"],
    "batch_predictions_gcs_prefix": f"{BUCKET_URI}/batch_predict_output",
    "existing_model": False,
}

In [None]:
# Build a pipeline job from the uploaded template, supplying the parameter
# values prepared earlier; caching is turned off for this job.
job = aiplatform.PipelineJob(
    template_path=TEMPLATE_URI,
    display_name="triggered_custom_regression_evaluation",
    pipeline_root=BUCKET_URI,
    parameter_values=parameters,
    enable_caching=False,
)

# Launch the pipeline job.
job.run()

Creating PipelineJob
PipelineJob created. Resource name: projects/1045259343465/locations/us-central1/pipelineJobs/custom-model-training-pipeline-20250704044711
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/1045259343465/locations/us-central1/pipelineJobs/custom-model-training-pipeline-20250704044711')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/custom-model-training-pipeline-20250704044711?project=1045259343465
PipelineJob projects/1045259343465/locations/us-central1/pipelineJobs/custom-model-training-pipeline-20250704044711 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/1045259343465/locations/us-central1/pipelineJobs/custom-model-training-pipeline-20250704044711 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/1045259343465/locations/us-central1/pipelineJobs/custom-model-training-pipeline-20250704044711 current state:
PipelineStat