
core[patch]: fix no current event loop for sql history in async mode #22933

Merged: 1 commit merged into langchain-ai:master on Jun 21, 2024

Conversation

mackong (Contributor) commented Jun 15, 2024

  • Description: When using RunnableWithMessageHistory/SQLChatMessageHistory in async mode, we get the following error:

```
Error in RootListenersTracer.on_chain_end callback: RuntimeError("There is no current event loop in thread 'asyncio_3'.")
```

which is thrown by https://github.com/langchain-ai/langchain/blob/ddfbca38dfa22954eaeda38614c6e1ec0cdecaa9/libs/community/langchain_community/chat_message_histories/sql.py#L259, and no message history is added to the database.

This patch adds a new `_aexit_history` function, which is called in async mode and in turn calls `aadd_messages`.

The patch uses the `afunc` attribute of a Runnable to check whether the end listener should be run in async mode or not.
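As a rough sketch of the dispatch idea (toy code, not the actual LangChain implementation; `ToyHistory`, `pick_listener`, and both listener functions are illustrative names), choosing the end listener based on a Runnable's `afunc` attribute might look like this:

```python
import asyncio
from types import SimpleNamespace

class ToyHistory:
    """Stand-in for a chat history with sync and async storage APIs."""
    def __init__(self):
        self.messages = []
    def add_messages(self, msgs):            # sync storage path
        self.messages.extend(msgs)
    async def aadd_messages(self, msgs):     # async storage path
        self.messages.extend(msgs)

def exit_history(hist, msgs):
    hist.add_messages(msgs)

async def aexit_history(hist, msgs):
    # Awaiting the async API avoids asyncio.get_event_loop() tricks
    # inside worker threads that have no event loop.
    await hist.aadd_messages(msgs)

def pick_listener(runnable):
    # The PR's idea: a Runnable that carries an async function (`afunc`)
    # gets the async end listener; otherwise the sync one runs.
    return aexit_history if getattr(runnable, "afunc", None) else exit_history

hist = ToyHistory()
pick_listener(SimpleNamespace(afunc=None))(hist, ["sync message"])
asyncio.run(pick_listener(SimpleNamespace(afunc=lambda x: x))(hist, ["async message"]))
print(hist.messages)  # -> ['sync message', 'async message']
```

Both listeners end up in the same storage, but the async one can be awaited from the running loop instead of needing to fetch a loop from a worker thread.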

vercel bot commented Jun 15, 2024

1 Ignored Deployment: langchain ⬜️ Ignored (updated Jun 21, 2024 1:57pm UTC)

@dosubot dosubot bot added size:M This PR changes 30-99 lines, ignoring generated files. Ɑ: Runnables Related to Runnables 🤖:bug Related to a bug, vulnerability, unexpected error with an existing feature labels Jun 15, 2024
mackong (Contributor, Author) commented Jun 15, 2024

@pprados Please help review this PR.

@mackong mackong force-pushed the fix-async-history branch 4 times, most recently from 1be040f to 657e73b Compare June 17, 2024 09:21
hwchase17 (Contributor) commented:

The issue you linked to is closed. Can you share the code you are running to reproduce this error?

mackong (Contributor, Author) commented Jun 18, 2024

@hwchase17 Here is sample LangServe code to reproduce the error.

Requirements

langchain-openai==0.1.8
langserve[server]==0.2.2
langchain==0.2.3
langchain-community==0.2.4
pydantic==1.10.13
asyncmy==0.2.9

LangServe server side code

from typing import AsyncIterator, Any

from fastapi import APIRouter, FastAPI
from langchain_core.chat_history import BaseChatMessageHistory
from langchain_community.chat_message_histories.sql import SQLChatMessageHistory
from langchain_core.runnables import Runnable
from langchain_core.runnables.history import RunnableWithMessageHistory
from langchain_core.runnables.utils import ConfigurableFieldSpec
from langchain_core.tools import Tool
from langchain_openai import AzureChatOpenAI
from langchain.agents import AgentExecutor, create_tool_calling_agent
from langchain.prompts import (
    ChatPromptTemplate, MessagesPlaceholder, HumanMessagePromptTemplate,
    SystemMessagePromptTemplate
)
from langchain.schema.runnable import RunnableLambda
from langserve import add_routes
from langserve.pydantic_v1 import BaseModel, Field
from sqlalchemy.ext.asyncio import create_async_engine


# Config
sql_server_url = "mysql+asyncmy://dbuser:dbpass@localhost:3306/history"

azure_endpoint = "fixme"
azure_deployment = "fixme"
openai_api_version = "fixme"
openai_api_key = "fixme"


# Model
class LLMInput(BaseModel):
    question: str = Field(
        ...,
        description="user question",
    )
    system_prompt: str = Field(
        default="You are a helpful AI assistant",
        description="system prompt",
    )


# History Stuff
async_sql_engine = create_async_engine(sql_server_url, pool_recycle=600)

history_factory_config = [
    ConfigurableFieldSpec(
        id="user_id",
        annotation=str,
        name="User ID",
        description="Unique identifier for the user.",
        default="",
        is_shared=True,
    ),
    ConfigurableFieldSpec(
        id="conversation_id",
        annotation=str,
        name="Conversation ID",
        description="Unique identifier for the conversation.",
        default="",
        is_shared=True,
    ),
]

def get_session_history(
    user_id: str,
    conversation_id: str,
) -> BaseChatMessageHistory:
    """Get a chat history from a session ID."""
    session_id = f"{user_id}_{conversation_id}"

    return SQLChatMessageHistory(
        session_id=session_id,
        connection=async_sql_engine,
    )


# Tool Stuff
def search(query: str, **kwargs) -> str:
    return f"{query} found"


class SearchToolInput(BaseModel):
    """Input for search tool."""
    query: str = Field(description="keyword for search")


search_tool = Tool(
    name="web_search",
    description="web_search is a tool to search online information.",
    func=search,
    args_schema=SearchToolInput,
)


def build_llm_chain(llm_input: LLMInput) -> Runnable:
    llm = AzureChatOpenAI(
        azure_endpoint=azure_endpoint,
        azure_deployment=azure_deployment,
        openai_api_version=openai_api_version,
        openai_api_key=openai_api_key,
        streaming=True,
        temperature=0.01,
        max_tokens=4000,
    )
    tools = [search_tool]
    prompt = ChatPromptTemplate.from_messages([
        SystemMessagePromptTemplate.from_template(llm_input.system_prompt),
        MessagesPlaceholder(variable_name="history"),
        HumanMessagePromptTemplate.from_template("{question}"),
        MessagesPlaceholder(variable_name="agent_scratchpad"),
    ])
    agent = create_tool_calling_agent(
        llm, tools, prompt,
    )
    return AgentExecutor(
        agent=agent,
        tools=tools,
    )


def build_chain(llm_input: LLMInput) -> Runnable:
    llm_chain = build_llm_chain(llm_input)

    return RunnableWithMessageHistory(
        llm_chain,
        get_session_history,
        input_messages_key="question",
        history_messages_key="history",
        history_factory_config=history_factory_config
    ).with_types(
        input_type=LLMInput
    )

async def llm_main(_input: dict[str, Any]) -> AsyncIterator[Any]:
    _input["question"] = _input["question"].lstrip("\n").rstrip()
    _input["system_prompt"] = _input["system_prompt"] or "You are a helpful AI assistant"

    llm_input: LLMInput = LLMInput.parse_obj(_input)

    chain = build_chain(llm_input)
    config = {
        "configurable": {
            "user_id": "user1",
            "conversation_id": "conv1",
        }
    }

    async for event in chain.astream_events(_input, config, version="v1"):
        yield event


router = APIRouter()

add_routes(
    router,
    RunnableLambda(llm_main).with_types(input_type=LLMInput),
    enabled_endpoints=["invoke", "batch", "stream"]
)

app = FastAPI(
    title="Test Server",
    version="0.01",
    description="Test Server Powered by LangChain & LLM",
)
app.include_router(router)

Save it as async_agent.py, update the SQL server and Azure OpenAI configuration at the top of the file, then run:

uvicorn async_agent:app --host 0.0.0.0

LangServe client side code

from langserve import RemoteRunnable

client = RemoteRunnable('http://localhost:8000/')
for chunk in client.stream({'question': "What's langchain"}):
    print(chunk)

Save it as agent_client.py and run:

python agent_client.py

Result

On the client side you will see a stream of event output, and the final answer to the question is returned.
But on the server side you will see logs like:

INFO:     Will watch for changes in these directories: ['/home/mackong/Codes/playground/llm']
INFO:     Uvicorn running on http://0.0.0.0:8000 (Press CTRL+C to quit)
INFO:     Started reloader process [828125] using WatchFiles
INFO:     Started server process [828140]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     127.0.0.1:49590 - "POST /stream HTTP/1.1" 200 OK
Parent run 8e230381-f1bb-42ab-835b-635851ef9bba not found for run 5959e910-f062-4229-9839-1d3de2f22007. Treating as a root run.
Error in RootListenersTracer.on_chain_end callback: RuntimeError("There is no current event loop in thread 'ThreadPoolExecutor-0_0'.")

where the error is thrown by https://github.com/langchain-ai/langchain/blob/ddfbca38dfa22954eaeda38614c6e1ec0cdecaa9/libs/community/langchain_community/chat_message_histories/sql.py#L259, and no message history is added to the database.

pprados (Contributor) commented Jun 18, 2024

@hwchase17, @mackong
I reviewed this code. It looks correct to me.
This fixes the bug I reported here.

With this PR, it's possible to remove the hack I mentioned here in SQLChatMessageHistory.

```python
def add_messages(self, messages: Sequence[BaseMessage]) -> None:
    # The method RunnableWithMessageHistory._exit_history() calls
    # add_messages by mistake instead of aadd_messages.
    # See https://github.com/langchain-ai/langchain/issues/22021
    if self.async_mode:
        loop = asyncio.get_event_loop()
        loop.run_until_complete(self.aadd_messages(messages))
    else:
        ...
```

You can remove this hack.
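For context, the RuntimeError in the logs is easy to reproduce outside LangChain: `asyncio.get_event_loop()` raises in a worker thread that has no event loop set, which is exactly the situation the sync listener (and the hack above) runs in. A minimal demonstration:

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

def try_get_loop():
    # A ThreadPoolExecutor worker thread has no event loop set, so
    # asyncio.get_event_loop() raises RuntimeError here.
    try:
        asyncio.get_event_loop()
        return None
    except RuntimeError as exc:
        return str(exc)

with ThreadPoolExecutor(max_workers=1) as pool:
    error = pool.submit(try_get_loop).result()

print(error)  # e.g. "There is no current event loop in thread 'ThreadPoolExecutor-0_0'."
```

This is why the proper fix is an async `_aexit_history` that awaits `aadd_messages` on the running loop, rather than fetching a loop from inside the callback thread.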

@mackong mackong force-pushed the fix-async-history branch 2 times, most recently from eeee215 to b1c2f85 Compare June 18, 2024 09:00
mackong (Contributor, Author) commented Jun 18, 2024

> @hwchase17, @mackong I reviewed this code. It looks correct to me. This fixes the bug I reported here.
>
> With this PR, it's possible to remove the hack I mentioned here in SQLChatMessageHistory. [...] You can remove this hack.

OK, removed.

pprados (Contributor) commented Jun 18, 2024

@mackong, some checks were not successful.

@ccurme ccurme added community Related to langchain-community and removed community Related to langchain-community labels Jun 18, 2024
mackong (Contributor, Author) commented Jun 19, 2024

@pprados The checks have passed now.

@ccurme ccurme added the Ɑ: core Related to langchain-core label Jun 19, 2024
eyurtsev (Collaborator) left a comment

Looks overall reasonable to me. It'll generate a trace that's a bit uglier than before, but I think it's more important to fix the async path.

```
@@ -483,6 +499,23 @@ def _exit_history(self, run: Run, config: RunnableConfig) -> None:
        output_messages = self._get_output_messages(output_val)
        hist.add_messages(input_messages + output_messages)

    async def _aexit_history(self, run: Run, config: RunnableConfig) -> None:
```
eyurtsev (Collaborator) commented:

@mackong would you mind adding unit tests to cover the async path?

mackong (Contributor, Author) commented Jun 20, 2024

@eyurtsev Unit tests added, but one test fails due to an unrelated issue; see https://github.com/langchain-ai/langchain/actions/runs/9594957636/job/26458682186?pr=22933#step:6:164

Currently AsyncRootListenersTracer's schema format is "original", so on_chat_model_start falls back to on_llm_start; the Run's input is then a str rather than a BaseMessage, and it is rejected by ChatMessageHistory's add_message:

```python
if not isinstance(message, BaseMessage):
    raise ValueError
self.messages.append(message)
```

I have created PR #23214, which fixes the issue; please review #23214 first.
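As a toy illustration of that failure mode (not the langchain_core source; `ToyMessage` and `ToyChatHistory` are hypothetical names), a strict type guard like the one quoted above means a plain string produced by the on_llm_start fallback never reaches storage:

```python
class ToyMessage:
    """Minimal stand-in for a BaseMessage."""
    def __init__(self, content):
        self.content = content

class ToyChatHistory:
    """Minimal stand-in for a chat history with a strict type guard."""
    def __init__(self):
        self.messages = []
    def add_message(self, message):
        # Mirrors the quoted check: anything that is not a message object
        # is rejected outright.
        if not isinstance(message, ToyMessage):
            raise ValueError(f"expected ToyMessage, got {type(message).__name__}")
        self.messages.append(message)

hist = ToyChatHistory()
hist.add_message(ToyMessage("hi"))           # stored
try:
    hist.add_message("raw prompt string")    # what the fallback would pass
except ValueError as exc:
    print("rejected:", exc)

print(len(hist.messages))  # -> 1: only the real message was stored
```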

mackong (Contributor, Author) commented:

@eyurtsev The added unit tests now pass.

@eyurtsev eyurtsev self-assigned this Jun 19, 2024
@dosubot dosubot bot added size:L This PR changes 100-499 lines, ignoring generated files. and removed size:M This PR changes 30-99 lines, ignoring generated files. labels Jun 20, 2024
@dosubot dosubot bot added the lgtm PR looks good. Use to confirm that a PR is ready for merging. label Jun 21, 2024
@eyurtsev eyurtsev merged commit 360a70c into langchain-ai:master Jun 21, 2024
134 checks passed
@mackong mackong deleted the fix-async-history branch June 23, 2024 09:02
lalanikarim pushed a commit to lalanikarim/langchain that referenced this pull request Jun 23, 2024
…angchain-ai#22933)

- **Description:** see the PR description above; the error is thrown by https://github.com/langchain-ai/langchain/blob/ddfbca38dfa22954eaeda38614c6e1ec0cdecaa9/libs/community/langchain_community/chat_message_histories/sql.py#L259.
- **Issue:** langchain-ai#22021, langchain-ai#22022
- **Dependencies:** N/A
Labels
🤖:bug Related to a bug, vulnerability, unexpected error with an existing feature Ɑ: core Related to langchain-core lgtm PR looks good. Use to confirm that a PR is ready for merging. Ɑ: Runnables Related to Runnables size:L This PR changes 100-499 lines, ignoring generated files.
Projects: none yet

Development: successfully merging this pull request may close these issues: none yet

5 participants