Closed
Description
Problem
When using AsyncRedisSaver as the checkpointer backend, calling
aget_state_history() (or any function that internally calls _abatch_load_pending_sends)
raises the following error:
  File "/.venv/lib/python3.13/site-packages/langgraph/pregel/main.py", line 1409, in aget_state_history
    for checkpoint_tuple in [
                            ^
    ...<4 lines>...
    ]:
    ^
  File "/.venv/lib/python3.13/site-packages/langgraph/checkpoint/redis/aio.py", line 788, in alist
    pending_sends_map = await self._abatch_load_pending_sends(
                        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
        pending_sends_batch_keys
        ^^^^^^^^^^^^^^^^^^^^^^^^
    )
    ^
  File "/.venv/lib/python3.13/site-packages/langgraph/checkpoint/redis/aio.py", line 1759, in _abatch_load_pending_sends
    results_map[batch_key] = [(d.type, d.blob) for d in sorted_docs]
                                       ^^^^^^
AttributeError: 'Document' object has no attribute 'blob'
Proposed Fix
The issue can be resolved by applying the same logic as in the sync version of the Redis saver:
# langgraph/checkpoint/redis/__init__.py#L1253
batch_query = FilterQuery(
    filter_expression=thread_filter
    & ns_filter
    & checkpoint_filter
    & channel_filter,
    return_fields=[
        "checkpoint_id",
        "type",
        "$.blob",
        "task_path",
        "task_id",
        "idx",
    ],
    num_results=1000,  # Increased limit for batch loading
)
batch_results = await self.checkpoint_writes_index.search(batch_query)

# Group results by parent checkpoint ID
writes_by_checkpoint: Dict[str, List[Any]] = {}
for doc in batch_results.docs:
    parent_checkpoint_id = from_storage_safe_id(doc.checkpoint_id)
    if parent_checkpoint_id not in writes_by_checkpoint:
        writes_by_checkpoint[parent_checkpoint_id] = []
    writes_by_checkpoint[parent_checkpoint_id].append(doc)

# Sort and format results for each parent checkpoint
for parent_checkpoint_id in parent_checkpoint_ids:
    batch_key = (thread_id, checkpoint_ns, parent_checkpoint_id)
    writes = writes_by_checkpoint.get(parent_checkpoint_id, [])
    # Sort results by task_path, task_id, idx
    sorted_writes = sorted(
        writes,
        key=lambda x: (
            getattr(x, "task_path", ""),
            getattr(x, "task_id", ""),
            getattr(x, "idx", 0),
        ),
    )
    # Extract type and blob pairs
    # Handle both direct attribute access and JSON path access
    results_map[batch_key] = [
        (
            getattr(doc, "type", ""),
            getattr(doc, "$.blob", getattr(doc, "blob", b"")),
        )
        for doc in sorted_writes
    ]

I've already tested this locally with the proposed change, and it resolves the issue without side effects.
If possible, I’ll open a pull request soon to contribute the fix.
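
For anyone who wants to see the failure mode in isolation, here is a minimal sketch that needs no Redis instance. It assumes (this is my reading of the traceback, not something confirmed against the client library) that a field requested as "$.blob" ends up on the result document under that literal attribute name instead of as `blob`. The `make_doc` and `extract_blob` helpers are hypothetical stand-ins and are not part of langgraph or the Redis client.

```python
from types import SimpleNamespace


def make_doc(**fields):
    """Build a stand-in for a search result document (hypothetical helper)."""
    doc = SimpleNamespace()
    for name, value in fields.items():
        # setattr accepts names such as "$.blob" that are not valid identifiers.
        setattr(doc, name, value)
    return doc


def extract_blob(doc):
    """Prefer the JSON-path attribute, then a plain `blob`, then empty bytes."""
    return getattr(doc, "$.blob", getattr(doc, "blob", b""))


# Assumption: the blob comes back under the JSON-path field name.
json_path_doc = make_doc(**{"type": "msgpack", "$.blob": b"payload"})

# Direct attribute access fails, mirroring the AttributeError in the traceback.
error_message = ""
try:
    json_path_doc.blob
except AttributeError as exc:
    error_message = str(exc)

# A document that exposes a plain `blob` attribute still works.
plain_doc = make_doc(type="msgpack", blob=b"other")

# A document with neither attribute falls back to empty bytes.
empty_doc = make_doc(type="msgpack")

blobs = [extract_blob(d) for d in (json_path_doc, plain_doc, empty_doc)]
```

The nested `getattr` is the same fallback used in the proposed fix, so it tolerates either attribute name without raising.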