Commit f1501ec

Bugfix: Prevent OOM crash from unbounded indexing buffers
When I first ran it, the app consumed 500+ GB of RAM and took down my entire system. Oops :D

The trigger: toggling Full Disk Access caused FSEvents to replay millions of events through a pipeline with zero backpressure. Then we realized every buffer was unbounded. Oops.

Solution:
- Bound the watcher→event loop channel (100K), writer channel (100K `sync_channel` with backpressure), reconciler buffer (500K), `affected_paths` (50K), and `pending_rescans` (1K). Overflow triggers a full rescan, which is safe because the index is a disposable cache.
- Abort journal replay after 1M events and fall back to a full scan, catching the FDA-toggle scenario early.
- Add a memory watchdog (`mach_task_info`, macOS only) that warns at 8 GB and stops all indexing at 16 GB, emitting a frontend event. No-op stub on Linux.
- Worst-case peak memory is now ~350 MB during a full scan with heavy concurrent filesystem activity, down from effectively unlimited.
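The replay cap from the second bullet can be illustrated with a minimal sketch. This is not the code from the commit: `ReplayOutcome`, `replay_with_cap`, and the use of bare `u64` event IDs are hypothetical names chosen for illustration, and the real limit (1M events) is passed in as a parameter here.

```rust
/// Outcome of replaying journaled filesystem events (hypothetical type).
#[derive(Debug, PartialEq)]
pub enum ReplayOutcome {
    /// Every event was applied; the count says how many.
    Applied(usize),
    /// The cap was exceeded; the caller should discard partial work and rescan.
    FallBackToFullScan,
}

/// Replays `events` through `apply`, giving up as soon as more than
/// `max_events` events have arrived.
pub fn replay_with_cap<I, F>(events: I, max_events: usize, mut apply: F) -> ReplayOutcome
where
    I: IntoIterator<Item = u64>,
    F: FnMut(u64),
{
    let mut seen = 0usize;
    for event_id in events {
        if seen == max_events {
            // One event too many: stop pulling from the journal entirely.
            return ReplayOutcome::FallBackToFullScan;
        }
        apply(event_id);
        seen += 1;
    }
    ReplayOutcome::Applied(seen)
}
```

Because the loop returns as soon as the cap is exceeded, the rest of the journal is never pulled into memory, which is the property that matters for the FDA-toggle scenario.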
1 parent 5197120 commit f1501ec

8 files changed

Lines changed: 309 additions & 35 deletions

apps/desktop/src-tauri/src/file_system/volume/local_posix.rs

Lines changed: 1 addition & 1 deletion
@@ -287,7 +287,7 @@ impl VolumeWatcher for LocalPosixWatcher {
         &self,
         root: &Path,
         since_when: u64,
-        event_sender: mpsc::UnboundedSender<FsChangeEvent>,
+        event_sender: mpsc::Sender<FsChangeEvent>,
     ) -> Result<DriveWatcher, WatcherError> {
         DriveWatcher::start(root, since_when, event_sender)
     }

apps/desktop/src-tauri/src/file_system/volume/mod.rs

Lines changed: 1 addition & 1 deletion
@@ -142,7 +142,7 @@ pub trait VolumeWatcher: Send + Sync {
         &self,
         root: &Path,
         since_when: u64,
-        event_sender: mpsc::UnboundedSender<FsChangeEvent>,
+        event_sender: mpsc::Sender<FsChangeEvent>,
     ) -> Result<DriveWatcher, WatcherError>;
 }
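Switching the watcher's sender from unbounded to bounded means a send can now fail when the channel is full. The diff does not show how the watcher reacts, so the following is only a sketch of one plausible policy (drop the event and flag a rescan). It uses `std::sync::mpsc::sync_channel` as a stand-in for the project's `mpsc::Sender`; `FsEvent`, `forward_event`, and the `needs_rescan` flag are hypothetical.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{SyncSender, TrySendError};

/// Hypothetical stand-in for the project's `FsChangeEvent`.
#[derive(Debug, PartialEq)]
pub struct FsEvent {
    pub event_id: u64,
}

/// Forwards one event without blocking the watcher callback.
/// Returns `false` if the event was dropped.
pub fn forward_event(tx: &SyncSender<FsEvent>, needs_rescan: &AtomicBool, event: FsEvent) -> bool {
    match tx.try_send(event) {
        Ok(()) => true,
        Err(TrySendError::Full(_dropped)) => {
            // The channel is at capacity: drop the event and remember that
            // the index is now stale, so a full rescan is required.
            needs_rescan.store(true, Ordering::Relaxed);
            false
        }
        // The receiver is gone (shutdown); nothing left to notify.
        Err(TrySendError::Disconnected(_dropped)) => false,
    }
}
```

Dropping is acceptable under this policy only because the index is a disposable cache: a lost event costs a rescan, not correctness.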

apps/desktop/src-tauri/src/indexing/CLAUDE.md

Lines changed: 6 additions & 3 deletions
@@ -11,12 +11,13 @@ Full design: `docs/specs/drive-indexing/plan.md`
 - **mod.rs** -- Public API: `init()`, `start_indexing()`, `stop_indexing()`, `clear_index()`, `enrich_entries_with_index()`. `IndexManager` coordinates all subsystems, owns a `PathResolver` (LRU-cached path→ID mapping) for IPC commands. Global read-only store for enrichment. Enrichment uses an integer-keyed fast path: resolve parent dir once → batch-fetch child dir stats by ID → match by name. Falls back to individual path resolution for edge cases.
 - **store.rs** -- SQLite schema v2 (integer-keyed entries, dir_stats by entry_id, meta), platform_case collation, read queries, DB open/migrate. Schema version check: mismatch triggers drop+rebuild. Both path-keyed (backward compat) and integer-keyed APIs.
 - **path_resolver.rs** -- `PathResolver`: resolves filesystem paths to integer entry IDs via component-by-component walk with full-path LRU cache (50K entries). Case-aware `CacheKey` on macOS (NFD + case fold). Prefix-based invalidation for deletes/renames.
-- **writer.rs** -- Single writer thread, owns the write connection, processes `WriteMessage` channel (unbounded mpsc). Priority: `UpdateDirStats` before `InsertEntries`. `Flush` variant + async `flush()` method let callers wait for all prior writes to commit. Has both integer-keyed variants (`InsertEntriesV2`, `UpsertEntryV2`, `DeleteEntryById`, `DeleteSubtreeById`, `PropagateDeltaById`) and path-keyed backward-compat variants. The integer-keyed delete/subtree-delete handlers auto-propagate negative deltas via the `parent_id` chain (same pattern as the path-keyed variants). `propagate_delta_by_id` walks the parent chain using `get_parent_id` lookups.
+- **memory_watchdog.rs** -- Background task monitoring resident memory via `mach_task_info` (macOS). Warns at 8 GB, stops indexing at 16 GB, emits `index-memory-warning` event to frontend. No-op stub on non-macOS. Started from `start_indexing()`.
+- **writer.rs** -- Single writer thread, owns the write connection, processes `WriteMessage` channel (bounded `sync_channel`, 100K capacity, backpressure via blocking). Priority: `UpdateDirStats` before `InsertEntries`. `Flush` variant + async `flush()` method let callers wait for all prior writes to commit. Has both integer-keyed variants (`InsertEntriesV2`, `UpsertEntryV2`, `DeleteEntryById`, `DeleteSubtreeById`, `PropagateDeltaById`) and path-keyed backward-compat variants. The integer-keyed delete/subtree-delete handlers auto-propagate negative deltas via the `parent_id` chain (same pattern as the path-keyed variants). `propagate_delta_by_id` walks the parent chain using `get_parent_id` lookups.
 - **scanner.rs** -- jwalk-based parallel directory walker. `scan_volume()` for full scan, `scan_subtree()` for micro-scans. Uses `ScanContext` (from store.rs) to assign integer IDs and parent IDs during the walk: maintains a `HashMap<PathBuf, i64>` mapping directory paths to assigned IDs. The scan root is mapped to `ROOT_ID` (1). Sends `InsertEntriesV2(Vec<EntryRow>)` batches to the writer. Platform-specific exclusion filters (macOS system paths, Linux virtual filesystems). Physical sizes (`st_blocks * 512`).
 - **micro_scan.rs** -- `MicroScanManager`: bounded task pool (default 3 concurrent), priority queue (`UserSelected` > `CurrentDir`), deduplication, cancellation. Skips after full scan completes.
 - **aggregator.rs** -- Dir stats computation. Bottom-up after full scan (O(N) single pass), per-subtree after micro-scan, incremental delta propagation up ancestor chain for watcher events.
 - **watcher.rs** -- Drive-level filesystem watcher. macOS: FSEvents via `cmdr-fsevent-stream` with event IDs and `sinceWhen` replay. Linux: `notify` crate (inotify backend) with recursive watching and synthetic event counter. Other platforms: stub. `supports_event_replay()` lets callers branch on whether journal replay is available.
-- **reconciler.rs** -- Buffers FSEvents during scan, replays after scan completes using event IDs to skip stale events. Processes live events for file creates/removes/modifies using integer-keyed write messages (`UpsertEntryV2`, `DeleteEntryById`, `DeleteSubtreeById`, `PropagateDeltaById`). Resolves filesystem paths to entry IDs via `store::resolve_path()` using a read connection passed by callers. Key functions (`process_fs_event`, `emit_dir_updated`) are `pub(super)` so `mod.rs` can call them directly during cold-start replay.
+- **reconciler.rs** -- Buffers FSEvents during scan (capped at 500K events; overflow sets `buffer_overflow` flag forcing full rescan), replays after scan completes using event IDs to skip stale events. Processes live events for file creates/removes/modifies using integer-keyed write messages (`UpsertEntryV2`, `DeleteEntryById`, `DeleteSubtreeById`, `PropagateDeltaById`). Resolves filesystem paths to entry IDs via `store::resolve_path()` using a read connection passed by callers. Key functions (`process_fs_event`, `emit_dir_updated`) are `pub(super)` so `mod.rs` can call them directly during cold-start replay.
 - **firmlinks.rs** -- Parses `/usr/share/firmlinks`, builds prefix map, normalizes paths. Converts `/System/Volumes/Data/Users/foo` to `/Users/foo`.
 - **verifier.rs** -- Placeholder for per-navigation background readdir diff (future milestone).

@@ -57,7 +58,7 @@ Enrichment (every get_file_range call):
 
 ### Single-writer architecture
 
-All writes go through a dedicated `std::thread` via an unbounded mpsc channel. The writer thread owns the write connection and processes messages in order, prioritizing `UpdateDirStats` over `InsertEntries` for responsive micro-scan results.
+All writes go through a dedicated `std::thread` via a bounded `sync_channel` (100K capacity). When the channel is full, senders block (backpressure). The writer thread owns the write connection and processes messages in order, prioritizing `UpdateDirStats` over `InsertEntries` for responsive micro-scan results.
 
 Reads happen on separate WAL connections (any thread). The global read-only store (`GLOBAL_INDEX_STORE`) provides enrichment without passing `AppHandle` through the listing pipeline.
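The blocking behaviour described for the single-writer channel can be sketched with the standard library alone. This is a simplified illustration, not the project's `writer.rs`: `WriteMsg`, `spawn_writer`, and `produce` are hypothetical, the message priority logic is omitted, and no SQLite connection is involved.

```rust
use std::sync::mpsc::{sync_channel, SyncSender};
use std::thread::{self, JoinHandle};

/// Hypothetical, heavily simplified stand-in for `WriteMessage`.
pub enum WriteMsg {
    Insert(u64),
    Shutdown,
}

/// Spawns a writer thread behind a bounded channel holding at most
/// `capacity` messages. The thread returns how many inserts it processed.
pub fn spawn_writer(capacity: usize) -> (SyncSender<WriteMsg>, JoinHandle<usize>) {
    let (tx, rx) = sync_channel::<WriteMsg>(capacity);
    let handle = thread::spawn(move || {
        let mut written = 0usize;
        // recv() blocks until a message arrives or every sender is dropped.
        while let Ok(msg) = rx.recv() {
            match msg {
                // Real code would write to the database here.
                WriteMsg::Insert(_entry_id) => written += 1,
                WriteMsg::Shutdown => break,
            }
        }
        written
    });
    (tx, handle)
}

/// Sends `n` inserts to the writer.
pub fn produce(tx: &SyncSender<WriteMsg>, n: u64) {
    for id in 0..n {
        // send() blocks while the channel is full, so a fast producer is
        // slowed to the writer's pace instead of queueing without limit.
        tx.send(WriteMsg::Insert(id)).expect("writer thread exited early");
    }
}
```

With a capacity of 4 and 1,000 inserts, at most 4 messages are ever queued; memory use is bounded by the capacity rather than by how far the producer runs ahead.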

@@ -107,6 +108,8 @@ Key test files are alongside each module (test functions within `#[cfg(test)]` b
 
 **Subtree aggregation uses scoped queries**: `scoped_get_children_stats_by_id` and `scoped_get_child_dir_ids` in `aggregator.rs` use recursive CTEs scoped to the target subtree, not full-table scans. This keeps subtree aggregation O(subtree_size) regardless of total DB size.
 
+**Bounded buffers prevent OOM**: All buffers have capacity limits. Reconciler buffer: 500K events (overflow triggers full rescan). Writer channel: 100K messages (bounded `sync_channel`, backpressure). Replay `affected_paths`: 50K entries (overflow emits full refresh). Replay `pending_rescans`: 1K entries (overflow triggers full rescan). Replay event count: 1M events max (overflow falls back to full scan). Memory watchdog: warns at 8 GB, stops indexing at 16 GB. The index is a disposable cache, so dropping events and rescanning is always safe.
+
 **Disposable cache pattern**: The index DB is a cache, not a source of truth. Any corruption or error triggers delete+rebuild. No user-facing errors for DB issues.
 
 **cmdr-fsevent-stream fork (macOS only)**: Our fork of `fsevent-stream` (v0.3.0) provides direct access to FSEvents event IDs, `sinceWhen` replay, and `MustScanSubDirs` flags. Only used on macOS. On Linux, the `notify` crate (inotify backend) provides recursive directory watching with `RecursiveMode::Recursive`.
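The "cap, then flag and rescan" pattern from the bounded-buffers note might look roughly like the sketch below. Only the `buffer_overflow` flag name comes from the notes; `BoundedEventBuffer`, its methods, and the choice to free the buffer on overflow are assumptions made for illustration.

```rust
/// Buffers event IDs up to `cap`. Once the cap is hit, it stops storing
/// events and records that a full rescan is needed instead.
pub struct BoundedEventBuffer {
    events: Vec<u64>,
    cap: usize,
    buffer_overflow: bool,
}

impl BoundedEventBuffer {
    pub fn new(cap: usize) -> Self {
        Self { events: Vec::new(), cap, buffer_overflow: false }
    }

    /// Stores the event, or flips the overflow flag if the buffer is full.
    pub fn push(&mut self, event_id: u64) {
        if self.buffer_overflow {
            // Already committed to a full rescan; further events are moot.
            return;
        }
        if self.events.len() >= self.cap {
            self.buffer_overflow = true;
            // Replace the Vec so its allocation is released immediately.
            self.events = Vec::new();
            return;
        }
        self.events.push(event_id);
    }

    /// True once any event has been dropped.
    pub fn needs_full_rescan(&self) -> bool {
        self.buffer_overflow
    }

    /// Number of events currently buffered.
    pub fn len(&self) -> usize {
        self.events.len()
    }
}
```

After an overflow the buffered events are worthless anyway, since replaying an incomplete log could leave the index inconsistent, so discarding them costs nothing beyond the rescan that is already required.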
Lines changed: 163 additions & 0 deletions
@@ -0,0 +1,163 @@
+//! Memory watchdog: monitors the app's resident memory and takes action
+//! at safety thresholds to prevent unbounded memory growth.
+//!
+//! - 8 GB: logs a warning.
+//! - 16 GB: stops all indexing and emits a user-visible event.
+//!
+//! On non-macOS platforms this is a no-op stub (platform memory queries
+//! differ and can be added later).
+
+/// 8 GB in bytes.
+#[cfg(target_os = "macos")]
+const WARN_THRESHOLD: u64 = 8 * 1024 * 1024 * 1024;
+
+/// 16 GB in bytes.
+#[cfg(target_os = "macos")]
+const STOP_THRESHOLD: u64 = 16 * 1024 * 1024 * 1024;
+
+/// How often the watchdog checks memory (seconds).
+#[cfg(target_os = "macos")]
+const CHECK_INTERVAL_SECS: u64 = 5;
+
+/// Start the memory watchdog as a fire-and-forget background task.
+///
+/// On macOS, spawns a task that checks resident memory every 5 seconds
+/// using `mach_task_info`. Runs until the app is stopped or indexing is
+/// halted due to excessive memory usage. On other platforms, this is a no-op.
+#[cfg(target_os = "macos")]
+pub fn start(app: tauri::AppHandle) {
+    tauri::async_runtime::spawn(async move {
+        run_watchdog(app).await;
+    });
+}
+
+#[cfg(not(target_os = "macos"))]
+pub fn start(_app: tauri::AppHandle) {
+    // No-op on non-macOS platforms
+}
+
+#[cfg(target_os = "macos")]
+async fn run_watchdog(app: tauri::AppHandle) {
+    use std::time::Duration;
+
+    let mut interval = tokio::time::interval(Duration::from_secs(CHECK_INTERVAL_SECS));
+    let mut warned = false;
+
+    loop {
+        interval.tick().await;
+
+        let resident_bytes = match get_resident_memory() {
+            Some(b) => b,
+            None => continue,
+        };
+
+        if resident_bytes >= STOP_THRESHOLD {
+            log::error!(
+                "Memory watchdog: resident memory is {} GB, exceeding {} GB safety limit. \
+                 Stopping all indexing to prevent a system crash.",
+                resident_bytes / (1024 * 1024 * 1024),
+                STOP_THRESHOLD / (1024 * 1024 * 1024),
+            );
+
+            // Emit user-visible event
+            use tauri::Emitter;
+            let _ = app.emit(
+                "index-memory-warning",
+                serde_json::json!({
+                    "resident_gb": resident_bytes / (1024 * 1024 * 1024),
+                    "action": "stopped_indexing",
+                }),
+            );
+
+            // Stop indexing
+            if let Err(e) = super::stop_indexing(&app) {
+                log::error!("Memory watchdog: stop_indexing failed: {e}");
+            }
+            return;
+        }
+
+        if resident_bytes >= WARN_THRESHOLD && !warned {
+            warned = true;
+            log::warn!(
+                "Memory watchdog: resident memory is {} MB ({} GB threshold approaching). \
+                 Indexing continues but the system may be under pressure.",
+                resident_bytes / (1024 * 1024),
+                STOP_THRESHOLD / (1024 * 1024 * 1024),
+            );
+        }
+
+        // Reset warning flag if memory drops back below the threshold
+        if resident_bytes < WARN_THRESHOLD && warned {
+            warned = false;
+        }
+    }
+}
+
+/// Query the current task's resident memory using `mach_task_basic_info`.
+///
+/// Uses raw FFI because the `libc` crate doesn't expose `MACH_TASK_BASIC_INFO`.
+#[cfg(target_os = "macos")]
+fn get_resident_memory() -> Option<u64> {
+    // Mach task info constants (from <mach/task_info.h>)
+    const MACH_TASK_BASIC_INFO: u32 = 20;
+
+    #[repr(C)]
+    struct MachTaskBasicInfo {
+        virtual_size: u64,
+        resident_size: u64,
+        resident_size_max: u64,
+        user_time_seconds: i32,
+        user_time_microseconds: i32,
+        system_time_seconds: i32,
+        system_time_microseconds: i32,
+        policy: i32,
+        suspend_count: i32,
+    }
+
+    let info_count = (size_of::<MachTaskBasicInfo>() / size_of::<libc::c_int>()) as u32;
+
+    #[allow(deprecated, reason = "mach_task_self is deprecated in libc but works fine")]
+    unsafe {
+        let mut info: MachTaskBasicInfo = std::mem::zeroed();
+        let mut count = info_count;
+        let result = libc::task_info(
+            libc::mach_task_self(),
+            MACH_TASK_BASIC_INFO,
+            &mut info as *mut MachTaskBasicInfo as *mut i32,
+            &mut count,
+        );
+        if result == 0 {
+            Some(info.resident_size)
+        } else {
+            log::debug!("Memory watchdog: task_info failed with code {result}");
+            None
+        }
+    }
+}
+
+// ── Tests ────────────────────────────────────────────────────────────
+
+#[cfg(test)]
+mod tests {
+    #[cfg(target_os = "macos")]
+    use super::*;
+
+    #[cfg(target_os = "macos")]
+    #[test]
+    fn get_resident_memory_returns_positive_value() {
+        let mem = get_resident_memory();
+        assert!(mem.is_some(), "should be able to query resident memory");
+        assert!(mem.unwrap() > 0, "resident memory should be positive");
+    }
+
+    #[cfg(target_os = "macos")]
+    #[test]
+    fn thresholds_are_ordered() {
+        const {
+            assert!(
+                WARN_THRESHOLD < STOP_THRESHOLD,
+                "warn threshold must be below stop threshold"
+            )
+        };
+    }
+}
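The watchdog's threshold logic is easier to reason about (and to test on any platform) when separated from the timer and the Mach call. The sketch below restates the same decisions as a pure function; `WatchdogAction`, `decide`, and `GIB` are hypothetical names that do not appear in the commit, and the thresholds are copied from the file above.

```rust
const GIB: u64 = 1024 * 1024 * 1024;
pub const WARN_THRESHOLD: u64 = 8 * GIB;
pub const STOP_THRESHOLD: u64 = 16 * GIB;

/// What the watchdog should do after one memory sample (hypothetical type).
#[derive(Debug, PartialEq)]
pub enum WatchdogAction {
    Nothing,
    Warn,
    Stop,
}

/// Decides the action for one sample. `warned` carries state between
/// samples so the warning is logged once per excursion above the threshold.
pub fn decide(resident_bytes: u64, warned: &mut bool) -> WatchdogAction {
    if resident_bytes >= STOP_THRESHOLD {
        return WatchdogAction::Stop;
    }
    if resident_bytes >= WARN_THRESHOLD {
        if *warned {
            // Still above the warn line, but already reported.
            return WatchdogAction::Nothing;
        }
        *warned = true;
        return WatchdogAction::Warn;
    }
    // Back below the warn line: re-arm the warning.
    *warned = false;
    WatchdogAction::Nothing
}
```

A loop like `run_watchdog` could call such a function once per tick and map `Warn` to the log line and `Stop` to the event emission plus `stop_indexing`, which would let the threshold behaviour be unit-tested without allocating gigabytes.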
