Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 21 additions & 11 deletions sqlspec/adapters/adbc/adk/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -639,31 +639,41 @@ def delete_session(self, session_id: str) -> None:
finally:
cursor.close() # type: ignore[no-untyped-call]

def list_sessions(self, app_name: str, user_id: str) -> "list[SessionRecord]":
"""List all sessions for a user in an app.
def list_sessions(self, app_name: str, user_id: str | None = None) -> "list[SessionRecord]":
"""List sessions for an app, optionally filtered by user.

Args:
app_name: Application name.
user_id: User identifier.
user_id: User identifier. If None, lists all sessions for the app.

Returns:
List of session records ordered by update_time DESC.

Notes:
Uses composite index on (app_name, user_id).
"""
sql = f"""
SELECT id, app_name, user_id, state, create_time, update_time
FROM {self._session_table}
WHERE app_name = ? AND user_id = ?
ORDER BY update_time DESC
Uses composite index on (app_name, user_id) when user_id is provided.
"""
if user_id is None:
sql = f"""
SELECT id, app_name, user_id, state, create_time, update_time
FROM {self._session_table}
WHERE app_name = ?
ORDER BY update_time DESC
"""
params: tuple[str, ...] = (app_name,)
else:
sql = f"""
SELECT id, app_name, user_id, state, create_time, update_time
FROM {self._session_table}
WHERE app_name = ? AND user_id = ?
ORDER BY update_time DESC
"""
params = (app_name, user_id)

try:
with self._config.provide_connection() as conn:
cursor = conn.cursor()
try:
cursor.execute(sql, (app_name, user_id))
cursor.execute(sql, params)
rows = cursor.fetchall()

return [
Expand Down
32 changes: 21 additions & 11 deletions sqlspec/adapters/aiosqlite/adk/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -342,29 +342,39 @@ async def update_session_state(self, session_id: str, state: "dict[str, Any]") -
await conn.execute(sql, (state_json, now_julian, session_id))
await conn.commit()

async def list_sessions(self, app_name: str, user_id: str) -> "list[SessionRecord]":
"""List all sessions for a user in an app.
async def list_sessions(self, app_name: str, user_id: str | None = None) -> "list[SessionRecord]":
"""List sessions for an app, optionally filtered by user.

Args:
app_name: Application name.
user_id: User identifier.
user_id: User identifier. If None, lists all sessions for the app.

Returns:
List of session records ordered by update_time DESC.

Notes:
Uses composite index on (app_name, user_id).
"""
sql = f"""
SELECT id, app_name, user_id, state, create_time, update_time
FROM {self._session_table}
WHERE app_name = ? AND user_id = ?
ORDER BY update_time DESC
Uses composite index on (app_name, user_id) when user_id is provided.
"""
if user_id is None:
sql = f"""
SELECT id, app_name, user_id, state, create_time, update_time
FROM {self._session_table}
WHERE app_name = ?
ORDER BY update_time DESC
"""
params: tuple[str, ...] = (app_name,)
else:
sql = f"""
SELECT id, app_name, user_id, state, create_time, update_time
FROM {self._session_table}
WHERE app_name = ? AND user_id = ?
ORDER BY update_time DESC
"""
params = (app_name, user_id)

async with self._config.provide_connection() as conn:
await self._enable_foreign_keys(conn)
cursor = await conn.execute(sql, (app_name, user_id))
cursor = await conn.execute(sql, params)
rows = await cursor.fetchall()

return [
Expand Down
32 changes: 21 additions & 11 deletions sqlspec/adapters/asyncmy/adk/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -326,29 +326,39 @@ async def delete_session(self, session_id: str) -> None:
await cursor.execute(sql, (session_id,))
await conn.commit()

async def list_sessions(self, app_name: str, user_id: str) -> "list[SessionRecord]":
"""List all sessions for a user in an app.
async def list_sessions(self, app_name: str, user_id: str | None = None) -> "list[SessionRecord]":
"""List sessions for an app, optionally filtered by user.

Args:
app_name: Application name.
user_id: User identifier.
user_id: User identifier. If None, lists all sessions for the app.

Returns:
List of session records ordered by update_time DESC.

Notes:
Uses composite index on (app_name, user_id).
"""
sql = f"""
SELECT id, app_name, user_id, state, create_time, update_time
FROM {self._session_table}
WHERE app_name = %s AND user_id = %s
ORDER BY update_time DESC
Uses composite index on (app_name, user_id) when user_id is provided.
"""
if user_id is None:
sql = f"""
SELECT id, app_name, user_id, state, create_time, update_time
FROM {self._session_table}
WHERE app_name = %s
ORDER BY update_time DESC
"""
params: tuple[str, ...] = (app_name,)
else:
sql = f"""
SELECT id, app_name, user_id, state, create_time, update_time
FROM {self._session_table}
WHERE app_name = %s AND user_id = %s
ORDER BY update_time DESC
"""
params = (app_name, user_id)

try:
async with self._config.provide_connection() as conn, conn.cursor() as cursor:
await cursor.execute(sql, (app_name, user_id))
await cursor.execute(sql, params)
rows = await cursor.fetchall()

return [
Expand Down
32 changes: 21 additions & 11 deletions sqlspec/adapters/asyncpg/adk/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -294,29 +294,39 @@ async def delete_session(self, session_id: str) -> None:
async with self.config.provide_connection() as conn:
await conn.execute(sql, session_id)

async def list_sessions(self, app_name: str, user_id: str) -> "list[SessionRecord]":
"""List all sessions for a user in an app.
async def list_sessions(self, app_name: str, user_id: str | None = None) -> "list[SessionRecord]":
"""List sessions for an app, optionally filtered by user.

Args:
app_name: Application name.
user_id: User identifier.
user_id: User identifier. If None, lists all sessions for the app.

Returns:
List of session records ordered by update_time DESC.

Notes:
Uses composite index on (app_name, user_id).
"""
sql = f"""
SELECT id, app_name, user_id, state, create_time, update_time
FROM {self._session_table}
WHERE app_name = $1 AND user_id = $2
ORDER BY update_time DESC
Uses composite index on (app_name, user_id) when user_id is provided.
"""
if user_id is None:
sql = f"""
SELECT id, app_name, user_id, state, create_time, update_time
FROM {self._session_table}
WHERE app_name = $1
ORDER BY update_time DESC
"""
params = [app_name]
else:
sql = f"""
SELECT id, app_name, user_id, state, create_time, update_time
FROM {self._session_table}
WHERE app_name = $1 AND user_id = $2
ORDER BY update_time DESC
"""
params = [app_name, user_id]

try:
async with self.config.provide_connection() as conn:
rows = await conn.fetch(sql, app_name, user_id)
rows = await conn.fetch(sql, *params)

return [
SessionRecord(
Expand Down
39 changes: 24 additions & 15 deletions sqlspec/adapters/bigquery/adk/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -351,20 +351,29 @@ async def update_session_state(self, session_id: str, state: "dict[str, Any]") -
"""
await async_(self._update_session_state)(session_id, state)

def _list_sessions(self, app_name: str, user_id: str) -> "list[SessionRecord]":
def _list_sessions(self, app_name: str, user_id: "str | None") -> "list[SessionRecord]":
"""Synchronous implementation of list_sessions."""
table_name = self._get_full_table_name(self._session_table)
sql = f"""
SELECT id, app_name, user_id, JSON_VALUE(state) as state, create_time, update_time
FROM {table_name}
WHERE app_name = @app_name AND user_id = @user_id
ORDER BY update_time DESC
"""

params = [
ScalarQueryParameter("app_name", "STRING", app_name),
ScalarQueryParameter("user_id", "STRING", user_id),
]
if user_id is None:
sql = f"""
SELECT id, app_name, user_id, JSON_VALUE(state) as state, create_time, update_time
FROM {table_name}
WHERE app_name = @app_name
ORDER BY update_time DESC
"""
params = [ScalarQueryParameter("app_name", "STRING", app_name)]
else:
sql = f"""
SELECT id, app_name, user_id, JSON_VALUE(state) as state, create_time, update_time
FROM {table_name}
WHERE app_name = @app_name AND user_id = @user_id
ORDER BY update_time DESC
"""
params = [
ScalarQueryParameter("app_name", "STRING", app_name),
ScalarQueryParameter("user_id", "STRING", user_id),
]

with self._config.provide_connection() as conn:
job_config = QueryJobConfig(query_parameters=params)
Expand All @@ -383,18 +392,18 @@ def _list_sessions(self, app_name: str, user_id: str) -> "list[SessionRecord]":
for row in results
]

async def list_sessions(self, app_name: str, user_id: str) -> "list[SessionRecord]":
"""List all sessions for a user in an app.
async def list_sessions(self, app_name: str, user_id: str | None = None) -> "list[SessionRecord]":
"""List sessions for an app, optionally filtered by user.

Args:
app_name: Application name.
user_id: User identifier.
user_id: User identifier. If None, lists all sessions for the app.

Returns:
List of session records ordered by update_time DESC.

Notes:
Uses clustering on (app_name, user_id) for efficiency.
Uses clustering on (app_name, user_id) when user_id is provided for efficiency.
"""
return await async_(self._list_sessions)(app_name, user_id)

Expand Down
32 changes: 21 additions & 11 deletions sqlspec/adapters/duckdb/adk/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -315,29 +315,39 @@ def delete_session(self, session_id: str) -> None:
conn.execute(delete_session_sql, (session_id,))
conn.commit()

def list_sessions(self, app_name: str, user_id: str) -> "list[SessionRecord]":
"""List all sessions for a user in an app.
def list_sessions(self, app_name: str, user_id: str | None = None) -> "list[SessionRecord]":
"""List sessions for an app, optionally filtered by user.

Args:
app_name: Application name.
user_id: User identifier.
user_id: User identifier. If None, lists all sessions for the app.

Returns:
List of session records ordered by update_time DESC.

Notes:
Uses composite index on (app_name, user_id).
"""
sql = f"""
SELECT id, app_name, user_id, state, create_time, update_time
FROM {self._session_table}
WHERE app_name = ? AND user_id = ?
ORDER BY update_time DESC
Uses composite index on (app_name, user_id) when user_id is provided.
"""
if user_id is None:
sql = f"""
SELECT id, app_name, user_id, state, create_time, update_time
FROM {self._session_table}
WHERE app_name = ?
ORDER BY update_time DESC
"""
params: tuple[str, ...] = (app_name,)
else:
sql = f"""
SELECT id, app_name, user_id, state, create_time, update_time
FROM {self._session_table}
WHERE app_name = ? AND user_id = ?
ORDER BY update_time DESC
"""
params = (app_name, user_id)

try:
with self._config.provide_connection() as conn:
cursor = conn.execute(sql, (app_name, user_id))
cursor = conn.execute(sql, params)
rows = cursor.fetchall()

return [
Expand Down
Loading