Skip to content

iterative stream messages #213

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 5 commits into from
Closed

iterative stream messages #213

wants to merge 5 commits into from

Conversation

pgrayy
Copy link
Member

@pgrayy pgrayy commented Jun 12, 2025

Description

We are currently working on support for an iterative async stream method on the agent class (#83). As part of this work, we need to yield underlying events of the model stream. This PR thus converts the stream_messages function to a generator.

Related Issues

#83

Type of Change

  • Bug fix
  • New feature
  • Breaking change
  • Documentation update
  • Other (please describe):

Testing

  • hatch fmt --linter
  • hatch fmt --formatter
  • hatch test --all
  • Verify that the changes do not break functionality or introduce warnings in consuming repositories: agents-docs, agents-tools, agents-cli
  • hatch run test-lint

Checklist

  • I have read the CONTRIBUTING document
  • I have added tests that prove my fix is effective or my feature works
  • I have updated the documentation accordingly
  • I have added an appropriate example to the documentation to outline the feature
  • My changes generate no new warnings
  • Any dependent changes have been merged and published

By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.

callback_handler(**inputs)
else:
stop_reason, message, usage, metrics = event["stop"]
kwargs.setdefault("request_state", {})
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will progressively move the callback_handler invocations upward in the call stack as we convert the event loop to an event generator.

for event in stream_messages(model, system_prompt, messages, tool_config):
if "callback" in event:
inputs = {**event["callback"], **(kwargs if "delta" in event else {})}
callback_handler(**inputs)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes this is a bit messy but I am trying to maintain backwards compatibility with these changes. Ultimately, we should formalize and strongly type callback payloads.

Note, this callback invocation was copied out of stream_messages.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the strongly typed callback payloads already tracked? If not, can you add it to the async iterator stuff (we can always break it out later)

"""Handles content block delta updates by appending text, tool input, or reasoning content to the state.

Args:
event: Delta event.
state: The current state of message processing.
callback_handler: Callback for processing events as they happen.
**kwargs: Additional keyword arguments to pass to the callback handler.
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we are now yielding events from stream_messages, we no longer need to pass around callback_handler and kwargs. We can invoke the callback_handler in the event loop where we iterate over stream_messages.

if "toolUse" in delta_content:
if "input" not in state["current_tool_use"]:
state["current_tool_use"]["input"] = ""

state["current_tool_use"]["input"] += delta_content["toolUse"]["input"]
callback_handler(delta=delta_content, current_tool_use=state["current_tool_use"], **kwargs)
callback_event["callback"] = {"delta": delta_content, "current_tool_use": state["current_tool_use"]}
Copy link
Member Author

@pgrayy pgrayy Jun 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a temporary format to maintain backwards compatibility. We are planning on formalizing and strongly typing the construction of our agent events for consistency and ease of use.


if "messageStart" in chunk:
state["message"] = handle_message_start(chunk["messageStart"], state["message"])
elif "contentBlockStart" in chunk:
state["current_tool_use"] = handle_content_block_start(chunk["contentBlockStart"])
elif "contentBlockDelta" in chunk:
state = handle_content_block_delta(chunk["contentBlockDelta"], state, callback_handler, **kwargs)
state, callback_event = handle_content_block_delta(chunk["contentBlockDelta"], state)
yield callback_event
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again, keeping for backwards compatibility, but really we should consider getting rid of this extra yield. It is redundant since we have the yield chunk at the start of each loop.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you clarify and/or elaborate on what the two events are? Are they truly duplciates?

condition=os.environ.get("GITHUB_ACTIONS") == 'true',
reason="streamable transport is failing in GitHub actions, debugging if linux compatibility issue"
condition=os.environ.get("GITHUB_ACTIONS") == "true",
reason="streamable transport is failing in GitHub actions, debugging if linux compatibility issue",
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Auto linted.

@pgrayy pgrayy added the approved-for-integ-test Indicator that a pull request has been approved to run integration tests label Jun 12, 2025
@zastrowm zastrowm self-requested a review June 13, 2025 18:45
zastrowm
zastrowm previously approved these changes Jun 13, 2025
for event in stream_messages(model, system_prompt, messages, tool_config):
if "callback" in event:
inputs = {**event["callback"], **(kwargs if "delta" in event else {})}
callback_handler(**inputs)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the strongly typed callback payloads already tracked? If not, can you add it to the async iterator stuff (we can always break it out later)

)
for event in stream_messages(model, system_prompt, messages, tool_config):
if "callback" in event:
inputs = {**event["callback"], **(kwargs if "delta" in event else {})}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a comment indicating what's going on/what the change is.

I believe it's effectively:

In the migration to async-iterator, we converted all events that were previously passed to callback into yielded events that have the callback key, with the properties that we emitted which we now have to unwrap and combine with kwargs for backwards compatibility

Also, this is clever, so props for that

inputs = {**event["callback"], **(kwargs if "delta" in event else {})}
callback_handler(**inputs)
else:
stop_reason, message, usage, metrics = event["stop"]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So stop right now is not an event that callback gets passed. Given that we don't need backwards compability here, what would you think about turning this into a typed event?

E.g:

class StopStreamEvent(dict):

could also be a follow-up PR


if "messageStart" in chunk:
state["message"] = handle_message_start(chunk["messageStart"], state["message"])
elif "contentBlockStart" in chunk:
state["current_tool_use"] = handle_content_block_start(chunk["contentBlockStart"])
elif "contentBlockDelta" in chunk:
state = handle_content_block_delta(chunk["contentBlockDelta"], state, callback_handler, **kwargs)
state, callback_event = handle_content_block_delta(chunk["contentBlockDelta"], state)
yield callback_event
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you clarify and/or elaborate on what the two events are? Are they truly duplciates?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I presume that test_event_loop has enough UT coverage verifying the callbacks of events are as expected? At which point we have a lot of confidence that we maintained backwards compatibility.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
approved-for-integ-test Indicator that a pull request has been approved to run integration tests
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants