In [None]:
  # Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Set up the environment

Please follow the instructions at:
https://cloud.google.com/vertex-ai/generative-ai/docs/agent-engine/set-up#sdk-installation

In [None]:
# Authenticate the current Colab user.
# NOTE(review): presumably this is what lets the later Vertex AI calls run with
# the user's Google Cloud credentials — confirm against the Colab docs.
from google.colab import auth
auth.authenticate_user()

`aiofiles` is required in the Colab environment.

In [None]:
!pip install aiofiles

Collecting aiofiles
  Downloading aiofiles-24.1.0-py3-none-any.whl.metadata (10 kB)
Downloading aiofiles-24.1.0-py3-none-any.whl (15 kB)
Installing collected packages: aiofiles
Successfully installed aiofiles-24.1.0


The change (CL/756896866) that enables asynchronous streaming in Agent Engine was released in `google-cloud-aiplatform` version 1.93.0.

In [None]:
!pip3 install "google-cloud-aiplatform[agent_engines,adk]==1.93.0" --force-reinstall --quiet

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m66.6/66.6 kB[0m [31m3.4 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m62.0/62.0 kB[0m [31m5.1 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m7.6/7.6 MB[0m [31m59.6 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.2/1.2 MB[0m [31m47.0 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m160.1/160.1 kB[0m [31m11.3 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m216.1/216.1 kB[0m [31m16.3 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m253.5/253.5 kB[0m [31m20.0 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m229.5/229.5 kB[0m [31m16.6 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

In [None]:
# prompt: how to install specific version of ADK

!pip install google-adk==0.5.0

Collecting google-adk==0.5.0
  Downloading google_adk-0.5.0-py3-none-any.whl.metadata (9.7 kB)
Downloading google_adk-0.5.0-py3-none-any.whl (1.2 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.2/1.2 MB[0m [31m21.1 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: google-adk
  Attempting uninstall: google-adk
    Found existing installation: google-adk 1.0.0
    Uninstalling google-adk-1.0.0:
      Successfully uninstalled google-adk-1.0.0
Successfully installed google-adk-0.5.0


Please restart the kernel so that the newly installed package versions are loaded.

## Initialize

In [None]:
import vertexai
from vertexai import agent_engines

# Project-specific settings: replace these with your own project ID, region,
# and staging bucket before running. The trailing `#@param` markers are Colab
# form annotations and must stay on the assignment lines.
PROJECT_ID = "vital-octagon-19612" #@param {type:"string"}
LOCATION = "us-central1" #@param {type:"string"}
STAGING_BUCKET = "gs://d11finetuning" #@param {type:"string"}

# Configure the Vertex AI SDK once with the values above.
vertexai.init(
    project=PROJECT_ID,
    location=LOCATION,
    staging_bucket=STAGING_BUCKET,
)

# Develop an AdkApp agent

## Step 1: Define and configure a model

Define the model version to use

In [None]:
# Model version chosen for this walkthrough.
# NOTE(review): `agent_builder` below hard-codes 'gemini-2.0-flash' and never
# reads this variable — confirm which model string is intended.
model = "gemini-2.0-flash-001"

## Step 2: Define MCP Toolset and Agent

Follow the example in: https://google.github.io/adk-docs/tools/mcp-tools/#example-1-file-system-mcp-server.




Agent Engine (AE) does not install Node.js by default.
To run the `npx` command that starts the stdio filesystem server, we install Node.js and launch `npx` from a bash script.

In [None]:
%%writefile setup_and_run_npx.sh
#!/bin/bash
#
# Installs Node.js through nvm and then starts the MCP filesystem server.
#
# Usage: setup_and_run_npx.sh <mcp_folder>
#
# This script is launched as a stdio MCP server, so its stdin/stdout carry the
# protocol messages. Everything printed during setup is therefore sent to
# stderr, and only the server itself is connected to the original stdin/stdout.

# Exit immediately if a command exits with a non-zero status.
set -e

# The first argument is the folder the filesystem server exposes.
# Fail fast, before the slow installation, when it is missing.
MCP_FOLDER="${1:?usage: $0 <mcp_folder>}"

# Park the original stdout on fd 3 and the original stdin on fd 4, then point
# stdout at stderr and stdin at /dev/null for the duration of the setup. This
# keeps setup chatter out of the protocol stream and stops any setup command
# from consuming requests the client has already written.
exec 3>&1 4<&0 1>&2 0</dev/null

echo "Starting setup..."

# 1. Install curl
echo "Installing curl..."
apt-get update
apt-get install -y curl

# 2. Install nvm
echo "Installing nvm..."
curl -o- https://raw.githubusercontent.com/nvm-sh/nvm/v0.39.7/install.sh | bash

# 3. Load nvm into *this* shell. Sourcing .bashrc would not help inside a
#    non-interactive script, so nvm.sh is sourced directly.
export NVM_DIR="$HOME/.nvm"
[ -s "$NVM_DIR/nvm.sh" ] && \. "$NVM_DIR/nvm.sh"  # This loads nvm
[ -s "$NVM_DIR/bash_completion" ] && \. "$NVM_DIR/bash_completion"  # This loads nvm bash_completion

# 4. Make nvm.sh executable. With `set -e` this also aborts the script here
#    when the nvm installation did not produce the file.
echo "Ensuring nvm.sh is executable..."
chmod +x "$HOME/.nvm/nvm.sh"

# 5. Install Node.js version
echo "Installing Node.js 20.10.0..."
nvm install 20.10.0

echo "Setup complete. Starting npx server..."

# Replace this shell with the server process and hand it the original
# stdin/stdout saved on fd 4 / fd 3; the helper descriptors are closed so the
# server does not inherit them.
exec npx -y @modelcontextprotocol/server-filesystem "$MCP_FOLDER" 0<&4 1>&3 3>&- 4<&-

Overwriting setup_and_run_npx.sh


In [None]:
# Mark the setup script as executable.
!chmod +x ./setup_and_run_npx.sh

Note: in the Colab environment, `sys.stdin`, `sys.stdout`, and `sys.stderr` are replaced with custom stream objects that do not have traditional file descriptors. If we use the default MCP exit stack from ADK, we get a `fileno` error.

To work around this, we explicitly define an exit stack with an error-log file path.

In [None]:
# User provided mcp tools builder.
async def mcp_tools_builder(exit_stack):
  """Starts the filesystem MCP server and returns the tools it exposes.

  Side effects: creates `/content/mcp` and a sample `test.txt` inside it when
  they are missing, stores the sample file path in the `mcp_file` environment
  variable, and opens `/content/mcp/error.log` for the server's error output.

  Args:
    exit_stack: Async exit stack that takes ownership of the error-log file
      and is handed to `MCPToolset.from_server`.

  Returns:
    The first element of the pair returned by `MCPToolset.from_server`.
  """
  import os
  import shutil
  import aiofiles  # Colab environment only.

  from google.adk.tools.mcp_tool.mcp_toolset import MCPToolset, StdioServerParameters

  # Prepare the folder served by the MCP server.
  # Colab only allows access to the "/content" folder.
  mcp_folder = "/content/mcp"
  if not os.path.exists(mcp_folder):
    os.makedirs(mcp_folder)

  # Publish the sample file path so the calling cells can build their query.
  sample_path = os.path.join(mcp_folder, "test.txt")
  os.environ["mcp_file"] = sample_path

  # Seed the sample file once; an existing file is left untouched.
  if not os.path.exists(sample_path):
    with open(sample_path, "w") as sample_file:
      sample_file.write("Lawrence Dellalio says 'Oh yeah he's winning!'")

  # The explicit `errlog` is required by the Colab environment only.
  # It is opened through aiofiles and closed together with the exit stack.
  log_path = os.path.join(mcp_folder, "error.log")
  errlog = await exit_stack.enter_async_context(aiofiles.open(log_path, "w+"))

  # Pick how to launch the server: directly through npx when it is installed,
  # otherwise through the bash script that installs Node.js first.
  if shutil.which("npx"):
    command = 'npx'
    args = ["-y", "@modelcontextprotocol/server-filesystem", mcp_folder]
  else:
    command = 'bash'
    args = ["./setup_and_run_npx.sh", mcp_folder]

  # Gets tools from MCP Server.
  tools, _ = await MCPToolset.from_server(
      connection_params=StdioServerParameters(command=command, args=args),
      async_exit_stack=exit_stack,
      errlog=errlog,  # Colab environment only.
  )
  return tools

A session service is required by ADK.

In a non-AE environment, the default session service is an `in-memory` session.

In AE, the default session service is a database-backed `VertexSession`.

To keep the behavior consistent between local and remote runs, we explicitly define an `in-memory` session service builder here.

In [None]:
def session_service_builder():
  """Returns a new in-memory ADK session service.

  The service is returned unconditionally; an earlier draft gated this on the
  `GOOGLE_CLOUD_AGENT_ENGINE_ID` environment variable.
  """
  # Imported lazily: this is needed to ensure InitGoogle and AdkApp setup is
  # called first.
  from google.adk.sessions import in_memory_session_service

  return in_memory_session_service.InMemorySessionService()


In [None]:
# User provided agent builder.
def agent_builder(tools):
  """Builds the `AdkApp` that wraps the filesystem assistant agent.

  Args:
    tools: Tools handed to the `LlmAgent` (produced by `mcp_tools_builder`).

  Returns:
    An `AdkApp` whose agent uses `tools` and whose sessions come from
    `session_service_builder`.
  """
  from vertexai.preview.reasoning_engines import AdkApp
  from google.adk.agents.llm_agent import LlmAgent

  return AdkApp(
    agent=LlmAgent(
        # NOTE(review): the notebook-level `model` variable is not used here —
        # confirm which model string is intended.
        model='gemini-2.0-flash',
        name='filesystem_assistant',
        # Adjacent literals are concatenated, so the first one must end with a
        # space; otherwise the prompt reads "...available tools.Be precise...".
        instruction=(
            'Help user interact with the local filesystem using available tools. '
            'Be precise and use the tools provided.'
        ),
        tools=tools,
    ),
    session_service_builder=session_service_builder,
  )

## Step 3: Testing Resource Manager locally


To effectively manage the exit stack and asynchronous calls, we've implemented an `AgentResourcesManager` class.

In [None]:
%%writefile resource_manager.py
class AgentResourcesManager:
    """Async context manager that owns the MCP tools and the agent app built on them.

    On entry an `AsyncExitStack` is opened, `mcp_tools_builder(exit_stack)` is
    awaited to obtain the tools, and `agent_builder(tools)` produces
    `agent_app`. On exit the stack is closed and `agent_app` is dropped, so the
    same instance can be entered again later.

    Args:
        mcp_tools_builder: Async callable that receives the exit stack and
            returns the tools.
        agent_builder: Callable that receives those tools and returns an object
            exposing `async_stream_query(user_id=..., message=...)`.
    """

    def __init__(self, mcp_tools_builder, agent_builder):
        self._is_active = False
        self._mcp_tools_builder = mcp_tools_builder
        self._agent_builder = agent_builder
        self._exit_stack = None
        self.agent_app = None

    async def _setup_agent(self):
        """Builds `agent_app` from freshly built tools.

        Does nothing when `agent_app` already exists and a stack is open.

        Raises:
            RuntimeError: If no exit stack is currently open.
        """
        if self.agent_app is None and self._exit_stack:
            tools = await self._mcp_tools_builder(self._exit_stack)
            self.agent_app = self._agent_builder(tools)
        elif not self._exit_stack:
            raise RuntimeError("Attempted to setup agent without an active exit stack.")

    async def __aenter__(self):
        """Opens the exit stack and builds the agent; returns `self`.

        If the manager is already active, nothing new is started and `self` is
        returned as-is. Any failure during setup closes the stack again before
        the error propagates.
        """
        import contextlib

        if self._is_active:
            print("DEBUG: AgentResourcesManager.__aenter__ - Already active. This might be an issue.")
            return self

        print("DEBUG: AgentResourcesManager.__aenter__ - Starting up resources...")
        self._exit_stack = contextlib.AsyncExitStack()
        await self._exit_stack.__aenter__()
        try:
            await self._setup_agent()
        except BaseException as exc:
            # BaseException rather than Exception: asyncio.CancelledError is a
            # BaseException, and `__aexit__` is never called when `__aenter__`
            # raises, so this is the only place where resources already
            # registered on the stack can be released.
            stack, self._exit_stack = self._exit_stack, None
            self.agent_app = None
            if stack:
                await stack.__aexit__(type(exc), exc, exc.__traceback__)
            raise
        self._is_active = True
        print("DEBUG: AgentResourcesManager is now active.")
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Closes the exit stack and resets the manager.

        Returns:
            `False` when the manager was not active or had no stack; otherwise
            whatever the exit stack's `__aexit__` returns.
        """
        if not self._is_active:
            return False

        if not self._exit_stack:
            print("DEBUG: AgentResourcesManager.__aexit__ - No exit stack to clean. Already exited or error during enter.")
            self._is_active = False
            self.agent_app = None
            return False

        print("DEBUG: AgentResourcesManager.__aexit__ - Shutting down resources...")
        self._is_active = False
        result = False
        try:
            result = await self._exit_stack.__aexit__(exc_type, exc_val, exc_tb)
        finally:
            # Reset even when closing the stack raises, so the instance can be reused.
            self._exit_stack = None
            self.agent_app = None
            print("DEBUG: AgentResourcesManager resources shut down.")
        return result

    async def async_stream_query(self, user_id, query):
        """Streams events from `agent_app.async_stream_query`.

        Args:
            user_id: Forwarded as `user_id`.
            query: Forwarded as `message`.

        Yields:
            Each event produced by the agent app.

        Raises:
            RuntimeError: If the manager is not active or has no agent app.
        """
        if not self._is_active or not self.agent_app:
            raise RuntimeError(
                "AgentResourcesManager is not active or agent_app not initialized. "
                "Ensure it's used within an 'async with' block."
            )
        async for event in self.agent_app.async_stream_query(user_id=user_id, message=query):
            yield event

Overwriting resource_manager.py


In [None]:
from resource_manager import AgentResourcesManager

In [None]:
import os

manager = AgentResourcesManager(
    mcp_tools_builder=mcp_tools_builder,
    agent_builder=agent_builder,
)

mcp_file_path = "" # Will be set by mcp_tools_builder

async with manager:
    mcp_file_path = os.environ.get("mcp_file")
    print(f"\n--- Querying agent for file: {mcp_file_path} ---")
    try:
        async for event in manager.async_stream_query(
            user_id="syang",
            query=f"""
            Read the contents of the file '{mcp_file_path}',
            find what Lawrence Dellalio says.
            Give me just the quote.
            """,
        ):
            print(event)
    except Exception as e:
        print(f"An error occurred during async_stream_query: {e}")
        import traceback
        traceback.print_exc()

DEBUG: AgentResourcesManager.__aenter__ - Starting up resources...
DEBUG: AgentResourcesManager is now active.

--- Querying agent for file: /content/mcp/test.txt ---




{'content': {'parts': [{'function_call': {'id': 'adk-1e21c1fd-6b02-456e-b6e1-4b08dfb04123', 'args': {'path': '/content/mcp/test.txt'}, 'name': 'read_file'}}], 'role': 'model'}, 'invocation_id': 'e-6b7060ad-103e-460f-b83f-e28b64a06b49', 'author': 'filesystem_assistant', 'actions': {'state_delta': {}, 'artifact_delta': {}, 'requested_auth_configs': {}}, 'long_running_tool_ids': set(), 'id': 'nUJBlhLP', 'timestamp': 1747886954.049095}
{'content': {'parts': [{'function_response': {'id': 'adk-1e21c1fd-6b02-456e-b6e1-4b08dfb04123', 'name': 'read_file', 'response': {'result': {'content': [{'type': 'text', 'text': "Lawrence Dellalio says 'Oh yeah he's winning!'"}], 'isError': False}}}}], 'role': 'user'}, 'invocation_id': 'e-6b7060ad-103e-460f-b83f-e28b64a06b49', 'author': 'filesystem_assistant', 'actions': {'state_delta': {}, 'artifact_delta': {}, 'requested_auth_configs': {}}, 'id': '6IiJ8dwH', 'timestamp': 1747886955.188378}
{'content': {'parts': [{'text': "Lawrence Dellalio says 'Oh yeah he

## Step 4: Wrap Resource Manager as Agent Engine App


In [None]:
class AEApp:
    """Agent Engine application wrapper around `AgentResourcesManager`.

    Stores the two builders and exposes `set_up` and `async_stream_query`.
    Every query runs inside the single manager instance created by `set_up`,
    so all calls share that one manager.
    """

    def __init__(self, mcp_tools_builder, agent_builder):
        # The manager is created later by `set_up`.
        self._manager = None
        self._agent_builder = agent_builder
        self._mcp_tools_builder = mcp_tools_builder

    def set_up(self):
        """Creates (or replaces) the resource manager from the stored builders."""
        self._manager = AgentResourcesManager(
            agent_builder=self._agent_builder,
            mcp_tools_builder=self._mcp_tools_builder,
        )

    async def async_stream_query(self, user_id, query):
        """Streams the agent's events for `query` on behalf of `user_id`.

        Calls `set_up` first when no manager exists yet, then enters the
        manager for the duration of the stream.
        """
        if not self._manager:
            self.set_up()
        async with self._manager:
            events = self._manager.async_stream_query(user_id, query)
            async for event in events:
                yield event

In [None]:
app = AEApp(
    mcp_tools_builder=mcp_tools_builder,
    agent_builder=agent_builder,
)
app.set_up()

async for event in app.async_stream_query(
    user_id="syang",
    query=f"""
    Read the contents of the file '{mcp_file_path}',
    find what Lawrence Dellalio says.
    Give me just the quote.
    """,
):
    print(event)

DEBUG: AgentResourcesManager.__aenter__ - Starting up resources...
DEBUG: AgentResourcesManager is now active.




{'content': {'parts': [{'function_call': {'id': 'adk-d4f11dae-9109-45db-893d-83df6aee0f15', 'args': {'path': '/content/mcp/test.txt'}, 'name': 'read_file'}}], 'role': 'model'}, 'invocation_id': 'e-51d82c6f-8647-43ca-97e0-9f8a0732b18c', 'author': 'filesystem_assistant', 'actions': {'state_delta': {}, 'artifact_delta': {}, 'requested_auth_configs': {}}, 'long_running_tool_ids': set(), 'id': 'SI4yycUP', 'timestamp': 1747886968.382106}
{'content': {'parts': [{'function_response': {'id': 'adk-d4f11dae-9109-45db-893d-83df6aee0f15', 'name': 'read_file', 'response': {'result': {'content': [{'type': 'text', 'text': "Lawrence Dellalio says 'Oh yeah he's winning!'"}], 'isError': False}}}}], 'role': 'user'}, 'invocation_id': 'e-51d82c6f-8647-43ca-97e0-9f8a0732b18c', 'author': 'filesystem_assistant', 'actions': {'state_delta': {}, 'artifact_delta': {}, 'requested_auth_configs': {}}, 'id': 'lGqwchnV', 'timestamp': 1747886969.516037}
{'content': {'parts': [{'text': "Lawrence Dellalio says 'Oh yeah he

# Deploy an Agent

https://cloud.google.com/vertex-ai/generative-ai/docs/agent-engine/deploy#package-requirements

**Remark** (specific to ADK): The automatic provisioning of permissions for Vertex AI Sessions has not rolled out yet, so in the meantime you'll have to follow https://cloud.google.com/vertex-ai/generative-ai/docs/agent-engine/set-up#service-agent to grant "Vertex AI User" (`roles/aiplatform.user`) to the service account in your GCP project.

In [None]:
# Deploy the wrapper to Agent Engine.
remote_app = agent_engines.create(
    AEApp(
        mcp_tools_builder=mcp_tools_builder,
        agent_builder=agent_builder,
    ),
    requirements=[
        # Use the same released, pinned versions that this notebook installs
        # and tests locally, instead of a transient git branch with an
        # unpinned ADK, so the remote runtime matches the local one.
        "google-cloud-aiplatform[agent_engines,adk]==1.93.0",
        "google-adk==0.5.0",
        "aiofiles",
    ],
    # Local files shipped with the app: the manager module imported by AEApp
    # and the script mcp_tools_builder runs when npx is not installed.
    extra_packages=[
        "resource_manager.py",
        "setup_and_run_npx.sh",
    ],
)

## Supported operations

The following operations are supported:

* `async_stream_query`: for async streaming a response to a query.

In [None]:
import pprint

# Pretty-print the operation schemas reported by the deployed app.
pprint.pprint(remote_app.operation_schemas())

## Async Stream a response to a query

In [None]:
async for event in remote_app.async_stream_query(
    user_id="syang",
    query=f"""
    Read the contents of the file '{mcp_file_path}',
    find what Lawrence Dellalio says.
    Give me just the quote.
    """,
):
    print(event)

# Clean up

https://cloud.google.com/vertex-ai/generative-ai/docs/agent-engine/quickstart#clean-up

In [None]:
# Delete the deployed app.
# NOTE(review): `force=True` presumably also removes child resources such as
# sessions — confirm against the Agent Engine clean-up documentation.
remote_app.delete(force=True)