Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/engine/src/local_copy/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use super::{
LocalCopyArgumentError, LocalCopyError, LocalCopyErrorKind, LocalCopyExecution,
LocalCopyMetadata, LocalCopyOptions, LocalCopyProgress, LocalCopyRecord,
LocalCopyRecordHandler, LocalCopyReport, LocalCopySummary, ReferenceDirectory,
compute_backup_path, copy_entry_to_backup, delete_extraneous_entries,
SparseWriteState, compute_backup_path, copy_entry_to_backup, delete_extraneous_entries,
filter_program_local_error, follow_symlink_metadata, load_dir_merge_rules_recursive,
map_metadata_error, remove_source_entry_if_requested, resolve_dir_merge_path, should_skip_copy,
write_sparse_chunk,
Expand Down
11 changes: 9 additions & 2 deletions crates/engine/src/local_copy/context_impl/delta_transfer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ impl<'a> CopyContext<'a> {
let mut compressed_progress = 0u64;
let mut total_bytes = 0u64;
let mut literal_bytes = 0u64;
let mut sparse_state = SparseWriteState::default();
let mut window: VecDeque<u8> = VecDeque::with_capacity(index.block_length());
let mut pending_literals = Vec::with_capacity(index.block_length());
let mut scratch = Vec::with_capacity(index.block_length());
Expand Down Expand Up @@ -72,6 +73,7 @@ impl<'a> CopyContext<'a> {
writer,
pending_literals.as_slice(),
sparse,
&mut sparse_state,
compressor.as_mut(),
&mut compressed_progress,
source,
Expand All @@ -94,6 +96,7 @@ impl<'a> CopyContext<'a> {
destination,
matched,
sparse,
&mut sparse_state,
)?;
total_bytes = total_bytes.saturating_add(block_len as u64);
let progressed = initial_bytes.saturating_add(total_bytes);
Expand All @@ -120,6 +123,7 @@ impl<'a> CopyContext<'a> {
writer,
pending_literals.as_slice(),
sparse,
&mut sparse_state,
compressor.as_mut(),
&mut compressed_progress,
source,
Expand All @@ -132,6 +136,7 @@ impl<'a> CopyContext<'a> {
}

if sparse {
sparse_state.finish(writer, destination)?;
let final_len = initial_bytes.saturating_add(total_bytes);
writer.set_len(final_len).map_err(|error| {
LocalCopyError::io(
Expand Down Expand Up @@ -163,6 +168,7 @@ impl<'a> CopyContext<'a> {
writer: &mut fs::File,
chunk: &[u8],
sparse: bool,
state: &mut SparseWriteState,
compressor: Option<&mut ActiveCompressor>,
compressed_progress: &mut u64,
source: &Path,
Expand All @@ -173,7 +179,7 @@ impl<'a> CopyContext<'a> {
}
self.enforce_timeout()?;
let written = if sparse {
write_sparse_chunk(writer, chunk, destination)?
write_sparse_chunk(writer, state, chunk, destination)?
} else {
writer.write_all(chunk).map_err(|error| {
LocalCopyError::io("copy file", destination.to_path_buf(), error)
Expand Down Expand Up @@ -204,6 +210,7 @@ impl<'a> CopyContext<'a> {
destination: &Path,
matched: MatchedBlock<'_>,
sparse: bool,
state: &mut SparseWriteState,
) -> Result<(), LocalCopyError> {
let offset = matched.offset();
existing.seek(SeekFrom::Start(offset)).map_err(|error| {
Expand Down Expand Up @@ -238,7 +245,7 @@ impl<'a> CopyContext<'a> {
}

if sparse {
let _ = write_sparse_chunk(writer, &buffer[..read], destination)?;
let _ = write_sparse_chunk(writer, state, &buffer[..read], destination)?;
} else {
writer.write_all(&buffer[..read]).map_err(|error| {
LocalCopyError::io("copy file", destination.to_path_buf(), error)
Expand Down
4 changes: 3 additions & 1 deletion crates/engine/src/local_copy/context_impl/transfer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,7 @@ impl<'a> CopyContext<'a> {

let mut total_bytes: u64 = 0;
let mut literal_bytes: u64 = 0;
let mut sparse_state = SparseWriteState::default();
let mut compressor = self.start_compressor(compress, source)?;
let mut compressed_progress: u64 = 0;
let expected_remaining = total_size.saturating_sub(initial_bytes);
Expand All @@ -290,7 +291,7 @@ impl<'a> CopyContext<'a> {
}

let written = if sparse {
write_sparse_chunk(writer, &buffer[..read], destination)?
write_sparse_chunk(writer, &mut sparse_state, &buffer[..read], destination)?
} else {
writer.write_all(&buffer[..read]).map_err(|error| {
LocalCopyError::io("copy file", destination.to_path_buf(), error)
Expand Down Expand Up @@ -324,6 +325,7 @@ impl<'a> CopyContext<'a> {
}

if sparse {
sparse_state.finish(writer, destination)?;
let final_len = initial_bytes.saturating_add(total_bytes);
writer.set_len(final_len).map_err(|error| {
LocalCopyError::io(
Expand Down
2 changes: 1 addition & 1 deletion crates/engine/src/local_copy/executor/file/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,4 @@ pub(crate) use guard::{DestinationWriteGuard, remove_existing_destination};
pub(crate) use paths::partial_destination_path;
#[cfg(test)]
pub(crate) use preallocate::maybe_preallocate_destination;
pub(crate) use sparse::write_sparse_chunk;
pub(crate) use sparse::{SparseWriteState, write_sparse_chunk};
129 changes: 111 additions & 18 deletions crates/engine/src/local_copy/executor/file/sparse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,48 @@ use std::path::Path;
use crate::local_copy::LocalCopyError;
use memchr::memchr;

#[derive(Default)]
pub(crate) struct SparseWriteState {
pending_zero_run: u64,
}

impl SparseWriteState {
fn accumulate(&mut self, additional: usize) {
self.pending_zero_run = self.pending_zero_run.saturating_add(additional as u64);
}

fn flush(&mut self, writer: &mut fs::File, destination: &Path) -> Result<(), LocalCopyError> {
if self.pending_zero_run == 0 {
return Ok(());
}

let mut remaining = self.pending_zero_run;
while remaining > 0 {
let step = remaining.min(i64::MAX as u64);
writer
.seek(SeekFrom::Current(step as i64))
.map_err(|error| {
LocalCopyError::io("seek in destination file", destination.to_path_buf(), error)
})?;
remaining -= step;
}

self.pending_zero_run = 0;
Ok(())
}

pub(crate) fn finish(
&mut self,
writer: &mut fs::File,
destination: &Path,
) -> Result<(), LocalCopyError> {
self.flush(writer, destination)
}
}

pub(crate) fn write_sparse_chunk(
writer: &mut fs::File,
state: &mut SparseWriteState,
chunk: &[u8],
destination: &Path,
) -> Result<usize, LocalCopyError> {
Expand All @@ -21,7 +61,8 @@ pub(crate) fn write_sparse_chunk(
Some(rel_zero) => {
let zero_index = offset + rel_zero;

if rel_zero > 0 {
if zero_index > offset {
state.flush(writer, destination)?;
writer
.write_all(&chunk[offset..zero_index])
.map_err(|error| {
Expand All @@ -30,24 +71,11 @@ pub(crate) fn write_sparse_chunk(
}

let zero_run = zero_run_length(&chunk[zero_index..]);
let zero_end = zero_index + zero_run;

let span = zero_end - zero_index;
if span > 0 {
writer
.seek(SeekFrom::Current(span as i64))
.map_err(|error| {
LocalCopyError::io(
"seek in destination file",
destination.to_path_buf(),
error,
)
})?;
}

offset = zero_end;
state.accumulate(zero_run);
offset = zero_index + zero_run;
}
None => {
state.flush(writer, destination)?;
writer.write_all(&chunk[offset..]).map_err(|error| {
LocalCopyError::io("copy file", destination.to_path_buf(), error)
})?;
Expand Down Expand Up @@ -86,7 +114,9 @@ fn zero_run_length_scalar(bytes: &[u8]) -> usize {

#[cfg(test)]
mod tests {
use super::{zero_run_length, zero_run_length_scalar};
use super::{SparseWriteState, write_sparse_chunk, zero_run_length, zero_run_length_scalar};
use std::io::{Read, Seek, SeekFrom};
use tempfile::NamedTempFile;

#[test]
fn zero_run_length_matches_scalar_reference() {
Expand Down Expand Up @@ -117,4 +147,67 @@ mod tests {
assert_eq!(zero_run_length(&long[511..]), 0);
assert_eq!(zero_run_length(&long[512..]), 1);
}

#[test]
fn sparse_writer_accumulates_zero_runs_across_chunks() {
let mut file = NamedTempFile::new().expect("temp file");
let path = file.path().to_path_buf();
let mut state = SparseWriteState::default();

let first = [b'A', b'B', 0, 0, 0];
write_sparse_chunk(file.as_file_mut(), &mut state, &first, path.as_path())
.expect("write first chunk");

let second = [0, 0, b'C', b'D'];
write_sparse_chunk(file.as_file_mut(), &mut state, &second, path.as_path())
.expect("write second chunk");

state
.finish(file.as_file_mut(), path.as_path())
.expect("finalise sparse writer");

let total = (first.len() + second.len()) as u64;
file.as_file_mut()
.set_len(total)
.expect("truncate file to final length");
file.as_file_mut()
.seek(SeekFrom::Start(0))
.expect("rewind for verification");

let mut buffer = vec![0u8; total as usize];
file.as_file_mut()
.read_exact(&mut buffer)
.expect("read back contents");

assert_eq!(&buffer[0..2], b"AB");
assert!(buffer[2..7].iter().all(|&byte| byte == 0));
assert_eq!(&buffer[7..9], b"CD");
}

#[test]
fn sparse_writer_flushes_trailing_zero_run() {
let mut file = NamedTempFile::new().expect("temp file");
let path = file.path().to_path_buf();
let mut state = SparseWriteState::default();

let chunk = [b'Z', 0, 0, 0, 0];
write_sparse_chunk(file.as_file_mut(), &mut state, &chunk, path.as_path())
.expect("write chunk");
state
.finish(file.as_file_mut(), path.as_path())
.expect("flush trailing zeros");

file.as_file_mut()
.set_len(chunk.len() as u64)
.expect("truncate file");
file.as_file_mut().seek(SeekFrom::Start(0)).expect("rewind");

let mut buffer = vec![0u8; chunk.len()];
file.as_file_mut()
.read_exact(&mut buffer)
.expect("read back data");

assert_eq!(buffer[0], b'Z');
assert!(buffer[1..].iter().all(|&byte| byte == 0));
}
}
5 changes: 3 additions & 2 deletions crates/engine/src/local_copy/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@ pub(crate) use directory::{copy_directory_recursive, is_device, is_fifo};
#[allow(unused_imports)]
pub(crate) use file::take_fsync_call_count;
pub(crate) use file::{
CopyComparison, DestinationWriteGuard, compute_backup_path, copy_entry_to_backup, copy_file,
remove_existing_destination, should_skip_copy, write_sparse_chunk,
CopyComparison, DestinationWriteGuard, SparseWriteState, compute_backup_path,
copy_entry_to_backup, copy_file, remove_existing_destination, should_skip_copy,
write_sparse_chunk,
};
#[cfg(test)]
pub(crate) use file::{
Expand Down