<a href="https://colab.research.google.com/github/romeodiaz/colabgoogle/blob/main/%5BMAKE_A_COPY%5D_CVIP_%5BGE_AIVE%5D_ADK_Deploy.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# ADK Agent Deployment Notebook

This notebook deploys agents to Vertex AI Agent Engine, mirroring the functionality of `deploy_agents.sh`.

**Deployment Order:**
1. `sales_plays_agent` (sub-agent)
2. `company_research_agent` (sub-agent)
3. `partnership_strategy_agent` (orchestrator)

In [None]:
# @title 1. Authenticate GCP
# Colab-only step: runs google.colab's authenticate_user() flow for this runtime.
# Later cells shell out to `gcloud` and call Vertex AI; presumably they rely on the
# credentials established here, so run this cell first after every runtime restart.
from google.colab import auth
auth.authenticate_user()

In [None]:
# @title 2. Configuration { display-mode: "form" }
# Central configuration read by every later cell. Lines tagged `# @param` are
# Colab form fields; editing the form rewrites the literal on that line.

# @markdown ### GCP Project Settings
PROJECT_ID = "cvip-16562"  # @param {type:"string"}
DEPLOY_REGION = "us-central1"  # @param {type:"string"}
MODEL_REGION = "global"  # @param {type:"string"}
STAGING_BUCKET = "gs://adk_partnership_strategy_agent"  # @param {type:"string"}
IMPERSONATE_SERVICE_ACCOUNT = "adk-svc-acc@cvip-16562.iam.gserviceaccount.com"  # @param {type:"string"}
AGENT_SERVICE_ACCOUNT = "adk-svc-acc@cvip-16562.iam.gserviceaccount.com"  # @param {type:"string"}

# @markdown ### GitHub Repository (for private repos, add a Personal Access Token)
GITHUB_REPO = "https://github.com/NextPhase-ai/AIVE-ADK"  # @param {type:"string"}
REPO_FOLDER = "AIVE-ADK"  # @param {type:"string"}
BRANCH_NAME = "main"  # @param {type:"string"}
# NOTE(review): a token typed into this form field is stored in the notebook source.
# Clear it before saving or sharing the notebook.
GITHUB_TOKEN = ""  # @param {type:"string"}

# @markdown ### Agent Deployment Settings
AGENTS_DIR = "agents"  # @param {type:"string"}
AGENT_DESCRIPTION = "ADK Agent deployed via Colab"  # @param {type:"string"}

# @markdown ### Optional sub-agent wiring (recommended if discovery is restricted)
# When non-empty, these IDs are injected into the orchestrator's environment and take
# precedence over the IDs of sub-agents deployed in the same run (see get_deploy_env_vars).
# Leave them blank to wire the orchestrator to the freshly deployed sub-agents instead.
SALES_PLAYS_AGENT_ENGINE_ID = "5493104017275879424"  # @param {type:"string"}
COMPANY_RESEARCH_AGENT_ENGINE_ID = "3208653106292195328"  # @param {type:"string"}
SALES_PLAYS_AGENT_REGION = "us-central1"  # @param {type:"string"}
COMPANY_RESEARCH_AGENT_REGION = "us-central1"  # @param {type:"string"}

# ---
# Non-form configuration (edit in code view)
# Order matters: agents are deployed sequentially in list order, and deploy_all_agents
# labels the last entry as the orchestrator.
AGENTS_TO_DEPLOY = ["sales_plays_agent", "company_research_agent", "partnership_strategy_agent"]
# When True, each agent's own requirements.txt is appended to BASE_REQUIREMENTS.
USE_AGENT_REQUIREMENTS = True
BASE_REQUIREMENTS = ["google-cloud-aiplatform[adk,agent_engines]"]

# Echo the effective settings; the GitHub token is masked rather than printed.
print("Configuration:")
print(f"  Project: {PROJECT_ID}")
print(f"  Deploy Region: {DEPLOY_REGION}")
print(f"  Model Region: {MODEL_REGION}")
print(f"  Staging Bucket: {STAGING_BUCKET}")
print(f"  Impersonate SA: {IMPERSONATE_SERVICE_ACCOUNT or '(not set)'}")
print(f"  Agent SA: {AGENT_SERVICE_ACCOUNT or '(not set)'}")
print(f"  Use agent requirements: {USE_AGENT_REQUIREMENTS}")
print(f"  GitHub Token: {'***' if GITHUB_TOKEN else '(not set)'}")
print(f"  Agents to Deploy: {AGENTS_TO_DEPLOY}")

In [None]:
# @title 3. Clone GitHub Repo
import os

# Ensure we are in /content to avoid stale CWD issues
CONTENT_DIR = "/content"
os.makedirs(CONTENT_DIR, exist_ok=True)
os.chdir(CONTENT_DIR)

repo_path = os.path.join(CONTENT_DIR, REPO_FOLDER)

# Clean up if repo already exists
if os.path.exists(repo_path):
    !rm -rf {repo_path}

if GITHUB_TOKEN:
    print("Using authenticated clone (token provided)")
    # Configure git to use the token
    !git config --global credential.helper store
    
    # Write credentials to git credential store
    with open(os.path.expanduser('~/.git-credentials'), 'w') as f:
        f.write(f'https://x-access-token:{GITHUB_TOKEN}@github.com\n')
    
    !git clone {GITHUB_REPO} {repo_path}
    
    # Clean up credentials file after clone
    os.remove(os.path.expanduser('~/.git-credentials'))
else:
    print("Using public clone (no token)")
    !git clone {GITHUB_REPO} {repo_path}

%cd {repo_path}
!git checkout {BRANCH_NAME}

# Verify agents directory exists
if os.path.exists(AGENTS_DIR):
    print(f"\n✓ Found agents directory: {AGENTS_DIR}")
    !ls -la {AGENTS_DIR}/
else:
    print(f"\n✗ Agents directory not found: {AGENTS_DIR}")

In [None]:
# @title 4. Install Dependencies
!pip install -q google-cloud-aiplatform[adk,agent_engines] google-adk

In [None]:
# @title 5. Initialize Vertex AI
import vertexai
# agent_engines is the SDK module the deployment cells use for create/update/get.
from vertexai import agent_engines

# Set SDK-wide defaults from the configuration cell: project, deploy region and the
# GCS staging bucket.
vertexai.init(
    project=PROJECT_ID,
    location=DEPLOY_REGION,
    staging_bucket=STAGING_BUCKET,
)

# Echo the values that were passed to vertexai.init().
print(f"✓ Vertex AI initialized")
print(f"  Project: {PROJECT_ID}")
print(f"  Location: {DEPLOY_REGION}")
print(f"  Staging Bucket: {STAGING_BUCKET}")

In [None]:
# @title 6. Define Deployment Helper Functions
import os
import sys
import shutil
import tempfile
import importlib
import inspect
import subprocess
import re
import requests
from typing import Optional, List, Dict, Any
from datetime import datetime
from google.api_core.exceptions import PermissionDenied


def get_access_token() -> str:
    """Return an OAuth access token for the active gcloud account.

    Runs ``gcloud auth print-access-token`` through ``subprocess`` so that only
    stdout is captured. Warnings that gcloud writes to stderr can therefore
    never be mistaken for the token.

    Returns:
        The access token with surrounding whitespace removed.

    Raises:
        subprocess.CalledProcessError: gcloud exited with a non-zero status.
        FileNotFoundError: the ``gcloud`` executable is not on PATH.
        RuntimeError: gcloud succeeded but printed no token.
    """
    output = subprocess.check_output(
        ["gcloud", "auth", "print-access-token"], text=True
    )
    lines = output.strip().splitlines()
    if not lines:
        raise RuntimeError(
            "gcloud returned an empty access token; re-run the authentication cell."
        )
    return lines[0].strip()


def get_active_gcloud_account() -> Optional[str]:
    """Look up the account gcloud is currently configured to use.

    Returns:
        The account string printed by ``gcloud config get-value account``, or
        ``None`` when the value is empty, is the literal ``(unset)``, or the
        command could not be run for any reason.
    """
    command = ["gcloud", "config", "get-value", "account"]
    try:
        value = subprocess.check_output(command, text=True).strip()
        if not value or value == "(unset)":
            return None
        return value
    except Exception:
        # Best-effort diagnostic helper: any failure simply means "unknown".
        return None


def _list_reasoning_engines() -> List[Dict[str, Any]]:
    """List every reasoning engine in PROJECT_ID / DEPLOY_REGION.

    Tries the ``v1beta1`` REST endpoint first and falls back to ``v1``. For
    each version all result pages are followed via ``nextPageToken``, so
    engines beyond the first page are not silently dropped. Dropping them
    would make the deploy step create a duplicate instead of updating the
    existing engine.

    Returns:
        The ``reasoningEngines`` entries from the first API version that
        answered successfully, or an empty list (after printing a warning)
        when neither version could be queried.
    """
    token = get_access_token()
    headers = {"Authorization": f"Bearer {token}"}
    last_error = None

    for api_version in ("v1beta1", "v1"):
        url = (
            f"https://{DEPLOY_REGION}-aiplatform.googleapis.com/{api_version}"
            f"/projects/{PROJECT_ID}/locations/{DEPLOY_REGION}/reasoningEngines"
        )
        engines: List[Dict[str, Any]] = []
        page_token = None
        try:
            while True:
                params = {"pageToken": page_token} if page_token else None
                # Bounded wait so an unreachable endpoint cannot hang the notebook.
                response = requests.get(
                    url, headers=headers, params=params, timeout=60
                )
                if not response.ok:
                    last_error = f"{response.status_code} {response.text[:200]}"
                    break  # try the next API version
                data = response.json() if response.text else {}
                engines.extend(data.get("reasoningEngines", []))
                page_token = data.get("nextPageToken")
                if not page_token:
                    return engines
        except Exception as e:
            # Network/JSON problems are reported as a warning; the caller treats
            # an empty list as "nothing deployed yet".
            last_error = str(e)

    if last_error:
        print(f"  Warning: Could not query existing engines: {last_error}")
    return []


def get_existing_agent_engine_id(display_name: str) -> Optional[str]:
    """Return the ID of the newest engine whose displayName equals ``display_name``.

    "Newest" is decided by comparing the ``createTime`` strings (a missing
    value compares as the empty string). The ID is the last path segment of
    the engine's ``name``. Returns ``None`` when no engine matches.
    """
    candidates = [
        engine
        for engine in _list_reasoning_engines()
        if engine.get("displayName") == display_name
    ]
    if not candidates:
        return None

    # max() keeps the first of several equally-new entries, like a stable descending sort.
    newest = max(candidates, key=lambda engine: engine.get("createTime", ""))
    return newest["name"].rsplit("/", 1)[-1]


def extract_engine_resource(error_text: str) -> Optional[str]:
    """Pull the first reasoning-engine resource path out of an error message.

    Only paths with a numeric project and numeric engine ID are recognised.
    Returns the matched path, or ``None`` if the text contains none.
    """
    resource_pattern = r"projects/\d+/locations/[a-z0-9-]+/reasoningEngines/\d+"
    found = re.search(resource_pattern, error_text)
    if found is None:
        return None
    return found.group(0)


def print_engine_log_help(resource_name: str):
    """Print pointers for finding the logs of a failed reasoning engine.

    Shows the resource name to filter on in Cloud Logging, followed by a
    ready-to-paste ``gcloud logging read`` command keyed on the engine ID
    (the last path segment of ``resource_name``).
    """
    engine_id = resource_name.rsplit("/", 1)[-1]
    log_filter = (
        'resource.type="aiplatform.googleapis.com/ReasoningEngine" '
        f'AND resource.labels.reasoning_engine_id="{engine_id}"'
    )
    command = f"!gcloud logging read '{log_filter}' --limit=50 --freshness=2h"

    help_lines = (
        "  Logs: open Cloud Logging and filter by this resource name:",
        f"    {resource_name}",
        "  Or run in a Colab cell:",
        f"    {command}",
    )
    for help_line in help_lines:
        print(help_line)


def load_requirements_file(path: str) -> List[str]:
    """Read a requirements.txt and return its requirement lines.

    Blank lines and comments are dropped. Following pip's file format, a
    ``#`` starts a comment only at the beginning of a line or when preceded
    by whitespace. A ``#`` embedded in a requirement (for example the
    ``#egg=name`` fragment of a VCS URL) is kept intact.

    Args:
        path: Location of the requirements file.

    Returns:
        Requirement strings in file order; an empty list if the file does
        not exist.
    """
    if not os.path.exists(path):
        return []

    requirements = []
    with open(path, "r", encoding="utf-8") as handle:
        for raw_line in handle:
            # Same comment rule pip uses: "(^|\s+)#.*$"
            line = re.sub(r"(^|\s+)#.*$", "", raw_line.strip()).strip()
            if line:
                requirements.append(line)
    return requirements


def get_agent_requirements(
    agent_name: str,
    agents_dir: str,
    base_requirements: Optional[List[str]] = None,
) -> List[str]:
    """Merge the base requirements with ``<agents_dir>/<agent_name>/requirements.txt``.

    Base entries come first, followed by the agent's own. Exact duplicate
    strings are removed, keeping the first occurrence and the original order.
    """
    agent_req_file = os.path.join(agents_dir, agent_name, "requirements.txt")
    combined = [
        *(base_requirements or []),
        *load_requirements_file(agent_req_file),
    ]
    # dict preserves insertion order, so this is an order-keeping de-duplication.
    return list(dict.fromkeys(combined))


def _extract_engine_id(resource_name: Optional[str]) -> Optional[str]:
    if not resource_name:
        return None
    match = re.search(r"reasoningEngines/([^/]+)", resource_name)
    return match.group(1) if match else None


def _resolve_engine_id_from_app(app: Any) -> Optional[str]:
    """Return the engine ID taken from ``app.resource_name``.

    A missing or falsy ``resource_name`` (including ``app`` being ``None``)
    yields ``None``.
    """
    return _extract_engine_id(getattr(app, "resource_name", None) or "")


def get_deploy_env_vars(
    agent_name: str,
    deployed_apps: Optional[Dict[str, Any]] = None,
) -> Optional[Dict[str, str]]:
    """Build the environment variables injected into an agent's runtime.

    Every agent receives the model location and the impersonation service
    account when those settings are non-empty. The orchestrator
    (``partnership_strategy_agent``) additionally receives the engine ID and
    region of each sub-agent. A configured ``*_ENGINE_ID`` wins; otherwise
    the ID is taken from the matching entry in ``deployed_apps``.

    Returns:
        The variables as a dict, or ``None`` when there is nothing to inject.
    """
    runtime_env: Dict[str, str] = {}
    if MODEL_REGION:
        runtime_env["VERTEX_AI_MODEL_LOCATION"] = MODEL_REGION
    if IMPERSONATE_SERVICE_ACCOUNT:
        runtime_env["AIVE_IMPERSONATE_SERVICE_ACCOUNT"] = IMPERSONATE_SERVICE_ACCOUNT

    if agent_name == "partnership_strategy_agent":
        # (sub-agent name, configured ID, ID env key, region env key, configured region)
        sub_agent_wiring = (
            (
                "sales_plays_agent",
                SALES_PLAYS_AGENT_ENGINE_ID,
                "SALES_PLAYS_AGENT_ENGINE_ID",
                "SALES_PLAYS_AGENT_REGION",
                SALES_PLAYS_AGENT_REGION,
            ),
            (
                "company_research_agent",
                COMPANY_RESEARCH_AGENT_ENGINE_ID,
                "COMPANY_RESEARCH_AGENT_ENGINE_ID",
                "COMPANY_RESEARCH_AGENT_REGION",
                COMPANY_RESEARCH_AGENT_REGION,
            ),
        )
        for sub_agent, configured_id, id_key, region_key, configured_region in sub_agent_wiring:
            engine_id = configured_id
            if not engine_id and deployed_apps:
                engine_id = _resolve_engine_id_from_app(deployed_apps.get(sub_agent))
            if engine_id:
                runtime_env[id_key] = engine_id
                runtime_env[region_key] = configured_region or DEPLOY_REGION

    return runtime_env or None


def _apply_env_vars(
    fn,
    kwargs: Dict[str, Any],
    env_vars: Optional[Dict[str, str]],
) -> Dict[str, Any]:
    if not env_vars:
        return kwargs

    param_name = None
    for candidate in ("env_vars", "env", "environment_variables", "environment"):
        if candidate in inspect.signature(fn).parameters:
            param_name = candidate
            break

    if not param_name:
        print("  ! Warning: agent_engines API does not accept env vars; skipping env injection.")
        return kwargs

    kwargs[param_name] = env_vars
    return kwargs


def _apply_service_account(
    fn,
    kwargs: Dict[str, Any],
    service_account: Optional[str],
) -> Dict[str, Any]:
    if not service_account:
        return kwargs

    param_name = None
    for candidate in ("service_account", "service_account_email", "serviceAccount"):
        if candidate in inspect.signature(fn).parameters:
            param_name = candidate
            break

    if not param_name:
        print("  ! Warning: agent_engines API does not accept service_account; skipping.")
        return kwargs

    kwargs[param_name] = service_account
    return kwargs


def list_agent_files(package_root: str) -> List[str]:
    """Collect the source and data files under ``package_root``.

    Walks the tree, pruning hidden directories and ``__pycache__``, and keeps
    non-hidden files ending in .py, .json, .jsonl, .txt, .yaml or .yml.

    Returns:
        File paths relative to ``package_root``, in ``os.walk`` order.
    """
    wanted_suffixes = ('.py', '.json', '.jsonl', '.txt', '.yaml', '.yml')
    collected: List[str] = []

    for current_dir, subdirs, filenames in os.walk(package_root):
        # In-place assignment tells os.walk not to descend into the pruned directories.
        subdirs[:] = [
            name for name in subdirs
            if not (name.startswith('.') or name == '__pycache__')
        ]
        collected.extend(
            os.path.relpath(os.path.join(current_dir, filename), package_root)
            for filename in filenames
            if not filename.startswith('.')
            and not filename.endswith('.pyc')
            and filename.endswith(wanted_suffixes)
        )

    return collected


def prepare_agent_staging(agent_name: str, agents_dir: str) -> str:
    """Build a temporary staging tree for one agent.

    Layout created under a fresh temp directory ``<staging>``::

        <staging>/<agent_name>/          copy of the agent directory
        <staging>/<agent_name>/common/   copy of agents_dir/common (if present)
        <staging>/common/                second copy, importable as top-level ``common``

    Missing ``__init__.py`` files are created in each of those directories so
    they import as packages.

    Args:
        agent_name: Sub-directory of ``agents_dir`` holding the agent.
        agents_dir: Directory containing the agents and the optional ``common/``.

    Returns:
        Path of the staged agent directory, ``<staging>/<agent_name>``. The
        staging root the caller must eventually delete is its parent directory.

    Raises:
        FileNotFoundError: the agent directory does not exist.
        Exception: any copy error is re-raised after the partially built
            staging tree has been removed.
    """
    agent_dir = os.path.join(agents_dir, agent_name)
    common_dir = os.path.join(agents_dir, 'common')

    if not os.path.exists(agent_dir):
        raise FileNotFoundError(f"Agent directory not found: {agent_dir}")

    # Create temp staging directory
    staging_dir = tempfile.mkdtemp(prefix=f"adk_build_{agent_name}_")
    staged_agent_dir = os.path.join(staging_dir, agent_name)

    try:
        # Copy agent directory
        shutil.copytree(agent_dir, staged_agent_dir)

        # Copy common/ (if exists)
        if os.path.exists(common_dir):
            # Keep a copy inside the agent package for local relative imports
            shutil.copytree(common_dir, os.path.join(staged_agent_dir, 'common'))
            # Also expose common as a top-level package for runtime imports
            shutil.copytree(common_dir, os.path.join(staging_dir, 'common'))
            print("  ✓ Copied common/ directory into agent package and root")
        else:
            print(f"  ! No common/ directory found at {common_dir}")

        # Ensure packages so relative imports work
        package_dirs = (
            staged_agent_dir,
            os.path.join(staged_agent_dir, "common"),
            os.path.join(staging_dir, "common"),
        )
        for package_dir in package_dirs:
            if not os.path.isdir(package_dir):
                continue
            init_path = os.path.join(package_dir, "__init__.py")
            if not os.path.exists(init_path):
                with open(init_path, "a"):
                    pass
    except Exception:
        # Do not leak a half-built staging tree when staging fails.
        shutil.rmtree(staging_dir, ignore_errors=True)
        raise

    return staged_agent_dir


def normalize_extra_packages(package_root: str, extra_packages: List[str]) -> List[str]:
    """Make ``extra_packages`` entries relative to ``package_root``.

    * A leading ``./`` (possibly repeated) is removed.
    * An absolute path located under ``package_root`` is rewritten relative
      to it.
    * Anything else is returned unchanged. Names that merely start with a
      dot, such as ``.hidden/x.py`` or ``../x.py``, are not altered.

    Args:
        package_root: Directory the returned paths are relative to.
        extra_packages: Paths to normalize.

    Returns:
        A new list with one normalized path per input path, in order.
    """
    root = os.path.abspath(package_root)
    normalized = []
    for rel_path in extra_packages:
        if os.path.isabs(rel_path):
            absolute = os.path.normpath(rel_path)
            if absolute.startswith(root + os.sep):
                rel_path = os.path.relpath(absolute, root)
        else:
            # Remove the literal "./" prefix only; str.lstrip("./") would strip any
            # run of '.' and '/' characters and corrupt names like ".hidden".
            while rel_path.startswith("./"):
                rel_path = rel_path[2:]
        normalized.append(rel_path)
    return normalized


def load_agent_module(agent_dir: str, agent_name: str):
    """Import ``<agent_name>.agent`` from a staged agent directory.

    The agent directory and its parent are put at the front of ``sys.path``.
    Previously imported copies of the agent package and of the ``common``
    package are evicted from ``sys.modules`` so the staged files are the ones
    that get executed. Only ``common`` itself and ``common.*`` are evicted;
    unrelated modules whose names merely start with "common" are left alone.
    The module is executed exactly once.

    Args:
        agent_dir: Staged agent directory, ``<staging>/<agent_name>``.
        agent_name: Package name of the agent.

    Returns:
        ``module.root_app`` when it exists and is not None, otherwise
        ``module.root_agent``.

    Raises:
        ImportError: the agent module cannot be imported.
        AttributeError: the module defines neither ``root_app`` nor ``root_agent``.
    """
    parent_dir = os.path.dirname(agent_dir)
    for path in [parent_dir, agent_dir]:
        if path not in sys.path:
            sys.path.insert(0, path)

    # Clear any cached imports of the agent package and the shared `common` package
    evict_packages = (agent_name, "common")
    stale_modules = [
        name for name in sys.modules
        if any(name == pkg or name.startswith(f"{pkg}.") for pkg in evict_packages)
    ]
    for name in stale_modules:
        sys.modules.pop(name, None)

    # The staged files were created after the interpreter started; make sure the
    # import system's directory caches see them.
    importlib.invalidate_caches()

    # Import as a package so relative imports resolve. The cache was just cleared,
    # so this is already a fresh execution and no reload is needed.
    module = importlib.import_module(f"{agent_name}.agent")

    # Prefer root_app (AdkApp with greeting session service) if available
    root_app = getattr(module, 'root_app', None)
    if root_app is not None:
        print("  ✓ Using root_app (AdkApp with greeting session service)")
        return root_app

    print("  ! root_app not found, using root_agent")
    return module.root_agent


def show_package_sizes(agents_dir: str, agents: List[str]):
    """Print the on-disk size of each agent directory (helps identify bloat).

    The size is the sum of all file sizes under ``<agents_dir>/<agent>``,
    shown in M above 1 MiB, in K above 1 KiB and in bytes otherwise. Agents
    whose directory is missing are reported as NOT FOUND.
    """
    print("\nAgent package sizes (smaller = faster deploy):")
    for agent_name in agents:
        agent_dir = os.path.join(agents_dir, agent_name)
        if not os.path.exists(agent_dir):
            print(f"  {agent_name}: NOT FOUND")
            continue

        byte_count = sum(
            os.path.getsize(os.path.join(folder, filename))
            for folder, _subdirs, filenames in os.walk(agent_dir)
            for filename in filenames
        )

        if byte_count > 1024 * 1024:
            label = f"{byte_count / (1024*1024):.1f}M"
        elif byte_count > 1024:
            label = f"{byte_count / 1024:.1f}K"
        else:
            label = f"{byte_count}B"

        print(f"  {agent_name}: {label}")
    print()


# Visible confirmation that this definitions-only cell has been executed.
print("✓ Helper functions defined")

In [None]:
# @title 7. Define Main Deployment Function
import time
from vertexai import agent_engines


def deploy_agent(
    agent_name: str,
    agents_dir: str,
    requirements: List[str],
    description: str = "ADK Agent",
    env_vars: Optional[Dict[str, str]] = None,
) -> Optional[Any]:
    """
    Deploy a single agent to Vertex AI Agent Engine.
    Updates existing agent if found, creates new otherwise.

    Steps: look up an engine whose display name equals ``agent_name``, stage
    the agent (plus ``common/``) in a temp directory, import the agent module
    from there, then call ``agent_engines.update`` or ``agent_engines.create``
    with the staged files as ``extra_packages``.

    The working directory is restored and the temp staging tree is removed
    on every exit path, including failures and interrupts.

    Args:
        agent_name: Agent package name; also used as the engine display name.
        agents_dir: Directory containing the agent packages.
        requirements: pip requirement strings for the remote runtime.
        description: Description stored on the engine.
        env_vars: Environment variables to inject, if the SDK call supports them.

    Returns:
        The object returned by ``agent_engines.create`` / ``agent_engines.update``.

    Raises:
        Exception: any failure is reported with troubleshooting hints and re-raised.
    """
    start_time = time.time()
    original_dir = os.getcwd()
    staging_root = None  # set once the staging tree exists so `finally` can remove it

    print("\n" + "=" * 60)
    print(f"DEPLOYING: {agent_name}")
    print("=" * 60)

    try:
        # Step 1: Check for existing agent
        print("\n  Step 1/4: Checking for existing agent...")
        existing_id = get_existing_agent_engine_id(agent_name)

        if existing_id:
            print(f"  ✓ Found existing agent: {existing_id}")
            resource_name = f"projects/{PROJECT_ID}/locations/{DEPLOY_REGION}/reasoningEngines/{existing_id}"
        else:
            print("  ! No existing agent found, will create new")
            resource_name = None

        # Step 2: Prepare staging directory with common/
        print("\n  Step 2/4: Preparing staging directory...")
        staged_agent_dir = prepare_agent_staging(agent_name, agents_dir)
        staging_root = os.path.dirname(staged_agent_dir)
        print(f"  ✓ Staging directory: {staged_agent_dir}")

        # Step 3: Load agent module (need to be in the directory)
        print("\n  Step 3/4: Loading agent module...")
        os.chdir(staged_agent_dir)
        root_agent = load_agent_module(staged_agent_dir, agent_name)
        print(f"  ✓ Loaded agent: {root_agent}")

        # Package from the staging root so the relative extra_packages paths resolve
        package_root = staging_root
        os.chdir(package_root)
        extra_packages = list_agent_files(package_root)
        extra_packages = normalize_extra_packages(package_root, extra_packages)
        print(f"  ✓ Found {len(extra_packages)} files to package")

        # Step 4: Deploy (cwd is still the staging root)
        print("\n  Step 4/4: Deploying to Agent Engine...")

        deploy_kwargs = {
            "agent_engine": root_agent,
            "description": description,
            "requirements": requirements,
            "extra_packages": extra_packages,
        }
        if env_vars:
            print(f"  Injecting env vars: {sorted(env_vars.keys())}")
        if AGENT_SERVICE_ACCOUNT:
            print(f"  Using service account: {AGENT_SERVICE_ACCOUNT}")

        if resource_name:
            # Update existing
            print("  Updating existing agent...")
            deploy_fn = agent_engines.update
            deploy_kwargs["resource_name"] = resource_name
        else:
            # Create new
            print("  Creating new agent...")
            deploy_fn = agent_engines.create
            deploy_kwargs["display_name"] = agent_name

        # Optional arguments are only passed when this SDK version accepts them.
        deploy_kwargs = _apply_env_vars(deploy_fn, deploy_kwargs, env_vars)
        deploy_kwargs = _apply_service_account(
            deploy_fn, deploy_kwargs, AGENT_SERVICE_ACCOUNT
        )
        remote_app = deploy_fn(**deploy_kwargs)

        duration = time.time() - start_time
        print("\n" + "=" * 60)
        print(f"✓ {agent_name} deployed successfully ({duration:.1f}s)")
        print(f"  Resource: {remote_app.resource_name}")
        print("=" * 60)

        return remote_app

    except Exception as e:
        duration = time.time() - start_time
        print("\n" + "=" * 60)
        print(f"✗ {agent_name} deployment FAILED ({duration:.1f}s)")
        print(f"  Error: {e}")
        if isinstance(e, PermissionDenied):
            account = get_active_gcloud_account()
            if account:
                print(f"  Active gcloud account: {account}")
            print("  Fix: grant IAM permission 'aiplatform.reasoningEngines.create'")
            print("  Suggested roles: roles/aiplatform.user or roles/aiplatform.admin")
            print("  Also ensure bucket permissions: roles/storage.objectAdmin or roles/storage.admin")
        else:
            # Some errors embed the engine's resource path; use it to point at its logs.
            failed_resource = extract_engine_resource(str(e))
            if failed_resource:
                print_engine_log_help(failed_resource)
        print("=" * 60)
        raise

    finally:
        # Runs on success, failure and interrupt: restore cwd, then drop the temp tree.
        os.chdir(original_dir)
        if staging_root:
            shutil.rmtree(staging_root, ignore_errors=True)


def deploy_all_agents(
    agents: List[str],
    agents_dir: str,
    requirements: List[str],
    description: str = "ADK Agent"
) -> Dict[str, Any]:
    """
    Deploy the given agents one after another, in list order.

    Prints a plan (the last agent is labelled as the orchestrator), deploys
    each agent with its own requirements and environment variables, and stops
    at the first failure. A summary is printed in every case.

    Returns dict of agent_name -> deployed app for the agents that succeeded.
    """
    started_at = time.time()
    banner = "#" * 60

    print("\n" + banner)
    print("#          ADK AGENT DEPLOYMENT TO VERTEX AI               #")
    print(banner)

    # Show package sizes
    show_package_sizes(agents_dir, agents)

    print("Deployment Plan:")
    last_position = len(agents)
    for position, planned_agent in enumerate(agents, 1):
        role = "sub-agent" if position != last_position else "orchestrator"
        print(f"  {position}. {planned_agent} ({role})")
    print()

    deployed_apps = {}

    for agent_name in agents:
        try:
            if USE_AGENT_REQUIREMENTS:
                agent_requirements = get_agent_requirements(
                    agent_name, agents_dir, requirements
                )
            else:
                agent_requirements = requirements

            # Apps deployed so far are passed in so the orchestrator can be wired
            # to sub-agents deployed earlier in this run.
            deployed_apps[agent_name] = deploy_agent(
                agent_name=agent_name,
                agents_dir=agents_dir,
                requirements=agent_requirements,
                description=description,
                env_vars=get_deploy_env_vars(agent_name, deployed_apps),
            )
        except Exception:
            # deploy_agent has already printed the error details.
            print(f"\n! Stopping deployment due to error in {agent_name}")
            break

    elapsed = time.time() - started_at

    print("\n" + banner)
    print("#          DEPLOYMENT COMPLETE                             #")
    print(banner)
    print(f"\n  Total time: {elapsed:.1f}s")
    print(f"  Agents deployed: {len(deployed_apps)}/{len(agents)}")

    print("\nDeployed Resources:")
    for deployed_name, deployed_app in deployed_apps.items():
        print(f"  {deployed_name}: {deployed_app.resource_name}")

    print("\nView agents in Cloud Console:")
    print(f"  https://console.cloud.google.com/vertex-ai/agents?project={PROJECT_ID}")

    return deployed_apps


# Visible confirmation that this definitions-only cell has been executed.
print("✓ Deployment functions defined")

In [None]:
# @title 8. Deploy All Agents (Run This Cell)
# This deploys all agents in order: sub-agents first, then orchestrator
# `deployed_apps` maps agent name -> deployed app for the agents that succeeded.
# deploy_all_agents stops at the first failure, so the dict can be partial.
# The test cell (step 10) looks agents up in this dict.

deployed_apps = deploy_all_agents(
    agents=AGENTS_TO_DEPLOY,
    agents_dir=AGENTS_DIR,
    requirements=BASE_REQUIREMENTS,
    description=AGENT_DESCRIPTION,
)

---
## Alternative: Deploy Single Agent

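Use the cell below to deploy a single agent on its own instead of running the full deployment in step 8. Cells for testing deployed agents and for local testing follow in the later sections.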

In [None]:
# @title 9. Deploy Single Agent (Optional) { display-mode: "form" }
# @markdown Select which agent to deploy individually

SINGLE_AGENT = "partnership_strategy_agent"  # @param ["sales_plays_agent", "company_research_agent", "partnership_strategy_agent"]

# Same requirement resolution as deploy_all_agents: base requirements plus the
# agent's own requirements.txt when USE_AGENT_REQUIREMENTS is enabled.
single_requirements = (
    get_agent_requirements(SINGLE_AGENT, AGENTS_DIR, BASE_REQUIREMENTS)
    if USE_AGENT_REQUIREMENTS
    else BASE_REQUIREMENTS
)
# No deployed_apps are passed here, so sub-agent wiring for the orchestrator comes
# only from the *_ENGINE_ID values in the configuration cell.
single_env = get_deploy_env_vars(SINGLE_AGENT)

# deploy_agent re-raises on failure, so the print below only runs after a success.
single_app = deploy_agent(
    agent_name=SINGLE_AGENT,
    agents_dir=AGENTS_DIR,
    requirements=single_requirements,
    description=AGENT_DESCRIPTION,
    env_vars=single_env,
)

print(f"\nDeployed: {single_app.resource_name}")

---
## Testing Deployed Agents

In [None]:
# @title 10. Test Agent: Create Session
# @markdown Test a deployed agent by creating a session

TEST_AGENT = "partnership_strategy_agent"  # @param {type:"string"}
TEST_USER_ID = "test_user_123"  # @param {type:"string"}

# Resolve the app to test. Cells 8 and 9 are both optional, so their results are
# read via globals() instead of assuming the variables exist.
_batch_apps = globals().get("deployed_apps") or {}
_single_app = globals().get("single_app")

if TEST_AGENT in _batch_apps:
    test_app = _batch_apps[TEST_AGENT]
elif _single_app and globals().get("SINGLE_AGENT") == TEST_AGENT:
    # Only reuse the single-agent deployment when it is the agent being tested.
    test_app = _single_app
else:
    # Fetch existing agent by display name
    existing_id = get_existing_agent_engine_id(TEST_AGENT)
    if not existing_id:
        raise ValueError(f"Agent {TEST_AGENT} not found")
    resource_name = f"projects/{PROJECT_ID}/locations/{DEPLOY_REGION}/reasoningEngines/{existing_id}"
    test_app = agent_engines.get(resource_name)

print(f"Testing agent: {test_app.resource_name}")

# Create session via API (`headers` is reused by the "Send Query" cell)
token = get_access_token()
headers = {"Authorization": f"Bearer {token}"}
data = {'class_method': 'async_create_session', 'input': {'user_id': TEST_USER_ID}}
endpoint = f'https://{DEPLOY_REGION}-aiplatform.googleapis.com/v1/{test_app.resource_name}:query'

res = requests.post(endpoint, headers=headers, json=data)
print(f"\nResponse ({res.status_code}):")
try:
    print(res.json())
except ValueError:
    # Error responses are not always JSON; show the raw body instead of crashing.
    print(res.text[:2000])

In [None]:
# @title 11. Test Agent: Send Query
# @markdown Send a test message to the agent

TEST_MESSAGE = "hello"  # @param {type:"string"}

# Relies on `test_app`, `headers` and TEST_USER_ID defined by the "Create Session" cell.
endpoint = (
    f'https://{DEPLOY_REGION}-aiplatform.googleapis.com/v1/'
    f'{test_app.resource_name}:streamQuery?alt=sse'
)
data = dict(
    class_method='async_stream_query',
    input=dict(user_id=TEST_USER_ID, message=TEST_MESSAGE),
)

res = requests.post(endpoint, headers=headers, json=data)
print(f"Response ({res.status_code}):")
# Slicing caps the output at 2000 characters; shorter bodies are printed in full.
print(res.text[:2000])

---
## Local Testing with ADK CLI

In [None]:
# @title 12. Test Agent Locally (ADK CLI)
# @markdown Run an agent locally using `adk run`

LOCAL_TEST_AGENT = "sales_plays_agent"  # @param ["sales_plays_agent", "company_research_agent", "partnership_strategy_agent"]

# AGENTS_DIR is a relative path, so this only resolves while the working directory
# is the repository root (the clone cell changes into it).
agent_path = os.path.join(AGENTS_DIR, LOCAL_TEST_AGENT)
print(f"Running: adk run {agent_path}")
print("Press Ctrl+C to stop\n")

# NOTE(review): the command executed below (`cd <agent_path> && adk run .`) differs from
# the one printed above (`adk run <agent_path>`); confirm which form the ADK CLI expects.
!cd {agent_path} && adk run .

---
## Utility: List All Deployed Agents

In [None]:
# @title 13. List All Agent Engines in Project

# Reuse the shared helper instead of a second hand-written request. It checks the HTTP
# status, prints a warning when the listing fails (rather than silently reporting
# zero engines) and falls back between API versions.
engines = _list_reasoning_engines()
print(f"Found {len(engines)} Agent Engine(s):\n")

for engine in engines:
    name = engine.get('displayName', 'N/A')
    engine_id = engine['name'].split('/')[-1]
    # First 19 characters of the timestamp, i.e. "YYYY-MM-DDTHH:MM:SS" for RFC 3339 values.
    created = engine.get('createTime', 'N/A')[:19]
    print(f"  {name}")
    print(f"    ID: {engine_id}")
    print(f"    Created: {created}")
    print()