<a href="https://colab.research.google.com/github/joahofmann/gcp-notebooks/blob/main/Simple2_ok.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Vertex AI Pipelines Example

This notebook demonstrates how to create and run a simple Kubeflow pipeline on Vertex AI.

# 1. Setup and Authentication

In [1]:
# Install necessary libraries
!pip install --upgrade google-cloud-aiplatform google-cloud-storage kfp google-cloud-pipeline-components --quiet

[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/269.1 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m [32m266.2/269.1 kB[0m [31m9.1 MB/s[0m eta [36m0:00:01[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m269.1/269.1 kB[0m [31m5.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m84.0/84.0 kB[0m [31m7.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m7.9/7.9 MB[0m [31m26.2 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.5/1.5 MB[0m [31m29.5 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.7/1.7 MB[0m [31m41.1 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

In [2]:
# Restart runtime (Colab only)
# The sys.modules check limits the restart to sessions in which google.colab is loaded;
# in any other environment this cell does nothing.
# NOTE(review): presumably needed so the packages upgraded by the install cell are the
# ones that get imported afterwards — confirm.
import sys
if "google.colab" in sys.modules:
    import IPython
    app = IPython.Application.instance()
    # Shuts the current kernel down with the restart flag set to True; every variable
    # defined so far is lost, so the following cells start from a fresh kernel.
    app.kernel.do_shutdown(True)

In [1]:
# Authenticate to Google Cloud
# If you are running this in a Colab environment, this will open a browser window for authentication.
# `sys` is imported again because this cell runs after the kernel restart above.
# Outside Colab the branch is skipped.
# NOTE(review): credentials are then presumably expected to come from the
# environment (e.g. an already configured gcloud login) — confirm.
import sys
if "google.colab" in sys.modules:
    from google.colab import auth
    auth.authenticate_user()

In [2]:
# --- User-defined variables ---
# Replace with your actual project ID and region
# NOTE(review): the trailing `# @param` annotations look like Colab form-field markers;
# keep them on the same line as the assignment — confirm.
# PROJECT_ID: project passed to gsutil/gcloud and to aiplatform.init.
PROJECT_ID = "vertex-test-id" # @param {type:"string"}
# REGION: location used when creating the bucket and when initialising the Vertex AI SDK.
REGION = "us-central1" # @param {type:"string"}
# BUCKET_NAME: bucket name without the gs:// prefix; BUCKET_URI is derived from it below.
BUCKET_NAME = "gcs-bucket-name-gamma" # @param {type:"string"}

In [3]:
# Validate inputs
def _require_configured(value, placeholder, description):
    """Raise ValueError when ``value`` is empty or still equals the template ``placeholder``."""
    if not value or value == placeholder:
        raise ValueError(f"Please replace '{placeholder}' with your actual {description}.")


# Fail early, before any cloud resource is touched, if the settings were left unset.
_require_configured(PROJECT_ID, "your-gcp-project-id", "GCP project ID")
_require_configured(BUCKET_NAME, "your-gcs-bucket-name", "GCS bucket name")

In [4]:
# Derive the Cloud Storage locations from the bucket name
BUCKET_URI = f"gs://{BUCKET_NAME}"
PIPELINE_ROOT = f"{BUCKET_URI}/pipeline_root_simple_example"

# Echo the effective configuration, one setting per line
print(
    f"Project ID: {PROJECT_ID}",
    f"Region: {REGION}",
    f"Bucket URI: {BUCKET_URI}",
    f"Pipeline Root: {PIPELINE_ROOT}",
    sep="\n",
)

Project ID: vertex-test-id
Region: us-central1
Bucket URI: gs://gcs-bucket-name-gamma
Pipeline Root: gs://gcs-bucket-name-gamma/pipeline_root_simple_example


### Create a Cloud Storage bucket (if it doesn't exist)

Create a storage bucket to store intermediate artifacts such as datasets.

In [5]:
# You only need to run this if your bucket doesn't already exist
! gsutil mb -l {REGION} -p {PROJECT_ID} {BUCKET_URI}

Creating gs://gcs-bucket-name-gamma/...


In [6]:
# Initialize Vertex AI SDK
# Project, region and staging bucket are handed to the SDK once here; the PipelineJob
# created later in this notebook does not pass project or location itself.
from google.cloud import aiplatform
aiplatform.init(project=PROJECT_ID, location=REGION, staging_bucket=BUCKET_URI)

In [7]:
# Get the service account
SERVICE_ACCOUNT = !gcloud projects describe $PROJECT_ID --format="value(projectNumber)"
SERVICE_ACCOUNT = f"{SERVICE_ACCOUNT[0].strip()}-compute@developer.gserviceaccount.com"
print(f"Service Account: {SERVICE_ACCOUNT}")

Service Account: 219162896674-compute@developer.gserviceaccount.com


### Grant necessary permissions to the Compute Engine default service account

Grant `roles/storage.objectAdmin` and `roles/aiplatform.user` to the service account at the project level.

In [8]:
# Grant necessary permissions to the Compute Engine default service account at the project level
!gcloud projects add-iam-policy-binding {PROJECT_ID} --member="serviceAccount:{SERVICE_ACCOUNT}" --role="roles/storage.objectAdmin"
!gcloud projects add-iam-policy-binding {PROJECT_ID} --member="serviceAccount:{SERVICE_ACCOUNT}" --role="roles/aiplatform.user"

#!gsutil iam ch serviceAccount:{SERVICE_ACCOUNT}:roles/storage.objectAdmin {BUCKET_URI}

Updated IAM policy for project [vertex-test-id].
bindings:
- members:
  - serviceAccount:219162896674-compute@developer.gserviceaccount.com
  - serviceAccount:vertex-test-id@appspot.gserviceaccount.com
  - user:Joachim.Hofmann@bluewin.ch
  role: roles/aiplatform.admin
- members:
  - serviceAccount:service-219162896674@gcp-sa-vertex-nb.iam.gserviceaccount.com
  role: roles/aiplatform.colabServiceAgent
- members:
  - serviceAccount:service-219162896674@gcp-sa-aiplatform-cc.iam.gserviceaccount.com
  role: roles/aiplatform.customCodeServiceAgent
- members:
  - serviceAccount:service-219162896674@gcp-sa-aiplatform-vm.iam.gserviceaccount.com
  role: roles/aiplatform.notebookServiceAgent
- members:
  - serviceAccount:service-219162896674@gcp-sa-aiplatform.iam.gserviceaccount.com
  role: roles/aiplatform.serviceAgent
- members:
  - serviceAccount:219162896674-compute@developer.gserviceaccount.com
  - serviceAccount:vertex-test-id@appspot.gserviceaccount.com
  role: roles/aiplatform.user
- memb

# 2. Define Custom Components

In [9]:
from kfp import dsl
from kfp.dsl import component, Input, Output, Artifact, Dataset, Model

from typing import NamedTuple

In [41]:
from kfp import dsl
from kfp.dsl import component, Output, Dataset
from kfp.compiler import Compiler

@component(
    base_image="python:3.9",  # standard python:3.9 image
    packages_to_install=["scikit-learn", "pandas"],
)
def generate_synthetic_data(
    num_samples: int,
    data: Output[Dataset]
):
    """Create a synthetic two-class dataset and store it as a CSV file.

    Args:
        num_samples: Number of rows to generate.
        data: Output dataset artifact. Its path is used as a directory that
            receives ``generated_data.csv``.
    """
    # Imports are kept inside the function body
    import os

    import pandas as pd
    from sklearn.datasets import make_classification

    print(f"Generating {num_samples} samples...")
    features, labels = make_classification(
        n_samples=num_samples,
        n_features=10,
        n_classes=2,
        random_state=42,
    )

    # One column per feature plus the label column
    column_names = [f"feature_{idx}" for idx in range(10)]
    frame = pd.DataFrame(features, columns=column_names)
    frame["target"] = labels

    # The artifact path is treated as a directory; make sure it exists before writing
    os.makedirs(data.path, exist_ok=True)
    output_csv_path = os.path.join(data.path, "generated_data.csv")
    frame.to_csv(output_csv_path, index=False)
    print(f"Data saved to: {output_csv_path}")

# Serialize the data generation component on its own into a YAML definition
compiler = Compiler()
generate_synthetic_data_op = generate_synthetic_data
compiler.compile(
    pipeline_func=generate_synthetic_data_op,
    package_path="generate_synthetic_data_component.yaml",
)

In [42]:
from kfp import dsl
from kfp.dsl import component, Input, Output, Dataset, Model
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
import joblib # for saving the model
import os

@component(
    base_image="python:3.9",
    packages_to_install=["scikit-learn", "pandas", "joblib"],
)
def train_model(
    dataset: Input[Dataset],
    model: Output[Model],
    n_estimators: int = 100,
) -> None:
    """Fit a RandomForestClassifier on the generated CSV and persist it with joblib.

    Args:
        dataset: Input dataset artifact; ``generated_data.csv`` is read from below its path.
        model: Output model artifact; ``model.joblib`` is written below its path.
        n_estimators: Number of trees passed to the classifier.
    """
    # Imports are kept inside the function body
    import os

    import joblib
    import pandas as pd
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.metrics import accuracy_score
    from sklearn.model_selection import train_test_split

    print(f"Loading data from: {dataset.path}")
    frame = pd.read_csv(os.path.join(dataset.path, "generated_data.csv"))

    # Separate the label column from the feature columns
    labels = frame["target"]
    features = frame.drop("target", axis=1)

    # Hold out 20% of the rows for the accuracy check; fixed seed for repeatable splits
    train_features, test_features, train_labels, test_labels = train_test_split(
        features, labels, test_size=0.2, random_state=42
    )

    print(f"Training RandomForestClassifier with {n_estimators} estimators...")
    classifier = RandomForestClassifier(n_estimators=n_estimators, random_state=42)
    classifier.fit(train_features, train_labels)

    accuracy = accuracy_score(test_labels, classifier.predict(test_features))
    print(f"Model accuracy: {accuracy}")

    # The artifact path is treated as a directory; make sure it exists before writing
    os.makedirs(model.path, exist_ok=True)
    model_save_path = os.path.join(model.path, "model.joblib")
    joblib.dump(classifier, model_save_path)
    print(f"Model saved to: {model_save_path}")

# 3. Define and Compile the Pipeline

In [43]:
from kfp import dsl
from kfp.components import load_component_from_file

# Re-create the data generation component from its compiled YAML definition
generate_synthetic_data_op = load_component_from_file("generate_synthetic_data_component.yaml")

@dsl.pipeline(
    name="simple-ml-pipeline",
    description="A simple pipeline that generates data and trains a model.",
    pipeline_root=PIPELINE_ROOT,
)
def simple_pipeline(
    num_samples: int = 1000,
    n_estimators: int = 50,
):
    """Two-step pipeline: synthetic data generation followed by model training.

    Args:
        num_samples: Forwarded to the data generation step.
        n_estimators: Forwarded to the training step.
    """
    # Step 1: produce the dataset with the component loaded from YAML
    data_step = generate_synthetic_data_op(num_samples=num_samples)

    # Step 2: train on the "data" output of step 1, which also orders the two steps
    train_model(
        n_estimators=n_estimators,
        dataset=data_step.outputs["data"],
    )

    # Further steps (evaluation, model deployment, ...) can be appended here.

    # You can add more components here, e.g., for evaluation, model deployment, etc.

In [44]:
from kfp.compiler import Compiler

# Build a compiler and write the pipeline definition to disk;
# the job creation cell reads this file back by the same name.
compiler = Compiler()
compiler.compile(
    pipeline_func=simple_pipeline,
    package_path="simple_pipeline.json",
)

print("Pipeline compiled to simple_pipeline.json")

Pipeline compiled to simple_pipeline.json


# 4. Create and Run the Pipeline Job on Vertex AI

In [45]:
import google.cloud.aiplatform as aiplatform
import time

# Compiled pipeline definition and the runtime arguments handed to it
pipeline_spec_path = "simple_pipeline.json"
parameter_values = dict(num_samples=1000, n_estimators=50)

# Suffix the display name with the current Unix time (whole seconds) so repeated
# runs can be told apart
timestamp = int(time.time())
job_display_name = "simple-ml-pipeline-job-" + str(timestamp)

job = aiplatform.PipelineJob(
    template_path=pipeline_spec_path,
    display_name=job_display_name,
    parameter_values=parameter_values,
    pipeline_root=PIPELINE_ROOT,
)

# Start the pipeline run
job.run()

# 5. Monitor the Pipeline Run

You can monitor the pipeline run in the Google Cloud console. The link to the job is provided in the output of the previous cell.

# 6. Clean up (Optional)

If you want to clean up the resources created during this example, you can delete the Cloud Storage bucket and the pipeline jobs.

In [None]:
# Delete the Cloud Storage bucket (use with caution! this removes the bucket and everything in it)
# ! gsutil rm -r {BUCKET_URI}

# Delete pipeline jobs (optional)
# Uncomment to delete every job whose display name matches the pattern used when the job
# was created above. Iterating over the list avoids indexing `[0]` into an empty result,
# which would raise IndexError rather than NotFound.
# for pipeline_job in aiplatform.PipelineJob.list(filter='display_name:"simple-ml-pipeline-job*"'):
#     pipeline_job.delete()