From c3ce721ebdc4faaa38b6cbbe0b400e55446e83be Mon Sep 17 00:00:00 2001 From: Brian Sam-Bodden Date: Sat, 15 Nov 2025 07:49:00 -0700 Subject: [PATCH] fix(serializer): properly deserialize Interrupt objects to prevent AttributeError (#113) Fixes #113 When Interrupt objects are serialized with orjson, they become plain dictionaries with 'value', 'resumable', 'ns', and 'when' keys. Previously, these were not being reconstructed back to Interrupt objects during deserialization, causing AttributeError when LangGraph tried to access Interrupt attributes. Added logic to _revive_if_needed() to detect serialized Interrupt objects (dicts with exactly 4 keys: value, resumable, ns, when) and reconstruct them as Interrupt objects with proper recursive handling of nested objects. The fix handles: - Direct Interrupt object serialization/deserialization - Interrupts nested in data structures (lists, dicts) - Interrupts in pending_sends during checkpoint resume operations - Nested LangChain objects within Interrupt.value field Tests added: - test_interrupt_serialization_roundtrip: Unit test for basic serialization - test_interrupt_in_pending_sends: Test for Interrupts in pending_sends structure - test_interrupt_resume_workflow: Integration test reproducing the issue scenario --- langgraph/checkpoint/redis/jsonplus_redis.py | 34 +++- .../test_issue_113_interrupt_serialization.py | 149 ++++++++++++++++++ 2 files changed, 181 insertions(+), 2 deletions(-) create mode 100644 tests/test_issue_113_interrupt_serialization.py diff --git a/langgraph/checkpoint/redis/jsonplus_redis.py b/langgraph/checkpoint/redis/jsonplus_redis.py index c668d3f..bb84ced 100644 --- a/langgraph/checkpoint/redis/jsonplus_redis.py +++ b/langgraph/checkpoint/redis/jsonplus_redis.py @@ -62,7 +62,7 @@ def loads(self, data: bytes) -> Any: return super().loads_typed(("json", data)) def _revive_if_needed(self, obj: Any) -> Any: - """Recursively apply reviver to handle LangChain serialized objects. + """Recursively apply reviver to handle LangChain and LangGraph serialized objects. This method is crucial for preventing MESSAGE_COERCION_FAILURE by ensuring that LangChain message objects stored in their serialized format are properly @@ -70,11 +70,14 @@ def _revive_if_needed(self, obj: Any) -> Any: 'lc', 'type', and 'constructor' fields, causing errors when the application expects actual message objects with 'role' and 'content' attributes. + It also handles LangGraph Interrupt objects which serialize to {"value": ..., "resumable": ..., "ns": ..., "when": ...} + and must be reconstructed to prevent AttributeError when accessing Interrupt attributes. + Args: obj: The object to potentially revive, which may be a dict, list, or primitive. Returns: - The revived object with LangChain objects properly reconstructed. + The revived object with LangChain/LangGraph objects properly reconstructed. """ if isinstance(obj, dict): # Check if this is a LangChain serialized object @@ -83,6 +86,33 @@ def _revive_if_needed(self, obj: Any) -> Any: # This converts {'lc': 1, 'type': 'constructor', ...} back to # the actual LangChain object (e.g., HumanMessage, AIMessage) return self._reviver(obj) + + # Check if this is a serialized Interrupt object + # Interrupt objects serialize to {"value": ..., "resumable": ..., "ns": ..., "when": ...} + # This must be done before recursively processing to avoid losing the structure + if ( + "value" in obj + and "resumable" in obj + and "when" in obj + and len(obj) == 4 + and isinstance(obj.get("resumable"), bool) + ): + # Try to reconstruct as an Interrupt object + try: + from langgraph.types import Interrupt + + return Interrupt( + value=self._revive_if_needed(obj["value"]), + resumable=obj["resumable"], + ns=obj["ns"], + when=obj["when"], + ) + except (ImportError, TypeError, ValueError) as e: + # If we can't import or construct Interrupt, log and fall through + logger.debug( + "Failed to deserialize Interrupt object: %s", e, exc_info=True + ) + # Recursively process nested dicts return {k: self._revive_if_needed(v) for k, v in obj.items()} elif isinstance(obj, list): diff --git a/tests/test_issue_113_interrupt_serialization.py b/tests/test_issue_113_interrupt_serialization.py new file mode 100644 index 0000000..f3c6390 --- /dev/null +++ b/tests/test_issue_113_interrupt_serialization.py @@ -0,0 +1,149 @@ +""" +Regression test for Issue #113: Interrupt objects not properly deserialized + +When using interrupt() with RedisSaver, Interrupt objects are serialized to +dictionaries but not reconstructed back to Interrupt objects on deserialization. + +This causes AttributeError: 'dict' object has no attribute 'id' when trying +to resume execution with Command(resume=...). + +The error occurs in LangGraph's _pending_interrupts() method when it tries to +access value[0].id, but value[0] is a dict instead of an Interrupt object. +""" + +import operator +from typing import Annotated, TypedDict +from uuid import uuid4 + +import pytest +from langchain_core.messages import AnyMessage +from langgraph.graph import END, START, StateGraph +from langgraph.types import Command, Interrupt, interrupt + +from langgraph.checkpoint.redis import RedisSaver + + +class AgentState(TypedDict): + """State for the test agent.""" + + messages: Annotated[list[AnyMessage], operator.add] + + +def review_node(state: AgentState): + """Node that interrupts for review.""" + random_str = str(uuid4()) + print(f"Generated string: {random_str}") + print("-------- entry interrupt --------") + + # This creates an Interrupt object that needs to be serialized + user_input = interrupt({"test": "data"}) + + print(f"Received input: {user_input.get('test')}") + print("-------- exit interrupt --------") + return {"messages": [random_str]} + + +def test_interrupt_serialization_roundtrip(redis_url: str) -> None: + """ + Test that Interrupt objects are properly serialized and deserialized. + + This is a unit test that directly tests the serializer behavior. + """ + from langgraph.checkpoint.redis.jsonplus_redis import JsonPlusRedisSerializer + + serializer = JsonPlusRedisSerializer() + + # Create an Interrupt object + original_interrupt = Interrupt(value={"test": "data"}, resumable=True) + + # Serialize it + serialized = serializer.dumps(original_interrupt) + + # Deserialize it + deserialized = serializer.loads(serialized) + + # This should be an Interrupt object, not a dict + assert isinstance(deserialized, Interrupt), ( + f"Expected Interrupt object, got {type(deserialized)}. " + f"This causes AttributeError when LangGraph tries to access attributes" + ) + assert deserialized.value == {"test": "data"} + assert deserialized.resumable is True + + +def test_interrupt_in_pending_sends(redis_url: str) -> None: + """ + Test that Interrupt objects in pending_sends are properly deserialized. + + This tests the actual scenario from issue #113 where interrupts stored + in checkpoint writes need to be reconstructed. + """ + from langgraph.checkpoint.redis.jsonplus_redis import JsonPlusRedisSerializer + + serializer = JsonPlusRedisSerializer() + + # Simulate what gets stored in pending_sends + # In the real scenario, pending_sends contains tuples of (channel, value) + # where value might be an Interrupt object + pending_sends = [ + ("__interrupt__", [Interrupt(value={"test": "data"}, resumable=False)]), + ("messages", ["some message"]), + ] + + # Serialize the pending_sends + serialized = serializer.dumps(pending_sends) + + # Deserialize + deserialized = serializer.loads(serialized) + + # Check the structure + assert isinstance(deserialized, list) + assert len(deserialized) == 2 + + # The first item should have reconstructed Interrupt object + channel, value = deserialized[0] + assert channel == "__interrupt__" + assert isinstance(value, list) + assert len(value) == 1 + + # THIS IS THE CRITICAL CHECK - value[0] must be an Interrupt, not a dict + assert isinstance(value[0], Interrupt), ( + f"Expected Interrupt object in pending_sends, got {type(value[0])}. " + f"This is the root cause of 'dict' object has no attribute error" + ) + assert value[0].value == {"test": "data"} + assert value[0].resumable is False + + +def test_interrupt_resume_workflow(redis_url: str) -> None: + """ + Integration test reproducing the exact scenario from issue #113. + + This test should fail with AttributeError until the fix is implemented. + """ + with RedisSaver.from_conn_string(redis_url) as checkpointer: + checkpointer.setup() + + builder = StateGraph(AgentState) + builder.add_node("review", review_node) + builder.add_edge(START, "review") + builder.add_edge("review", END) + + graph = builder.compile(checkpointer=checkpointer) + + # Use unique thread ID + config = {"configurable": {"thread_id": f"test-interrupt-{uuid4()}"}} + + # First invocation - should hit the interrupt + initial = graph.invoke({}, config=config) + print(f"Initial result: {initial}") + + # Resume with Command - this is where the error occurs + # The error happens because pending_sends contains dicts instead of Interrupt objects + # When LangGraph tries to access Interrupt attributes + # It fails because value[0] is {'value': ..., 'resumable': ..., 'ns': ..., 'when': ...} not Interrupt(...) + final_state = graph.invoke(Command(resume={"test": "response"}), config=config) + + # If we get here, the test passed + assert "messages" in final_state + print(f"Final messages: {final_state['messages']}")