Skip to content

Commit

Permalink
Populate output_directory in ExecuteProcessResponse (#5896)
Browse files Browse the repository at this point in the history
  • Loading branch information
dotordogh authored and illicitonion committed Jun 4, 2018
1 parent c33ccd4 commit b558b01
Show file tree
Hide file tree
Showing 2 changed files with 216 additions and 6 deletions.
13 changes: 13 additions & 0 deletions src/rust/engine/fs/src/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,19 @@ impl Snapshot {
.to_boxed()
}

pub fn digest_from_path_stats<
S: StoreFileByDigest<Error> + Sized + Clone,
Error: fmt::Debug + 'static + Send,
>(
store: Store,
file_digester: S,
path_stats: Vec<PathStat>,
) -> BoxFuture<Digest, String> {
let mut sorted_path_stats = path_stats.clone();
sorted_path_stats.sort_by(|a, b| a.path().cmp(b.path()));
Snapshot::ingest_directory_from_sorted_path_stats(store, file_digester, sorted_path_stats)
}

fn ingest_directory_from_sorted_path_stats<
S: StoreFileByDigest<Error> + Sized + Clone,
Error: fmt::Debug + 'static + Send,
Expand Down
209 changes: 203 additions & 6 deletions src/rust/engine/process_execution/src/remote.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
use std::collections::HashMap;
use std::error::Error;
use std::sync::Arc;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use std::time::{Duration, SystemTime};

use bazel_protos;
use boxfuture::{BoxFuture, Boxable};
use bytes::Bytes;
use digest::{Digest as DigestTrait, FixedOutput};
use fs::{self, Store};
use fs::{self, File, PathStat, Store};
use futures::{future, Future};
use futures_timer::Delay;
use hashing::{Digest, Fingerprint};
Expand Down Expand Up @@ -261,14 +263,14 @@ impl CommandRunner {
self
.extract_stdout(&execute_response)
.join(self.extract_stderr(&execute_response))
.and_then(move |(stdout, stderr)| {
.join(self.extract_output_files(&execute_response))
.and_then(move |((stdout, stderr), output_directory)| {
match grpcio::RpcStatusCode::from(execute_response.get_status().get_code()) {
grpcio::RpcStatusCode::Ok => future::ok(ExecuteProcessResult {
stdout: stdout,
stderr: stderr,
exit_code: execute_response.get_result().get_exit_code(),
// TODO: Populate output directory: https://github.com/pantsbuild/pants/issues/5709
output_directory: fs::EMPTY_DIGEST,
output_directory: output_directory,
}).to_boxed(),
grpcio::RpcStatusCode::FailedPrecondition => {
if execute_response.get_status().get_details().len() != 1 {
Expand Down Expand Up @@ -433,6 +435,101 @@ impl CommandRunner {
};
return stderr;
}

fn extract_output_files(
&self,
execute_response: &bazel_protos::remote_execution::ExecuteResponse,
) -> BoxFuture<Digest, ExecutionError> {
let mut futures = vec![];
let path_map = Arc::new(Mutex::new(HashMap::new()));
let path_map_2 = path_map.clone();
let path_stats: Vec<PathStat> = execute_response
.get_result()
.get_output_files()
.into_iter()
.map(|output_file| {
let output_file_path_buf = PathBuf::from(output_file.get_path());
if output_file.has_digest() {
let mut underlying_path_map = path_map.lock().unwrap();
underlying_path_map.insert(
output_file_path_buf.clone(),
output_file.get_digest().into(),
);
} else {
let raw_content = output_file.content.clone();
let path_map_3 = path_map.clone();
let output_file_path_buf_2 = output_file_path_buf.clone();
let output_file_path_buf_3 = output_file_path_buf_2.clone();
futures.push(
self
.store
.store_file_bytes(raw_content, false)
.map_err(move |error| {
ExecutionError::Fatal(format!(
"Error storing raw content for output file {:?}: {:?}",
output_file_path_buf_3, error
))
})
.map(move |digest| {
let mut underlying_path_map = path_map_3.lock().unwrap();
underlying_path_map.insert(output_file_path_buf_2, digest);
}),
);
}
PathStat::file(
output_file_path_buf.clone(),
File {
path: output_file_path_buf.clone(),
is_executable: output_file.get_is_executable(),
},
)
})
.collect();

#[derive(Clone)]
struct StoreOneOffRemoteDigest {
map_of_paths_to_digests: HashMap<PathBuf, Digest>,
}

impl StoreOneOffRemoteDigest {
pub fn new(map: HashMap<PathBuf, Digest>) -> StoreOneOffRemoteDigest {
StoreOneOffRemoteDigest {
map_of_paths_to_digests: map,
}
}
}

impl fs::StoreFileByDigest<String> for StoreOneOffRemoteDigest {
fn store_by_digest(&self, file: File) -> BoxFuture<Digest, String> {
match self.map_of_paths_to_digests.get(&file.path) {
Some(digest) => future::ok(digest.clone()),
None => future::err(format!(
"Didn't know digest for path in remote execution response: {:?}",
file.path
)),
}.to_boxed()
}
}

let store = self.store.clone();
future::join_all(futures)
.and_then(|_| {
// The unwrap() below is safe because we have joined any futures that had references to the Arc
let path_wrap_mutex = Arc::try_unwrap(path_map_2).unwrap();
let underlying_path_map = path_wrap_mutex.into_inner().unwrap();
fs::Snapshot::digest_from_path_stats(
store,
StoreOneOffRemoteDigest::new(underlying_path_map),
path_stats,
).map_err(move |error| {
ExecutionError::Fatal(format!(
"Error when storing the output file directory info in the remote CAS: {:?}",
error
))
})
})
.to_boxed()
}
}

fn make_execute_request(
Expand Down Expand Up @@ -1062,9 +1159,16 @@ mod tests {
stdout: as_bytes("roland"),
stderr: Bytes::from("simba"),
exit_code: 17,
output_directory: fs::EMPTY_DIGEST,
output_directory: TestDirectory::nested().digest(),
};

let mut output_file = bazel_protos::remote_execution::OutputFile::new();
output_file.set_path("cats/roland".into());
output_file.set_digest((&TestData::roland().digest()).into());
output_file.set_is_executable(false);
let mut output_files = protobuf::RepeatedField::new();
output_files.push(output_file);

let mut operation = bazel_protos::operations::Operation::new();
operation.set_name("cat".to_owned());
operation.set_done(true);
Expand All @@ -1075,6 +1179,7 @@ mod tests {
result.set_exit_code(want_result.exit_code);
result.set_stdout_raw(Bytes::from(want_result.stdout.clone()));
result.set_stderr_raw(Bytes::from(want_result.stderr.clone()));
result.set_output_files(output_files);
result
});
response
Expand Down Expand Up @@ -1238,6 +1343,7 @@ mod tests {
assert!(start_time.elapsed().unwrap() >= Duration::from_millis(500));
}
}

#[test]
fn wait_between_request_3_retry() {
// wait at least 500 + 1000 + 1500 = 3000 milli for 3 retries.
Expand Down Expand Up @@ -1267,6 +1373,87 @@ mod tests {
}
}

#[test]
fn extract_output_files_from_response_one_file() {
let mut output_file = bazel_protos::remote_execution::OutputFile::new();
output_file.set_path("roland".into());
output_file.set_digest((&TestData::roland().digest()).into());
output_file.set_is_executable(false);
let mut output_files = protobuf::RepeatedField::new();
output_files.push(output_file);

let mut execute_response = bazel_protos::remote_execution::ExecuteResponse::new();
execute_response.set_result({
let mut result = bazel_protos::remote_execution::ActionResult::new();
result.set_exit_code(0);
result.set_output_files(output_files);
result
});

assert_eq!(
extract_output_files_from_response(&execute_response),
Ok(TestDirectory::containing_roland().digest())
)
}

#[test]
fn extract_output_files_from_response_two_files_not_nested() {
let mut output_file_1 = bazel_protos::remote_execution::OutputFile::new();
output_file_1.set_path("roland".into());
output_file_1.set_digest((&TestData::roland().digest()).into());
output_file_1.set_is_executable(false);

let mut output_file_2 = bazel_protos::remote_execution::OutputFile::new();
output_file_2.set_path("treats".into());
output_file_2.set_digest((&TestData::catnip().digest()).into());
output_file_2.set_is_executable(false);
let mut output_files = protobuf::RepeatedField::new();
output_files.push(output_file_1);
output_files.push(output_file_2);

let mut execute_response = bazel_protos::remote_execution::ExecuteResponse::new();
execute_response.set_result({
let mut result = bazel_protos::remote_execution::ActionResult::new();
result.set_exit_code(0);
result.set_output_files(output_files);
result
});

assert_eq!(
extract_output_files_from_response(&execute_response),
Ok(TestDirectory::containing_roland_and_treats().digest())
)
}

#[test]
fn extract_output_files_from_response_two_files_nested() {
let mut output_file_1 = bazel_protos::remote_execution::OutputFile::new();
output_file_1.set_path("cats/roland".into());
output_file_1.set_digest((&TestData::roland().digest()).into());
output_file_1.set_is_executable(false);

let mut output_file_2 = bazel_protos::remote_execution::OutputFile::new();
output_file_2.set_path("treats".into());
output_file_2.set_digest((&TestData::catnip().digest()).into());
output_file_2.set_is_executable(false);
let mut output_files = protobuf::RepeatedField::new();
output_files.push(output_file_1);
output_files.push(output_file_2);

let mut execute_response = bazel_protos::remote_execution::ExecuteResponse::new();
execute_response.set_result({
let mut result = bazel_protos::remote_execution::ActionResult::new();
result.set_exit_code(0);
result.set_output_files(output_files);
result
});

assert_eq!(
extract_output_files_from_response(&execute_response),
Ok(TestDirectory::recursive().digest())
)
}

fn echo_foo_request() -> ExecuteProcessRequest {
ExecuteProcessRequest {
argv: owned_string_vec(&["/bin/echo", "-n", "foo"]),
Expand Down Expand Up @@ -1403,6 +1590,16 @@ mod tests {
command_runner.extract_execute_response(operation).wait()
}

fn extract_output_files_from_response(
execute_response: &bazel_protos::remote_execution::ExecuteResponse,
) -> Result<Digest, ExecutionError> {
let cas = mock::StubCAS::with_roland_and_directory(1024);
let command_runner = create_command_runner("".to_owned(), &cas);
command_runner
.extract_output_files(&execute_response)
.wait()
}

fn make_any_proto(message: &protobuf::Message) -> protobuf::well_known_types::Any {
let mut any = protobuf::well_known_types::Any::new();
any.set_type_url(format!(
Expand Down

0 comments on commit b558b01

Please sign in to comment.