From 28f744c1f5714d7be073d3a52ae40b09cd5124e0 Mon Sep 17 00:00:00 2001 From: Eugene Yurtsev Date: Thu, 6 Jun 2024 16:37:52 -0400 Subject: [PATCH] core[patch]: Correctly order parent ids in astream events (from root to immediate parent), add defensive check for cycles (#22637) This PR makes two changes: 1. Fixes the order of parent IDs to be from root to immediate parent 2. Adds a simple defensive check for cycles --- .../langchain_core/tracers/event_stream.py | 17 +++++--- .../runnables/test_runnable_events_v2.py | 40 ++++++++++++++++++- 2 files changed, 50 insertions(+), 7 deletions(-) diff --git a/libs/core/langchain_core/tracers/event_stream.py b/libs/core/langchain_core/tracers/event_stream.py index f519debaa14231..c7839dca3001d8 100644 --- a/libs/core/langchain_core/tracers/event_stream.py +++ b/libs/core/langchain_core/tracers/event_stream.py @@ -118,13 +118,20 @@ def __init__( def _get_parent_ids(self, run_id: UUID) -> List[str]: """Get the parent IDs of a run (non-recursively) cast to strings.""" parent_ids = [] - parent_id = self.parent_map[run_id] - while parent_id is not None: - parent_ids.append(str(parent_id)) - parent_id = self.parent_map[parent_id] + while parent_id := self.parent_map.get(run_id): + str_parent_id = str(parent_id) + if str_parent_id in parent_ids: + raise AssertionError( + f"Parent ID {parent_id} is already in the parent_ids list. " + f"This should never happen." + ) + parent_ids.append(str_parent_id) + run_id = parent_id - return parent_ids + # Return the parent IDs in reverse order, so that the first + # parent ID is the root and the last ID is the immediate parent. 
+ return parent_ids[::-1] def _send(self, event: StreamEvent, event_type: str) -> None: """Send an event to the stream.""" diff --git a/libs/core/tests/unit_tests/runnables/test_runnable_events_v2.py b/libs/core/tests/unit_tests/runnables/test_runnable_events_v2.py index 6c7357ff1b59fc..81924b14bce33b 100644 --- a/libs/core/tests/unit_tests/runnables/test_runnable_events_v2.py +++ b/libs/core/tests/unit_tests/runnables/test_runnable_events_v2.py @@ -2033,8 +2033,8 @@ async def parent(x: str, config: RunnableConfig) -> str: "metadata": {}, "name": "grandchild", "parent_ids": [ - "00000000-0000-0000-0000-000000000008", "00000000-0000-0000-0000-000000000007", + "00000000-0000-0000-0000-000000000008", ], "run_id": "00000000-0000-0000-0000-000000000009", "tags": [], @@ -2045,8 +2045,8 @@ async def parent(x: str, config: RunnableConfig) -> str: "metadata": {}, "name": "grandchild", "parent_ids": [ - "00000000-0000-0000-0000-000000000008", "00000000-0000-0000-0000-000000000007", + "00000000-0000-0000-0000-000000000008", ], "run_id": "00000000-0000-0000-0000-000000000009", "tags": [], @@ -2081,6 +2081,42 @@ async def parent(x: str, config: RunnableConfig) -> str: ] +async def test_bad_parent_ids() -> None: + """Test handling of situation where a run id is duplicated in the run tree.""" + + # Type ignores in the code below need to be investigated. + # Looks like a typing issue when using RunnableLambda as a decorator + # with async functions. 
+ @RunnableLambda # type: ignore + async def child(x: str) -> str: + return x + + @RunnableLambda # type: ignore + async def parent(x: str, config: RunnableConfig) -> str: + config["run_id"] = uuid.UUID(int=7) + return await child.ainvoke(x, config) # type: ignore + + bond = uuid.UUID(int=7) + events = await _collect_events( + parent.astream_events("hello", {"run_id": bond}, version="v2"), + with_nulled_ids=False, + ) + # Includes only a partial list of events: the parent and child runs share the + # same run ID, so the callback handler raises an exception. + # The exception does not get bubbled up to the user. + assert events == [ + { + "data": {"input": "hello"}, + "event": "on_chain_start", + "metadata": {}, + "name": "parent", + "parent_ids": [], + "run_id": "00000000-0000-0000-0000-000000000007", + "tags": [], + } + ] + + async def test_runnable_generator() -> None: """Test async events from sync lambda."""