diff --git a/src/strands/session/file_session_manager.py b/src/strands/session/file_session_manager.py index 491f7ad60..93adeb7f2 100644 --- a/src/strands/session/file_session_manager.py +++ b/src/strands/session/file_session_manager.py @@ -1,5 +1,6 @@ """File-based session manager for local filesystem storage.""" +import asyncio import json import logging import os @@ -231,11 +232,20 @@ def list_messages( else: message_files = message_files[offset:] - # Load only the message files - messages: list[SessionMessage] = [] - for filename in message_files: + return asyncio.run(self._load_messages_concurrently(messages_dir, message_files)) + + async def _load_messages_concurrently(self, messages_dir: str, message_files: list[str]) -> list[SessionMessage]: + """Load multiple message files concurrently using async.""" + if not message_files: + return [] + + async def load_message(filename: str) -> SessionMessage: file_path = os.path.join(messages_dir, filename) - message_data = self._read_file(file_path) - messages.append(SessionMessage.from_dict(message_data)) + loop = asyncio.get_event_loop() + message_data = await loop.run_in_executor(None, self._read_file, file_path) + return SessionMessage.from_dict(message_data) + + tasks = [load_message(filename) for filename in message_files] + messages = await asyncio.gather(*tasks) return messages diff --git a/src/strands/session/s3_session_manager.py b/src/strands/session/s3_session_manager.py index c6ce28d80..1f6ffe7f1 100644 --- a/src/strands/session/s3_session_manager.py +++ b/src/strands/session/s3_session_manager.py @@ -1,5 +1,6 @@ """S3-based session manager for cloud storage.""" +import asyncio import json import logging from typing import Any, Dict, List, Optional, cast @@ -283,14 +284,23 @@ def list_messages( else: message_keys = message_keys[offset:] - # Load only the required message objects - messages: List[SessionMessage] = [] - for key in message_keys: - message_data = self._read_s3_object(key) - if message_data: - messages.append(SessionMessage.from_dict(message_data)) - - return messages + # Load message objects concurrently using async + return asyncio.run(self._load_messages_concurrently(message_keys)) except ClientError as e: raise SessionException(f"S3 error reading messages: {e}") from e + + async def _load_messages_concurrently(self, message_keys: List[str]) -> List[SessionMessage]: + """Load multiple message objects concurrently using async.""" + if not message_keys: + return [] + + async def load_message(key: str) -> Optional[SessionMessage]: + loop = asyncio.get_event_loop() + message_data = await loop.run_in_executor(None, self._read_s3_object, key) + return SessionMessage.from_dict(message_data) if message_data else None + + tasks = [load_message(key) for key in message_keys] + loaded_messages = await asyncio.gather(*tasks) + + return [msg for msg in loaded_messages if msg is not None]