In [1]:
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

## Overview

This tutorial demonstrates two ways to trigger a Vertex AI Pipelines run:
- Indirectly, by uploading a file to a Cloud Storage trigger bucket
- Directly, by calling the Vertex AI API

## Costs 

This tutorial uses billable components of Google Cloud:

* Artifact Registry
* BigQuery
* Cloud Resource Manager
* Cloud AI Companion
* Cloud Run Functions
* IAM
* Eventarc
* Cloud Build
* Google Cloud Storage
* Compute Engine
* Dataform
* Pub/Sub
* Vertex AI

Other APIs may also have been enabled in your project that are not included in this list.

In [None]:
# @title Install the required libraries
# Uncomment the line below to install the Vertex AI SDK if it is not already available.
# %pip install google-cloud-aiplatform

### Restart runtime (Colab only)

To use the newly installed packages, you must restart the runtime on Google Colab.

In [None]:
import sys

# The automatic kernel restart is intentionally disabled in this notebook; the
# cell only reports that it skipped it. To turn the restart back on in Colab,
# replace the print() call below with:
#     import IPython
#     IPython.Application.instance().kernel.do_shutdown(True)
_running_in_colab = "google.colab" in sys.modules
if _running_in_colab:
    print("Skipping the restart")

<div class="alert alert-block alert-warning">
<b>⚠️ The automatic restart is disabled in the cell above. If you re-enable it, wait until the kernel has finished restarting before continuing to the next step. ⚠️</b>
</div>


### Authenticate your notebook environment (Colab only)

Authenticate your environment on Google Colab.


In [None]:
import sys

# Only Colab needs an explicit interactive login here. Outside Colab this cell
# does nothing, and later cells call google.auth.default(), so credentials must
# already be available in that environment.
_running_in_colab = "google.colab" in sys.modules
if _running_in_colab:
    from google.colab import auth
    auth.authenticate_user()

In [2]:
# @title Imports
from datetime import datetime
from typing import Optional

from google.auth import default
from google.auth.transport.requests import AuthorizedSession
from google.cloud import aiplatform as vertex

### Set Google Cloud project information

To get started using Vertex AI, you must have an existing Google Cloud project. Learn more about [setting up a project and a development environment](https://cloud.google.com/vertex-ai/docs/start/cloud-environment).

In [1]:
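# @title Set the variables
# NOTE(review): this cell is currently empty, but later cells read the names
# below, so each must be defined here before running the rest of the notebook:
#   PROJECT_ID, REGION, BUCKET, TRIGGER_BUCKET, GCS_SAMPLE_TRAINING_DATA_URI,
#   BQ_TRAINING_DATA_URI, EXISTING_MODEL, PARENT_MODEL_RESOURCE_NAME,
#   PREDICTION_CONTAINER_IMAGE_URI, PRODUCTION_ENDPOINT_ID, TENSORBOARD,
#   RUNNER_SERVICE_ACCOUNT_EMAIL, PERSISTENT_RESOURCE_NAME (may be empty),
#   MACHINE_TYPE, TRAINING_CONTAINER_IMAGE_URI, MODEL_CHECKPOINT_DIR,
#   PIPELINE_TEMPLATE_PATH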


### Trigger the pipeline directly

In [None]:
# @title Setup the Pipeline Parameters

# A timestamped name gives every notebook-triggered run its own display name
# and, via pipeline_root below, its own GCS output folder.
display_name = f"notebook_{datetime.now().isoformat()}"

# The pipeline takes only the trailing ID segment of the persistent resource
# name; an empty/unset PERSISTENT_RESOURCE_NAME is passed through as None.
if PERSISTENT_RESOURCE_NAME:
    persistent_resource_id = PERSISTENT_RESOURCE_NAME.split("/")[-1]
else:
    persistent_resource_id = None

pipeline_root = f"gs://{BUCKET}/pipeline_triggered_via_notebook/{display_name}"
model_artifact_dir = f"{pipeline_root}/model/"

pipeline_parameters = {
    "training_job_display_name": f"training_{display_name}",
    "bq_training_data_uri": BQ_TRAINING_DATA_URI,
    "existing_model": EXISTING_MODEL,
    "parent_model_resource_name": PARENT_MODEL_RESOURCE_NAME,
    "pipeline_root": pipeline_root,
    "prediction_container_image_uri": PREDICTION_CONTAINER_IMAGE_URI,
    "production_endpoint_id": PRODUCTION_ENDPOINT_ID,
    "tensorboard": TENSORBOARD,
    "service_account": RUNNER_SERVICE_ACCOUNT_EMAIL,
    "persistent_resource_id": persistent_resource_id,
    "model_artifact_dir": model_artifact_dir,
    # A single-replica training worker that runs `python train.py` inside the
    # training container, reading from BigQuery and checkpointing to
    # MODEL_CHECKPOINT_DIR.
    "worker_pool_specs": [
        {
            "machine_spec": {"machine_type": MACHINE_TYPE},
            "replica_count": 1,
            "container_spec": {
                "image_uri": TRAINING_CONTAINER_IMAGE_URI,
                "command": ["python", "train.py"],
                "args": [
                    "--data_path", BQ_TRAINING_DATA_URI,
                    "--model_checkpoint_dir", MODEL_CHECKPOINT_DIR,
                ],
            },
        }
    ],
}


In [None]:
# Set the default project, region and staging location for subsequent
# Vertex AI SDK calls; staging goes under this run's pipeline root.
vertex.init(
    location=REGION,
    project=PROJECT_ID,
    staging_bucket=pipeline_root,
)

In [None]:
# @title Helper functions

def _form_request(
    project_id: str,
    location: str,
    pipeline_job: vertex.PipelineJob,
    service_account: str,
    persistent_resource_name: str,
) -> tuple[str, dict, dict]:
    """Forms the API request for pipeline job submission."""

    endpoint = f"https://{location}-aiplatform.googleapis.com/v1beta1/projects/{project_id}/locations/{location}/pipelineJobs"
    headers = {"Content-Type": "application/json"}

    pipeline_spec = pipeline_job.to_dict()
    pipeline_spec["serviceAccount"] = service_account
    pipeline_spec["runtimeConfig"]["defaultRuntime"] = {
        "persistentResourceRuntimeDetail": {"persistentResourceName": persistent_resource_name}
    }

    return endpoint, headers, pipeline_spec


def submit_pipeline_job(
    project_id: str,
    location: str,
    pipeline_parameters: dict,
    pipeline_template_path: str,
    service_account: str,
    pipeline_root: str,
    persistent_resource_name: Optional[str],
    display_name: Optional[str] = None,
    enable_caching: bool = False,
) -> str:
    """Submits a Vertex AI pipeline job through the REST API.

    The job is first built locally with ``vertex.PipelineJob`` (template plus
    parameter values). Its dict form is then POSTed by hand using the request
    built by ``_form_request``, which also carries the service account and the
    persistent resource name.

    Args:
        project_id: Project ID. Also passed to ``vertex.PipelineJob`` so the
            locally built job matches the project the request is posted to.
        location: Region of the pipeline job. Also passed to
            ``vertex.PipelineJob``.
        pipeline_parameters: Pipeline parameters.
        pipeline_template_path: Artifact Registry URI or GCS path for the
            compiled pipeline.
        service_account: Service account email.
        pipeline_root: GCS path for the pipeline root directory.
        persistent_resource_name: Name of the persistent resource; forwarded to
            ``_form_request``.
        display_name: Display name for the pipeline job. When omitted, a fresh
            ``notebook_<ISO timestamp>`` name is generated at call time. This
            replaces a default that was frozen when the cell was defined.
        enable_caching: Enable caching.

    Returns:
        The pipeline job resource name returned by the API.

    Raises:
        requests.HTTPError: If the API rejects the request. The response body
            is printed first because ``raise_for_status`` only reports the
            status line.
    """
    if display_name is None:
        display_name = f"notebook_{datetime.now().isoformat()}"

    print(f"Creating Pipeline Job with display_name={display_name}")

    pipeline_job = vertex.PipelineJob(
        display_name=display_name,
        parameter_values=pipeline_parameters,
        enable_caching=enable_caching,
        template_path=pipeline_template_path,
        pipeline_root=pipeline_root,
        # Use this function's arguments rather than whatever vertex.init() set.
        project=project_id,
        location=location,
    )

    endpoint, headers, pipeline_spec = _form_request(
        project_id=project_id,
        location=location,
        pipeline_job=pipeline_job,
        service_account=service_account,
        persistent_resource_name=persistent_resource_name,
    )

    # Application Default Credentials; the project it reports is not needed.
    credentials, _ = default()
    session = AuthorizedSession(credentials)

    response = session.post(endpoint, headers=headers, json=pipeline_spec)
    if not response.ok:
        # Surface the API's error payload before raise_for_status() discards it.
        print(f"Pipeline Job submission failed: {response.text}")
    response.raise_for_status()

    pipeline_job_resource_name = response.json()["name"]
    print(f"Pipeline Job submitted: {pipeline_job_resource_name}")

    # The run ID is the last segment of the returned resource name.
    run_id = pipeline_job_resource_name.split("/")[-1]
    console_link = (
        f"https://console.cloud.google.com/vertex-ai/pipelines/locations/"
        f"{location}/runs/{run_id}?project={project_id}"
    )
    print(f"Console link: {console_link}")

    return pipeline_job_resource_name

### Trigger via File Upload

In [None]:
# Trigger the pipeline via file upload
# Once you upload a file to the trigger bucket, a Cloud Run Function will detect the new file and will kick off the flow
# It will create a new BQ Table with the data that was uploaded, then it will send a Pub/Sub message to a topic with all 
# the required pipeline information
# Another Cloud Run Function will be triggered, which will then instantiate the pipeline run.

! gsutil cp $GCS_SAMPLE_TRAINING_DATA_URI gs://$TRIGGER_BUCKET/

print("Go to the link below to see the new pipeline run")
print(f"https://console.cloud.google.com/vertex-ai/pipelines/runs?project={PROJECT_ID}")

In [None]:
# Trigger the pipeline via API. The returned pipeline job resource name is
# left as the last expression so the notebook displays it as the cell output.
submit_pipeline_job(
    project_id=PROJECT_ID,
    location=REGION,
    pipeline_template_path=PIPELINE_TEMPLATE_PATH,
    pipeline_parameters=pipeline_parameters,
    pipeline_root=pipeline_root,
    service_account=RUNNER_SERVICE_ACCOUNT_EMAIL,
    persistent_resource_name=PERSISTENT_RESOURCE_NAME,
    display_name=display_name,
    enable_caching=False,
)