Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 5 additions & 15 deletions src/strands/session/file_session_manager.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
"""File-based session manager for local filesystem storage."""

import asyncio
import json
import logging
import os
Expand Down Expand Up @@ -232,20 +231,11 @@ def list_messages(
else:
message_files = message_files[offset:]

return asyncio.run(self._load_messages_concurrently(messages_dir, message_files))

async def _load_messages_concurrently(self, messages_dir: str, message_files: list[str]) -> list[SessionMessage]:
"""Load multiple message files concurrently using async."""
if not message_files:
return []

async def load_message(filename: str) -> SessionMessage:
# Load only the message files
messages: list[SessionMessage] = []
for filename in message_files:
file_path = os.path.join(messages_dir, filename)
loop = asyncio.get_event_loop()
message_data = await loop.run_in_executor(None, self._read_file, file_path)
return SessionMessage.from_dict(message_data)

tasks = [load_message(filename) for filename in message_files]
messages = await asyncio.gather(*tasks)
message_data = self._read_file(file_path)
messages.append(SessionMessage.from_dict(message_data))

return messages
26 changes: 8 additions & 18 deletions src/strands/session/s3_session_manager.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
"""S3-based session manager for cloud storage."""

import asyncio
import json
import logging
from typing import Any, Dict, List, Optional, cast
Expand Down Expand Up @@ -284,23 +283,14 @@ def list_messages(
else:
message_keys = message_keys[offset:]

# Load message objects concurrently using async
return asyncio.run(self._load_messages_concurrently(message_keys))
# Load only the required message objects
messages: List[SessionMessage] = []
for key in message_keys:
message_data = self._read_s3_object(key)
if message_data:
messages.append(SessionMessage.from_dict(message_data))

return messages

except ClientError as e:
raise SessionException(f"S3 error reading messages: {e}") from e

async def _load_messages_concurrently(self, message_keys: List[str]) -> List[SessionMessage]:
"""Load multiple message objects concurrently using async."""
if not message_keys:
return []

async def load_message(key: str) -> Optional[SessionMessage]:
loop = asyncio.get_event_loop()
message_data = await loop.run_in_executor(None, self._read_s3_object, key)
return SessionMessage.from_dict(message_data) if message_data else None

tasks = [load_message(key) for key in message_keys]
loaded_messages = await asyncio.gather(*tasks)

return [msg for msg in loaded_messages if msg is not None]
Loading