
astream_log produces TypeError: unsupported operand type(s) for +: 'dict' and 'dict' in passthrough.py #136

Closed

sploithunter opened this issue Feb 22, 2024 · 9 comments
Labels: bug (Something isn't working)

@sploithunter
Checked other resources

  • I added a very descriptive title to this issue.
  • I searched the LangChain documentation with the integrated search.
  • I used the GitHub search to find a similar question and didn't find it.
  • I am sure that this is a bug in LangChain rather than my code.

Example Code

The following code produces the error. I have seen it in many different scenarios, but this uses one of your base examples from https://github.com/langchain-ai/langgraph/blob/main/examples/multi_agent/agent_supervisor.ipynb. The only change is the async invocation, the `async for output in graph.astream_log():` loop located at the very bottom of the code.

import os

# Optional, add tracing in LangSmith
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_PROJECT"] = "Multi-agent Collaboration"

from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_core.tools import tool
from langchain_experimental.tools import PythonREPLTool

tavily_tool = TavilySearchResults(max_results=5)

# This executes code locally, which can be unsafe
python_repl_tool = PythonREPLTool()

from langchain.agents import AgentExecutor, create_openai_tools_agent
from langchain_core.messages import BaseMessage, HumanMessage
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_openai import ChatOpenAI


def create_agent(llm: ChatOpenAI, tools: list, system_prompt: str):
    # Each worker node will be given a name and some tools.
    prompt = ChatPromptTemplate.from_messages(
        [
            (
                "system",
                system_prompt,
            ),
            MessagesPlaceholder(variable_name="messages"),
            MessagesPlaceholder(variable_name="agent_scratchpad"),
        ]
    )
    agent = create_openai_tools_agent(llm, tools, prompt)
    executor = AgentExecutor(agent=agent, tools=tools)
    return executor

def agent_node(state, agent, name):
    result = agent.invoke(state)
    return {"messages": [HumanMessage(content=result["output"], name=name)]}

from langchain.output_parsers.openai_functions import JsonOutputFunctionsParser

members = ["Researcher", "Coder"]
system_prompt = (
    "You are a supervisor tasked with managing a conversation between the"
    " following workers:  {members}. Given the following user request,"
    " respond with the worker to act next. Each worker will perform a"
    " task and respond with their results and status. When finished,"
    " respond with FINISH."
)
# Our team supervisor is an LLM node. It just picks the next agent to process
# and decides when the work is completed
options = ["FINISH"] + members
# Using openai function calling can make output parsing easier for us
function_def = {
    "name": "route",
    "description": "Select the next role.",
    "parameters": {
        "title": "routeSchema",
        "type": "object",
        "properties": {
            "next": {
                "title": "Next",
                "anyOf": [
                    {"enum": options},
                ],
            }
        },
        "required": ["next"],
    },
}
prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system_prompt),
        MessagesPlaceholder(variable_name="messages"),
        (
            "system",
            "Given the conversation above, who should act next?"
            " Or should we FINISH? Select one of: {options}",
        ),
    ]
).partial(options=str(options), members=", ".join(members))

llm = ChatOpenAI(model="gpt-4-1106-preview", streaming=True)

supervisor_chain = (
    prompt
    | llm.bind_functions(functions=[function_def], function_call="route")
    | JsonOutputFunctionsParser()
)

import functools
import operator
from typing import Annotated, Sequence, TypedDict

from langgraph.graph import StateGraph, END


# The agent state is the input to each node in the graph
class AgentState(TypedDict):
    # The annotation tells the graph that new messages will always
    # be appended to the current state
    messages: Annotated[Sequence[BaseMessage], operator.add]
    # The 'next' field indicates where to route to next
    next: str


research_agent = create_agent(llm, [tavily_tool], "You are a web researcher.")
research_node = functools.partial(agent_node, agent=research_agent, name="Researcher")

# NOTE: THIS PERFORMS ARBITRARY CODE EXECUTION. PROCEED WITH CAUTION
code_agent = create_agent(
    llm,
    [python_repl_tool],
    "You may generate safe python code to analyze data and generate charts using matplotlib.",
)
code_node = functools.partial(agent_node, agent=code_agent, name="Coder")

workflow = StateGraph(AgentState)
workflow.add_node("Researcher", research_node)
workflow.add_node("Coder", code_node)
workflow.add_node("supervisor", supervisor_chain)

for member in members:
    # We want our workers to ALWAYS "report back" to the supervisor when done
    workflow.add_edge(member, "supervisor")
# The supervisor populates the "next" field in the graph state
# which routes to a node or finishes
conditional_map = {k: k for k in members}
conditional_map["FINISH"] = END
workflow.add_conditional_edges("supervisor", lambda x: x["next"], conditional_map)
# Finally, add entrypoint
workflow.set_entry_point("supervisor")

graph = workflow.compile()

async def main():
    async for output in graph.astream_log(
        {
            "messages": [
                HumanMessage(content="Code hello world and print it to the terminal")
            ]
        },
        include_types=["llm"],
    ):
        for op in output.ops:
            if op["path"] == "/streamed_output/-":
                # this is the output from .stream()
                ...
            elif op["path"].startswith("/logs/") and op["path"].endswith(
                "/streamed_output/-"
            ):
                # because we chose to only include LLMs, these are LLM tokens
                print(op["value"])


if __name__ == "__main__":
    import asyncio

    asyncio.run(main())

Error Message and Stack Trace (if applicable)

(agents_v09) JasonMacPro:agents_v09 jason$ python langgraph_astream_events.py
content='' additional_kwargs={'function_call': {'arguments': '', 'name': 'route'}}
content='' additional_kwargs={'function_call': {'arguments': '{"', 'name': ''}}
content='' additional_kwargs={'function_call': {'arguments': 'next', 'name': ''}}
content='' additional_kwargs={'function_call': {'arguments': '":"', 'name': ''}}
Traceback (most recent call last):
File "/Users/jason/Documents/agents_v09/langgraph_astream_events.py", line 165, in
asyncio.run(main())
File "/Users/jason/.pyenv/versions/3.10.11/lib/python3.10/asyncio/runners.py", line 44, in run
return loop.run_until_complete(main)
File "/Users/jason/.pyenv/versions/3.10.11/lib/python3.10/asyncio/base_events.py", line 649, in run_until_complete
return future.result()
File "/Users/jason/Documents/agents_v09/langgraph_astream_events.py", line 147, in main
async for output in graph.astream_log(
File "/Users/jason/.pyenv/versions/3.10.11/lib/python3.10/site-packages/langchain_core/runnables/base.py", line 683, in astream_log
async for item in _astream_log_implementation( # type: ignore
File "/Users/jason/.pyenv/versions/3.10.11/lib/python3.10/site-packages/langchain_core/tracers/log_stream.py", line 612, in _astream_log_implementation
await task
File "/Users/jason/.pyenv/versions/3.10.11/lib/python3.10/site-packages/langchain_core/tracers/log_stream.py", line 566, in consume_astream
async for chunk in runnable.astream(input, config, **kwargs):
File "/Users/jason/.pyenv/versions/3.10.11/lib/python3.10/site-packages/langgraph/pregel/init.py", line 657, in astream
async for chunk in self.atransform(
File "/Users/jason/.pyenv/versions/3.10.11/lib/python3.10/site-packages/langgraph/pregel/init.py", line 675, in atransform
async for chunk in self._atransform_stream_with_config(
File "/Users/jason/.pyenv/versions/3.10.11/lib/python3.10/site-packages/langchain_core/runnables/base.py", line 1597, in _atransform_stream_with_config
chunk = cast(Output, await py_anext(iterator))
File "/Users/jason/.pyenv/versions/3.10.11/lib/python3.10/site-packages/langchain_core/tracers/log_stream.py", line 237, in tap_output_aiter
async for chunk in output:
File "/Users/jason/.pyenv/versions/3.10.11/lib/python3.10/site-packages/langgraph/pregel/init.py", line 524, in _atransform
_interrupt_or_proceed(done, inflight, step)
File "/Users/jason/.pyenv/versions/3.10.11/lib/python3.10/site-packages/langgraph/pregel/init.py", line 698, in _interrupt_or_proceed
raise exc
File "/Users/jason/.pyenv/versions/3.10.11/lib/python3.10/site-packages/langgraph/pregel/init.py", line 836, in _aconsume
async for _ in iterator:
File "/Users/jason/.pyenv/versions/3.10.11/lib/python3.10/site-packages/langchain_core/runnables/base.py", line 4140, in astream
async for item in self.bound.astream(
File "/Users/jason/.pyenv/versions/3.10.11/lib/python3.10/site-packages/langchain_core/runnables/base.py", line 2452, in astream
async for chunk in self.atransform(input_aiter(), config, **kwargs):
File "/Users/jason/.pyenv/versions/3.10.11/lib/python3.10/site-packages/langchain_core/runnables/base.py", line 2435, in atransform
async for chunk in self._atransform_stream_with_config(
File "/Users/jason/.pyenv/versions/3.10.11/lib/python3.10/site-packages/langchain_core/runnables/base.py", line 1597, in _atransform_stream_with_config
chunk = cast(Output, await py_anext(iterator))
File "/Users/jason/.pyenv/versions/3.10.11/lib/python3.10/site-packages/langchain_core/tracers/log_stream.py", line 237, in tap_output_aiter
async for chunk in output:
File "/Users/jason/.pyenv/versions/3.10.11/lib/python3.10/site-packages/langchain_core/runnables/base.py", line 2405, in _atransform
async for output in final_pipeline:
File "/Users/jason/.pyenv/versions/3.10.11/lib/python3.10/site-packages/langchain_core/runnables/passthrough.py", line 280, in atransform
final = final + chunk
TypeError: unsupported operand type(s) for +: 'dict' and 'dict'

Description

I am trying to stream output from a compiled langgraph using astream_log (astream_events also produces this error). It is easily reproducible with the example code in many of the langgraph examples when using astream_log rather than astream or synchronous calls.
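
The failure reduces to the accumulation step the traceback points at in passthrough.py: streamed chunks are folded together with `+`, and plain dicts don't define `+`. A minimal sketch (assuming langchain_core.runnables.utils.AddableDict, the addable mapping langchain-core uses for streamed dict chunks):

from langchain_core.runnables.utils import AddableDict

chunks = [{"messages": ["a"]}, {"messages": ["b"]}]

# 1) What the fold over plain dict chunks effectively does:
final = None
for chunk in chunks:
    try:
        final = chunk if final is None else final + chunk
    except TypeError as e:
        print(e)  # unsupported operand type(s) for +: 'dict' and 'dict'

# 2) Adapting the first chunk to an addable mapping makes the same fold work,
#    since AddableDict.__add__ merges keys and adds values:
final = None
for chunk in chunks:
    final = AddableDict(chunk) if final is None else final + chunk
print(final)  # {'messages': ['a', 'b']}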

System Info

langchain==0.1.8
langchain-community==0.0.21
langchain-core==0.1.25
langchain-experimental==0.0.52
langchain-mistralai==0.0.4
langchain-openai==0.0.6
langgraph==0.0.25
langsmith==0.1.5

Mac OSX 12.6.5

Python 3.10.11

@russell-dot-js

also mentioned in #124

@svaditya

svaditya commented Mar 1, 2024

any update on this one?

@mingxuan-he

I had the same issue and used some of the suggestions in #78. After upgrading to Python 3.12 and downgrading to langgraph 0.0.20, I could run all the example notebooks, including this one (agent supervisor).

However, my graph needs SQLite memory from langgraph>=0.0.22, so I'm a bit stuck.
Now I'm back on langgraph==0.0.26 and using astream_events, and I get these messages:

LangChainBetaWarning: This API is in beta and may change in the future.
  warn_beta(
NotImplementedError in LogStreamCallbackHandler.on_chain_end callback: NotImplementedError("Trying to load an object that doesn't implement serialization: {'lc': 1, 'type': 'not_implemented', 'id': ['builtins', 'object'], 'repr': '<object object at 0x00000204E3FA63D0>'}")
---------------------------------------------------------------------------
File "C:/ProgramData/Anaconda3/envs/llm/Lib/site-packages/langchain_core/runnables/passthrough.py", line 283
    config = ensure_config(config)

TypeError: unsupported operand type(s) for +: 'dict' and 'dict'
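
For reference, my consumption loop is roughly this (a sketch against the beta v1 event schema, where "on_chat_model_stream" carries streamed chat tokens; event names may change while the API is in beta, and `graph` is the compiled workflow from the repro above):

import asyncio

from langchain_core.messages import HumanMessage

async def main():
    inputs = {
        "messages": [HumanMessage(content="Code hello world and print it to the terminal")]
    }
    async for event in graph.astream_events(inputs, version="v1"):
        if event["event"] == "on_chat_model_stream":
            # data["chunk"] is an AIMessageChunk in the v1 schema
            print(event["data"]["chunk"].content, end="", flush=True)

asyncio.run(main())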

Would love to hear if anyone has a solution to this issue yet

@eyurtsev eyurtsev self-assigned this Mar 7, 2024
@eyurtsev
Contributor

eyurtsev commented Mar 7, 2024

Will investigate tomorrow if it's langchain-core or langgraph. Similar error message appears in langserve

@eyurtsev
Contributor

eyurtsev commented Mar 7, 2024

cross linking langchain-ai/langserve#504

@mingxuan-he

Thank you for the fix @eyurtsev! I upgraded to langchain_core 0.1.31 but still got the same error. It looks like the error message is pointing to langchain_core/runnables/passthrough.py instead of base.py. Do you mind looking into it to see whether the same _adapt_first_streaming_chunk solution can be applied to transform and atransform there?
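
For anyone following along, the shape of the fix being asked for is roughly this (a sketch only; the actual helper lives in langchain-core and its exact name, signature, and call sites may differ):

from langchain_core.runnables.utils import AddableDict

def _adapt_first_streaming_chunk(chunk):
    # Plain dicts don't support `+`, so wrap the first streamed chunk in an
    # addable mapping; every later `final = final + chunk` then merges keys
    # and adds values instead of raising TypeError.
    if isinstance(chunk, dict) and not isinstance(chunk, AddableDict):
        return AddableDict(chunk)
    return chunk

async def _accumulate(chunk_aiter):
    # Mirrors the fold in RunnablePassthrough.atransform: pass chunks
    # through while keeping a running `final` for the end-of-run callback.
    final = None
    async for chunk in chunk_aiter:
        final = _adapt_first_streaming_chunk(chunk) if final is None else final + chunk
        yield chunk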

@eyurtsev
Copy link
Contributor

@mingxuan-he thanks for flagging -- this will be available in the next release.

@niros1

niros1 commented Mar 16, 2024

When do you expect the next release?

@nfcampos
Contributor

This has been released in the latest version of langchain-core.

gkorland pushed a commit to FalkorDB/langchain that referenced this issue Mar 30, 2024
hinthornw pushed a commit to langchain-ai/langchain that referenced this issue Apr 26, 2024
hinthornw pushed a commit to langchain-ai/langchain that referenced this issue Apr 26, 2024