-
Notifications
You must be signed in to change notification settings - Fork 421
[DRAFT] tool interrupt #879
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
7d6bf04
to
ec157f0
Compare
ec157f0
to
dc0f361
Compare
dc0f361
to
4df352a
Compare
4df352a
to
98aabf3
Compare
98aabf3
to
1fe2bc0
Compare
1fe2bc0
to
0450dba
Compare
Raises: | ||
AttributeError: If the tool doesn't exist. | ||
""" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Still need to figure out how to support interrupts in direct tool calls. I would prefer to allow users to pass in the resume context into the call (e.g., agent.tool.my_tool({"resume": ...})
). I don't think though we can add this in a backwards compatible manner.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we have to support interruptions for direct tool calls? Seems a bit silly IMHO
ConversationManager, | ||
SlidingWindowConversationManager, | ||
) | ||
from .execution_state import ExecutionState |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
With interrupts, the agent now enters into a state that requires specific input from the user to continue. To track this, I created an ExecutionState enum. More details below.
|
||
self.execution_state = ExecutionState.ASSISTANT | ||
|
||
self.interrupts = {} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If in an INTERRUPT execution state, the agent will hold a reference to the interrupts raised by the user. To get things working, I am storing the interrupts in a dictionary in memory. As a follow up, I will think of a more formal mechanism for storing the interrupt state that can also be serialized for session management.
|
||
if result.stop_reason == "interrupt": | ||
self.execution_state = ExecutionState.INTERRUPT | ||
self.interrupts = {interrupt.name: interrupt for interrupt in result.interrupts} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- Right now we are only supporting interrupts for tools. We use tool names for the interrupt names.
- We support raising multiple interrupts in a single request to the agent because the agent can execute multiple tools in parallel.
- Only one interrupt is allowed for each tool. However, users can provide multiple reasons for interrupting a tool. More details on this below.
raise ValueError("<TODO>.") | ||
|
||
for interrupt in self.interrupts.values(): | ||
interrupt.resume = prompt["resume"][interrupt.name] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Users fill in a resume content block that maps interrupt names to the user provided input required for resuming a tool execution after interrupt.
message: Message | ||
metrics: EventLoopMetrics | ||
state: Any | ||
interrupts: Optional[list[Interrupt]] = None |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Users can parse the raised interrupts from the AgentResult returned from the agent invoke. interrupts
will be populated when stop_reason
is "interrupt".
""" | ||
|
||
ASSISTANT = "assistant" | ||
INTERRUPT = "interrupt" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was thinking we could add another state called "TOOL" to indicate that the agent is waiting for tool results. Under this state, users would be able to pass in tool result content blocks into agent invoke. This is something to consider for follow up though.
) | ||
invocation_state["event_loop_cycle_span"] = cycle_span | ||
|
||
# Create a trace for the stream_messages call |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
All the red here was moved below into _handle_model_execution
. It is a straight copy and paste.
async for event in stream_messages(agent.model, agent.system_prompt, agent.messages, tool_specs): | ||
if not isinstance(event, ModelStopReason): | ||
yield event | ||
if agent.execution_state == ExecutionState.INTERRUPT: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the reason I moved the model execution code. So that I could create this if else condition. If we are in an interrupt state, we need to go straight to tool calling.
) -> AsyncGenerator[TypedEvent, None]: | ||
"""<TODO>.""" | ||
# Create a trace for the stream_messages call | ||
stream_trace = Trace("stream_messages", parent_id=cycle_trace.id) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
All this green is a direct copy and paste. No changes were made to the model execution logic.
0450dba
to
c1491ad
Compare
tool_use: The tool parameters that will be passed to selected_tool. | ||
invocation_state: Keyword arguments that will be passed to the tool. | ||
cancel: A user defined message that when set, will lead to canceling of the tool call. | ||
The message is used to populate a tool result with status "error". |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a new complementary feature to tool interrupts. It provides user a mechanism to cancel a tool call if the resume response is not sufficient. For example, a user could interrupt from a BeforeToolInvocationEvent to ask if a user accepts or rejects the call. If rejected, users can then set the cancel message from their event hook.
|
||
|
||
@dataclass | ||
class Interrupt: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I want to explore adding an async pause
method that will allow users to asynchronously ask for HIL inputs. That would be P1 work. P0 is focused on full on interrupt.
self.activated = False | ||
return self.resume | ||
|
||
self.reasons.append(reason) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We allow all hooks registered on a BeforeToolInvocationEvent to run. This allows users to raise multiple interrupts on a single tool use. The reasons are appended.
try: | ||
callback(event) | ||
except InterruptException: | ||
pass |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We catch the InterruptException to allow all registered hooks to run. The state of the interrupt is stored as an instance variable on the hook event.
result = await asyncio.to_thread(self._tool_func, **validated_input) # type: ignore | ||
yield self._wrap_tool_result(tool_use_id, result) | ||
|
||
except InterruptException as e: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Users raise interrupts in the same manner in their python tools. The callable interrupt instance is passed through tool context:
@tool(context=True)
def delete_tool(tool_context: "ToolContext", key: str) -> str:
approval = tool_context.interrupt("APPROVAL")
if approval != "A":
return "approval not granted"
print(f"DELET_TOOL | deleting {key}")
return f"successfully deleted {key}"
"content": [{"text": f"{tool_name} interrupted"}], | ||
} | ||
after_event = agent.hooks.invoke_callbacks( | ||
AfterToolInvocationEvent( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
AfterToolInvocationEvent hooks are allowed to run during a tool interrupt.
) | ||
|
||
if before_event.interrupt.activated: | ||
yield ToolInterruptEvent(before_event.interrupt) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Idea: We should consider allowing users to define tool result content in their BeforeToolInvocationEvent hooks. We can then append the extra content to the final tool result. This would be particularly useful for users to add their HIL responses.
Note, if interrupting from within a tool, users can add the HIL responses themselves to the final output.
task_events[task_id].set() | ||
|
||
asyncio.gather(*tasks) | ||
await asyncio.gather(*tasks) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Caught this and will fix today in a separate PR.
yield event | ||
|
||
if isinstance(event, ToolInterruptEvent): | ||
break |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unlike concurrent tool execution, for sequential execution, we break early if we see a ToolInterruptEvent.
guardContent: GuardContent | ||
image: ImageContent | ||
reasoningContent: ReasoningContentBlock | ||
resume: dict[str, Any] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does not get added to the model messages array.
# String input - convert to user message | ||
messages = [{"role": "user", "content": [{"text": prompt}]}] | ||
elif isinstance(prompt, dict): | ||
messages = [{"role": "user", "content": prompt}] if "resume" not in prompt else [] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We do not add resume prompts to the model messages array since it is Strands specific.
yield ModelMessageEvent(message=message) | ||
|
||
|
||
async def _handle_tool_execution( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not shown yet, but when resuming from interrupt, I will skip the tool calls that already succeeded. We just filter out the tool_uses here and add the successful tool results to the tool result message below.
Description
Support interrupting tool calls to ask for human feedback.
For more details, see Usage section below and comments under "Files changed".
Related Issues
#204
Documentation PR
TODO
Usage
Type of Change
New feature
Testing
TODO
How have you tested the change? Verify that the changes do not break functionality or introduce warnings in consuming repositories: agents-docs, agents-tools, agents-cli
hatch run prepare
Checklist
By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.