diff --git a/src/cache_dir.rs b/src/cache_dir.rs index 99c7c21..9f2d9ef 100644 --- a/src/cache_dir.rs +++ b/src/cache_dir.rs @@ -1,7 +1,9 @@ use std::borrow::Cow; use std::fs::File; +use std::io::ErrorKind; use std::io::Result; use std::path::Path; +use std::path::PathBuf; use std::time::Duration; use crate::raw_cache; @@ -15,16 +17,36 @@ const MAX_TEMP_FILE_AGE: Duration = Duration::from_secs(3600); #[cfg(test)] const MAX_TEMP_FILE_AGE: Duration = Duration::from_secs(2); +/// Attempts to make sure `path` is a directory that exists. Unlike +/// `std::fs::create_dir_all`, this function is optimised for the case +/// where `path` is already a directory. +fn ensure_directory(path: &Path) -> Result<()> { + if let Ok(meta) = std::fs::metadata(path) { + if meta.file_type().is_dir() { + return Ok(()); + } + } + + std::fs::create_dir_all(path) +} + /// Deletes any file with mtime older than `MAX_TEMP_FILE_AGE` in /// `temp_dir`. +/// +/// It is not an error if `temp_dir` does not exist. fn cleanup_temporary_directory(temp_dir: Cow) -> Result<()> { let threshold = match std::time::SystemTime::now().checked_sub(MAX_TEMP_FILE_AGE) { Some(time) => time, None => return Ok(()), }; + let iter = match std::fs::read_dir(&temp_dir) { + Err(e) if e.kind() == ErrorKind::NotFound => return Ok(()), + x => x?, + }; + let mut temp = temp_dir.into_owned(); - for dirent in std::fs::read_dir(&temp)?.flatten() { + for dirent in iter.flatten() { let mut handle = || -> Result<()> { let metadata = dirent.metadata()?; let mtime = metadata.modified()?; @@ -59,6 +81,14 @@ pub(crate) trait CacheDir { /// Returns the cache's directory capacity (in object count). fn capacity(&self) -> usize; + /// Return the path for the cache directory's temporary + /// subdirectory, after making sure the directory exists. + fn ensure_temp_dir(&self) -> Result> { + let ret = self.temp_dir(); + ensure_directory(&ret)?; + Ok(ret) + } + /// Returns a read-only file for `name` in the cache directory if /// it exists, or None if there is no such file. /// @@ -80,52 +110,82 @@ pub(crate) trait CacheDir { cleanup_temporary_directory(self.temp_dir()) } + /// Updates the second chance cache state and deletes temporary + /// files in the `base_dir` cache directory. + fn definitely_cleanup(&self, base_dir: PathBuf) -> Result { + let ret = match raw_cache::prune(base_dir, self.capacity()) { + Ok((estimate, _deleted)) => estimate, + Err(e) if e.kind() == ErrorKind::NotFound => return Ok(0), + Err(e) => return Err(e), + }; + + // Delete old temporary files while we're here. + self.cleanup_temp_directory()?; + Ok(ret) + } + /// If a periodic cleanup is called for, updates the second chance /// cache state and deletes temporary files in that cache directory. /// - /// Returns true whenever cleanup was initiated. - fn maybe_cleanup(&self, base_dir: &Path) -> Result { + /// Returns the estimated number of files remaining after cleanup + /// whenever cleanup was initiated. + fn maybe_cleanup(&self, base_dir: &Path) -> Result> { if self.trigger().event() { - raw_cache::prune(base_dir.to_owned(), self.capacity())?; - // Delete old temporary files while we're here. - self.cleanup_temp_directory()?; - Ok(true) + Ok(Some(self.definitely_cleanup(base_dir.to_owned())?)) } else { - Ok(false) + Ok(None) } } + /// Updates the second chance cache state and deletes temporary + /// files in the `base_dir` cache directory. + /// + /// Returns the estimated number of files remaining after cleanup. + fn maintain(&self) -> Result { + self.definitely_cleanup(self.base_dir().into_owned()) + } + /// Inserts or overwrites the file at `value` as `name` in the /// cache directory. /// + /// Returns the estimated number of files remaining after cleanup + /// whenever cleanup was initiated. + /// /// Always consumes the file at `value` on success; may consume it /// on error. - fn set(&self, name: &str, value: &Path) -> Result<()> { + fn set(&self, name: &str, value: &Path) -> Result> { let mut dst = self.base_dir().into_owned(); - self.maybe_cleanup(&dst)?; + let ret = self.maybe_cleanup(&dst)?; + ensure_directory(&dst)?; dst.push(name); - raw_cache::insert_or_update(value, &dst) + raw_cache::insert_or_update(value, &dst)?; + Ok(ret) } /// Inserts the file at `value` as `name` in the cache directory /// if there is no such cached entry already, or touches the /// cached file if it already exists. /// + /// Returns the estimated number of files remaining after cleanup + /// whenever cleanup was initiated. + /// /// Always consumes the file at `value` on success; may consume it /// on error. - fn put(&self, name: &str, value: &Path) -> Result<()> { + fn put(&self, name: &str, value: &Path) -> Result> { let mut dst = self.base_dir().into_owned(); - self.maybe_cleanup(&dst)?; + let ret = self.maybe_cleanup(&dst)?; + ensure_directory(&dst)?; dst.push(name); - raw_cache::insert_or_touch(value, &dst) + raw_cache::insert_or_touch(value, &dst)?; + Ok(ret) } /// Marks the cached file `name` as newly used, if it exists. /// - /// Succeeds if `name` does not exist anymore. - fn touch(&self, name: &str) -> Result<()> { + /// Returns whether the file `name` exists. + fn touch(&self, name: &str) -> Result { let mut target = self.base_dir().into_owned(); target.push(name); diff --git a/src/lib.rs b/src/lib.rs index d57c654..61af548 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,7 +1,37 @@ mod cache_dir; mod plain; pub mod raw_cache; +mod readonly; pub mod second_chance; +mod sharded; +mod stack; mod trigger; pub use plain::PlainCache; +pub use readonly::ReadOnlyCache; +pub use readonly::ReadOnlyCacheBuilder; +pub use sharded::ShardedCache; +pub use stack::Cache; +pub use stack::CacheBuilder; + +/// Sharded cache keys consist of a filename and two hash values. The +/// two hashes should be computed by distinct functions of the key's +/// name, and each hash function must be identical for all processes +/// that access the same sharded cache directory. +#[derive(Clone, Copy, Debug)] +pub struct Key<'a> { + pub name: &'a str, + pub hash: u64, + pub secondary_hash: u64, +} + +impl<'a> Key<'a> { + /// Returns a new `Key` for this `name`, `hash`, and `secondary_hash`. + pub fn new(name: &str, hash: u64, secondary_hash: u64) -> Key { + Key { + name, + hash, + secondary_hash, + } + } +} diff --git a/src/plain.rs b/src/plain.rs index 24477fe..005c37c 100644 --- a/src/plain.rs +++ b/src/plain.rs @@ -61,22 +61,15 @@ impl CacheDir for PlainCache { impl PlainCache { /// Returns a new cache for approximately `capacity` files in /// `base_dir`. - /// - /// # Errors - /// - /// Returns `Err` if `$base_dir/.temp` does not exist and we fail - /// to create it. - pub fn new(base_dir: PathBuf, capacity: usize) -> Result { + pub fn new(base_dir: PathBuf, capacity: usize) -> PlainCache { let mut temp_dir = base_dir; temp_dir.push(TEMP_SUBDIR); - std::fs::create_dir_all(&temp_dir)?; - - Ok(PlainCache { + PlainCache { temp_dir, trigger: PeriodicTrigger::new((capacity / MAINTENANCE_SCALE) as u64), capacity, - }) + } } /// Returns a read-only file for `name` in the cache directory if @@ -89,8 +82,8 @@ impl PlainCache { /// Returns a temporary directory suitable for temporary files /// that will be published to the cache directory. - pub fn temp_dir(&self) -> Cow { - CacheDir::temp_dir(self) + pub fn temp_dir(&self) -> Result> { + CacheDir::ensure_temp_dir(self) } /// Inserts or overwrites the file at `value` as `name` in the @@ -99,7 +92,8 @@ impl PlainCache { /// Always consumes the file at `value` on success; may consume it /// on error. pub fn set(&self, name: &str, value: &Path) -> Result<()> { - CacheDir::set(self, name, value) + CacheDir::set(self, name, value)?; + Ok(()) } /// Inserts the file at `value` as `name` in the cache directory @@ -109,13 +103,14 @@ impl PlainCache { /// Always consumes the file at `value` on success; may consume it /// on error. pub fn put(&self, name: &str, value: &Path) -> Result<()> { - CacheDir::put(self, name, value) + CacheDir::put(self, name, value)?; + Ok(()) } /// Marks the cached file `name` as newly used, if it exists. /// - /// Succeeds even if `name` does not exist. - pub fn touch(&self, name: &str) -> Result<()> { + /// Returns whether `name` exists. + pub fn touch(&self, name: &str) -> Result { CacheDir::touch(self, name) } } @@ -140,12 +135,13 @@ fn smoke_test() { // Make sure the garbage file is old enough to be deleted. std::thread::sleep(std::time::Duration::from_secs_f64(2.5)); - let cache = PlainCache::new(temp.path("."), 10).expect("::new must succeed"); + let cache = PlainCache::new(temp.path("."), 10); for i in 0..20 { let name = format!("{}", i); - let tmp = NamedTempFile::new_in(cache.temp_dir()).expect("new temp file must succeed"); + let tmp = NamedTempFile::new_in(cache.temp_dir().expect("temp_dir must succeed")) + .expect("new temp file must succeed"); std::fs::write(tmp.path(), format!("{}", PAYLOAD_MULTIPLIER * i)) .expect("write must succeed"); cache.put(&name, tmp.path()).expect("put must succeed"); @@ -185,10 +181,11 @@ fn test_set() { use test_dir::{DirBuilder, TestDir}; let temp = TestDir::temp(); - let cache = PlainCache::new(temp.path("."), 1).expect("::new must succeed"); + let cache = PlainCache::new(temp.path("."), 1); { - let tmp = NamedTempFile::new_in(cache.temp_dir()).expect("new temp file must succeed"); + let tmp = NamedTempFile::new_in(cache.temp_dir().expect("temp_dir must succeed")) + .expect("new temp file must succeed"); tmp.as_file().write_all(b"v1").expect("write must succeed"); cache @@ -208,7 +205,8 @@ fn test_set() { // Now overwrite; it should take. { - let tmp = NamedTempFile::new_in(cache.temp_dir()).expect("new temp file must succeed"); + let tmp = NamedTempFile::new_in(cache.temp_dir().expect("temp_dir must succeed")) + .expect("new temp file must succeed"); tmp.as_file().write_all(b"v2").expect("write must succeed"); cache @@ -236,10 +234,11 @@ fn test_put() { use test_dir::{DirBuilder, TestDir}; let temp = TestDir::temp(); - let cache = PlainCache::new(temp.path("."), 1).expect("::new must succeed"); + let cache = PlainCache::new(temp.path("."), 1); { - let tmp = NamedTempFile::new_in(cache.temp_dir()).expect("new temp file must succeed"); + let tmp = NamedTempFile::new_in(cache.temp_dir().expect("temp_dir must succeed")) + .expect("new temp file must succeed"); tmp.as_file().write_all(b"v1").expect("write must succeed"); cache @@ -259,7 +258,8 @@ fn test_put() { // Now put again; it shouldn't overwrite. { - let tmp = NamedTempFile::new_in(cache.temp_dir()).expect("new temp file must succeed"); + let tmp = NamedTempFile::new_in(cache.temp_dir().expect("temp_dir must succeed")) + .expect("new temp file must succeed"); tmp.as_file().write_all(b"v2").expect("write must succeed"); cache @@ -286,18 +286,22 @@ fn test_touch() { use test_dir::{DirBuilder, TestDir}; let temp = TestDir::temp(); - let cache = PlainCache::new(temp.path("."), 5).expect("::new must succeed"); + let cache = PlainCache::new(temp.path("."), 5); for i in 0..15 { let name = format!("{}", i); - cache.touch("0").expect("touch must not fail"); + // After the first write, touch should find our file. + assert_eq!(cache.touch("0").expect("touch must not fail"), i > 0); - let tmp = NamedTempFile::new_in(cache.temp_dir()).expect("new temp file must succeed"); + let tmp = NamedTempFile::new_in(cache.temp_dir().expect("temp_dir must succeed")) + .expect("new temp file must succeed"); cache.put(&name, tmp.path()).expect("put must succeed"); - // Make sure enough time elapses for the next file to get - // a different timestamp. - std::thread::sleep(std::time::Duration::from_secs_f64(1.5)); + // Make sure enough time elapses for the first file to get + // an older timestamp than the rest. + if i == 0 { + std::thread::sleep(std::time::Duration::from_secs_f64(1.5)); + } } // We should still find "0": it's the oldest, but we also keep @@ -320,10 +324,11 @@ fn test_recent_temp_file() { // The garbage file must exist. assert!(std::fs::metadata(temp.path(&format!("{}/garbage", TEMP_SUBDIR))).is_ok()); - let cache = PlainCache::new(temp.path("."), 1).expect("::new must succeed"); + let cache = PlainCache::new(temp.path("."), 1); for i in 0..2 { - let tmp = NamedTempFile::new_in(cache.temp_dir()).expect("new temp file must succeed"); + let tmp = NamedTempFile::new_in(cache.temp_dir().expect("temp_dir must succeed")) + .expect("new temp file must succeed"); cache .put(&format!("{}", i), tmp.path()) .expect("put must succeed"); diff --git a/src/raw_cache.rs b/src/raw_cache.rs index 971a42d..2466eb9 100644 --- a/src/raw_cache.rs +++ b/src/raw_cache.rs @@ -50,19 +50,24 @@ fn set_read_only(path: &Path) -> Result<()> { } /// Sets the access bit to true for the file at `path`: the next time -/// that file is up for eviction, it will get a second chance. +/// that file is up for eviction, it will get a second chance. Returns +/// true if the file was found, false otherwise. /// /// In most cases, there is no need to explicitly call this function: /// the operating system will automatically perform the required /// update while opening the file at `path`. -pub fn touch>(path: P) -> Result<()> { - match filetime::set_file_atime(path.as_ref(), FileTime::now()) { - Ok(()) => Ok(()), - // It's OK if the file we're trying to touch was removed: - // things do disappear from caches. - Err(e) if e.kind() == ErrorKind::NotFound => Ok(()), - err => err, +pub fn touch(path: impl AsRef) -> Result { + fn run(path: &Path) -> Result { + match filetime::set_file_atime(path, FileTime::now()) { + Ok(()) => Ok(true), + // It's OK if the file we're trying to touch was removed: + // things do disappear from caches. + Err(e) if e.kind() == ErrorKind::NotFound => Ok(false), + Err(e) => Err(e), + } } + + run(path.as_ref()) } /// Consumes the file `from` and publishes it to the raw cache file @@ -78,14 +83,17 @@ pub fn touch>(path: P) -> Result<()> { /// `sync_data`ing the contents of `from`. This function does not /// fsync the cache directory itself: it's a cache, so stale contents /// are assumed safe. -pub fn insert_or_update, Q: AsRef>(from: P, to: Q) -> Result<()> { - let from_path = from.as_ref(); - // Move to the back of the list before publishing: if a reader - // comes in right away, we want it to set the access bit. - move_to_back_of_list(from_path)?; - set_read_only(from_path)?; - std::fs::rename(from_path, to)?; - ensure_file_removed(from_path) +pub fn insert_or_update(from: impl AsRef, to: impl AsRef) -> Result<()> { + fn run(from: &Path, to: &Path) -> Result<()> { + // Move to the back of the list before publishing: if a reader + // comes in right away, we want it to set the access bit. + move_to_back_of_list(from)?; + set_read_only(from)?; + std::fs::rename(from, to)?; + ensure_file_removed(from) + } + + run(from.as_ref(), to.as_ref()) } /// Consumes the file `from` and publishes it to the raw cache file @@ -105,33 +113,38 @@ pub fn insert_or_update, Q: AsRef>(from: P, to: Q) -> Resul /// `sync_data`ing the contents of `from`. This function does not /// fsync the cache directory itself: it's a cache, so stale contents /// are assumed safe. -pub fn insert_or_touch, Q: AsRef>(from: P, to: Q) -> Result<()> { - let from_path = from.as_ref(); - let to_path = to.as_ref(); - - // Optimise for the successful publication case: we expect callers - // to only `insert_or_touch` after a failed lookup, so the `link` - // call will only fail with EEXIST if another writer raced with us. - move_to_back_of_list(from_path)?; - set_read_only(from_path)?; - match std::fs::hard_link(from_path, to_path) { - Ok(()) => {} - // The destination file already exists; we just have to mark - // it as accessed. - Err(e) if e.kind() == ErrorKind::AlreadyExists => touch(to_path)?, - err => err?, +pub fn insert_or_touch(from: impl AsRef, to: impl AsRef) -> Result<()> { + fn run(from: &Path, to: &Path) -> Result<()> { + // Optimise for the successful publication case: we expect callers + // to only `insert_or_touch` after a failed lookup, so the `link` + // call will only fail with EEXIST if another writer raced with us. + move_to_back_of_list(from)?; + set_read_only(from)?; + match std::fs::hard_link(from, to) { + Ok(()) => {} + // The destination file already exists; we just have to mark + // it as accessed. + Err(e) if e.kind() == ErrorKind::AlreadyExists => { + touch(to)?; + } + err => err?, + } + + ensure_file_removed(from) } - ensure_file_removed(from_path) + run(from.as_ref(), to.as_ref()) } impl second_chance::Entry for CachedFile { type Rank = FileTime; + #[inline] fn rank(&self) -> FileTime { self.mtime } + #[inline] fn accessed(&self) -> bool { self.accessed } @@ -357,7 +370,8 @@ fn test_touch() { assert!(!old_entry.accessed()); advance_time(); - touch(&path).expect("call should succeed"); + // Should return true: the file exists. + assert!(touch(&path).expect("call should succeed")); let new_entry = get_entry(); assert_eq!(new_entry.rank(), old_entry.rank()); @@ -370,7 +384,8 @@ fn test_touch_missing() { use test_dir::{DirBuilder, TestDir}; let temp = TestDir::temp(); - touch(&temp.path("absent")).expect("should succeed on missing files"); + // Should return file: the file does not exist. + assert!(!touch(&temp.path("absent")).expect("should succeed on missing files")); } /// Reading a file should set the accessed bit, but not change the rank. diff --git a/src/readonly.rs b/src/readonly.rs new file mode 100644 index 0000000..3d4f58a --- /dev/null +++ b/src/readonly.rs @@ -0,0 +1,366 @@ +//! A `ReadOnlyCache` wraps an arbitrary number of caches, and +//! attempts to satisfy `get` and `touch` requests by hitting each +//! cache in order. For read-only usage, this should be a simple +//! and easy-to-use interface that erases the difference between plain +//! and sharded caches. +use std::fs::File; +use std::io::Result; +use std::path::Path; +use std::sync::Arc; + +use crate::Key; +use crate::PlainCache; +use crate::ShardedCache; + +/// The `ReadSide` trait offers `get` and `touch`, as implemented by +/// both plain and sharded caches. +trait ReadSide: + std::fmt::Debug + Sync + Send + std::panic::RefUnwindSafe + std::panic::UnwindSafe +{ + /// Returns a read-only file for `key` in the cache directory if + /// it exists, or None if there is no such file. + /// + /// Implicitly "touches" the cached file if it exists. + fn get(&self, key: Key) -> Result>; + + /// Marks the cached file `key` as newly used, if it exists. + /// + /// Returns whether a file for `key` exists in the cache. + fn touch(&self, key: Key) -> Result; +} + +impl ReadSide for PlainCache { + fn get(&self, key: Key) -> Result> { + PlainCache::get(self, key.name) + } + + fn touch(&self, key: Key) -> Result { + PlainCache::touch(self, key.name) + } +} + +impl ReadSide for ShardedCache { + fn get(&self, key: Key) -> Result> { + ShardedCache::get(self, key) + } + + fn touch(&self, key: Key) -> Result { + ShardedCache::touch(self, key) + } +} + +/// Construct a `ReadOnlyCache` with this builder. The resulting +/// cache will access each constituent cache directory in the order +/// they were added. +/// +/// The default builder is a fresh builder with no constituent cache. +#[derive(Debug, Default)] +pub struct ReadOnlyCacheBuilder { + stack: Vec>, +} + +/// A `ReadOnlyCache` wraps an arbitrary number of caches, and +/// attempts to satisfy `get` and `touch` requests by hitting each +/// constituent cache in order. This interface hides the difference +/// between plain and sharded cache directories, and should be the +/// first resort for read-only uses. +/// +/// The default cache wraps an empty set of constituent caches. +#[derive(Clone, Debug)] +pub struct ReadOnlyCache { + stack: Arc<[Box]>, +} + +impl ReadOnlyCacheBuilder { + /// Returns a fresh empty builder. + pub fn new() -> Self { + Self::default() + } + + /// Adds a new cache directory at `path` to the end of the cache + /// builder's search list. + /// + /// Adds a plain cache directory if `num_shards <= 1`, and an + /// actual sharded directory otherwise. + pub fn cache(self, path: impl AsRef, num_shards: usize) -> Self { + if num_shards <= 1 { + self.plain(path) + } else { + self.sharded(path, num_shards) + } + } + + /// Adds a new plain cache directory at `path` to the end of the + /// cache builder's search list. A plain cache directory is + /// merely a directory of files where the files' names match their + /// key's name. + pub fn plain(mut self, path: impl AsRef) -> Self { + self.stack.push(Box::new(PlainCache::new( + path.as_ref().to_owned(), + usize::MAX, + ))); + + self + } + + /// Adds a new sharded cache directory at `path` to the end of the + /// cache builder's search list. + pub fn sharded(mut self, path: impl AsRef, num_shards: usize) -> Self { + self.stack.push(Box::new(ShardedCache::new( + path.as_ref().to_owned(), + num_shards, + usize::MAX, + ))); + self + } + + /// Returns a fresh `ReadOnlyCache` for the builder's search list + /// of constituent cache directories. + pub fn build(self) -> ReadOnlyCache { + ReadOnlyCache::new(self.stack) + } +} + +impl Default for ReadOnlyCache { + fn default() -> ReadOnlyCache { + ReadOnlyCache::new(Default::default()) + } +} + +impl ReadOnlyCache { + fn new(stack: Vec>) -> ReadOnlyCache { + ReadOnlyCache { + stack: stack.into_boxed_slice().into(), + } + } + + /// Attempts to open a read-only file for `key`. The + /// `ReadOnlyCache` will query each constituent cache in order of + /// registration, and return a read-only file for the first hit. + /// + /// Returns `None` if no file for `key` can be found in any of the + /// constituent caches, and bubbles up the first I/O error + /// encountered, if any. + pub fn get<'a>(&self, key: impl Into>) -> Result> { + fn doit(stack: &[Box], key: Key) -> Result> { + for cache in stack.iter() { + if let Some(ret) = cache.get(key)? { + return Ok(Some(ret)); + } + } + + Ok(None) + } + + if self.stack.is_empty() { + return Ok(None); + } + + doit(&*self.stack, key.into()) + } + + /// Marks a cache entry for `key` as accessed (read). The + /// `ReadOnlyCache` will touch the same file that would be returned + /// by `get`. + /// + /// Returns whether a file for `key` could be found, and bubbles + /// up the first I/O error encountered, if any. + pub fn touch<'a>(&self, key: impl Into>) -> Result { + fn doit(stack: &[Box], key: Key) -> Result { + for cache in stack.iter() { + if cache.touch(key)? { + return Ok(true); + } + } + + Ok(false) + } + + if self.stack.is_empty() { + return Ok(false); + } + + doit(&*self.stack, key.into()) + } +} + +#[cfg(test)] +mod test { + use crate::Key; + use crate::PlainCache; + use crate::ReadOnlyCache; + use crate::ReadOnlyCacheBuilder; + use crate::ShardedCache; + + struct TestKey { + key: String, + } + + impl TestKey { + fn new(key: &str) -> TestKey { + TestKey { + key: key.to_string(), + } + } + } + + impl<'a> From<&'a TestKey> for Key<'a> { + fn from(x: &'a TestKey) -> Key<'a> { + Key::new(&x.key, 0, 1) + } + } + + /// A stack of 0 caches should always succeed with a trivial result. + #[test] + fn empty() { + let ro: ReadOnlyCache = Default::default(); + + assert!(matches!(ro.get(Key::new("foo", 1, 2)), Ok(None))); + assert!(matches!(ro.touch(Key::new("foo", 1, 2)), Ok(false))); + } + + /// Populate a plain and a sharded cache. We should be able to access + /// both. + #[test] + fn smoke_test() { + use std::io::{Read, Write}; + use tempfile::NamedTempFile; + use test_dir::{DirBuilder, FileType, TestDir}; + + let temp = TestDir::temp() + .create("sharded", FileType::Dir) + .create("plain", FileType::Dir); + + { + let cache = ShardedCache::new(temp.path("sharded"), 10, 20); + + let tmp = NamedTempFile::new_in(cache.temp_dir(None).expect("temp_dir must succeed")) + .expect("new temp file must succeed"); + tmp.as_file() + .write_all(b"sharded") + .expect("write must succeed"); + + cache + .put(Key::new("a", 0, 1), tmp.path()) + .expect("put must succeed"); + + let tmp2 = NamedTempFile::new_in(cache.temp_dir(None).expect("temp_dir must succeed")) + .expect("new temp file must succeed"); + tmp2.as_file() + .write_all(b"sharded2") + .expect("write must succeed"); + + cache + .put(Key::new("b", 0, 1), tmp2.path()) + .expect("put must succeed"); + } + + { + let cache = PlainCache::new(temp.path("plain"), 10); + + let tmp = NamedTempFile::new_in(cache.temp_dir().expect("temp_dir must succeed")) + .expect("new temp file must succeed"); + tmp.as_file() + .write_all(b"plain") + .expect("write must succeed"); + + cache.put("b", tmp.path()).expect("put must succeed"); + + let tmp2 = NamedTempFile::new_in(cache.temp_dir().expect("temp_dir must succeed")) + .expect("new temp file must succeed"); + tmp2.as_file() + .write_all(b"plain2") + .expect("write must succeed"); + + cache.put("c", tmp2.path()).expect("put must succeed"); + } + + // sharded.a => "sharded" + // sharded.b => "sharded2" + // plain.b => "plain" + // plain.c => "plain2" + + // Read from sharded, then plain. + { + let ro = ReadOnlyCacheBuilder::new() + .sharded(temp.path("sharded"), 10) + .plain(temp.path("plain")) + .build(); + + assert!(matches!(ro.get(&TestKey::new("Missing")), Ok(None))); + assert!(matches!(ro.touch(&TestKey::new("Missing")), Ok(false))); + + // We should be able to touch `a`. + assert!(matches!(ro.touch(&TestKey::new("a")), Ok(true))); + + // And now check that we get the correct file contents. + { + let mut a_file = ro + .get(&TestKey::new("a")) + .expect("must succeed") + .expect("must exist"); + let mut dst = Vec::new(); + a_file.read_to_end(&mut dst).expect("read must succeed"); + assert_eq!(&dst, b"sharded"); + } + + { + let mut b_file = ro + .get(&TestKey::new("b")) + .expect("must succeed") + .expect("must exist"); + let mut dst = Vec::new(); + b_file.read_to_end(&mut dst).expect("read must succeed"); + assert_eq!(&dst, b"sharded2"); + } + + { + let mut c_file = ro + .get(&TestKey::new("c")) + .expect("must succeed") + .expect("must exist"); + let mut dst = Vec::new(); + c_file.read_to_end(&mut dst).expect("read must succeed"); + assert_eq!(&dst, b"plain2"); + } + } + + // Read from plain then sharded. + { + let ro = ReadOnlyCacheBuilder::new() + .cache(temp.path("plain"), 1) + .cache(temp.path("sharded"), 10) + .build(); + + { + let mut a_file = ro + .get(&TestKey::new("a")) + .expect("must succeed") + .expect("must exist"); + let mut dst = Vec::new(); + a_file.read_to_end(&mut dst).expect("read must succeed"); + assert_eq!(&dst, b"sharded"); + } + + { + let mut b_file = ro + .get(&TestKey::new("b")) + .expect("must succeed") + .expect("must exist"); + let mut dst = Vec::new(); + b_file.read_to_end(&mut dst).expect("read must succeed"); + assert_eq!(&dst, b"plain"); + } + + { + let mut c_file = ro + .get(&TestKey::new("c")) + .expect("must succeed") + .expect("must exist"); + let mut dst = Vec::new(); + c_file.read_to_end(&mut dst).expect("read must succeed"); + assert_eq!(&dst, b"plain2"); + } + } + } +} diff --git a/src/sharded.rs b/src/sharded.rs new file mode 100644 index 0000000..86cdb93 --- /dev/null +++ b/src/sharded.rs @@ -0,0 +1,565 @@ +//! A `ShardedCache` uses the same basic file-based second chance +//! strategy as a `PlainCache`. However, while the simple plain cache +//! is well suited to small caches (down to 2-3 files, and up maybe +//! one hundred), this sharded version can scale nearly arbitrarily +//! high: each shard should have fewer than one hundred or so files, +//! but there may be arbitrarily many shards (up to filesystem limits, +//! since each shard is a subdirectory). +use std::borrow::Cow; +use std::fs::File; +use std::io::Result; +use std::path::Path; +use std::path::PathBuf; +use std::sync::atomic::AtomicU8; +use std::sync::atomic::Ordering::Relaxed; +use std::sync::Arc; + +use crate::cache_dir::CacheDir; +use crate::trigger::PeriodicTrigger; +use crate::Key; + +/// We will aim to trigger maintenance at least `MAINTENANCE_SCALE` +/// times per total capacity inserts or updates, and at least once per +/// shard capacity inserts or updates. +const MAINTENANCE_SCALE: usize = 2; + +/// Put temporary file in this subdirectory of the cache directory. +const TEMP_SUBDIR: &str = ".temp"; + +const RANDOM_MULTIPLIER: u64 = 0xf2efdf1111adba6f; + +const SECONDARY_RANDOM_MULTIPLIER: u64 = 0xa55e1e02718a6a47; + +/// A sharded cache is a hash-sharded directory of cache +/// subdirectories. Each subdirectory is managed as an +/// independent second chance cache directory. +#[derive(Clone, Debug)] +pub struct ShardedCache { + // The current load (number of files) estimate for each shard. + load_estimates: Arc<[AtomicU8]>, + // The parent directory for each shard (cache subdirectory). + base_dir: PathBuf, + // Triggers periodic second chance maintenance. It is set to the + // least (most frequent) period between ~1/2 the total capacity, + // and each shard's capacity. Whenever the `trigger` fires, we + // will maintain two different shards: the one we just updated, + // and another randomly chosen shard. + trigger: PeriodicTrigger, + // Number of shards in the cache, at least 2. + num_shards: usize, + // Capacity for each shard (rounded up to an integer), at least 1. + shard_capacity: usize, +} + +#[inline] +fn format_id(shard: usize) -> String { + format!("{:04x}", shard) +} + +/// We create short-lived Shard objects whenever we want to work with +/// a given shard of the sharded cache dir. +struct Shard { + id: usize, + shard_dir: PathBuf, + trigger: PeriodicTrigger, + capacity: usize, +} + +impl Shard { + /// Returns a shard object for a new shard `id`. + fn replace_shard(self, id: usize) -> Shard { + let mut shard_dir = self.shard_dir; + shard_dir.pop(); + shard_dir.push(&format_id(id)); + Shard { + id, + shard_dir, + trigger: self.trigger, + capacity: self.capacity, + } + } + + /// Returns whether the file `name` exists in this shard. + fn file_exists(&mut self, name: &str) -> bool { + self.shard_dir.push(name); + let result = std::fs::metadata(&self.shard_dir); + self.shard_dir.pop(); + + result.is_ok() + } +} + +impl CacheDir for Shard { + #[inline] + fn temp_dir(&self) -> Cow { + let mut dir = self.shard_dir.clone(); + dir.push(TEMP_SUBDIR); + Cow::from(dir) + } + + #[inline] + fn base_dir(&self) -> Cow { + Cow::from(&self.shard_dir) + } + + #[inline] + fn trigger(&self) -> &PeriodicTrigger { + &self.trigger + } + + #[inline] + fn capacity(&self) -> usize { + self.capacity + } +} + +impl ShardedCache { + /// Returns a new cache for approximately `total_capacity` files, + /// stores in `num_shards` subdirectories of `base_dir`. + pub fn new( + base_dir: PathBuf, + mut num_shards: usize, + mut total_capacity: usize, + ) -> ShardedCache { + // We assume at least two shards. + if num_shards < 2 { + num_shards = 2; + } + + if total_capacity < num_shards { + total_capacity = num_shards; + } + + let mut load_estimates = Vec::with_capacity(num_shards); + load_estimates.resize_with(num_shards, || AtomicU8::new(0)); + let shard_capacity = + (total_capacity / num_shards) + ((total_capacity % num_shards) != 0) as usize; + let trigger = + PeriodicTrigger::new(shard_capacity.min(total_capacity / MAINTENANCE_SCALE) as u64); + + ShardedCache { + load_estimates: load_estimates.into_boxed_slice().into(), + base_dir, + trigger, + num_shards, + shard_capacity, + } + } + + /// Returns a random shard id. + fn random_shard_id(&self) -> usize { + use rand::Rng; + + rand::thread_rng().gen_range(0..self.num_shards) + } + + /// Given shard ids `base` and `other`, returns a new shard id for + /// `other` such that `base` and `other` do not collide. + fn other_shard_id(&self, base: usize, mut other: usize) -> usize { + if base != other { + return other; + } + + other += 1; + if other < self.num_shards { + other + } else { + 0 + } + } + + /// Returns the two shard ids for `key`. + fn shard_ids(&self, key: Key) -> (usize, usize) { + // We can't assume the hash is well distributed, so mix it + // around a bit with a multiplicative hash. + let remap = |x: u64, mul: u64| { + let hash = x.wrapping_mul(mul) as u128; + // Map the hashed hash to a shard id with a fixed point + // multiplication. + ((self.num_shards as u128 * hash) >> 64) as usize + }; + + // We do not apply a 2-left strategy because our load + // estimates can saturate. When that happens, we want to + // revert to sharding based on `key.hash`. + let h1 = remap(key.hash, RANDOM_MULTIPLIER); + let h2 = remap(key.secondary_hash, SECONDARY_RANDOM_MULTIPLIER); + (h1, self.other_shard_id(h1, h2)) + } + + /// Reorders two shard ids to return the least loaded first. + fn sort_by_load(&self, (h1, h2): (usize, usize)) -> (usize, usize) { + let load1 = self.load_estimates[h1].load(Relaxed) as usize; + let load2 = self.load_estimates[h2].load(Relaxed) as usize; + + // Clamp loads at the shard capacity: when both shards are + // over the capacity, they're equally overloaded. This also + // lets us revert to only using `key.hash` when at capacity. + let capacity = self.shard_capacity; + if load1.clamp(0, capacity) <= load2.clamp(0, capacity) { + (h1, h2) + } else { + (h2, h1) + } + } + + /// Returns a shard object for the `shard_id`. + fn shard(&self, shard_id: usize) -> Shard { + let mut dir = self.base_dir.clone(); + dir.push(&format_id(shard_id)); + Shard { + id: shard_id, + shard_dir: dir, + trigger: self.trigger, + capacity: self.shard_capacity, + } + } + + /// Returns a read-only file for `key` in the shard cache + /// directory if it exists, or None if there is no such file. + /// + /// Implicitly "touches" the cached file if it exists. + pub fn get(&self, key: Key) -> Result> { + let (h1, h2) = self.shard_ids(key); + let shard = self.shard(h1); + + if let Some(file) = shard.get(key.name)? { + Ok(Some(file)) + } else { + shard.replace_shard(h2).get(key.name) + } + } + + /// Returns a temporary directory suitable for temporary files + /// that will be published to the shard cache directory. + /// + /// When this temporary file will be published at a known `Key`, + /// populate `key` for improved behaviour. + pub fn temp_dir(&self, key: Option) -> Result> { + let shard_id = match key { + Some(key) => self.sort_by_load(self.shard_ids(key)).0, + None => self.random_shard_id(), + }; + let shard = self.shard(shard_id); + if self.trigger.event() { + shard.cleanup_temp_directory()?; + } + + Ok(Cow::from(shard.ensure_temp_dir()?.into_owned())) + } + + /// Updates the load estimate for `shard_id` with the value + /// returned by `CacheDir::{set,put}`. + fn update_estimate(&self, shard_id: usize, update: Option) { + let target = &self.load_estimates[shard_id]; + match update { + // If we have an updated estimate, overwrite what we have, + // and take the newly added file into account. + Some(remaining) => { + let update = remaining.clamp(0, u8::MAX as u64 - 1) as u8; + target.store(update + 1, Relaxed); + } + // Otherwise, increment by one with saturation. + None => { + let _ = target.fetch_update(Relaxed, Relaxed, |i| { + if i < u8::MAX { + Some(i + 1) + } else { + None + } + }); + } + }; + } + + /// Performs a second chance maintenance on `shard`. + fn force_maintain_shard(&self, shard: Shard) -> Result<()> { + let update = shard.maintain()?.clamp(0, u8::MAX as u64) as u8; + self.load_estimates[shard.id].store(update, Relaxed); + Ok(()) + } + + /// Performs a second chance maintenance on a randomly chosen shard + /// that is not `base`. + fn maintain_random_other_shard(&self, base: Shard) -> Result<()> { + let shard_id = self.other_shard_id(base.id, self.random_shard_id()); + self.force_maintain_shard(base.replace_shard(shard_id)) + } + + /// Inserts or overwrites the file at `value` as `key` in the + /// sharded cache directory. There may be two entries for the + /// same key with concurrent `set` or `put` calls. + /// + /// Always consumes the file at `value` on success; may consume it + /// on error. + pub fn set(&self, key: Key, value: &Path) -> Result<()> { + let (h1, h2) = self.sort_by_load(self.shard_ids(key)); + let mut shard = self.shard(h2); + + // If the file does not already exist in the secondary shard, + // use the primary. + if !shard.file_exists(key.name) { + shard = shard.replace_shard(h1); + } + + let update = shard.set(key.name, value)?; + self.update_estimate(h1, update); + + // If we performed maintenance on this shard, also maintain + // a second random shard: writes might be concentrated on a + // few shard, but we can still spread the love, if only to + // clean up temporary files. + if update.is_some() { + self.maintain_random_other_shard(shard)?; + } else if self.load_estimates[h1].load(Relaxed) as usize / 2 > self.shard_capacity { + // Otherwise, we can also force a maintenance for this + // shard if we're pretty sure it has grown much too big. + self.force_maintain_shard(shard)?; + } + + Ok(()) + } + + /// Inserts the file at `value` as `key` in the cache directory if + /// there is no such cached entry already, or touches the cached + /// file if it already exists. There may be two entries for the + /// same key with concurrent `set` or `put` calls. + /// + /// Always consumes the file at `value` on success; may consume it + /// on error. + pub fn put(&self, key: Key, value: &Path) -> Result<()> { + let (h1, h2) = self.sort_by_load(self.shard_ids(key)); + let mut shard = self.shard(h2); + + // If the file does not already exist in the secondary shard, + // use the primary. + if !shard.file_exists(key.name) { + shard = shard.replace_shard(h1); + } + + let update = shard.put(key.name, value)?; + self.update_estimate(h1, update); + + // If we performed maintenance on this shard, also maintain + // a second random shard. + if update.is_some() { + self.maintain_random_other_shard(shard)?; + } else if self.load_estimates[h1].load(Relaxed) as usize / 2 > self.shard_capacity { + self.force_maintain_shard(shard)?; + } + + Ok(()) + } + + /// Marks the cached file `key` as newly used, if it exists. + /// + /// Returns whether a file for `key` exists in the cache. + pub fn touch(&self, key: Key) -> Result { + let (h1, h2) = self.shard_ids(key); + let shard = self.shard(h1); + + if shard.touch(key.name)? { + return Ok(true); + } + + shard.replace_shard(h2).touch(key.name) + } +} + +/// Put 200 files in a 3x3-file cache. We should find at least 9, but +/// at most 18 (2x the capacity), and their contents should match. +#[test] +fn smoke_test() { + use tempfile::NamedTempFile; + use test_dir::{DirBuilder, TestDir}; + + // The payload for file `i` is `PAYLOAD_MULTIPLIER * i`. + const PAYLOAD_MULTIPLIER: usize = 113; + + let temp = TestDir::temp(); + let cache = ShardedCache::new(temp.path("."), 3, 9); + + for i in 0..200 { + let name = format!("{}", i); + + let temp_dir = cache.temp_dir(None).expect("temp_dir must succeed"); + let tmp = NamedTempFile::new_in(temp_dir).expect("new temp file must succeed"); + std::fs::write(tmp.path(), format!("{}", PAYLOAD_MULTIPLIER * i)) + .expect("write must succeed"); + // It shouldn't matter if we PUT or SET. + if (i % 2) != 0 { + cache + .put(Key::new(&name, i as u64, i as u64 + 42), tmp.path()) + .expect("put must succeed"); + } else { + cache + .set(Key::new(&name, i as u64, i as u64 + 42), tmp.path()) + .expect("set must succeed"); + } + } + + let present: usize = (0..200) + .map(|i| { + let name = format!("{}", i); + match cache + .get(Key::new(&name, i as u64, i as u64 + 42)) + .expect("get must succeed") + { + Some(mut file) => { + use std::io::Read; + let mut buf = Vec::new(); + file.read_to_end(&mut buf).expect("read must succeed"); + assert_eq!(buf, format!("{}", PAYLOAD_MULTIPLIER * i).into_bytes()); + 1 + } + None => 0, + } + }) + .sum(); + + assert!(present >= 9); + assert!(present <= 18); +} + +/// Publish a file, make sure we can read it, then overwrite, and +/// confirm that the new contents are visible. +#[test] +fn test_set() { + use std::io::{Read, Write}; + use tempfile::NamedTempFile; + use test_dir::{DirBuilder, TestDir}; + + let temp = TestDir::temp(); + let cache = ShardedCache::new(temp.path("."), 0, 0); + + { + let tmp = NamedTempFile::new_in(cache.temp_dir(None).expect("temp_dir must succeed")) + .expect("new temp file must succeed"); + tmp.as_file().write_all(b"v1").expect("write must succeed"); + + cache + .set(Key::new("entry", 1, 2), tmp.path()) + .expect("initial set must succeed"); + } + + { + let mut cached = cache + .get(Key::new("entry", 1, 2)) + .expect("must succeed") + .expect("must be found"); + let mut dst = Vec::new(); + cached.read_to_end(&mut dst).expect("read must succeed"); + assert_eq!(&dst, b"v1"); + } + + // Now overwrite; it should take. + { + let tmp = NamedTempFile::new_in(cache.temp_dir(None).expect("temp_dir must succeed")) + .expect("new temp file must succeed"); + tmp.as_file().write_all(b"v2").expect("write must succeed"); + + cache + .set(Key::new("entry", 1, 2), tmp.path()) + .expect("overwrite must succeed"); + } + + { + let mut cached = cache + .get(Key::new("entry", 1, 2)) + .expect("must succeed") + .expect("must be found"); + let mut dst = Vec::new(); + cached.read_to_end(&mut dst).expect("read must succeed"); + assert_eq!(&dst, b"v2"); + } +} + +/// Publish a file, fail to put a new one with different data, and +/// confirm that the old contents are visible. +#[test] +fn test_put() { + use std::io::{Read, Write}; + use tempfile::NamedTempFile; + use test_dir::{DirBuilder, TestDir}; + + let temp = TestDir::temp(); + let cache = ShardedCache::new(temp.path("."), 0, 0); + + { + let tmp = NamedTempFile::new_in(cache.temp_dir(None).expect("temp_dir must succeed")) + .expect("new temp file must succeed"); + tmp.as_file().write_all(b"v1").expect("write must succeed"); + + cache + .set(Key::new("entry", 1, 2), tmp.path()) + .expect("initial set must succeed"); + } + + // Now put; it should not take. + { + let tmp = NamedTempFile::new_in(cache.temp_dir(None).expect("temp_dir must succeed")) + .expect("new temp file must succeed"); + tmp.as_file().write_all(b"v2").expect("write must succeed"); + + cache + .put(Key::new("entry", 1, 2), tmp.path()) + .expect("put must succeed"); + } + + { + let mut cached = cache + .get(Key::new("entry", 1, 2)) + .expect("must succeed") + .expect("must be found"); + let mut dst = Vec::new(); + cached.read_to_end(&mut dst).expect("read must succeed"); + assert_eq!(&dst, b"v1"); + } +} + +/// Put 2000 files in a 2x300-file cache, and keep touching the first. +/// We should always find the first file, even after all that cleanup. +#[test] +fn test_touch() { + use std::io::Read; + use tempfile::NamedTempFile; + use test_dir::{DirBuilder, TestDir}; + + // The payload for file `i` is `PAYLOAD_MULTIPLIER * i`. + const PAYLOAD_MULTIPLIER: usize = 113; + + let temp = TestDir::temp(); + let cache = ShardedCache::new(temp.path("."), 2, 600); + + for i in 0..2000 { + // After the first write, we should find our file. + assert_eq!( + cache + .touch(Key::new("0", 0, 42)) + .expect("touch must succeed"), + i > 0 + ); + + let name = format!("{}", i); + + let temp_dir = cache.temp_dir(None).expect("temp_dir must succeed"); + let tmp = NamedTempFile::new_in(temp_dir).expect("new temp file must succeed"); + std::fs::write(tmp.path(), format!("{}", PAYLOAD_MULTIPLIER * i)) + .expect("write must succeed"); + cache + .put(Key::new(&name, i as u64, i as u64 + 42), tmp.path()) + .expect("put must succeed"); + if i == 0 { + // Make sure file "0" is measurably older than the others. + std::thread::sleep(std::time::Duration::from_secs(2)); + } + } + + let mut file = cache + .get(Key::new("0", 0, 42)) + .expect("get must succeed") + .expect("file must be found"); + let mut buf = Vec::new(); + file.read_to_end(&mut buf).expect("read must succeed"); + assert_eq!(buf, b"0"); +} diff --git a/src/stack.rs b/src/stack.rs new file mode 100644 index 0000000..337b63d --- /dev/null +++ b/src/stack.rs @@ -0,0 +1,603 @@ +//! We expect most callers to interact with Kismet via the `Cache` +//! struct defined here. A `Cache` hides the difference in behaviour +//! between plain and sharded caches via late binding, and lets +//! callers transparently handle misses by looking in a series of +//! secondary cache directories. +use std::fs::File; +use std::io::Error; +use std::io::ErrorKind; +use std::io::Result; +use std::path::Path; +use std::sync::Arc; + +use crate::Key; +use crate::PlainCache; +use crate::ReadOnlyCache; +use crate::ReadOnlyCacheBuilder; +use crate::ShardedCache; + +/// The `FullCache` trait exposes both read and write operations as +/// implemented by sharded and plain caches. +trait FullCache: + std::fmt::Debug + Sync + Send + std::panic::RefUnwindSafe + std::panic::UnwindSafe +{ + /// Returns a read-only file for `key` in the cache directory if + /// it exists, or None if there is no such file. + /// + /// Implicitly "touches" the cached file if it exists. + fn get(&self, key: Key) -> Result>; + + /// Inserts or overwrites the file at `value` as `key` in the + /// sharded cache directory. + /// + /// Always consumes the file at `value` on success; may consume it + /// on error. + fn set(&self, key: Key, value: &Path) -> Result<()>; + + /// Inserts the file at `value` as `key` in the cache directory if + /// there is no such cached entry already, or touches the cached + /// file if it already exists. + /// + /// Always consumes the file at `value` on success; may consume it + /// on error. + fn put(&self, key: Key, value: &Path) -> Result<()>; + + /// Marks the cached file `key` as newly used, if it exists. + /// + /// Returns whether a file for `key` exists in the cache. + fn touch(&self, key: Key) -> Result; +} + +impl FullCache for PlainCache { + fn get(&self, key: Key) -> Result> { + PlainCache::get(self, key.name) + } + + fn set(&self, key: Key, value: &Path) -> Result<()> { + PlainCache::set(self, key.name, value) + } + + fn put(&self, key: Key, value: &Path) -> Result<()> { + PlainCache::put(self, key.name, value) + } + + fn touch(&self, key: Key) -> Result { + PlainCache::touch(self, key.name) + } +} + +impl FullCache for ShardedCache { + fn get(&self, key: Key) -> Result> { + ShardedCache::get(self, key) + } + + fn set(&self, key: Key, value: &Path) -> Result<()> { + ShardedCache::set(self, key, value) + } + + fn put(&self, key: Key, value: &Path) -> Result<()> { + ShardedCache::put(self, key, value) + } + + fn touch(&self, key: Key) -> Result { + ShardedCache::touch(self, key) + } +} + +/// Construct a `Cache` with this builder. The resulting cache will +/// always first access its write-side cache (if defined), and, on +/// misses, will attempt to service `get` and `touch` calls by +/// iterating over the read-only caches. +#[derive(Debug, Default)] +pub struct CacheBuilder { + write_side: Option>, + read_side: ReadOnlyCacheBuilder, +} + +/// A `Cache` wraps either up to one plain or sharded read-write cache +/// in a convenient interface, and may optionally fulfill read +/// operations by deferring to a list of read-only cache when the +/// read-write cache misses. +#[derive(Clone, Debug, Default)] +pub struct Cache { + write_side: Option>, + read_side: ReadOnlyCache, +} + +impl CacheBuilder { + /// Returns a fresh empty builder. + pub fn new() -> Self { + Self::default() + } + + /// Sets the read-write cache directory to `path`. + /// + /// The read-write cache will be a plain cache directory if + /// `num_shards <= 1`, and a sharded directory otherwise. + pub fn writer(self, path: impl AsRef, num_shards: usize, total_capacity: usize) -> Self { + if num_shards <= 1 { + self.plain_writer(path, total_capacity) + } else { + self.sharded_writer(path, num_shards, total_capacity) + } + } + + /// Sets the read-write cache directory to a plain directory at + /// `path`, with a target file count of up to `capacity`. + pub fn plain_writer(mut self, path: impl AsRef, capacity: usize) -> Self { + self.write_side.insert(Arc::new(PlainCache::new( + path.as_ref().to_owned(), + capacity, + ))); + self + } + + /// Sets the read-write cache directory to a sharded directory at + /// `path`, with `num_shards` subdirectories and a target file + /// count of up to `capacity` for the entire cache. + pub fn sharded_writer( + mut self, + path: impl AsRef, + num_shards: usize, + total_capacity: usize, + ) -> Self { + self.write_side.insert(Arc::new(ShardedCache::new( + path.as_ref().to_owned(), + num_shards, + total_capacity, + ))); + self + } + + /// Adds a new read-only cache directory at `path` to the end of the + /// cache builder's search list. + /// + /// Adds a plain cache directory if `num_shards <= 1`, and a sharded + /// directory otherwise. + pub fn reader(mut self, path: impl AsRef, num_shards: usize) -> Self { + self.read_side = self.read_side.cache(path, num_shards); + self + } + + /// Adds a new plain (unsharded) read-only cache directory at + /// `path` to the end of the cache builder's search list. + pub fn plain_reader(mut self, path: impl AsRef) -> Self { + self.read_side = self.read_side.plain(path); + self + } + + /// Adds a new sharded read-only cache directory at `path` to the + /// end of the cache builder's search list. + pub fn sharded_reader(mut self, path: impl AsRef, num_shards: usize) -> Self { + self.read_side = self.read_side.sharded(path, num_shards); + self + } + + /// Returns a fresh `Cache` for the builder's write cache and + /// additional search list of read-only cache directories. + pub fn build(self) -> Cache { + Cache { + write_side: self.write_side, + read_side: self.read_side.build(), + } + } +} + +impl Cache { + /// Attempts to open a read-only file for `key`. The `Cache` will + /// query each its write cache (if any), followed by the list of + /// additional read-only cache, in definition order, and return a + /// read-only file for the first hit. + /// + /// Returns `None` if no file for `key` can be found in any of the + /// constituent caches, and bubbles up the first I/O error + /// encountered, if any. + pub fn get<'a>(&self, key: impl Into>) -> Result> { + fn doit( + write_side: Option<&dyn FullCache>, + read_side: &ReadOnlyCache, + key: Key, + ) -> Result> { + if let Some(write) = write_side { + if let Some(ret) = write.get(key)? { + return Ok(Some(ret)); + } + } + + read_side.get(key) + } + + doit( + self.write_side.as_ref().map(AsRef::as_ref), + &self.read_side, + key.into(), + ) + } + + /// Inserts or overwrites the file at `value` as `key` in the + /// write cache directory. This will always fail with + /// `Unsupported` if no write cache was defined. + /// + /// Always consumes the file at `value` on success; may consume it + /// on error. + pub fn set<'a>(&self, key: impl Into>, value: impl AsRef) -> Result<()> { + match self.write_side.as_ref() { + Some(write) => write.set(key.into(), value.as_ref()), + None => Err(Error::new( + ErrorKind::Unsupported, + "no kismet write cache defined", + )), + } + } + + /// Inserts the file at `value` as `key` in the cache directory if + /// there is no such cached entry already, or touches the cached + /// file if it already exists. + /// + /// Always consumes the file at `value` on success; may consume it + /// on error. + pub fn put<'a>(&self, key: impl Into>, value: impl AsRef) -> Result<()> { + match self.write_side.as_ref() { + Some(write) => write.put(key.into(), value.as_ref()), + None => Err(Error::new( + ErrorKind::Unsupported, + "no kismet write cache defined", + )), + } + } + + /// Marks a cache entry for `key` as accessed (read). The `Cache` + /// will touch the same file that would be returned by `get`. + /// + /// Returns whether a file for `key` could be found, and bubbles + /// up the first I/O error encountered, if any. + pub fn touch<'a>(&self, key: impl Into>) -> Result { + fn doit( + write_side: Option<&dyn FullCache>, + read_side: &ReadOnlyCache, + key: Key, + ) -> Result { + if let Some(write) = write_side { + if write.touch(key)? { + return Ok(true); + } + } + + read_side.touch(key) + } + + doit( + self.write_side.as_ref().map(AsRef::as_ref), + &self.read_side, + key.into(), + ) + } +} + +#[cfg(test)] +mod test { + use crate::Cache; + use crate::CacheBuilder; + use crate::Key; + use crate::PlainCache; + use crate::ShardedCache; + use std::io::ErrorKind; + + struct TestKey { + key: String, + } + + impl TestKey { + fn new(key: &str) -> TestKey { + TestKey { + key: key.to_string(), + } + } + } + + impl<'a> From<&'a TestKey> for Key<'a> { + fn from(x: &'a TestKey) -> Key<'a> { + Key::new(&x.key, 0, 1) + } + } + + // No cache defined -> read calls should successfully do nothing, + // write calls should fail. + #[test] + fn empty() { + let cache: Cache = Default::default(); + + assert!(matches!(cache.get(&TestKey::new("foo")), Ok(None))); + assert!(matches!(cache.set(&TestKey::new("foo"), "/tmp/foo"), + Err(e) if e.kind() == ErrorKind::Unsupported)); + assert!(matches!(cache.put(&TestKey::new("foo"), "/tmp/foo"), + Err(e) if e.kind() == ErrorKind::Unsupported)); + assert!(matches!(cache.touch(&TestKey::new("foo")), Ok(false))); + } + + // Smoke test a wrapped plain cache. + #[test] + fn smoke_test_plain() { + use std::io::{Read, Write}; + use tempfile::NamedTempFile; + use test_dir::{DirBuilder, FileType, TestDir}; + + let temp = TestDir::temp() + .create("cache", FileType::Dir) + .create("extra", FileType::Dir); + + // Populate the plain cache in `extra` with two files, "b" and "c". + { + let cache = PlainCache::new(temp.path("extra"), 10); + + let tmp = NamedTempFile::new_in(cache.temp_dir().expect("temp_dir must succeed")) + .expect("new temp file must succeed"); + tmp.as_file() + .write_all(b"extra") + .expect("write must succeed"); + + cache.put("b", tmp.path()).expect("put must succeed"); + + let tmp2 = NamedTempFile::new_in(cache.temp_dir().expect("temp_dir must succeed")) + .expect("new temp file must succeed"); + tmp2.as_file() + .write_all(b"extra2") + .expect("write must succeed"); + + cache.put("c", tmp2.path()).expect("put must succeed"); + } + + let cache = CacheBuilder::new() + .writer(temp.path("cache"), 1, 10) + .reader(temp.path("extra"), 1) + .build(); + + // There shouldn't be anything for "a" + assert!(matches!(cache.get(&TestKey::new("a")), Ok(None))); + assert!(matches!(cache.touch(&TestKey::new("a")), Ok(false))); + + // We should be able to touch "b" + assert!(matches!(cache.touch(&TestKey::new("b")), Ok(true))); + + // And its contents should match that of the "extra" cache dir. + { + let mut b_file = cache + .get(&TestKey::new("b")) + .expect("must succeed") + .expect("must exist"); + let mut dst = Vec::new(); + b_file.read_to_end(&mut dst).expect("read must succeed"); + assert_eq!(&dst, b"extra"); + } + + // Now populate "a" and "b" in the cache. + { + let tmp = NamedTempFile::new_in(temp.path(".")).expect("new temp file must succeed"); + + tmp.as_file() + .write_all(b"write") + .expect("write must succeed"); + cache + .put(&TestKey::new("a"), tmp.path()) + .expect("put must succeed"); + } + + { + let tmp = NamedTempFile::new_in(temp.path(".")).expect("new temp file must succeed"); + + tmp.as_file() + .write_all(b"write2") + .expect("write must succeed"); + cache + .put(&TestKey::new("b"), tmp.path()) + .expect("put must succeed"); + } + + // And overwrite "a" + { + let tmp = NamedTempFile::new_in(temp.path(".")).expect("new temp file must succeed"); + + tmp.as_file() + .write_all(b"write3") + .expect("write must succeed"); + cache + .set(&TestKey::new("a"), tmp.path()) + .expect("set must succeed"); + } + + // We should find: + // a => write3 + // b => write2 + // c => extra2 + + // So we should be able to touch everything. + assert!(matches!(cache.touch(&TestKey::new("a")), Ok(true))); + assert!(matches!(cache.touch(&TestKey::new("b")), Ok(true))); + assert!(matches!(cache.touch(&TestKey::new("c")), Ok(true))); + + // And read the expected contents. + { + let mut a_file = cache + .get(&TestKey::new("a")) + .expect("must succeed") + .expect("must exist"); + let mut dst = Vec::new(); + a_file.read_to_end(&mut dst).expect("read must succeed"); + assert_eq!(&dst, b"write3"); + } + + { + let mut b_file = cache + .get(&TestKey::new("b")) + .expect("must succeed") + .expect("must exist"); + let mut dst = Vec::new(); + b_file.read_to_end(&mut dst).expect("read must succeed"); + assert_eq!(&dst, b"write2"); + } + + { + let mut c_file = cache + .get(&TestKey::new("c")) + .expect("must succeed") + .expect("must exist"); + let mut dst = Vec::new(); + c_file.read_to_end(&mut dst).expect("read must succeed"); + assert_eq!(&dst, b"extra2"); + } + } + + // Smoke test a wrapped sharded cache. + #[test] + fn smoke_test_sharded() { + use std::io::{Read, Write}; + use tempfile::NamedTempFile; + use test_dir::{DirBuilder, FileType, TestDir}; + + let temp = TestDir::temp() + .create("cache", FileType::Dir) + .create("extra_plain", FileType::Dir) + .create("extra_sharded", FileType::Dir); + + // Populate the plain cache in `extra_plain` with one file, "b". + { + let cache = PlainCache::new(temp.path("extra_plain"), 10); + + let tmp = NamedTempFile::new_in(cache.temp_dir().expect("temp_dir must succeed")) + .expect("new temp file must succeed"); + tmp.as_file() + .write_all(b"extra_plain") + .expect("write must succeed"); + + cache.put("b", tmp.path()).expect("put must succeed"); + } + + // And now add "c" in the sharded `extra_sharded` cache. + { + let cache = ShardedCache::new(temp.path("extra_sharded"), 10, 10); + + let tmp = NamedTempFile::new_in(cache.temp_dir(None).expect("temp_dir must succeed")) + .expect("new temp file must succeed"); + tmp.as_file() + .write_all(b"extra_sharded") + .expect("write must succeed"); + + cache + .put((&TestKey::new("c")).into(), tmp.path()) + .expect("put must succeed"); + } + + let cache = CacheBuilder::new() + .plain_writer(temp.path("cache"), 10) + // Override the writer with a sharded cache + .writer(temp.path("cache"), 10, 10) + .plain_reader(temp.path("extra_plain")) + .sharded_reader(temp.path("extra_sharded"), 10) + .build(); + + // There shouldn't be anything for "a" + assert!(matches!(cache.get(&TestKey::new("a")), Ok(None))); + assert!(matches!(cache.touch(&TestKey::new("a")), Ok(false))); + + // We should be able to touch "b" + assert!(matches!(cache.touch(&TestKey::new("b")), Ok(true))); + + // And its contents should match that of the "extra" cache dir. + { + let mut b_file = cache + .get(&TestKey::new("b")) + .expect("must succeed") + .expect("must exist"); + let mut dst = Vec::new(); + b_file.read_to_end(&mut dst).expect("read must succeed"); + assert_eq!(&dst, b"extra_plain"); + } + + // Similarly for "c" + { + let mut c_file = cache + .get(&TestKey::new("c")) + .expect("must succeed") + .expect("must exist"); + let mut dst = Vec::new(); + c_file.read_to_end(&mut dst).expect("read must succeed"); + assert_eq!(&dst, b"extra_sharded"); + } + + // Now populate "a" and "b" in the cache. + { + let tmp = NamedTempFile::new_in(temp.path(".")).expect("new temp file must succeed"); + + tmp.as_file() + .write_all(b"write") + .expect("write must succeed"); + cache + .set(&TestKey::new("a"), tmp.path()) + .expect("set must succeed"); + } + + { + let tmp = NamedTempFile::new_in(temp.path(".")).expect("new temp file must succeed"); + + tmp.as_file() + .write_all(b"write2") + .expect("write must succeed"); + cache + .set(&TestKey::new("b"), tmp.path()) + .expect("set must succeed"); + } + + // And fail to update "a" with a put. + { + let tmp = NamedTempFile::new_in(temp.path(".")).expect("new temp file must succeed"); + + tmp.as_file() + .write_all(b"write3") + .expect("write must succeed"); + cache + .put(&TestKey::new("a"), tmp.path()) + .expect("put must succeed"); + } + + // We should find: + // a => write + // b => write2 + // c => extra_sharded + + // So we should be able to touch everything. + assert!(matches!(cache.touch(&TestKey::new("a")), Ok(true))); + assert!(matches!(cache.touch(&TestKey::new("b")), Ok(true))); + assert!(matches!(cache.touch(&TestKey::new("c")), Ok(true))); + + // And read the expected contents. + { + let mut a_file = cache + .get(&TestKey::new("a")) + .expect("must succeed") + .expect("must exist"); + let mut dst = Vec::new(); + a_file.read_to_end(&mut dst).expect("read must succeed"); + assert_eq!(&dst, b"write"); + } + + { + let mut b_file = cache + .get(&TestKey::new("b")) + .expect("must succeed") + .expect("must exist"); + let mut dst = Vec::new(); + b_file.read_to_end(&mut dst).expect("read must succeed"); + assert_eq!(&dst, b"write2"); + } + + { + let mut c_file = cache + .get(&TestKey::new("c")) + .expect("must succeed") + .expect("must exist"); + let mut dst = Vec::new(); + c_file.read_to_end(&mut dst).expect("read must succeed"); + assert_eq!(&dst, b"extra_sharded"); + } + } +}