-
Notifications
You must be signed in to change notification settings - Fork 197
iterative stream messages #213
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
src/strands/agent/conversation_manager/sliding_window_conversation_manager.py
Outdated
Show resolved
Hide resolved
callback_handler(**inputs) | ||
else: | ||
stop_reason, message, usage, metrics = event["stop"] | ||
kwargs.setdefault("request_state", {}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will progressively move the callback_handler
invocations upward in the call stack as we convert the event loop to an event generator.
for event in stream_messages(model, system_prompt, messages, tool_config): | ||
if "callback" in event: | ||
inputs = {**event["callback"], **(kwargs if "delta" in event else {})} | ||
callback_handler(**inputs) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes this is a bit messy but I am trying to maintain backwards compatibility with these changes. Ultimately, we should formalize and strongly type callback payloads.
Note, this callback invocation was copied out of stream_messages
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is the strongly typed callback payloads already tracked? If not, can you add it to the async iterator stuff (we can always break it out later)
"""Handles content block delta updates by appending text, tool input, or reasoning content to the state. | ||
|
||
Args: | ||
event: Delta event. | ||
state: The current state of message processing. | ||
callback_handler: Callback for processing events as they happen. | ||
**kwargs: Additional keyword arguments to pass to the callback handler. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since we are now yielding events from stream_messages
, we no longer need to pass around callback_handler
and kwargs
. We can invoke the callback_handler
in the event loop where we iterate over stream_messages
.
if "toolUse" in delta_content: | ||
if "input" not in state["current_tool_use"]: | ||
state["current_tool_use"]["input"] = "" | ||
|
||
state["current_tool_use"]["input"] += delta_content["toolUse"]["input"] | ||
callback_handler(delta=delta_content, current_tool_use=state["current_tool_use"], **kwargs) | ||
callback_event["callback"] = {"delta": delta_content, "current_tool_use": state["current_tool_use"]} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a temporary format to maintain backwards compatibility. We are planning on formalizing and strongly typing the construction of our agent events for consistency and ease of use.
|
||
if "messageStart" in chunk: | ||
state["message"] = handle_message_start(chunk["messageStart"], state["message"]) | ||
elif "contentBlockStart" in chunk: | ||
state["current_tool_use"] = handle_content_block_start(chunk["contentBlockStart"]) | ||
elif "contentBlockDelta" in chunk: | ||
state = handle_content_block_delta(chunk["contentBlockDelta"], state, callback_handler, **kwargs) | ||
state, callback_event = handle_content_block_delta(chunk["contentBlockDelta"], state) | ||
yield callback_event |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Again, keeping for backwards compatibility, but really we should consider getting rid of this extra yield. It is redundant since we have the yield chunk at the start of each loop.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you clarify and/or elaborate on what the two events are? Are they truly duplciates?
tests-integ/test_mcp_client.py
Outdated
condition=os.environ.get("GITHUB_ACTIONS") == 'true', | ||
reason="streamable transport is failing in GitHub actions, debugging if linux compatibility issue" | ||
condition=os.environ.get("GITHUB_ACTIONS") == "true", | ||
reason="streamable transport is failing in GitHub actions, debugging if linux compatibility issue", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Auto linted.
for event in stream_messages(model, system_prompt, messages, tool_config): | ||
if "callback" in event: | ||
inputs = {**event["callback"], **(kwargs if "delta" in event else {})} | ||
callback_handler(**inputs) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is the strongly typed callback payloads already tracked? If not, can you add it to the async iterator stuff (we can always break it out later)
src/strands/event_loop/event_loop.py
Outdated
) | ||
for event in stream_messages(model, system_prompt, messages, tool_config): | ||
if "callback" in event: | ||
inputs = {**event["callback"], **(kwargs if "delta" in event else {})} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we add a comment indicating what's going on/what the change is.
I believe it's effectively:
In the migration to async-iterator, we converted all events that were previously passed to
callback
into yielded events that have thecallback
key, with the properties that we emitted which we now have to unwrap and combine withkwargs
for backwards compatibility
Also, this is clever, so props for that
inputs = {**event["callback"], **(kwargs if "delta" in event else {})} | ||
callback_handler(**inputs) | ||
else: | ||
stop_reason, message, usage, metrics = event["stop"] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So stop
right now is not an event that callback gets passed. Given that we don't need backwards compability here, what would you think about turning this into a typed event?
E.g:
class StopStreamEvent(dict):
could also be a follow-up PR
|
||
if "messageStart" in chunk: | ||
state["message"] = handle_message_start(chunk["messageStart"], state["message"]) | ||
elif "contentBlockStart" in chunk: | ||
state["current_tool_use"] = handle_content_block_start(chunk["contentBlockStart"]) | ||
elif "contentBlockDelta" in chunk: | ||
state = handle_content_block_delta(chunk["contentBlockDelta"], state, callback_handler, **kwargs) | ||
state, callback_event = handle_content_block_delta(chunk["contentBlockDelta"], state) | ||
yield callback_event |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you clarify and/or elaborate on what the two events are? Are they truly duplciates?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I presume that test_event_loop has enough UT coverage verifying the callbacks of events are as expected? At which point we have a lot of confidence that we maintained backwards compatibility.
* build(a2a): add a2a deps and mitigate otel conflict
Description
We are currently working on support for an iterative async stream method on the agent class (#83). As part of this work, we need to yield underlying events of the model stream. This PR thus converts the
stream_messages
function to a generator.Related Issues
#83
Type of Change
Testing
hatch fmt --linter
hatch fmt --formatter
hatch test --all
hatch run test-lint
Checklist
By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.