-
In my project, I need to implement the following logic. Here is a simple: import asyncio
from langchain_core.runnables import *
from lang_chain.llm.llms import llm
def _test(_dict):
print("value:", _dict)
return _dict
@chain
def my_method(_dict, **keywords):
print(keywords)
return RunnablePassthrough.assign(key=lambda x: keywords.get("i")) | RunnableLambda(_test)
if name == 'main':
loop = asyncio.new_event_loop()
my_list = ["1", "2", "3", " 4", "5"]
head = RunnablePassthrough()
for i in my_list:
head = head | my_method.bind(i=i)
stream = head.invoke({})
#
# async def __stream(stream1):
# async for i in stream1:
# print(i)
#
# loop.run_until_complete(__stream(stream)) When I use the .invoke({}) method, it outputs the following results correctly:
But if I use the astream_log({}) method, it throws an error:
Why is it designed like this? Do I need to implement a runnable similar to the model if I want to achieve the above logic? |
Beta Was this translation helpful? Give feedback.
Replies: 5 comments 2 replies
-
🤖 Hello @jason1315! I'm Dosu, a bot designed to assist while you're waiting for a human maintainer. Whether it's solving bugs, answering questions, or helping you become a contributor, I'm here to make your experience smoother. Let's take a look at your issue together, and I'll provide a full answer shortly. |
Beta Was this translation helpful? Give feedback.
-
🤖 The error you're encountering is due to the way the Here's an example of how to use the from langchain_core.runnables import RunnableLambda
def add(x: int, y: int) -> int:
return x + y
runnable = RunnableLambda(add)
bound_runnable = runnable.bind(y=1)
print(bound_runnable.invoke(2)) # returns 3 In this example, the Regarding the Here's the relevant code from the def __init__(
self,
*,
bound: Runnable[Input, Output],
kwargs: Optional[Mapping[str, Any]] = None,
config: Optional[RunnableConfig] = None,
config_factories: Optional[
List[Callable[[RunnableConfig], RunnableConfig]]
] = None,
custom_input_type: Optional[Union[Type[Input], BaseModel]] = None,
custom_output_type: Optional[Union[Type[Output], BaseModel]] = None,
**other_kwargs: Any,
) -> None:
"""Create a RunnableBinding from a runnable and kwargs. The def invoke(
self,
input: Input,
config: Optional[RunnableConfig] = None,
**kwargs: Optional[Any],
) -> Output:
return self.bound.invoke(
input,
self._merge_configs(config),
**{**self.kwargs, **kwargs},
) In this method, You can find more details in the LangChain codebase. SourcesThis response is meant to be useful and save you time. It is not meant to be a precise solution, but rather a starting point for your own research. Help me be more useful! Please leave a 👍 if this is helpful and 👎 if it is irrelevant. If you want to continue the conversation, start your reply with @dosu-bot. |
Beta Was this translation helpful? Give feedback.
-
I have the same question too. |
Beta Was this translation helpful? Give feedback.
-
I created an issue here: #17241 to handle the binding issue. The suggestion from dosu is not the usual way to work with RunnableLambdas. The first argument is meant to be used as a dict to accommodate multiple arguments, other arguments are used for callbacks and internal debugging things. We might change this behavior to make it easier to work with runnable lambdas, but would need to confirm there are no issues. Use Runnable Lambdas with an inputs dict to handle multiple inputs: from langchain_core.runnables import RunnableLambda
def my_func(inputs) -> int:
if 'x' not in inputs:
raise KeyError('missing `x`')
if 'y' not in inputs:
raise KeyError('missing `y`')
x = inputs['x']
y = inputs['y']
return x + y
runnable = RunnableLambda(my_func)
print(runnable.invoke({"x": 3, "y": 4}))
print(runnable.invoke({"x": 3, "y": 4})) And binding would look like this -- but I am guessing that binding is actually is unnecessary to achieve the correct behavior from langchain_core.runnables import RunnableLambda
def my_func(inputs, **kwargs) -> int:
if 'x' not in inputs:
raise KeyError('missing `x`')
if 'y' not in inputs:
raise KeyError('missing `y`')
x = inputs['x']
y = inputs['y']
op = kwargs.get('op', 'add')
if op == 'add':
return x + y
elif op == 'mul':
return x * y
else:
return 42
runnable = RunnableLambda(my_func)
print(runnable.bind(op='add').invoke({"x": 3, "y": 4}))
print(runnable.bind(op='mul').invoke({"x": 3, "y": 4})) |
Beta Was this translation helpful? Give feedback.
-
I believe this was addressed here: #17739 |
Beta Was this translation helpful? Give feedback.
🤖
The error you're encountering is due to the way the
bind
method is designed to work withRunnableLambda
. Thebind
method is used to bind arguments to the function encapsulated by theRunnableLambda
instance. However, theastream_log({})
method is not a function encapsulated by aRunnableLambda
instance, but a method of theRunnableLambda
class itself.Here's an example of how to use the
bind
method correctly:In this example, the
bind
method is used to bind the argumenty=1
…