In [None]:
# Copyright 2026 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Note: This notebook is for reference and educational purposes only. Not intended for production use.

# Questions? mateuswagner at google.com

# Vertex AI Pipelines with Persistent Resources

This notebook demonstrates how to run Vertex AI Pipelines using a Persistent Resource cluster.

A **Persistent Resource** is a long-running cluster that provides:
- **Reduced startup time** - Pre-provisioned compute eliminates cold start delays
- **Guaranteed availability** - Reserved resources ensure capacity for pipeline tasks
- **GPU support** - Compatible with all VMs and GPUs supported by custom training jobs

---

## Overview

| Component | Description |
|-----------|-------------|
| **Pipeline Type** | Single-step Python component |
| **Compute** | Persistent Resource (pre-provisioned cluster) |
| **Purpose** | Reference implementation for pipeline execution on Persistent Resources |

## Prerequisites

- Google Cloud project with Vertex AI API enabled
- Existing Persistent Resource cluster
- GCS bucket for pipeline staging

## Table of Contents

1. [Environment Setup](#environment-setup)
2. [Pipeline Component Definition](#pipeline-component-definition)
3. [Pipeline Definition](#pipeline-definition)
4. [Persistent Resource Configuration](#persistent-resource-configuration)
5. [Submit Pipeline Job](#submit-pipeline-job)

---

## Note: Persistent Resource Imports (Jan 20, 2026)

When creating a Persistent Resource, you must use the **preview** module and **v1beta1** types:

```python
from google.cloud.aiplatform.preview import persistent_resource
from google.cloud.aiplatform_v1beta1.types.persistent_resource import ResourcePool
from google.cloud.aiplatform_v1beta1.types.machine_resources import MachineSpec, DiskSpec
```

This is required because:
- `PersistentResource` is in the `preview` module (newer feature)
- `ResourcePool`, `MachineSpec`, and `DiskSpec` are proto-generated types in `v1beta1` (beta API)

### Reference:
https://docs.cloud.google.com/vertex-ai/docs/pipelines/persistent-resources#create-persistent-resource-python

https://docs.cloud.google.com/vertex-ai/docs/training/persistent-resource-create#create-persistent-resource-console


In [None]:
# Environment Setup
# Install required dependencies
# `%pip` (rather than `!pip`) installs into the environment of the running kernel.
# `-q` keeps the output quiet; `--upgrade` moves both packages to their newest
# available release, so the installed versions are not pinned by this cell.
%pip install -q --upgrade kfp google-cloud-aiplatform

In [None]:
# Import libraries
import os
import time
import json

from kfp import dsl, compiler
from google.cloud import aiplatform
from google.cloud.aiplatform.preview import persistent_resource as pr
from google.cloud import aiplatform_v1beta1
from google.cloud.aiplatform_v1beta1.types import pipeline_job as pipeline_job_types

In [None]:
# Configuration
PROJECT_ID = "xxx-xxxx" # Change it!
LOCATION = "us-central1"
STAGING_BUCKET = "gs://b1bd1e40-2c20-433d-83a1-xxxxxxxxxxxxx" # Change it!
# Cloud Storage URIs always use "/" as the separator, so the path is joined
# explicitly instead of with os.path.join (which inserts "\" on Windows).
# rstrip("/") keeps the result free of a double slash if the bucket URI is
# entered with a trailing "/".
PIPELINE_ROOT = f"{STAGING_BUCKET.rstrip('/')}/pipeline-root"

---

## Pipeline Component Definition

Define a Python component that draws seeded pseudo-random samples and returns their summary statistics (mean, standard deviation, min, max) as a JSON string.

In [None]:
@dsl.component(base_image="python:3.12")
def process_data(seed: int, sample_size: int) -> str:
    """Generates random samples and computes basic statistics.

    Draws ``sample_size`` values with the Box-Muller transform from a
    generator seeded with ``seed`` and summarizes them.

    Args:
        seed: Value passed to ``random.seed`` so the samples are reproducible.
        sample_size: Number of samples to draw; must be at least 1.

    Returns:
        A JSON string with the keys ``mean``, ``std_dev``, ``min`` and
        ``max``, each rounded to 4 decimal places. ``std_dev`` is the
        population standard deviation (the divisor is ``sample_size``).

    Raises:
        ValueError: If ``sample_size`` is smaller than 1.
    """
    # Imports are kept inside the body, as in the original component.
    import math, random, json

    # Fail with a clear message instead of a ZeroDivisionError further down.
    if sample_size < 1:
        raise ValueError(f"sample_size must be at least 1, got {sample_size}")

    def _box_muller() -> float:
        """Returns one sample: sqrt(-2 ln u1) * cos(2 pi u2)."""
        u1 = random.random()
        # random.random() returns values in [0.0, 1.0); log(0.0) raises
        # ValueError, so redraw in the (extremely rare) case of an exact 0.0.
        while u1 == 0.0:
            u1 = random.random()
        u2 = random.random()
        return math.sqrt(-2 * math.log(u1)) * math.cos(2 * math.pi * u2)

    random.seed(seed)
    samples = [_box_muller() for _ in range(sample_size)]

    mean = sum(samples) / len(samples)
    std_dev = math.sqrt(sum((x - mean) ** 2 for x in samples) / len(samples))

    return json.dumps({"mean": round(mean, 4), "std_dev": round(std_dev, 4),
                       "min": round(min(samples), 4), "max": round(max(samples), 4)})

---

## Pipeline Definition

Define and compile the pipeline with a single component.

In [None]:
# Registers `pipeline` under the name "simple-python-pipeline" and sets its
# pipeline_root to the PIPELINE_ROOT constant from the configuration cell.
@dsl.pipeline(
    name="simple-python-pipeline",
    pipeline_root=PIPELINE_ROOT
)
def pipeline(seed: int, sample_size: int):
    """Simple pipeline that generates samples and computes statistics."""
    # Single step: both pipeline parameters are forwarded unchanged to the
    # process_data component; its return value is not used by any other step.
    process_data(seed=seed, sample_size=sample_size)

# Compile the pipeline
# The compiled definition is written to simple_python_pipeline.json (relative
# to the current working directory); the submission cell opens that same path.
compiler.Compiler().compile(
    pipeline_func=pipeline,
    package_path="simple_python_pipeline.json",
)

---

## Persistent Resource Configuration

Retrieve an existing Persistent Resource cluster for pipeline execution. Using pre-provisioned compute resources reduces pipeline startup time.

In [None]:
# Point the Vertex AI SDK at the configured project and region
aiplatform.init(project=PROJECT_ID, location=LOCATION)

# Short ID (last path segment) of the cluster to look for
PERSISTENT_RESOURCE_ID = "cluster-20260120-144145" # Change it!

# List the persistent resources and keep those whose short ID equals the
# configured one
resource_list = pr.PersistentResource.list()
matching = []
for candidate in resource_list:
    short_id = candidate.name.split('/')[-1]
    if short_id == PERSISTENT_RESOURCE_ID:
        matching.append(candidate)

if not matching:
    # Nothing matched: report it and show the IDs that are available
    print(f"Persistent resource '{PERSISTENT_RESOURCE_ID}' not found.")
    print("Available resources:")
    for candidate in resource_list:
        print(f"  - {candidate.name.split('/')[-1]}")
else:
    # The first match is the one reported
    persistent_resource = matching[0]
    print(f"Using Persistent Resource: {persistent_resource.resource_name}")
    print(f"State: {persistent_resource.state}")

---

## Submit Pipeline Job

Configure runtime settings and submit the pipeline to Vertex AI.

In [None]:
# Create API client
# The endpoint host is derived from LOCATION so the client talks to the same
# region that is used for the job's parent path.
client_options = {"api_endpoint": f"{LOCATION}-aiplatform.googleapis.com"}
client = aiplatform_v1beta1.PipelineServiceClient(client_options=client_options)

# Load compiled pipeline spec
# An explicit encoding keeps the read independent of the platform's locale.
with open("simple_python_pipeline.json", "r", encoding="utf-8") as f:
    pipeline_spec = json.load(f)

# Disable caching for all tasks
# Only tasks listed under root -> dag -> tasks are visited; a spec without
# those keys is left unchanged because each lookup defaults to an empty dict.
root_tasks = pipeline_spec.get("root", {}).get("dag", {}).get("tasks", {})
for task_spec in root_tasks.values():
    task_spec["cachingOptions"] = {"enableCache": False}

# Project number for persistent resource path
PROJECT_NUMBER = "000000000000" # Change it!!!

# Alias for the nested message namespace used by the three objects below
RuntimeConfig = pipeline_job_types.PipelineJob.RuntimeConfig

# Full resource name of the cluster: project number + region + cluster ID
persistent_resource_name = (
    f"projects/{PROJECT_NUMBER}/locations/{LOCATION}/persistentResources/{PERSISTENT_RESOURCE_ID}"
)

# Persistent-resource settings: which cluster to use, how long to wait when
# it is unavailable (value in milliseconds), and the behavior on timeout
resource_wait_ms = 10 * 60 * 1000  # 10 minutes
pr_runtime_detail = RuntimeConfig.PersistentResourceRuntimeDetail(
    persistent_resource_name=persistent_resource_name,
    task_resource_unavailable_wait_time_ms=resource_wait_ms,
    task_resource_unavailable_timeout_behavior="FALL_BACK_TO_ON_DEMAND",
)

# Wrap the detail in the default runtime used by the job
default_runtime = RuntimeConfig.DefaultRuntime(
    persistent_resource_runtime_detail=pr_runtime_detail,
)

# Values for the two pipeline parameters (seed, sample_size)
pipeline_parameters = {"seed": 42, "sample_size": 1000}

# Final runtime configuration: output directory, parameters, default runtime
runtime_config = RuntimeConfig(
    gcs_output_directory=PIPELINE_ROOT,
    parameter_values=pipeline_parameters,
    default_runtime=default_runtime,
)

# Build the job message; the display name carries the current Unix timestamp
# (whole seconds)
submission_timestamp = int(time.time())
pipeline_job = pipeline_job_types.PipelineJob(
    display_name=f"simple-python-pipeline-{submission_timestamp}",
    pipeline_spec=pipeline_spec,
    runtime_config=runtime_config,
)

# Wrap the job in a create request addressed to the project/region parent
parent_path = f"projects/{PROJECT_ID}/locations/{LOCATION}"
request = aiplatform_v1beta1.CreatePipelineJobRequest(
    parent=parent_path, pipeline_job=pipeline_job
)

# Send the request; the response carries the created job's resource name
response = client.create_pipeline_job(request=request)

# The job ID is the last path segment of the returned resource name
job_id = response.name.rsplit('/', 1)[-1]

# Console page for this run
console_link = f"https://console.cloud.google.com/vertex-ai/locations/{LOCATION}/pipelines/runs/{job_id}?project={PROJECT_ID}"

# Print job details
for label, value in (("Job name", response.name), ("Console URL", console_link)):
    print(f"{label}: {value}")