In [None]:
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

## VAPO Transcription Example 2-3

- [Original Reference](https://github.com/GoogleCloudPlatform/generative-ai/blob/main/gemini/prompts/prompt_optimizer/vertex_ai_prompt_optimizer_sdk_custom_metric.ipynb) from [Ivan Nardini](https://github.com/inardini)

### Objective

This notebook demonstrates how to leverage Vertex AI prompt optimizer to optimize a simple prompt for a Gemini model using your own metric. The goal is to use Vertex AI prompt optimizer to find a new prompt template that generates better responses based on your own optimization metric.

This tutorial uses the following Google Cloud services and resources:

- Generative AI on Vertex AI
- Vertex AI prompt optimizer
- Vertex AI Gen AI evaluation
- Vertex AI Custom job
- Cloud Run

The steps performed include:

1. Define the prompt template you want to optimize.
2. Prepare the prompt optimization dataset.
3. Define and deploy your own custom evaluation metric on Cloud function.
4. Set optimization mode and steps.
5. Run the automatic prompt optimization job.
6. Collect the best prompt template and eval metric.
7. Validate the best prompt template.

### Costs

This tutorial uses billable components of Google Cloud:

* Vertex AI
* Cloud Storage

Learn about [Vertex AI pricing](https://cloud.google.com/vertex-ai/pricing) and [Cloud Storage pricing](https://cloud.google.com/storage/pricing) and use the [Pricing Calculator](https://cloud.google.com/products/calculator/) to generate a cost estimate based on your projected usage.

## 2. Before you start

### Install Vertex AI SDK for Python and other required packages


In [5]:
%pip install --upgrade --quiet 'google-cloud-aiplatform[evaluation]'
%pip install --upgrade --quiet 'plotly' 'asyncio' 'tqdm' 'tenacity' 'etils' 'importlib_resources' 'fsspec' 'gcsfs' 'nbformat>=4.2.0'

In [6]:
!wget https://raw.githubusercontent.com/GoogleCloudPlatform/generative-ai/main/gemini/prompts/prompt_optimizer/vapo_lib.py
import vapo_lib

In [3]:
# Store a second copy of vapo_lib.py under ./tutorial/utils (the directory is created if missing).
# NOTE(review): `import vapo_lib` in the previous cell loads the copy in the working
# directory, not this one — confirm this copy is still needed.
! mkdir -p ./tutorial/utils && wget https://raw.githubusercontent.com/GoogleCloudPlatform/generative-ai/main/gemini/prompts/prompt_optimizer/vapo_lib.py -P ./tutorial/utils

### Authenticate your notebook environment (Colab only)

Authenticate your environment on Google Colab.


In [7]:
import sys

if "google.colab" in sys.modules:
    try:
        import google.auth
        import google.auth.transport.requests
        from google.colab import auth

        auth.authenticate_user()
        creds, project = google.auth.default()
        authentication = google.auth.transport.requests.Request()
        if creds.token:
            print("Authentication successful.")
        else:
            print("Authentication successful, but no token was returned.")
    except Exception as e:
        print(f"Error during Colab authentication: {e}")

! gcloud auth login

### Set Google Cloud project information

To get started using Vertex AI, you must have an existing Google Cloud project and [enable the following APIs](https://console.cloud.google.com/flows/enableapi?apiid=cloudresourcemanager.googleapis.com,aiplatform.googleapis.com,cloudfunctions.googleapis.com,run.googleapis.com).

Learn more about [setting up a project and a development environment](https://cloud.google.com/vertex-ai/docs/start/cloud-environment).

#### Set your project ID and project number

In [8]:
# Placeholder: replace with your Google Cloud project ID before running.
PROJECT_ID = "$YOUR_PROJECT_ID"  # @param {type:"string"}

# Make this project the active one for the gcloud commands that follow.
! gcloud config set project {PROJECT_ID}

In [59]:
# Capture the output lines of `gcloud projects describe` for the project.
# NOTE(review): the trailing `[0]` on the next line is part of the shell command text, not a
# Python index — confirm it is intentional.
PROJECT_NUMBER = !gcloud projects describe {PROJECT_ID} --format="get(projectNumber)"[0]
# Keep only the first captured line as the project number.
PROJECT_NUMBER = PROJECT_NUMBER[0]

#### Region

You can also change the `REGION` variable used by Vertex AI. Learn more about [Vertex AI regions](https://cloud.google.com/vertex-ai/docs/general/locations).

In [10]:
# Region used to initialize the SDK and client; "asia-northeast3" is kept as a noted alternative.
REGION = "us-central1"  # @param {type: "string"}

#### Create a Cloud Storage bucket

Create a storage bucket to store intermediate artifacts such as datasets.

In [11]:
# Placeholder: replace with your Cloud Storage bucket name (without the gs:// scheme).
BUCKET_NAME = "$YOUR_BUCKET_NAME"  # @param {type:"string"}
# Full gs:// URI derived from BUCKET_NAME.
BUCKET_URI = f"gs://{BUCKET_NAME}"  # @param {type:"string"}

In [None]:
# Uncomment to create the bucket in REGION under PROJECT_ID.
# ! gsutil mb -l {REGION} -p {PROJECT_ID} {BUCKET_URI}

#### Service Account and permissions

Vertex AI Prompt optimizer requires a service account with the following permissions:

-   `Vertex AI User` to call Vertex LLM API
-   `Storage Object Admin` to read and write to your GCS bucket.
-   `Artifact Registry Reader` to download the pipeline template from Artifact Registry.
-   `Cloud Run Developer` to deploy functions on Cloud Run.
-   `Cloud Run Invoker` to invoke the deployed Cloud Run function.

[Check out the documentation](https://cloud.google.com/iam/docs/manage-access-service-accounts#iam-view-access-sa-gcloud) to learn how to grant those permissions to a single service account.


> If you run the following commands in Vertex AI Workbench, run them directly in the terminal.


In [12]:
# Service account address built from the project number; it is granted roles below and
# passed to the optimization job.
SERVICE_ACCOUNT = f"{PROJECT_NUMBER}-compute@developer.gserviceaccount.com"

In [13]:
# Grant each listed role to the service account with a project-level IAM binding.
for role in ['aiplatform.user', 'storage.objectAdmin', 'artifactregistry.reader', 'run.developer', 'run.invoker']:

    ! gcloud projects add-iam-policy-binding {PROJECT_ID} \
      --member=serviceAccount:{SERVICE_ACCOUNT} \
      --role=roles/{role} --condition=None

### Set tutorial folder and workspace

Set a local folder to collect and organize data and any tutorial artifacts.

In [14]:
from pathlib import Path as path

# Local folders for tutorial artifacts, rooted at the current working directory.
ROOT_PATH = path.cwd()
TUTORIAL_PATH = ROOT_PATH / "tutorial_case2_3"
BUILD_PATH = TUTORIAL_PATH / "build_case2_3"

# Create both folders; existing folders are left untouched.
TUTORIAL_PATH.mkdir(parents=True, exist_ok=True)
BUILD_PATH.mkdir(parents=True, exist_ok=True)

Set an associated workspace to store prompt optimization results in a Cloud Storage bucket.

In [15]:
from etils import epath

# Workspace and input-data locations under the bucket.
WORKSPACE_URI = epath.Path(BUCKET_URI) / "optimization_case2_3"
INPUT_DATA_URI = epath.Path(WORKSPACE_URI) / "data_case2_3"

# Create both locations; existing ones are left untouched.
WORKSPACE_URI.mkdir(parents=True, exist_ok=True)
INPUT_DATA_URI.mkdir(parents=True, exist_ok=True)

### Import libraries

In [18]:
# Tutorial
from argparse import Namespace
import json

# General
import logging
from pprint import pprint
import warnings

from IPython.display import HTML, display
from google.cloud import aiplatform
import pandas as pd
import requests
from sklearn.model_selection import train_test_split

### Libraries settings

In [19]:
# Silence all Python warnings for the rest of the session.
warnings.filterwarnings("ignore")
# Only log errors from urllib3's connection pool.
logging.getLogger("urllib3.connectionpool").setLevel(logging.ERROR)

### Define constants

In [80]:
# Placeholder: replace with the path of the JSON Lines dataset. It is loaded with pandas
# below and also passed to the optimizer as its input data path.
INPUT_DATA_FILE_URI = "$YOUR_FILE_PATH"

# NOTE(review): the remaining constants are not referenced by any later cell in this
# notebook as shown — confirm whether they are still needed.
INPUT_OPTIMIZATION_DATA_URI = epath.Path(WORKSPACE_URI) / "prompt_optimization_data"
INPUT_OPTIMIZATION_DATA_FILE_URI = str(
    INPUT_DATA_URI / "prompt_optimization_dataset.jsonl"
)
OUTPUT_OPTIMIZATION_DATA_URI = epath.Path(WORKSPACE_URI) / "optimization_jobs"
APD_CONTAINER_URI = (
    "us-docker.pkg.dev/vertex-ai-restricted/builtin-algorithm/apd:preview_v1_0"
)
CONFIG_FILE_URI = str(WORKSPACE_URI / "config" / "config.json")

### Initialize Vertex AI SDK for Python

Initialize the Vertex AI SDK for Python for your project.

In [81]:
# Show the region that the SDK is about to be initialized with.
print(REGION)

In [82]:
# Initialize the Vertex AI SDK with the project, region and staging bucket set above.
aiplatform.init(project=PROJECT_ID, location=REGION, staging_bucket=BUCKET_URI)

## 3. Automated prompt design with Vertex AI prompt optimizer

### Load the dataset

Load the STT dataset from a Google Cloud Storage bucket. The dataset contains the following columns:

*   `audio`
*   `target`

In [83]:
# Load the dataset as JSON Lines (one JSON record per line).
prompt_optimization_df = pd.read_json(INPUT_DATA_FILE_URI, lines=True)

In [84]:
# Preview the first rows of the dataset.
prompt_optimization_df.head()

In [85]:
# Print the `target` value stored under index label 6 (the dataset must contain that label).
print(prompt_optimization_df['target'][6])

In [78]:
# Display the audio/target columns through the vapo_lib helper
# (presumably prints `n` rows — confirm against vapo_lib).
vapo_lib.print_df_rows(prompt_optimization_df[['audio', 'target']], n=1)

### Optimize the prompt template with Vertex AI prompt optimizer with custom metric


#### Prepare the prompt template you want to optimize

A prompt consists of two key parts:

* **System Instruction Template**, which is a fixed part of the prompt that controls or alters the model's behavior across all queries for a given task.

* **Prompt Template** which is a dynamic part of the prompt that changes based on the task. Prompt template includes context, task and more. To learn more, see [components of a prompt](https://cloud.google.com/vertex-ai/generative-ai/docs/learn/prompts/prompt-design-strategies#components-of-a-prompt) in the official documentation.

In this scenario, you use Vertex AI prompt optimizer to optimize a simple system instruction template. And you use some examples in the remaining prompt template for evaluating different instruction templates along the optimization process.

> Having the `target` placeholder in the prompt template is optional. It represents the prompt's ground truth response in your prompt optimization dataset that you aim to optimize for your templates. If you don't have the prompt's ground truth response, remember to set the `source_model` parameter in your prompt optimizer configuration (see below) instead of adding ground truth responses. Vertex AI prompt optimizer will then run your sample prompts on the source model to generate the ground truth responses for you.

In [86]:
# Initial system instruction; passed to the optimizer as `system_instruction`.
SYSTEM_INSTRUCTION_TEMPLATE = """
Generate a transcript of the speech. Only include the transcript in your response, and do not provide any other answer.
"""

# Prompt template with `audio` and `target` placeholders, matching the dataset columns.
# NOTE(review): `@@@audio/wav` presumably declares the MIME type of the audio input —
# confirm against the prompt optimizer documentation.
PROMPT_TEMPLATE = """
Speech: {{audio}} @@@audio/wav
Answer : {{target}}
"""

#### Prepare the prompt optimization dataset

To use Vertex AI prompt optimizer, you'll need a CSV or JSONL file with labeled examples.  These examples should follow a specific naming convention. For details see [Optimize prompts](https://cloud.google.com/vertex-ai/generative-ai/docs/learn/prompts/prompt-optimizer).


> For effective **prompt optimization**, provide a dataset of examples where your model performs poorly with the current system instruction template. For reliable results, use 50-100 distinct samples.

> In case of **prompt migration**, consider using the source model to label examples that the target model struggles with, helping to identify areas for improvement.

#### Define and deploy your own custom optimization metric on Cloud function

To optimize your prompt template using a custom optimization metric, you need to deploy a function containing your own metric code as a Cloud function. Deploying a Cloud function with your own custom metric involves the following steps:

1.   Define requirements
2.   Write your own custom metric function code
3.   Deploy the custom code as Cloud function


##### Define requirements

Set the custom metric dependencies.

In [87]:
# Dependency list written to requirements.txt in the build folder.
# NOTE(review): no later cell in this notebook, as shown, reads this file — confirm it is
# still needed.
requirements = """
functions-framework==3.*
google-cloud-aiplatform

"""

with open(BUILD_PATH / "requirements.txt", "w") as f:
    f.write(requirements)

In [88]:
from pydantic import BaseModel, Field
from typing import Any, Dict, List, Optional, Tuple

class OptimizationConfig(BaseModel):
    """Validated settings for a prompt optimization run.

    Fields declared without a default are required. Numeric fields carry the
    inclusive bounds given by their ``ge``/``le`` constraints.
    """

    # --- Basic configuration ---
    system_instruction: str = Field(
        description="System instructions for the target model. String. This field is required.",
    )
    prompt_template: str = Field(
        description="Template for prompts. String. This field is required.",
    )
    target_model: str = Field(
        description='Target model for optimization. Supported models: "gemini-2.5-flash", "gemini-2.5-pro"',
        default="gemini-2.5-flash",
    )
    thinking_budget: int = Field(
        description="Thinking budget for thinking models. -1 means auto/no thinking. Integer.",
        default=-1,
    )
    optimization_mode: str = Field(
        description='Optimization mode. Supported modes: "instruction", "demonstration", "instruction_and_demo".',
        default="instruction",
    )
    project: str = Field(
        description="Google Cloud project ID. This field is required.",
    )

    # --- Evaluation settings ---
    eval_metrics_types: List[str] = Field(
        description='List of evaluation metrics. E.g., "bleu", "rouge_l", "safety".',
    )
    eval_metrics_weights: List[float] = Field(
        description="Weights for evaluation metrics. Length must match eval_metrics_types and should sum to 1.",
    )
    aggregation_type: str = Field(
        description='Aggregation type for metrics. Supported: "weighted_sum", "weighted_average".',
        default="weighted_sum",
    )
    custom_metric_name: str = Field(
        description="Metric name, as defined by the key that corresponds in the dictionary returned from Cloud function. String.",
        default="",
    )
    custom_metric_cloud_function_name: str = Field(
        description="Cloud Run function name you previously deployed. String.",
        default="",
    )

    # --- Data and I/O paths ---
    input_data_path: str = Field(
        description="Cloud Storage URI to input optimization data. This field is required.",
    )
    output_path: str = Field(
        description="Cloud Storage URI to save optimization results. This field is required.",
    )

    # --- Advanced configuration (optional) ---
    num_steps: int = Field(
        description="Number of iterations in instruction optimization mode. Integer between 10 and 20.",
        default=10,
        ge=10,
        le=20,
    )
    num_demo_set_candidates: int = Field(
        description="Number of demonstrations evaluated. Integer between 10 and 30.",
        default=10,
        ge=10,
        le=30,
    )
    demo_set_size: int = Field(
        description="Number of demonstrations generated per prompt. Integer between 3 and 6.",
        default=3,
        ge=3,
        le=6,
    )

    # --- Model locations and QPS (optional) ---
    target_model_location: str = Field(
        description="Location of the target model. Default us-central1.",
        default="us-central1",
    )
    target_model_qps: int = Field(
        description="QPS for the target model. Integer >= 1, based on your quota.",
        default=1,
        ge=1,
    )
    optimizer_model_location: str = Field(
        description="Location of the optimizer model. Default us-central1.",
        default="us-central1",
    )
    optimizer_model_qps: int = Field(
        description="QPS for the optimization model. Integer >= 1, based on your quota.",
        default=1,
        ge=1,
    )
    source_model: str = Field(
        description="Google model previously used with these prompts. Not needed if providing a target column.",
        default="",
    )
    source_model_location: str = Field(
        description="Location of the source model. Default us-central1.",
        default="us-central1",
    )
    source_model_qps: Optional[int] = Field(
        description="Optional QPS for the source model. Integer >= 1.",
        default=None,
        ge=1,
    )
    eval_qps: int = Field(
        description="QPS for the eval model. Integer >= 1, based on your quota.",
        default=1,
        ge=1,
    )

    # --- Response, language and data handling (optional) ---
    response_mime_type: str = Field(
        description="MIME response type from the target model. E.g., 'text/plain', 'application/json'.",
        default="text/plain",
    )
    response_schema: str = Field(
        description="The Vertex AI Controlled Generation response schema.",
        default="",
    )
    language: str = Field(
        description='Language of the system instructions. E.g., "English", "Japanese".',
        default="English",
    )
    placeholder_to_content: Dict[str, Any] = Field(
        description="Dictionary of placeholders to replace parameters in the system instruction.",
        default={},
    )
    data_limit: int = Field(
        description="Amount of data used for validation. Integer between 5 and 100.",
        default=10,
        ge=5,
        le=100,
    )
    translation_source_field_name: str = Field(
        description="Field name for source text if using translation metrics (Comet, MetricX).",
        default="",
    )
    has_multimodal_inputs: bool = Field(
        description="Whether the input data is multimodal.",
        default=False,
    )

In [89]:
# Display the input dataset path that will be handed to the optimizer.
INPUT_DATA_FILE_URI

In [91]:
# Cloud Storage prefix given to the optimizer as its output location and later scanned for results.
output_path = f"{BUCKET_URI}/optimization_results/"

# English
# vapo_data_settings = {
#     "system_instruction": SYSTEM_INSTRUCTION_TEMPLATE,
#     "prompt_template": PROMPT_TEMPLATE,
#     "has_multimodal_inputs": True,
#     "target_model": "gemini-2.5-flash",
#     "thinking_budget": -1,
#     "optimization_mode": "instruction",
#     "eval_metrics_types": ["exact_match"],
#     "eval_metrics_weights": [1.0],
#     "aggregation_type": "weighted_sum",
#     "input_data_path": INPUT_DATA_FILE_URI,
#     "output_path": output_path,
#     "project": PROJECT_ID,
#     "language": "English"
# }

# Mandarin
# vapo_data_settings = {
#     "system_instruction": SYSTEM_INSTRUCTION_TEMPLATE,
#     "prompt_template": PROMPT_TEMPLATE,
#     "has_multimodal_inputs": True,
#     "target_model": "gemini-2.5-flash",
#     "thinking_budget": -1,
#     "optimization_mode": "instruction", 
#     "eval_metrics_types": ["exact_match"],
#     "eval_metrics_weights": [1.0],
#     "aggregation_type": "weighted_sum",
#     "input_data_path": INPUT_DATA_FILE_URI,
#     "output_path": output_path,
#     "project": PROJECT_ID,
#     "language": "Simplified Chinese"
# }

# Korean
# Active settings: instruction-only optimization of the multimodal transcription prompt,
# scored with exact match, with Korean as the system-instruction language.
vapo_data_settings = dict(
    system_instruction=SYSTEM_INSTRUCTION_TEMPLATE,
    prompt_template=PROMPT_TEMPLATE,
    has_multimodal_inputs=True,
    target_model="gemini-2.5-flash",
    thinking_budget=-1,
    optimization_mode="instruction",
    eval_metrics_types=["exact_match"],
    eval_metrics_weights=[1.0],
    aggregation_type="weighted_sum",
    input_data_path=INPUT_DATA_FILE_URI,
    output_path=output_path,
    project=PROJECT_ID,
    language="Korean",
)

# Validate the settings, then dump them (defaults included) to a plain dict.
vapo_data_config = OptimizationConfig(**vapo_data_settings)
vapo_data_config_json = vapo_data_config.model_dump()

In [92]:
# Location of the serialized configuration; handed to the optimization job as `config_path`.
config_path = f"{BUCKET_URI}/config.json"

# The context manager closes the file on exit, so no explicit close() call is needed.
with epath.Path(config_path).open("w") as config_file:
    json.dump(vapo_data_config_json, config_file)

In [93]:
import vertexai

# Vertex AI client bound to the project and region configured earlier.
client = vertexai.Client(project=PROJECT_ID, location=REGION)

# Run settings: where the job reads its configuration and which service account it uses.
# NOTE(review): with "wait_for_completion" set to False the call presumably returns before
# the job finishes, so results under `output_path` may not exist yet when the following
# cells run — confirm.
vapo_data_run_config = {
    "config_path": config_path,
    "wait_for_completion": False,
    "service_account": SERVICE_ACCOUNT,
}
print(output_path)

# Submit the prompt optimization job.
client.prompt_optimizer.optimize(method="vapo", config=vapo_data_run_config)

In [94]:
from google.cloud import storage

def format_demonstrations(demos: Any) -> List[str]:
    """Render demonstrations as a list of display strings.

    Args:
        demos: A list of demonstrations, or a JSON string that encodes one.

    Returns:
        One string per demonstration. A dict becomes newline-joined
        ``key: value`` lines; any other item is converted with ``str``.
        An empty list is returned when ``demos`` is a string that is not valid
        JSON, or when the (decoded) value is not a list.
    """
    parsed = demos
    if isinstance(parsed, str):
        try:
            parsed = json.loads(parsed)
        except (json.JSONDecodeError, ValueError):
            return []

    if not isinstance(parsed, list):
        return []

    def _render(item: Any) -> str:
        # Dicts are shown one "key: value" pair per line; everything else as-is.
        if isinstance(item, dict):
            return "\n".join(f"{key}: {value}" for key, value in item.items())
        return str(item)

    return [_render(item) for item in parsed]


def split_gcs_path(gcs_path: str) -> Tuple[str, str]:
    """Break a ``gs://`` URI into its bucket name and object prefix.

    Args:
        gcs_path: A URI of the form ``gs://bucket[/prefix]``.

    Returns:
        ``(bucket_name, prefix)``; the prefix is ``""`` when the URI names only a bucket.

    Raises:
        ValueError: If ``gcs_path`` does not start with ``gs://``.
    """
    scheme = "gs://"
    if not gcs_path.startswith(scheme):
        raise ValueError(f"Invalid GCS path. Must start with gs://: {gcs_path}")

    # Everything up to the first "/" is the bucket; the remainder is the prefix.
    bucket_name, _, prefix = gcs_path[len(scheme):].partition("/")
    return bucket_name, prefix


def list_gcs_objects(gcs_path: str) -> List[str]:
    """Return the names of all objects stored under a ``gs://`` path.

    Args:
        gcs_path: A ``gs://bucket[/prefix]`` URI; the prefix filters the listing.

    Returns:
        Object names (relative to the bucket) whose name starts with the prefix.
    """
    bucket_name, prefix = parse_gcs_path(gcs_path)
    blob_iterator = storage.Client().bucket(bucket_name).list_blobs(prefix=prefix)

    object_names = []
    for blob in blob_iterator:
        object_names.append(blob.name)
    return object_names


def find_directories_with_files(
    base_path: str, required_files: List[str]
) -> List[str]:
    """Find the directories under ``base_path`` that contain every required file.

    Args:
        base_path: ``gs://bucket[/prefix]`` URI to search under.
        required_files: File names that must all be present directly inside a
            directory for it to match.

    Returns:
        ``gs://`` URIs of the matching directories, without a trailing slash.

    Raises:
        ValueError: If ``base_path`` does not start with ``gs://``.
    """
    bucket_name, _ = split_gcs_path(base_path)

    # Group object file names by their parent directory.
    files_by_dir: Dict[str, set] = {}
    for object_name in list_gcs_objects(base_path):
        dir_path, _, filename = object_name.rpartition("/")
        files_by_dir.setdefault(dir_path, set()).add(filename)

    required = set(required_files)
    matching_dirs = []
    for dir_path, files in files_by_dir.items():
        if required <= files:
            # Objects at the bucket root have an empty directory; omit the trailing slash
            # so callers appending "/<file>" do not produce "gs://bucket//<file>".
            if dir_path:
                matching_dirs.append(f"gs://{bucket_name}/{dir_path}")
            else:
                matching_dirs.append(f"gs://{bucket_name}")

    return matching_dirs

def parse_gcs_path(gcs_path: str) -> Tuple[str, str]:
    """Split a ``gs://`` URI into ``(bucket_name, prefix)``.

    Args:
        gcs_path: A URI of the form ``gs://bucket[/prefix]``.

    Returns:
        The bucket name and the object prefix (``""`` if the URI has no prefix).

    Raises:
        ValueError: If ``gcs_path`` does not start with ``gs://``.
    """
    if not gcs_path.startswith("gs://"):
        raise ValueError("Invalid GCS path. Must start with gs://")

    # Drop the 5-character "gs://" scheme, then split once on the first slash.
    bucket_name, *remainder = gcs_path[5:].split("/", 1)
    prefix = remainder[0] if remainder else ""
    return bucket_name, prefix

def _select_metric_column(columns: Any, metric_name: Optional[str]) -> Optional[str]:
    """Pick the mean-metric column used to rank templates, or None if there is none."""
    metric_columns = [col for col in columns if "metric" in col and "mean" in col]
    if not metric_columns:
        return None
    if metric_name:
        return next((col for col in metric_columns if metric_name in col), None)
    # Without an explicit metric, prefer the composite metric when it is present.
    composite_cols = [col for col in metric_columns if "composite_metric" in col]
    return composite_cols[0] if composite_cols else metric_columns[0]


def _extract_instruction(row: Any) -> str:
    """Return the instruction text of a template row, or "" when it has none."""
    for key in ("prompt", "instruction"):
        value = row.get(key)
        # Missing values surface as NaN (a float), so only accept real strings.
        if isinstance(value, str) and value:
            return value.replace("store('answer', llm())", "{{llm()}}")
    return ""


def _extract_demonstrations(row: Any) -> List[str]:
    """Return the formatted demonstrations of a template row, or [] when it has none."""
    for key in ("demonstrations", "demo_set"):
        if key in row:
            formatted = format_demonstrations(row[key])
            if formatted:
                return formatted
    return []


def _best_template_row(
    templates_path: str, metric_name: Optional[str], optional: bool
) -> Optional[Tuple[float, Any]]:
    """Load one templates.json and return ``(score, row)`` of its top-scoring template.

    Returns None when the file holds no templates or no usable metric. A missing or
    malformed file also yields None when ``optional`` is true; otherwise the error
    propagates to the caller.
    """
    try:
        with epath.Path(templates_path).open("r") as f:
            templates_data = json.load(f)
    except (FileNotFoundError, json.JSONDecodeError):
        if optional:
            return None
        raise

    if not templates_data:
        return None

    df = pd.json_normalize(templates_data)
    metric_col = _select_metric_column(df.columns, metric_name)
    if metric_col is None:
        return None

    # Ignore rows without a numeric score so NaN can never be picked as "best".
    scores = pd.to_numeric(df[metric_col], errors="coerce").dropna()
    if scores.empty:
        return None

    best_label = scores.idxmax()
    return float(scores.loc[best_label]), df.loc[best_label]


def get_best_vapo_results(
    base_path: str, metric_name: Optional[str] = None
) -> Tuple[str, List[str]]:
    """Get the best system instruction and demonstrations across all VAPO runs.

    A run is a directory under ``base_path`` that contains both ``eval_results.json``
    and ``templates.json``. For every run, the run-level ``templates.json`` and the
    optional ``instruction/templates.json`` and ``demonstration/templates.json`` are
    inspected, and the single highest-scoring template row overall wins. The returned
    instruction and demonstrations always come from that same row.

    Args:
        base_path: ``gs://`` URI under which the optimization results were written.
        metric_name: Substring identifying the metric column to rank by. When omitted,
            the composite metric is used if present, otherwise the first mean-metric
            column.

    Returns:
        ``(instruction, demonstrations)`` of the winning row; the instruction is ``""``
        and the demonstrations ``[]`` when the row does not carry them.

    Raises:
        ValueError: If no run is found under ``base_path`` or no run yields a score.
    """
    required_files = ["eval_results.json", "templates.json"]
    runs = find_directories_with_files(base_path, required_files)

    if not runs:
        raise ValueError(f"No valid runs found in {base_path}")

    best_score = float("-inf")
    best_instruction = ""
    best_demonstrations: List[str] = []

    for run_path in runs:
        # (path, optional): only the run-level file is expected to exist in every run.
        candidates = (
            (f"{run_path}/templates.json", False),
            (f"{run_path}/instruction/templates.json", True),
            (f"{run_path}/demonstration/templates.json", True),
        )
        try:
            for templates_path, optional in candidates:
                result = _best_template_row(templates_path, metric_name, optional)
                if result is None:
                    continue

                score, row = result
                if not score > best_score:
                    continue

                # Extract everything first, then commit, so a failure while reading the
                # row cannot leave the best-so-far state half updated.
                instruction = _extract_instruction(row)
                demonstrations = _extract_demonstrations(row)
                best_score = score
                best_instruction = instruction
                best_demonstrations = demonstrations
        except Exception as e:
            # Best effort: a broken run is reported and skipped, not fatal.
            logging.warning(f"Error processing run {run_path}: {e}")
            continue

    if best_score == float("-inf"):
        raise ValueError("Could not find any valid results")

    return best_instruction, best_demonstrations

In [72]:
# Pick the best instruction found under the output location; demonstrations are ignored here.
best_instruction, _ = get_best_vapo_results(output_path)
print("The optimized instruction is:\n", best_instruction)

## 4. Clean up

In [None]:
# Clean-up switches; everything is kept unless a flag is set to True.
# NOTE(review): `delete_job` is not read anywhere in this cell — confirm whether job
# deletion is still intended.
delete_bucket = False
delete_job = False
delete_tutorial = False

# Recursively remove the bucket and everything in it.
if delete_bucket:
    ! gsutil rm -r {BUCKET_URI}

# Remove the local tutorial folder and its contents.
if delete_tutorial:
    import shutil

    shutil.rmtree(str(TUTORIAL_PATH))