From a86cfb55c2f1a00301c5cbc6c75e2f45dda8e806 Mon Sep 17 00:00:00 2001 From: Ofer Chen Date: Fri, 7 Nov 2025 13:42:10 +0100 Subject: [PATCH] Fix fsync CLI plumbing --- .../frontend/execution/drive/workflow/mod.rs | 4 ++ .../src/local_copy/tests/execute_fsync.rs | 51 +++++++++++++++++++ 2 files changed, 55 insertions(+) create mode 100644 crates/engine/src/local_copy/tests/execute_fsync.rs diff --git a/crates/cli/src/frontend/execution/drive/workflow/mod.rs b/crates/cli/src/frontend/execution/drive/workflow/mod.rs index 3962b99c5..42ebf3535 100644 --- a/crates/cli/src/frontend/execution/drive/workflow/mod.rs +++ b/crates/cli/src/frontend/execution/drive/workflow/mod.rs @@ -142,6 +142,7 @@ where stats, partial, preallocate, + fsync, delay_updates, partial_dir, temp_dir, @@ -435,6 +436,7 @@ where stats, partial, preallocate, + fsync, delay_updates, partial_dir: partial_dir.as_ref(), temp_dir: temp_dir.as_ref(), @@ -583,6 +585,7 @@ where } = metadata; let prune_empty_dirs_flag = prune_empty_dirs.unwrap_or(false); + let fsync_flag = fsync.unwrap_or(false); let inplace_enabled = inplace.unwrap_or(false); let append_enabled = append.unwrap_or(false); let whole_file_enabled = whole_file_option.unwrap_or(true); @@ -653,6 +656,7 @@ where debug_flags_list, partial, preallocate, + fsync: fsync_flag, partial_dir: partial_dir.clone(), temp_dir: temp_dir.clone(), delay_updates, diff --git a/crates/engine/src/local_copy/tests/execute_fsync.rs b/crates/engine/src/local_copy/tests/execute_fsync.rs new file mode 100644 index 000000000..9bbc4cc24 --- /dev/null +++ b/crates/engine/src/local_copy/tests/execute_fsync.rs @@ -0,0 +1,51 @@ +use crate::local_copy::{ + test_support::take_fsync_call_count, LocalCopyExecution, LocalCopyOptions, LocalCopyPlan, +}; + +#[test] +fn execute_performs_fsync_when_requested() { + let temp = tempdir().expect("tempdir"); + let source = temp.path().join("source.txt"); + let destination = temp.path().join("dest.txt"); + fs::write(&source, b"payload").expect("write source"); + + let operands = vec![ + source.into_os_string(), + destination.clone().into_os_string(), + ]; + let plan = LocalCopyPlan::from_operands(&operands).expect("plan"); + + // Clear any previous instrumentation counts. + take_fsync_call_count(); + + let options = LocalCopyOptions::default().fsync(true); + plan + .execute_with_options(LocalCopyExecution::Apply, options) + .expect("copy succeeds"); + + assert_eq!(take_fsync_call_count(), 1); + assert!(destination.exists()); +} + +#[test] +fn execute_skips_fsync_when_not_requested() { + let temp = tempdir().expect("tempdir"); + let source = temp.path().join("source.txt"); + let destination = temp.path().join("dest.txt"); + fs::write(&source, b"payload").expect("write source"); + + let operands = vec![ + source.into_os_string(), + destination.clone().into_os_string(), + ]; + let plan = LocalCopyPlan::from_operands(&operands).expect("plan"); + + take_fsync_call_count(); + + plan + .execute_with_options(LocalCopyExecution::Apply, LocalCopyOptions::default()) + .expect("copy succeeds"); + + assert_eq!(take_fsync_call_count(), 0); + assert!(destination.exists()); +}