Commit d9877c1

Indexing: Deduplicate replay events to cut CPU
- Replay (Phase 1 of `run_replay_event_loop`) now deduplicates FSEvents by normalized path before processing, same pattern as the live event loop
- Events accumulate in a `HashMap` and flush every 1,000 raw events via `flush_replay_batch()`
- High-churn files (SQLite journals, browser caches) that generated hundreds of identical events per second now collapse to a single `symlink_metadata()` + `resolve_path()` call per batch
- Added `REPLAY_DEDUP_BATCH_SIZE` constant (1,000) matching the existing `UpdateLastEventId` cadence
- Logs dedup ratio after replay completes (for example, "deduplicated 10000 raw events to 71 unique (99% reduction)")
- Four new tests covering single-path collapse, multi-path preservation, mixed event merging, and realistic event storm scenarios
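The accumulate-then-flush pattern described in the bullets above can be sketched in isolation. This is a minimal illustration, not the code from the commit: `Event`, `BATCH_SIZE`, `flush`, and `replay` are hypothetical names, and where the real code merges flags on a collision, this sketch simply keeps the latest event for a path.

```rust
use std::collections::HashMap;

/// Hypothetical, simplified stand-in for the real event type.
#[derive(Debug, Clone, PartialEq)]
struct Event {
    path: String,
    event_id: u64,
}

/// Assumed batch size, mirroring the 1,000 figure from the commit message.
const BATCH_SIZE: u64 = 1_000;

/// Drains the buffer, hands each surviving event to `process`, and returns
/// how many events were processed.
fn flush(pending: &mut HashMap<String, Event>, process: &mut impl FnMut(Event)) -> u64 {
    let count = pending.len() as u64;
    for (_path, event) in pending.drain() {
        process(event);
    }
    count
}

/// Deduplicates `raw` by path in batches; returns (raw_count, processed_count).
fn replay(raw: impl IntoIterator<Item = Event>, mut process: impl FnMut(Event)) -> (u64, u64) {
    let mut pending: HashMap<String, Event> = HashMap::new();
    let (mut raw_count, mut processed) = (0u64, 0u64);
    for event in raw {
        // In this sketch a later event simply replaces an earlier one for the same path.
        pending.insert(event.path.clone(), event);
        raw_count += 1;
        if raw_count % BATCH_SIZE == 0 {
            processed += flush(&mut pending, &mut process);
        }
    }
    // Flush whatever is left once the input ends.
    processed += flush(&mut pending, &mut process);
    (raw_count, processed)
}

fn main() {
    // 2,500 events that all touch the same journal file.
    let raw = (0..2_500u64).map(|id| Event {
        path: "/tmp/db-journal".to_string(),
        event_id: id,
    });
    let mut seen = Vec::new();
    let (raw_count, processed) = replay(raw, |e| seen.push(e.event_id));
    println!("{raw_count} raw events, {processed} processed: {seen:?}");
}
```

With a single hot path, each batch of 1,000 raw events reaches the processing callback once, so the per-event work scales with unique paths per batch rather than with raw event volume.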
1 parent 0f28b51 commit d9877c1

2 files changed

Lines changed: 294 additions & 20 deletions

apps/desktop/src-tauri/src/indexing/CLAUDE.md

Lines changed: 1 addition & 1 deletion
@@ -147,7 +147,7 @@ Key test files are alongside each module (test functions within `#[cfg(test)]` b

 **FSEvents `item_removed` must be verified against disk**: macOS FSEvents can deliver `item_removed` for paths that still exist (atomic file swaps by editors/git, coalesced events with OR'd flags, `merge_fs_events` discarding `item_created` when `item_removed` is present). `handle_removal()` stats the path before deleting: if the file exists, it delegates to `handle_creation_or_modification()` (upsert) instead. Without this, false removals progressively delete live entries from the DB — especially damaging for directories since `DeleteSubtreeById` is recursive. `handle_creation_or_modification()` already has the inverse pattern: if stat fails, it deletes.

-**Live events are deduplicated and batched with a 1s window**: Both `run_live_event_loop` and the Phase 3 live loop in `run_replay_event_loop` (both in `event_loop.rs`) collect incoming events into a `HashMap<String, FsChangeEvent>` keyed by normalized path. On each 1s flush tick, only the deduplicated set is processed through `process_live_event`. `merge_fs_events` keeps the most significant flags when events collide: `must_scan_sub_dirs` always wins, then `removed`, then `created`, then `modified`. `UpdateLastEventId` is sent once per batch (in `process_live_batch`) instead of per-event, reducing writer channel pressure during event storms.
+**Events are deduplicated and batched in all modes**: Live events (both `run_live_event_loop` and Phase 3 of `run_replay_event_loop`) use a 1s flush window. Replay events (Phase 1 of `run_replay_event_loop`) use `REPLAY_DEDUP_BATCH_SIZE` (1,000 events). Both collect into a `HashMap<String, FsChangeEvent>` keyed by normalized path and flush via `merge_fs_events`. Flag priority: `must_scan_sub_dirs` always wins, then `removed`, then `created`, then `modified`. `UpdateLastEventId` is sent once per batch. The replay dedup is critical for performance: high-churn files (SQLite journals, browser caches) can generate hundreds of identical FSEvents per second; without dedup, each event triggers a `symlink_metadata()` syscall and a `resolve_path()` component walk.

 **Writer-side delete-with-propagation**: Both path-keyed (`DeleteEntry`/`DeleteSubtree`) and integer-keyed (`DeleteEntryById`/`DeleteSubtreeById`) handlers in the writer automatically read old data before deleting and propagate accurate negative deltas. The integer-keyed variants use `propagate_delta_by_id` which walks the `parent_id` chain via `get_parent_id` lookups. This means every deletion -- replay, live, verification -- gets correct dir_stats updates without callers needing to send separate `PropagateDelta` messages.
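The flag priority stated in the dedup note (`must_scan_sub_dirs`, then `removed`, then `created`, then `modified`) can be illustrated with a small sketch. It rests on a loud assumption: the real `merge_fs_events` works on a flags struct whose body is not part of this diff, so the version here collapses each event to a single hypothetical `Kind` and lets the enum ordering carry the priority. The "highest event ID is kept" behaviour follows what the new tests assert.

```rust
/// Hypothetical reduction of the FSEvents flags to one "kind" per event.
/// Variant order encodes the priority from the note: later variants win.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
enum Kind {
    Modified,
    Created,
    Removed,
    MustScanSubDirs,
}

/// Hypothetical, simplified event; not the real `FsChangeEvent`.
#[derive(Debug, Clone, PartialEq)]
struct Event {
    path: String,
    event_id: u64,
    kind: Kind,
}

/// Merges two events for the same path: highest-priority kind, highest event ID.
fn merge(existing: &Event, incoming: &Event) -> Event {
    Event {
        path: existing.path.clone(),
        event_id: existing.event_id.max(incoming.event_id),
        kind: existing.kind.max(incoming.kind),
    }
}

fn main() {
    let created = Event { path: "/test/file.txt".into(), event_id: 1, kind: Kind::Created };
    let removed = Event { path: "/test/file.txt".into(), event_id: 4, kind: Kind::Removed };
    // The merged event carries the removal and the newer event ID.
    let merged = merge(&created, &removed);
    println!("{merged:?}");
}
```

Because the merge takes a maximum on both fields, the result does not depend on the order in which colliding events arrive.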

apps/desktop/src-tauri/src/indexing/event_loop.rs

Lines changed: 293 additions & 19 deletions
@@ -49,6 +49,13 @@ const MAX_PENDING_RESCANS: usize = 1_000;
 /// FDA was toggled and the app suddenly sees millions of previously hidden paths.
 const REPLAY_EVENT_COUNT_LIMIT: u64 = 1_000_000;

+/// Replay events are deduplicated by normalized path in batches of this
+/// size before processing. Dramatically reduces CPU when the FSEvents
+/// journal contains many duplicate events for the same path (for example,
+/// SQLite journal files, browser cache). Matches the `UpdateLastEventId`
+/// batching cadence.
+const REPLAY_DEDUP_BATCH_SIZE: u64 = 1_000;
+
 /// Configuration for a replay event loop.
 pub(super) struct ReplayConfig {
     pub(super) volume_id: String,
@@ -320,6 +327,11 @@ pub(super) async fn run_replay_event_loop(

     // ── Phase 1: Replay (before HistoryDone) ─────────────────────────

+    // Deduplicate events by normalized path before processing, same as
+    // the live event loop. Flushed every REPLAY_DEDUP_BATCH_SIZE events.
+    let mut replay_pending = HashMap::<String, watcher::FsChangeEvent>::new();
+    let mut deduped_total = 0u64;
+
     while let Some(event) = event_rx.recv().await {
         // Check for journal gap on the first event
         if !first_event_checked {
@@ -353,6 +365,15 @@ pub(super) async fn run_replay_event_loop(
         if event.flags.history_done {
             log::info!("Replay: HistoryDone received after {event_count} events");

+            // Flush remaining deduplicated events before leaving Phase 1
+            deduped_total += flush_replay_batch(
+                &mut replay_pending,
+                &conn,
+                &writer,
+                &mut affected_paths,
+                &mut affected_paths_overflow,
+            ) as u64;
+
             // Process the HistoryDone event itself (it may carry other flags)
             if let Some(paths) = reconciler::process_fs_event(&event, &conn, &writer)
                 && !affected_paths_overflow
@@ -385,20 +406,20 @@ pub(super) async fn run_replay_event_loop(
             continue;
         }

-        // Process event and collect affected paths
-        if let Some(paths) = reconciler::process_fs_event(&event, &conn, &writer)
-            && !affected_paths_overflow
-        {
-            affected_paths.extend(paths);
-            if affected_paths.len() >= MAX_AFFECTED_PATHS {
-                log::warn!(
-                    "Replay: affected paths cap reached ({MAX_AFFECTED_PATHS}). \
-                     Will emit a full refresh notification instead of individual paths."
-                );
-                affected_paths_overflow = true;
-                affected_paths.clear();
-            }
-        }
+        // Accumulate into dedup buffer instead of processing immediately.
+        // Same pattern as the live event loop: normalize path, merge flags.
+        let normalized = firmlinks::normalize_path(&event.path);
+        let deduped_event = watcher::FsChangeEvent {
+            path: normalized.clone(),
+            event_id: event.event_id,
+            flags: event.flags.clone(),
+        };
+        replay_pending
+            .entry(normalized)
+            .and_modify(|existing| {
+                *existing = merge_fs_events(existing, &deduped_event);
+            })
+            .or_insert(deduped_event);

         last_event_id = event.event_id;
         event_count += 1;
@@ -423,11 +444,20 @@ pub(super) async fn run_replay_event_loop(
             return Ok(());
         }

-        // Batch UpdateLastEventId every 1000 events (reduces writer load ~10x)
-        if event_count.is_multiple_of(1000)
-            && let Err(e) = writer.send(WriteMessage::UpdateLastEventId(last_event_id))
-        {
-            log::warn!("Replay: UpdateLastEventId send failed: {e}");
+        // Flush dedup buffer and batch UpdateLastEventId
+        if event_count.is_multiple_of(REPLAY_DEDUP_BATCH_SIZE) {
+            deduped_total += flush_replay_batch(
+                &mut replay_pending,
+                &conn,
+                &writer,
+                &mut affected_paths,
+                &mut affected_paths_overflow,
+            ) as u64;
+            if last_event_id > since_event_id
+                && let Err(e) = writer.send(WriteMessage::UpdateLastEventId(last_event_id))
+            {
+                log::warn!("Replay: UpdateLastEventId send failed: {e}");
+            }
         }

         // Emit progress every 500ms during replay
@@ -451,6 +481,14 @@ pub(super) async fn run_replay_event_loop(

     // ── Phase 2: After HistoryDone ───────────────────────────────────

+    if deduped_total < event_count {
+        log::info!(
+            "Replay: deduplicated {event_count} raw events to {deduped_total} unique \
+             ({:.0}% reduction)",
+            (1.0 - deduped_total as f64 / event_count.max(1) as f64) * 100.0,
+        );
+    }
+
     // Send final UpdateLastEventId
     if last_event_id > since_event_id
         && let Err(e) = writer.send(WriteMessage::UpdateLastEventId(last_event_id))
@@ -750,6 +788,35 @@ pub(super) async fn run_background_verification(affected_paths: HashSet<String>,

 // ── Helpers ──────────────────────────────────────────────────────────

+/// Drain the replay dedup buffer, process each event through the
+/// reconciler, and collect affected paths. Returns the number of
+/// deduplicated events processed.
+fn flush_replay_batch(
+    pending: &mut HashMap<String, watcher::FsChangeEvent>,
+    conn: &Connection,
+    writer: &IndexWriter,
+    affected_paths: &mut HashSet<String>,
+    affected_paths_overflow: &mut bool,
+) -> usize {
+    let count = pending.len();
+    for (_path, event) in pending.drain() {
+        if let Some(paths) = reconciler::process_fs_event(&event, conn, writer)
+            && !*affected_paths_overflow
+        {
+            affected_paths.extend(paths);
+            if affected_paths.len() >= MAX_AFFECTED_PATHS {
+                log::warn!(
+                    "Replay: affected paths cap reached ({MAX_AFFECTED_PATHS}). \
+                     Will emit a full refresh notification instead of individual paths."
+                );
+                *affected_paths_overflow = true;
+                affected_paths.clear();
+            }
+        }
+    }
+    count
+}
+
 /// Result of `verify_affected_dirs`.
 struct VerifyResult {
     /// Entries in DB but not on disk (deleted).
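The cap-and-overflow handling inside `flush_replay_batch` follows a pattern that is easy to isolate: collect paths until a limit is reached, then drop the set and latch a flag so the caller falls back to a full refresh. The sketch below is an illustration only; `AffectedPaths` and `CAP` are hypothetical names, and the cap of 3 is chosen for demonstration because the real `MAX_AFFECTED_PATHS` value is not shown in this diff.

```rust
use std::collections::HashSet;

/// Hypothetical cap; the real `MAX_AFFECTED_PATHS` value is not shown in the diff.
const CAP: usize = 3;

/// Collects paths until the cap is hit, then clears the set and latches an overflow flag.
#[derive(Default)]
struct AffectedPaths {
    paths: HashSet<String>,
    overflow: bool,
}

impl AffectedPaths {
    fn extend(&mut self, new_paths: impl IntoIterator<Item = String>) {
        if self.overflow {
            return; // already degraded to "refresh everything"
        }
        self.paths.extend(new_paths);
        if self.paths.len() >= CAP {
            self.overflow = true;
            self.paths.clear();
        }
    }
}

fn main() {
    let mut affected = AffectedPaths::default();
    affected.extend(vec!["/a".to_string(), "/b".to_string()]);
    println!("{} paths, overflow = {}", affected.paths.len(), affected.overflow);
    // The third path reaches the cap: the set is cleared and the flag stays set.
    affected.extend(vec!["/c".to_string()]);
    println!("{} paths, overflow = {}", affected.paths.len(), affected.overflow);
}
```

Bundling the set and the flag in one type is a design choice of this sketch; the diff itself passes them as two separate `&mut` parameters.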
@@ -1332,4 +1399,211 @@ mod tests {
         ));
         assert_eq!(reconciler.buffer_len(), 0, "buffer_event should be no-op in live mode");
     }
+
+    // ── Replay dedup tests ───────────────────────────────────────────
+
+    /// Replay dedup: 500 removal events for the same path (like a SQLite
+    /// journal file) collapse to a single merged event.
+    #[test]
+    fn replay_dedup_collapses_duplicate_events() {
+        let mut pending = HashMap::<String, watcher::FsChangeEvent>::new();
+
+        for i in 0..500 {
+            let path = "/Users/test/Library/peewee-sqlite.db-journal".to_string();
+            let event = make_event(
+                &path,
+                1000 + i,
+                watcher::FsEventFlags {
+                    item_removed: true,
+                    item_is_file: true,
+                    ..Default::default()
+                },
+            );
+            pending
+                .entry(path)
+                .and_modify(|existing| {
+                    *existing = merge_fs_events(existing, &event);
+                })
+                .or_insert(event);
+        }
+
+        assert_eq!(pending.len(), 1, "500 events for same path should collapse to 1");
+        let merged = pending.values().next().unwrap();
+        assert_eq!(merged.event_id, 1499, "highest event_id should be kept");
+        assert!(merged.flags.item_removed, "item_removed flag should be preserved");
+    }
+
+    /// Replay dedup: events for different paths are all preserved while
+    /// duplicates within each path are merged.
+    #[test]
+    fn replay_dedup_preserves_distinct_paths_merges_duplicates() {
+        let mut pending = HashMap::<String, watcher::FsChangeEvent>::new();
+
+        // 100 events: 10 paths x 10 events each
+        for path_idx in 0..10u64 {
+            for event_idx in 0..10u64 {
+                let path = format!("/path/{path_idx}/file.txt");
+                let event = make_event(
+                    &path,
+                    path_idx * 10 + event_idx,
+                    watcher::FsEventFlags {
+                        item_modified: true,
+                        item_is_file: true,
+                        ..Default::default()
+                    },
+                );
+                pending
+                    .entry(path)
+                    .and_modify(|existing| {
+                        *existing = merge_fs_events(existing, &event);
+                    })
+                    .or_insert(event);
+            }
+        }
+
+        assert_eq!(pending.len(), 10, "10 unique paths should be preserved");
+        for path_idx in 0..10u64 {
+            let path = format!("/path/{path_idx}/file.txt");
+            let event = &pending[&path];
+            assert_eq!(
+                event.event_id,
+                path_idx * 10 + 9,
+                "each path should keep its highest event_id"
+            );
+        }
+    }
+
+    /// Replay dedup: mixed create/modify/remove events for the same path
+    /// merge with correct flag priority (removed wins).
+    #[test]
+    fn replay_dedup_mixed_events_merge_correctly() {
+        let mut pending = HashMap::<String, watcher::FsChangeEvent>::new();
+        let path = "/test/file.txt".to_string();
+
+        let events = [
+            (
+                1,
+                watcher::FsEventFlags {
+                    item_created: true,
+                    item_is_file: true,
+                    ..Default::default()
+                },
+            ),
+            (
+                2,
+                watcher::FsEventFlags {
+                    item_modified: true,
+                    item_is_file: true,
+                    ..Default::default()
+                },
+            ),
+            (
+                3,
+                watcher::FsEventFlags {
+                    item_modified: true,
+                    item_is_file: true,
+                    ..Default::default()
+                },
+            ),
+            (
+                4,
+                watcher::FsEventFlags {
+                    item_removed: true,
+                    item_is_file: true,
+                    ..Default::default()
+                },
+            ),
+        ];
+
+        for (id, flags) in events {
+            let event = make_event(&path, id, flags);
+            pending
+                .entry(path.clone())
+                .and_modify(|existing| {
+                    *existing = merge_fs_events(existing, &event);
+                })
+                .or_insert(event);
+        }
+
+        assert_eq!(pending.len(), 1);
+        let merged = &pending[&path];
+        assert!(merged.flags.item_removed, "removed should win over created+modified");
+        assert!(
+            !merged.flags.item_created,
+            "created should be dropped when removed wins"
+        );
+        assert_eq!(merged.event_id, 4, "highest event_id should be kept");
+    }
+
+    /// Replay dedup: simulates realistic event storm with a mix of high-churn
+    /// paths (SQLite journals, Chrome cache) and unique paths. Verifies the
+    /// dedup ratio matches expectations.
+    #[test]
+    fn replay_dedup_realistic_event_storm() {
+        let mut pending = HashMap::<String, watcher::FsChangeEvent>::new();
+        let mut raw_count = 0u64;
+
+        // 500 events for a SQLite journal (same path, rapid create/delete)
+        for i in 0..500 {
+            let path = "/Users/test/Library/aw-server/peewee-sqlite.db-journal".to_string();
+            let event = make_event(
+                &path,
+                i,
+                watcher::FsEventFlags {
+                    item_removed: true,
+                    item_is_file: true,
+                    ..Default::default()
+                },
+            );
+            pending
+                .entry(path)
+                .and_modify(|e| *e = merge_fs_events(e, &event))
+                .or_insert(event);
+            raw_count += 1;
+        }
+
+        // 200 events for Chrome cache (20 different todelete_ files, 10 events each)
+        for file_idx in 0..20 {
+            for event_idx in 0..10 {
+                let path = format!("/Users/test/Library/Chrome/todelete_{file_idx:04x}");
+                let event = make_event(
+                    &path,
+                    500 + file_idx * 10 + event_idx,
+                    watcher::FsEventFlags {
+                        item_removed: true,
+                        item_is_file: true,
+                        ..Default::default()
+                    },
+                );
+                pending
+                    .entry(path)
+                    .and_modify(|e| *e = merge_fs_events(e, &event))
+                    .or_insert(event);
+                raw_count += 1;
+            }
+        }
+
+        // 50 unique file modifications (no duplicates)
+        for i in 0..50 {
+            let path = format!("/Users/test/projects/file_{i}.rs");
+            let event = make_event(
+                &path,
+                700 + i,
+                watcher::FsEventFlags {
+                    item_modified: true,
+                    item_is_file: true,
+                    ..Default::default()
+                },
+            );
+            pending
+                .entry(path)
+                .and_modify(|e| *e = merge_fs_events(e, &event))
+                .or_insert(event);
+            raw_count += 1;
+        }
+
+        assert_eq!(raw_count, 750, "should have 750 raw events");
+        // 1 (journal) + 20 (chrome) + 50 (unique) = 71 unique paths
+        assert_eq!(pending.len(), 71, "should deduplicate to 71 unique paths");
+    }
 }
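As a worked check of the ratio arithmetic, the expression used in the replay log line can be applied to the numbers from the event-storm test (750 raw events, 71 unique paths) and to the example in the commit message (10,000 raw events, 71 unique). `reduction_percent` is a hypothetical helper name introduced only for this sketch; the commit computes the value inline.

```rust
/// Percentage of raw events eliminated by deduplication, using the same
/// expression as the replay log line.
fn reduction_percent(raw: u64, unique: u64) -> f64 {
    (1.0 - unique as f64 / raw.max(1) as f64) * 100.0
}

fn main() {
    // Numbers from the event-storm test: 750 raw events, 71 unique paths.
    println!("{:.0}% reduction", reduction_percent(750, 71)); // prints "91% reduction"
    // Numbers from the commit message example.
    println!("{:.0}% reduction", reduction_percent(10_000, 71)); // prints "99% reduction"
}
```

Note that in the commit `deduped_total` is summed per flushed batch, so a path that recurs in several batches is counted once per batch; the logged figure is therefore a lower bound on what a single global dedup pass would report.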
