
astream_events (V1 and V2) gives duplicate content in on_chat_model_stream #22227

Closed · 5 tasks done
Sanzid88888 opened this issue May 28, 2024 · 6 comments

Assignees: eyurtsev
Labels: 01 bug (Confirmed bug), Ɑ: core (Related to langchain-core), investigate, streaming

Comments

Sanzid88888 commented May 28, 2024

Checked other resources

  • I added a very descriptive title to this issue.
  • I searched the LangChain documentation with the integrated search.
  • I used the GitHub search to find a similar question and didn't find it.
  • I am sure that this is a bug in LangChain rather than my code.
  • The bug is not resolved by updating to the latest stable version of LangChain (or the specific integration package).

Example Code

import asyncio
import random

from langchain import hub
from langchain.agents import create_openai_tools_agent, AgentExecutor
from langchain_core.callbacks import Callbacks
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI

model_langchain = ChatOpenAI(temperature=0, streaming=True,
                             openai_api_key="sk-proj-c2xxxx")


@tool
async def where_cat_is_hiding() -> str:
    """Where is the cat hiding right now?"""
    return random.choice(["under the bed", "on the shelf"])


chunks = []


@tool
async def get_items(place: str, callbacks: Callbacks):  # <--- Accept callbacks
    """Use this tool to look up which items are in the given place."""
    template = ChatPromptTemplate.from_messages(
        [
            (
                "human",
                "Can you tell me what kind of items i might find in the following place: '{place}'. "
                "List at least 3 such items separating them by a comma. And include a brief description of each item..",
            )
        ]
    )
    chain = template | model_langchain.with_config(
        {
            "run_name": "Get Items LLM",
            "tags": ["tool_llm"],
            "callbacks": callbacks,  # <-- Propagate callbacks

        }
    )
    r = await chain.ainvoke({"place": place})
    return r


prompt = hub.pull("hwchase17/openai-tools-agent")
tools = [get_items, where_cat_is_hiding]
agent = create_openai_tools_agent(
    model_langchain.with_config({"tags": ["agent_llm"]}), tools, prompt
)
agent_executor = AgentExecutor(agent=agent, tools=tools).with_config(
    {"run_name": "Agent"}
)


async def async_test_langchain():
    async for event in agent_executor.astream_events(
            {"input": "where is the cat hiding? what items are in that location?"},
            version="v1",
    ):
        kind = event["event"]
        if kind == "on_chat_model_stream":
            content = event["data"]["chunk"].content
            if content:
                # Empty content in the context of OpenAI means
                # that the model is asking for a tool to be invoked.
                # So we only print non-empty content
                print(content, end="|")


if __name__ == "__main__":
    asyncio.run(async_test_langchain())

Error Message and Stack Trace (if applicable)

1|1|.|.| Books| Books| -| -| On| On| the| the| shelf| shelf|,|,| you| you| may| may| find| find| a| a| variety| variety| of| of| books| books| ranging| ranging| from| from| fiction| fiction| to| to| non| non|-fiction|-fiction|,|,| covering| covering| different| different| genres| genres| and| and| topics| topics|.|.| Books| Books| are| are| typically| typically| arranged| arranged| in| in| a| a| neat| neat| and| and| organized| organized| manner| manner| for| for| easy| easy| browsing| browsing|.

|.

|2|2|.|.| Photo| Photo| frames| frames| -| -| Photo| Photo| frames| frames| are| are| commonly| commonly| placed| placed| on| on| shelves| shelves| to| to| display| display| cherished| cherished| memories| memories| and| and| moments| moments| captured| captured| in| in| photographs| photographs|.|.| They| They| come| come| in| in| various| various| sizes| sizes|,|,| shapes| shapes|,|,| and| and| designs| designs| to| to| complement| complement| the| the| decor| decor| of| of| the| the| room| room|.

|.

|3|3|.|.| Decor| Decor|ative|ative| figur| figur|ines|ines| -| -| Decor| Decor|ative|ative| figur| figur|ines|ines| such| such| as| as| sculptures| sculptures|,|,| v| v|ases|ases|,|,| or| or| small| small| statues| statues| are| are| often| often| placed| placed| on| on| shelves| shelves| to| to| add| add| a| a| touch| touch| of| of| personality| personality| and| and| style| style| to| to| the| the| space| space|.|.| These| These| items| items| can| can| be| be| made| made| of| of| different| different| materials| materials| like| like| ceramic| ceramic|,|,| glass| glass|,|,| or| or| metal| metal|.|.

Description

astream_events gives duplicate content in on_chat_model_stream.

1|1|.|.| Books| Books| -| -| On| On| the| the| shelf| shelf|,|,| you| you| may| may| find| find| a| a| variety| variety| of| of| books| books| ranging| ranging| from| from| fiction| fiction| to| to| non| non|-fiction|-fiction|,|,| covering| covering| different| different| genres| genres| and| and| topics| topics|.|.| Books| Books| are| are| typically| typically| arranged| arranged| in| in| a| a| neat| neat| and| and| organized| organized| manner| manner| for| for| easy| easy| browsing| browsing|.

Here `Books| Books|` and `On| On|` show each token arriving twice in the on_chat_model_stream content.

Tried V2; same duplicated result.

I used the examples from the astream_events docs:

https://python.langchain.com/v0.1/docs/modules/agents/how_to/streaming/

@hwchase17 @leo-gan

System Info

langchain==0.2.1
langchain-community==0.2.1
langchain-core==0.2.1
langchain-google-genai==1.0.5
langchain-openai==0.1.7
langchain-text-splitters==0.2.0
langchainhub==0.1.15
Platform: macOS Sonoma 14.4, M1
Python 3.11.6

@dosubot added the Ɑ: agent (Related to agents module), 🔌: openai (Primarily related to OpenAI integrations), and 🤖:bug (Related to a bug, vulnerability, unexpected error with an existing feature) labels on May 28, 2024

Sanzid88888 commented May 29, 2024

@eyurtsev or @jacoblee93 can you please look into this example?

@Sanzid88888 changed the title from "astream_events gives duplicate content in on_chat_model_stream" to "astream_events (V1 and V2) gives duplicate content in on_chat_model_stream" on May 30, 2024
@eyurtsev self-assigned this on Jun 4, 2024
@eyurtsev added the investigate and streaming labels and removed the 🤖:bug (Related to a bug, vulnerability, unexpected error with an existing feature) label on Jun 4, 2024

eyurtsev commented Jun 4, 2024

@Sanzid88888 you pasted your OpenAI API key when you included the example. I redacted it in an edit, but please log in to OpenAI and disable it. Assume it has been publicly leaked!


eyurtsev commented Jun 4, 2024

MRE (minimal reproducible example):

import asyncio

from langchain_core.callbacks import Callbacks
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI

model = ChatOpenAI(temperature=0)


@tool
async def get_items(place: str, callbacks: Callbacks):  # <--- Accept callbacks
    """Use this tool to look up which items are in the given place."""
    template = ChatPromptTemplate.from_messages(
        [
            (
                "human",
                "Can you tell me what kind of items I might find in the following place: '{place}'. "
                "List at least 3 such items, separated by commas, and include a brief description of each item.",
            )
        ]
    )
    chain = template | model.with_config(
        {
            "run_name": "Get Items LLM",
            "tags": ["tool_llm"],
            "callbacks": callbacks,  # <-- Propagate callbacks
        }
    )
    return await chain.ainvoke({"place": place})


async def main():
    async for event in get_items.astream_events("hello", version="v1"):
        if event["event"] == "on_chat_model_stream":
            content = event["data"]["chunk"].content
            print(content)


asyncio.run(main())

Produces tokens like:

I
I
'm
'm
 sorry
 sorry
,
,
 but
 but
 "
 "
hello
hello
"
"

@eyurtsev
Copy link
Collaborator

eyurtsev commented Jun 4, 2024

@Sanzid88888 Looks like the issue is with some magic we do behind the scenes to propagate callbacks on behalf of the user.

Remove the explicit callback passing if you're on Python >= 3.11:

@tool
async def get_items(place: str):
    """Use this tool to look up which items are in the given place."""
    template = ChatPromptTemplate.from_messages(
        [
            (
                "human",
                "Can you tell me what kind of items i might find in the following place: '{place}'. "
                "List at least 3 such items separating them by a comma. And include a brief description of each item..",
            )
        ]
    )
    chain = template | model.with_config(
        {
            "run_name": "Get Items LLM",
            "tags": ["tool_llm"],
        }
    )
    r = await chain.ainvoke({"place": place})
    return r

I'll try to fix this in the meantime. Thanks for reporting the issue.
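For context on the "magic": the sketch below is not LangChain's actual code, just a minimal illustration (with a hypothetical `current_callbacks` variable) of the mechanism that makes automatic propagation possible on Python >= 3.11, where asyncio.create_task() accepts an explicit context argument so a parent can hand its contextvars, including the active callbacks, to child tasks:

```python
import asyncio
import contextvars

# Hypothetical stand-in for the per-run callbacks LangChain tracks.
current_callbacks: contextvars.ContextVar = contextvars.ContextVar(
    "current_callbacks", default=None
)


async def child() -> None:
    # The child reads the parent's value without it being passed explicitly.
    print("child sees:", current_callbacks.get())


async def parent() -> None:
    current_callbacks.set(["parent-handler"])
    ctx = contextvars.copy_context()
    # The context= parameter was added to asyncio.create_task in Python 3.11.
    await asyncio.create_task(child(), context=ctx)


asyncio.run(parent())
```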

@eyurtsev added the 01 bug (Confirmed bug) and Ɑ: core (Related to langchain-core) labels and removed the 🔌: openai (Primarily related to OpenAI integrations) and Ɑ: agent (Related to agents module) labels on Jun 4, 2024
@eyurtsev removed the 01 bug (Confirmed bug) label on Jun 4, 2024

eyurtsev commented Jun 4, 2024

@Sanzid88888 you can do this if you're on an older Python version:

    chain = template | model_langchain.with_config(
        {
            "run_name": "Get Items LLM",
            "tags": ["tool_llm"],
        }
    )
    r = await chain.ainvoke({"place": place}, {"callbacks": callbacks})

I'm still investigating, but I suspect this isn't exactly a bug so much as bad semantics: we're attaching callbacks to the model via with_config, and then langchain (if you're on Python >= 3.11) also attempts to automatically propagate callbacks.

We might remove the ability to specify callbacks via with_config, or else attempt to dedupe the callback handlers; a sketch of the dedup option follows.
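Not the actual fix, but a minimal sketch of what handler deduplication when merging configs could look like; `merge_callback_handlers` is a hypothetical helper, and it assumes duplicates can be detected by object identity (the bug is the same handler object arriving twice):

```python
from typing import List

from langchain_core.callbacks import BaseCallbackHandler


def merge_callback_handlers(
    parent: List[BaseCallbackHandler],
    child: List[BaseCallbackHandler],
) -> List[BaseCallbackHandler]:
    """Combine two handler lists, skipping handlers already present."""
    merged = list(parent)
    seen = {id(h) for h in parent}  # dedupe by identity, not equality
    for handler in child:
        if id(handler) not in seen:
            merged.append(handler)
            seen.add(id(handler))
    return merged
```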

@eyurtsev added the 01 bug (Confirmed bug) label on Jun 4, 2024
eyurtsev added a commit that referenced this issue Jun 4, 2024
This PR adds deduplication of callback handlers in merge_configs.

Fix for this issue:
#22227

The issue appears when the code:

1) runs on Python >= 3.11,
2) invokes a runnable from within a runnable, and
3) binds the callbacks to the child runnable from the parent runnable
   using with_config.

In this case, the same callbacks end up appearing twice: (1) the first
time from with_config, and (2) the second time from langchain
automatically propagating them on behalf of the user.


Prior to this PR this will emit duplicate events:

```python
@tool
async def get_items(question: str, callbacks: Callbacks):  # <--- Accept callbacks
    """Ask question"""
    template = ChatPromptTemplate.from_messages(
        [
            (
                "human",
                "'{question}"
            )
        ]
    )
    chain = template | chat_model.with_config(
        {
            "callbacks": callbacks,  # <-- Propagate callbacks
        }
    )
    return await chain.ainvoke({"question": question})
```

Prior to this PR, this works correctly (no duplicate events):

```python
@tool
async def get_items(question: str, callbacks: Callbacks):  # <--- Accept callbacks
    """Ask question"""
    template = ChatPromptTemplate.from_messages(
        [
            (
                "human",
                "'{question}"
            )
        ]
    )
    chain = template | chat_model
    return await chain.ainvoke({"question": question}, {"callbacks": callbacks})
```

This will also work (as long as the user is on Python >= 3.11), as
langchain will automatically propagate callbacks:

```python
@tool
async def get_items(question: str):
    """Ask question"""
    template = ChatPromptTemplate.from_messages(
        [
            (
                "human",
                "'{question}"
            )
        ]
    )
    chain = template | chat_model
    return await chain.ainvoke({"question": question})
```

eyurtsev commented Jun 4, 2024

Merged a fix for this issue. It will be released in the next core release; in the meantime, use the workarounds above.

@eyurtsev closed this as completed on Jun 4, 2024
hinthornw pushed a commit that referenced this issue Jun 20, 2024