Skip to content

Commit 7942f18

Browse files
authored
fix(performance): improve recall and retain performance on large banks (#469)
1 parent 5aff8e0 commit 7942f18

23 files changed

+2036
-830
lines changed
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
"""Add partial indexes on memory_units temporal date fields for fast temporal retrieval
2+
3+
Revision ID: b3c4d5e6f7g8
4+
Revises: c1a2b3d4e5f6
5+
Create Date: 2026-03-02
6+
7+
The temporal retrieval entry-point query filters memory_units by occurred_start,
8+
occurred_end, and mentioned_at using OR conditions. Without dedicated indexes the
9+
planner falls back to a sequential scan of all bank rows after applying the
10+
(bank_id, fact_type) index, then re-checks each date field.
11+
12+
These three partial indexes give the planner bitmap-index scan options for the
13+
three most common date predicates, dramatically reducing the row set before any
14+
embedding computation is required.
15+
16+
All indexes are created CONCURRENTLY so the migration does not block writes on
17+
memory_units during production deployments. CONCURRENTLY requires running outside
18+
a transaction block; see migrations.py for how this is handled safely.
19+
"""
20+
21+
from collections.abc import Sequence
22+
23+
from alembic import context, op
24+
25+
revision: str = "b3c4d5e6f7g8"
26+
down_revision: str | Sequence[str] | None = "c1a2b3d4e5f6"
27+
branch_labels: str | Sequence[str] | None = None
28+
depends_on: str | Sequence[str] | None = None
29+
30+
31+
def _get_schema_prefix() -> str:
    """Return a quoted '"schema".' prefix, or '' when no target_schema is configured."""
    target = context.config.get_main_option("target_schema")
    if not target:
        return ""
    return f'"{target}".'
34+
35+
36+
def upgrade() -> None:
    """Create one partial (bank_id, fact_type, <date>) index per temporal column.

    Each index covers the corresponding "<date> BETWEEN $4 AND $5" predicate of
    the temporal retrieval query; the WHERE clause keeps NULL rows out of the
    index.
    """
    prefix = _get_schema_prefix()
    for column in ("occurred_start", "occurred_end", "mentioned_at"):
        # CREATE INDEX CONCURRENTLY cannot run inside a transaction block, so
        # end Alembic's open transaction first and let each statement execute
        # in its own implicit autocommit transaction.
        op.execute("COMMIT")
        op.execute(
            f"CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_memory_units_bank_{column} "
            f"ON {prefix}memory_units(bank_id, fact_type, {column}) "
            f"WHERE {column} IS NOT NULL"
        )
59+
60+
61+
def downgrade() -> None:
    """Drop the three temporal partial indexes in reverse creation order."""
    prefix = _get_schema_prefix()
    for column in ("mentioned_at", "occurred_end", "occurred_start"):
        # DROP INDEX CONCURRENTLY also requires running outside a transaction.
        op.execute("COMMIT")
        op.execute(f"DROP INDEX CONCURRENTLY IF EXISTS {prefix}idx_memory_units_bank_{column}")
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
"""Enable pg_trgm extension and add GIN trigram index on entities.canonical_name
2+
3+
Revision ID: c1a2b3d4e5f6
4+
Revises: b4c5d6e7f8a9
5+
Create Date: 2026-03-02
6+
7+
Index is created CONCURRENTLY so the migration does not block writes on entities
8+
during production deployments. CONCURRENTLY requires running outside a transaction
9+
block; see migrations.py for how this is handled safely.
10+
"""
11+
12+
from collections.abc import Sequence
13+
14+
from alembic import context, op
15+
16+
revision: str = "c1a2b3d4e5f6"
17+
down_revision: str | Sequence[str] | None = "b4c5d6e7f8a9"
18+
branch_labels: str | Sequence[str] | None = None
19+
depends_on: str | Sequence[str] | None = None
20+
21+
22+
def _get_schema_prefix() -> str:
    """Return a quoted '"schema".' prefix, or '' when no target_schema is configured."""
    target = context.config.get_main_option("target_schema")
    if not target:
        return ""
    return f'"{target}".'
25+
26+
27+
def upgrade() -> None:
    """Enable pg_trgm and build the trigram GIN index on entities.canonical_name."""
    # pg_trgm ships with every standard PostgreSQL installation as a contrib
    # module; it supplies the trigram operators backing GIN similarity lookups
    # used for entity name matching.
    op.execute("CREATE EXTENSION IF NOT EXISTS pg_trgm")

    prefix = _get_schema_prefix()
    # The GIN index turns % / similarity() queries on canonical_name into
    # sub-millisecond index lookups instead of full-table scans per bank.
    trgm_index_sql = (
        f"CREATE INDEX CONCURRENTLY IF NOT EXISTS entities_canonical_name_trgm_idx "
        f"ON {prefix}entities USING GIN (canonical_name gin_trgm_ops)"
    )
    # CONCURRENTLY cannot run inside a transaction block; end Alembic's open
    # transaction so the statement executes in implicit autocommit mode.
    op.execute("COMMIT")
    op.execute(trgm_index_sql)
40+
41+
42+
def downgrade() -> None:
    """Drop the trigram index; the pg_trgm extension is deliberately retained."""
    prefix = _get_schema_prefix()
    op.execute("COMMIT")
    op.execute(f"DROP INDEX CONCURRENTLY IF EXISTS {prefix}entities_canonical_name_trgm_idx")
    # pg_trgm itself is not dropped because other indexes may still depend on it.
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
"""Add covering and composite indexes to speed up link expansion graph retrieval.
2+
3+
Two indexes target the two bottlenecks identified by EXPLAIN ANALYZE on a 17M-row
4+
memory_links table:
5+
6+
1. idx_memory_links_to_type_weight (to_unit_id, link_type, weight DESC)
7+
The semantic incoming direction — finding facts that consider seeds as their
8+
nearest neighbour — currently hits an expensive BitmapAnd of two separate
9+
bitmap scans (to_unit_id bitmap ∩ link_type bitmap). A composite index
10+
on (to_unit_id, link_type) turns this into a single index scan and reduces
11+
latency from ~36 ms to < 5 ms per query.
12+
13+
2. idx_memory_links_entity_covering (from_unit_id) INCLUDE (to_unit_id, entity_id)
14+
WHERE link_type = 'entity'
15+
The entity co-occurrence expansion uses COUNT(DISTINCT ml.entity_id) and
16+
joins on ml.to_unit_id. Without a covering index the planner must read
17+
~2 500 heap pages to fetch entity_id and to_unit_id after the bitmap index
18+
scan, adding ~230 ms of random I/O. INCLUDE adds those two columns to the
19+
index leaf pages so the entire query can be served from the index (index-only
20+
scan), eliminating the heap reads entirely.
21+
Partial index (WHERE link_type = 'entity') keeps index size ~40 % smaller.
22+
23+
Both indexes are created with CONCURRENTLY so the migration does not block
24+
concurrent reads or writes on memory_links. CONCURRENTLY requires running
25+
outside a transaction block, so the migration emits an explicit COMMIT before
26+
each statement and uses IF NOT EXISTS for idempotency.
27+
28+
Revision ID: d2e3f4a5b6c7
29+
Revises: b3c4d5e6f7g8
30+
Create Date: 2026-03-02
31+
"""
32+
33+
from collections.abc import Sequence
34+
35+
from alembic import context, op
36+
37+
revision: str = "d2e3f4a5b6c7"
38+
down_revision: str | Sequence[str] | None = "b3c4d5e6f7g8"
39+
branch_labels: str | Sequence[str] | None = None
40+
depends_on: str | Sequence[str] | None = None
41+
42+
43+
def _get_schema_prefix() -> str:
    """Return a quoted '"schema".' prefix, or '' when no target_schema is configured."""
    target = context.config.get_main_option("target_schema")
    if not target:
        return ""
    return f'"{target}".'
46+
47+
48+
def upgrade() -> None:
    """Create both link-expansion indexes, each in its own autocommit statement."""
    prefix = _get_schema_prefix()

    statements = (
        # Semantic *incoming* direction in link_expansion_retrieval.py: replaces
        # the BitmapAnd of idx_memory_links_to_unit ∩ idx_memory_links_link_type
        # with a single composite index scan.
        f"CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_memory_links_to_type_weight "
        f"ON {prefix}memory_links(to_unit_id, link_type, weight DESC)",
        # Covering index for entity co-occurrence expansion: entity_id and
        # to_unit_id are read from the index leaf pages instead of the heap,
        # eliminating ~2 500 random heap-page reads per expansion query.
        f"CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_memory_links_entity_covering "
        f"ON {prefix}memory_links(from_unit_id) "
        f"INCLUDE (to_unit_id, entity_id) "
        f"WHERE link_type = 'entity'",
    )
    for create_sql in statements:
        # CREATE INDEX CONCURRENTLY cannot run inside a transaction block.
        # Commit the current Alembic transaction, then let the statement run
        # in its own implicit autocommit transaction. IF NOT EXISTS makes a
        # retried migration idempotent.
        op.execute("COMMIT")
        op.execute(create_sql)
76+
77+
78+
def downgrade() -> None:
    """Drop both link-expansion indexes in reverse creation order."""
    prefix = _get_schema_prefix()
    for index_name in ("idx_memory_links_entity_covering", "idx_memory_links_to_type_weight"):
        # DROP INDEX CONCURRENTLY also requires running outside a transaction.
        op.execute("COMMIT")
        op.execute(f"DROP INDEX CONCURRENTLY IF EXISTS {prefix}{index_name}")

hindsight-api/hindsight_api/api/http.py

Lines changed: 26 additions & 142 deletions
Original file line numberDiff line numberDiff line change
@@ -2385,148 +2385,32 @@ async def api_stats(
23852385
):
23862386
"""Get statistics about memory nodes and links for a memory bank."""
23872387
try:
2388-
# Authenticate and set tenant schema
2389-
await app.state.memory._authenticate_tenant(request_context)
2390-
if app.state.memory._operation_validator:
2391-
from hindsight_api.extensions import BankReadContext
2392-
2393-
ctx = BankReadContext(bank_id=bank_id, operation="get_bank_stats", request_context=request_context)
2394-
await app.state.memory._validate_operation(
2395-
app.state.memory._operation_validator.validate_bank_read(ctx)
2396-
)
2397-
pool = await app.state.memory._get_pool()
2398-
async with acquire_with_retry(pool) as conn:
2399-
# Get node counts by fact_type
2400-
node_stats = await conn.fetch(
2401-
f"""
2402-
SELECT fact_type, COUNT(*) as count
2403-
FROM {fq_table("memory_units")}
2404-
WHERE bank_id = $1
2405-
GROUP BY fact_type
2406-
""",
2407-
bank_id,
2408-
)
2409-
2410-
# Get link counts by link_type
2411-
link_stats = await conn.fetch(
2412-
f"""
2413-
SELECT ml.link_type, COUNT(*) as count
2414-
FROM {fq_table("memory_links")} ml
2415-
JOIN {fq_table("memory_units")} mu ON ml.from_unit_id = mu.id
2416-
WHERE mu.bank_id = $1
2417-
GROUP BY ml.link_type
2418-
""",
2419-
bank_id,
2420-
)
2421-
2422-
# Get link counts by fact_type (from nodes)
2423-
link_fact_type_stats = await conn.fetch(
2424-
f"""
2425-
SELECT mu.fact_type, COUNT(*) as count
2426-
FROM {fq_table("memory_links")} ml
2427-
JOIN {fq_table("memory_units")} mu ON ml.from_unit_id = mu.id
2428-
WHERE mu.bank_id = $1
2429-
GROUP BY mu.fact_type
2430-
""",
2431-
bank_id,
2432-
)
2433-
2434-
# Get link counts by fact_type AND link_type
2435-
link_breakdown_stats = await conn.fetch(
2436-
f"""
2437-
SELECT mu.fact_type, ml.link_type, COUNT(*) as count
2438-
FROM {fq_table("memory_links")} ml
2439-
JOIN {fq_table("memory_units")} mu ON ml.from_unit_id = mu.id
2440-
WHERE mu.bank_id = $1
2441-
GROUP BY mu.fact_type, ml.link_type
2442-
""",
2443-
bank_id,
2444-
)
2445-
2446-
# Get pending and failed operations counts
2447-
ops_stats = await conn.fetch(
2448-
f"""
2449-
SELECT status, COUNT(*) as count
2450-
FROM {fq_table("async_operations")}
2451-
WHERE bank_id = $1
2452-
GROUP BY status
2453-
""",
2454-
bank_id,
2455-
)
2456-
ops_by_status = {row["status"]: row["count"] for row in ops_stats}
2457-
pending_operations = ops_by_status.get("pending", 0)
2458-
failed_operations = ops_by_status.get("failed", 0)
2459-
2460-
# Get document count
2461-
doc_count_result = await conn.fetchrow(
2462-
f"""
2463-
SELECT COUNT(*) as count
2464-
FROM {fq_table("documents")}
2465-
WHERE bank_id = $1
2466-
""",
2467-
bank_id,
2468-
)
2469-
total_documents = doc_count_result["count"] if doc_count_result else 0
2470-
2471-
# Get consolidation stats from memory-level tracking
2472-
consolidation_stats = await conn.fetchrow(
2473-
f"""
2474-
SELECT
2475-
MAX(consolidated_at) as last_consolidated_at,
2476-
COUNT(*) FILTER (WHERE consolidated_at IS NULL AND fact_type IN ('experience', 'world')) as pending
2477-
FROM {fq_table("memory_units")}
2478-
WHERE bank_id = $1
2479-
""",
2480-
bank_id,
2481-
)
2482-
last_consolidated_at = consolidation_stats["last_consolidated_at"] if consolidation_stats else None
2483-
pending_consolidation = consolidation_stats["pending"] if consolidation_stats else 0
2484-
2485-
# Count total observations (consolidated knowledge)
2486-
observation_count_result = await conn.fetchrow(
2487-
f"""
2488-
SELECT COUNT(*) as count
2489-
FROM {fq_table("memory_units")}
2490-
WHERE bank_id = $1 AND fact_type = 'observation'
2491-
""",
2492-
bank_id,
2493-
)
2494-
total_observations = observation_count_result["count"] if observation_count_result else 0
2495-
2496-
# Format results
2497-
nodes_by_type = {row["fact_type"]: row["count"] for row in node_stats}
2498-
links_by_type = {row["link_type"]: row["count"] for row in link_stats}
2499-
links_by_fact_type = {row["fact_type"]: row["count"] for row in link_fact_type_stats}
2500-
2501-
# Build detailed breakdown: {fact_type: {link_type: count}}
2502-
links_breakdown = {}
2503-
for row in link_breakdown_stats:
2504-
fact_type = row["fact_type"]
2505-
link_type = row["link_type"]
2506-
count = row["count"]
2507-
if fact_type not in links_breakdown:
2508-
links_breakdown[fact_type] = {}
2509-
links_breakdown[fact_type][link_type] = count
2510-
2511-
total_nodes = sum(nodes_by_type.values())
2512-
total_links = sum(links_by_type.values())
2513-
2514-
return BankStatsResponse(
2515-
bank_id=bank_id,
2516-
total_nodes=total_nodes,
2517-
total_links=total_links,
2518-
total_documents=total_documents,
2519-
nodes_by_fact_type=nodes_by_type,
2520-
links_by_link_type=links_by_type,
2521-
links_by_fact_type=links_by_fact_type,
2522-
links_breakdown=links_breakdown,
2523-
pending_operations=pending_operations,
2524-
failed_operations=failed_operations,
2525-
last_consolidated_at=(last_consolidated_at.isoformat() if last_consolidated_at else None),
2526-
pending_consolidation=pending_consolidation,
2527-
total_observations=total_observations,
2528-
)
2529-
2388+
stats = await app.state.memory.get_bank_stats(bank_id, request_context=request_context)
2389+
nodes_by_type = stats["node_counts"]
2390+
links_by_type = stats["link_counts"]
2391+
links_by_fact_type = stats["link_counts_by_fact_type"]
2392+
links_breakdown: dict[str, dict[str, int]] = {}
2393+
for row in stats["link_breakdown"]:
2394+
ft = row["fact_type"]
2395+
if ft not in links_breakdown:
2396+
links_breakdown[ft] = {}
2397+
links_breakdown[ft][row["link_type"]] = row["count"]
2398+
ops = stats["operations"]
2399+
return BankStatsResponse(
2400+
bank_id=bank_id,
2401+
total_nodes=sum(nodes_by_type.values()),
2402+
total_links=sum(links_by_type.values()),
2403+
total_documents=stats["total_documents"],
2404+
nodes_by_fact_type=nodes_by_type,
2405+
links_by_link_type=links_by_type,
2406+
links_by_fact_type=links_by_fact_type,
2407+
links_breakdown=links_breakdown,
2408+
pending_operations=ops.get("pending", 0),
2409+
failed_operations=ops.get("failed", 0),
2410+
last_consolidated_at=stats["last_consolidated_at"],
2411+
pending_consolidation=stats["pending_consolidation"],
2412+
total_observations=stats["total_observations"],
2413+
)
25302414
except OperationValidationError as e:
25312415
raise HTTPException(status_code=e.status_code, detail=e.reason)
25322416
except (AuthenticationError, HTTPException):

0 commit comments

Comments
 (0)