Skip to content

Commit 68be3ab

Browse files
committed
Drive indexing: four bugs fixed
1. Replay flush hang — Micro-scans during replay sent nested transactions that broke the writer. Fixed by adding replay_active suppression flag to MicroScanManager. 2. Live events silently dropped — cmdr-fsevent-stream used strict from_bits() which rejected events with unknown macOS flag bits. Changed to from_bits_truncate(). 3. Micro-scans permanently suppressed after replay — set_replay_active(false) was placed after the infinite Phase 3 loop. Moved it inside run_replay_event_loop before Phase 3 starts. 4. delete_subtree taking 14+ seconds per call — LIKE queries on a 5M-row table caused full table scans. Converted to range queries (path > prefix/ AND path < prefix0) that use the PRIMARY KEY index. Result: replay dropped from 5+ minutes to 2.3 seconds.
1 parent 5e10fa9 commit 68be3ab

8 files changed

Lines changed: 930 additions & 126 deletions

File tree

apps/desktop/src-tauri/src/indexing/CLAUDE.md

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,12 @@ Full design: `docs/specs/drive-indexing/plan.md`
1010

1111
- **mod.rs** -- Public API: `init()`, `start_indexing()`, `stop_indexing()`, `clear_index()`, `enrich_entries_with_index()`. `IndexManager` coordinates all subsystems. Global read-only store for enrichment.
1212
- **store.rs** -- SQLite schema (entries, dir_stats, meta), read queries (`get_dir_stats_batch`, `get_index_status`), DB open/migrate. Schema version check: mismatch triggers drop+rebuild.
13-
- **writer.rs** -- Single writer thread, owns the write connection, processes `WriteMessage` channel (bounded mpsc). Priority: `UpdateDirStats` before `InsertEntries`.
13+
- **writer.rs** -- Single writer thread, owns the write connection, processes `WriteMessage` channel (unbounded mpsc). Priority: `UpdateDirStats` before `InsertEntries`. `Flush` variant + async `flush()` method let callers wait for all prior writes to commit.
1414
- **scanner.rs** -- jwalk-based parallel directory walker. `scan_volume()` for full scan, `scan_subtree()` for micro-scans. Exclusion filter for macOS system paths. Physical sizes (`st_blocks * 512`).
1515
- **micro_scan.rs** -- `MicroScanManager`: bounded task pool (default 3 concurrent), priority queue (`UserSelected` > `CurrentDir`), deduplication, cancellation. Skips after full scan completes.
1616
- **aggregator.rs** -- Dir stats computation. Bottom-up after full scan (O(N) single pass), per-subtree after micro-scan, incremental delta propagation up ancestor chain for watcher events.
1717
- **watcher.rs** -- Drive-level FSEvents watcher via `cmdr-fsevent-stream`. File-level events with event IDs. Supports `sinceWhen` for cold-start replay.
18-
- **reconciler.rs** -- Buffers FSEvents during scan, replays after scan completes using event IDs to skip stale events. Processes live events for file creates/removes/modifies.
18+
- **reconciler.rs** -- Buffers FSEvents during scan, replays after scan completes using event IDs to skip stale events. Processes live events for file creates/removes/modifies. Key functions (`process_fs_event`, `emit_dir_updated`) are `pub(super)` so `mod.rs` can call them directly during cold-start replay.
1919
- **firmlinks.rs** -- Parses `/usr/share/firmlinks`, builds prefix map, normalizes paths. Converts `/System/Volumes/Data/Users/foo` to `/Users/foo`.
2020
- **verifier.rs** -- Placeholder for per-navigation background readdir diff (future milestone).
2121

@@ -39,7 +39,8 @@ Full scan:
3939
|-- On complete: replay buffered events (reconciler), compute all aggregates, switch to live mode
4040
|
4141
Live mode:
42-
|-- FSEvents -> reconciler -> UpsertEntry/DeleteEntry/PropagateDelta -> writer -> SQLite
42+
|-- FSEvents -> reconciler -> UpsertEntry/DeleteEntry/DeleteSubtree -> writer (auto-propagates deltas) -> SQLite
43+
|-- Affected paths batched in HashSet, flushed to frontend every 300 ms via index-dir-updated event
4344
|
4445
Enrichment (every get_file_range call):
4546
|-- enrich_entries_with_index() -> batch SELECT from dir_stats -> populate FileEntry fields
@@ -90,6 +91,14 @@ Key test files are alongside each module (test functions within `#[cfg(test)]` b
9091

9192
## Gotchas
9293

94+
**Cold-start replay uses two-phase flush**: The `run_replay_event_loop` doesn't emit `index-dir-updated` during Phase 1 (replay). It collects affected paths, flushes the writer (ensuring all writes are committed), then emits a single batched notification. This prevents the frontend from reading stale data.
95+
96+
**Live events are batched with a 300 ms window**: Both `run_live_event_loop` and the Phase 3 live loop in `run_replay_event_loop` use `tokio::select!` with a 300 ms `tokio::time::interval` to collect affected paths in a `HashSet` and emit a single `index-dir-updated` per flush. This prevents UI flicker from rapid per-event notifications (FSEvents can fire hundreds of events per second during bulk operations). `process_live_event` collects paths into the caller's `HashSet` instead of emitting directly.
97+
98+
**Writer-side delete-with-propagation**: `DeleteEntry` and `DeleteSubtree` handlers in the writer automatically read old data before deleting and propagate accurate negative deltas. This means every deletion -- replay, live, verification -- gets correct dir_stats updates without callers needing to send separate `PropagateDelta` messages. `delete_subtree` and `propagate_delta` have no internal transactions, so they're safe inside the replay's `BEGIN IMMEDIATE` transaction.
99+
100+
**Post-replay verification is bidirectional**: `verify_affected_dirs` checks both directions: (1) stale entries in DB but not on disk (sends `DeleteEntry`/`DeleteSubtree`), and (2) missing entries on disk but not in DB (sends `UpsertEntry` + `PropagateDelta` for files, collects directory paths for `scan_subtree`). New directories are scanned and their subtree totals propagated up the ancestor chain. The `GLOBAL_INDEX_STORE` mutex guard is scoped to avoid holding it across `.await` points (the guard is not `Send`).
101+
93102
**Schema version mismatch drops the DB**: If `schema_version` in meta doesn't match what the code expects, the entire DB is deleted and rebuilt. No migration path (it's a cache, not user data).
94103

95104
**`verifier.rs` is a placeholder**: Per-navigation readdir diff is a future milestone. Currently just a TODO comment.

apps/desktop/src-tauri/src/indexing/aggregator.rs

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -47,20 +47,21 @@ pub fn compute_subtree_aggregates(conn: &Connection, root: &str) -> Result<u64,
4747
/// Propagate a size/count delta up the ancestor chain.
4848
///
4949
/// Called when a file is added, removed, or modified. Walks from the parent of the
50-
/// given path up to the root, updating each ancestor's `dir_stats` in a single transaction.
50+
/// given path up to the root, updating each ancestor's `dir_stats`.
51+
///
52+
/// No internal transaction: safe to call inside an outer `BEGIN IMMEDIATE`
53+
/// (replay) or as standalone statements (live mode auto-commits).
5154
pub fn propagate_delta(
5255
conn: &Connection,
5356
path: &str,
5457
size_delta: i64,
5558
file_count_delta: i32,
5659
dir_count_delta: i32,
5760
) -> Result<(), IndexStoreError> {
58-
let tx = conn.unchecked_transaction()?;
59-
6061
let mut current = parent_path(path);
6162
while let Some(ancestor) = current {
6263
// Try to read existing stats
63-
let mut stmt = tx.prepare_cached(
64+
let mut stmt = conn.prepare_cached(
6465
"SELECT recursive_size, recursive_file_count, recursive_dir_count
6566
FROM dir_stats WHERE path = ?1",
6667
)?;
@@ -83,7 +84,7 @@ pub fn propagate_delta(
8384
),
8485
};
8586

86-
tx.execute(
87+
conn.execute(
8788
"INSERT OR REPLACE INTO dir_stats
8889
(path, recursive_size, recursive_file_count, recursive_dir_count)
8990
VALUES (?1, ?2, ?3, ?4)",
@@ -93,7 +94,6 @@ pub fn propagate_delta(
9394
current = parent_path(&ancestor);
9495
}
9596

96-
tx.commit()?;
9797
Ok(())
9898
}
9999

@@ -174,8 +174,7 @@ fn compute_aggregates_for_dirs(conn: &Connection, dirs: &[String]) -> Result<u64
174174

175175
for (i, dir_path) in sorted.iter().enumerate() {
176176
// Look up pre-computed direct children stats
177-
let (file_size_sum, file_count, child_dir_count) =
178-
direct_stats.get(*dir_path).copied().unwrap_or((0, 0, 0));
177+
let (file_size_sum, file_count, child_dir_count) = direct_stats.get(*dir_path).copied().unwrap_or((0, 0, 0));
179178

180179
let mut recursive_size = file_size_sum;
181180
let mut recursive_file_count = file_count;

apps/desktop/src-tauri/src/indexing/micro_scan.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,10 @@ struct MicroScanManagerInner {
5050
#[derive(Clone)]
5151
pub struct MicroScanManager {
5252
inner: Arc<tokio::sync::Mutex<MicroScanManagerInner>>,
53+
/// When true, `request_scan` returns immediately without spawning any scan.
54+
/// Set during cold-start replay to prevent micro-scans from sending writes
55+
/// into the writer's active `BEGIN IMMEDIATE` transaction.
56+
replay_active: Arc<AtomicBool>,
5357
}
5458

5559
impl MicroScanManager {
@@ -67,9 +71,15 @@ impl MicroScanManager {
6771
queue: VecDeque::new(),
6872
writer,
6973
})),
74+
replay_active: Arc::new(AtomicBool::new(false)),
7075
}
7176
}
7277

78+
/// Suppress all micro-scans. Called before cold-start replay begins.
79+
pub fn set_replay_active(&self, active: bool) {
80+
self.replay_active.store(active, Ordering::SeqCst);
81+
}
82+
7383
/// Request a micro-scan for `path` at the given priority.
7484
///
7585
/// Deduplication rules:
@@ -78,6 +88,12 @@ impl MicroScanManager {
7888
/// - If active at a lower priority, the existing scan is cancelled and re-queued at
7989
/// the higher priority.
8090
pub async fn request_scan(&self, path: PathBuf, priority: ScanPriority) {
91+
// Suppress during cold-start replay to avoid sending writes into the
92+
// writer's active BEGIN IMMEDIATE transaction.
93+
if self.replay_active.load(Ordering::SeqCst) {
94+
return;
95+
}
96+
8197
let mut inner = self.inner.lock().await;
8298

8399
if inner.full_scan_complete || inner.completed.contains(&path) {

0 commit comments

Comments
 (0)