perf(learning): bounded-concurrency persist in transcript ingestion#3234
Conversation
Transcript ingestion persisted every kept candidate and reflection in a sequential for-loop. A single transcript can yield dozens of items, and each persist is a markdown write + SQLite tx + an embedding round-trip, so they ran back-to-back on the background ingest job. Drive both persist loops through buffer_unordered(PERSIST_CONCURRENCY=8) so their network/disk waits overlap, finishing the job sooner, while capping in-flight requests so a large transcript can't open an unbounded number of concurrent embed calls against the provider. Per-item Ok/Err accounting is preserved (each future logs its own error and yields 1/0, summed by fold). The futures are collected into a Vec before buffer_unordered: mapping lazily on the stream stores the closure in the polled state and requires it to hold for any lifetime (HRTB), which fails to compile once the ingest future is spawned (Send + 'static. Adds a regression test that drives 10 distinct candidates (> the bound) and asserts the mock store's observed peak concurrency stays within [2, PERSIST_CONCURRENCY] — proving the persists genuinely overlap yet stay bounded. EOF )
|
No actionable comments were generated in the recent review. 🎉 ℹ️ Recent review info⚙️ Run configurationConfiguration used: Organization UI Review profile: CHILL Plan: Pro Run ID: 📒 Files selected for processing (1)
🚧 Files skipped from review as they are similar to previous changes (1)
📝 WalkthroughWalkthroughThis PR replaces sequential persistence of kept candidates and reflections with bounded concurrent persistence (PERSIST_CONCURRENCY) via futures streams and buffer_unordered, and adds test instrumentation to measure and assert concurrency behavior. ChangesTranscript Persistence Concurrency
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Possibly related PRs
Suggested labels
Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
🧹 Nitpick comments (1)
src/openhuman/learning/transcript_ingest/mod.rs (1)
135-170: ⚡ Quick winAdd transcript correlation fields to the concurrent failure logs.
Up to
PERSIST_CONCURRENCYpersist warnings can now interleave, but these branches don't carry the transcript context needed to tie a failure back to a specific ingest. Please include stable fields likethreadandpathon both warnings, especially the reflection branch.As per coding guidelines, "In Rust, default to verbose diagnostics on new/changed flows using `log`/`tracing` at `debug`/`trace` levels with stable grep-friendly prefixes and correlation fields".🪵 Suggested logging tweak
+ let thread_log = thread_id.as_deref().unwrap_or("-"); + let path_log = path_display.as_str(); + let candidate_futs: Vec<_> = kept .iter() .map(|candidate| async move { match persist::store_candidate(memory, candidate).await { Ok(()) => 1usize, Err(err) => { log::warn!( - "[transcript_ingest] failed to persist candidate kind={:?} importance={:?}: {err}", + "[transcript_ingest] failed to persist candidate thread={} path={} kind={:?} importance={:?}: {err}", + thread_log, + path_log, candidate.kind, candidate.importance ); 0usize } } }) .collect(); @@ Ok(()) => 1usize, Err(err) => { - log::warn!("[transcript_ingest] failed to persist reflection: {err}"); + log::warn!( + "[transcript_ingest] failed to persist reflection thread={} path={}: {err}", + thread_log, + path_log + ); 0usize } } }) .collect();🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/openhuman/learning/transcript_ingest/mod.rs` around lines 135 - 170, The concurrent persist failure logs in the async closures wrapping persist::store_candidate and persist::store_reflection lack transcript correlation fields; update both log lines to include stable correlation fields (e.g., thread and path) and other relevant identifiers (retain candidate.kind and candidate.importance for the candidate branch, and include reflection id/marker for reflections) — also lower the level to debug/trace per diagnostics guidelines if appropriate; locate the warn calls inside the closures mapping kept and kept_reflections and augment the log invocation to include thread and path fields.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Nitpick comments:
In `@src/openhuman/learning/transcript_ingest/mod.rs`:
- Around line 135-170: The concurrent persist failure logs in the async closures
wrapping persist::store_candidate and persist::store_reflection lack transcript
correlation fields; update both log lines to include stable correlation fields
(e.g., thread and path) and other relevant identifiers (retain candidate.kind
and candidate.importance for the candidate branch, and include reflection
id/marker for reflections) — also lower the level to debug/trace per diagnostics
guidelines if appropriate; locate the warn calls inside the closures mapping
kept and kept_reflections and augment the log invocation to include thread and
path fields.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: a277d27b-a224-4315-bf57-91faa33e4091
📒 Files selected for processing (2)
src/openhuman/learning/transcript_ingest/mod.rssrc/openhuman/learning/transcript_ingest/tests.rs
…logs Address CodeRabbit nitpick: the per-item warn! logs in the bounded-concurrency persist closures lacked transcript correlation. Add stable thread/path fields (plus reflection theme) so background-ingest failures can be traced to the source conversation, per the repo debug-logging rule.
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/openhuman/learning/transcript_ingest/mod.rs`:
- Around line 139-140: The logs currently emit unredacted transcript-derived
fields—replace use of path_display/path_label and thread_id with a non-sensitive
label (e.g., compute and log only the file basename or a fixed placeholder via
thread_id.as_deref().map(|_| "redacted") ) instead of the full path, and do not
log reflection.theme verbatim; either drop it from the warn logs or replace it
with a hashed/blurred value (e.g., hash the theme string) before emitting.
Update places referencing path_label/thread_id and reflection.theme (variables
thread_id, path_display, path_label, and reflection.theme in the transcript
ingestion flow) to apply redaction or hashing consistently for all warn logs.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 7439e7fc-5a67-48f5-9873-ea4ec3d9480f
📒 Files selected for processing (1)
src/openhuman/learning/transcript_ingest/mod.rs
Address CodeRabbit: the per-item warn! logs emitted the full transcript path (leaks the home directory) and reflection.theme verbatim (conversational content). Log the transcript basename instead of the full path and drop the theme, keeping only non-sensitive enum fields (kind/importance), per the repo "never log full PII" rule.
Summary
The background transcript-ingestion job (
ingest_session_transcript) persists each kept memory candidate and each kept reflection with a sequential.awaitin twoforloops. A single transcript routinely yields dozens of candidates, and every persist is a markdown write + SQLite tx + an embedding round-trip. The SQLite connection mutex is not held across that embed.await(the lock is taken after the embed resolves), so these independent writes can genuinely overlap their network/disk waits.This PR replaces the two sequential loops with bounded-concurrency fan-out using
futures::stream::iter(...).buffer_unordered(PERSIST_CONCURRENCY), so up toPERSIST_CONCURRENCY(8) persists are in flight at once instead of one-at-a-time.This is a background job (spawned off the chat path via
Agent::spawn_transcript_ingestion), so it's not interactive latency — but the job finishes meaningfully sooner on a transcript that yields many candidates, freeing the runtime quicker. Honest magnitude: modest, and only on multi-candidate transcripts; a single-candidate transcript is unchanged.Why bounded (not unbounded
join_all)join_allover N candidates would open N concurrent embedding requests at once — a transcript yielding dozens of items would fan out dozens of simultaneous provider round-trips.buffer_unordered(8)caps the in-flight count so a large transcript can't stampede the embedding provider, while still overlapping enough waits to win.Implementation note (HRTB)
The per-item futures are collected into a
Vecbefore being handed tobuffer_unordered. Mapping lazily on the stream (stream::iter(it.map(|c| async move {...}))) stores the closure in the polled state and requires it to satisfy a higher-ranked lifetime bound, which fails to compile once the whole ingest future is spawned (Send + 'static) — "FnOnce is not general enough". Collecting runs each closure up front, so the stream only carries already-built futures with concrete lifetimes. This is documented inline.Behavior preserved
log::warn!messages, same fields) and yields1on success /0on failure; thefoldsums them intostored/stored_reflections. Order was already irrelevant — only the success count matters.IngestionReportshape are unchanged.Test plan
ingest_persists_candidates_with_bounded_concurrency: 10 distinct preferences (all survive dedupe → all reach persist). The in-memoryMemorymock tracks live in-flightstorecalls and a high-water mark; the test assertspeak <= PERSIST_CONCURRENCY(bound holds) andpeak >= 2(persists genuinely overlap, so the bound isn't vacuously true). The mock yields several times before taking its sync lock so sibling futures actually interleave.cargo check --manifest-path Cargo.toml— clean.bash scripts/test-rust-with-mock.sh --lib transcript_ingest— 13 passed, 0 failed.Summary by CodeRabbit
Performance Improvements
Tests