Skip to content

Commit 9d4982a

Browse files
committed
Listing: Remove last 5 block_on calls from file_system module
Zero `block_on` remaining in the entire `file_system/` production code. - `streaming.rs`: replaced `std::thread::spawn` + `block_on` with `tokio::spawn` + async polling loop - `operations.rs`: `list_directory_start_with_volume` now async, direct `.await` - `caching.rs`: `notify_full_refresh` now async, spawned via `tokio::spawn` - `watcher.rs`: `handle_directory_change` now async, spawned from FSEvents callbacks - `smb_watcher.rs`: `stat_via_volume` and `process_event_batch` now async, direct `.await` - Tests updated: removed `spawn_blocking` wrappers, direct `.await`
1 parent 0a6ae61 commit 9d4982a

10 files changed

Lines changed: 177 additions & 273 deletions

File tree

apps/desktop/src-tauri/src/commands/file_system/listing.rs

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,7 @@ use crate::file_system::{
1515
use std::path::{Path, PathBuf};
1616
use tokio::time::Duration;
1717

18-
use crate::commands::util::{
19-
IpcError, TimedOut, blocking_result_with_timeout, blocking_with_timeout, blocking_with_timeout_flag,
20-
};
18+
use crate::commands::util::{IpcError, TimedOut, blocking_with_timeout};
2119
use crate::file_system::validation::{MAX_NAME_BYTES, MAX_PATH_BYTES};
2220

2321
use super::expand_tilde;
@@ -75,11 +73,19 @@ pub async fn list_directory_start(
7573
let expanded_path = expand_tilde(&path);
7674
let path_buf = PathBuf::from(&expanded_path);
7775
let dir_sort_mode = directory_sort_mode.unwrap_or_default();
78-
blocking_result_with_timeout(Duration::from_secs(2), move || {
79-
ops_list_directory_start_with_volume("root", &path_buf, include_hidden, sort_by, sort_order, dir_sort_mode)
80-
.map_err(|e| format!("Failed to start directory listing '{}': {}", path, e))
81-
})
76+
match tokio::time::timeout(
77+
Duration::from_secs(2),
78+
ops_list_directory_start_with_volume("root", &path_buf, include_hidden, sort_by, sort_order, dir_sort_mode),
79+
)
8280
.await
81+
{
82+
Ok(Ok(result)) => Ok(result),
83+
Ok(Err(e)) => Err(IpcError::from_err(format!(
84+
"Failed to start directory listing '{}': {}",
85+
path, e
86+
))),
87+
Err(_) => Err(IpcError::timeout()),
88+
}
8389
}
8490

8591
/// Returns immediately; reads in background.
@@ -220,10 +226,12 @@ pub fn list_directory_end(listing_id: String) {
220226
/// Used after write operations (move) when the file watcher may not fire promptly.
221227
#[tauri::command]
222228
pub async fn refresh_listing(listing_id: String) -> TimedOut<()> {
223-
blocking_with_timeout_flag(Duration::from_secs(2), (), move || {
224-
crate::file_system::watcher::handle_directory_change(&listing_id);
229+
let timed_out = tokio::time::timeout(Duration::from_secs(2), async {
230+
crate::file_system::watcher::handle_directory_change(&listing_id).await;
225231
})
226232
.await
233+
.is_err();
234+
TimedOut { data: (), timed_out }
227235
}
228236

229237
/// Returns total file/dir counts and sizes, plus selection stats if `selected_indices` is given.

apps/desktop/src-tauri/src/file_system/listing/CLAUDE.md

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ Frontend Backend
6161
## Key decisions
6262

6363
**Decision**: Streaming with background task, not chunked IPC
64-
**Why**: Chunked approach required multiple IPC calls, complex state tracking. Streaming spawns `tokio::task::spawn_blocking()`, emits events. Frontend stays responsive—Tab works, ESC cancels.
64+
**Why**: Chunked approach required multiple IPC calls, complex state tracking. Streaming spawns a `tokio::spawn` async task, emits events. Frontend stays responsive—Tab works, ESC cancels via `tokio::select!`-style polling.
6565

6666
**Decision**: Cancellation via `AtomicBool` checked per-entry
6767
**Why**: Network folders iterate slowly (seconds per entry). Checking on each iteration ensures responsive cancellation. ESC → cancel within ~100ms.
@@ -89,7 +89,7 @@ Frontend Backend
8989
**Why**: SMB and MTP volumes don't use FSEvents (`supports_watching() == false`), so they never get a `WatchedDirectory` entry. With the sequence on the watcher, `increment_sequence` returned `None` and `directory-diff` events were never emitted for those volumes. Moving the `AtomicU64` to `CachedListing` makes it work for all volume types. The FSEvents watcher path also uses this same counter now.
9090

9191
**Decision**: `ListingEventSink` trait decouples streaming from Tauri (same pattern as `OperationEventSink` in write_operations)
92-
**Why**: `read_directory_with_progress` needs to emit events, but `tauri::AppHandle` can't be created in tests. The trait allows `CollectorListingEventSink` to capture events for assertions. `Arc<dyn ListingEventSink>` is used (not `&dyn`) because the sink is cloned into `std::thread::spawn` for progress callbacks.
92+
**Why**: `read_directory_with_progress` needs to emit events, but `tauri::AppHandle` can't be created in tests. The trait allows `CollectorListingEventSink` to capture events for assertions. `Arc<dyn ListingEventSink>` is used (not `&dyn`) because the sink is cloned into `tokio::spawn` for progress callbacks.
9393

9494
**Decision**: File watcher starts AFTER listing complete
9595
**Why**: Watcher diffs rely on cached entries. Starting before cache is populated would miss initial state.
@@ -108,12 +108,11 @@ Frontend Backend
108108

109109
## Gotchas
110110

111-
**Gotcha**: Listing code uses `Handle::current().block_on(volume.method())` for async Volume calls
112-
**Why**: The listing pipeline (`streaming.rs`, `caching.rs`, `operations.rs`) and the FSEvents watcher (`watcher.rs`)
113-
call Volume methods from sync contexts (`spawn_blocking` closures, watcher callbacks). These `block_on` bridges are
114-
intentional and correct — the threads are OS threads with no entered runtime. They're NOT the same anti-pattern as the
115-
old Volume-internal `block_on` bridges (which were eliminated by the async refactor). These could be migrated to fully
116-
async in a future listing pipeline refactor, but they work correctly as-is.
111+
**Gotcha**: Watcher callbacks (FSEvents) run on OS threads, not the tokio runtime
112+
**Why**: The FSEvents debouncer callback is called from an OS thread. Functions like `handle_directory_change` and
113+
`notify_directory_changed(FullRefresh)` are async, so callers in watcher callbacks use `tokio::spawn` to dispatch
114+
the async work onto the runtime. The incremental watcher path (`handle_directory_change_incremental`) remains sync
115+
since it only does cache lookups and `stat` calls via `get_single_entry`.
117116

118117
### Cache helpers (caching.rs)
119118

apps/desktop/src-tauri/src/file_system/listing/caching.rs

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -272,16 +272,20 @@ pub fn notify_directory_changed(volume_id: &str, parent_path: &Path, change: Dir
272272
// path is the share root, but the user may be browsing a subdirectory.
273273
// Refresh all listings on this volume instead.
274274
let volume_listings = find_listings_on_volume(volume_id);
275-
for (lid, path, sort_by, sort_order, dir_sort_mode) in &volume_listings {
276-
notify_full_refresh(
277-
&app,
275+
for (lid, path, sort_by, sort_order, dir_sort_mode) in volume_listings {
276+
let app = app.clone();
277+
let volume_id = volume_id.to_string();
278+
tokio::spawn(notify_full_refresh(
279+
app,
278280
volume_id,
279281
path,
280-
&[(lid.clone(), *sort_by, *sort_order, *dir_sort_mode)],
281-
);
282+
vec![(lid, sort_by, sort_order, dir_sort_mode)],
283+
));
282284
}
283285
} else {
284-
notify_full_refresh(&app, volume_id, parent_path, &listings);
286+
let volume_id = volume_id.to_string();
287+
let parent_path = parent_path.to_path_buf();
288+
tokio::spawn(notify_full_refresh(app, volume_id, parent_path, listings));
285289
}
286290
}
287291
}
@@ -390,25 +394,25 @@ fn notify_modified(app: &tauri::AppHandle, listing_id: &str, entry: FileEntry) {
390394
}
391395

392396
/// Re-reads a directory via the Volume trait, computes a diff, and emits it.
393-
fn notify_full_refresh(
394-
app: &tauri::AppHandle,
395-
volume_id: &str,
396-
parent_path: &Path,
397-
listings: &[(String, SortColumn, SortOrder, DirectorySortMode)],
397+
async fn notify_full_refresh(
398+
app: tauri::AppHandle,
399+
volume_id: String,
400+
parent_path: PathBuf,
401+
listings: Vec<(String, SortColumn, SortOrder, DirectorySortMode)>,
398402
) {
399403
use crate::file_system::listing::sorting::sort_entries;
400404
use crate::file_system::watcher::{DirectoryDiff, compute_diff};
401405
use tauri::Emitter;
402406

403-
let vol = match crate::file_system::get_volume_manager().get(volume_id) {
407+
let vol = match crate::file_system::get_volume_manager().get(&volume_id) {
404408
Some(v) => v,
405409
None => {
406410
log::warn!("notify_directory_changed: volume `{}` not found", volume_id);
407411
return;
408412
}
409413
};
410414

411-
let mut new_entries = match tokio::runtime::Handle::current().block_on(vol.list_directory(parent_path)) {
415+
let mut new_entries = match vol.list_directory(&parent_path).await {
412416
Ok(entries) => entries,
413417
Err(e) => {
414418
log::warn!(
@@ -422,7 +426,7 @@ fn notify_full_refresh(
422426

423427
crate::indexing::enrich_entries_with_index(&mut new_entries);
424428

425-
for (listing_id, sort_by, sort_order, dir_sort_mode) in listings {
429+
for (listing_id, sort_by, sort_order, dir_sort_mode) in &listings {
426430
// Re-sort to match this listing's sort params
427431
let mut sorted = new_entries.clone();
428432
sort_entries(&mut sorted, *sort_by, *sort_order, *dir_sort_mode);

apps/desktop/src-tauri/src/file_system/listing/operations.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ pub struct ListingStartResult {
4747
///
4848
/// Reads the directory once, caches it, and returns listing ID + total count.
4949
/// Frontend then fetches visible ranges on demand via `get_file_range`.
50-
pub fn list_directory_start(path: &Path, include_hidden: bool) -> Result<ListingStartResult, std::io::Error> {
50+
pub async fn list_directory_start(path: &Path, include_hidden: bool) -> Result<ListingStartResult, std::io::Error> {
5151
list_directory_start_with_volume(
5252
"root",
5353
path,
@@ -56,12 +56,13 @@ pub fn list_directory_start(path: &Path, include_hidden: bool) -> Result<Listing
5656
SortOrder::Ascending,
5757
DirectorySortMode::LikeFiles,
5858
)
59+
.await
5960
}
6061

6162
/// Starts a new directory listing using a specific volume.
6263
///
6364
/// This is the internal implementation that supports multi-volume access.
64-
pub fn list_directory_start_with_volume(
65+
pub async fn list_directory_start_with_volume(
6566
volume_id: &str,
6667
path: &Path,
6768
include_hidden: bool,
@@ -82,8 +83,9 @@ pub fn list_directory_start_with_volume(
8283
})?;
8384

8485
// Use the Volume trait to list the directory
85-
let all_entries = tokio::runtime::Handle::current()
86-
.block_on(volume.list_directory(path))
86+
let all_entries = volume
87+
.list_directory(path)
88+
.await
8789
.map_err(|e| std::io::Error::other(e.to_string()))?;
8890
benchmark::log_event_value("volume.list_directory COMPLETE, entries", all_entries.len());
8991

apps/desktop/src-tauri/src/file_system/listing/operations_test.rs

Lines changed: 18 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -329,22 +329,15 @@ async fn test_list_directory_start_with_volume_caches_entries() {
329329

330330
get_volume_manager().register(&volume_id, volume);
331331

332-
// Run on a blocking thread because the function internally uses
333-
// `Handle::current().block_on()` which panics if called from an async context.
334-
let vid = volume_id.clone();
335-
let dp = dir_path.clone();
336-
let result = tokio::task::spawn_blocking(move || {
337-
super::list_directory_start_with_volume(
338-
&vid,
339-
&dp,
340-
true,
341-
SortColumn::Name,
342-
SortOrder::Ascending,
343-
DirectorySortMode::LikeFiles,
344-
)
345-
})
346-
.await
347-
.unwrap();
332+
let result = super::list_directory_start_with_volume(
333+
&volume_id,
334+
&dir_path,
335+
true,
336+
SortColumn::Name,
337+
SortOrder::Ascending,
338+
DirectorySortMode::LikeFiles,
339+
)
340+
.await;
348341

349342
assert!(
350343
result.is_ok(),
@@ -382,18 +375,15 @@ async fn test_list_directory_start_with_volume_unknown_volume() {
382375
use crate::file_system::listing::sorting::{DirectorySortMode, SortColumn, SortOrder};
383376
use std::path::PathBuf;
384377

385-
let result = tokio::task::spawn_blocking(move || {
386-
super::list_directory_start_with_volume(
387-
"nonexistent-volume-id",
388-
&PathBuf::from("/some/path"),
389-
true,
390-
SortColumn::Name,
391-
SortOrder::Ascending,
392-
DirectorySortMode::LikeFiles,
393-
)
394-
})
395-
.await
396-
.unwrap();
378+
let result = super::list_directory_start_with_volume(
379+
"nonexistent-volume-id",
380+
&PathBuf::from("/some/path"),
381+
true,
382+
SortColumn::Name,
383+
SortOrder::Ascending,
384+
DirectorySortMode::LikeFiles,
385+
)
386+
.await;
397387

398388
assert!(result.is_err());
399389
assert!(result.unwrap_err().to_string().contains("not found"));

apps/desktop/src-tauri/src/file_system/listing/streaming.rs

Lines changed: 30 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ use serde::{Deserialize, Serialize};
77
use std::collections::HashMap;
88
use std::path::Path;
99
use std::sync::atomic::{AtomicBool, Ordering};
10-
use std::sync::mpsc;
1110
use std::sync::{Arc, LazyLock, RwLock};
1211
use std::time::Duration;
1312

@@ -322,24 +321,21 @@ pub async fn list_directory_start_streaming(
322321

323322
// Spawn background task
324323
tokio::spawn(async move {
325-
// Clone again for use after spawn_blocking
324+
// Clone again for use after the listing call
326325
let listing_id_for_cleanup = listing_id_for_spawn.clone();
327326
let events_for_error = Arc::clone(&events);
328327

329-
// Run blocking I/O on dedicated thread pool
330-
let result = tokio::task::spawn_blocking(move || {
331-
read_directory_with_progress(
332-
&events,
333-
&listing_id_for_spawn,
334-
&state,
335-
&volume_id_owned,
336-
&path_owned,
337-
include_hidden,
338-
sort_by,
339-
sort_order,
340-
dir_sort_mode,
341-
)
342-
})
328+
let result = read_directory_with_progress(
329+
&events,
330+
&listing_id_for_spawn,
331+
&state,
332+
&volume_id_owned,
333+
&path_owned,
334+
include_hidden,
335+
sort_by,
336+
sort_order,
337+
dir_sort_mode,
338+
)
343339
.await;
344340

345341
// Clean up streaming state
@@ -350,21 +346,12 @@ pub async fn list_directory_start_streaming(
350346
// Handle task result
351347
match result {
352348
Err(e) => {
353-
// Task panicked or was cancelled
354-
events_for_error.emit_error(
355-
&listing_id_for_cleanup,
356-
"Something went wrong while reading this folder".to_string(),
357-
None,
358-
);
359-
log::error!("Listing task panicked: {}", e);
360-
}
361-
Ok(Err(e)) => {
362349
// Function returned an error (volume not found, permission denied, I/O, etc.)
363350
let mut friendly = friendly_error_from_volume_error(&e, &path_for_error);
364351
enrich_with_provider(&mut friendly, &path_for_error);
365352
events_for_error.emit_error(&listing_id_for_cleanup, e.to_string(), Some(friendly));
366353
}
367-
Ok(Ok(())) => {
354+
Ok(()) => {
368355
// Success - read_directory_with_progress already emitted listing-complete
369356
}
370357
}
@@ -379,13 +366,14 @@ pub async fn list_directory_start_streaming(
379366

380367
/// Reads a directory with progress reporting.
381368
///
382-
/// Runs on a blocking thread pool and emits progress events.
369+
/// Async implementation that spawns the Volume I/O in a background task
370+
/// and uses `tokio::select!` with a cancellation polling loop for responsive ESC handling.
383371
/// Uses the Volume abstraction to support both local filesystem and MTP devices.
384372
#[allow(
385373
clippy::too_many_arguments,
386374
reason = "Streaming operation requires many state parameters"
387375
)]
388-
pub(crate) fn read_directory_with_progress(
376+
pub(crate) async fn read_directory_with_progress(
389377
events: &Arc<dyn ListingEventSink>,
390378
listing_id: &str,
391379
state: &Arc<StreamingListingState>,
@@ -420,47 +408,36 @@ pub(crate) fn read_directory_with_progress(
420408
.get(volume_id)
421409
.ok_or_else(|| VolumeError::NotFound(format!("Volume not found: {}", volume_id)))?;
422410

423-
// Read directory entries via Volume abstraction
424-
// Use polling-based cancellation to remain responsive even when filesystem I/O blocks
425-
// (for example, on slow/stuck network drives like SMB mounts)
411+
// Read directory entries via Volume abstraction.
412+
// Spawn the listing as a tokio task and use select! with a cancellation poll loop
413+
// to remain responsive even when filesystem I/O blocks (slow/stuck network drives).
426414
let read_start = std::time::Instant::now();
427-
let path_for_thread = path.to_path_buf();
428-
let (tx, rx) = mpsc::channel();
415+
let path_for_task = path.to_path_buf();
429416
let events_for_progress = Arc::clone(events);
430417
let listing_id_for_progress = listing_id.to_string();
431418

432-
// Capture the Tokio runtime handle so the spawned thread can access it.
433-
// This is needed for MTP volumes, which use `Handle::block_on` internally.
434-
let runtime_handle = tokio::runtime::Handle::current();
435-
436-
std::thread::spawn(move || {
437-
// Enter the Tokio runtime context so `Handle::current()` works inside volumes
438-
let _guard = runtime_handle.enter();
439-
419+
let listing_task = tokio::spawn(async move {
440420
let on_progress = |loaded_count: usize| {
441421
events_for_progress.emit_progress(&listing_id_for_progress, loaded_count);
442422
};
443-
let result = runtime_handle.block_on(volume.list_directory_with_progress(&path_for_thread, &on_progress));
444-
let _ = tx.send(result);
423+
volume.list_directory_with_progress(&path_for_task, &on_progress).await
445424
});
446425

447-
// Poll for results, checking cancellation between polls
426+
// Poll for cancellation while waiting for the listing task
448427
let entries_result = loop {
449428
if state.cancelled.load(Ordering::Relaxed) {
450429
benchmark::log_event("read_directory_with_progress CANCELLED (during read_dir polling)");
430+
listing_task.abort();
451431
events.emit_cancelled(listing_id);
452432
return Ok(());
453433
}
454434

455-
match rx.recv_timeout(CANCELLATION_POLL_INTERVAL) {
456-
Ok(result) => break result,
457-
Err(mpsc::RecvTimeoutError::Timeout) => continue,
458-
Err(mpsc::RecvTimeoutError::Disconnected) => {
459-
return Err(VolumeError::IoError {
460-
message: "Directory listing thread terminated unexpectedly".into(),
461-
raw_os_error: None,
462-
});
463-
}
435+
tokio::time::sleep(CANCELLATION_POLL_INTERVAL).await;
436+
if listing_task.is_finished() {
437+
break listing_task.await.map_err(|e| VolumeError::IoError {
438+
message: format!("Directory listing task failed: {}", e),
439+
raw_os_error: None,
440+
})?;
464441
}
465442
};
466443

0 commit comments

Comments
 (0)