From 53d4bd2856a674aafbc3ca314bdd5099095ad28c Mon Sep 17 00:00:00 2001 From: Ofer Chen Date: Thu, 13 Nov 2025 21:25:09 +0100 Subject: [PATCH] Coalesce sparse zero runs across chunks --- crates/engine/src/local_copy/context.rs | 2 +- .../local_copy/context_impl/delta_transfer.rs | 11 +- .../src/local_copy/context_impl/transfer.rs | 4 +- .../src/local_copy/executor/file/mod.rs | 2 +- .../src/local_copy/executor/file/sparse.rs | 129 +++++++++++++++--- crates/engine/src/local_copy/executor/mod.rs | 5 +- 6 files changed, 128 insertions(+), 25 deletions(-) diff --git a/crates/engine/src/local_copy/context.rs b/crates/engine/src/local_copy/context.rs index 635675658..08d6a41f5 100644 --- a/crates/engine/src/local_copy/context.rs +++ b/crates/engine/src/local_copy/context.rs @@ -25,7 +25,7 @@ use super::{ LocalCopyArgumentError, LocalCopyError, LocalCopyErrorKind, LocalCopyExecution, LocalCopyMetadata, LocalCopyOptions, LocalCopyProgress, LocalCopyRecord, LocalCopyRecordHandler, LocalCopyReport, LocalCopySummary, ReferenceDirectory, - compute_backup_path, copy_entry_to_backup, delete_extraneous_entries, + SparseWriteState, compute_backup_path, copy_entry_to_backup, delete_extraneous_entries, filter_program_local_error, follow_symlink_metadata, load_dir_merge_rules_recursive, map_metadata_error, remove_source_entry_if_requested, resolve_dir_merge_path, should_skip_copy, write_sparse_chunk, diff --git a/crates/engine/src/local_copy/context_impl/delta_transfer.rs b/crates/engine/src/local_copy/context_impl/delta_transfer.rs index 21612f362..3843e4405 100644 --- a/crates/engine/src/local_copy/context_impl/delta_transfer.rs +++ b/crates/engine/src/local_copy/context_impl/delta_transfer.rs @@ -26,6 +26,7 @@ impl<'a> CopyContext<'a> { let mut compressed_progress = 0u64; let mut total_bytes = 0u64; let mut literal_bytes = 0u64; + let mut sparse_state = SparseWriteState::default(); let mut window: VecDeque = VecDeque::with_capacity(index.block_length()); let mut pending_literals = Vec::with_capacity(index.block_length()); let mut scratch = Vec::with_capacity(index.block_length()); @@ -72,6 +73,7 @@ impl<'a> CopyContext<'a> { writer, pending_literals.as_slice(), sparse, + &mut sparse_state, compressor.as_mut(), &mut compressed_progress, source, @@ -94,6 +96,7 @@ impl<'a> CopyContext<'a> { destination, matched, sparse, + &mut sparse_state, )?; total_bytes = total_bytes.saturating_add(block_len as u64); let progressed = initial_bytes.saturating_add(total_bytes); @@ -120,6 +123,7 @@ impl<'a> CopyContext<'a> { writer, pending_literals.as_slice(), sparse, + &mut sparse_state, compressor.as_mut(), &mut compressed_progress, source, @@ -132,6 +136,7 @@ impl<'a> CopyContext<'a> { } if sparse { + sparse_state.finish(writer, destination)?; let final_len = initial_bytes.saturating_add(total_bytes); writer.set_len(final_len).map_err(|error| { LocalCopyError::io( @@ -163,6 +168,7 @@ impl<'a> CopyContext<'a> { writer: &mut fs::File, chunk: &[u8], sparse: bool, + state: &mut SparseWriteState, compressor: Option<&mut ActiveCompressor>, compressed_progress: &mut u64, source: &Path, @@ -173,7 +179,7 @@ impl<'a> CopyContext<'a> { } self.enforce_timeout()?; let written = if sparse { - write_sparse_chunk(writer, chunk, destination)? + write_sparse_chunk(writer, state, chunk, destination)? } else { writer.write_all(chunk).map_err(|error| { LocalCopyError::io("copy file", destination.to_path_buf(), error) @@ -204,6 +210,7 @@ impl<'a> CopyContext<'a> { destination: &Path, matched: MatchedBlock<'_>, sparse: bool, + state: &mut SparseWriteState, ) -> Result<(), LocalCopyError> { let offset = matched.offset(); existing.seek(SeekFrom::Start(offset)).map_err(|error| { @@ -238,7 +245,7 @@ impl<'a> CopyContext<'a> { } if sparse { - let _ = write_sparse_chunk(writer, &buffer[..read], destination)?; + let _ = write_sparse_chunk(writer, state, &buffer[..read], destination)?; } else { writer.write_all(&buffer[..read]).map_err(|error| { LocalCopyError::io("copy file", destination.to_path_buf(), error) diff --git a/crates/engine/src/local_copy/context_impl/transfer.rs b/crates/engine/src/local_copy/context_impl/transfer.rs index 5e05e6e3a..9774e1c35 100644 --- a/crates/engine/src/local_copy/context_impl/transfer.rs +++ b/crates/engine/src/local_copy/context_impl/transfer.rs @@ -267,6 +267,7 @@ impl<'a> CopyContext<'a> { let mut total_bytes: u64 = 0; let mut literal_bytes: u64 = 0; + let mut sparse_state = SparseWriteState::default(); let mut compressor = self.start_compressor(compress, source)?; let mut compressed_progress: u64 = 0; let expected_remaining = total_size.saturating_sub(initial_bytes); @@ -290,7 +291,7 @@ impl<'a> CopyContext<'a> { } let written = if sparse { - write_sparse_chunk(writer, &buffer[..read], destination)? + write_sparse_chunk(writer, &mut sparse_state, &buffer[..read], destination)? } else { writer.write_all(&buffer[..read]).map_err(|error| { LocalCopyError::io("copy file", destination.to_path_buf(), error) @@ -324,6 +325,7 @@ impl<'a> CopyContext<'a> { } if sparse { + sparse_state.finish(writer, destination)?; let final_len = initial_bytes.saturating_add(total_bytes); writer.set_len(final_len).map_err(|error| { LocalCopyError::io( diff --git a/crates/engine/src/local_copy/executor/file/mod.rs b/crates/engine/src/local_copy/executor/file/mod.rs index 7439f2025..2b7d6015f 100644 --- a/crates/engine/src/local_copy/executor/file/mod.rs +++ b/crates/engine/src/local_copy/executor/file/mod.rs @@ -21,4 +21,4 @@ pub(crate) use guard::{DestinationWriteGuard, remove_existing_destination}; pub(crate) use paths::partial_destination_path; #[cfg(test)] pub(crate) use preallocate::maybe_preallocate_destination; -pub(crate) use sparse::write_sparse_chunk; +pub(crate) use sparse::{SparseWriteState, write_sparse_chunk}; diff --git a/crates/engine/src/local_copy/executor/file/sparse.rs b/crates/engine/src/local_copy/executor/file/sparse.rs index 26ec84e2f..41020abb8 100644 --- a/crates/engine/src/local_copy/executor/file/sparse.rs +++ b/crates/engine/src/local_copy/executor/file/sparse.rs @@ -5,8 +5,48 @@ use std::path::Path; use crate::local_copy::LocalCopyError; use memchr::memchr; +#[derive(Default)] +pub(crate) struct SparseWriteState { + pending_zero_run: u64, +} + +impl SparseWriteState { + fn accumulate(&mut self, additional: usize) { + self.pending_zero_run = self.pending_zero_run.saturating_add(additional as u64); + } + + fn flush(&mut self, writer: &mut fs::File, destination: &Path) -> Result<(), LocalCopyError> { + if self.pending_zero_run == 0 { + return Ok(()); + } + + let mut remaining = self.pending_zero_run; + while remaining > 0 { + let step = remaining.min(i64::MAX as u64); + writer + .seek(SeekFrom::Current(step as i64)) + .map_err(|error| { + LocalCopyError::io("seek in destination file", destination.to_path_buf(), error) + })?; + remaining -= step; + } + + self.pending_zero_run = 0; + Ok(()) + } + + pub(crate) fn finish( + &mut self, + writer: &mut fs::File, + destination: &Path, + ) -> Result<(), LocalCopyError> { + self.flush(writer, destination) + } +} + pub(crate) fn write_sparse_chunk( writer: &mut fs::File, + state: &mut SparseWriteState, chunk: &[u8], destination: &Path, ) -> Result { @@ -21,7 +61,8 @@ pub(crate) fn write_sparse_chunk( Some(rel_zero) => { let zero_index = offset + rel_zero; - if rel_zero > 0 { + if zero_index > offset { + state.flush(writer, destination)?; writer .write_all(&chunk[offset..zero_index]) .map_err(|error| { @@ -30,24 +71,11 @@ pub(crate) fn write_sparse_chunk( } let zero_run = zero_run_length(&chunk[zero_index..]); - let zero_end = zero_index + zero_run; - - let span = zero_end - zero_index; - if span > 0 { - writer - .seek(SeekFrom::Current(span as i64)) - .map_err(|error| { - LocalCopyError::io( - "seek in destination file", - destination.to_path_buf(), - error, - ) - })?; - } - - offset = zero_end; + state.accumulate(zero_run); + offset = zero_index + zero_run; } None => { + state.flush(writer, destination)?; writer.write_all(&chunk[offset..]).map_err(|error| { LocalCopyError::io("copy file", destination.to_path_buf(), error) })?; @@ -86,7 +114,9 @@ fn zero_run_length_scalar(bytes: &[u8]) -> usize { #[cfg(test)] mod tests { - use super::{zero_run_length, zero_run_length_scalar}; + use super::{SparseWriteState, write_sparse_chunk, zero_run_length, zero_run_length_scalar}; + use std::io::{Read, Seek, SeekFrom}; + use tempfile::NamedTempFile; #[test] fn zero_run_length_matches_scalar_reference() { @@ -117,4 +147,67 @@ mod tests { assert_eq!(zero_run_length(&long[511..]), 0); assert_eq!(zero_run_length(&long[512..]), 1); } + + #[test] + fn sparse_writer_accumulates_zero_runs_across_chunks() { + let mut file = NamedTempFile::new().expect("temp file"); + let path = file.path().to_path_buf(); + let mut state = SparseWriteState::default(); + + let first = [b'A', b'B', 0, 0, 0]; + write_sparse_chunk(file.as_file_mut(), &mut state, &first, path.as_path()) + .expect("write first chunk"); + + let second = [0, 0, b'C', b'D']; + write_sparse_chunk(file.as_file_mut(), &mut state, &second, path.as_path()) + .expect("write second chunk"); + + state + .finish(file.as_file_mut(), path.as_path()) + .expect("finalise sparse writer"); + + let total = (first.len() + second.len()) as u64; + file.as_file_mut() + .set_len(total) + .expect("truncate file to final length"); + file.as_file_mut() + .seek(SeekFrom::Start(0)) + .expect("rewind for verification"); + + let mut buffer = vec![0u8; total as usize]; + file.as_file_mut() + .read_exact(&mut buffer) + .expect("read back contents"); + + assert_eq!(&buffer[0..2], b"AB"); + assert!(buffer[2..7].iter().all(|&byte| byte == 0)); + assert_eq!(&buffer[7..9], b"CD"); + } + + #[test] + fn sparse_writer_flushes_trailing_zero_run() { + let mut file = NamedTempFile::new().expect("temp file"); + let path = file.path().to_path_buf(); + let mut state = SparseWriteState::default(); + + let chunk = [b'Z', 0, 0, 0, 0]; + write_sparse_chunk(file.as_file_mut(), &mut state, &chunk, path.as_path()) + .expect("write chunk"); + state + .finish(file.as_file_mut(), path.as_path()) + .expect("flush trailing zeros"); + + file.as_file_mut() + .set_len(chunk.len() as u64) + .expect("truncate file"); + file.as_file_mut().seek(SeekFrom::Start(0)).expect("rewind"); + + let mut buffer = vec![0u8; chunk.len()]; + file.as_file_mut() + .read_exact(&mut buffer) + .expect("read back data"); + + assert_eq!(buffer[0], b'Z'); + assert!(buffer[1..].iter().all(|&byte| byte == 0)); + } } diff --git a/crates/engine/src/local_copy/executor/mod.rs b/crates/engine/src/local_copy/executor/mod.rs index 36ea018c0..29f7e6759 100644 --- a/crates/engine/src/local_copy/executor/mod.rs +++ b/crates/engine/src/local_copy/executor/mod.rs @@ -14,8 +14,9 @@ pub(crate) use directory::{copy_directory_recursive, is_device, is_fifo}; #[allow(unused_imports)] pub(crate) use file::take_fsync_call_count; pub(crate) use file::{ - CopyComparison, DestinationWriteGuard, compute_backup_path, copy_entry_to_backup, copy_file, - remove_existing_destination, should_skip_copy, write_sparse_chunk, + CopyComparison, DestinationWriteGuard, SparseWriteState, compute_backup_path, + copy_entry_to_backup, copy_file, remove_existing_destination, should_skip_copy, + write_sparse_chunk, }; #[cfg(test)] pub(crate) use file::{