diff --git a/langgraph/checkpoint/redis/jsonplus_redis.py b/langgraph/checkpoint/redis/jsonplus_redis.py index c668d3f..bb84ced 100644 --- a/langgraph/checkpoint/redis/jsonplus_redis.py +++ b/langgraph/checkpoint/redis/jsonplus_redis.py @@ -62,7 +62,7 @@ def loads(self, data: bytes) -> Any: return super().loads_typed(("json", data)) def _revive_if_needed(self, obj: Any) -> Any: - """Recursively apply reviver to handle LangChain serialized objects. + """Recursively apply reviver to handle LangChain and LangGraph serialized objects. This method is crucial for preventing MESSAGE_COERCION_FAILURE by ensuring that LangChain message objects stored in their serialized format are properly @@ -70,11 +70,14 @@ def _revive_if_needed(self, obj: Any) -> Any: 'lc', 'type', and 'constructor' fields, causing errors when the application expects actual message objects with 'role' and 'content' attributes. + It also handles LangGraph Interrupt objects which serialize to {"value": ..., "resumable": ..., "ns": ..., "when": ...} + and must be reconstructed to prevent AttributeError when accessing Interrupt attributes. + Args: obj: The object to potentially revive, which may be a dict, list, or primitive. Returns: - The revived object with LangChain objects properly reconstructed. + The revived object with LangChain/LangGraph objects properly reconstructed. """ if isinstance(obj, dict): # Check if this is a LangChain serialized object @@ -83,6 +86,33 @@ def _revive_if_needed(self, obj: Any) -> Any: # This converts {'lc': 1, 'type': 'constructor', ...} back to # the actual LangChain object (e.g., HumanMessage, AIMessage) return self._reviver(obj) + + # Check if this is a serialized Interrupt object + # Interrupt objects serialize to {"value": ..., "resumable": ..., "ns": ..., "when": ...} + # This must be done before recursively processing to avoid losing the structure + if ( + "value" in obj + and "resumable" in obj + and "when" in obj + and len(obj) == 4 + and isinstance(obj.get("resumable"), bool) + ): + # Try to reconstruct as an Interrupt object + try: + from langgraph.types import Interrupt + + return Interrupt( + value=self._revive_if_needed(obj["value"]), + resumable=obj["resumable"], + ns=obj["ns"], + when=obj["when"], + ) + except (ImportError, TypeError, ValueError) as e: + # If we can't import or construct Interrupt, log and fall through + logger.debug( + "Failed to deserialize Interrupt object: %s", e, exc_info=True + ) + # Recursively process nested dicts return {k: self._revive_if_needed(v) for k, v in obj.items()} elif isinstance(obj, list): diff --git a/tests/test_issue_113_interrupt_serialization.py b/tests/test_issue_113_interrupt_serialization.py new file mode 100644 index 0000000..f3c6390 --- /dev/null +++ b/tests/test_issue_113_interrupt_serialization.py @@ -0,0 +1,149 @@ +""" +Regression test for Issue #113: Interrupt objects not properly deserialized + +When using interrupt() with RedisSaver, Interrupt objects are serialized to +dictionaries but not reconstructed back to Interrupt objects on deserialization. + +This causes AttributeError: 'dict' object has no attribute 'id' when trying +to resume execution with Command(resume=...). + +The error occurs in LangGraph's _pending_interrupts() method when it tries to +access value[0].id, but value[0] is a dict instead of an Interrupt object. +""" + +import operator +from typing import Annotated, TypedDict +from uuid import uuid4 + +import pytest +from langchain_core.messages import AnyMessage +from langgraph.graph import END, START, StateGraph +from langgraph.types import Command, Interrupt, interrupt + +from langgraph.checkpoint.redis import RedisSaver + + +class AgentState(TypedDict): + """State for the test agent.""" + + messages: Annotated[list[AnyMessage], operator.add] + + +def review_node(state: AgentState): + """Node that interrupts for review.""" + random_str = str(uuid4()) + print(f"Generated string: {random_str}") + print("-------- entry interrupt --------") + + # This creates an Interrupt object that needs to be serialized + user_input = interrupt({"test": "data"}) + + print(f"Received input: {user_input.get('test')}") + print("-------- exit interrupt --------") + return {"messages": [random_str]} + + +def test_interrupt_serialization_roundtrip(redis_url: str) -> None: + """ + Test that Interrupt objects are properly serialized and deserialized. + + This is a unit test that directly tests the serializer behavior. + """ + from langgraph.checkpoint.redis.jsonplus_redis import JsonPlusRedisSerializer + + serializer = JsonPlusRedisSerializer() + + # Create an Interrupt object + original_interrupt = Interrupt(value={"test": "data"}, resumable=True) + + # Serialize it + serialized = serializer.dumps(original_interrupt) + + # Deserialize it + deserialized = serializer.loads(serialized) + + # This should be an Interrupt object, not a dict + assert isinstance(deserialized, Interrupt), ( + f"Expected Interrupt object, got {type(deserialized)}. " + f"This causes AttributeError when LangGraph tries to access attributes" + ) + assert deserialized.value == {"test": "data"} + assert deserialized.resumable is True + + +def test_interrupt_in_pending_sends(redis_url: str) -> None: + """ + Test that Interrupt objects in pending_sends are properly deserialized. + + This tests the actual scenario from issue #113 where interrupts stored + in checkpoint writes need to be reconstructed. + """ + from langgraph.checkpoint.redis.jsonplus_redis import JsonPlusRedisSerializer + + serializer = JsonPlusRedisSerializer() + + # Simulate what gets stored in pending_sends + # In the real scenario, pending_sends contains tuples of (channel, value) + # where value might be an Interrupt object + pending_sends = [ + ("__interrupt__", [Interrupt(value={"test": "data"}, resumable=False)]), + ("messages", ["some message"]), + ] + + # Serialize the pending_sends + serialized = serializer.dumps(pending_sends) + + # Deserialize + deserialized = serializer.loads(serialized) + + # Check the structure + assert isinstance(deserialized, list) + assert len(deserialized) == 2 + + # The first item should have reconstructed Interrupt object + channel, value = deserialized[0] + assert channel == "__interrupt__" + assert isinstance(value, list) + assert len(value) == 1 + + # THIS IS THE CRITICAL CHECK - value[0] must be an Interrupt, not a dict + assert isinstance(value[0], Interrupt), ( + f"Expected Interrupt object in pending_sends, got {type(value[0])}. " + f"This is the root cause of 'dict' object has no attribute error" + ) + assert value[0].value == {"test": "data"} + assert value[0].resumable is False + + +def test_interrupt_resume_workflow(redis_url: str) -> None: + """ + Integration test reproducing the exact scenario from issue #113. + + This test should fail with AttributeError until the fix is implemented. + """ + with RedisSaver.from_conn_string(redis_url) as checkpointer: + checkpointer.setup() + + builder = StateGraph(AgentState) + builder.add_node("review", review_node) + builder.add_edge(START, "review") + builder.add_edge("review", END) + + graph = builder.compile(checkpointer=checkpointer) + + # Use unique thread ID + config = {"configurable": {"thread_id": f"test-interrupt-{uuid4()}"}} + + # First invocation - should hit the interrupt + initial = graph.invoke({}, config=config) + print(f"Initial result: {initial}") + + # Resume with Command - this is where the error occurs + # The error happens because pending_sends contains dicts instead of Interrupt objects + # When LangGraph tries to access Interrupt attributes + # It fails because value[0] is {'value': ..., 'resumable': ..., 'ns': ..., 'when': ...} not Interrupt(...) + final_state = graph.invoke(Command(resume={"test": "response"}), config=config) + + # If we get here, the test passed + assert "messages" in final_state + print(f"Final messages: {final_state['messages']}")