@@ -34,7 +34,7 @@ def __init__(self):
@override
async def on_task_event_send(self, params: SendEventParams) -> None:
logger.info(f"Received task message instruction: {params}")

# 2. Echo back the client's message to show it in the UI. This is not done by default so the agent developer has full control over what is shown to the user.
await adk.messages.create(task_id=params.task.id, content=params.event.content)

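For reference, a minimal sketch of the echo pattern this hunk introduces, combined with the empty-content guard added in the temporal workflow hunk further down. The class name and import paths are assumptions, not the repo's actual scaffolding:

```python
from typing_extensions import override

# Assumed import paths, for illustration only:
# from agentex.lib import adk
# from agentex.lib.types import SendEventParams

class EchoChatAgent:  # hypothetical container for the handler
    @override
    async def on_task_event_send(self, params: "SendEventParams") -> None:
        # Ignore events with no content (mirrors the guard added below).
        if not params.event.content:
            return

        # Echo the client's message back so it appears in the UI. This is not
        # done by default, so the agent developer controls what the user sees.
        await adk.messages.create(task_id=params.task.id, content=params.event.content)
```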
1,523 changes: 1,316 additions & 207 deletions examples/tutorials/10_agentic/10_temporal/010_agent_chat/dev.ipynb

Large diffs are not rendered by default.

@@ -48,11 +48,11 @@ class StateModel(BaseModel):
turn_number: int


MCP_SERVERS = [
StdioServerParameters(
command="npx",
args=["-y", "@modelcontextprotocol/server-sequential-thinking"],
),
MCP_SERVERS = [  # sequential-thinking server is no longer needed now that the model reasons natively
# StdioServerParameters(
# command="npx",
# args=["-y", "@modelcontextprotocol/server-sequential-thinking"],
# ),
StdioServerParameters(
command="uvx",
args=["openai-websearch-mcp"],
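The sequential-thinking server is retired because the model's built-in reasoning covers the same ground. A minimal sketch of requesting streamed reasoning summaries from the OpenAI Responses API, which is what produces the ResponseReasoningSummary* events handled in openai.py below; the model name and effort level are assumptions:

```python
import asyncio

from openai import AsyncOpenAI


async def main() -> None:
    client = AsyncOpenAI()
    # "summary": "auto" requests streamed summaries of the model's reasoning
    # without exposing raw reasoning tokens.
    stream = await client.responses.create(
        model="o4-mini",  # assumption: any reasoning-capable model works
        input="Outline a plan for migrating a cron job to Temporal.",
        reasoning={"effort": "medium", "summary": "auto"},
        stream=True,
    )
    async for event in stream:
        # Among others: ResponseReasoningSummaryPartAddedEvent,
        # ResponseReasoningSummaryTextDeltaEvent,
        # ResponseReasoningSummaryPartDoneEvent.
        print(type(event).__name__)


asyncio.run(main())
```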
174 changes: 168 additions & 6 deletions examples/tutorials/10_agentic/10_temporal/010_agent_chat/uv.lock

Large diffs are not rendered by default.

@@ -388,7 +388,6 @@ def __init__(self):
@workflow.signal(name=SignalName.RECEIVE_EVENT)
@override
async def on_task_event_send(self, params: SendEventParams) -> None:
logger.info(f"Received task message instruction: {params}")

if not params.event.content:
return
139 changes: 56 additions & 83 deletions src/agentex/lib/core/services/adk/providers/openai.py
@@ -12,12 +12,12 @@
from openai.types.responses import (
ResponseCompletedEvent,
ResponseTextDeltaEvent,
ResponseFunctionToolCall,
ResponseFunctionWebSearch,
ResponseOutputItemDoneEvent,
ResponseReasoningTextDoneEvent,
ResponseCodeInterpreterToolCall,
ResponseReasoningTextDeltaEvent,
ResponseReasoningSummaryTextDoneEvent,
ResponseReasoningSummaryPartDoneEvent,
ResponseReasoningSummaryPartAddedEvent,
ResponseReasoningSummaryTextDeltaEvent,
)

@@ -29,7 +29,6 @@
from agentex.lib.core.tracing.tracer import AsyncTracer
from agentex.types.task_message_delta import (
TextDelta,
ReasoningContentDelta,
ReasoningSummaryDelta,
)
from agentex.types.task_message_update import (
@@ -691,7 +690,7 @@ async def run_agent_streamed_auto_send(
if self.agentex_client is None:
raise ValueError("Agentex client must be provided for auto_send methods")

tool_call_map: dict[str, Any] = {}
tool_call_map: dict[str, ResponseFunctionToolCall] = {}

if self.tracer is None:
raise RuntimeError("Tracer not initialized - ensure tracer is provided to OpenAIService")
@@ -756,6 +755,8 @@ async def run_agent_streamed_auto_send(

item_id_to_streaming_context: dict[str, StreamingTaskMessageContext] = {}
unclosed_item_ids: set[str] = set()
# Simple string to accumulate reasoning summary
current_reasoning_summary: str = ""

try:
# Process streaming events with TaskMessage creation
@@ -848,103 +849,75 @@
type="delta",
),
)

elif isinstance(event.data, ResponseReasoningSummaryTextDeltaEvent):
# Handle reasoning summary text delta
# Reasoning step one: new summary part added
elif isinstance(event.data, ResponseReasoningSummaryPartAddedEvent):
# We need to create a new streaming context for this reasoning item
item_id = event.data.item_id
summary_index = event.data.summary_index

# Reset the reasoning summary string
current_reasoning_summary = ""

streaming_context = self.streaming_service.streaming_task_message_context(
task_id=task_id,
initial_content=ReasoningContent(
author="agent",
summary=[],
content=[],
type="reasoning",
style="active",
),
)

# Check if we already have a streaming context for this reasoning item
if item_id not in item_id_to_streaming_context:
# Create a new streaming context for this reasoning item
streaming_context = self.streaming_service.streaming_task_message_context(
task_id=task_id,
initial_content=ReasoningContent(
author="agent",
summary=[],
content=[],
type="reasoning",
style="active",
),
)
# Open the streaming context
item_id_to_streaming_context[item_id] = await streaming_context.open()
unclosed_item_ids.add(item_id)
else:
streaming_context = item_id_to_streaming_context[item_id]
# Replace the existing streaming context (if it exists)
# Why do we replace? Because all the reasoning parts use the same item_id!
item_id_to_streaming_context[item_id] = await streaming_context.open()
unclosed_item_ids.add(item_id)

# Reasoning step two: handling summary text delta
elif isinstance(event.data, ResponseReasoningSummaryTextDeltaEvent):
# Accumulate the delta into the string
current_reasoning_summary += event.data.delta
streaming_context = item_id_to_streaming_context[item_id]

# Stream the summary delta through the streaming service
await streaming_context.stream_update(
update=StreamTaskMessageDelta(
parent_task_message=streaming_context.task_message,
delta=ReasoningSummaryDelta(
summary_index=summary_index,
summary_index=event.data.summary_index,
summary_delta=event.data.delta,
type="reasoning_summary",
),
type="delta",
),
)

elif isinstance(event.data, ResponseReasoningTextDeltaEvent):
# Handle reasoning content text delta
item_id = event.data.item_id
content_index = event.data.content_index

# Check if we already have a streaming context for this reasoning item
if item_id not in item_id_to_streaming_context:
# Create a new streaming context for this reasoning item
streaming_context = self.streaming_service.streaming_task_message_context(
task_id=task_id,
initial_content=ReasoningContent(
author="agent",
summary=[],
content=[],
type="reasoning",
style="active",
),
)
# Open the streaming context
item_id_to_streaming_context[item_id] = await streaming_context.open()
unclosed_item_ids.add(item_id)
else:
streaming_context = item_id_to_streaming_context[item_id]

# Stream the content delta through the streaming service
# Reasoning step three: handling summary text done, closing the streaming context
elif isinstance(event.data, ResponseReasoningSummaryPartDoneEvent):
# Handle reasoning summary part completion
streaming_context = item_id_to_streaming_context[item_id]

# Create the complete reasoning content with the accumulated summary
complete_reasoning_content = ReasoningContent(
author="agent",
summary=[current_reasoning_summary],
content=[],
type="reasoning",
style="static",
)

# Send a full message update with the complete reasoning content
await streaming_context.stream_update(
update=StreamTaskMessageDelta(
update=StreamTaskMessageFull(
parent_task_message=streaming_context.task_message,
delta=ReasoningContentDelta(
content_index=content_index,
content_delta=event.data.delta,
type="reasoning_content",
),
type="delta",
content=complete_reasoning_content,
type="full",
),
)

elif isinstance(event.data, ResponseReasoningSummaryTextDoneEvent):
# Handle reasoning summary text completion
item_id = event.data.item_id
summary_index = event.data.summary_index

# We do NOT close the streaming context here as there can be multiple
# reasoning summaries. The context will be closed when the entire
# output item is done (ResponseOutputItemDoneEvent)

# You would think ResponseReasoningSummaryPartDoneEvent alone would be used
# to close the streaming context, but OpenAI emits both a
# ResponseReasoningSummaryTextDoneEvent and a ResponseReasoningSummaryPartDoneEvent
# for the same summary. I have no idea why they do this.

elif isinstance(event.data, ResponseReasoningTextDoneEvent):
# Handle reasoning content text completion
item_id = event.data.item_id
content_index = event.data.content_index

# We do NOT close the streaming context here as there can be multiple
# reasoning content texts. The context will be closed when the entire
# output item is done (ResponseOutputItemDoneEvent)

await streaming_context.close()
unclosed_item_ids.discard(item_id)


elif isinstance(event.data, ResponseOutputItemDoneEvent):
# Handle item completion
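Taken together, the new handlers implement a three-step lifecycle per reasoning summary part. A simplified skeleton of that flow, with hypothetical helpers (make_reasoning_context, make_summary_delta, make_full_update) standing in for the payload construction shown above:

```python
# Sketch only -- condensed from the handlers above, not the literal code.
# The real implementation also tracks unclosed_item_ids and other event types.
current_reasoning_summary = ""

async for event in stream:
    data = event.data

    if isinstance(data, ResponseReasoningSummaryPartAddedEvent):
        # Step one: a new summary part begins. Reset the accumulator and
        # (re)open a streaming context. All parts of a reasoning item reuse
        # the same item_id, so the map entry is simply replaced.
        current_reasoning_summary = ""
        item_id_to_streaming_context[data.item_id] = await make_reasoning_context().open()

    elif isinstance(data, ResponseReasoningSummaryTextDeltaEvent):
        # Step two: accumulate locally and stream the delta to the UI.
        current_reasoning_summary += data.delta
        context = item_id_to_streaming_context[data.item_id]
        await context.stream_update(make_summary_delta(data))

    elif isinstance(data, ResponseReasoningSummaryPartDoneEvent):
        # Step three: send one "full" update carrying the accumulated summary
        # (style flips from "active" to "static"), then close the context.
        context = item_id_to_streaming_context[data.item_id]
        await context.stream_update(make_full_update(current_reasoning_summary))
        await context.close()
```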
19 changes: 13 additions & 6 deletions src/agentex/lib/utils/dev_tools/async_messages.py
@@ -49,8 +49,8 @@ def print_task_message(

# Skip empty reasoning messages
if isinstance(message.content, ReasoningContent):
has_summary = message.content.summary and any(s for s in message.content.summary if s)
has_content = message.content.content and any(c for c in message.content.content if c)
has_summary = bool(message.content.summary) and any(s for s in message.content.summary if s)
has_content = bool(message.content.content) and any(c for c in message.content.content if c) if message.content.content is not None else False
if not has_summary and not has_content:
return

@@ -135,18 +135,19 @@ def print_task_message(

if rich_print and console:
author_color = "bright_cyan" if message.content.author == "user" else "green"
title = f"[bold {author_color}]{message.content.author.upper()}[/bold {author_color}] [{timestamp}]"

# Use different border styles for tool messages
# Use different border styles and colors for different content types
if content_type == "tool_request":
border_style = "yellow"
elif content_type == "tool_response":
border_style = "bright_green"
elif content_type == "reasoning":
border_style = "bright_magenta"
author_color = "bright_magenta" # Also make the author text magenta
else:
border_style = author_color


title = f"[bold {author_color}]{message.content.author.upper()}[/bold {author_color}] [{timestamp}]"
panel = Panel(Markdown(content), title=title, border_style=border_style, width=80)
console.print(panel)
else:
@@ -329,7 +330,7 @@ def subscribe_to_async_task_messages(

# Deserialize the discriminated union TaskMessageUpdate based on the "type" field
message_type = task_message_update_data.get("type", "unknown")

# Handle different message types for streaming progress
if message_type == "start":
task_message_update = StreamTaskMessageStart.model_validate(task_message_update_data)
@@ -359,6 +360,9 @@
if index in active_spinners:
active_spinners[index].stop()
del active_spinners[index]
# Ensure clean line after spinner
if print_messages:
print()

if task_message_update.parent_task_message and task_message_update.parent_task_message.id:
finished_message = client.messages.retrieve(task_message_update.parent_task_message.id)
@@ -373,6 +377,9 @@
if index in active_spinners:
active_spinners[index].stop()
del active_spinners[index]
# Ensure clean line after spinner
if print_messages:
print()

if task_message_update.parent_task_message and task_message_update.parent_task_message.id:
finished_message = client.messages.retrieve(task_message_update.parent_task_message.id)
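The subscriber deserializes the TaskMessageUpdate discriminated union by switching on the raw payload's "type" field before validating with the matching pydantic model. A condensed sketch of that dispatch; the "done" branch and the StreamTaskMessageDone name are assumptions inferred from the spinner-cleanup hunks above:

```python
# task_message_update_data is the raw dict pulled off the stream.
message_type = task_message_update_data.get("type", "unknown")

if message_type == "start":
    update = StreamTaskMessageStart.model_validate(task_message_update_data)
elif message_type == "delta":
    update = StreamTaskMessageDelta.model_validate(task_message_update_data)
elif message_type == "full":
    update = StreamTaskMessageFull.model_validate(task_message_update_data)
elif message_type == "done":  # assumed terminal variant
    # Stop the spinner for this message index and print a blank line so the
    # next Rich panel starts cleanly (the fix in the hunks above).
    update = StreamTaskMessageDone.model_validate(task_message_update_data)
else:
    raise ValueError(f"Unhandled task message update type: {message_type}")
```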