fix(memory/ingestion): bound the job channel + reject submits at cap (#2442)#2444
fix(memory/ingestion): bound the job channel + reject submits at cap (#2442)#2444obchain wants to merge 2 commits into
Conversation
Producers can DoS the core today by calling `put_doc` or `store_skill_sync` faster than the worker drains; the channel was `mpsc::unbounded_channel`, so a runaway loop would grow the in-flight buffer (each `IngestionJob` owns a full document body) until the process OOMs. Switch the channel to `mpsc::channel(DEFAULT_QUEUE_CAPACITY)` (512) and change `submit` from `tx.send()` to `tx.try_send()`. The two drop reasons (`Full`, `Closed`) are logged with distinct messages so observability can tell over-pressure apart from worker shutdown. Both paths roll `IngestionState::dequeue()` back so the `memory_ingestion_status` queue-depth gauge stays accurate under sustained overflow. `start_worker_with_capacity` is exposed (in addition to `start_worker_with_state`, which now delegates with the default cap) so unit tests can drive the at-capacity branch deterministically without faking a slow worker. Tests added in the same file: capacity enforcement, recovery after drain, channel-closed drop accounting, and a guardrail on `DEFAULT_QUEUE_CAPACITY` so future bumps don't regress the memory-ceiling intent. Closes tinyhumansai#2442
|
No actionable comments were generated in the recent review. 🎉 ℹ️ Recent review info⚙️ Run configurationConfiguration used: Organization UI Review profile: CHILL Plan: Pro Run ID: 📒 Files selected for processing (1)
📝 WalkthroughWalkthroughThis PR converts the unbounded ingestion queue to a bounded channel with ChangesBounded Ingestion Queue
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes Suggested labels
Poem
🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 1
🧹 Nitpick comments (1)
src/openhuman/memory/ingestion/queue.rs (1)
96-113: ⚡ Quick winInclude
document_idin the drop-path warnings.These warnings are the only breadcrumb when extraction is skipped after the document was already upserted, so they should carry the stable ID too.
♻️ Suggested log enrichment
log::warn!( - "[memory:ingestion_queue] dropping job: queue at capacity ({} pending) namespace={} title={}", + "[memory:ingestion_queue] dropping job: queue at capacity ({} pending) doc_id={} namespace={} title={}", self.tx.max_capacity(), + dropped.document_id, dropped.document.namespace, dropped.document.title, ); @@ log::warn!( - "[memory:ingestion_queue] dropping job: worker channel closed (shutdown?) namespace={} title={}", + "[memory:ingestion_queue] dropping job: worker channel closed (shutdown?) doc_id={} namespace={} title={}", + dropped.document_id, dropped.document.namespace, dropped.document.title, );As per coding guidelines, "Use structured, grep-friendly context with stable prefixes ... and include correlation fields such as request IDs, method names, and entity IDs when available."
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/openhuman/memory/ingestion/queue.rs` around lines 96 - 113, Update the two warning logs in the ingestion queue drop paths to include the stable document ID: add dropped.document.document_id to the formatted message and arguments for both the capacity-full branch and the Closed branch (the code around self.tx.max_capacity() and the self.state.dequeue() branch). Ensure the same field name (dropped.document.document_id) is used in both log lines so the warnings include namespace, title, and the stable document_id for tracing.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/openhuman/memory/ingestion/queue.rs`:
- Around line 153-161: The function start_worker_with_capacity currently calls
tokio::sync::mpsc::channel(capacity) which will panic if capacity is 0; add an
explicit guard at the start of start_worker_with_capacity to check for capacity
== 0 and return/fail fast with a clear error message (e.g.,
panic!("start_worker_with_capacity: capacity must be >= 1") or convert to a
Result and return an Err) before calling mpsc::channel, so callers of
UnifiedMemory/IngestionQueue receive a clear diagnostic instead of an internal
Tokio panic.
---
Nitpick comments:
In `@src/openhuman/memory/ingestion/queue.rs`:
- Around line 96-113: Update the two warning logs in the ingestion queue drop
paths to include the stable document ID: add dropped.document.document_id to the
formatted message and arguments for both the capacity-full branch and the Closed
branch (the code around self.tx.max_capacity() and the self.state.dequeue()
branch). Ensure the same field name (dropped.document.document_id) is used in
both log lines so the warnings include namespace, title, and the stable
document_id for tracing.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: cb6f2cdc-71aa-4d4f-8bf9-74674130f331
📒 Files selected for processing (1)
src/openhuman/memory/ingestion/queue.rs
graycyrus
left a comment
There was a problem hiding this comment.
Clean fix — well scoped, well tested, addresses a real robustness gap.
What this does
Bounds the ingestion job channel from mpsc::unbounded_channel → mpsc::channel(512) and switches submit to non-blocking try_send with distinct Full vs Closed logging. The worker-side drain and the IngestionState accounting are both handled correctly on both drop paths. Four new tests exercise the capacity-bound, drain-recovery, worker-gone, and constant-guardrail branches.
| Area | Files | Verdict |
|---|---|---|
| Rust core (memory/ingestion) | queue.rs |
✅ |
Notes
- CodeRabbit already flagged the zero-capacity guard on
start_worker_with_capacity— agree that's worth adding,tokio::sync::mpsc::channel(0)panics. DEFAULT_QUEUE_CAPACITY = 512is a reasonable middle ground; the doc comment justifying the number is appreciated.- No producer signature changes, no cross-cutting breakage —
put_docandstore_skill_synccontinue to callsubmit()unchanged. - The
enqueue()/dequeue()accounting is correctly balanced on all three paths (success, full, closed).
Nothing else to flag beyond what CodeRabbit caught. Nice work.
… doc_id on drop CodeRabbit on tinyhumansai#2444 flagged two follow-ups: 1. `tokio::sync::mpsc::channel(0)` panics with a cryptic Tokio-internal message ("mpsc bounded channel requires buffer > 0"). Add an explicit `assert!(capacity > 0, …)` in `start_worker_with_capacity` so misuse surfaces a clear, grep-friendly message at the call site instead of looking like a Tokio bug. New `#[should_panic]` test `start_worker_rejects_zero_capacity` pins the contract. 2. The drop-path warn logs now include `doc_id` alongside `namespace` and `title` so each warn line is a stable breadcrumb back to the upserted document whose graph-extraction follow-up was skipped. 5/5 tests pass; cargo fmt + check clean.
|
Pushed
|
Summary
mpsc::unbounded_channelin the memory ingestion queue withmpsc::channel(DEFAULT_QUEUE_CAPACITY)(512).IngestionQueue::submitfromtx.send()totx.try_send(); distinguishFullfromClosedin the warn-level log so observability can tell over-pressure apart from worker shutdown.IngestionState::dequeue()back on both drop paths so thememory_ingestion_statusqueue-depth gauge stays accurate under sustained overflow.start_worker_with_capacityso unit tests can drive the at-capacity branch deterministically without faking a slow worker.DEFAULT_QUEUE_CAPACITYceiling.Problem
src/openhuman/memory/ingestion/queue.rs:106onmain(6137b67) built the job channel withmpsc::unbounded_channel. The worker (ingestion_workerin the same file) drains one job at a time under theIngestionState::acquire()singleton lock because the local extraction LLM cannot run concurrently — per-job work is on the order of seconds-to-minutes depending on doc size + model.Two producer sites push without backpressure:
src/openhuman/memory/store/client.rs:152—put_docsrc/openhuman/memory/store/client.rs:266—store_skill_syncBoth increment
IngestionState::enqueue()and callIngestionQueue::submit(job).submitalready handled the "worker gone" path (SendError) but the channel itself had no capacity bound, so a buggy / misconfigured / hostile producer that submits faster than the worker can drain grows the in-flight buffer indefinitely (eachIngestionJobowns a fullNamespaceDocumentInput— title, body, metadata) until the process OOMs.Not exploitable across a trust boundary, but it is a robustness gap: a runaway skill, a misconfigured Composio sync, or an agent re-ingesting the same source on every tick can DoS the local core with no user-visible warning.
Solution
Three-step fix, all in
src/openhuman/memory/ingestion/queue.rs:mpsc::unbounded_channel→mpsc::channel(DEFAULT_QUEUE_CAPACITY).DEFAULT_QUEUE_CAPACITY = 512keeps the in-flight buffer comfortably under ~50 MB at typical doc sizes (1–100 KB), while still absorbing reasonable bulk-import bursts (Notion workspace backfill, large Slack history).tx.send(job)→tx.try_send(job)(non-blocking). TheFullandClosedvariants are logged distinctly so observability can tell over-pressure apart from worker shutdown. Both callstate.dequeue()so the queue-depth gauge does not drift upward.start_worker_with_capacity(memory, state, capacity)is exposed for tests;start_worker_with_statedelegates with the default cap so callers see no signature change.No producer signature changes —
put_docandstore_skill_synccontinue to callsubmit()exactly as before.Submission Checklist
DEFAULT_QUEUE_CAPACITYguardrail.submit,start_worker_with_capacity, and the constant guardrail is exercised by a dedicated test.## Related— N/A: no matrix row affected.Closes #NNNin the## Relatedsection.Impact
try_sendis cheaper thansend(no async wait) and bounded channels have the same fast path as unbounded for non-full sends.submitreturnsfalsefor the overflow jobs instead of silently buffering them. Callers (put_doc,store_skill_sync) already ignore the return value, so the observable change is "dropped + logged" instead of "buffered until OOM". The underlying document upsert that ran beforesubmitis unaffected — only the graph-extraction follow-up is skipped.Related
feat: background ingestion queue for memory graph extraction).AI Authored PR Metadata (required for Codex/Linear PRs)
Linear Issue
Commit & Branch
fix/2442-bounded-ingestion-queueValidation Run
pnpm --filter openhuman-app format:check— N/A: no frontend changes.pnpm typecheck— N/A: no TypeScript changes.cargo test --lib -- ingestion::queue(4/4 pass).cargo fmt --all --checkclean;cargo check --manifest-path Cargo.tomlclean (warnings unrelated to this change).Validation Blocked
command:N/Aerror:N/Aimpact:N/ABehavior Changes
memory_ingestion_statusreflects the bounded depth instead of growing without limit.Parity Contract
submitstill returnstrueon accepted enqueue,falseon any drop; existingput_doc/store_skill_synccall sites continue to ignore the return value exactly as before; worker loop semantics are unchanged.IngestionState::enqueue/dequeueaccounting still matches the worker'sacquire/dequeuepairing — the drop paths add a matchingdequeue()so the gauge is correct.Duplicate / Superseded PR Handling
Summary by CodeRabbit