Skip to content

Commit 1696355

Browse files
committed
MTP: propagate cancel via mtp-rs cancel tokens (M2)
- `WriteOperationState.backend_cancel` (`Arc<AtomicBool>`) is flipped together with `intent` by `cancel_write_operation` and `cancel_all_write_operations`, so any cancel path stops the wire activity (not just the loop above). - `Volume` trait grows `list_directory_with_cancel` / `delete_with_cancel` defaults that drop the flag — only MTP overrides today; local, SMB, in-memory inherit and ignore. - `MtpVolume` wraps the flag as a fresh `mtp_rs::CancelToken` via `CancelToken::from_arc(Arc::clone(...))` — shared atomic, no second polling task — and threads it through to `MtpConnectionManager::list_directory_with_cancel`, `list_directory_with_progress_and_cancel`, and `delete_object_with_cancel`. The recursive child-delete loop also checks the token between iterations. - `MtpConnectionManager` calls `storage.list_objects_with_cancel` / `storage.list_objects_stream_with_cancel` / `storage.delete_with_cancel` in `mtp-rs`, so the per-handle `GetObjectInfo` USB loop bails at the next roundtrip boundary instead of running all 950 entries on a `/DCIM/Camera`-style folder. - `map_mtp_error` now maps `mtp_rs::Error::Cancelled` to the typed `MtpConnectionError::Cancelled` variant (was wrapped in `Other` with a string match upstream — typed all the way through now). - `delete.rs` (scan walker + per-leaf delete + dir cleanup) calls the cancel-aware Volume variants with `Some(&state.backend_cancel)`. - Tests: 5 new state-machine tests pin that the intent-flip adapter sets `backend_cancel`. 2 new integration tests against a `CancellingVolume` mimicking MTP's per-handle loop pin promptness (bail before all entries processed, total wall-clock under 3 s). - Docs: `mtp/CLAUDE.md` documents the per-roundtrip cancel point, the wiring, the rationale vs PTP `CancelTransaction (0x4001)`, and the hardware caveat. `volume/CLAUDE.md` documents the new trait variants and which backends opt in. - `mtp-rs` dep switched to local path while 0.15.0 is unpublished.
1 parent 0de4c6b commit 1696355

14 files changed

Lines changed: 937 additions & 72 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

apps/desktop/src-tauri/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,8 @@ mime_guess = "2"
161161

162162
[target.'cfg(any(target_os = "macos", target_os = "linux"))'.dependencies]
163163
# MTP (Android device) support via pure Rust implementation
164-
mtp-rs = "0.14.0"
164+
# Path dep until mtp-rs 0.15.0 is published. Switch back to a crates.io version once released.
165+
mtp-rs = { path = "../../../../../../../mtp-rs" }
165166
# USB hotplug detection for MTP device watcher
166167
nusb = "0.2.3"
167168
bytes = "1"

apps/desktop/src-tauri/src/file_system/volume/CLAUDE.md

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,33 @@ Optional methods default to `Err(VolumeError::NotSupported)` or `false`, so new
5555
3. The `refresh_listing` Tauri command (`commands/file_system/listing.rs`) — short-circuits the post-transfer redundant `list_directory` re-read entirely when the volume is keeping the cache fresh via `notify_mutation`. Without this, a 1k-entry MTP folder paid ~17 s + USB session collision after every transfer outcome, wedging the next user op.
5656
Default `false` so a new backend without a real watcher won't accidentally claim freshness. **Freshness contract**: a `true` result does NOT mean the cache is byte-perfect with the device right now. Every backend has a debounce or settling window between a real change and the cache reflecting it: local FS ≈ 10 ms (FSEvents coalesce), SMB 200 ms (watcher debounce; > 50 events/dir triggers a `FullRefresh`), MTP 500 ms (event debouncer plus per-device polling; many cameras emit no events at all, so on those `true` means only "the device is reachable"). Callers must treat the result as "fresh as our most recent observation" — the same guarantee a `list_directory` call gives. The MTP and SMB checks are volume-level, not path-level: when the gate flips true, every path on that volume becomes oracle-eligible.
5757

58+
## Cancel-aware variants
59+
60+
`list_directory_with_cancel(path, on_progress, cancel)` and
61+
`delete_with_cancel(path, cancel)` accept an
62+
`Option<&Arc<AtomicBool>>` that backends interpret as a cooperative cancel
63+
flag. Default impls delegate to the non-cancel `list_directory` / `delete`,
64+
dropping the flag — so adding a new backend doesn't have to implement them
65+
unless its operations are interruptible at a meaningful boundary.
66+
67+
- `MtpVolume` overrides both. The flag wraps a fresh `mtp_rs::CancelToken` via
68+
`CancelToken::from_arc(Arc::clone(...))` (shared atomic, no polling task) and
69+
threads through to mtp-rs's `list_objects_with_cancel` /
70+
`delete_with_cancel`. That bails the per-handle `GetObjectInfo` loop within
71+
one USB roundtrip's latency.
72+
- `LocalPosixVolume`, `SmbVolume`, and `InMemoryVolume` inherit the default
73+
(ignore the flag). Local listings are effectively atomic; SMB cancel
74+
propagation is a follow-up.
75+
76+
The write-op layer hands `Some(&state.backend_cancel)` (a clone of the same
77+
`Arc<AtomicBool>` that `cancel_write_operation` flips when intent leaves
78+
`Running`). Volumes that ignore the flag are unaffected; volumes that consume
79+
it stop their wire activity, not just the loop above.
80+
81+
See `apps/desktop/src-tauri/src/mtp/CLAUDE.md` § "Cancel propagation" for the
82+
MTP-specific wiring and the rationale for "between-roundtrip" cancel vs PTP
83+
`CancelTransaction`.
84+
5885
## Building a new volume
5986

6087
Adding a new backend (say, FTP, WebDAV, S3, or a new device protocol) is a matter of implementing the `Volume` trait and opting into the capability flags that make sense for your backend. The checklist below walks the path in the order you'd hit each concern.

apps/desktop/src-tauri/src/file_system/volume/mod.rs

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -363,6 +363,29 @@ pub trait Volume: Send + Sync {
363363
on_progress: Option<&'a (dyn Fn(usize) + Sync)>,
364364
) -> Pin<Box<dyn Future<Output = Result<Vec<FileEntry>, VolumeError>> + Send + 'a>>;
365365

366+
/// Cancel-aware version of [`list_directory`](Self::list_directory).
367+
///
368+
/// `cancel`, when `Some`, is consulted by backends that issue many small
369+
/// USB or network roundtrips inside one listing (currently MTP — a 950-entry
370+
/// folder is 950 `GetObjectInfo` calls). When the flag flips to `true`,
371+
/// the backend bails between roundtrips with `VolumeError::Cancelled`
372+
/// instead of running to completion.
373+
///
374+
/// Local and in-memory backends ignore the flag (their listings are
375+
/// effectively atomic from the caller's perspective). SMB ignores it
376+
/// today — adding SMB cancel propagation is a follow-up.
377+
///
378+
/// Default impl delegates to `list_directory`, dropping the flag.
379+
fn list_directory_with_cancel<'a>(
380+
&'a self,
381+
path: &'a Path,
382+
on_progress: Option<&'a (dyn Fn(usize) + Sync)>,
383+
cancel: Option<&'a std::sync::Arc<AtomicBool>>,
384+
) -> Pin<Box<dyn Future<Output = Result<Vec<FileEntry>, VolumeError>> + Send + 'a>> {
385+
let _ = cancel;
386+
self.list_directory(path, on_progress)
387+
}
388+
366389
/// Gets metadata for a single path (relative to volume root).
367390
fn get_metadata<'a>(
368391
&'a self,
@@ -434,6 +457,23 @@ pub trait Volume: Send + Sync {
434457
Box::pin(async { Err(VolumeError::NotSupported) })
435458
}
436459

460+
/// Cancel-aware version of [`delete`](Self::delete).
461+
///
462+
/// MTP overrides this to thread the cancel flag through to mtp-rs's
463+
/// `delete_with_cancel`, which bails before issuing the `DeleteObject` PTP
464+
/// request when the flag is set. For non-empty directories the MTP
465+
/// implementation also checks the flag between recursive child deletes.
466+
///
467+
/// Default impl delegates to `delete`, dropping the flag.
468+
fn delete_with_cancel<'a>(
469+
&'a self,
470+
path: &'a Path,
471+
cancel: Option<&'a std::sync::Arc<AtomicBool>>,
472+
) -> Pin<Box<dyn Future<Output = Result<(), VolumeError>> + Send + 'a>> {
473+
let _ = cancel;
474+
self.delete(path)
475+
}
476+
437477
/// Renames/moves a file or directory within this volume.
438478
///
439479
/// Both source and destination paths are relative to the volume root.

apps/desktop/src-tauri/src/file_system/volume/mtp.rs

Lines changed: 37 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -106,29 +106,50 @@ impl Volume for MtpVolume {
106106
&'a self,
107107
path: &'a Path,
108108
on_progress: Option<&'a (dyn Fn(usize) + Sync)>,
109+
) -> Pin<Box<dyn Future<Output = Result<Vec<FileEntry>, VolumeError>> + Send + 'a>> {
110+
self.list_directory_with_cancel(path, on_progress, None)
111+
}
112+
113+
fn list_directory_with_cancel<'a>(
114+
&'a self,
115+
path: &'a Path,
116+
on_progress: Option<&'a (dyn Fn(usize) + Sync)>,
117+
cancel: Option<&'a std::sync::Arc<std::sync::atomic::AtomicBool>>,
109118
) -> Pin<Box<dyn Future<Output = Result<Vec<FileEntry>, VolumeError>> + Send + 'a>> {
110119
Box::pin(async move {
111120
#[cfg(test)]
112121
test_hooks::bump_list_directory_call_count();
113122

114123
let mtp_path = self.to_mtp_path(path);
115124

125+
// Build a mtp_rs CancelToken that shares the caller's Arc<AtomicBool>.
126+
// No second polling task: flipping the original atomic flips the token.
127+
let cancel_token = cancel.map(|c| mtp_rs::CancelToken::from_arc(std::sync::Arc::clone(c)));
128+
let cancel_ref = cancel_token.as_ref();
129+
116130
debug!(
117-
"MtpVolume::list_directory: device={}, storage={}, input_path={}, mtp_path={}",
131+
"MtpVolume::list_directory: device={}, storage={}, input_path={}, mtp_path={}, cancel={}",
118132
self.device_id,
119133
self.storage_id,
120134
path.display(),
121-
mtp_path
135+
mtp_path,
136+
cancel_ref.is_some()
122137
);
123138

124139
let start = std::time::Instant::now();
125140
let result = if let Some(on_progress) = on_progress {
126141
connection_manager()
127-
.list_directory_with_progress(&self.device_id, self.storage_id, &mtp_path, on_progress)
142+
.list_directory_with_progress_and_cancel(
143+
&self.device_id,
144+
self.storage_id,
145+
&mtp_path,
146+
on_progress,
147+
cancel_ref,
148+
)
128149
.await
129150
} else {
130151
connection_manager()
131-
.list_directory(&self.device_id, self.storage_id, &mtp_path)
152+
.list_directory_with_cancel(&self.device_id, self.storage_id, &mtp_path, cancel_ref)
132153
.await
133154
};
134155

@@ -322,11 +343,22 @@ impl Volume for MtpVolume {
322343
}
323344

324345
fn delete<'a>(&'a self, path: &'a Path) -> Pin<Box<dyn Future<Output = Result<(), VolumeError>> + Send + 'a>> {
346+
self.delete_with_cancel(path, None)
347+
}
348+
349+
fn delete_with_cancel<'a>(
350+
&'a self,
351+
path: &'a Path,
352+
cancel: Option<&'a std::sync::Arc<std::sync::atomic::AtomicBool>>,
353+
) -> Pin<Box<dyn Future<Output = Result<(), VolumeError>> + Send + 'a>> {
325354
Box::pin(async move {
326355
let mtp_path = self.to_mtp_path(path);
327356

357+
let cancel_token = cancel.map(|c| mtp_rs::CancelToken::from_arc(std::sync::Arc::clone(c)));
358+
let cancel_ref = cancel_token.as_ref();
359+
328360
connection_manager()
329-
.delete_object(&self.device_id, self.storage_id, &mtp_path)
361+
.delete_object_with_cancel(&self.device_id, self.storage_id, &mtp_path, cancel_ref)
330362
.await
331363
.map_err(map_mtp_error)?;
332364

apps/desktop/src-tauri/src/file_system/write_operations/delete.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -318,7 +318,7 @@ async fn scan_volume_recursive(
318318
cached
319319
}
320320
None => volume
321-
.list_directory(path, Some(&on_progress))
321+
.list_directory_with_cancel(path, Some(&on_progress), Some(&state.backend_cancel))
322322
.await
323323
.map_err(|e| map_volume_error(&path.display().to_string(), e))?,
324324
};
@@ -682,7 +682,7 @@ pub(super) async fn delete_volume_files_with_progress_inner(
682682
}
683683

684684
volume
685-
.delete(&entry.path)
685+
.delete_with_cancel(&entry.path, Some(&state.backend_cancel))
686686
.await
687687
.map_err(|e| map_volume_error(&entry.path.display().to_string(), e))?;
688688

@@ -732,7 +732,9 @@ pub(super) async fn delete_volume_files_with_progress_inner(
732732
}
733733

734734
// Best-effort directory removal (may fail if not empty due to partial delete)
735-
let _ = volume.delete(&entry.path).await;
735+
let _ = volume
736+
.delete_with_cancel(&entry.path, Some(&state.backend_cancel))
737+
.await;
736738
}
737739

738740
// Emit completion

apps/desktop/src-tauri/src/file_system/write_operations/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -384,3 +384,5 @@ mod tests;
384384
mod transaction_integration_test;
385385
#[cfg(test)]
386386
mod validation_integration_test;
387+
#[cfg(test)]
388+
mod volume_cancel_tests;

apps/desktop/src-tauri/src/file_system/write_operations/state.rs

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,20 @@ pub struct WriteOperationState {
7575
/// at every `write-progress` emit site, so every emitter (local copy/delete,
7676
/// volume copy/move, MTP, SMB) reports rates and ETA uniformly.
7777
pub estimator: std::sync::Mutex<EtaEstimator>,
78+
/// Cooperative cancel flag for in-flight backend I/O. Flipped whenever the
79+
/// op transitions out of `Running` (via `cancel_write_operation`,
80+
/// `cancel_all_write_operations`, or `cancel_all_write_operations_with_rollback`).
81+
/// MTP volume ops thread this into `list_objects_with_cancel` /
82+
/// `delete_with_cancel` so an in-flight `GetObjectInfo` loop bails at the
83+
/// next per-handle USB boundary (≈one roundtrip) instead of running the
84+
/// full 950-photo `/DCIM/Camera` listing to completion. Non-MTP backends
85+
/// ignore it for now.
86+
///
87+
/// Kept as a raw `Arc<AtomicBool>` (not the mtp-rs `CancelToken` type) so
88+
/// this module doesn't pull mtp-rs onto non-MTP platforms; the MTP wiring
89+
/// layer (`mtp::connection`) builds a fresh `mtp_rs::CancelToken` from
90+
/// this flag at the entry point of each MTP-aware call.
91+
pub backend_cancel: Arc<AtomicBool>,
7892
}
7993

8094
impl WriteOperationState {
@@ -87,6 +101,7 @@ impl WriteOperationState {
87101
progress_interval,
88102
conflict_resolution_tx: std::sync::Mutex::new(None),
89103
estimator: std::sync::Mutex::new(EtaEstimator::new()),
104+
backend_cancel: Arc::new(AtomicBool::new(false)),
90105
}
91106
}
92107

@@ -312,6 +327,9 @@ pub fn cancel_write_operation(operation_id: &str, rollback: bool) {
312327
}
313328

314329
state.intent.store(target as u8, Ordering::Relaxed);
330+
// Any transition out of `Running` should also stop in-flight backend
331+
// I/O (per-handle MTP loops, etc.) — not just the loop above it.
332+
state.backend_cancel.store(true, Ordering::Release);
315333
// Drop the conflict resolution sender to unblock any waiting receiver
316334
let _ = state.conflict_resolution_tx.lock().unwrap().take();
317335
}
@@ -329,6 +347,7 @@ pub fn cancel_all_write_operations() {
329347
if current != OperationIntent::Stopped {
330348
log::info!("cancel_all_write_operations: stopping op={id}");
331349
state.intent.store(OperationIntent::Stopped as u8, Ordering::Relaxed);
350+
state.backend_cancel.store(true, Ordering::Release);
332351
// Drop the conflict resolution sender to unblock any waiting receiver
333352
let _ = state.conflict_resolution_tx.lock().unwrap().take();
334353
}
@@ -694,6 +713,67 @@ mod tests {
694713
uninstall_state(&id);
695714
}
696715

716+
// ---- backend_cancel flag flipping (M2 wedge fix) ------------------------
717+
718+
#[test]
719+
fn backend_cancel_starts_unset_on_fresh_state() {
720+
let state = WriteOperationState::new(Duration::from_millis(50));
721+
assert!(!state.backend_cancel.load(Ordering::Acquire));
722+
}
723+
724+
#[test]
725+
fn cancel_write_operation_flips_backend_cancel_to_stopped() {
726+
let id = unique_id("cancel-flips-backend-stopped");
727+
let state = install_state(&id, OperationIntent::Running);
728+
assert!(!state.backend_cancel.load(Ordering::Acquire));
729+
cancel_write_operation(&id, false);
730+
assert!(
731+
state.backend_cancel.load(Ordering::Acquire),
732+
"cancel → Stopped must also flip backend_cancel so in-flight USB ops bail"
733+
);
734+
uninstall_state(&id);
735+
}
736+
737+
#[test]
738+
fn cancel_write_operation_flips_backend_cancel_to_rolling_back() {
739+
let id = unique_id("cancel-flips-backend-rb");
740+
let state = install_state(&id, OperationIntent::Running);
741+
cancel_write_operation(&id, true);
742+
assert!(
743+
state.backend_cancel.load(Ordering::Acquire),
744+
"cancel → RollingBack must also flip backend_cancel — the user wants the wire activity stopped, even though we're going to delete created files"
745+
);
746+
uninstall_state(&id);
747+
}
748+
749+
#[test]
750+
fn cancel_all_write_operations_flips_backend_cancel() {
751+
let id = unique_id("cancel-all-flips-backend");
752+
let state = install_state(&id, OperationIntent::Running);
753+
cancel_all_write_operations();
754+
assert!(
755+
state.backend_cancel.load(Ordering::Acquire),
756+
"cancel_all must flip backend_cancel so teardown also stops the wire activity"
757+
);
758+
uninstall_state(&id);
759+
}
760+
761+
#[test]
762+
fn cancel_stopped_is_noop_for_backend_cancel_too() {
763+
// Stopped → anything is terminal, so backend_cancel state must not
764+
// change either. This guards against a subtle regression where the
765+
// flag flip happens before the validity check.
766+
let id = unique_id("cancel-stopped-noop");
767+
let state = install_state(&id, OperationIntent::Stopped);
768+
state.backend_cancel.store(false, Ordering::Release);
769+
cancel_write_operation(&id, true);
770+
assert!(
771+
!state.backend_cancel.load(Ordering::Acquire),
772+
"Stopped is terminal: invalid transition must not flip backend_cancel"
773+
);
774+
uninstall_state(&id);
775+
}
776+
697777
#[test]
698778
fn cancel_unknown_operation_is_a_silent_noop() {
699779
// No installed state; must not panic, must not affect anything.

0 commit comments

Comments
 (0)