Skip to content

Commit c3ce721

Browse files
committed
fix(serializer): properly deserialize Interrupt objects to prevent AttributeError (#113)
Fixes #113 When Interrupt objects are serialized with orjson, they become plain dictionaries with 'value', 'resumable', 'ns', and 'when' keys. Previously, these were not being reconstructed back to Interrupt objects during deserialization, causing AttributeError when LangGraph tried to access Interrupt attributes. Added logic to _revive_if_needed() to detect serialized Interrupt objects (dicts with exactly 4 keys: value, resumable, ns, when) and reconstruct them as Interrupt objects with proper recursive handling of nested objects. The fix handles: - Direct Interrupt object serialization/deserialization - Interrupts nested in data structures (lists, dicts) - Interrupts in pending_sends during checkpoint resume operations - Nested LangChain objects within Interrupt.value field Tests added: - test_interrupt_serialization_roundtrip: Unit test for basic serialization - test_interrupt_in_pending_sends: Test for Interrupts in pending_sends structure - test_interrupt_resume_workflow: Integration test reproducing the issue scenario
1 parent af45290 commit c3ce721

File tree

2 files changed

+181
-2
lines changed

2 files changed

+181
-2
lines changed

langgraph/checkpoint/redis/jsonplus_redis.py

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,19 +62,22 @@ def loads(self, data: bytes) -> Any:
6262
return super().loads_typed(("json", data))
6363

6464
def _revive_if_needed(self, obj: Any) -> Any:
65-
"""Recursively apply reviver to handle LangChain serialized objects.
65+
"""Recursively apply reviver to handle LangChain and LangGraph serialized objects.
6666
6767
This method is crucial for preventing MESSAGE_COERCION_FAILURE by ensuring
6868
that LangChain message objects stored in their serialized format are properly
6969
reconstructed. Without this, messages would remain as dictionaries with
7070
'lc', 'type', and 'constructor' fields, causing errors when the application
7171
expects actual message objects with 'role' and 'content' attributes.
7272
73+
It also handles LangGraph Interrupt objects which serialize to {"value": ..., "resumable": ..., "ns": ..., "when": ...}
74+
and must be reconstructed to prevent AttributeError when accessing Interrupt attributes.
75+
7376
Args:
7477
obj: The object to potentially revive, which may be a dict, list, or primitive.
7578
7679
Returns:
77-
The revived object with LangChain objects properly reconstructed.
80+
The revived object with LangChain/LangGraph objects properly reconstructed.
7881
"""
7982
if isinstance(obj, dict):
8083
# Check if this is a LangChain serialized object
@@ -83,6 +86,33 @@ def _revive_if_needed(self, obj: Any) -> Any:
8386
# This converts {'lc': 1, 'type': 'constructor', ...} back to
8487
# the actual LangChain object (e.g., HumanMessage, AIMessage)
8588
return self._reviver(obj)
89+
90+
# Check if this is a serialized Interrupt object
91+
# Interrupt objects serialize to {"value": ..., "resumable": ..., "ns": ..., "when": ...}
92+
# This must be done before recursively processing to avoid losing the structure
93+
if (
94+
"value" in obj
95+
and "resumable" in obj
96+
and "when" in obj
97+
and len(obj) == 4
98+
and isinstance(obj.get("resumable"), bool)
99+
):
100+
# Try to reconstruct as an Interrupt object
101+
try:
102+
from langgraph.types import Interrupt
103+
104+
return Interrupt(
105+
value=self._revive_if_needed(obj["value"]),
106+
resumable=obj["resumable"],
107+
ns=obj["ns"],
108+
when=obj["when"],
109+
)
110+
except (ImportError, TypeError, ValueError) as e:
111+
# If we can't import or construct Interrupt, log and fall through
112+
logger.debug(
113+
"Failed to deserialize Interrupt object: %s", e, exc_info=True
114+
)
115+
86116
# Recursively process nested dicts
87117
return {k: self._revive_if_needed(v) for k, v in obj.items()}
88118
elif isinstance(obj, list):
Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
"""
2+
Regression test for Issue #113: Interrupt objects not properly deserialized
3+
4+
When using interrupt() with RedisSaver, Interrupt objects are serialized to
5+
dictionaries but not reconstructed back to Interrupt objects on deserialization.
6+
7+
This causes AttributeError: 'dict' object has no attribute 'id' when trying
8+
to resume execution with Command(resume=...).
9+
10+
The error occurs in LangGraph's _pending_interrupts() method when it tries to
11+
access value[0].id, but value[0] is a dict instead of an Interrupt object.
12+
"""
13+
14+
import operator
15+
from typing import Annotated, TypedDict
16+
from uuid import uuid4
17+
18+
import pytest
19+
from langchain_core.messages import AnyMessage
20+
from langgraph.graph import END, START, StateGraph
21+
from langgraph.types import Command, Interrupt, interrupt
22+
23+
from langgraph.checkpoint.redis import RedisSaver
24+
25+
26+
class AgentState(TypedDict):
27+
"""State for the test agent."""
28+
29+
messages: Annotated[list[AnyMessage], operator.add]
30+
31+
32+
def review_node(state: AgentState):
33+
"""Node that interrupts for review."""
34+
random_str = str(uuid4())
35+
print(f"Generated string: {random_str}")
36+
print("-------- entry interrupt --------")
37+
38+
# This creates an Interrupt object that needs to be serialized
39+
user_input = interrupt({"test": "data"})
40+
41+
print(f"Received input: {user_input.get('test')}")
42+
print("-------- exit interrupt --------")
43+
return {"messages": [random_str]}
44+
45+
46+
def test_interrupt_serialization_roundtrip(redis_url: str) -> None:
47+
"""
48+
Test that Interrupt objects are properly serialized and deserialized.
49+
50+
This is a unit test that directly tests the serializer behavior.
51+
"""
52+
from langgraph.checkpoint.redis.jsonplus_redis import JsonPlusRedisSerializer
53+
54+
serializer = JsonPlusRedisSerializer()
55+
56+
# Create an Interrupt object
57+
original_interrupt = Interrupt(value={"test": "data"}, resumable=True)
58+
59+
# Serialize it
60+
serialized = serializer.dumps(original_interrupt)
61+
62+
# Deserialize it
63+
deserialized = serializer.loads(serialized)
64+
65+
# This should be an Interrupt object, not a dict
66+
assert isinstance(deserialized, Interrupt), (
67+
f"Expected Interrupt object, got {type(deserialized)}. "
68+
f"This causes AttributeError when LangGraph tries to access attributes"
69+
)
70+
assert deserialized.value == {"test": "data"}
71+
assert deserialized.resumable is True
72+
73+
74+
def test_interrupt_in_pending_sends(redis_url: str) -> None:
75+
"""
76+
Test that Interrupt objects in pending_sends are properly deserialized.
77+
78+
This tests the actual scenario from issue #113 where interrupts stored
79+
in checkpoint writes need to be reconstructed.
80+
"""
81+
from langgraph.checkpoint.redis.jsonplus_redis import JsonPlusRedisSerializer
82+
83+
serializer = JsonPlusRedisSerializer()
84+
85+
# Simulate what gets stored in pending_sends
86+
# In the real scenario, pending_sends contains tuples of (channel, value)
87+
# where value might be an Interrupt object
88+
pending_sends = [
89+
("__interrupt__", [Interrupt(value={"test": "data"}, resumable=False)]),
90+
("messages", ["some message"]),
91+
]
92+
93+
# Serialize the pending_sends
94+
serialized = serializer.dumps(pending_sends)
95+
96+
# Deserialize
97+
deserialized = serializer.loads(serialized)
98+
99+
# Check the structure
100+
assert isinstance(deserialized, list)
101+
assert len(deserialized) == 2
102+
103+
# The first item should have reconstructed Interrupt object
104+
channel, value = deserialized[0]
105+
assert channel == "__interrupt__"
106+
assert isinstance(value, list)
107+
assert len(value) == 1
108+
109+
# THIS IS THE CRITICAL CHECK - value[0] must be an Interrupt, not a dict
110+
assert isinstance(value[0], Interrupt), (
111+
f"Expected Interrupt object in pending_sends, got {type(value[0])}. "
112+
f"This is the root cause of 'dict' object has no attribute error"
113+
)
114+
assert value[0].value == {"test": "data"}
115+
assert value[0].resumable is False
116+
117+
118+
def test_interrupt_resume_workflow(redis_url: str) -> None:
119+
"""
120+
Integration test reproducing the exact scenario from issue #113.
121+
122+
This test should fail with AttributeError until the fix is implemented.
123+
"""
124+
with RedisSaver.from_conn_string(redis_url) as checkpointer:
125+
checkpointer.setup()
126+
127+
builder = StateGraph(AgentState)
128+
builder.add_node("review", review_node)
129+
builder.add_edge(START, "review")
130+
builder.add_edge("review", END)
131+
132+
graph = builder.compile(checkpointer=checkpointer)
133+
134+
# Use unique thread ID
135+
config = {"configurable": {"thread_id": f"test-interrupt-{uuid4()}"}}
136+
137+
# First invocation - should hit the interrupt
138+
initial = graph.invoke({}, config=config)
139+
print(f"Initial result: {initial}")
140+
141+
# Resume with Command - this is where the error occurs
142+
# The error happens because pending_sends contains dicts instead of Interrupt objects
143+
# When LangGraph tries to access Interrupt attributes
144+
# It fails because value[0] is {'value': ..., 'resumable': ..., 'ns': ..., 'when': ...} not Interrupt(...)
145+
final_state = graph.invoke(Command(resume={"test": "response"}), config=config)
146+
147+
# If we get here, the test passed
148+
assert "messages" in final_state
149+
print(f"Final messages: {final_state['messages']}")

0 commit comments

Comments
 (0)