Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion src/strands/multiagent/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ def from_dict(cls, data: dict[str, Any]) -> "MultiAgentResult":
metrics = _parse_metrics(data.get("accumulated_metrics", {}))

multiagent_result = cls(
status=Status(data.get("status", Status.PENDING.value)),
status=Status(data["status"]),
results=results,
accumulated_usage=usage,
accumulated_metrics=metrics,
Expand All @@ -164,8 +164,13 @@ class MultiAgentBase(ABC):

This class integrates with existing Strands Agent instances and provides
multi-agent orchestration capabilities.

Attributes:
id: Unique MultiAgent id for session management,etc.
"""

id: str

@abstractmethod
async def invoke_async(
self, task: str | list[ContentBlock], invocation_state: dict[str, Any] | None = None, **kwargs: Any
Expand Down
79 changes: 72 additions & 7 deletions src/strands/session/file_session_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,24 @@
import os
import shutil
import tempfile
from typing import Any, Optional, cast
from datetime import datetime, timezone
from typing import TYPE_CHECKING, Any, Optional, cast

from .. import _identifier
from ..types.exceptions import SessionException
from ..types.session import Session, SessionAgent, SessionMessage
from ..types.session import Session, SessionAgent, SessionMessage, SessionType
from .repository_session_manager import RepositorySessionManager
from .session_repository import SessionRepository

if TYPE_CHECKING:
from ..multiagent.base import MultiAgentBase

logger = logging.getLogger(__name__)

SESSION_PREFIX = "session_"
AGENT_PREFIX = "agent_"
MESSAGE_PREFIX = "message_"
MULTI_AGENT_PREFIX = "multi_agent_"


class FileSessionManager(RepositorySessionManager, SessionRepository):
Expand All @@ -37,19 +42,27 @@ class FileSessionManager(RepositorySessionManager, SessionRepository):
```
"""

def __init__(self, session_id: str, storage_dir: Optional[str] = None, **kwargs: Any):
def __init__(
self,
session_id: str,
storage_dir: Optional[str] = None,
*,
session_type: SessionType = SessionType.AGENT,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need to require that this is a named parameter - @dbschmigelski can you confirm - I recall you we discussed a similar situation before? (if so, is there anywhere we can document/write this down)?

Suggested change
session_type: SessionType = SessionType.AGENT,
*,
session_type: SessionType = SessionType.AGENT,

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Out of curiosity, why does this need to be a named parameter?

**kwargs: Any,
):
"""Initialize FileSession with filesystem storage.

Args:
session_id: ID for the session.
ID is not allowed to contain path separators (e.g., a/b).
storage_dir: Directory for local filesystem storage (defaults to temp dir).
session_type: single agent or multiagent.
**kwargs: Additional keyword arguments for future extensibility.
"""
self.storage_dir = storage_dir or os.path.join(tempfile.gettempdir(), "strands/sessions")
os.makedirs(self.storage_dir, exist_ok=True)

super().__init__(session_id=session_id, session_repository=self)
super().__init__(session_id=session_id, session_repository=self, session_type=session_type)

def _get_session_path(self, session_id: str) -> str:
"""Get session directory path.
Expand Down Expand Up @@ -107,8 +120,11 @@ def _read_file(self, path: str) -> dict[str, Any]:
def _write_file(self, path: str, data: dict[str, Any]) -> None:
"""Write JSON file."""
os.makedirs(os.path.dirname(path), exist_ok=True)
with open(path, "w", encoding="utf-8") as f:
# This automic write ensure the completeness of session files in both single agent/ multi agents
tmp = f"{path}.tmp"
with open(tmp, "w", encoding="utf-8", newline="\n") as f:
json.dump(data, f, indent=2, ensure_ascii=False)
os.replace(tmp, path)

def create_session(self, session: Session, **kwargs: Any) -> Session:
"""Create a new session."""
Expand All @@ -118,8 +134,10 @@ def create_session(self, session: Session, **kwargs: Any) -> Session:

# Create directory structure
os.makedirs(session_dir, exist_ok=True)
os.makedirs(os.path.join(session_dir, "agents"), exist_ok=True)

if self.session_type == SessionType.AGENT:
os.makedirs(os.path.join(session_dir, "agents"), exist_ok=True)
else:
os.makedirs(os.path.join(session_dir, "multi_agents"), exist_ok=True)
# Write session file
session_file = os.path.join(session_dir, "session.json")
session_dict = session.to_dict()
Expand All @@ -136,6 +154,15 @@ def read_session(self, session_id: str, **kwargs: Any) -> Optional[Session]:
session_data = self._read_file(session_file)
return Session.from_dict(session_data)

def update_session(self, session_id: str, **kwargs: Any) -> None:
"""Update session updated_at field."""
session_file = os.path.join(self._get_session_path(session_id), "session.json")
session_data = self.read_session(session_id)
if session_data is None:
raise SessionException(f"Session {session_id} does not exist")
session_data.updated_at = datetime.now(timezone.utc).isoformat()
self._write_file(session_file, session_data.to_dict())

def delete_session(self, session_id: str, **kwargs: Any) -> None:
"""Delete session and all associated data."""
session_dir = self._get_session_path(session_id)
Expand Down Expand Up @@ -239,3 +266,41 @@ def list_messages(
messages.append(SessionMessage.from_dict(message_data))

return messages

def _get_multi_agent_path(self, session_id: str, multi_agent_id: str) -> str:
"""Get multi-agent state file path."""
session_path = self._get_session_path(session_id)
multi_agent_id = _identifier.validate(multi_agent_id, _identifier.Identifier.AGENT)
return os.path.join(session_path, "multi_agents", f"{MULTI_AGENT_PREFIX}{multi_agent_id}")

def create_multi_agent(self, session_id: str, multi_agent: "MultiAgentBase", **kwargs: Any) -> None:
"""Create a new multiagent state in the session."""
multi_agent_id = multi_agent.id
multi_agent_dir = self._get_multi_agent_path(session_id, multi_agent_id)
os.makedirs(multi_agent_dir, exist_ok=True)

multi_agent_file = os.path.join(multi_agent_dir, "multi_agent.json")
session_data = multi_agent.serialize_state()
self._write_file(multi_agent_file, session_data)

def read_multi_agent(self, session_id: str, multi_agent_id: str, **kwargs: Any) -> Optional[dict[str, Any]]:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For read_agent and update_agent, we pass around a SessionAgent instance. For consistency, do we want to setup a SessionMultiAgent type?

Copy link
Contributor Author

@JackYPCOnline JackYPCOnline Oct 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Initially I introduced a data class MultiagentState then we decided to remove that, so now, I don't think we should do that for update
For read we only have def read_agent(self, session_id: str, agent_id: str, **kwargs: Any) -> Optional[SessionAgent]

"""Read multi-agent state from filesystem."""
multi_agent_file = os.path.join(self._get_multi_agent_path(session_id, multi_agent_id), "multi_agent.json")
if not os.path.exists(multi_agent_file):
return None
return self._read_file(multi_agent_file)

def update_multi_agent(self, session_id: str, multi_agent_state: dict[str, Any], **kwargs: Any) -> None:
"""Update multi-agent state from filesystem."""
multi_agent_id = multi_agent_state.get("id")
if multi_agent_id is None:
raise SessionException("MultiAgent state must have an 'id' field")
previous_multi_agent_state = self.read_multi_agent(session_id=session_id, multi_agent_id=multi_agent_id)
if previous_multi_agent_state is None:
raise SessionException(f"MultiAgent state {multi_agent_id} in session {session_id} does not exist")

multi_agent_file = os.path.join(self._get_multi_agent_path(session_id, multi_agent_id), "multi_agent.json")
self._write_file(multi_agent_file, multi_agent_state)

# Update session.update_at
self.update_session(session_id)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we also need to call self.update_session for update_agent() calls?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see SessionAgent has filed updated_at but yes, even for sinlge agent we should update matadata.

42 changes: 39 additions & 3 deletions src/strands/session/repository_session_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,22 @@

if TYPE_CHECKING:
from ..agent.agent import Agent
from ..multiagent.base import MultiAgentBase

logger = logging.getLogger(__name__)


class RepositorySessionManager(SessionManager):
"""Session manager for persisting agents in a SessionRepository."""

def __init__(self, session_id: str, session_repository: SessionRepository, **kwargs: Any):
def __init__(
self,
session_id: str,
session_repository: SessionRepository,
*,
session_type: SessionType = SessionType.AGENT,
**kwargs: Any,
):
"""Initialize the RepositorySessionManager.

If no session with the specified session_id exists yet, it will be created
Expand All @@ -34,22 +42,27 @@ def __init__(self, session_id: str, session_repository: SessionRepository, **kwa
session_id: ID to use for the session. A new session with this id will be created if it does
not exist in the repository yet
session_repository: Underlying session repository to use to store the sessions state.
session_type: single agent or multiagent.
**kwargs: Additional keyword arguments for future extensibility.

"""
super().__init__(session_type=session_type)

self.session_repository = session_repository
self.session_id = session_id
session = session_repository.read_session(session_id)
# Create a session if it does not exist yet
if session is None:
logger.debug("session_id=<%s> | session not found, creating new session", self.session_id)
session = Session(session_id=session_id, session_type=SessionType.AGENT)
session = Session(session_id=session_id, session_type=session_type)
session_repository.create_session(session)

self.session = session
self.session_type = session.session_type

# Keep track of the latest message of each agent in case we need to redact it.
self._latest_agent_message: dict[str, Optional[SessionMessage]] = {}
if self.session_type == SessionType.AGENT:
self._latest_agent_message: dict[str, Optional[SessionMessage]] = {}

def append_message(self, message: Message, agent: "Agent", **kwargs: Any) -> None:
"""Append a message to the agent's session.
Expand Down Expand Up @@ -152,3 +165,26 @@ def initialize(self, agent: "Agent", **kwargs: Any) -> None:

# Restore the agents messages array including the optional prepend messages
agent.messages = prepend_messages + [session_message.to_message() for session_message in session_messages]

def sync_multi_agent(self, source: "MultiAgentBase", **kwargs: Any) -> None:
"""Serialize and update the multi-agent state into the session repository.

Args:
source: Multi-agent source object to sync to the session.
**kwargs: Additional keyword arguments for future extensibility.
"""
self.session_repository.update_multi_agent(self.session_id, source.serialize_state())

def initialize_multi_agent(self, source: "MultiAgentBase", **kwargs: Any) -> None:
"""Initialize multi-agent state from the session repository.

Args:
source: Multi-agent source object to restore state into
**kwargs: Additional keyword arguments for future extensibility.
"""
state = self.session_repository.read_multi_agent(self.session_id, source.id, **kwargs)
if state is None:
self.session_repository.create_multi_agent(self.session_id, source)
else:
logger.debug("session_id=<%s> | restoring multi-agent state", self.session_id)
source.deserialize_state(state)
43 changes: 40 additions & 3 deletions src/strands/session/s3_session_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,27 @@

import json
import logging
from typing import Any, Dict, List, Optional, cast
from typing import TYPE_CHECKING, Any, Dict, List, Optional, cast

import boto3
from botocore.config import Config as BotocoreConfig
from botocore.exceptions import ClientError

from .. import _identifier
from ..types.exceptions import SessionException
from ..types.session import Session, SessionAgent, SessionMessage
from ..types.session import Session, SessionAgent, SessionMessage, SessionType
from .repository_session_manager import RepositorySessionManager
from .session_repository import SessionRepository

if TYPE_CHECKING:
from ..multiagent.base import MultiAgentBase

logger = logging.getLogger(__name__)

SESSION_PREFIX = "session_"
AGENT_PREFIX = "agent_"
MESSAGE_PREFIX = "message_"
MULTI_AGENT_PREFIX = "multi_agent_"


class S3SessionManager(RepositorySessionManager, SessionRepository):
Expand Down Expand Up @@ -46,6 +50,8 @@ def __init__(
boto_session: Optional[boto3.Session] = None,
boto_client_config: Optional[BotocoreConfig] = None,
region_name: Optional[str] = None,
*,
session_type: SessionType = SessionType.AGENT,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we have to make session_type a named parameter on FileSessionManager but not here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah it should, I had a bad rebase

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does it need to be a named parameter though?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This forces users to provide clarity by explicitly showing what sesion manager type they are using.

**kwargs: Any,
):
"""Initialize S3SessionManager with S3 storage.
Expand All @@ -58,6 +64,7 @@ def __init__(
boto_session: Optional boto3 session
boto_client_config: Optional boto3 client configuration
region_name: AWS region for S3 storage
session_type: single agent or multiagent.
**kwargs: Additional keyword arguments for future extensibility.
"""
self.bucket = bucket
Expand All @@ -78,7 +85,7 @@ def __init__(
client_config = BotocoreConfig(user_agent_extra="strands-agents")

self.client = session.client(service_name="s3", config=client_config)
super().__init__(session_id=session_id, session_repository=self)
super().__init__(session_id=session_id, session_type=session_type, session_repository=self)

def _get_session_path(self, session_id: str) -> str:
"""Get session S3 prefix.
Expand Down Expand Up @@ -294,3 +301,33 @@ def list_messages(

except ClientError as e:
raise SessionException(f"S3 error reading messages: {e}") from e

def _get_multi_agent_path(self, session_id: str, multi_agent_id: str) -> str:
"""Get multi-agent S3 prefix."""
session_path = self._get_session_path(session_id)
multi_agent_id = _identifier.validate(multi_agent_id, _identifier.Identifier.AGENT)
return f"{session_path}multi_agents/{MULTI_AGENT_PREFIX}{multi_agent_id}/"

def create_multi_agent(self, session_id: str, multi_agent: "MultiAgentBase", **kwargs: Any) -> None:
"""Create a new multiagent state in S3."""
multi_agent_id = multi_agent.id
multi_agent_key = f"{self._get_multi_agent_path(session_id, multi_agent_id)}multi_agent.json"
session_data = multi_agent.serialize_state()
self._write_s3_object(multi_agent_key, session_data)

def read_multi_agent(self, session_id: str, multi_agent_id: str, **kwargs: Any) -> Optional[dict[str, Any]]:
"""Read multi-agent state from S3."""
multi_agent_key = f"{self._get_multi_agent_path(session_id, multi_agent_id)}multi_agent.json"
return self._read_s3_object(multi_agent_key)

def update_multi_agent(self, session_id: str, multi_agent_state: dict[str, Any], **kwargs: Any) -> None:
"""Update multi-agent state in S3."""
multi_agent_id = multi_agent_state.get("id")
if multi_agent_id is None:
raise SessionException("MultiAgent state must have an 'id' field")
previous_multi_agent_state = self.read_multi_agent(session_id=session_id, multi_agent_id=multi_agent_id)
if previous_multi_agent_state is None:
raise SessionException(f"MultiAgent state {multi_agent_id} in session {session_id} does not exist")

multi_agent_key = f"{self._get_multi_agent_path(session_id, multi_agent_id)}multi_agent.json"
self._write_s3_object(multi_agent_key, multi_agent_state)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need an update_session call here also as we did in `FileSessionManager?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Function wise no, s3 files has update at in console.
For file itself, yes. I had this conversation with @dbschmigelski in previous large PR. The conclusion is not updating it in S3

Loading