diff --git a/src/python/pants/backend/python/rules/run_setup_py_test.py b/src/python/pants/backend/python/rules/run_setup_py_test.py index 7a8b79a1a19..001faa56b7b 100644 --- a/src/python/pants/backend/python/rules/run_setup_py_test.py +++ b/src/python/pants/backend/python/rules/run_setup_py_test.py @@ -166,7 +166,7 @@ def test_generate_chroot(self) -> None: "name": "foo", "version": "1.2.3", "package_dir": {"": "src"}, - "packages": ["foo", "foo.qux", "foo.resources.js"], + "packages": ["foo", "foo.qux"], "namespace_packages": ["foo"], "package_data": {"foo": ["resources/js/code.js"]}, "install_requires": ["baz==1.1.1"], @@ -293,7 +293,7 @@ def test_get_sources(self) -> None: "foo/__init__.py", "foo/resources/js/code.js", ], - expected_packages=["foo", "foo.bar", "foo.bar.baz", "foo.qux", "foo.resources.js"], + expected_packages=["foo", "foo.bar", "foo.bar.baz", "foo.qux"], expected_namespace_packages=["foo.bar"], expected_package_data={"foo": ("resources/js/code.js",)}, addrs=["src/python/foo/bar/baz:baz1", "src/python/foo/qux", "src/python/foo/resources"], @@ -309,7 +309,7 @@ def test_get_sources(self) -> None: "foo/__init__.py", "foo/resources/js/code.js", ], - expected_packages=["foo", "foo.bar", "foo.bar.baz", "foo.qux", "foo.resources.js"], + expected_packages=["foo", "foo.bar", "foo.bar.baz", "foo.qux"], expected_namespace_packages=["foo.bar"], expected_package_data={"foo": ("resources/js/code.js",)}, addrs=[ diff --git a/src/rust/engine/Cargo.lock b/src/rust/engine/Cargo.lock index 5dee86b57fc..7db54246c62 100644 --- a/src/rust/engine/Cargo.lock +++ b/src/rust/engine/Cargo.lock @@ -2758,7 +2758,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" name = "store" version = "0.1.0" dependencies = [ - "async-trait 0.1.26 (registry+https://github.com/rust-lang/crates.io-index)", "bazel_protos 0.0.1", "boxfuture 0.0.1", "bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)", @@ -2769,7 +2768,6 @@ dependencies = [ "fs 0.0.1", "futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)", - "glob 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)", "grpcio 0.5.1 (git+https://github.com/pantsbuild/grpc-rs.git?rev=ed3afa3c24ddf1fdd86826e836f57c00757dfc00)", "hashing 0.0.1", "indexmap 1.3.2 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/src/rust/engine/fs/src/glob_matching.rs b/src/rust/engine/fs/src/glob_matching.rs index 65f68178908..d30ae3af7fd 100644 --- a/src/rust/engine/fs/src/glob_matching.rs +++ b/src/rust/engine/fs/src/glob_matching.rs @@ -4,7 +4,6 @@ use std::collections::HashSet; use std::ffi::OsStr; use std::fmt::Display; -use std::iter::Iterator; use std::path::{Component, Path, PathBuf}; use std::sync::Arc; @@ -22,9 +21,9 @@ use crate::{ lazy_static! { static ref PARENT_DIR: &'static str = ".."; - pub static ref SINGLE_STAR_GLOB: Pattern = Pattern::new("*").unwrap(); + static ref SINGLE_STAR_GLOB: Pattern = Pattern::new("*").unwrap(); static ref DOUBLE_STAR: &'static str = "**"; - pub static ref DOUBLE_STAR_GLOB: Pattern = Pattern::new(*DOUBLE_STAR).unwrap(); + static ref DOUBLE_STAR_GLOB: Pattern = Pattern::new(*DOUBLE_STAR).unwrap(); static ref MISSING_GLOB_SOURCE: GlobParsedSource = GlobParsedSource(String::from("")); static ref PATTERN_MATCH_OPTIONS: MatchOptions = MatchOptions { require_literal_separator: true, @@ -33,7 +32,7 @@ lazy_static! 
{ } #[derive(Clone, Debug, Eq, Hash, PartialEq)] -pub enum PathGlob { +pub(crate) enum PathGlob { Wildcard { canonical_dir: Dir, symbolic_path: PathBuf, @@ -239,19 +238,7 @@ impl PathGlob { } } -/// -/// This struct extracts out just the include and exclude globs from the `PreparedPathGlobs` -/// struct. It is a temporary measure to try to share some code between the glob matching -/// implementation in this file and in snapshot_ops.rs. -/// -/// TODO(#9967): Remove this struct! -/// -pub struct ExpandablePathGlobs { - pub include: Vec, - pub exclude: Arc, -} - -#[derive(Debug, Clone)] +#[derive(Debug)] pub struct PreparedPathGlobs { pub(crate) include: Vec, pub(crate) exclude: Arc, @@ -261,13 +248,6 @@ pub struct PreparedPathGlobs { } impl PreparedPathGlobs { - pub fn as_expandable_globs(&self) -> ExpandablePathGlobs { - ExpandablePathGlobs { - include: Iterator::flatten(self.include.iter().map(|pgie| pgie.globs.clone())).collect(), - exclude: self.exclude.clone(), - } - } - fn parse_patterns_from_include( include: &[PathGlobIncludeEntry], ) -> Result, String> { diff --git a/src/rust/engine/fs/src/lib.rs b/src/rust/engine/fs/src/lib.rs index 129442a6ead..f6519baea0a 100644 --- a/src/rust/engine/fs/src/lib.rs +++ b/src/rust/engine/fs/src/lib.rs @@ -33,10 +33,7 @@ mod glob_matching_tests; #[cfg(test)] mod posixfs_tests; -pub use crate::glob_matching::{ - ExpandablePathGlobs, GlobMatching, PathGlob, PreparedPathGlobs, DOUBLE_STAR_GLOB, - SINGLE_STAR_GLOB, -}; +pub use crate::glob_matching::{GlobMatching, PreparedPathGlobs}; use std::cmp::min; use std::io::{self, Read}; @@ -201,7 +198,7 @@ impl GitignoreStyleExcludes { self.is_ignored_path(stat.path(), is_dir) } - pub fn is_ignored_path(&self, path: &Path, is_dir: bool) -> bool { + fn is_ignored_path(&self, path: &Path, is_dir: bool) -> bool { match self.gitignore.matched(path, is_dir) { ::ignore::Match::None | ::ignore::Match::Whitelist(_) => false, ::ignore::Match::Ignore(_) => true, @@ -216,7 +213,7 @@ impl GitignoreStyleExcludes { } } -#[derive(Debug, Clone)] +#[derive(Debug)] pub enum StrictGlobMatching { // NB: the Error and Warn variants store a description of the origin of the PathGlob // request so that we can make the error message more helpful to users when globs fail to match. @@ -260,7 +257,7 @@ impl StrictGlobMatching { } } -#[derive(Debug, Clone)] +#[derive(Debug)] pub enum GlobExpansionConjunction { AllMatch, AnyMatch, diff --git a/src/rust/engine/fs/store/Cargo.toml b/src/rust/engine/fs/store/Cargo.toml index 374b42a46ab..875edd7eff5 100644 --- a/src/rust/engine/fs/store/Cargo.toml +++ b/src/rust/engine/fs/store/Cargo.toml @@ -5,7 +5,6 @@ authors = ["Daniel Wagner-Hall "] edition = "2018" [dependencies] -async-trait = "0.1" bazel_protos = { path = "../../process_execution/bazel_protos" } boxfuture = { path = "../../boxfuture" } bytes = "0.4.5" @@ -15,7 +14,6 @@ dirs = "1" fs = { path = ".." 
} futures01 = { package = "futures", version = "0.1" } futures = { version = "0.3", features = ["compat"] } -glob = "0.2.11" # TODO: This is 0.5.1 + https://github.com/tikv/grpc-rs/pull/457 + a workaround for https://github.com/rust-lang/cargo/issues/8258 grpcio = { git = "https://github.com/pantsbuild/grpc-rs.git", rev = "ed3afa3c24ddf1fdd86826e836f57c00757dfc00", default_features = false, features = ["protobuf-codec", "secure"] } hashing = { path = "../../hashing" } diff --git a/src/rust/engine/fs/store/benches/store.rs b/src/rust/engine/fs/store/benches/store.rs index a57dac4a757..ef534b209d4 100644 --- a/src/rust/engine/fs/store/benches/store.rs +++ b/src/rust/engine/fs/store/benches/store.rs @@ -29,27 +29,23 @@ use criterion::{criterion_group, criterion_main, Criterion}; -use std::collections::HashSet; use std::fs::File; use std::io::{BufRead, BufReader}; use std::os::unix::ffi::OsStrExt; use std::path::PathBuf; use std::time::Duration; -use bazel_protos::remote_execution as remexec; use bytes::Bytes; -use fs::{GlobExpansionConjunction, PreparedPathGlobs, StrictGlobMatching}; use futures::compat::Future01CompatExt; use futures::future; -use hashing::{Digest, EMPTY_DIGEST}; -use protobuf; +use hashing::Digest; use task_executor::Executor; use tempfile::TempDir; use tokio::runtime::Runtime; -use store::{SnapshotOps, Store, SubsetParams}; +use store::{Snapshot, Store}; -pub fn criterion_benchmark_materialize(c: &mut Criterion) { +pub fn criterion_benchmark(c: &mut Criterion) { // Create an executor, store containing the stuff to materialize, and a digest for the stuff. // To avoid benchmarking the deleting of things, we create a parent temporary directory (which // will be deleted at the end of the benchmark) and then skip deletion of the per-run directories. @@ -66,10 +62,8 @@ pub fn criterion_benchmark_materialize(c: &mut Criterion) { .measurement_time(Duration::from_secs(60)) .bench_function("materialize_directory", |b| { b.iter(|| { - // NB: We forget this child tempdir to avoid deleting things during the run. - let new_temp = TempDir::new_in(parent_dest_path).unwrap(); - let dest = new_temp.path().to_path_buf(); - std::mem::forget(new_temp); + // NB: We take ownership of this child tempdir to avoid deleting things during the run. + let dest = TempDir::new_in(parent_dest_path).unwrap().into_path(); let _ = executor .block_on(store.materialize_directory(dest, digest).compat()) .unwrap(); @@ -77,114 +71,7 @@ pub fn criterion_benchmark_materialize(c: &mut Criterion) { }); } -pub fn criterion_benchmark_subset_wildcard(c: &mut Criterion) { - let rt = Runtime::new().unwrap(); - let executor = Executor::new(rt.handle().clone()); - // NB: We use a much larger snapshot size compared to the materialize benchmark! 
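(Aside from the editor, not part of the patch: the materialize benchmark hunk above replaces `std::mem::forget(new_temp)` with `TempDir::into_path()`. Both keep the per-run directory alive so that deletion is never measured; `into_path()` does so by persisting the directory and returning a plain `PathBuf` rather than leaking the `TempDir` guard. A minimal sketch using the `tempfile` crate; the helper name is hypothetical:)

    use tempfile::TempDir;

    fn make_persistent_dir() -> std::path::PathBuf {
        // into_path() disarms the drop-time cleanup and hands back the path.
        TempDir::new().unwrap().into_path()
    }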
- let (store, _tempdir, digest) = large_snapshot(&executor, 1000); - - let mut cgroup = c.benchmark_group("snapshot_subset"); - - cgroup - .sample_size(10) - .measurement_time(Duration::from_secs(80)) - .bench_function("wildcard", |b| { - b.iter(|| { - let get_subset = store.subset( - digest, - SubsetParams { - globs: PreparedPathGlobs::create( - vec!["**/*".to_string()], - StrictGlobMatching::Ignore, - GlobExpansionConjunction::AllMatch, - ) - .unwrap(), - }, - ); - let _ = executor.block_on(get_subset).unwrap(); - }) - }); -} - -pub fn criterion_benchmark_merge(c: &mut Criterion) { - let rt = Runtime::new().unwrap(); - let executor = Executor::new(rt.handle().clone()); - let num_files: usize = 4000; - let (store, _tempdir, digest) = large_snapshot(&executor, num_files); - - let (directory, _metadata) = executor - .block_on(store.load_directory(digest)) - .unwrap() - .unwrap(); - // Modify half of the files in the top-level directory by setting them to have the empty - // fingerprint (zero content). - let mut all_file_nodes = directory.get_files().to_vec(); - let mut file_nodes_to_modify = all_file_nodes.split_off(all_file_nodes.len() / 2); - for file_node in file_nodes_to_modify.iter_mut() { - let mut empty_bazel_digest = remexec::Digest::new(); - empty_bazel_digest.set_hash(EMPTY_DIGEST.0.to_hex()); - empty_bazel_digest.set_size_bytes(0); - file_node.set_digest(empty_bazel_digest); - } - let modified_file_names: HashSet = file_nodes_to_modify - .iter() - .map(|file_node| file_node.get_name().to_string()) - .collect(); - - let mut bazel_modified_files_directory = remexec::Directory::new(); - bazel_modified_files_directory.set_files(protobuf::RepeatedField::from_vec( - all_file_nodes - .iter() - .cloned() - .chain(file_nodes_to_modify.into_iter()) - .collect(), - )); - bazel_modified_files_directory.set_directories(directory.directories.clone()); - - let modified_digest = executor - .block_on(store.record_directory(&bazel_modified_files_directory, true)) - .unwrap(); - - let mut bazel_removed_files_directory = remexec::Directory::new(); - bazel_removed_files_directory.set_files(protobuf::RepeatedField::from_vec( - all_file_nodes - .into_iter() - .filter(|file_node| !modified_file_names.contains(file_node.get_name())) - .collect(), - )); - bazel_removed_files_directory.set_directories(directory.directories.clone()); - let removed_digest = executor - .block_on(store.record_directory(&bazel_removed_files_directory, true)) - .unwrap(); - - let mut cgroup = c.benchmark_group("snapshot_merge"); - - cgroup - .sample_size(10) - .measurement_time(Duration::from_secs(80)) - .bench_function("snapshot_merge", |b| { - b.iter(|| { - // Merge the old and the new snapshot together, allowing any file to be duplicated. - let old_first: Digest = executor - .block_on(store.merge(vec![removed_digest, modified_digest])) - .unwrap(); - - // Test the performance of either ordering of snapshots. - let new_first: Digest = executor - .block_on(store.merge(vec![modified_digest, removed_digest])) - .unwrap(); - - assert_eq!(old_first, new_first); - }) - }); -} - -criterion_group!( - benches, - criterion_benchmark_materialize, - criterion_benchmark_subset_wildcard, - criterion_benchmark_merge -); +criterion_group!(benches, criterion_benchmark); criterion_main!(benches); /// @@ -220,10 +107,8 @@ pub fn large_snapshot(executor: &Executor, max_files: usize) -> (Store, TempDir, }) .collect::(); - // NB: Split the line by whitespace, then accumulate a PathBuf using each word as a path - // component! 
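(Aside from the editor, not part of the patch: the NB comment above describes the `collect::<PathBuf>()` idiom used just below, which relies on PathBuf's FromIterator impl to append each yielded word as one path component. A standalone sketch; the function name is hypothetical:)

    use std::path::PathBuf;

    fn words_to_path(line: &str) -> PathBuf {
        // "over the river" collects into the path "over/the/river".
        line.split_whitespace().collect::<PathBuf>()
    }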
- let path_buf = clean_line.split_whitespace().collect::(); // Drop empty or too-long candidates. + let path_buf = clean_line.split_whitespace().collect::(); let components_too_long = path_buf.components().any(|c| c.as_os_str().len() > 255); if components_too_long || path_buf.as_os_str().is_empty() || path_buf.as_os_str().len() > 512 { @@ -237,10 +122,9 @@ pub fn large_snapshot(executor: &Executor, max_files: usize) -> (Store, TempDir, let storedir = TempDir::new().unwrap(); let store = Store::local_only(executor.clone(), storedir.path()).unwrap(); - let store2 = store.clone(); let digests = henries_paths .map(|mut path| { - let store = store2.clone(); + let store = store.clone(); async move { // We use the path as the content as well: would be interesting to make this tunable. let content = Bytes::from(path.as_os_str().as_bytes()); @@ -257,9 +141,10 @@ pub fn large_snapshot(executor: &Executor, max_files: usize) -> (Store, TempDir, let digest = executor .block_on({ + let store = store.clone(); async move { let digests = future::try_join_all(digests).await?; - store2.merge(digests).await + Snapshot::merge_directories(store, digests).await } }) .unwrap(); diff --git a/src/rust/engine/fs/store/src/lib.rs b/src/rust/engine/fs/store/src/lib.rs index 0dbf4b93596..b33b3024c3e 100644 --- a/src/rust/engine/fs/store/src/lib.rs +++ b/src/rust/engine/fs/store/src/lib.rs @@ -28,17 +28,14 @@ #![allow(clippy::mutex_atomic)] #![type_length_limit = "8576838"] +#[macro_use] +extern crate log; + mod snapshot; pub use crate::snapshot::{OneOffStoreFileByDigest, Snapshot, StoreFileByDigest}; -mod snapshot_ops; -#[cfg(test)] -mod snapshot_ops_tests; #[cfg(test)] mod snapshot_tests; -pub use crate::snapshot_ops::{SnapshotOps, StoreWrapper, SubsetParams}; -use async_trait::async_trait; -use bazel_protos::remote_execution as remexec; use boxfuture::{try_future, BoxFuture, Boxable}; use bytes::Bytes; use concrete_time::TimeSpan; @@ -182,7 +179,7 @@ enum RootOrParentMetadataBuilder { /// It can also write back to a remote gRPC server, but will only do so when explicitly instructed /// to do so. /// -#[derive(Debug, Clone)] +#[derive(Clone)] pub struct Store { local: local::ByteStore, remote: Option, @@ -1038,37 +1035,6 @@ impl Store { } } -#[async_trait] -impl StoreWrapper for Store { - async fn load_file_bytes_with T + Send + Sync + 'static>( - &self, - digest: Digest, - f: F, - ) -> Result, String> { - Ok( - Store::load_file_bytes_with(self, digest, f) - .await? - .map(|(value, _)| value), - ) - } - - async fn load_directory(&self, digest: Digest) -> Result, String> { - Ok( - Store::load_directory(self, digest) - .await? - .map(|(dir, _)| dir), - ) - } - - async fn load_directory_or_err(&self, digest: Digest) -> Result { - Snapshot::get_directory_or_err(self.clone(), digest).await - } - - async fn record_directory(&self, directory: &remexec::Directory) -> Result { - Store::record_directory(self, directory, true).await - } -} - // Only public for testing. 
#[derive(Copy, Clone, Debug, Eq, Hash, PartialEq, Ord, PartialOrd)] pub enum EntryType { diff --git a/src/rust/engine/fs/store/src/local.rs b/src/rust/engine/fs/store/src/local.rs index b845c2d132c..9a885015be7 100644 --- a/src/rust/engine/fs/store/src/local.rs +++ b/src/rust/engine/fs/store/src/local.rs @@ -14,12 +14,11 @@ use lmdb::{self, Cursor, Transaction}; use sha2::Sha256; use sharded_lmdb::{ShardedLmdb, VersionedFingerprint, DEFAULT_LEASE_TIME}; -#[derive(Debug, Clone)] +#[derive(Clone)] pub struct ByteStore { inner: Arc, } -#[derive(Debug)] struct InnerStore { // Store directories separately from files because: // 1. They may have different lifetimes. diff --git a/src/rust/engine/fs/store/src/remote.rs b/src/rust/engine/fs/store/src/remote.rs index 8ee51cc3e80..1a6ed79d596 100644 --- a/src/rust/engine/fs/store/src/remote.rs +++ b/src/rust/engine/fs/store/src/remote.rs @@ -1,7 +1,6 @@ use std::cmp::min; use std::collections::{BTreeMap, HashSet}; use std::convert::TryInto; -use std::fmt; use std::sync::Arc; use std::time::Duration; @@ -29,12 +28,6 @@ pub struct ByteStore { headers: BTreeMap, } -impl fmt::Debug for ByteStore { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "ByteStore(name={:?})", self.instance_name) - } -} - impl ByteStore { pub fn new( cas_addresses: Vec, diff --git a/src/rust/engine/fs/store/src/snapshot.rs b/src/rust/engine/fs/store/src/snapshot.rs index 0eb2e6f4e2b..89ed4630035 100644 --- a/src/rust/engine/fs/store/src/snapshot.rs +++ b/src/rust/engine/fs/store/src/snapshot.rs @@ -1,7 +1,8 @@ // Copyright 2017 Pants project contributors (see CONTRIBUTORS.md). // Licensed under the Apache License, Version 2.0 (see LICENSE). -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; +use std::convert::TryInto; use std::ffi::OsString; use std::fmt; use std::iter::Iterator; @@ -10,6 +11,7 @@ use std::sync::Arc; use crate::Store; use boxfuture::{BoxFuture, Boxable}; +use bytes::Bytes; use fs::{ Dir, File, GitignoreStyleExcludes, GlobMatching, PathStat, PosixFS, PreparedPathGlobs, SymlinkBehavior, @@ -18,6 +20,7 @@ use futures::compat::Future01CompatExt; use futures::future::{self as future03, FutureExt, TryFutureExt}; use futures01::future; use hashing::{Digest, EMPTY_DIGEST}; +use indexmap::{self, IndexMap}; use itertools::Itertools; #[derive(Eq, Hash, PartialEq)] @@ -201,7 +204,358 @@ impl Snapshot { .boxed() } - pub fn directories_and_files(directories: &[String], files: &[String]) -> String { + /// + /// Given N Snapshots, returns a new Snapshot that merges them. + /// + /// Any files that exist in multiple Snapshots will cause this method to fail: the assumption + /// behind this behaviour is that almost any colliding file would represent a Rule implementation + /// error, and in cases where overwriting a file is desirable, explicitly removing a duplicated + /// copy should be straightforward. + /// + pub async fn merge(store: Store, snapshots: &[Snapshot]) -> Result { + // We dedupe PathStats by their symbolic names, as those will be their names within the + // `Directory` structure. Only `Dir+Dir` collisions are legal. + let path_stats = { + let mut uniq_paths: IndexMap = IndexMap::new(); + for path_stat in Iterator::flatten(snapshots.iter().map(|s| s.path_stats.iter().cloned())) { + match uniq_paths.entry(path_stat.path().to_owned()) { + indexmap::map::Entry::Occupied(e) => match (&path_stat, e.get()) { + (&PathStat::Dir { .. }, &PathStat::Dir { .. 
}) => (), + (x, y) => { + return Err(format!( + "Snapshots contained duplicate path: {:?} vs {:?}", + x, y + )); + } + }, + indexmap::map::Entry::Vacant(v) => { + v.insert(path_stat); + } + } + } + uniq_paths.into_iter().map(|(_, v)| v).collect() + }; + // Recursively merge the Digests in the Snapshots. + let root_digest = + Self::merge_directories(store, snapshots.iter().map(|s| s.digest).collect()).await?; + Ok(Snapshot { + digest: root_digest, + path_stats, + }) + } + + /// + /// Given Digest(s) representing Directory instances, merge them recursively into a single + /// output Directory Digest. + /// + /// If a file is present with the same name and contents multiple times, it will appear once. + /// If a file is present with the same name, but different contents, an error will be returned. + /// + pub async fn merge_directories(store: Store, dir_digests: Vec) -> Result { + Self::merge_directories_recursive(store, PathBuf::new(), dir_digests).await + } + + // NB: This function is recursive, and so cannot be directly marked async: + // https://rust-lang.github.io/async-book/07_workarounds/05_recursion.html + fn merge_directories_recursive( + store: Store, + parent_path: PathBuf, + dir_digests: Vec, + ) -> future03::BoxFuture<'static, Result> { + async move { + if dir_digests.is_empty() { + return Ok(EMPTY_DIGEST); + } else if dir_digests.len() == 1 { + let mut dir_digests = dir_digests; + return Ok(dir_digests.pop().unwrap()); + } + + let mut directories = future03::try_join_all( + dir_digests + .into_iter() + .map(|digest| { + store + .load_directory(digest) + .and_then(move |maybe_directory| { + future03::ready( + maybe_directory + .map(|(dir, _metadata)| dir) + .ok_or_else(|| format!("Digest {:?} did not exist in the Store.", digest)), + ) + }) + }) + .collect::>(), + ) + .await?; + + let mut out_dir = bazel_protos::remote_execution::Directory::new(); + + // Merge FileNodes. + let file_nodes = Iterator::flatten( + directories + .iter_mut() + .map(|directory| directory.take_files().into_iter()), + ) + .sorted_by(|a, b| a.name.cmp(&b.name)); + + out_dir.set_files(protobuf::RepeatedField::from_vec( + file_nodes.into_iter().dedup().collect(), + )); + + // Group and recurse for DirectoryNodes. 
+ let child_directory_futures = { + let store = store.clone(); + let parent_path = parent_path.clone(); + let mut directories_to_merge = Iterator::flatten( + directories + .iter_mut() + .map(|directory| directory.take_directories().into_iter()), + ) + .collect::>(); + directories_to_merge.sort_by(|a, b| a.name.cmp(&b.name)); + directories_to_merge + .into_iter() + .group_by(|d| d.name.clone()) + .into_iter() + .map(move |(child_name, group)| { + let store = store.clone(); + let digests_result = group + .map(|d| d.get_digest().try_into()) + .collect::, String>>(); + let child_path = parent_path.join(&child_name); + async move { + let digests = digests_result?; + let merged_digest = + Self::merge_directories_recursive(store, child_path, digests).await?; + let mut child_dir = bazel_protos::remote_execution::DirectoryNode::new(); + child_dir.set_name(child_name); + child_dir.set_digest((&merged_digest).into()); + let res: Result<_, String> = Ok(child_dir); + res + } + }) + .collect::>() + }; + + let child_directories = future03::try_join_all(child_directory_futures).await?; + + out_dir.set_directories(protobuf::RepeatedField::from_vec(child_directories)); + + Self::error_for_collisions(&store, &parent_path, &out_dir).await?; + store.record_directory(&out_dir, true).await + } + .boxed() + } + + /// + /// Ensure merge is unique and fail with debugging info if not. + /// + async fn error_for_collisions( + store: &Store, + parent_path: &Path, + dir: &bazel_protos::remote_execution::Directory, + ) -> Result<(), String> { + // Attempt to cheaply check for collisions to bail out early if there aren't any. + let unique_count = dir + .get_files() + .iter() + .map(bazel_protos::remote_execution::FileNode::get_name) + .chain( + dir + .get_directories() + .iter() + .map(bazel_protos::remote_execution::DirectoryNode::get_name), + ) + .collect::>() + .len(); + if unique_count == (dir.get_files().len() + dir.get_directories().len()) { + return Ok(()); + } + + let file_details_by_name = dir + .get_files() + .iter() + .map(|file_node| async move { + let digest_proto = file_node.get_digest(); + let header = format!( + "file digest={} size={}:\n\n", + digest_proto.hash, digest_proto.size_bytes + ); + + let digest_res: Result = digest_proto.try_into(); + let contents = store + .load_file_bytes_with(digest_res?, |bytes| { + const MAX_LENGTH: usize = 1024; + let content_length = bytes.len(); + let mut bytes = Bytes::from(&bytes[0..std::cmp::min(content_length, MAX_LENGTH)]); + if content_length > MAX_LENGTH && !log_enabled!(log::Level::Debug) { + bytes.extend_from_slice( + format!( + "\n... TRUNCATED contents from {}B to {}B \ + (Pass -ldebug to see full contents).", + content_length, MAX_LENGTH + ) + .as_bytes(), + ); + } + String::from_utf8_lossy(bytes.to_vec().as_slice()).to_string() + }) + .await? 
+ .map(|(content, _metadata)| content) + .unwrap_or_else(|| "".to_string()); + let detail = format!("{}{}", header, contents); + let res: Result<_, String> = Ok((file_node.get_name(), detail)); + res + }) + .map(|f| f.boxed()); + let dir_details_by_name = dir + .get_directories() + .iter() + .map(|dir_node| async move { + let digest_proto = dir_node.get_digest(); + let detail = format!( + "dir digest={} size={}:\n\n", + digest_proto.hash, digest_proto.size_bytes + ); + let res: Result<_, String> = Ok((dir_node.get_name(), detail)); + res + }) + .map(|f| f.boxed()); + + let duplicate_details = async move { + let details_by_name = future03::try_join_all( + file_details_by_name + .chain(dir_details_by_name) + .collect::>(), + ) + .await? + .into_iter() + .into_group_map(); + + let enumerated_details = + std::iter::Iterator::flatten(details_by_name.iter().filter_map(|(name, details)| { + if details.len() > 1 { + Some( + details + .iter() + .enumerate() + .map(move |(index, detail)| format!("`{}`: {}.) {}", name, index + 1, detail)), + ) + } else { + None + } + })) + .collect(); + + let res: Result, String> = Ok(enumerated_details); + res + } + .await + .unwrap_or_else(|err| vec![format!("Failed to load contents for comparison: {}", err)]); + + Err(format!( + "Can only merge Directories with no duplicates, but found {} duplicate entries in {}:\ + \n\n{}", + duplicate_details.len(), + parent_path.display(), + duplicate_details.join("\n\n") + )) + } + + pub async fn add_prefix(store: Store, digest: Digest, prefix: PathBuf) -> Result { + let mut dir_node = bazel_protos::remote_execution::DirectoryNode::new(); + dir_node.set_name(osstring_as_utf8(prefix.into_os_string())?); + dir_node.set_digest((&digest).into()); + + let mut out_dir = bazel_protos::remote_execution::Directory::new(); + out_dir.set_directories(protobuf::RepeatedField::from_vec(vec![dir_node])); + + store.record_directory(&out_dir, true).await + } + + pub async fn strip_prefix( + store: Store, + root_digest: Digest, + prefix: PathBuf, + ) -> Result { + let store2 = store.clone(); + let mut dir = Self::get_directory_or_err(store.clone(), root_digest).await?; + let mut already_stripped = PathBuf::new(); + let mut prefix = prefix; + loop { + let has_already_stripped_any = already_stripped.components().next().is_some(); + + let mut components = prefix.components(); + let component_to_strip = components.next(); + if let Some(component_to_strip) = component_to_strip { + let remaining_prefix = components.collect(); + let component_to_strip_str = component_to_strip.as_os_str().to_string_lossy(); + + let mut saw_matching_dir = false; + let extra_directories: Vec<_> = dir + .get_directories() + .iter() + .filter_map(|subdir| { + if subdir.get_name() == component_to_strip_str { + saw_matching_dir = true; + None + } else { + Some(subdir.get_name().to_owned()) + } + }) + .collect(); + let files: Vec<_> = dir + .get_files() + .iter() + .map(|file| file.get_name().to_owned()) + .collect(); + + match (saw_matching_dir, extra_directories.is_empty() && files.is_empty()) { + (false, true) => { + dir = bazel_protos::remote_execution::Directory::new(); + break; + }, + (false, false) => { + // Prefer "No subdirectory found" error to "had extra files" error. 
+ return Err(format!( + "Cannot strip prefix {} from root directory {:?} - {}directory{} didn't contain a directory named {}{}", + already_stripped.join(&prefix).display(), + root_digest, + if has_already_stripped_any { "sub" } else { "root " }, + if has_already_stripped_any { format!(" {}", already_stripped.display()) } else { String::new() }, + component_to_strip_str, + if !extra_directories.is_empty() || !files.is_empty() { format!(" but did contain {}", Self::directories_and_files(&extra_directories, &files)) } else { String::new() }, + )) + }, + (true, false) => { + return Err(format!( + "Cannot strip prefix {} from root directory {:?} - {}directory{} contained non-matching {}", + already_stripped.join(&prefix).display(), + root_digest, + if has_already_stripped_any { "sub" } else { "root " }, + if has_already_stripped_any { format!(" {}", already_stripped.display()) } else { String::new() }, + Self::directories_and_files(&extra_directories, &files), + )) + }, + (true, true) => { + // Must be 0th index, because we've checked that we saw a matching directory, and no others. + let maybe_digest: Result = dir.get_directories()[0] + .get_digest() + .try_into(); + already_stripped = already_stripped.join(component_to_strip); + dir = Self::get_directory_or_err(store.clone(), maybe_digest?).await?; + prefix = remaining_prefix; + } + } + } else { + break; + } + } + + store2.record_directory(&dir, true).await + } + + fn directories_and_files(directories: &[String], files: &[String]) -> String { format!( "{}{}{}", if directories.is_empty() { @@ -230,7 +584,7 @@ impl Snapshot { ) } - pub async fn get_directory_or_err( + async fn get_directory_or_err( store: Store, digest: Digest, ) -> Result { @@ -290,6 +644,76 @@ impl Snapshot { .await } } + + pub async fn get_snapshot_subset( + store: Store, + digest: Digest, + path_globs: PreparedPathGlobs, + ) -> Result { + use bazel_protos::remote_execution::{Directory, DirectoryNode, FileNode}; + + let traverser = move |_: &Store, + path_so_far: &PathBuf, + _: Digest, + directory: &Directory| + -> BoxFuture<(Vec, StoreManyFileDigests), String> { + let subdir_paths: Vec = directory + .get_directories() + .iter() + .map(move |node: &DirectoryNode| path_so_far.join(node.get_name())) + .filter(|path: &PathBuf| path_globs.matches(path)) + .collect(); + + let file_paths: Vec<(PathBuf, Result, bool)> = directory + .get_files() + .iter() + .map(|node: &FileNode| { + ( + path_so_far.join(node.get_name()), + node.get_digest().try_into(), + node.is_executable, + ) + }) + .filter(|(path, _, _)| path_globs.matches(path)) + .collect(); + + let mut path_stats: Vec = vec![]; + for path in subdir_paths.into_iter() { + path_stats.push(PathStat::dir(path.clone(), Dir(path))); + } + + let mut hash = HashMap::new(); + for (path, maybe_digest, is_executable) in file_paths.into_iter() { + let digest = match maybe_digest { + Ok(d) => d, + Err(err) => return future::err(err).to_boxed(), + }; + hash.insert(path.clone(), digest); + path_stats.push(PathStat::file( + path.clone(), + File { + path, + is_executable, + }, + )); + } + + future::ok((path_stats, StoreManyFileDigests { hash })).to_boxed() + }; + + let path_stats_and_stores_per_directory: Vec<(Vec, StoreManyFileDigests)> = + store.walk(digest, traverser).compat().await?; + + let mut final_store = StoreManyFileDigests::new(); + let mut path_stats: Vec = vec![]; + for (per_dir_path_stats, per_dir_store) in path_stats_and_stores_per_directory.into_iter() { + final_store.merge(per_dir_store); + 
path_stats.extend(per_dir_path_stats.into_iter()); + } + + path_stats.sort_by(|l, r| l.path().cmp(&r.path())); + Snapshot::from_path_stats(store, final_store, path_stats).await + } } impl fmt::Debug for Snapshot { @@ -324,7 +748,7 @@ fn paths_of_child_dir(paths: Vec) -> Vec { .collect() } -pub fn osstring_as_utf8(path: OsString) -> Result { +fn osstring_as_utf8(path: OsString) -> Result { path .into_string() .map_err(|p| format!("{:?}'s file_name is not representable in UTF8", p)) @@ -369,10 +793,22 @@ impl StoreFileByDigest for OneOffStoreFileByDigest { } #[derive(Clone)] -pub struct StoreManyFileDigests { +struct StoreManyFileDigests { pub hash: HashMap, } +impl StoreManyFileDigests { + fn new() -> StoreManyFileDigests { + StoreManyFileDigests { + hash: HashMap::new(), + } + } + + fn merge(&mut self, other: StoreManyFileDigests) { + self.hash.extend(other.hash); + } +} + impl StoreFileByDigest for StoreManyFileDigests { fn store_by_digest(&self, file: File) -> BoxFuture { future::result(self.hash.get(&file.path).copied().ok_or_else(|| { diff --git a/src/rust/engine/fs/store/src/snapshot_ops.rs b/src/rust/engine/fs/store/src/snapshot_ops.rs deleted file mode 100644 index bab1e3d94ad..00000000000 --- a/src/rust/engine/fs/store/src/snapshot_ops.rs +++ /dev/null @@ -1,823 +0,0 @@ -// Copyright 2020 Pants project contributors (see CONTRIBUTORS.md). -// Licensed under the Apache License, Version 2.0 (see LICENSE). - -use crate::{snapshot::osstring_as_utf8, Snapshot}; - -use async_trait::async_trait; -use bazel_protos::remote_execution as remexec; -use bytes::Bytes; -use fs::{ - ExpandablePathGlobs, GitignoreStyleExcludes, PathGlob, PreparedPathGlobs, DOUBLE_STAR_GLOB, - SINGLE_STAR_GLOB, -}; -use futures::future::{self as future03, FutureExt, TryFutureExt}; -use glob::Pattern; -use hashing::{Digest, Fingerprint, EMPTY_DIGEST}; -use indexmap::{self, IndexMap}; -use itertools::Itertools; -use log::log_enabled; - -use std::collections::HashSet; -use std::convert::From; -use std::iter::Iterator; -use std::path::{Path, PathBuf}; -use std::sync::Arc; - -#[derive(Clone, Eq, PartialEq, Hash, Debug)] -pub enum SnapshotOpsError { - String(String), - DigestMergeFailure(String), - GlobMatchError(String), -} - -impl From for SnapshotOpsError { - fn from(err: String) -> Self { - Self::String(err) - } -} - -/// -/// Parameters used to determine which files and directories to operate on within a parent snapshot. -/// -#[derive(Debug, Clone)] -pub struct SubsetParams { - pub globs: PreparedPathGlobs, -} - -/// -/// A trait that encapsulates some of the features of a Store, with nicer type signatures. This is -/// used to implement the `SnapshotOps` trait. -/// -#[async_trait] -pub trait StoreWrapper: Clone + Send + Sync { - async fn load_file_bytes_with T + Send + Sync + 'static>( - &self, - digest: Digest, - f: F, - ) -> Result, String>; - - async fn load_directory(&self, digest: Digest) -> Result, String>; - async fn load_directory_or_err(&self, digest: Digest) -> Result; - - async fn record_directory(&self, directory: &remexec::Directory) -> Result; -} - -/// -/// Given Digest(s) representing Directory instances, merge them recursively into a single -/// output Directory Digest. -/// -/// If a file is present with the same name and contents multiple times, it will appear once. -/// If a file is present with the same name, but different contents, an error will be returned. 
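(Aside from the editor, not part of the patch: with the SnapshotOps trait removed, digest-level merging goes back through the `Snapshot::merge_directories` helper re-added in snapshot.rs above, as exercised by snapshot_tests.rs later in this diff. A minimal usage sketch; the wrapper function name is hypothetical:)

    // Identical files dedupe to a single entry; same-named files with different
    // contents cause the merge to fail with a descriptive error.
    async fn merge_two(store: Store, a: Digest, b: Digest) -> Result<Digest, String> {
        Snapshot::merge_directories(store, vec![a, b]).await
    }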
-/// -async fn merge_directories( - store_wrapper: T, - dir_digests: Vec, -) -> Result { - merge_directories_recursive(store_wrapper, PathBuf::new(), dir_digests).await -} - -// NB: This function is recursive, and so cannot be directly marked async: -// https://rust-lang.github.io/async-book/07_workarounds/05_recursion.html -fn merge_directories_recursive( - store_wrapper: T, - parent_path: PathBuf, - dir_digests: Vec, -) -> future03::BoxFuture<'static, Result> { - async move { - if dir_digests.is_empty() { - return Ok(EMPTY_DIGEST); - } else if dir_digests.len() == 1 { - let mut dir_digests = dir_digests; - return Ok(dir_digests.pop().unwrap()); - } - - let mut directories = future03::try_join_all( - dir_digests - .into_iter() - .map(|digest| { - store_wrapper - .load_directory(digest) - .and_then(move |maybe_directory| { - future03::ready( - maybe_directory - .ok_or_else(|| format!("Digest {:?} did not exist in the Store.", digest)), - ) - }) - }) - .collect::>(), - ) - .await?; - - let mut out_dir = remexec::Directory::new(); - - // Merge FileNodes. - let file_nodes = Iterator::flatten( - directories - .iter_mut() - .map(|directory| directory.take_files().into_iter()), - ) - .sorted_by(|a, b| a.name.cmp(&b.name)); - - out_dir.set_files(protobuf::RepeatedField::from_vec( - file_nodes.into_iter().dedup().collect(), - )); - - // Group and recurse for DirectoryNodes. - let child_directory_futures = { - let store = store_wrapper.clone(); - let parent_path = parent_path.clone(); - let mut directories_to_merge = Iterator::flatten( - directories - .iter_mut() - .map(|directory| directory.take_directories().into_iter()), - ) - .collect::>(); - directories_to_merge.sort_by(|a, b| a.name.cmp(&b.name)); - directories_to_merge - .into_iter() - .group_by(|d| d.name.clone()) - .into_iter() - .map(move |(child_name, group)| { - let store = store.clone(); - let digests: Vec = group - .map(|d| to_pants_digest(d.get_digest().clone())) - .collect(); - let child_path = parent_path.join(&child_name); - async move { - let merged_digest = merge_directories_recursive(store, child_path, digests).await?; - let mut child_dir = remexec::DirectoryNode::new(); - child_dir.set_name(child_name); - child_dir.set_digest((&merged_digest).into()); - let res: Result<_, String> = Ok(child_dir); - res - } - }) - .collect::>() - }; - - let child_directories = future03::try_join_all(child_directory_futures).await?; - - out_dir.set_directories(protobuf::RepeatedField::from_vec(child_directories)); - - error_for_collisions(&store_wrapper, &parent_path, &out_dir).await?; - store_wrapper.record_directory(&out_dir).await - } - .boxed() -} - -/// -/// Ensure merge is unique and fail with debugging info if not. -/// -async fn error_for_collisions( - store_wrapper: &T, - parent_path: &Path, - dir: &remexec::Directory, -) -> Result<(), String> { - // Attempt to cheaply check for collisions to bail out early if there aren't any. 
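(Aside from the editor, not part of the patch: the comment above describes a cheap pre-check: collect every child file and directory name into a HashSet and compare its size with the total child count; any duplicated name makes the set smaller and triggers the expensive diagnostic path. A standalone sketch; the function name is hypothetical:)

    use std::collections::HashSet;

    fn has_duplicate_names(names: &[&str]) -> bool {
        let unique: HashSet<&str> = names.iter().copied().collect();
        unique.len() != names.len()
    }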
- let unique_count = dir - .get_files() - .iter() - .map(remexec::FileNode::get_name) - .chain( - dir - .get_directories() - .iter() - .map(remexec::DirectoryNode::get_name), - ) - .collect::>() - .len(); - if unique_count == (dir.get_files().len() + dir.get_directories().len()) { - return Ok(()); - } - - let file_details_by_name = dir - .get_files() - .iter() - .map(|file_node| async move { - let digest_proto = file_node.get_digest(); - let header = format!( - "file digest={} size={}:\n\n", - digest_proto.hash, digest_proto.size_bytes - ); - - let digest = to_pants_digest(digest_proto.clone()); - let contents = store_wrapper - .load_file_bytes_with(digest, |bytes| { - const MAX_LENGTH: usize = 1024; - let content_length = bytes.len(); - let mut bytes = Bytes::from(&bytes[0..std::cmp::min(content_length, MAX_LENGTH)]); - if content_length > MAX_LENGTH && !log_enabled!(log::Level::Debug) { - bytes.extend_from_slice( - format!( - "\n... TRUNCATED contents from {}B to {}B \ - (Pass -ldebug to see full contents).", - content_length, MAX_LENGTH - ) - .as_bytes(), - ); - } - String::from_utf8_lossy(bytes.to_vec().as_slice()).to_string() - }) - .await? - .unwrap_or_else(|| "".to_string()); - let detail = format!("{}{}", header, contents); - let res: Result<_, String> = Ok((file_node.get_name(), detail)); - res - }) - .map(|f| f.boxed()); - let dir_details_by_name = dir - .get_directories() - .iter() - .map(|dir_node| async move { - let digest_proto = dir_node.get_digest(); - let detail = format!( - "dir digest={} size={}:\n\n", - digest_proto.hash, digest_proto.size_bytes - ); - let res: Result<_, String> = Ok((dir_node.get_name(), detail)); - res - }) - .map(|f| f.boxed()); - - let duplicate_details = async move { - let details_by_name = future03::try_join_all( - file_details_by_name - .chain(dir_details_by_name) - .collect::>(), - ) - .await? - .into_iter() - .into_group_map(); - - let enumerated_details = - std::iter::Iterator::flatten(details_by_name.iter().filter_map(|(name, details)| { - if details.len() > 1 { - Some( - details - .iter() - .enumerate() - .map(move |(index, detail)| format!("`{}`: {}.) {}", name, index + 1, detail)), - ) - } else { - None - } - })) - .collect(); - - let res: Result, String> = Ok(enumerated_details); - res - } - .await - .unwrap_or_else(|err| vec![format!("Failed to load contents for comparison: {}", err)]); - - Err(format!( - "Can only merge Directories with no duplicates, but found {} duplicate entries in {}:\ - \n\n{}", - duplicate_details.len(), - parent_path.display(), - duplicate_details.join("\n\n") - )) -} - -/// -/// When we evaluate a recursive glob during the subset() operation, we perform some relatively -/// complex logic to coalesce globs in subdirectories, and to short-circuit retrieving -/// subdirectories from the store at all if we don't need to inspect their contents -/// (e.g. if a glob ends in "**"). This struct isolates that complexity, allowing the code in -/// subset() to just operate on the higher-level GlobbedFilesAndDirectories struct. -/// -struct IntermediateGlobbedFilesAndDirectories { - globbed_files: IndexMap, - globbed_directories: IndexMap, - cur_dir_files: IndexMap, - cur_dir_directories: IndexMap, - todo_directories: IndexMap>, - prefix: PathBuf, - multiple_globs: MultipleGlobs, -} - -struct GlobbedFilesAndDirectories { - // All of the subdirectories of the source Directory that is currently being subsetted, - // *regardless of whether the glob is matched yet*. 
- cur_dir_directories: IndexMap, - // All of the files of the source Directory matching the current glob. - globbed_files: IndexMap, - // All of the matching subdirectories of the source Directory *after* being subsetted to match the - // current glob. - globbed_directories: IndexMap, - // All of the matching subdirectories of the source Directory, *before* being subsetted to match - // the current glob. - todo_directories: IndexMap>, - exclude: Arc, -} - -impl IntermediateGlobbedFilesAndDirectories { - fn from_cur_dir_and_globs( - cur_dir: remexec::Directory, - multiple_globs: MultipleGlobs, - prefix: PathBuf, - ) -> Self { - let cur_dir_files: IndexMap = cur_dir - .get_files() - .to_vec() - .into_iter() - .map(|file_node| (PathBuf::from(file_node.get_name()), file_node)) - .collect(); - let cur_dir_directories: IndexMap = cur_dir - .get_directories() - .to_vec() - .into_iter() - .map(|directory_node| (PathBuf::from(directory_node.get_name()), directory_node)) - .collect(); - - let globbed_files: IndexMap = IndexMap::new(); - let globbed_directories: IndexMap = IndexMap::new(); - let todo_directories: IndexMap> = IndexMap::new(); - - IntermediateGlobbedFilesAndDirectories { - globbed_files, - globbed_directories, - cur_dir_files, - cur_dir_directories, - todo_directories, - prefix, - multiple_globs, - } - } - - async fn populate_globbed_files_and_directories( - self, - ) -> Result { - let IntermediateGlobbedFilesAndDirectories { - mut globbed_files, - mut globbed_directories, - // NB: When iterating over files, we can remove them from `cur_dir_files` after they are - // successfully matched once, hence the `mut` declaration. This is a small - // optimization. However, when iterating over directories, different DirWildcard instances - // within a single Vec can result in having multiple different DirWildcard instances - // created after matching against a single directory node! So we do *not* mark - // `cur_dir_directories` as `mut`. - mut cur_dir_files, - cur_dir_directories, - mut todo_directories, - prefix, - multiple_globs: MultipleGlobs { include, exclude }, - } = self; - - // Populate globbed_{files,directories} by iterating over all the globs. - for path_glob in include.into_iter() { - let wildcard = match &path_glob { - RestrictedPathGlob::Wildcard { wildcard } => wildcard, - RestrictedPathGlob::DirWildcard { wildcard, .. } => wildcard, - }; - - let matching_files: Vec = cur_dir_files - .keys() - .filter(|path| { - // NB: match just the current path component against the wildcard, but use the prefix - // when checking against the `exclude` patterns! - wildcard.matches_path(path) && !exclude.is_ignored_path(&prefix.join(path), false) - }) - .cloned() - .collect(); - for file_path in matching_files.into_iter() { - // NB: remove any matched files from `cur_dir_files`, so they are successfully matched - // against at most once. - let file_node = cur_dir_files.remove(&file_path).unwrap(); - globbed_files.insert(file_path, file_node); - } - - let matching_directories: Vec = cur_dir_directories - .keys() - .filter(|path| { - // NB: match just the current path component against the wildcard, but use the prefix - // when checking against the `exclude` patterns! - wildcard.matches_path(path) && !exclude.is_ignored_path(&prefix.join(path), true) - }) - .cloned() - .collect(); - for directory_path in matching_directories.into_iter() { - // NB: do *not* remove the directory for `cur_dir_directories` after it is matched - // successfully once! 
- let directory_node = cur_dir_directories.get(&directory_path).unwrap(); - - // TODO(#9967): Figure out how to consume the existing glob matching logic that works on - // `VFS` instances! - match &path_glob { - RestrictedPathGlob::Wildcard { wildcard } => { - assert_ne!(*wildcard, *DOUBLE_STAR_GLOB); - // This directory matched completely, without having to subset its contents - // whatsoever. We can avoid traversing its contents and return the node. - // NB: This line should be idempotent if the same directory node is added twice. - globbed_directories.insert(directory_path, directory_node.clone()); - } - RestrictedPathGlob::DirWildcard { - wildcard, - remainder, - } => { - let mut subdir_globs: Vec = vec![]; - if (*wildcard == *DOUBLE_STAR_GLOB) || (*wildcard == *SINGLE_STAR_GLOB) { - // Here we short-circuit all cases which would swallow up a directory without - // subsetting it or needing to perform any further recursive work. - let short_circuit: bool = match &remainder[..] { - [] => true, - // NB: Very often, /**/* is seen ending zsh-style globs, which means the same as - // ending in /**. Because we want to *avoid* recursing and just use the subdirectory - // as-is for /**/* and /**, we `continue` here in both cases. - [single_glob] if *single_glob == *SINGLE_STAR_GLOB => true, - [double_glob] if *double_glob == *DOUBLE_STAR_GLOB => true, - [double_glob, single_glob] - if *double_glob == *DOUBLE_STAR_GLOB && *single_glob == *SINGLE_STAR_GLOB => - { - true - } - _ => false, - }; - if short_circuit { - globbed_directories.insert(directory_path, directory_node.clone()); - continue; - } - // In this case, there is a remainder glob which will be used to subset the contents - // of any subdirectories. We ensure ** is recursive by cloning the ** and all the - // remainder globs and pushing it to `subdir_globs` whenever we need to recurse into - // subdirectories. - let with_double_star = RestrictedPathGlob::DirWildcard { - wildcard: wildcard.clone(), - remainder: remainder.clone(), - }; - subdir_globs.push(with_double_star); - } else { - assert_ne!(0, remainder.len()); - } - match remainder.len() { - 0 => (), - 1 => { - let next_glob = RestrictedPathGlob::Wildcard { - wildcard: remainder.get(0).unwrap().clone(), - }; - subdir_globs.push(next_glob); - } - _ => { - let next_glob = RestrictedPathGlob::DirWildcard { - wildcard: remainder.get(0).unwrap().clone(), - remainder: remainder[1..].to_vec(), - }; - subdir_globs.push(next_glob); - } - } - // Append to the existing globs, and collate at the end of this iteration. - let entry = todo_directories - .entry(directory_path) - .or_insert_with(Vec::new); - entry.extend(subdir_globs); - } - } - } - } - - Ok(GlobbedFilesAndDirectories { - cur_dir_directories, - globbed_files, - globbed_directories, - todo_directories, - exclude, - }) - } -} - -async fn snapshot_glob_match( - store_wrapper: T, - digest: Digest, - path_globs: PreparedPathGlobs, -) -> Result { - // Split the globs into PathGlobs that can be incrementally matched against individual directory - // components. - let initial_match_context = UnexpandedSubdirectoryContext { - digest, - multiple_globs: path_globs.as_expandable_globs().into(), - }; - let mut unexpanded_stack: IndexMap = - [(PathBuf::new(), initial_match_context)] - .iter() - .map(|(a, b)| (a.clone(), b.clone())) - .collect(); - let mut partially_expanded_stack: IndexMap = - IndexMap::new(); - - // 1. 
Determine all the digests we need to recurse through and modify in order to respect a glob - // which may match over multiple contiguous directory components. - while let Some(( - prefix, - UnexpandedSubdirectoryContext { - digest, - multiple_globs, - }, - )) = unexpanded_stack.pop() - { - // 1a. Extract a single level of directory structure from the digest. - let cur_dir = store_wrapper - .load_directory(digest) - .await? - .ok_or_else(|| format!("directory digest {:?} was not found!", digest))?; - - // 1b. Filter files and directories by globs. - let intermediate_globbed = IntermediateGlobbedFilesAndDirectories::from_cur_dir_and_globs( - cur_dir, - multiple_globs, - prefix.clone(), - ); - let GlobbedFilesAndDirectories { - cur_dir_directories, - globbed_files, - globbed_directories, - todo_directories, - exclude, - } = intermediate_globbed - .populate_globbed_files_and_directories() - .await?; - - // 1c. Push context structs that specify unexpanded directories onto `unexpanded_stack`. - let dependencies: Vec = todo_directories - .keys() - .filter_map(|subdir_path| { - // If we ever encounter a directory *with* a remainder, we have to ensure that that - // directory is *not* in `globbed_directories`, which are *not* subsetted at all. - // NB: Because our goal is to *union* all of the includes, if a directory is in - // `globbed_directories`, we can skip processing it for subsetting here! - if globbed_directories.contains_key(subdir_path) { - None - } else { - Some(prefix.join(subdir_path)) - } - }) - .collect(); - - for (subdir_name, all_path_globs) in todo_directories.into_iter() { - let full_name = prefix.join(&subdir_name); - let bazel_digest = cur_dir_directories - .get(&subdir_name) - .unwrap() - .get_digest() - .clone(); - let digest = to_pants_digest(bazel_digest); - let multiple_globs = MultipleGlobs { - include: all_path_globs, - exclude: exclude.clone(), - }; - unexpanded_stack.insert( - full_name, - UnexpandedSubdirectoryContext { - digest, - multiple_globs, - }, - ); - } - - // 1d. Push a context struct onto `partially_expanded_stack` which has "dependencies" on all - // subdirectories to be globbed in `directory_promises`. IndexMap is backed by a vector and can - // act as a stack, so we can be sure that when we finally retrieve this context struct from the - // stack, we will have already globbed all of its subdirectories. - let partially_expanded_context = PartiallyExpandedDirectoryContext { - files: globbed_files.into_iter().map(|(_, node)| node).collect(), - known_directories: globbed_directories - .into_iter() - .map(|(_, node)| node) - .collect(), - directory_promises: dependencies, - }; - partially_expanded_stack.insert(prefix, partially_expanded_context); - } - - // 2. Zip back up the recursively subsetted directory protos. - let mut completed_digests: IndexMap = IndexMap::new(); - while let Some(( - prefix, - PartiallyExpandedDirectoryContext { - files, - known_directories, - directory_promises, - }, - )) = partially_expanded_stack.pop() - { - let completed_nodes: Vec = directory_promises - .into_iter() - .map(|dependency| { - // NB: Note that all "dependencies" here are subdirectories that need to be globbed before - // their parent directory can be entered into the store. 
- let digest = completed_digests.get(&dependency).ok_or_else(|| { - format!( - "expected subdirectory to glob {:?} to be available from completed_digests {:?} -- internal error", - &dependency, &completed_digests - ) - })?; - let mut fixed_directory_node = remexec::DirectoryNode::new(); - // NB: Get the name *relative* to the current directory. - let name = dependency.strip_prefix(prefix.clone()).map_err(|e| format!("{:?}", e))?; - fixed_directory_node.set_name(format!("{}", name.display())); - fixed_directory_node.set_digest(to_bazel_digest(*digest)); - Ok(fixed_directory_node) - }) - .collect::, String>>()?; - - // Create the new protobuf with the merged nodes. - let mut final_directory = remexec::Directory::new(); - final_directory.set_files(protobuf::RepeatedField::from_vec(files)); - let all_directories: Vec = known_directories - .into_iter() - .chain(completed_nodes.into_iter()) - .collect(); - final_directory.set_directories(protobuf::RepeatedField::from_vec(all_directories)); - let digest = store_wrapper.record_directory(&final_directory).await?; - completed_digests.insert(prefix, digest); - } - - let final_digest = completed_digests.get(&PathBuf::new()).unwrap(); - Ok(*final_digest) -} - -/// -/// High-level operations to manipulate and merge `Digest`s. -/// -/// These methods take care to avoid redundant work when traversing Directory protos. Prefer to use -/// these primitives to compose any higher-level snapshot operations elsewhere in the codebase. -/// -#[async_trait] -pub trait SnapshotOps: StoreWrapper + 'static { - /// - /// Given N Snapshots, returns a new Snapshot that merges them. - /// - async fn merge(&self, digests: Vec) -> Result { - merge_directories(self.clone(), digests) - .await - .map_err(|e| e.into()) - } - - async fn add_prefix(&self, digest: Digest, prefix: PathBuf) -> Result { - let mut dir_node = remexec::DirectoryNode::new(); - dir_node.set_name(osstring_as_utf8(prefix.into_os_string())?); - dir_node.set_digest((&digest).into()); - - let mut out_dir = remexec::Directory::new(); - out_dir.set_directories(protobuf::RepeatedField::from_vec(vec![dir_node])); - - Ok(self.record_directory(&out_dir).await?) - } - - async fn strip_prefix( - &self, - root_digest: Digest, - prefix: PathBuf, - ) -> Result { - let mut dir = self.load_directory_or_err(root_digest).await?; - let mut already_stripped = PathBuf::new(); - let mut prefix = prefix; - loop { - let has_already_stripped_any = already_stripped.components().next().is_some(); - - let mut components = prefix.components(); - let component_to_strip = components.next(); - if let Some(component_to_strip) = component_to_strip { - let remaining_prefix = components.collect(); - let component_to_strip_str = component_to_strip.as_os_str().to_string_lossy(); - - let mut saw_matching_dir = false; - let extra_directories: Vec<_> = dir - .get_directories() - .iter() - .filter_map(|subdir| { - if subdir.get_name() == component_to_strip_str { - saw_matching_dir = true; - None - } else { - Some(subdir.get_name().to_owned()) - } - }) - .collect(); - let files: Vec<_> = dir - .get_files() - .iter() - .map(|file| file.get_name().to_owned()) - .collect(); - - match (saw_matching_dir, extra_directories.is_empty() && files.is_empty()) { - (false, true) => { - dir = remexec::Directory::new(); - break; - }, - (false, false) => { - // Prefer "No subdirectory found" error to "had extra files" error. 
- return Err(format!( - "Cannot strip prefix {} from root directory {:?} - {}directory{} didn't contain a directory named {}{}", - already_stripped.join(&prefix).display(), - root_digest, - if has_already_stripped_any { "sub" } else { "root " }, - if has_already_stripped_any { format!(" {}", already_stripped.display()) } else { String::new() }, - component_to_strip_str, - if !extra_directories.is_empty() || !files.is_empty() { format!(" but did contain {}", Snapshot::directories_and_files(&extra_directories, &files)) } else { String::new() }, - ).into()) - }, - (true, false) => { - return Err(format!( - "Cannot strip prefix {} from root directory {:?} - {}directory{} contained non-matching {}", - already_stripped.join(&prefix).display(), - root_digest, - if has_already_stripped_any { "sub" } else { "root " }, - if has_already_stripped_any { format!(" {}", already_stripped.display()) } else { String::new() }, - Snapshot::directories_and_files(&extra_directories, &files), - ).into()) - }, - (true, true) => { - // Must be 0th index, because we've checked that we saw a matching directory, and no - // others. - let digest = to_pants_digest( - dir.get_directories()[0] - .get_digest() - .clone()); - already_stripped = already_stripped.join(component_to_strip); - dir = self.load_directory_or_err(digest).await?; - prefix = remaining_prefix; - } - } - } else { - break; - } - } - - Ok(self.record_directory(&dir).await?) - } - - async fn subset(&self, digest: Digest, params: SubsetParams) -> Result { - let SubsetParams { globs } = params; - snapshot_glob_match(self.clone(), digest, globs).await - } -} -impl SnapshotOps for T {} - -struct PartiallyExpandedDirectoryContext { - pub files: Vec, - pub known_directories: Vec, - pub directory_promises: Vec, -} - -#[derive(Clone, Debug)] -enum RestrictedPathGlob { - Wildcard { - wildcard: Pattern, - }, - DirWildcard { - wildcard: Pattern, - remainder: Vec, - }, -} - -impl From for RestrictedPathGlob { - fn from(glob: PathGlob) -> Self { - match glob { - PathGlob::Wildcard { wildcard, .. } => RestrictedPathGlob::Wildcard { wildcard }, - PathGlob::DirWildcard { - wildcard, - remainder, - .. 
- } => RestrictedPathGlob::DirWildcard { - wildcard, - remainder, - }, - } - } -} - -fn to_bazel_digest(digest: Digest) -> remexec::Digest { - let mut bazel_digest = remexec::Digest::new(); - bazel_digest.set_hash(digest.0.to_hex()); - bazel_digest.set_size_bytes(digest.1 as i64); - bazel_digest -} - -fn to_pants_digest(bazel_digest: remexec::Digest) -> Digest { - let fp = Fingerprint::from_hex_string(bazel_digest.get_hash()) - .expect("failed to coerce bazel to pants digest"); - let size_bytes = bazel_digest.get_size_bytes() as usize; - Digest(fp, size_bytes) -} - -#[derive(Clone)] -struct MultipleGlobs { - pub include: Vec, - pub exclude: Arc, -} - -impl From for MultipleGlobs { - fn from(globs: ExpandablePathGlobs) -> Self { - let ExpandablePathGlobs { include, exclude } = globs; - MultipleGlobs { - include: include.into_iter().map(|x| x.into()).collect(), - exclude, - } - } -} - -#[derive(Clone)] -struct UnexpandedSubdirectoryContext { - pub digest: Digest, - pub multiple_globs: MultipleGlobs, -} diff --git a/src/rust/engine/fs/store/src/snapshot_ops_tests.rs b/src/rust/engine/fs/store/src/snapshot_ops_tests.rs deleted file mode 100644 index aeed3e8765b..00000000000 --- a/src/rust/engine/fs/store/src/snapshot_ops_tests.rs +++ /dev/null @@ -1,221 +0,0 @@ -use async_trait::async_trait; -use parking_lot::Mutex; -use testutil::make_file; - -use crate::{ - snapshot_ops::StoreWrapper, - snapshot_tests::{expand_all_sorted, setup, STR, STR2}, - OneOffStoreFileByDigest, Snapshot, SnapshotOps, Store, SubsetParams, -}; -use bazel_protos::remote_execution as remexec; -use fs::{GlobExpansionConjunction, PosixFS, PreparedPathGlobs, StrictGlobMatching}; -use hashing::Digest; - -use std::collections::HashMap; -use std::fs::create_dir_all; -use std::path::{Path, PathBuf}; -use std::sync::Arc; - -async fn get_duplicate_rolands( - store: Store, - store_wrapper: T, - base_path: &Path, - posix_fs: Arc, - digester: OneOffStoreFileByDigest, -) -> (Digest, Snapshot, Snapshot) { - create_dir_all(base_path.join("subdir")).unwrap(); - - make_file(&base_path.join("subdir/roland1"), STR.as_bytes(), 0o600); - let path_stats1 = expand_all_sorted(posix_fs).await; - let snapshot1 = Snapshot::from_path_stats(store.clone(), digester.clone(), path_stats1) - .await - .unwrap(); - - let (_store2, tempdir2, posix_fs2, digester2) = setup(); - create_dir_all(tempdir2.path().join("subdir")).unwrap(); - make_file( - &tempdir2.path().join("subdir/roland2"), - STR2.as_bytes(), - 0o600, - ); - let path_stats2 = expand_all_sorted(posix_fs2).await; - let snapshot2 = Snapshot::from_path_stats(store.clone(), digester2, path_stats2) - .await - .unwrap(); - - let merged_digest = store_wrapper - .merge(vec![snapshot1.digest, snapshot2.digest]) - .await - .unwrap(); - - (merged_digest, snapshot1, snapshot2) -} - -fn make_subset_params(globs: &[&str]) -> SubsetParams { - let globs = PreparedPathGlobs::create( - globs.iter().map(|s| s.to_string()).collect(), - StrictGlobMatching::Ignore, - GlobExpansionConjunction::AllMatch, - ) - .unwrap(); - SubsetParams { globs } -} - -#[tokio::test] -async fn subset_single_files() { - let (store, tempdir, posix_fs, digester) = setup(); - - let (merged_digest, snapshot1, snapshot2) = get_duplicate_rolands( - store.clone(), - store.clone(), - tempdir.path(), - posix_fs.clone(), - digester, - ) - .await; - - let subset_params1 = make_subset_params(&["subdir/roland1"]); - let subset_roland1 = store - .clone() - .subset(merged_digest, subset_params1) - .await - .unwrap(); - assert_eq!(subset_roland1, 
snapshot1.digest); - - let subset_params2 = make_subset_params(&["subdir/roland2"]); - let subset_roland2 = store - .clone() - .subset(merged_digest, subset_params2) - .await - .unwrap(); - assert_eq!(subset_roland2, snapshot2.digest); -} - -#[tokio::test] -async fn subset_recursive_wildcard() { - let (store, tempdir, posix_fs, digester) = setup(); - - let (merged_digest, _, _) = get_duplicate_rolands( - store.clone(), - store.clone(), - tempdir.path(), - posix_fs.clone(), - digester, - ) - .await; - - let subset_params1 = make_subset_params(&["subdir/**"]); - let subset_roland1 = store - .clone() - .subset(merged_digest, subset_params1) - .await - .unwrap(); - assert_eq!(merged_digest, subset_roland1); - - // **/* is a commonly-used alias for **. - let subset_params2 = make_subset_params(&["subdir/**/*"]); - let subset_roland2 = store - .clone() - .subset(merged_digest, subset_params2) - .await - .unwrap(); - assert_eq!(merged_digest, subset_roland2); -} - -#[derive(Clone)] -struct LoadTrackingStore { - store: Store, - load_counts: Arc>>, -} - -#[async_trait] -impl StoreWrapper for LoadTrackingStore { - async fn load_file_bytes_with T + Send + Sync + 'static>( - &self, - digest: Digest, - f: F, - ) -> Result, String> { - Ok( - Store::load_file_bytes_with(&self.store, digest, f) - .await? - .map(|(value, _)| value), - ) - } - - async fn load_directory(&self, digest: Digest) -> Result, String> { - { - let mut counts = self.load_counts.lock(); - let entry = counts.entry(digest).or_insert(0); - *entry += 1; - } - Ok( - Store::load_directory(&self.store, digest) - .await? - .map(|(dir, _)| dir), - ) - } - - async fn load_directory_or_err(&self, digest: Digest) -> Result { - { - let mut counts = self.load_counts.lock(); - let entry = counts.entry(digest).or_insert(0); - *entry += 1; - } - Snapshot::get_directory_or_err(self.store.clone(), digest).await - } - - async fn record_directory(&self, directory: &remexec::Directory) -> Result { - Store::record_directory(&self.store, directory, true).await - } -} - -#[tokio::test] -async fn subset_tracking_load_counts() { - let (store, tempdir, posix_fs, digester) = setup(); - - let load_tracking_store = LoadTrackingStore { - store: store.clone(), - load_counts: Arc::new(Mutex::new(HashMap::new())), - }; - - let (merged_digest, _, _) = get_duplicate_rolands( - store.clone(), - load_tracking_store.clone(), - tempdir.path(), - posix_fs.clone(), - digester, - ) - .await; - - let subdir_digest = load_tracking_store - .strip_prefix(merged_digest, PathBuf::from("subdir")) - .await - .unwrap(); - - let num_subdir_loads = { - let num_loads: HashMap = load_tracking_store.load_counts.lock().clone(); - *num_loads.get(&subdir_digest).unwrap() - }; - assert_eq!(1, num_subdir_loads); - - let subset_everything = make_subset_params(&["**/*"]); - let subset_result = load_tracking_store - .subset(merged_digest, subset_everything) - .await - .unwrap(); - assert_eq!(merged_digest, subset_result); - // Verify that no extra digest loads for the subdirectory "subdir" are performed when a ** glob is - // used, which should just take the digest unmodified, and not attempt to examine its contents. - let num_loads: HashMap = load_tracking_store.load_counts.lock().clone(); - assert_eq!(num_subdir_loads, *num_loads.get(&subdir_digest).unwrap()); - - // Check that the same result occurs when the trailing glob is just /**. 
- let subset_everything = make_subset_params(&["**"]); - let subset_result = load_tracking_store - .subset(merged_digest, subset_everything) - .await - .unwrap(); - assert_eq!(merged_digest, subset_result); - let num_loads: HashMap = load_tracking_store.load_counts.lock().clone(); - assert_eq!(num_subdir_loads, *num_loads.get(&subdir_digest).unwrap()); -} diff --git a/src/rust/engine/fs/store/src/snapshot_tests.rs b/src/rust/engine/fs/store/src/snapshot_tests.rs index 4591a228ae4..97cee53c693 100644 --- a/src/rust/engine/fs/store/src/snapshot_tests.rs +++ b/src/rust/engine/fs/store/src/snapshot_tests.rs @@ -3,22 +3,20 @@ use std::path::{Path, PathBuf}; use std::sync::Arc; use hashing::{Digest, Fingerprint}; -use task_executor; use tempfile; use testutil::data::TestDirectory; use testutil::make_file; use tokio::runtime::Handle; -use crate::{OneOffStoreFileByDigest, Snapshot, SnapshotOps, Store}; +use crate::{OneOffStoreFileByDigest, Snapshot, Store}; use fs::{ Dir, File, GitignoreStyleExcludes, GlobExpansionConjunction, GlobMatching, PathGlobs, PathStat, PosixFS, StrictGlobMatching, }; -pub const STR: &str = "European Burmese"; -pub const STR2: &str = "asdf"; +const STR: &str = "European Burmese"; -pub fn setup() -> ( +fn setup() -> ( Store, tempfile::TempDir, Arc, @@ -165,9 +163,11 @@ async fn merge_directories_two_files() { .await .expect("Storing treats directory"); - let result = store - .merge(vec![containing_treats.digest(), containing_roland.digest()]) - .await; + let result = Snapshot::merge_directories( + store, + vec![containing_treats.digest(), containing_roland.digest()], + ) + .await; assert_eq!( result, @@ -191,17 +191,16 @@ async fn merge_directories_clashing_files() { .await .expect("Storing wrong roland directory"); - let err = store - .merge(vec![ - containing_roland.digest(), - containing_wrong_roland.digest(), - ]) - .await - .expect_err("Want error merging"); + let err = Snapshot::merge_directories( + store, + vec![containing_roland.digest(), containing_wrong_roland.digest()], + ) + .await + .expect_err("Want error merging"); assert!( - format!("{:?}", err).contains("roland"), - "Want error message to contain roland but was: {:?}", + err.contains("roland"), + "Want error message to contain roland but was: {}", err ); } @@ -222,12 +221,14 @@ async fn merge_directories_same_files() { .await .expect("Storing treats directory"); - let result = store - .merge(vec![ + let result = Snapshot::merge_directories( + store, + vec![ containing_roland.digest(), containing_roland_and_treats.digest(), - ]) - .await; + ], + ) + .await; assert_eq!( result, @@ -269,12 +270,17 @@ async fn snapshot_merge_two_files() { .await .unwrap(); - let merged = store - .merge(vec![snapshot1.digest, snapshot2.digest]) + let merged = Snapshot::merge(store.clone(), &[snapshot1, snapshot2]) .await .unwrap(); - let merged_root_directory = store.load_directory(merged).await.unwrap().unwrap().0; + let merged_root_directory = store + .load_directory(merged.digest) + .await + .unwrap() + .unwrap() + .0; + assert_eq!(merged.path_stats, vec![dir, file1, file2]); assert_eq!(merged_root_directory.files.len(), 0); assert_eq!(merged_root_directory.directories.len(), 1); @@ -300,7 +306,7 @@ async fn snapshot_merge_two_files() { } #[tokio::test] -async fn snapshot_merge_same_file() { +async fn snapshot_merge_colliding() { let (store, tempdir, _, digester) = setup(); let file = make_file_stat( @@ -310,48 +316,18 @@ async fn snapshot_merge_same_file() { false, ); - // When the file is the exact same, merging should 
succeed. let snapshot1 = Snapshot::from_path_stats(store.clone(), digester.clone(), vec![file.clone()]) .await .unwrap(); - let snapshot1_cloned = Snapshot::from_path_stats(store.clone(), digester.clone(), vec![file]) - .await - .unwrap(); - let merged_res = store - .merge(vec![snapshot1.digest, snapshot1_cloned.digest]) - .await; - - assert_eq!(merged_res, Ok(snapshot1.digest)); -} - -#[tokio::test] -async fn snapshot_merge_colliding() { - let (store, tempdir, posix_fs, digester) = setup(); - - make_file(&tempdir.path().join("roland"), STR.as_bytes(), 0o600); - let path_stats1 = expand_all_sorted(posix_fs).await; - let snapshot1 = Snapshot::from_path_stats(store.clone(), digester.clone(), path_stats1) + let snapshot2 = Snapshot::from_path_stats(store.clone(), digester, vec![file]) .await .unwrap(); - // When the file is *not* the same, error out. - let (_store2, tempdir2, posix_fs2, digester2) = setup(); - make_file(&tempdir2.path().join("roland"), STR2.as_bytes(), 0o600); - let path_stats2 = expand_all_sorted(posix_fs2).await; - let snapshot2 = Snapshot::from_path_stats(store.clone(), digester2, path_stats2) - .await - .unwrap(); - - let merged_res = store.merge(vec![snapshot1.digest, snapshot2.digest]).await; + let merged_res = Snapshot::merge(store.clone(), &[snapshot1, snapshot2]).await; match merged_res { - Err(ref msg) - if format!("{:?}", msg).contains("found 2 duplicate entries") - && format!("{:?}", msg).contains("roland") => - { - () - } + Err(ref msg) if msg.contains("contained duplicate path") && msg.contains("roland") => (), x => panic!( "Snapshot::merge should have failed with a useful message; got: {:?}", x @@ -369,7 +345,7 @@ async fn strip_empty_prefix() { .await .expect("Error storing directory"); - let result = store.strip_prefix(dir.digest(), PathBuf::from("")).await; + let result = super::Snapshot::strip_prefix(store, dir.digest(), PathBuf::from("")).await; assert_eq!(result, Ok(dir.digest())); } @@ -387,9 +363,7 @@ async fn strip_non_empty_prefix() { .await .expect("Error storing directory"); - let result = store - .strip_prefix(dir.digest(), PathBuf::from("cats")) - .await; + let result = super::Snapshot::strip_prefix(store, dir.digest(), PathBuf::from("cats")).await; assert_eq!(result, Ok(TestDirectory::containing_roland().digest())); } @@ -403,9 +377,8 @@ async fn strip_prefix_empty_subdir() { .await .expect("Error storing directory"); - let result = store - .strip_prefix(dir.digest(), PathBuf::from("falcons/peregrine")) - .await; + let result = + super::Snapshot::strip_prefix(store, dir.digest(), PathBuf::from("falcons/peregrine")).await; assert_eq!(result, Ok(TestDirectory::empty().digest())); } @@ -413,8 +386,8 @@ async fn strip_prefix_empty_subdir() { async fn strip_dir_not_in_store() { let (store, _, _, _) = setup(); let digest = TestDirectory::nested().digest(); - let result = store.strip_prefix(digest, PathBuf::from("cats")).await; - assert_eq!(result, Err(format!("{:?} was not known", digest).into())); + let result = super::Snapshot::strip_prefix(store, digest, PathBuf::from("cats")).await; + assert_eq!(result, Err(format!("{:?} was not known", digest))); } #[tokio::test] @@ -425,18 +398,13 @@ async fn strip_subdir_not_in_store() { .record_directory(&dir.directory(), false) .await .expect("Error storing directory"); - let result = store - .strip_prefix(dir.digest(), PathBuf::from("cats")) - .await; + let result = super::Snapshot::strip_prefix(store, dir.digest(), PathBuf::from("cats")).await; assert_eq!( result, - Err( - format!( - "{:?} was not known", - 
TestDirectory::containing_roland().digest() - ) - .into() - ) + Err(format!( + "{:?} was not known", + TestDirectory::containing_roland().digest() + )) ); } @@ -453,11 +421,9 @@ async fn strip_prefix_non_matching_file() { .record_directory(&child_dir.directory(), false) .await .expect("Error storing directory"); - let result = store - .strip_prefix(dir.digest(), PathBuf::from("cats")) - .await; + let result = super::Snapshot::strip_prefix(store, dir.digest(), PathBuf::from("cats")).await; - assert_eq!(result, Err(format!("Cannot strip prefix cats from root directory {:?} - root directory contained non-matching file named: treats", dir.digest()).into())); + assert_eq!(result, Err(format!("Cannot strip prefix cats from root directory {:?} - root directory contained non-matching file named: treats", dir.digest()))); } #[tokio::test] @@ -473,11 +439,10 @@ async fn strip_prefix_non_matching_dir() { .record_directory(&child_dir.directory(), false) .await .expect("Error storing directory"); - let result = store - .strip_prefix(dir.digest(), PathBuf::from("animals/cats")) - .await; + let result = + super::Snapshot::strip_prefix(store, dir.digest(), PathBuf::from("animals/cats")).await; - assert_eq!(result, Err(format!("Cannot strip prefix animals/cats from root directory {:?} - subdirectory animals contained non-matching directory named: birds", dir.digest()).into())); + assert_eq!(result, Err(format!("Cannot strip prefix animals/cats from root directory {:?} - subdirectory animals contained non-matching directory named: birds", dir.digest()))); } #[tokio::test] @@ -492,10 +457,8 @@ async fn strip_subdir_not_in_dir() { .record_directory(&TestDirectory::containing_roland().directory(), false) .await .expect("Error storing directory"); - let result = store - .strip_prefix(dir.digest(), PathBuf::from("cats/ugly")) - .await; - assert_eq!(result, Err(format!("Cannot strip prefix cats/ugly from root directory {:?} - subdirectory cats didn't contain a directory named ugly but did contain file named: roland", dir.digest()).into())); + let result = super::Snapshot::strip_prefix(store, dir.digest(), PathBuf::from("cats/ugly")).await; + assert_eq!(result, Err(format!("Cannot strip prefix cats/ugly from root directory {:?} - subdirectory cats didn't contain a directory named ugly but did contain file named: roland", dir.digest()))); } fn make_dir_stat(root: &Path, relpath: &Path) -> PathStat { @@ -518,7 +481,7 @@ fn make_file_stat(root: &Path, relpath: &Path, contents: &[u8], is_executable: b ) } -pub async fn expand_all_sorted(posix_fs: Arc) -> Vec { +async fn expand_all_sorted(posix_fs: Arc) -> Vec { let mut v = posix_fs .expand( // Don't error or warn if there are no paths matched -- that is a valid state. 
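
Note on the recursive-wildcard subset tests deleted above: they treat "subdir/**" and "subdir/**/*" as equivalent, since both select everything under subdir, so subsetting the merged digest returns the merged digest unchanged. The standalone sketch below illustrates that wildcard semantics with the public glob crate (assumed at its 0.3 API). It is not the engine's own matching code; the require_literal_separator option and the example paths are chosen here purely for illustration, to show why a single '*' stays within one path component while a trailing '**' covers the whole subtree and can therefore reuse a subdirectory digest without expanding it.

use glob::{MatchOptions, Pattern};

fn main() {
  // Keep '*' within a single path component; the recursive '**' wildcard is unaffected.
  let opts = MatchOptions {
    require_literal_separator: true,
    ..MatchOptions::default()
  };

  let single_star = Pattern::new("subdir/*").unwrap();
  let double_star = Pattern::new("subdir/**").unwrap();

  // '*' matches a direct child but does not cross a '/' boundary...
  assert!(single_star.matches_with("subdir/roland1", opts));
  assert!(!single_star.matches_with("subdir/nested/roland1", opts));

  // ...while '**' matches arbitrarily deep paths, which is why a trailing '**' (or its
  // common alias '**/*') selects the entire subtree.
  assert!(double_star.matches_with("subdir/roland1", opts));
  assert!(double_star.matches_with("subdir/nested/roland1", opts));
}
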
diff --git a/src/rust/engine/process_execution/src/remote.rs b/src/rust/engine/process_execution/src/remote.rs index 91041283ee0..4fcd53acbe8 100644 --- a/src/rust/engine/process_execution/src/remote.rs +++ b/src/rust/engine/process_execution/src/remote.rs @@ -20,7 +20,7 @@ use hashing::{Digest, Fingerprint}; use log::{debug, trace, warn}; use protobuf::{self, Message, ProtobufEnum}; use sha2::Sha256; -use store::{Snapshot, SnapshotOps, Store, StoreFileByDigest}; +use store::{Snapshot, Store, StoreFileByDigest}; use tokio::time::delay_for; use crate::{ @@ -1137,10 +1137,8 @@ pub fn extract_output_files( future03::try_join(files_digest, future03::try_join_all(directory_digests)).await?; directory_digests.push(files_digest); - - store - .merge(directory_digests) - .map_err(|err| format!("Error when merging output files and directories: {:?}", err)) + Snapshot::merge_directories(store, directory_digests) + .map_err(|err| format!("Error when merging output files and directories: {}", err)) .await }) .compat() diff --git a/src/rust/engine/sharded_lmdb/src/lib.rs b/src/rust/engine/sharded_lmdb/src/lib.rs index 2dc37089d9f..bf6a617e225 100644 --- a/src/rust/engine/sharded_lmdb/src/lib.rs +++ b/src/rust/engine/sharded_lmdb/src/lib.rs @@ -103,7 +103,7 @@ impl AsRef<[u8]> for VersionedFingerprint { // Each LMDB directory can have at most one concurrent writer. // We use this type to shard storage into 16 LMDB directories, based on the first 4 bits of the // fingerprint being stored, so that we can write to them in parallel. -#[derive(Debug, Clone)] +#[derive(Clone)] pub struct ShardedLmdb { // First Database is content, second is leases. lmdbs: HashMap, Database, Database)>, diff --git a/src/rust/engine/src/externs/interface.rs b/src/rust/engine/src/externs/interface.rs index 86df0a09217..29a1f49db03 100644 --- a/src/rust/engine/src/externs/interface.rs +++ b/src/rust/engine/src/externs/interface.rs @@ -60,7 +60,6 @@ use log::{self, error, warn, Log}; use logging::logger::LOGGER; use logging::{Destination, Logger, PythonLogLevel}; use rule_graph::{self, RuleGraph}; -use store::SnapshotOps; use task_executor::Executor; use tempfile::TempDir; use workunit_store::{Workunit, WorkunitState}; @@ -1355,9 +1354,11 @@ fn merge_directories( scheduler .core .executor - .block_on(scheduler.core.store().merge(digests)) + .block_on(store::Snapshot::merge_directories( + scheduler.core.store(), + digests, + )) .map(|dir| nodes::Snapshot::store_directory(&scheduler.core, &dir).into()) - .map_err(|e| format!("{:?}", e)) }) .map_err(|e| PyErr::new::(py, (e,))) }) diff --git a/src/rust/engine/src/intrinsics.rs b/src/rust/engine/src/intrinsics.rs index 9e7835ce3cc..8933a89cad6 100644 --- a/src/rust/engine/src/intrinsics.rs +++ b/src/rust/engine/src/intrinsics.rs @@ -9,7 +9,6 @@ use crate::types::Types; use futures::compat::Future01CompatExt; use futures::future::{self, BoxFuture, FutureExt, TryFutureExt}; use indexmap::IndexMap; -use store::{SnapshotOps, SubsetParams}; use std::path::PathBuf; @@ -203,15 +202,12 @@ fn remove_prefix_request_to_digest( args: Vec, ) -> BoxFuture<'static, NodeResult> { let core = context.core; - let store = core.store(); async move { let input_digest = lift_digest(&externs::project_ignoring_type(&args[0], "digest"))?; let prefix = externs::project_str(&args[0], "prefix"); - let digest = store - .strip_prefix(input_digest, PathBuf::from(prefix)) - .await - .map_err(|e| format!("{:?}", e))?; + let digest = + store::Snapshot::strip_prefix(core.store(), input_digest, 
PathBuf::from(prefix)).await?; let res: Result<_, String> = Ok(Snapshot::store_directory(&core, &digest)); res } @@ -224,14 +220,11 @@ fn add_prefix_request_to_digest( args: Vec, ) -> BoxFuture<'static, NodeResult> { let core = context.core; - let store = core.store(); async move { let input_digest = lift_digest(&externs::project_ignoring_type(&args[0], "digest"))?; let prefix = externs::project_str(&args[0], "prefix"); - let digest = store - .add_prefix(input_digest, PathBuf::from(prefix)) - .await - .map_err(|e| format!("{:?}", e))?; + let digest = + store::Snapshot::add_prefix(core.store(), input_digest, PathBuf::from(prefix)).await?; let res: Result<_, String> = Ok(Snapshot::store_directory(&core, &digest)); res } @@ -256,16 +249,12 @@ fn merge_digests_request_to_digest( args: Vec, ) -> BoxFuture<'static, NodeResult> { let core = context.core; - let store = core.store(); let digests: Result, String> = externs::project_multi(&args[0], "digests") .into_iter() .map(|val| lift_digest(&val)) .collect(); async move { - let digest = store - .merge(digests?) - .await - .map_err(|e| format!("{:?}", e))?; + let digest = store::Snapshot::merge_directories(core.store(), digests?).await?; let res: Result<_, String> = Ok(Snapshot::store_directory(&core, &digest)); res } @@ -327,11 +316,10 @@ fn input_files_content_to_digest( } }) .collect(); - let store = context.core.store(); async move { let digests = future::try_join_all(digests).await?; - let digest = store.merge(digests).await.map_err(|e| format!("{:?}", e))?; + let digest = store::Snapshot::merge_directories(context.core.store(), digests).await?; let res: Result<_, String> = Ok(Snapshot::store_directory(&context.core, &digest)); res } @@ -349,15 +337,8 @@ fn snapshot_subset_to_snapshot( async move { let path_globs = Snapshot::lift_path_globs(&globs)?; let original_digest = lift_digest(&externs::project_ignoring_type(&args[0], "digest"))?; - let subset_params = SubsetParams { globs: path_globs }; - let digest = store - .subset(original_digest, subset_params) - .await - .map_err(|e| format!("{:?}", e))?; - let snapshot = store::Snapshot::from_digest(store, digest) - .await - .map_err(|e| format!("{:?}", e))?; + let snapshot = store::Snapshot::get_snapshot_subset(store, original_digest, path_globs).await?; Ok(Snapshot::store_snapshot(&context.core, &snapshot)?) } diff --git a/src/rust/engine/task_executor/src/lib.rs b/src/rust/engine/task_executor/src/lib.rs index 07943ed2b78..c37250b2009 100644 --- a/src/rust/engine/task_executor/src/lib.rs +++ b/src/rust/engine/task_executor/src/lib.rs @@ -33,7 +33,7 @@ use std::sync::Arc; use futures::future::FutureExt; use tokio::runtime::{Builder, Handle, Runtime}; -#[derive(Debug, Clone)] +#[derive(Clone)] pub struct Executor { runtime: Option>, handle: Handle,