Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 32 additions & 2 deletions langgraph/checkpoint/redis/jsonplus_redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,19 +62,22 @@ def loads(self, data: bytes) -> Any:
return super().loads_typed(("json", data))

def _revive_if_needed(self, obj: Any) -> Any:
"""Recursively apply reviver to handle LangChain serialized objects.
"""Recursively apply reviver to handle LangChain and LangGraph serialized objects.

This method is crucial for preventing MESSAGE_COERCION_FAILURE by ensuring
that LangChain message objects stored in their serialized format are properly
reconstructed. Without this, messages would remain as dictionaries with
'lc', 'type', and 'constructor' fields, causing errors when the application
expects actual message objects with 'role' and 'content' attributes.

It also handles LangGraph Interrupt objects which serialize to {"value": ..., "resumable": ..., "ns": ..., "when": ...}
and must be reconstructed to prevent AttributeError when accessing Interrupt attributes.

Args:
obj: The object to potentially revive, which may be a dict, list, or primitive.

Returns:
The revived object with LangChain objects properly reconstructed.
The revived object with LangChain/LangGraph objects properly reconstructed.
"""
if isinstance(obj, dict):
# Check if this is a LangChain serialized object
Expand All @@ -83,6 +86,33 @@ def _revive_if_needed(self, obj: Any) -> Any:
# This converts {'lc': 1, 'type': 'constructor', ...} back to
# the actual LangChain object (e.g., HumanMessage, AIMessage)
return self._reviver(obj)

# Check if this is a serialized Interrupt object
# Interrupt objects serialize to {"value": ..., "resumable": ..., "ns": ..., "when": ...}
# This must be done before recursively processing to avoid losing the structure
if (
"value" in obj
and "resumable" in obj
and "when" in obj
and len(obj) == 4
and isinstance(obj.get("resumable"), bool)
):
# Try to reconstruct as an Interrupt object
try:
from langgraph.types import Interrupt

return Interrupt(
value=self._revive_if_needed(obj["value"]),
resumable=obj["resumable"],
ns=obj["ns"],
when=obj["when"],
)
except (ImportError, TypeError, ValueError) as e:
# If we can't import or construct Interrupt, log and fall through
logger.debug(
"Failed to deserialize Interrupt object: %s", e, exc_info=True
)

# Recursively process nested dicts
return {k: self._revive_if_needed(v) for k, v in obj.items()}
elif isinstance(obj, list):
Expand Down
149 changes: 149 additions & 0 deletions tests/test_issue_113_interrupt_serialization.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
"""
Regression test for Issue #113: Interrupt objects not properly deserialized

When using interrupt() with RedisSaver, Interrupt objects are serialized to
dictionaries but not reconstructed back to Interrupt objects on deserialization.

This causes AttributeError: 'dict' object has no attribute 'id' when trying
to resume execution with Command(resume=...).

The error occurs in LangGraph's _pending_interrupts() method when it tries to
access value[0].id, but value[0] is a dict instead of an Interrupt object.
"""

import operator
from typing import Annotated, TypedDict
from uuid import uuid4

import pytest
from langchain_core.messages import AnyMessage
from langgraph.graph import END, START, StateGraph
from langgraph.types import Command, Interrupt, interrupt

from langgraph.checkpoint.redis import RedisSaver


class AgentState(TypedDict):
"""State for the test agent."""

messages: Annotated[list[AnyMessage], operator.add]


def review_node(state: AgentState):
"""Node that interrupts for review."""
random_str = str(uuid4())
print(f"Generated string: {random_str}")
print("-------- entry interrupt --------")

# This creates an Interrupt object that needs to be serialized
user_input = interrupt({"test": "data"})

print(f"Received input: {user_input.get('test')}")
print("-------- exit interrupt --------")
return {"messages": [random_str]}


def test_interrupt_serialization_roundtrip(redis_url: str) -> None:
"""
Test that Interrupt objects are properly serialized and deserialized.

This is a unit test that directly tests the serializer behavior.
"""
from langgraph.checkpoint.redis.jsonplus_redis import JsonPlusRedisSerializer

serializer = JsonPlusRedisSerializer()

# Create an Interrupt object
original_interrupt = Interrupt(value={"test": "data"}, resumable=True)

# Serialize it
serialized = serializer.dumps(original_interrupt)

# Deserialize it
deserialized = serializer.loads(serialized)

# This should be an Interrupt object, not a dict
assert isinstance(deserialized, Interrupt), (
f"Expected Interrupt object, got {type(deserialized)}. "
f"This causes AttributeError when LangGraph tries to access attributes"
)
assert deserialized.value == {"test": "data"}
assert deserialized.resumable is True


def test_interrupt_in_pending_sends(redis_url: str) -> None:
"""
Test that Interrupt objects in pending_sends are properly deserialized.

This tests the actual scenario from issue #113 where interrupts stored
in checkpoint writes need to be reconstructed.
"""
from langgraph.checkpoint.redis.jsonplus_redis import JsonPlusRedisSerializer

serializer = JsonPlusRedisSerializer()

# Simulate what gets stored in pending_sends
# In the real scenario, pending_sends contains tuples of (channel, value)
# where value might be an Interrupt object
pending_sends = [
("__interrupt__", [Interrupt(value={"test": "data"}, resumable=False)]),
("messages", ["some message"]),
]

# Serialize the pending_sends
serialized = serializer.dumps(pending_sends)

# Deserialize
deserialized = serializer.loads(serialized)

# Check the structure
assert isinstance(deserialized, list)
assert len(deserialized) == 2

# The first item should have reconstructed Interrupt object
channel, value = deserialized[0]
assert channel == "__interrupt__"
assert isinstance(value, list)
assert len(value) == 1

# THIS IS THE CRITICAL CHECK - value[0] must be an Interrupt, not a dict
assert isinstance(value[0], Interrupt), (
f"Expected Interrupt object in pending_sends, got {type(value[0])}. "
f"This is the root cause of 'dict' object has no attribute error"
)
assert value[0].value == {"test": "data"}
assert value[0].resumable is False


def test_interrupt_resume_workflow(redis_url: str) -> None:
"""
Integration test reproducing the exact scenario from issue #113.

This test should fail with AttributeError until the fix is implemented.
"""
with RedisSaver.from_conn_string(redis_url) as checkpointer:
checkpointer.setup()

builder = StateGraph(AgentState)
builder.add_node("review", review_node)
builder.add_edge(START, "review")
builder.add_edge("review", END)

graph = builder.compile(checkpointer=checkpointer)

# Use unique thread ID
config = {"configurable": {"thread_id": f"test-interrupt-{uuid4()}"}}

# First invocation - should hit the interrupt
initial = graph.invoke({}, config=config)
print(f"Initial result: {initial}")

# Resume with Command - this is where the error occurs
# The error happens because pending_sends contains dicts instead of Interrupt objects
# When LangGraph tries to access Interrupt attributes
# It fails because value[0] is {'value': ..., 'resumable': ..., 'ns': ..., 'when': ...} not Interrupt(...)
final_state = graph.invoke(Command(resume={"test": "response"}), config=config)

# If we get here, the test passed
assert "messages" in final_state
print(f"Final messages: {final_state['messages']}")