Commit: Add FileCombiner

sourcefrog committed Oct 18, 2020
1 parent aea784a commit 6a8367e
Showing 2 changed files with 108 additions and 11 deletions.
113 changes: 102 additions & 11 deletions src/backup.rs
@@ -14,6 +14,7 @@
 //! Make a backup by walking a source directory and copying the contents
 //! into an archive.
 
+use std::convert::TryInto;
 use std::io::prelude::*;
 
 use globset::GlobSet;
@@ -241,17 +242,7 @@ fn store_file_content(
             break;
         }
         let block_data = &buffer.0[..read_len];
-        let hash = block_dir.hash_bytes(block_data)?;
-        if block_dir.contains(&hash)? {
-            // TODO: Separate counter for size of the already-present blocks?
-            stats.deduplicated_blocks += 1;
-            stats.deduplicated_bytes += read_len as u64;
-        } else {
-            let comp_len = block_dir.compress_and_store(block_data, &hash)?;
-            stats.written_blocks += 1;
-            stats.uncompressed_bytes += read_len as u64;
-            stats.compressed_bytes += comp_len;
-        }
+        let hash = store_or_deduplicate(block_data, block_dir, &mut stats)?;
         addresses.push(Address {
             hash,
             start: 0,
@@ -266,6 +257,106 @@ fn store_file_content(
     Ok((addresses, stats))
 }
 
+fn store_or_deduplicate(
+    block_data: &[u8],
+    block_dir: &mut BlockDir,
+    stats: &mut CopyStats,
+) -> Result<BlockHash> {
+    let hash = block_dir.hash_bytes(block_data)?;
+    if block_dir.contains(&hash)? {
+        stats.deduplicated_blocks += 1;
+        stats.deduplicated_bytes += block_data.len() as u64;
+    } else {
+        let comp_len = block_dir.compress_and_store(block_data, &hash)?;
+        stats.written_blocks += 1;
+        stats.uncompressed_bytes += block_data.len() as u64;
+        stats.compressed_bytes += comp_len;
+    }
+    Ok(hash)
+}
+
+/// Combines multiple small files into a single block.
+///
+/// When the block is finished, and only then, this returns the index entries with the addresses
+/// completed.
+struct FileCombiner {
+    /// Buffer of concatenated data from small files.
+    buf: Vec<u8>,
+    queue: Vec<QueuedFile>,
+}
+
+/// A file in the process of being written into a combined block.
+///
+/// While this exists, the data has been stored into the combine buffer, and we know
+/// the offset and length. But since the combine buffer hasn't been finished and hashed,
+/// we do not yet know a full address.
+struct QueuedFile {
+    /// Offset of the start of the data from this file within `buf`.
+    start: usize,
+    /// Length of data in this file.
+    len: usize,
+    /// IndexEntry without addresses.
+    entry: IndexEntry,
+}
+
+impl FileCombiner {
+    fn new() -> FileCombiner {
+        FileCombiner {
+            buf: Vec::new(),
+            queue: Vec::new(),
+        }
+    }
+
+    /// Write all the content from the combined block to a blockdir.
+    ///
+    /// Returns the fully populated entries for all files in this combined block.
+    ///
+    /// After this call the FileCombiner is empty and can be reused to combine more files
+    /// into a new block.
+    fn flush(&mut self, block_dir: &mut BlockDir) -> Result<(CopyStats, Vec<IndexEntry>)> {
+        let mut stats = CopyStats::default();
+        let hash: BlockHash = store_or_deduplicate(&self.buf, block_dir, &mut stats)?;
+        self.buf.clear();
+        let finished_entries = self
+            .queue
+            .drain(..)
+            .map(|qf| IndexEntry {
+                addrs: vec![Address {
+                    hash: hash.clone(),
+                    start: qf.start.try_into().unwrap(),
+                    len: qf.len.try_into().unwrap(),
+                }],
+                ..qf.entry
+            })
+            .collect();
+        Ok((stats, finished_entries))
+    }
+
+    /// Add the contents of a small file into this combiner.
+    ///
+    /// `entry` should be an IndexEntry that's complete apart from the block addresses.
+    fn push_file(&mut self, entry: IndexEntry, from_file: &mut dyn Read) -> Result<()> {
+        let start = self.buf.len();
+        let expected_len: usize = entry
+            .size()
+            .expect("small file has no length")
+            .try_into()
+            .unwrap();
+        assert!(expected_len > 0, "small file is empty");
+        self.buf.resize(start + expected_len, 0);
+        let len = from_file
+            .read(&mut self.buf[start..])
+            .map_err(|source| Error::StoreFile {
+                apath: entry.apath.to_owned(),
+                source,
+            })?;
+        // TODO: Maybe check this is actually the end of the file.
+        self.buf.truncate(start + len);
+        self.queue.push(QueuedFile { start, len, entry });
+        Ok(())
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use std::fs;
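
To see how FileCombiner is meant to be driven, here is a minimal sketch, not part of this commit: a hypothetical combine_small_files helper that pushes small files into the combiner and flushes whenever the buffer reaches TARGET_COMBINED_BLOCK_SIZE, the new constant added in src/lib.rs below. It assumes module-level access to the combiner's private buf and queue fields, and an input already filtered down to small files:

// Hypothetical driver, sketching the intended call pattern for FileCombiner.
fn combine_small_files(
    small_files: Vec<(IndexEntry, Box<dyn Read>)>, // assumed input shape
    block_dir: &mut BlockDir,
) -> Result<Vec<IndexEntry>> {
    let mut finished: Vec<IndexEntry> = Vec::new();
    let mut combiner = FileCombiner::new();
    for (entry, mut content) in small_files {
        combiner.push_file(entry, &mut *content)?;
        // Once the buffer reaches the target size, store it as one combined
        // block; flush completes every queued entry with an Address into it.
        if combiner.buf.len() >= TARGET_COMBINED_BLOCK_SIZE {
            let (_stats, entries) = combiner.flush(block_dir)?;
            finished.extend(entries);
        }
    }
    // Store whatever remains as a final, possibly short, combined block.
    if !combiner.queue.is_empty() {
        let (_stats, entries) = combiner.flush(block_dir)?;
        finished.extend(entries);
    }
    Ok(finished)
}
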
6 changes: 6 additions & 0 deletions src/lib.rs
@@ -91,6 +91,12 @@ pub const SYMLINKS_SUPPORTED: bool = cfg!(target_family = "unix");
 /// Break blocks at this many uncompressed bytes.
 pub(crate) const MAX_BLOCK_SIZE: usize = 1 << 20;
 
+/// Maximum file size that will be combined with others rather than being stored alone.
+const SMALL_FILE_CAP: u64 = 100_000;
+
+/// Target maximum uncompressed size for combined blocks.
+const TARGET_COMBINED_BLOCK_SIZE: usize = MAX_BLOCK_SIZE;
+
 /// ISO timestamp, for https://docs.rs/chrono/0.4.11/chrono/format/strftime/.
 const TIMESTAMP_FORMAT: &str = "%F %T";

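These constants suggest the routing rule for the combiner once it is wired into backup: only non-empty files up to SMALL_FILE_CAP are packed together, while larger files continue through the per-file block splitting in src/backup.rs. A minimal sketch of that predicate; the helper name is hypothetical, and the empty-file exclusion mirrors the assert in FileCombiner::push_file:

// Hypothetical routing check: should this file be packed into a combined block?
fn should_combine(file_size: u64) -> bool {
    // push_file asserts that small files are non-empty, so exclude empty ones.
    file_size > 0 && file_size <= SMALL_FILE_CAP
}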
