Skip to content

feat: ADK Web Async Agent Compatibility #1420

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 58 additions & 0 deletions contributing/samples/adk_web_async/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# ADK Web Async Agent Compatibility

This minimal example demonstrates the technical foundation for making async agents compatible with ADK Web interface.

## Problem Solved

ADK Web had two main compatibility issues with async agents:

1. **MCP Tools Event Loop Conflicts**: "Event loop is closed" errors with uvloop
2. **Session State Customization**: No way to load custom data before template processing

## Solution

The ADK now provides:

1. **Automatic MCP uvloop compatibility** in the MCP tool implementation
2. **Session state preprocessor callback** for custom session data

## Usage

Create an agent with optional `session_state_preprocessor` function:

```python
# agent.py
async def session_state_preprocessor(state):
"""Called before template processing - load custom data here"""
# Load user data, project context, etc.
return state

def create_adk_web_agent():
"""Standard ADK Web agent factory function"""
return your_async_agent
```

## Key Features

- **MCP Tools**: Work automatically in ADK Web (uvloop compatibility handled by ADK)
- **Session Preprocessor**: Load database data, set defaults, add custom variables
- **Template Variables**: Use standard ADK `{variable}` format with your custom data
- **ADK Web Detection**: ADK Web calls `create_adk_web_agent()` and provides `user_id="1"` default

## Files

- `README.md` - This documentation
- `agent.py` - Minimal async agent with preprocessor example
- `main.py` - Simple test script

## Test

```bash
# Test the agent
python main.py

# Use with ADK Web
adk web --port 8088
```

That's it! The ADK handles all the complexity automatically.
6 changes: 6 additions & 0 deletions contributing/samples/adk_web_async/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
"""
ADK Web Async Agent Compatibility Example.

Minimal example demonstrating the technical foundation for
making async agents compatible with ADK Web interface.
"""
120 changes: 120 additions & 0 deletions contributing/samples/adk_web_async/agent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
"""
Minimal ADK Web Async Agent Example.

Demonstrates the technical foundation for ADK Web compatibility:
1. MCP tools work automatically (uvloop compatibility handled by ADK)
2. Session state preprocessor for custom data before template processing
"""

import asyncio
import logging
from typing import Dict, Any, Optional

from google.adk.agents.llm_agent import LlmAgent
from google.adk.models.lite_llm import LiteLlm

logger = logging.getLogger(__name__)


async def session_state_preprocessor(state: Dict[str, Any]) -> Dict[str, Any]:
"""
Session state preprocessor - called by ADK Web before template processing.

This is where you load custom data that becomes available in templates.

Args:
state: Current session state (includes user_id="1" from ADK Web)

Returns:
Enhanced session state with your custom data
"""
logger.info("🔧 Session preprocessor called")

# Example: Load user data based on user_id
user_id = state.get("user_id")
if user_id:
# In real app: user_data = await load_from_database(user_id)
state["user_name"] = f"Demo User {user_id}"
state["user_role"] = "developer"

# Example: Add application context
state["app_name"] = "ADK Web Demo"
state["features"] = "MCP tools, async operations"

logger.info(f"✅ Session enhanced: {list(state.keys())}")
return state


async def create_async_agent() -> LlmAgent:
"""Create the async agent with MCP tools."""

# Simple instruction using variables from session_state_preprocessor
instruction = """
You are an ADK Web compatible async agent.

User: {user_name} (ID: {user_id}, Role: {user_role})
App: {app_name}
Features: {features}

You demonstrate:
- MCP tools working in ADK Web (automatic uvloop compatibility)
- Session state preprocessor for custom template data
- Standard ADK template variables {variable}
"""

agent = LlmAgent(
name="adk_web_async_demo",
model=LiteLlm(model="openai/gpt-4o", stream=True),
instruction=instruction,
description="Minimal example of ADK Web async compatibility",
tools=[] # MCP tools would be added here automatically
)

logger.info("✅ Async agent created for ADK Web")
return agent


def create_adk_web_agent() -> Optional[LlmAgent]:
"""
ADK Web entry point - called by ADK Web interface.

This function is called by ADK Web when it loads the agent.
The mere fact that this function is called indicates ADK Web mode.

Returns:
Agent instance for ADK Web usage
"""
try:
logger.info("🌐 ADK Web agent creation requested...")

# Handle async creation in sync context
try:
loop = asyncio.get_running_loop()
# If event loop exists, use thread executor
import concurrent.futures

def run_creation():
new_loop = asyncio.new_event_loop()
asyncio.set_event_loop(new_loop)
try:
return new_loop.run_until_complete(create_async_agent())
finally:
new_loop.close()

with concurrent.futures.ThreadPoolExecutor() as executor:
agent = executor.submit(run_creation).result()

except RuntimeError:
# No event loop, use asyncio.run
agent = asyncio.run(create_async_agent())

logger.info("✅ ADK Web agent created successfully")
return agent

except Exception as e:
logger.error(f"❌ ADK Web agent creation failed: {e}")
return None


# Export for ADK Web
__all__ = ["create_adk_web_agent", "session_state_preprocessor"]
70 changes: 70 additions & 0 deletions contributing/samples/adk_web_async/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
"""
Test script for ADK Web async agent compatibility.

Simple test to verify the agent works correctly.
"""

import asyncio
import logging
import os

from agent import create_adk_web_agent, session_state_preprocessor

# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


async def test_session_preprocessor():
"""Test the session state preprocessor."""
logger.info("🧪 Testing session state preprocessor...")

# Simulate ADK Web session state
test_state = {"user_id": "1"} # ADK Web default

# Call preprocessor
enhanced_state = await session_state_preprocessor(test_state)

# Verify enhancement
expected_keys = ["user_id", "user_name", "user_role", "app_name", "features"]
for key in expected_keys:
assert key in enhanced_state, f"Missing key: {key}"

logger.info(f"✅ Session state enhanced: {enhanced_state}")


def test_agent_creation():
"""Test agent creation for ADK Web."""
logger.info("🧪 Testing agent creation...")

# Create agent (simulates ADK Web calling create_adk_web_agent)
agent = create_adk_web_agent()

assert agent is not None, "Agent creation failed"
assert agent.name == "adk_web_async_demo"

logger.info(f"✅ Agent created: {agent.name}")
return agent


async def main():
"""Run all tests."""
logger.info("🚀 Starting ADK Web async compatibility tests...")

try:
# Test session preprocessor
await test_session_preprocessor()

# Test agent creation
agent = test_agent_creation()

logger.info("🎉 All tests passed!")
logger.info("💡 Ready for ADK Web: adk web --port 8088")

except Exception as e:
logger.error(f"❌ Test failed: {e}")
raise


if __name__ == "__main__":
asyncio.run(main())
72 changes: 72 additions & 0 deletions src/google/adk/cli/fast_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,53 @@

_EVAL_SET_FILE_EXTENSION = ".evalset.json"

# Global agents directory for session state preprocessor access
_agents_dir = None


async def _apply_session_state_preprocessor(
app_name: str,
state: dict[str, Any]
) -> dict[str, Any]:
"""
Apply session state preprocessor callback if agent provides one.

This allows agents to customize session state before template processing.
Useful for loading user data, setting project defaults, etc.

Args:
app_name: Name of the agent/app
state: Current session state

Returns:
Updated session state with agent-specific modifications
"""
try:
# Try to import and call agent's session preprocessor
# Use the global agents_dir that was set during app initialization
if _agents_dir:
agent_module = AgentLoader(_agents_dir).load_agent(app_name)
else:
# Fallback: try to load without agents_dir (may fail)
agent_module = AgentLoader().load_agent(app_name)

# Look for session_state_preprocessor function
if hasattr(agent_module, 'session_state_preprocessor'):
preprocessor = getattr(agent_module, 'session_state_preprocessor')

if callable(preprocessor):
# Call preprocessor (support both sync and async)
if asyncio.iscoroutinefunction(preprocessor):
state = await preprocessor(state)
else:
state = preprocessor(state)

except Exception as e:
logger.warning(f"Session state preprocessor not available or failed: {e}")
# Continue without preprocessor - this is optional

return state


class ApiServerSpanExporter(export.SpanExporter):

Expand Down Expand Up @@ -202,6 +249,10 @@ def get_fast_api_app(
trace_to_cloud: bool = False,
lifespan: Optional[Lifespan[FastAPI]] = None,
) -> FastAPI:
# Store agents_dir globally for session state preprocessor access
global _agents_dir
_agents_dir = agents_dir

# InMemory tracing dict.
trace_dict: dict[str, Any] = {}
session_trace_dict: dict[str, Any] = {}
Expand Down Expand Up @@ -400,6 +451,19 @@ async def create_session_with_id(
status_code=400, detail=f"Session already exists: {session_id}"
)
logger.info("New session created: %s", session_id)

# Initialize state with ADK Web development defaults
if state is None:
state = {}

# Call session state preprocessor callback if agent provides one
state = await _apply_session_state_preprocessor(app_name, state)

# Add basic ADK Web development defaults
# These defaults only apply when values aren't already set
if "user_id" not in state:
state["user_id"] = "1" # Default ADK Web user for development

return await session_service.create_session(
app_name=app_name, user_id=user_id, state=state, session_id=session_id
)
Expand All @@ -415,6 +479,14 @@ async def create_session(
events: Optional[list[Event]] = None,
) -> Session:
logger.info("New session created")

# Initialize state with ADK Web development defaults
if state is None:
state = {}

# Call session state preprocessor callback if agent provides one
state = await _apply_session_state_preprocessor(app_name, state)

session = await session_service.create_session(
app_name=app_name, user_id=user_id, state=state
)
Expand Down
Loading