Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions src/agents/extensions/memory/sqlalchemy_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -319,3 +319,16 @@ async def clear_session(self) -> None:
await sess.execute(
delete(self._sessions).where(self._sessions.c.session_id == self.session_id)
)

@property
def engine(self) -> AsyncEngine:
"""Access the underlying SQLAlchemy AsyncEngine.

This property provides direct access to the engine for advanced use cases,
such as checking connection pool status, configuring engine settings,
or manually disposing the engine when needed.

Returns:
AsyncEngine: The SQLAlchemy async engine instance.
"""
return self._engine
54 changes: 54 additions & 0 deletions tests/extensions/memory/test_sqlalchemy_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
Summary,
)
from sqlalchemy import select, text, update
from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine
from sqlalchemy.sql import Select

pytest.importorskip("sqlalchemy") # Skip tests if SQLAlchemy is not installed
Expand Down Expand Up @@ -390,3 +391,56 @@ async def recording_execute(statement: Any, *args: Any, **kwargs: Any) -> Any:

assert _item_ids(retrieved_full) == ["rs_first", "msg_second"]
assert _item_ids(retrieved_limited) == ["rs_first", "msg_second"]


async def test_engine_property_from_url():
"""Test that the engine property returns the AsyncEngine from from_url."""
session_id = "engine_property_test"
session = SQLAlchemySession.from_url(session_id, url=DB_URL, create_tables=True)

# Verify engine property returns an AsyncEngine instance
assert isinstance(session.engine, AsyncEngine)

# Verify we can use the engine for advanced operations
# For example, check pool status
assert session.engine.pool is not None

# Verify we can manually dispose the engine
await session.engine.dispose()


async def test_engine_property_from_external_engine():
"""Test that the engine property returns the external engine."""
session_id = "external_engine_test"

# Create engine externally
external_engine = create_async_engine(DB_URL)

# Create session with external engine
session = SQLAlchemySession(session_id, engine=external_engine, create_tables=True)

# Verify engine property returns the same engine instance
assert session.engine is external_engine

# Verify we can use the engine
assert isinstance(session.engine, AsyncEngine)

# Clean up - user is responsible for disposing external engine
await external_engine.dispose()


async def test_engine_property_is_read_only():
"""Test that the engine property cannot be modified."""
session_id = "readonly_engine_test"
session = SQLAlchemySession.from_url(session_id, url=DB_URL, create_tables=True)

# Verify engine property exists
assert hasattr(session, "engine")

# Verify it's a property (read-only, cannot be set)
# Type ignore needed because mypy correctly detects this is read-only
with pytest.raises(AttributeError):
session.engine = create_async_engine(DB_URL) # type: ignore[misc]

# Clean up
await session.engine.dispose()