diff --git a/src/rust/engine/fs/brfs/src/main.rs b/src/rust/engine/fs/brfs/src/main.rs index f6673fe42962..30c2b8069f87 100644 --- a/src/rust/engine/fs/brfs/src/main.rs +++ b/src/rust/engine/fs/brfs/src/main.rs @@ -308,7 +308,11 @@ impl BuildResultFS { }); for (digest, name, filetype, is_executable) in directories.chain(files) { - let child_digest = digest.into(); + let child_digest_result: Result = digest.into(); + let child_digest = child_digest_result.map_err(|err| { + error!("Error parsing digest: {:?}", err); + libc::ENOENT + })?; let maybe_child_inode = match filetype { fuse::FileType::Directory => self.inode_for_directory(child_digest), fuse::FileType::RegularFile => self.inode_for_file(child_digest, is_executable), @@ -406,11 +410,19 @@ impl fuse::Filesystem for BuildResultFS { }) .and_then(|node| match node { Node::Directory(directory_node) => { - let digest = directory_node.get_digest().into(); + let digest_result: Result = directory_node.get_digest().into(); + let digest = digest_result.map_err(|err| { + error!("Error parsing digest: {:?}", err); + libc::ENOENT + })?; self.dir_attr_for(digest) } Node::File(file_node) => { - let digest = file_node.get_digest().into(); + let digest_result: Result = file_node.get_digest().into(); + let digest = digest_result.map_err(|err| { + error!("Error parsing digest: {:?}", err); + libc::ENOENT + })?; self .inode_for_file(digest, file_node.get_is_executable()) .map_err(|err| { diff --git a/src/rust/engine/fs/src/lib.rs b/src/rust/engine/fs/src/lib.rs index 6883276251df..e75fa70c377d 100644 --- a/src/rust/engine/fs/src/lib.rs +++ b/src/rust/engine/fs/src/lib.rs @@ -10,6 +10,7 @@ mod pool; pub use pool::ResettablePool; extern crate bazel_protos; +#[macro_use] extern crate boxfuture; extern crate byteorder; extern crate bytes; diff --git a/src/rust/engine/fs/src/snapshot.rs b/src/rust/engine/fs/src/snapshot.rs index f54c93523cd9..b745a8f3faf8 100644 --- a/src/rust/engine/fs/src/snapshot.rs +++ b/src/rust/engine/fs/src/snapshot.rs @@ -237,15 +237,18 @@ impl Snapshot { .group_by(|d| d.name.clone()) .into_iter() .map(move |(child_name, group)| { - Self::merge_helper( - store2.clone(), - group.map(|d| d.get_digest().into()).collect(), - ).map(move |merged_digest| { - let mut child_dir = bazel_protos::remote_execution::DirectoryNode::new(); - child_dir.set_name(child_name); - child_dir.set_digest((&merged_digest).into()); - child_dir - }) + let store2 = store2.clone(); + let digests_result = group + .map(|d| d.get_digest().into()) + .collect::, String>>(); + future::done(digests_result) + .and_then(move |digests| Self::merge_helper(store2.clone(), digests)) + .map(move |merged_digest| { + let mut child_dir = bazel_protos::remote_execution::DirectoryNode::new(); + child_dir.set_name(child_name); + child_dir.set_digest((&merged_digest).into()); + child_dir + }) }) .collect::>(), ).and_then(move |child_directories| { @@ -486,8 +489,10 @@ mod tests { assert_eq!(merged_root_directory.directories.len(), 1); let merged_child_dirnode = merged_root_directory.directories[0].clone(); + let merged_child_dirnode_digest: Result = + merged_child_dirnode.get_digest().into(); let merged_child_directory = store - .load_directory(merged_child_dirnode.get_digest().into()) + .load_directory(merged_child_dirnode_digest.unwrap()) .wait() .unwrap() .unwrap(); diff --git a/src/rust/engine/fs/src/store.rs b/src/rust/engine/fs/src/store.rs index c779b473114e..10a478617c43 100644 --- a/src/rust/engine/fs/src/store.rs +++ b/src/rust/engine/fs/src/store.rs @@ -383,7 +383,7 @@ impl Store { let mut accumulator = accumulator.lock().unwrap(); accumulator.insert(digest, EntryType::Directory); for file in directory.get_files().into_iter() { - accumulator.insert(file.get_digest().into(), EntryType::File); + accumulator.insert(try_future!(file.get_digest().into()), EntryType::File); } } future::join_all( @@ -391,9 +391,10 @@ impl Store { .get_directories() .into_iter() .map(move |subdir| { - store - .clone() - .expand_directory_helper(subdir.get_digest().into(), accumulator.clone()) + store.clone().expand_directory_helper( + try_future!(subdir.get_digest().into()), + accumulator.clone(), + ) }) .collect::>(), ).map(|_| ()) @@ -428,7 +429,7 @@ impl Store { .map(|file_node| { let store = store.clone(); let path = destination.join(file_node.get_name()); - let digest = file_node.get_digest().into(); + let digest = try_future!(file_node.get_digest().into()); store.materialize_file(path, digest, file_node.is_executable) }) .collect::>(); @@ -438,7 +439,7 @@ impl Store { .map(|directory_node| { let store = store.clone(); let path = destination.join(directory_node.get_name()); - let digest = directory_node.get_digest().into(); + let digest = try_future!(directory_node.get_digest().into()); store.materialize_directory(path, digest) }) .collect::>(); @@ -512,7 +513,7 @@ impl Store { let path = path_so_far_copy.join(file_node.get_name()); let contents_wrapped_copy = contents_wrapped_copy.clone(); store_copy - .load_file_bytes_with(file_node.get_digest().into(), |b| b) + .load_file_bytes_with(try_future!(file_node.get_digest().into()), |b| b) .and_then(move |maybe_bytes| { maybe_bytes .ok_or_else(|| format!("Couldn't find file contents for {:?}", path)) @@ -521,6 +522,7 @@ impl Store { contents.insert(path, bytes); }) }) + .to_boxed() }) .collect::>(), ); @@ -530,7 +532,7 @@ impl Store { .get_directories() .into_iter() .map(move |dir_node| { - let digest = dir_node.get_digest().into(); + let digest = try_future!(dir_node.get_digest().into()); let path = path_so_far.join(dir_node.get_name()); let store = store.clone(); let contents_wrapped = contents_wrapped.clone(); @@ -541,6 +543,7 @@ impl Store { .ok_or_else(|| format!("Could not find sub-directory with digest {:?}", digest)) }) .and_then(move |dir| store.contents_for_directory_helper(dir, path, contents_wrapped)) + .to_boxed() }) .collect::>(), ); @@ -1718,19 +1721,19 @@ mod remote { .cas_client .get() .find_missing_blobs(&request) - .map(|response| { - response - .get_missing_blob_digests() - .iter() - .map(|digest| digest.into()) - .collect() - }) .map_err(|err| { format!( "Error from server in response to find_missing_blobs_request: {:?}", err ) }) + .and_then(|response| { + response + .get_missing_blob_digests() + .iter() + .map(|digest| digest.into()) + .collect() + }) } } diff --git a/src/rust/engine/process_execution/bazel_protos/src/conversions.rs b/src/rust/engine/process_execution/bazel_protos/src/conversions.rs index 6f1fc92542bf..f371349593b1 100644 --- a/src/rust/engine/process_execution/bazel_protos/src/conversions.rs +++ b/src/rust/engine/process_execution/bazel_protos/src/conversions.rs @@ -9,12 +9,11 @@ impl<'a> From<&'a hashing::Digest> for super::remote_execution::Digest { } } -impl<'a> From<&'a super::remote_execution::Digest> for hashing::Digest { +impl<'a> From<&'a super::remote_execution::Digest> for Result { fn from(d: &super::remote_execution::Digest) -> Self { - hashing::Digest( - hashing::Fingerprint::from_hex_string(d.get_hash()).expect("Bad fingerprint in Digest"), - d.get_size_bytes() as usize, - ) + hashing::Fingerprint::from_hex_string(d.get_hash()) + .map_err(|err| format!("Bad fingerprint in Digest {:?}: {:?}", d.get_hash(), err)) + .map(|fingerprint| hashing::Digest(fingerprint, d.get_size_bytes() as usize)) } } @@ -43,13 +42,27 @@ mod tests { bazel_digest .set_hash("0123456789abcdeffedcba98765432100000000000000000ffffffffffffffff".to_owned()); bazel_digest.set_size_bytes(10); - let converted: hashing::Digest = (&bazel_digest).into(); + let converted: Result = (&bazel_digest).into(); let want = hashing::Digest( hashing::Fingerprint::from_hex_string( "0123456789abcdeffedcba98765432100000000000000000ffffffffffffffff", ).unwrap(), 10, ); - assert_eq!(converted, want); + assert_eq!(converted, Ok(want)); + } + + #[test] + fn from_bad_bazel_digest() { + let mut bazel_digest = super::super::remote_execution::Digest::new(); + bazel_digest.set_hash("0".to_owned()); + bazel_digest.set_size_bytes(10); + let converted: Result = (&bazel_digest).into(); + let err = converted.expect_err("Want Err converting bad digest"); + assert!( + err.starts_with("Bad fingerprint in Digest \"0\":"), + "Bad error message: {}", + err + ); } } diff --git a/src/rust/engine/process_execution/src/remote.rs b/src/rust/engine/process_execution/src/remote.rs index e788e94157db..5dbf8ce494a1 100644 --- a/src/rust/engine/process_execution/src/remote.rs +++ b/src/rust/engine/process_execution/src/remote.rs @@ -109,7 +109,8 @@ impl CommandRunner { match execute_request_result { Ok((command, execute_request)) => { let command_runner = self.clone(); - self.upload_command(&command, execute_request.get_action().get_command_digest().into()) + let command_digest = try_future!(execute_request.get_action().get_command_digest().into()); + self.upload_command(&command, command_digest) .and_then(move |_| { debug!("Executing remotely request: {:?} (command: {:?})", execute_request, command); @@ -334,7 +335,12 @@ impl CommandRunner { execute_response: &bazel_protos::remote_execution::ExecuteResponse, ) -> BoxFuture { let stdout = if execute_response.get_result().has_stdout_digest() { - let stdout_digest = execute_response.get_result().get_stdout_digest().into(); + let stdout_digest_result: Result = + execute_response.get_result().get_stdout_digest().into(); + let stdout_digest = try_future!( + stdout_digest_result + .map_err(|err| ExecutionError::Fatal(format!("Error extracting stdout: {}", err))) + ); self .store .load_file_bytes_with(stdout_digest, |v| v) @@ -365,7 +371,12 @@ impl CommandRunner { execute_response: &bazel_protos::remote_execution::ExecuteResponse, ) -> BoxFuture { let stderr = if execute_response.get_result().has_stderr_digest() { - let stderr_digest = execute_response.get_result().get_stderr_digest().into(); + let stderr_digest_result: Result = + execute_response.get_result().get_stderr_digest().into(); + let stderr_digest = try_future!( + stderr_digest_result + .map_err(|err| ExecutionError::Fatal(format!("Error extracting stderr: {}", err))) + ); self .store .load_file_bytes_with(stderr_digest, |v| v) diff --git a/src/rust/engine/testutil/mock/src/cas.rs b/src/rust/engine/testutil/mock/src/cas.rs index 75ea7146bcbc..af346e400cc4 100644 --- a/src/rust/engine/testutil/mock/src/cas.rs +++ b/src/rust/engine/testutil/mock/src/cas.rs @@ -376,7 +376,8 @@ impl bazel_protos::remote_execution_grpc::ContentAddressableStorage for StubCASR let blobs = self.blobs.lock().unwrap(); let mut response = bazel_protos::remote_execution::FindMissingBlobsResponse::new(); for digest in req.get_blob_digests() { - let hashing_digest: Digest = digest.into(); + let hashing_digest_result: Result = digest.into(); + let hashing_digest = hashing_digest_result.expect("Bad digest"); if !blobs.contains_key(&hashing_digest.0) { response.mut_missing_blob_digests().push(digest.clone()) }