community[minor]: Add native async support to SQLChatMessageHistory (#22065)

# package community: Fix SQLChatMessageHistory

## Description
This PR rewrites `SQLChatMessageHistory` to implement the asynchronous
approach properly. The code works around [issue
22021](#22021) by
accepting a synchronous call to `add_messages()` in an asynchronous
scenario.

For the same reasons as in [PR
32](langchain-ai/langchain-postgres#32) of
`langchain-postgres`, we use a lazy strategy for table creation. A
constructor cannot invoke an asynchronous call, so in async mode it
cannot create the table itself. We compensate by deferring table
creation to the next asynchronous method call.
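
The lazy strategy can be illustrated with a minimal, standard-library-only sketch. `LazyHistory` is a hypothetical stand-in rather than the real `SQLChatMessageHistory`, and it assumes `sqlite3` with `asyncio.to_thread` in place of an SQLAlchemy `AsyncEngine`:

```python
import asyncio
import sqlite3


class LazyHistory:
    """Hypothetical stand-in showing lazy table creation in async code."""

    def __init__(self, path: str = ":memory:") -> None:
        # A constructor cannot await, so no table is created here.
        self._conn = sqlite3.connect(path, check_same_thread=False)
        self._table_created = False

    async def _acreate_table_if_not_exists(self) -> None:
        if not self._table_created:
            # Run the blocking DDL in a worker thread to keep the loop free.
            await asyncio.to_thread(
                self._conn.execute,
                "CREATE TABLE IF NOT EXISTS message_store "
                "(id INTEGER PRIMARY KEY, message TEXT)",
            )
            self._table_created = True

    async def aadd_message(self, message: str) -> None:
        # The first async call pays the cost of creating the table.
        await self._acreate_table_if_not_exists()
        await asyncio.to_thread(
            self._conn.execute,
            "INSERT INTO message_store (message) VALUES (?)",
            (message,),
        )
        await asyncio.to_thread(self._conn.commit)
```

Every public async method begins with the same guard, so whichever method is called first triggers table creation and later calls skip it.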

The goal of the `PostgresChatMessageHistory` class (in
`langchain-postgres`) is, among other things, to be able to recycle
database connections. The implementation of the class is problematic, as
we have demonstrated in [issue
22021](#22021).

Our new implementation of `SQLChatMessageHistory` achieves this by using
a singleton of type (`Async`)`Engine` for the database connection. The
connection pool is managed by this singleton, and the code is then
reentrant.

We also accept the type `str`, optionally complemented by `async_mode`.
(I know you don't like this much, but it's the only way to allow an
asynchronous connection string.)

In order to unify the different classes handling database connections,
we have renamed `connection_string` to `connection`, and `Session` to
`session_maker`.
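
A rename like this can keep the old name working for a while through a deprecated alias. The commit uses the `deprecated` decorator from `langchain_core._api` for that purpose; the sketch below assumes only the standard `warnings` module, and the `History` class is hypothetical:

```python
import warnings


class History:
    """Hypothetical class keeping an old attribute name as a deprecated alias."""

    def __init__(self) -> None:
        # Placeholder for a real session factory.
        self.session_maker = object()

    @property
    def Session(self) -> object:
        # The old name still works but warns the caller on each access.
        warnings.warn(
            "Session is deprecated; use session_maker instead",
            DeprecationWarning,
            stacklevel=2,
        )
        return self.session_maker
```

Existing callers that read `Session` get the same object as `session_maker`, plus a warning that points at their own call site.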

Now, a single transaction is used to add a list of messages. Thus, a
crash during this write operation will not leave the database in an
unstable state with a partially added message list. This makes the code
resilient.
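
The all-or-nothing behaviour can be sketched with the standard `sqlite3` module, assuming a simplified `message_store` table. The helpers `make_store` and `add_messages` are hypothetical and are not the code from this PR:

```python
import sqlite3
from typing import Optional, Sequence


def make_store() -> sqlite3.Connection:
    """Create an in-memory database with a simplified message table."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE message_store ("
        "id INTEGER PRIMARY KEY, session_id TEXT, message TEXT NOT NULL)"
    )
    conn.commit()
    return conn


def add_messages(
    conn: sqlite3.Connection,
    session_id: str,
    messages: Sequence[Optional[str]],
) -> None:
    """Insert all messages in one transaction."""
    # `with conn` commits on success and rolls back if any insert raises.
    with conn:
        for message in messages:
            conn.execute(
                "INSERT INTO message_store (session_id, message) VALUES (?, ?)",
                (session_id, message),
            )
```

If one insert in the batch fails (here, a `None` message violating `NOT NULL`), the rows inserted earlier in the same call are rolled back with it.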

We believe that the `PostgresChatMessageHistory` class is no longer
necessary and can be replaced by:
```
PostgresChatMessageHistory = SQLChatMessageHistory
```
This also fixes the bug.


## Issue
- [issue 22021](#22021)
  - Bug in _exit_history()
  - Bugs in PostgresChatMessageHistory and sync usage
  - Bugs in PostgresChatMessageHistory and async usage
- [issue 36](langchain-ai/langchain-postgres#36)

## Twitter handle:
pprados

## Tests
- libs/community/tests/unit_tests/chat_message_histories/test_sql.py
(add async test)

@baskaryan, @eyurtsev or @hwchase17, could you review this PR?
I have also been waiting a long time for review of other PRs. Could
you take a look?
- [PR 32](langchain-ai/langchain-postgres#32)
- [PR 15575](#15575)
- [PR 13200](#13200)

---------

Co-authored-by: Eugene Yurtsev <eyurtsev@gmail.com>
2 people authored and hinthornw committed Jun 20, 2024
1 parent d17efe3 commit f71ce8f
Showing 4 changed files with 325 additions and 45 deletions.
199 changes: 188 additions & 11 deletions libs/community/langchain_community/chat_message_histories/sql.py
@@ -1,9 +1,22 @@
import asyncio
import contextlib
import json
import logging
from abc import ABC, abstractmethod
from typing import Any, List, Optional
from typing import (
Any,
AsyncGenerator,
Dict,
Generator,
List,
Optional,
Sequence,
Union,
cast,
)

from sqlalchemy import Column, Integer, Text, create_engine
from langchain_core._api import deprecated, warn_deprecated
from sqlalchemy import Column, Integer, Text, delete, select

try:
from sqlalchemy.orm import declarative_base
@@ -15,7 +28,22 @@
message_to_dict,
messages_from_dict,
)
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from sqlalchemy.engine import Engine
from sqlalchemy.ext.asyncio import (
AsyncEngine,
AsyncSession,
async_sessionmaker,
create_async_engine,
)
from sqlalchemy.orm import (
Session as SQLSession,
)
from sqlalchemy.orm import (
declarative_base,
scoped_session,
sessionmaker,
)

logger = logging.getLogger(__name__)

@@ -80,36 +108,98 @@ def get_sql_model_class(self) -> Any:
return self.model_class


DBConnection = Union[AsyncEngine, Engine, str]

_warned_once_already = False


class SQLChatMessageHistory(BaseChatMessageHistory):
"""Chat message history stored in an SQL database."""

@property
@deprecated("0.2.2", removal="0.3.0", alternative="session_maker")
def Session(self) -> Union[scoped_session, async_sessionmaker]:
return self.session_maker

def __init__(
self,
session_id: str,
connection_string: str,
connection_string: Optional[str] = None,
table_name: str = "message_store",
session_id_field_name: str = "session_id",
custom_message_converter: Optional[BaseMessageConverter] = None,
connection: Union[None, DBConnection] = None,
engine_args: Optional[Dict[str, Any]] = None,
async_mode: Optional[bool] = None, # Use only if connection is a string
):
self.connection_string = connection_string
self.engine = create_engine(connection_string, echo=False)
assert not (
connection_string and connection
), "connection_string and connection are mutually exclusive"
if connection_string:
global _warned_once_already
if not _warned_once_already:
warn_deprecated(
since="0.2.2",
removal="0.3.0",
name="connection_string",
alternative="Use connection instead",
)
_warned_once_already = True
connection = connection_string
self.connection_string = connection_string
if isinstance(connection, str):
self.async_mode = async_mode
if async_mode:
self.async_engine = create_async_engine(
connection, **(engine_args or {})
)
else:
self.engine = create_engine(url=connection, **(engine_args or {}))
elif isinstance(connection, Engine):
self.async_mode = False
self.engine = connection
elif isinstance(connection, AsyncEngine):
self.async_mode = True
self.async_engine = connection
else:
raise ValueError(
"connection should be a connection string or an instance of "
"sqlalchemy.engine.Engine or sqlalchemy.ext.asyncio.engine.AsyncEngine"
)

# To be consistent with other SQL implementations, rename to session_maker
self.session_maker: Union[scoped_session, async_sessionmaker]
if self.async_mode:
self.session_maker = async_sessionmaker(bind=self.async_engine)
else:
self.session_maker = scoped_session(sessionmaker(bind=self.engine))

self.session_id_field_name = session_id_field_name
self.converter = custom_message_converter or DefaultMessageConverter(table_name)
self.sql_model_class = self.converter.get_sql_model_class()
if not hasattr(self.sql_model_class, session_id_field_name):
raise ValueError("SQL model class must have session_id column")
self._create_table_if_not_exists()
self._table_created = False
if not self.async_mode:
self._create_table_if_not_exists()

self.session_id = session_id
self.Session = sessionmaker(self.engine)

def _create_table_if_not_exists(self) -> None:
self.sql_model_class.metadata.create_all(self.engine)
self._table_created = True

async def _acreate_table_if_not_exists(self) -> None:
if not self._table_created:
assert self.async_mode, "This method must be called with async_mode"
async with self.async_engine.begin() as conn:
await conn.run_sync(self.sql_model_class.metadata.create_all)
self._table_created = True

@property
def messages(self) -> List[BaseMessage]: # type: ignore
"""Retrieve all messages from db"""
with self.Session() as session:
with self._make_sync_session() as session:
result = (
session.query(self.sql_model_class)
.where(
@@ -123,18 +213,105 @@ def messages(self) -> List[BaseMessage]: # type: ignore
messages.append(self.converter.from_sql_model(record))
return messages

def get_messages(self) -> List[BaseMessage]:
return self.messages

async def aget_messages(self) -> List[BaseMessage]:
"""Retrieve all messages from db"""
await self._acreate_table_if_not_exists()
async with self._make_async_session() as session:
stmt = (
select(self.sql_model_class)
.where(
getattr(self.sql_model_class, self.session_id_field_name)
== self.session_id
)
.order_by(self.sql_model_class.id.asc())
)
result = await session.execute(stmt)
messages = []
for record in result.scalars():
messages.append(self.converter.from_sql_model(record))
return messages

def add_message(self, message: BaseMessage) -> None:
"""Append the message to the record in db"""
with self.Session() as session:
with self._make_sync_session() as session:
session.add(self.converter.to_sql_model(message, self.session_id))
session.commit()

async def aadd_message(self, message: BaseMessage) -> None:
"""Add a Message object to the store.
Args:
message: A BaseMessage object to store.
"""
await self._acreate_table_if_not_exists()
async with self._make_async_session() as session:
session.add(self.converter.to_sql_model(message, self.session_id))
await session.commit()

def add_messages(self, messages: Sequence[BaseMessage]) -> None:
# RunnableWithMessageHistory._exit_history() mistakenly calls
# add_message instead of aadd_message.
# See https://github.com/langchain-ai/langchain/issues/22021
if self.async_mode:
loop = asyncio.get_event_loop()
loop.run_until_complete(self.aadd_messages(messages))
else:
with self._make_sync_session() as session:
for message in messages:
session.add(self.converter.to_sql_model(message, self.session_id))
session.commit()

async def aadd_messages(self, messages: Sequence[BaseMessage]) -> None:
# Add all messages in one transaction
await self._acreate_table_if_not_exists()
async with self.session_maker() as session:
for message in messages:
session.add(self.converter.to_sql_model(message, self.session_id))
await session.commit()

def clear(self) -> None:
"""Clear session memory from db"""

with self.Session() as session:
with self._make_sync_session() as session:
session.query(self.sql_model_class).filter(
getattr(self.sql_model_class, self.session_id_field_name)
== self.session_id
).delete()
session.commit()

async def aclear(self) -> None:
"""Clear session memory from db"""

await self._acreate_table_if_not_exists()
async with self._make_async_session() as session:
stmt = delete(self.sql_model_class).filter(
getattr(self.sql_model_class, self.session_id_field_name)
== self.session_id
)
await session.execute(stmt)
await session.commit()

@contextlib.contextmanager
def _make_sync_session(self) -> Generator[SQLSession, None, None]:
"""Make a sync session."""
if self.async_mode:
raise ValueError(
"Attempting to use a sync method when async mode is turned on. "
"Please use the corresponding async method instead."
)
with self.session_maker() as session:
yield cast(SQLSession, session)

@contextlib.asynccontextmanager
async def _make_async_session(self) -> AsyncGenerator[AsyncSession, None]:
"""Make an async session."""
if not self.async_mode:
raise ValueError(
"Attempting to use an async method when sync mode is turned on. "
"Please use the corresponding sync method instead."
)
async with self.session_maker() as session:
yield cast(AsyncSession, session)
21 changes: 8 additions & 13 deletions libs/community/poetry.lock

1 change: 1 addition & 0 deletions libs/community/pyproject.toml
@@ -291,6 +291,7 @@ extended_testing = [
"pyjwt",
"oracledb",
"simsimd",
"aiosqlite"
]

[tool.ruff]