It's impossible to use an **async** ChatMessageHistory with langchain-core. #22021

Closed
5 tasks done
pprados opened this issue May 22, 2024 · 7 comments
Labels
🤖:bug Related to a bug, vulnerability, unexpected error with an existing feature 🔌: postgres Related to postgres integrations

Comments

@pprados
Contributor

pprados commented May 22, 2024

Checked other resources

  • I added a very descriptive title to this issue.
  • I searched the LangChain documentation with the integrated search.
  • I used the GitHub search to find a similar question and didn't find it.
  • I am sure that this is a bug in LangChain rather than my code.
  • The bug is not resolved by updating to the latest stable version of LangChain (or the specific integration package).

Example Code

import asyncio
import uuid
from pprint import pprint

import psycopg
from langchain_core.chat_history import BaseChatMessageHistory
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.runnables.history import RunnableWithMessageHistory
from langchain_openai import ChatOpenAI

from langchain_postgres import PostgresChatMessageHistory

model = ChatOpenAI()
prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            "You're an assistant who's good at {ability}. Respond in 20 words or fewer",
        ),
        MessagesPlaceholder(variable_name="history"),
        ("human", "{input}"),
    ]
)
runnable = prompt | model

table_name = "chat_history"

async_connection = None


async def init_async_connection():
    global async_connection
    async_connection = await psycopg.AsyncConnection.connect(
        user="postgres",
        password="password_postgres",
        host="localhost",
        port=5432)


def aget_session_history(session_id: str) -> BaseChatMessageHistory:
    return PostgresChatMessageHistory(
        table_name,
        session_id,
        async_connection=async_connection
    )


awith_message_history = RunnableWithMessageHistory(
    runnable,
    aget_session_history,
    input_messages_key="input",
    history_messages_key="history",
)


async def amain():
    await init_async_connection()
    result = await awith_message_history.ainvoke(
        {"ability": "math", "input": "What does cosine mean?"},
        config={"configurable": {"session_id": str(uuid.uuid4())}},
    )
    pprint(result)


asyncio.run(amain())

Error Message and Stack Trace (if applicable)

Error in RootListenersTracer.on_chain_end callback: ValueError('Please initialize the PostgresChatMessageHistory with a sync connection or use the aadd_messages method instead.')

Description

It's impossible to use an async ChatMessageHistory with langchain-core.

The ChatMessageHistory class is synchronous and doesn't have an async counterpart.
This is a problem because the RunnableWithMessageHistory class requires a ChatMessageHistory object to be passed to it. This means that it's impossible to use an async ChatMessageHistory with langchain-core.

I can't find any example of how to use it. I will try to create an example of how to use PostgresChatMessageHistory with async mode.

There are many problems:

  • Bug in _exit_history()
  • Bugs in PostgresChatMessageHistory and sync usage
  • Bugs in PostgresChatMessageHistory and async usage

Bug in _exit_history()

In RunnableWithMessageHistory, _exit_history() is called because the chain ends with | runnable.with_listeners(on_end=self._exit_history). This method is not async, so it raises an error in async mode: it calls add_messages() instead of awaiting aadd_messages().

import asyncio
import uuid
from pprint import pprint

import psycopg
from langchain_core.chat_history import BaseChatMessageHistory
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.runnables.history import RunnableWithMessageHistory
from langchain_openai import ChatOpenAI

from langchain_postgres import PostgresChatMessageHistory

model = ChatOpenAI()
prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            "You're an assistant who's good at {ability}. Respond in 20 words or fewer",
        ),
        MessagesPlaceholder(variable_name="history"),
        ("human", "{input}"),
    ]
)
runnable = prompt | model

table_name = "chat_history"

async_connection = None


async def init_async_connection():
    global async_connection
    async_connection = await psycopg.AsyncConnection.connect(
        user="postgres",
        password="password_postgres",
        host="localhost",
        port=5432)


def aget_session_history(session_id: str) -> BaseChatMessageHistory:
    return PostgresChatMessageHistory(
        table_name,
        session_id,
        async_connection=async_connection
    )


awith_message_history = RunnableWithMessageHistory(
    runnable,
    aget_session_history,
    input_messages_key="input",
    history_messages_key="history",
)


async def amain():
    await init_async_connection()  # Oops! This is not a global initialization
    result = await awith_message_history.ainvoke(
        {"ability": "math", "input": "What does cosine mean?"},
        config={"configurable": {"session_id": str(uuid.uuid4())}},
    )
    pprint(result)


asyncio.run(amain())

Result

Error in RootListenersTracer.on_chain_end callback: ValueError('Please initialize the PostgresChatMessageHistory with a sync connection or use the aadd_messages method instead.')
AIMessage(content='Cosine is a trigonometric function that represents the ratio of the adjacent side to the hypotenuse in a right triangle.', response_metadata={'token_usage': {'completion_tokens': 26, 'prompt_tokens': 33, 'total_tokens': 59}, 'model_name': 'gpt-3.5-turbo', 'system_fingerprint': None, 'finish_reason': 'stop', 'logprobs': None}, id='run-a00fc81a-1844-4a47-98fa-7a30d6e51228-0')
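The failure mode can be reproduced without LangChain or a database. The following is only a minimal sketch under stated assumptions: `AsyncOnlyHistory`, `sync_exit`, `async_exit` and `run_chain` are hypothetical stand-ins written for this illustration, not the real classes. It shows why a synchronous end listener loses the messages while the call itself still returns an answer, as in the output above.

```python
import asyncio
import inspect
from typing import Callable, List


class AsyncOnlyHistory:
    """Hypothetical stand-in for a history created with an async connection only."""

    def __init__(self) -> None:
        self.messages: List[str] = []

    def add_messages(self, messages: List[str]) -> None:
        # Mirrors the guard that produced the error shown above.
        raise ValueError("use the aadd_messages method instead")

    async def aadd_messages(self, messages: List[str]) -> None:
        await asyncio.sleep(0)  # stands in for an awaited database write
        self.messages.extend(messages)


def sync_exit(history: AsyncOnlyHistory, messages: List[str]) -> None:
    """Synchronous listener: it can only reach the sync method."""
    history.add_messages(messages)


async def async_exit(history: AsyncOnlyHistory, messages: List[str]) -> None:
    """Asynchronous listener: it can await the async method."""
    await history.aadd_messages(messages)


async def run_chain(history: AsyncOnlyHistory, on_end: Callable) -> str:
    """Hypothetical chain: produce an answer, then fire the end listener."""
    answer = "Cosine is adjacent over hypotenuse."
    try:
        result = on_end(history, ["What does cosine mean?", answer])
        if inspect.isawaitable(result):
            await result
    except ValueError as exc:
        # The listener error is reported, but the answer is still returned.
        print(f"Error in on_end listener: {exc!r}")
    return answer
```

With `sync_exit` the answer comes back but nothing is stored; with `async_exit` both messages are stored.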

Bugs in PostgresChatMessageHistory and sync usage

In PostgresChatMessageHistory, the design is problematic.
LangChain, with LCEL, follows a declarative programming style: you declare chains as global variables, then invoke them when necessary. This is how LangServe is able to publish interfaces with add_routes().

For optimization reasons, PostgresChatMessageHistory wishes to recycle connections. The class provides a constructor which accepts a sync_connection parameter. However, it is not possible to have a global connection, in order to reuse it when implementing get_session_history().

sync_connection = psycopg.connect(  # ERROR: A connection is not reentrant!
    user="postgres",
    password="password_postgres",
    host="localhost",
    port=5432)


def get_session_history(session_id: str) -> BaseChatMessageHistory:
    return PostgresChatMessageHistory(
        table_name,
        session_id,
        sync_connection=sync_connection
    )

A connection is not reentrant! You can't use the same connection in multiple threads. But, the design of langchain-postgres is to have a global connection. This is a problem.

The alternative is to create a new connection each time you need to access the database.

def get_session_history(session_id: str) -> BaseChatMessageHistory:
    sync_connection = psycopg.connect(  # Opens a new connection on every call
        user="postgres",
        password="password_postgres",
        host="localhost",
        port=5432)
    return PostgresChatMessageHistory(
        table_name,
        session_id,
        sync_connection=sync_connection
    )

Then, why accept only a connection and not an engine? The engine is a connection pool.
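To illustrate what a pool brings, here is a minimal sketch using only the standard library. `TinyPool`, `add_message` and `demo` are hypothetical names invented for this example, and sqlite3 stands in for Postgres; a real application would more likely rely on an existing pool such as a SQLAlchemy engine. The point is only that each thread borrows a connection for the duration of one operation, so no connection is ever shared between two threads at the same time.

```python
import os
import queue
import sqlite3
import tempfile
import threading
from contextlib import contextmanager
from typing import Iterator


class TinyPool:
    """Hypothetical, minimal connection pool (illustration only)."""

    def __init__(self, path: str, size: int = 2) -> None:
        self._free: "queue.Queue[sqlite3.Connection]" = queue.Queue()
        for _ in range(size):
            # check_same_thread=False allows handing a connection to another
            # thread; the pool ensures only one thread holds it at a time.
            self._free.put(sqlite3.connect(path, check_same_thread=False))

    @contextmanager
    def connection(self) -> Iterator[sqlite3.Connection]:
        conn = self._free.get()  # blocks until a connection is free
        try:
            yield conn
        finally:
            self._free.put(conn)  # always give the connection back


def add_message(pool: TinyPool, session_id: str, text: str) -> None:
    with pool.connection() as conn:
        with conn:  # commits on success, rolls back on error
            conn.execute(
                "INSERT INTO chat_history VALUES (?, ?)", (session_id, text)
            )


def demo() -> int:
    """Write from four threads through a pool of two connections."""
    path = os.path.join(tempfile.mkdtemp(), "history.db")
    setup = sqlite3.connect(path)
    setup.execute("CREATE TABLE chat_history (session_id TEXT, message TEXT)")
    setup.commit()
    setup.close()

    pool = TinyPool(path)
    threads = [
        threading.Thread(target=add_message, args=(pool, f"s{i}", "hi"))
        for i in range(4)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    with pool.connection() as conn:
        return conn.execute("SELECT COUNT(*) FROM chat_history").fetchone()[0]
```

A get_session_history() built on such a pool can stay a global, declarative function, because it only captures the pool and never a single connection.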

Bugs in PostgresChatMessageHistory and async usage

If we ignore the problem mentioned above with _exit_history(), there are even more difficulties. It's not easy to initialize a global async connection, because it must be initialized in an async function.

async_connection = None

async def init_async_connection(): # Where call this function?
    global async_connection
    async_connection = await psycopg.AsyncConnection.connect(
        user="postgres",
        password="password_postgres",
        host="localhost",
        port=5432)

It's also not possible to call init_async_connection() from get_session_history(), because get_session_history() is not async.

def get_session_history(session_id: str) -> BaseChatMessageHistory:
    async_connection = await psycopg.AsyncConnection.connect( # ERROR: 'await' outside async function
        user="postgres",
        password="password_postgres",
        host="localhost",
        port=5432)
    return PostgresChatMessageHistory(
        table_name,
        session_id,
        async_connection=async_connection
    )

It is therefore currently impossible to implement session history correctly in asynchronous mode.
Either you use a global connection, which is not possible, or you open the connection in get_session_history(), which is not possible either.
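One conceivable way out of this dead end is to defer opening the connection until the first awaited method call. The sketch below is an assumption made for illustration, not what PostgresChatMessageHistory does: `FakeAsyncConnection`, `connect` and `LazyAsyncHistory` are hypothetical names, and the fake connection replaces psycopg so the example runs without a database.

```python
import asyncio
from typing import Awaitable, Callable, List, Optional, Tuple


class FakeAsyncConnection:
    """Hypothetical stand-in for an async database connection."""

    def __init__(self) -> None:
        self.rows: List[Tuple[str, str]] = []

    async def execute(self, session_id: str, message: str) -> None:
        await asyncio.sleep(0)  # stands in for network I/O
        self.rows.append((session_id, message))


async def connect() -> FakeAsyncConnection:
    """Hypothetical async factory, playing the role of AsyncConnection.connect()."""
    await asyncio.sleep(0)
    return FakeAsyncConnection()


class LazyAsyncHistory:
    """History that stores a connection factory instead of a connection."""

    def __init__(
        self,
        session_id: str,
        connect: Callable[[], Awaitable[FakeAsyncConnection]],
    ) -> None:
        self.session_id = session_id
        self._connect = connect
        self._conn: Optional[FakeAsyncConnection] = None

    async def _connection(self) -> FakeAsyncConnection:
        # Opened on first use, inside whichever event loop is running.
        if self._conn is None:
            self._conn = await self._connect()
        return self._conn

    async def aadd_messages(self, messages: List[str]) -> None:
        conn = await self._connection()
        for message in messages:
            await conn.execute(self.session_id, message)


def get_session_history(session_id: str) -> LazyAsyncHistory:
    # Stays synchronous: nothing is awaited here.
    return LazyAsyncHistory(session_id, connect)
```

This sketch opens one connection per history object and does not guard against two concurrent first calls, so it is not a substitute for a real connection pool.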

The only solution is to completely break the use of LCEL, by building the chain just after the connection is opened. It's still very strange. To publish it with LangServe, you need to use a RunnableLambda.

from typing import Any, Dict

from langchain_core.runnables import RunnableLambda


async def async_lambda_history(input: Dict[str, Any], config: Dict[str, Any]):
    # Open the connection inside the running event loop, then build the chain.
    async_connection = await psycopg.AsyncConnection.connect(
        user="postgres",
        password="password_postgres",
        host="localhost",
        port=5432)

    def _get_session_history(session_id: str) -> BaseChatMessageHistory:
        return PostgresChatMessageHistory(
            table_name,
            session_id,
            async_connection=async_connection
        )

    awith_message_history = RunnableWithMessageHistory(
        runnable,
        _get_session_history,
        input_messages_key="input",
        history_messages_key="history",
    )
    # Return the result so that the caller of the lambda receives it.
    return await awith_message_history.ainvoke(
        input,
        config=config,
    )


def nope(input: Dict[str, Any]):
    # Placeholder: only the async path is supported.
    raise NotImplementedError("Use ainvoke()")


lambda_chain = RunnableLambda(func=nope, afunc=async_lambda_history)

async def lambda_amain():
    result = await lambda_chain.ainvoke(
        {"ability": "math", "input": "What does cosine mean?"},
        config={"configurable": {"session_id": str(uuid.uuid4())}},
    )
    pprint(result)

asyncio.run(lambda_amain())

It's a very strange way to use langchain.

But a good use of langchain in a website consists precisely in using only asynchronous approaches. This must include history management.

System Info

langchain==0.1.20
langchain-community==0.0.38
langchain-core==0.2.1
langchain-openai==0.1.7
langchain_postgres==0.0.6
langchain-rag==0.1.46
langchain-text-splitters==0.0.1

@dosubot dosubot bot added 🔌: postgres Related to postgres integrations 🤖:bug Related to a bug, vulnerability, unexpected error with an existing feature labels May 22, 2024
@pprados
Contributor Author

pprados commented May 22, 2024

The solution is to use

SQLChatMessageHistory(
    session_id=session_id,
    connection_string=history_url,  # Open a new connection each time
)

@eyurtsev
Collaborator

@pprados

The solution is to use

SQLChatMessageHistory(
    session_id=session_id,
    connection_string=history_url,  # Open a new connection each time
)

I assume this is a workaround for one of the issues? I think you identified a number of separate issues that need to be addressed. Is that correct?

@pprados
Contributor Author

pprados commented May 22, 2024

Yes. This is a workaround for one of the issues in PostgresChatMessageHistory (see here).
The problem with _exit_history() is here, and it affects every async chat message history implementation that checks how it is used.

@pprados
Contributor Author

pprados commented May 27, 2024

@eyurtsev

I'm trying to re-implement the SQLChatMessageHistory class, to really use the asynchronous approach, and there's the same problem. The add_message() method is invoked by RunnableWithMessageHistory._exit_history().

An implementation workaround is to use this:

    def add_messages(self, messages: Sequence[BaseMessage]) -> None:
        # RunnableWithMessageHistory._exit_history() mistakenly calls
        # add_messages() instead of aadd_messages().
        # See https://github.com/langchain-ai/langchain/discussions/22022
        if self.async_mode:
            loop = asyncio.get_event_loop()
            loop.run_until_complete(self.aadd_messages(messages))
        else:
            with self.session_maker() as session:
                for message in messages:
                    session.add(self.converter.to_sql_model(message, self.session_id))
                session.commit()

I'll propose a PR for this, improving the interface for accepting an (Async)Engine, in addition to the connection string. That way, it will be possible to use the connection pool.

I'm going to use an approach similar to the PR I'm proposing to finally make the use of langchain resilient.

@baskaryan I'd like you to take the time to validate this PR. Until it is merged, I can't publish an article explaining how to make the use of langchain resilient.
This PR is just the beginning of the story.
Without it, resilience is simply not possible in the current state of the implementation. There are situations where a crash at the wrong time completely breaks applications (during the import of documents into the vector database, during the saving of the history, which is done message by message rather than as an entire conversation in a single transaction, etc.).
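The difference between saving message by message and saving in a single transaction can be sketched with the standard library alone. This is only an illustration under stated assumptions: sqlite3 stands in for the real database, the function names are hypothetical, and a NULL message that violates a NOT NULL constraint plays the role of the crash in the middle of the write.

```python
import sqlite3
from typing import List, Optional


def make_db() -> sqlite3.Connection:
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE chat_history (session_id TEXT, message TEXT NOT NULL)"
    )
    conn.commit()
    return conn


def add_one_by_one(
    conn: sqlite3.Connection, session_id: str, messages: List[Optional[str]]
) -> None:
    """One commit per message: a failure leaves a partial conversation."""
    for message in messages:
        conn.execute(
            "INSERT INTO chat_history VALUES (?, ?)", (session_id, message)
        )
        conn.commit()


def add_atomically(
    conn: sqlite3.Connection, session_id: str, messages: List[Optional[str]]
) -> None:
    """One transaction for the whole list: all messages or none."""
    with conn:  # commits on success, rolls back if an exception escapes
        conn.executemany(
            "INSERT INTO chat_history VALUES (?, ?)",
            [(session_id, message) for message in messages],
        )


def count(conn: sqlite3.Connection) -> int:
    return conn.execute("SELECT COUNT(*) FROM chat_history").fetchone()[0]
```

When the second message fails, the first variant keeps the first message while the second variant keeps nothing, so the stored history never contains half a conversation.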

@waadarsh

waadarsh commented Jun 1, 2024

is this resolved?

hidayattaufiqur added a commit to hidayattaufiqur/NFO that referenced this issue Jun 5, 2024
this bug is non-existent on local dev, but on prod it does. this is a newly-known issue on Langchain GitHub which can be accessed here: [langchain-ai/langchain#22021]
eyurtsev added a commit that referenced this issue Jun 5, 2024
…22065)

# package community: Fix SQLChatMessageHistory

## Description
Here is a rewrite of `SQLChatMessageHistory` to properly implement the
asynchronous approach. The code circumvents [issue
22021](#22021) by
accepting a synchronous call to `def add_messages()` in an asynchronous
scenario. This bypasses the bug.

For the same reasons as in [PR
32](langchain-ai/langchain-postgres#32) of
`langchain-postgres`, we use a lazy strategy for table creation. Indeed,
the promise of the constructor cannot be fulfilled without this. It is
not possible to invoke an asynchronous call in a constructor. We
compensate for this by waiting for the next asynchronous method call to
create the table.

The goal of the `PostgresChatMessageHistory` class (in
`langchain-postgres`) is, among other things, to be able to recycle
database connections. The implementation of the class is problematic, as
we have demonstrated in [issue
22021](#22021).

Our new implementation of `SQLChatMessageHistory` achieves this by using
a singleton of type (`Async`)`Engine` for the database connection. The
connection pool is managed by this singleton, and the code is then
reentrant.

We also accept the type `str`, optionally complemented by `async_mode`.
(I know you don't like this much, but it's the only way to allow an
asynchronous connection string.)

In order to unify the different classes handling database connections,
we have renamed `connection_string` to `connection`, and `Session` to
`session_maker`.

Now, a single transaction is used to add a list of messages. Thus, a
crash during this write operation will not leave the database in an
unstable state with a partially added message list. This makes the code
resilient.

We believe that the `PostgresChatMessageHistory` class is no longer
necessary and can be replaced by:
```
PostgresChatMessageHistory = SQLChatMessageHistory
```
This also fixes the bug.


## Issue
- [issue 22021](#22021)
  - Bug in _exit_history()
  - Bugs in PostgresChatMessageHistory and sync usage
  - Bugs in PostgresChatMessageHistory and async usage
- [issue
36](langchain-ai/langchain-postgres#36)
## Twitter handle:
pprados

## Tests
- libs/community/tests/unit_tests/chat_message_histories/test_sql.py
(add async test)

@baskaryan, @eyurtsev or @hwchase17 can you check this PR ?
And, I've been waiting a long time for validation from other PRs. Can
you take a look?
- [PR 32](langchain-ai/langchain-postgres#32)
- [PR 15575](#15575)
- [PR 13200](#13200)

---------

Co-authored-by: Eugene Yurtsev <eyurtsev@gmail.com>
@pprados
Contributor Author

pprados commented Jun 7, 2024

My PR fixed this bug.

@pprados pprados closed this as completed Jun 7, 2024
hinthornw pushed a commit that referenced this issue Jun 20, 2024
…22065)

eyurtsev pushed a commit that referenced this issue Jun 21, 2024
…22933)

- **Description:** When using
RunnableWithMessageHistory/SQLChatMessageHistory in async mode, we
get the following error:
```
Error in RootListenersTracer.on_chain_end callback: RuntimeError("There is no current event loop in thread 'asyncio_3'.")
```
which is thrown by
https://github.com/langchain-ai/langchain/blob/ddfbca38dfa22954eaeda38614c6e1ec0cdecaa9/libs/community/langchain_community/chat_message_histories/sql.py#L259,
and no message history is added to the database.

In this patch, a new _aexit_history function is added. It is called in
async mode and in turn calls aadd_messages.

We use the `afunc` attribute of a Runnable to check whether the
end listener should be run in async mode or not.

  - **Issue:** #22021, #22022 
  - **Dependencies:** N/A
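The RuntimeError quoted in this commit message can be reproduced with the standard library. The sketch below is an illustration under stated assumptions, not LangChain code: the listener functions and `run_in_thread` are hypothetical names, and a plain list stands in for the database. It shows that asyncio.get_event_loop() fails in a worker thread that has no event loop, which is where a synchronous listener may end up running.

```python
import asyncio
import threading
from typing import Callable, List, Tuple


async def aadd_messages(store: List[str], messages: List[str]) -> None:
    await asyncio.sleep(0)  # stands in for an awaited database write
    store.extend(messages)


def listener_with_get_event_loop(
    store: List[str], messages: List[str], errors: List[Exception]
) -> None:
    """Same pattern as the workaround: reuse the 'current' loop."""
    try:
        loop = asyncio.get_event_loop()  # a worker thread has no loop set
        loop.run_until_complete(aadd_messages(store, messages))
    except RuntimeError as exc:
        errors.append(exc)


def listener_with_own_loop(
    store: List[str], messages: List[str], errors: List[Exception]
) -> None:
    """Alternative for this toy case only: run a fresh loop in the thread."""
    asyncio.run(aadd_messages(store, messages))


def run_in_thread(listener: Callable) -> Tuple[List[str], List[Exception]]:
    store: List[str] = []
    errors: List[Exception] = []
    thread = threading.Thread(target=listener, args=(store, ["hi"], errors))
    thread.start()
    thread.join()
    return store, errors
```

Running a fresh loop works here because the toy coroutine has no ties to another loop. A real async database connection usually belongs to the loop that created it, which is presumably why the patch awaits aadd_messages from an async listener instead.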
@alew3

alew3 commented Jul 1, 2024

await init_async_connection()

@pprados do you have sample code of how to use it correctly with async with your fix?
