Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/strands/agent/conversation_manager/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
It includes:

- ConversationManager: Abstract base class defining the conversation management interface
- ProactiveCompressionConfig: Configuration type for proactive compression settings
- NullConversationManager: A no-op implementation that does not modify conversation history
- SlidingWindowConversationManager: An implementation that maintains a sliding window of messages to control context
size while preserving conversation coherence
Expand All @@ -13,14 +14,15 @@
is critical for effective agent interactions.
"""

from .conversation_manager import ConversationManager
from .conversation_manager import ConversationManager, ProactiveCompressionConfig
from .null_conversation_manager import NullConversationManager
from .sliding_window_conversation_manager import SlidingWindowConversationManager
from .summarizing_conversation_manager import SummarizingConversationManager

__all__ = [
"ConversationManager",
"NullConversationManager",
"ProactiveCompressionConfig",
"SlidingWindowConversationManager",
"SummarizingConversationManager",
]
144 changes: 121 additions & 23 deletions src/strands/agent/conversation_manager/conversation_manager.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,33 @@
"""Abstract interface for conversation history management."""

import logging
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, Any
from typing import TYPE_CHECKING, Any, TypedDict, Union

from ...hooks.events import BeforeModelCallEvent
from ...hooks.registry import HookProvider, HookRegistry
from ...types.content import Message

if TYPE_CHECKING:
from ...agent.agent import Agent

logger = logging.getLogger(__name__)

DEFAULT_COMPRESSION_THRESHOLD = 0.7
DEFAULT_CONTEXT_WINDOW_LIMIT = 200_000
Comment thread
opieter-aws marked this conversation as resolved.
Comment thread
opieter-aws marked this conversation as resolved.


class ProactiveCompressionConfig(TypedDict, total=False):
"""Configuration for proactive compression when passed as an object.

Attributes:
compression_threshold: Ratio of context window usage that triggers proactive compression.
Value between 0 (exclusive) and 1 (inclusive).
Defaults to 0.7 (compress when 70% of the context window is used).
"""

compression_threshold: float


class ConversationManager(ABC, HookProvider):
"""Abstract base class for managing conversation history.
Expand All @@ -22,45 +41,122 @@ class ConversationManager(ABC, HookProvider):

ConversationManager implements the HookProvider protocol, allowing derived classes to register hooks for agent
lifecycle events. Derived classes that override register_hooks must call the base implementation to ensure proper
hook registration.
hook registration chain.

The primary responsibility of a ConversationManager is overflow recovery: when the model encounters a context
window overflow, :meth:`reduce_context` is called with ``e`` set and MUST reduce the history enough for the next
model call to succeed.

Subclasses can enable proactive compression by passing ``proactive_compression`` in the constructor.
When enabled, the base class registers a ``BeforeModelCallEvent`` hook that checks projected input tokens
against the model's context window limit and calls :meth:`reduce_context` (without ``e``) when the
threshold is exceeded. This is a best-effort operation — errors are swallowed so the model call can
still proceed.

Example:
```python
class MyConversationManager(ConversationManager):
def register_hooks(self, registry: HookRegistry, **kwargs: Any) -> None:
super().register_hooks(registry, **kwargs)
# Register additional hooks here
# Enable proactive compression with default threshold (0.7)
SlidingWindowConversationManager(window_size=50, proactive_compression=True)

# Enable proactive compression with custom threshold
SummarizingConversationManager(proactive_compression={"compression_threshold": 0.8})
```
"""

def __init__(self) -> None:
def __init__(self, *, proactive_compression: Union[bool, "ProactiveCompressionConfig", None] = None) -> None:
"""Initialize the ConversationManager.

Args:
proactive_compression: Enable proactive context compression before the model call.
- ``True``: compress when 70% of the context window is used (default threshold).
- ``{"compression_threshold": float}``: compress at the specified ratio (0, 1].
- ``False`` or ``None``: disabled, only reactive overflow recovery is used.

Raises:
ValueError: If compression_threshold is not in the valid range (0, 1].

Attributes:
removed_message_count: The messages that have been removed from the agents messages array.
These represent messages provided by the user or LLM that have been removed, not messages
included by the conversation manager through something like summarization.
"""
# Resolve the threshold from proactive_compression parameter
if proactive_compression is True:
threshold: float | None = DEFAULT_COMPRESSION_THRESHOLD
elif isinstance(proactive_compression, dict):
threshold = proactive_compression.get("compression_threshold", DEFAULT_COMPRESSION_THRESHOLD)
else:
threshold = None

if threshold is not None and (threshold <= 0 or threshold > 1):
raise ValueError(
f"compression_threshold must be between 0 (exclusive) and 1 (inclusive), got {threshold}"
)

self.removed_message_count = 0
self._compression_threshold = threshold
self._context_window_limit_warned = False

def register_hooks(self, registry: HookRegistry, **kwargs: Any) -> None:
"""Register hooks for agent lifecycle events.

Always registers a ``BeforeModelCallEvent`` hook for proactive compression.
When ``proactive_compression`` is not configured, the handler is a no-op (early return).

Derived classes that override this method must call the base implementation to ensure proper hook
registration chain.

Args:
registry: The hook registry to register callbacks with.
**kwargs: Additional keyword arguments for future extensibility.
"""
# Always subscribe — the threshold check happens inside the handler
registry.add_callback(BeforeModelCallEvent, self._on_before_model_call_threshold)
Comment thread
opieter-aws marked this conversation as resolved.

Example:
```python
def register_hooks(self, registry: HookRegistry, **kwargs: Any) -> None:
super().register_hooks(registry, **kwargs)
registry.add_callback(SomeEvent, self.on_some_event)
```
def _on_before_model_call_threshold(self, event: BeforeModelCallEvent) -> None:
"""Handle BeforeModelCallEvent for proactive compression.

When proactive compression is not configured, this is a no-op.
When configured, checks projected input tokens against the context window limit
and calls reduce_context() without error (best-effort) when threshold is exceeded.

Args:
event: The before model call event.
"""
pass
# Early return if proactive compression is not enabled
if self._compression_threshold is None:
return

context_window_limit = event.agent.model.context_window_limit
if context_window_limit is None:
context_window_limit = DEFAULT_CONTEXT_WINDOW_LIMIT
if not self._context_window_limit_warned:
self._context_window_limit_warned = True
logger.warning(
"context_window_limit=<%s> | context_window_limit not set on model, using default."
" Set context_window_limit in your model config for accurate proactive compression",
DEFAULT_CONTEXT_WINDOW_LIMIT,
)

if event.projected_input_tokens is None:
logger.debug("projected_input_tokens=<None> | skipping proactive compression")
return

ratio = event.projected_input_tokens / context_window_limit
if ratio >= self._compression_threshold:
logger.debug(
"projected_tokens=<%s>, limit=<%s>, ratio=<%.2f>, compression_threshold=<%s>"
" | compression threshold exceeded, reducing context",
event.projected_input_tokens,
context_window_limit,
ratio,
self._compression_threshold,
)
# Proactive compression is best-effort: swallow errors so the model call can still proceed.
try:
Comment thread
opieter-aws marked this conversation as resolved.
self.reduce_context(agent=event.agent)
except Exception:
logger.debug("proactive compression failed, will proceed with model call", exc_info=True)

def restore_from_session(self, state: dict[str, Any]) -> list[Message] | None:
"""Restore the Conversation Manager's state from a session.
Expand Down Expand Up @@ -99,22 +195,24 @@ def apply_management(self, agent: "Agent", **kwargs: Any) -> None:

@abstractmethod
def reduce_context(self, agent: "Agent", e: Exception | None = None, **kwargs: Any) -> None:
"""Called when the model's context window is exceeded.

This method should implement the specific strategy for reducing the window size when a context overflow occurs.
It is typically called after a ContextWindowOverflowException is caught.
"""Reduce the conversation history.

Implementations might use strategies such as:
Called in two scenarios:
1. **Reactive** (e is set): A context window overflow occurred. The implementation
MUST remove enough history for the next model call to succeed, or re-raise the error.
2. **Proactive** (e is None): The compression threshold was exceeded. This is best-effort —
returning without reduction or raising is acceptable; the model call proceeds regardless.

- Removing the N oldest messages
- Summarizing older context
- Applying importance-based filtering
- Maintaining critical conversation markers
Implementations should modify ``agent.messages`` in-place.

Args:
agent: The agent whose conversation history will be reduced.
This list is modified in-place.
e: The exception that triggered the context reduction, if any.
When set, this is a reactive overflow recovery call — the implementation MUST
reduce enough history for the next model call to succeed.
When None, this is a proactive compression call — best-effort reduction to avoid
hitting the context window limit.
**kwargs: Additional keyword arguments for future extensibility.
"""
pass
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
if TYPE_CHECKING:
from ...agent.agent import Agent

from ...types.exceptions import ContextWindowOverflowException
from .conversation_manager import ConversationManager


Expand All @@ -29,18 +28,18 @@ def apply_management(self, agent: "Agent", **kwargs: Any) -> None:
pass

def reduce_context(self, agent: "Agent", e: Exception | None = None, **kwargs: Any) -> None:
"""Does not reduce context and raises an exception.
"""Does not reduce context.

When called reactively (e is not None), re-raises the overflow exception since this
manager cannot reduce context. When called proactively (e is None), returns silently.

Args:
agent: The agent whose conversation history will remain unmodified.
e: The exception that triggered the context reduction, if any.
**kwargs: Additional keyword arguments for future extensibility.

Raises:
e: If provided.
ContextWindowOverflowException: If e is None.
e: If provided (reactive overflow).
"""
if e:
raise e
else:
raise ContextWindowOverflowException("Context window overflowed!")
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from ...types.content import ContentBlock, Messages
from ...types.exceptions import ContextWindowOverflowException
from ...types.tools import ToolResultContent
from .conversation_manager import ConversationManager
from .conversation_manager import ConversationManager, ProactiveCompressionConfig

logger = logging.getLogger(__name__)

Expand All @@ -37,6 +37,7 @@ def __init__(
should_truncate_results: bool = True,
*,
per_turn: bool | int = False,
proactive_compression: bool | ProactiveCompressionConfig | None = None,
):
"""Initialize the sliding window conversation manager.

Expand All @@ -54,6 +55,10 @@ def __init__(
manage message history and prevent the agent loop from slowing down. Start with
per_turn=True and adjust to a specific frequency (e.g., per_turn=5) if needed
for performance tuning.
proactive_compression: Enable proactive context compression before the model call.
- ``True``: compress when 70% of the context window is used (default threshold).
- ``{"compression_threshold": float}``: compress at the specified ratio (0, 1].
- ``False`` or ``None``: disabled, only reactive overflow recovery is used.

Raises:
ValueError: If window_size is negative, or if per_turn is 0 or a negative integer.
Expand All @@ -63,7 +68,7 @@ def __init__(
if isinstance(per_turn, int) and not isinstance(per_turn, bool) and per_turn <= 0:
raise ValueError(f"per_turn must be a positive integer, True, or False, got {per_turn}")

super().__init__()
super().__init__(proactive_compression=proactive_compression)

self.window_size = window_size
self.should_truncate_results = should_truncate_results
Expand Down Expand Up @@ -158,6 +163,12 @@ def apply_management(self, agent: "Agent", **kwargs: Any) -> None:
def reduce_context(self, agent: "Agent", e: Exception | None = None, **kwargs: Any) -> None:
"""Trim the oldest messages to reduce the conversation context size.

When ``e`` is set (reactive overflow recovery), attempts to truncate large tool results
first before falling back to message trimming.

When ``e`` is None (proactive compression or routine management), only trims messages
Comment thread
opieter-aws marked this conversation as resolved.
without attempting tool result truncation.

The method handles special cases where trimming the messages leads to:
- toolResult with no corresponding toolUse
- toolUse with no corresponding toolResult
Expand All @@ -166,12 +177,14 @@ def reduce_context(self, agent: "Agent", e: Exception | None = None, **kwargs: A
agent: The agent whose messages will be reduce.
This list is modified in-place.
e: The exception that triggered the context reduction, if any.
When set, this is a reactive overflow recovery call.
When None, this is a proactive or routine management call.
**kwargs: Additional keyword arguments for future extensibility.

Raises:
ContextWindowOverflowException: If the context cannot be reduced further and a context overflow
error was provided (e is not None). When called during routine window management (e is None),
logs a warning and returns without modification.
error was provided (e is not None). When called during routine window management or
proactive compression (e is None), logs a warning and returns without modification.
"""
messages = agent.messages

Expand All @@ -181,16 +194,18 @@ def reduce_context(self, agent: "Agent", e: Exception | None = None, **kwargs: A
messages[:] = []
return

# Try to truncate the tool result first
oldest_message_idx_with_tool_results = self._find_oldest_message_with_tool_results(messages)
if oldest_message_idx_with_tool_results is not None and self.should_truncate_results:
logger.debug(
"message_index=<%s> | found message with tool results at index", oldest_message_idx_with_tool_results
)
results_truncated = self._truncate_tool_results(messages, oldest_message_idx_with_tool_results)
if results_truncated:
logger.debug("message_index=<%s> | tool results truncated", oldest_message_idx_with_tool_results)
return
# Try to truncate the tool result first (only for reactive overflow, not proactive compression)
if e is not None:
oldest_message_idx_with_tool_results = self._find_oldest_message_with_tool_results(messages)
if oldest_message_idx_with_tool_results is not None and self.should_truncate_results:
logger.debug(
"message_index=<%s> | found message with tool results at index",
oldest_message_idx_with_tool_results,
)
results_truncated = self._truncate_tool_results(messages, oldest_message_idx_with_tool_results)
if results_truncated:
logger.debug("message_index=<%s> | tool results truncated", oldest_message_idx_with_tool_results)
return

# Try to trim index id when tool result cannot be truncated anymore
# If the number of messages is less than the window_size, then we default to 2, otherwise, trim to window size
Expand Down
Loading
Loading