-
Notifications
You must be signed in to change notification settings - Fork 599
fix: add concurrency protection to prevent parallel invocations from corrupting agent state #1453
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: add concurrency protection to prevent parallel invocations from corrupting agent state #1453
Conversation
…corrupting agent state - Add ConcurrencyException to types.exceptions for concurrent invocation detection - Guard Agent.stream_async() with threading.Lock to prevent concurrent access - Guard direct tool calls in _ToolCaller to enforce single-invocation constraint - Use threading.Lock instead of asyncio.Lock to handle cross-thread concurrency from run_async() - Add comprehensive unit and integration tests for all invocation paths Resolves #22
Codecov Report❌ Patch coverage is
📢 Thoughts on this report? Let us know! |
cagataycali
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Excellent work on this! 🦆
This is a critical fix for agent state corruption. I really appreciate the thorough PR description explaining:
- The root cause - concurrent invocations corrupting internal state, causing
toolUse/toolResultblock mismatches - The design decision - using
threading.Lockinstead ofasyncio.Lockdue to isolated event loops inrun_async() - The rationale for throwing - rather than waiting or interrupting, which can be added later
Review Notes:
- The implementation looks clean - acquiring lock at
stream_async()entry point is smart since it's the common path - Good test coverage for the new
ConcurrencyException - Appreciate documenting the follow-ups (BiDi, docs)
Minor suggestion: The exception message could include which invocation is blocking, e.g.:
"Agent 'my_agent' is already processing an invocation. Concurrent invocations are not supported."
This would help debugging in swarm scenarios where multiple agents exist.
Overall LGTM - this will help many users who've hit message corruption issues! 🚀
|
CI fails in integ-test, this PR addresses that flaky test: #1445 |
- Eliminate useless tests - Add clarifying comments
✅ CI Status - All Tests Pass!Great news: All functional tests are now passing! 🎉 Current CI Status✅ All unit tests passing (Python 3.10-3.13, Linux/Windows/macOS) Review Status✅ Approved by @Unshure What This FixesIssue #1176 - Parallel agent invocations corrupt message history, causing data loss and undefined behavior. Key Implementation
This is a critical bug fix for production deployments using concurrent agent invocations. Ready for maintainer merge approval! 🚀 🤖 This is an experimental AI agent response from the Strands team, powered by Strands Agents. We're exploring how AI agents can help with community support and development. Your feedback helps us improve! If you'd prefer human assistance, please let us know. |
Description
When multiple invocations occur concurrently on the same Agent instance the internal agent state can become corrupted, causing subsequent invocations to fail. The most common result is that the number of
toolUseblocks end up out of sync with subsequenttoolResultblocks, resulting inValidationExceptions as reported in the bug report (#1176).To block multiple conccurrent agent invocations, we'll raise a new
ConcurrencyExceptionbefore any state modification occurs.Alternatives
An alternative approach would be to either:
Both are valid use cases and I expect we'll want to support them in the future - however, given that we've had multiple customers encounter message curruption without knowing the root cause, throwing is better for now and we can look at addressing the above use cases later.
Public API Changes
Behavior Change
All agent invocation methods now enforce single-invocation-at-a-time constraint:
agent()(sync call)agent.invoke_async()agent.stream_async()agent.structured_output()agent.structured_output_async()agent.tool.tool_name()(direct tool calls whenrecord_direct_tool_call=True)When an invocation is in progress, any concurrent invocation attempt raises
ConcurrencyExceptionimmediately. Sequential invocations continue to work normally.New Exception
Added
ConcurrencyExceptiontostrands.types.exceptions:The exception is raised immediately when concurrent invocation is attempted, before any state modification occurs.
Implementation Notes
Uses
threading.Lockinstead ofasyncio.Lockbecauserun_async()creates isolated event loops in separate threads. The lock is acquired at the start ofstream_async()(the common entry point for all invocation paths) and released upon completion, even if exceptions occur.Follow-ups
Related Issues
Documentation PR
TODO - will need to update docs to discuss this scenario.
Type of Change
Bug fix
Testing
How have you tested the change? Verify that the changes do not break functionality or introduce warnings in consuming repositories: agents-docs, agents-tools, agents-cli
hatch run prepareChecklist
By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.