Skip to content

Commit cbb8fc6

Browse files
authored
fix: retain async with timestamp might fail (#253)
1 parent c33b9b8 commit cbb8fc6

File tree

4 files changed

+113
-10
lines changed

4 files changed

+113
-10
lines changed

hindsight-api/hindsight_api/engine/retain/fact_extraction.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -759,6 +759,10 @@ async def _extract_facts_from_chunk(
759759

760760
# Build user message with metadata and chunk content in a clear format
761761
# Format event_date with day of week for better temporal reasoning
762+
# Handle both datetime objects and ISO string formats (from deserialized async tasks)
763+
from .orchestrator import parse_datetime_flexible
764+
765+
event_date = parse_datetime_flexible(event_date)
762766
event_date_formatted = event_date.strftime("%A, %B %d, %Y") # e.g., "Monday, June 10, 2024"
763767
user_message = f"""Extract facts from the following text chunk.
764768
@@ -1346,6 +1350,8 @@ def _add_temporal_offsets(facts: list[ExtractedFactType], contents: list[RetainC
13461350
13471351
Modifies facts in place.
13481352
"""
1353+
from .orchestrator import parse_datetime_flexible
1354+
13491355
# Group facts by content_index
13501356
current_content_idx = 0
13511357
content_fact_start = 0
@@ -1360,10 +1366,10 @@ def _add_temporal_offsets(facts: list[ExtractedFactType], contents: list[RetainC
13601366
fact_position = i - content_fact_start
13611367
offset = timedelta(seconds=fact_position * SECONDS_PER_FACT)
13621368

1363-
# Apply offset to all temporal fields
1369+
# Apply offset to all temporal fields (handle both datetime objects and ISO strings)
13641370
if fact.occurred_start:
1365-
fact.occurred_start = fact.occurred_start + offset
1371+
fact.occurred_start = parse_datetime_flexible(fact.occurred_start) + offset
13661372
if fact.occurred_end:
1367-
fact.occurred_end = fact.occurred_end + offset
1373+
fact.occurred_end = parse_datetime_flexible(fact.occurred_end) + offset
13681374
if fact.mentioned_at:
1369-
fact.mentioned_at = fact.mentioned_at + offset
1375+
fact.mentioned_at = parse_datetime_flexible(fact.mentioned_at) + offset

hindsight-api/hindsight_api/engine/retain/orchestrator.py

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import time
99
import uuid
1010
from datetime import UTC, datetime
11+
from typing import Any
1112

1213
from ..db_utils import acquire_with_retry
1314
from . import bank_utils
@@ -18,6 +19,39 @@ def utcnow():
1819
return datetime.now(UTC)
1920

2021

22+
def parse_datetime_flexible(value: Any) -> datetime:
23+
"""
24+
Parse a datetime value that could be either a datetime object or an ISO string.
25+
26+
This handles datetime values from both direct Python calls and deserialized JSON
27+
(where datetime objects are serialized as ISO strings).
28+
29+
Args:
30+
value: Either a datetime object or an ISO format string
31+
32+
Returns:
33+
datetime object (timezone-aware)
34+
35+
Raises:
36+
TypeError: If value is neither datetime nor string
37+
ValueError: If string is not a valid ISO datetime
38+
"""
39+
if isinstance(value, datetime):
40+
# Ensure timezone-aware
41+
if value.tzinfo is None:
42+
return value.replace(tzinfo=UTC)
43+
return value
44+
elif isinstance(value, str):
45+
# Parse ISO format string (handles both 'Z' and '+00:00' timezone formats)
46+
dt = datetime.fromisoformat(value.replace("Z", "+00:00"))
47+
# Ensure timezone-aware
48+
if dt.tzinfo is None:
49+
return dt.replace(tzinfo=UTC)
50+
return dt
51+
else:
52+
raise TypeError(f"Expected datetime or string, got {type(value).__name__}")
53+
54+
2155
from ..response_models import TokenUsage
2256
from . import (
2357
chunk_storage,
@@ -89,10 +123,18 @@ async def retain_batch(
89123
# Merge item-level tags with document-level tags
90124
item_tags = item.get("tags", []) or []
91125
merged_tags = list(set(item_tags + (document_tags or [])))
126+
127+
# Handle event_date: parse flexibly (handles both datetime objects and ISO strings)
128+
event_date_value = item.get("event_date")
129+
if event_date_value:
130+
event_date_value = parse_datetime_flexible(event_date_value)
131+
else:
132+
event_date_value = utcnow()
133+
92134
content = RetainContent(
93135
content=item["content"],
94136
context=item.get("context", ""),
95-
event_date=item.get("event_date") or utcnow(),
137+
event_date=event_date_value,
96138
metadata=item.get("metadata", {}),
97139
entities=item.get("entities", []),
98140
tags=merged_tags,

hindsight-api/tests/test_http_api_integration.py

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1174,3 +1174,58 @@ async def test_retain_with_multiple_timestamps(api_client, test_bank_id):
11741174
data = response.json()
11751175
assert data["success"] is True
11761176
assert data["items_count"] == 3
1177+
1178+
1179+
@pytest.mark.asyncio
async def test_retain_with_timestamp_async_complete_processing(api_client, test_bank_id):
    """Test that async retain with timestamp completes full processing including fact extraction.

    Submits one item carrying an ISO ``timestamp`` with ``"async": True``,
    polls the operation endpoint until it reports ``completed`` (failing fast
    on ``failed``), then checks that at least one memory is listed for the bank.
    """
    # Submit async retain with timestamp
    response = await api_client.post(
        f"/v1/default/banks/{test_bank_id}/memories",
        json={
            "items": [
                {
                    "content": "The quarterly meeting was held on January 30th 2026",
                    "context": "meetings",
                    "timestamp": "2026-01-30T11:45:00Z",
                }
            ],
            "async": True,
        },
    )

    assert response.status_code == 200, f"Expected 200, got {response.status_code}: {response.text}"
    data = response.json()
    assert data["success"] is True
    assert data["async"] is True
    operation_id = data["operation_id"]

    # Wait for async processing to complete (poll operation status).
    # The deadline is taken from the event loop's monotonic clock so that time
    # spent inside the poll requests counts toward the limit, not just the sleeps.
    max_wait_seconds = 30
    poll_interval = 0.5
    loop = asyncio.get_running_loop()
    deadline = loop.time() + max_wait_seconds
    operation_completed = False
    last_poll = "no poll response received"

    while loop.time() < deadline:
        response = await api_client.get(f"/v1/default/banks/{test_bank_id}/operations/{operation_id}")
        if response.status_code == 200:
            op_status = response.json()
            status = op_status.get("status")
            last_poll = f"status={status!r}"
            if status == "completed":
                operation_completed = True
                break
            if status == "failed":
                raise AssertionError(f"Operation failed: {op_status.get('error_message')}")
        else:
            # Keep polling, but remember the response so a timeout is diagnosable.
            last_poll = f"HTTP {response.status_code}"

        await asyncio.sleep(poll_interval)

    assert operation_completed, (
        f"Async operation did not complete within {max_wait_seconds} seconds (last poll: {last_poll})"
    )

    # Verify memories were actually stored
    response = await api_client.get(
        f"/v1/default/banks/{test_bank_id}/memories/list",
        params={"limit": 10},
    )
    assert response.status_code == 200
    items = response.json()["items"]
    assert len(items) > 0, "Should have stored memories after async processing"

uv.lock

Lines changed: 5 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)