
Stream Chat LLM Token By Token is not working #78

Closed · 4 tasks done
ahmedmoorsy opened this issue Feb 1, 2024 · 15 comments

Comments

@ahmedmoorsy

Checked other resources

  • I added a very descriptive title to this issue.
  • I searched the LangChain documentation with the integrated search.
  • I used the GitHub search to find a similar question and didn't find it.
  • I am sure that this is a bug in LangChain rather than my code.

Example Code

I am using the code in this notebook:
https://github.com/langchain-ai/langgraph/blob/main/examples/streaming-tokens.ipynb

streaming code:

from langchain_core.messages import HumanMessage
inputs = [HumanMessage(content="what is the weather in sf")]
async for event in app.astream_events(inputs, version="v1"):
    kind = event["event"]
    if kind == "on_chat_model_stream":
        content = event["data"]["chunk"].content
        if content:
            # Empty content in the context of OpenAI means
            # that the model is asking for a tool to be invoked.
            # So we only print non-empty content
            print(content, end="|")
    elif kind == "on_tool_start":
        print("--")
        print(
            f"Starting tool: {event['name']} with inputs: {event['data'].get('input')}"
        )
    elif kind == "on_tool_end":
        print(f"Done tool: {event['name']}")
        print(f"Tool output was: {event['data'].get('output')}")
        print("--")

Error Message and Stack Trace (if applicable)

The streaming is not working; I am not receiving any output from this part:

    if kind == "on_chat_model_stream":
        content = event["data"]["chunk"].content
        if content:
            # Empty content in the context of OpenAI means
            # that the model is asking for a tool to be invoked.
            # So we only print non-empty content
            print(content, end="|")

Description

The streaming is not working; I am not receiving any output.

System Info

langchain==0.1.5
langchain-community==0.0.17
langchain-core==0.1.18
langchain-openai==0.0.5
langgraph==0.0.21

@elliottshort

This seems to be a bug introduced in the 0.0.21 release. If you downgrade to 0.0.20 and keep the core, community, langchain, and openai versions the same, streaming seems to work.

That said, with that example the on_tool_start and on_tool_end events don't seem to be emitted in the 0.0.21, 0.0.20, or 0.0.19 releases for some reason.

@ahmedmoorsy
Author

ahmedmoorsy commented Feb 1, 2024

@elliottshort So which LangChain version do you recommend with LangGraph 0.0.20?

I tried LangGraph 0.0.20 with the latest LangChain version, but it is still not working.

@SavvasMohito

Hey @ahmedmoorsy, which Python version are you using? I had a similar issue when running on Python 3.10, but streaming worked once I upgraded to 3.12 (3.11 should also work, I think).

@tgram-3D

tgram-3D commented Feb 5, 2024

@SavvasMohito That is interesting. I've been having the same issues with streaming on Python 3.10.11. The only way I've been able to get around it is by modifying the wrapper functions. See the other LLM-token thread for streaming with astream_log: I just converted the call_model wrapper function to an LCEL chain and it worked fine.

The same approach works with astream_events to get the correct output in the example notebook. I'm sure there's a more elegant way to do this, specifically to pass through the tool name, but it does work, at least with Python 3.10.11:

import json

from langchain_core.messages import FunctionMessage
from langgraph.prebuilt import ToolInvocation

async def run_tools(messages):
    # The last message carries the model's function call
    last_message = messages[-1]
    tool_name = last_message.additional_kwargs["function_call"]["name"]
    create_function_message = lambda response, tool=tool_name: FunctionMessage(
        content=str(response), name=tool
    )
    action = lambda messages: ToolInvocation(
        tool=messages[-1].additional_kwargs["function_call"]["name"],
        tool_input=json.loads(messages[-1].additional_kwargs["function_call"]["arguments"]),
    )
    # LCEL chain: build the tool invocation, execute it, wrap the result in a FunctionMessage
    chain = action | tool_executor | create_function_message
    return chain

Then, instead of call_model, just use model directly as the agent node:

workflow.add_node("agent", model)
workflow.add_node("action", run_tools)

@SavvasMohito

Nice one @tgram-3D. Does streaming of follow-up questions work for you? I can get responses streamed for my initial prompt, but when I ask a second question it never returns anything. It's not that I'm failing to capture it; it seems like langgraph never exits the model invocation loop.

@tgram-3D

tgram-3D commented Feb 5, 2024

@SavvasMohito I've just been playing around with the example streaming-tokens notebook with astream_events, and follow-up questions seem to work fine using the node modifications described above. The notebook isn't really set up for memory, though, since all it does is pass in a single HumanMessage for each run:

from langchain_core.messages import HumanMessage
inputs = [HumanMessage(content="what is the weather in sf")]
async for event in app.astream_events(inputs, version="v1"):
...

The way I'm currently using it for follow-up questions is pretty basic:

class GraphConversation:
    def __init__(self, graph):
        self.graph = graph
        self.messages = []

    async def stream_conversation(self, question):
        self.messages.append(HumanMessage(content=question))
        # Stream events from the stored graph (the original snippet referenced the
        # global `app` here; using self.graph keeps the class self-contained)
        async for event in self.graph.astream_events(self.messages, version="v1"):
            kind = event["event"]
            if kind == "on_chat_model_stream":
                content = event["data"]["chunk"].content
                if content:
                    # Empty content in the context of OpenAI means
                    # that the model is asking for a tool to be invoked.
                    # So we only print non-empty content
                    print(content, end="|")
            elif kind == "on_tool_start":
                print("--")
                print(
                    f"Starting tool: {event['name']} with inputs: {event['data'].get('input')}"
                )
            elif kind == "on_tool_end":
                print(f"Done tool: {event['name']}")
                print(f"Tool output was: {event['data'].get('output')}")
                print("--")

graph_convo = GraphConversation(graph=app)

await graph_convo.stream_conversation('what is the weather in sf')

--
Starting tool: tavily_search_results_json with inputs: {'query': 'weather in San Francisco'}
Done tool: tavily_search_results_json
Tool output was: [{'url': 'https://www.marinij.com/2024/02/05/high-winds-heavy-rains-hit-the-bay-area/', 'content': '— San José Fire Dept. (@SJFD) February 5, 2024  Weather | Atmospheric river storm to soak Bay Area this weekend, with major flood danger in Southern California  Crystal Oudit said the forecast will start to improve on Monday, though Bay Area residents should expect more rain.  Weather Weather | High winds, heavy rains hit the Bay Area and the rest of California’s coast(Karl Mondon/Bay Area News Group) Waves crash over a breakwater in Alameda, Calif., with the San Francisco skyline in the background on Sunday, Feb. 4, 2024. High winds and heavy rainfall are ...'}]
--
The| weather| in| San| Francisco| is| currently| experiencing| high| winds| and| heavy| rains|.| You| can| find| more| information| about| the| weather| in| San| Francisco| [|here|](|https|://|www|.m|arin|ij|.com|/|202|4|/|02|/|05|/high|-w|inds|-heavy|-r|ains|-hit|-the|-b|ay|-area|/|).|

await graph_convo.stream_conversation('what was the last question I asked you')
The| last| question| you| asked| me| was| "|what| is| the| weather| in| SF|?"|

I'm sure there's a better way to do this sort of thing; check out the new Persistence notebook. I currently can't get astream_events to work with the persistence config at all, though:

from langgraph.checkpoint.sqlite import SqliteSaver
memory = SqliteSaver.from_conn_string(":memory:")
app = workflow.compile(checkpointer=memory)

I tried this and got zero output, same deal without the configurable:

async for event in app.astream_events(inputs, {"configurable": {"thread_id": "2"}}, version="v1"):

I'm sure there's a way to combine the two features, but the simple class method should work fine for frontend integration with websockets, etc.

@mingxuan-he

> Hey @ahmedmoorsy, which Python version are you using? I had a similar issue when running on Python 3.10 but the streaming worked once I upgraded to 3.12 (3.11 should also work, I think).

Thank you! Upgrading from 3.10.13 to 3.12.2 fixed this demo for me

@dmitryrPlanner5D

I was struggling with token streaming in this tutorial and found another workaround to enable it: I just added an extra config parameter to my call_model function and passed it through to the LLM invocation:

# Define the function that calls the model
async def call_model(messages, config):
    # Forwarding the config lets the streaming callbacks reach the model
    response = await model.ainvoke(messages, config=config)
    # The response will get appended to the existing list of messages
    return response

That way I was able to receive the token stream from the LLM.

@simonrmonk

simonrmonk commented Apr 3, 2024

Not clear on how, but @dmitryrPlanner5D's comment solved this for me too. I'm using Python 3.10.

@OSS-GR

OSS-GR commented Apr 9, 2024

What are you guys passing in the config? @simonrmonk @dmitryrPlanner5D

@dmitryrPlanner5D

@OSS-GR I do not call call_model myself, so I do not pass the config explicitly. My call_model function is called by langgraph, and langgraph passes its own config to it; I do not control that config.
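
To illustrate what that looks like, here is a minimal sketch (reusing model from the example notebook; the type annotation is optional): LangChain inspects the node function's signature and, when it declares a config parameter, injects the current RunnableConfig, which carries the callbacks that astream_events relies on.

from langchain_core.runnables import RunnableConfig

async def call_model(messages, config: RunnableConfig):
    # The injected config carries the run's callbacks; forwarding it to the model
    # is what makes the token chunks show up as on_chat_model_stream events.
    response = await model.ainvoke(messages, config=config)
    return response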

@usersina

usersina commented Apr 23, 2024

Streaming is also not working if I update the agent_supervisor notebook as specified in the documentation (set streaming to True and make the tool calls async).

I initially thought the operator.add on the AgentState's messages was making the supervisor's final output contain duplicated tokens, but the operator.add annotation has nothing to do with it.

For example, here's the log from a slightly tweaked version of the notebook. In this example, the supervisor should output MongoDBAgent but it's outputting MongoMongoDBMongoDBAgent, resulting in a KeyError.

{'op': 'replace', 'path': '', 'value': {'id': '1e88305f-e06c-4f9f-8120-20208defbb88', 'streamed_output': [], 'final_output': None, 'logs': {}, 'name': 'LangGraph', 'type': 'chain'}}
{'op': 'add', 'path': '/logs/ChatOpenAI', 'value': {'id': '69000165-10d9-4828-962d-653e95bbc436', 'name': 'ChatOpenAI', 'type': 'llm', 'tags': ['seq:step:2'], 'metadata': {}, 'start_time': '2024-04-23T13:54:01.044+00:00', 'streamed_output': [], 'streamed_output_str': [], 'final_output': None, 'end_time': None}}
{'op': 'add', 'path': '/logs/ChatOpenAI/streamed_output_str/-', 'value': ''}
{'op': 'add', 'path': '/logs/ChatOpenAI/streamed_output/-', 'value': AIMessageChunk(content='', additional_kwargs={'function_call': {'arguments': '', 'name': 'route'}})}
{'op': 'add', 'path': '/logs/ChatOpenAI/streamed_output_str/-', 'value': ''}
{'op': 'add', 'path': '/logs/ChatOpenAI/streamed_output/-', 'value': AIMessageChunk(content='', additional_kwargs={'function_call': {'arguments': '{"', 'name': ''}})}
{'op': 'add', 'path': '/logs/ChatOpenAI/streamed_output_str/-', 'value': ''}
{'op': 'add', 'path': '/logs/ChatOpenAI/streamed_output/-', 'value': AIMessageChunk(content='', additional_kwargs={'function_call': {'arguments': 'next', 'name': ''}})}
{'op': 'add', 'path': '/logs/ChatOpenAI/streamed_output_str/-', 'value': ''}
{'op': 'add', 'path': '/logs/ChatOpenAI/streamed_output/-', 'value': AIMessageChunk(content='', additional_kwargs={'function_call': {'arguments': '":"', 'name': ''}})}
{'op': 'add', 'path': '/logs/ChatOpenAI/streamed_output_str/-', 'value': ''}
{'op': 'add', 'path': '/logs/ChatOpenAI/streamed_output/-', 'value': AIMessageChunk(content='', additional_kwargs={'function_call': {'arguments': 'Mongo', 'name': ''}})}
{'op': 'add', 'path': '/logs/ChatOpenAI/streamed_output_str/-', 'value': ''}
{'op': 'add', 'path': '/logs/ChatOpenAI/streamed_output/-', 'value': AIMessageChunk(content='', additional_kwargs={'function_call': {'arguments': 'DB', 'name': ''}})}
{'op': 'add', 'path': '/logs/ChatOpenAI/streamed_output_str/-', 'value': ''}
{'op': 'add', 'path': '/logs/ChatOpenAI/streamed_output/-', 'value': AIMessageChunk(content='', additional_kwargs={'function_call': {'arguments': 'Agent', 'name': ''}})}
{'op': 'add', 'path': '/logs/ChatOpenAI/streamed_output_str/-', 'value': ''}
{'op': 'add', 'path': '/logs/ChatOpenAI/streamed_output/-', 'value': AIMessageChunk(content='', additional_kwargs={'function_call': {'arguments': '"}', 'name': ''}})}
{'op': 'add', 'path': '/logs/ChatOpenAI/streamed_output_str/-', 'value': ''}
{'op': 'add', 'path': '/logs/ChatOpenAI/streamed_output/-', 'value': AIMessageChunk(content='')}
{'op': 'add', 'path': '/logs/ChatOpenAI/final_output', 'value': {'generations': [[{'text': '', 'generation_info': {'finish_reason': 'stop'}, 'type': 'ChatGenerationChunk', 'message': AIMessageChunk(content='', additional_kwargs={'function_call': {'arguments': '{"next":"MongoDBAgent"}', 'name': 'route'}}, response_metadata={'finish_reason': 'stop'}, id='run-69000165-10d9-4828-962d-653e95bbc436')}]], 'llm_output': None, 'run': None}}
{'op': 'add', 'path': '/logs/ChatOpenAI/end_time', 'value': '2024-04-23T13:54:02.193+00:00'}
{'op': 'add', 'path': '/streamed_output/-', 'value': {'Supervisor': {'next': 'MongoMongoDBMongoDBAgent'}}}
{'op': 'replace', 'path': '/final_output', 'value': {'Supervisor': {'next': 'MongoMongoDBMongoDBAgent'}}}

@usersina

usersina commented Apr 23, 2024


Following up on my previous comment: I found the issue and the solution.

@hinthornw
Contributor

re: #78 (comment)

The reason explicit config passing is needed on Python versions < 3.11 is that asyncio didn't add context support until 3.11.

@eyurtsev
Contributor

Closing the issue. You must either pass the config explicitly when using Python < 3.11, or upgrade to Python 3.11+, from which point we do that automatically on behalf of the user.

The config object contains the callbacks that astream_events and astream_log need in order to get information about the streaming tokens from the LLM!
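
To make the resolution concrete, here is a minimal sketch, reusing model and app from the example notebook (stream_demo is just an illustrative helper): the node forwards the config explicitly, which is required on Python < 3.11 and harmless on newer versions.

from langchain_core.messages import HumanMessage

# Forward the config so the callbacks reach the chat model even on Python < 3.11.
async def call_model(messages, config):
    return await model.ainvoke(messages, config=config)

# Consuming the stream is then the same as in the original example.
async def stream_demo():
    inputs = [HumanMessage(content="what is the weather in sf")]
    async for event in app.astream_events(inputs, version="v1"):
        if event["event"] == "on_chat_model_stream":
            chunk = event["data"]["chunk"]
            if chunk.content:
                print(chunk.content, end="|")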
