diff --git a/src/rust/engine/process_execution/Cargo.toml b/src/rust/engine/process_execution/Cargo.toml index e846e76d764..75b2c6bd0f7 100644 --- a/src/rust/engine/process_execution/Cargo.toml +++ b/src/rust/engine/process_execution/Cargo.toml @@ -28,10 +28,10 @@ tokio-codec = "0.1" tokio-process = "0.2.1" tokio-timer = "0.2" workunit_store = { path = "../workunit_store" } +tokio = "0.1" [dev-dependencies] maplit = "1.0.1" mock = { path = "../testutil/mock" } tempfile = "3" testutil = { path = "../testutil" } -tokio = "0.1" diff --git a/src/rust/engine/process_execution/src/lib.rs b/src/rust/engine/process_execution/src/lib.rs index fe5a34aa240..56f7d0c9e30 100644 --- a/src/rust/engine/process_execution/src/lib.rs +++ b/src/rust/engine/process_execution/src/lib.rs @@ -44,6 +44,7 @@ use async_semaphore::AsyncSemaphore; pub mod cache; pub mod local; pub mod remote; +pub mod speculate; /// /// A process to be executed. diff --git a/src/rust/engine/process_execution/src/remote.rs b/src/rust/engine/process_execution/src/remote.rs index 75b6f27e59e..f7fe36bf47e 100644 --- a/src/rust/engine/process_execution/src/remote.rs +++ b/src/rust/engine/process_execution/src/remote.rs @@ -108,6 +108,9 @@ impl CommandRunner { } } +// TODO(pantsbuild/pants#8039) Need to impl Drop on command runner so that when the BoxFuture goes out of scope +// we cancel a potential RPC. So we need to distinguish local vs. remote +// requests and save enough state to BoxFuture or another abstraction around our execution results impl super::CommandRunner for CommandRunner { /// /// Runs a command via a gRPC service implementing the Bazel Remote Execution API @@ -970,7 +973,7 @@ fn timespec_from(timestamp: &protobuf::well_known_types::Timestamp) -> time::Tim } #[cfg(test)] -mod tests { +pub mod tests { use bazel_protos; use bazel_protos::operations::Operation; use bazel_protos::remote_execution::ExecutedActionMetadata; @@ -2636,7 +2639,7 @@ mod tests { assert_eq!(got_workunits, want_workunits); } - fn echo_foo_request() -> ExecuteProcessRequest { + pub fn echo_foo_request() -> ExecuteProcessRequest { ExecuteProcessRequest { argv: owned_string_vec(&["/bin/echo", "-n", "foo"]), env: BTreeMap::new(), diff --git a/src/rust/engine/process_execution/src/speculate.rs b/src/rust/engine/process_execution/src/speculate.rs new file mode 100644 index 00000000000..02e8ddf0e66 --- /dev/null +++ b/src/rust/engine/process_execution/src/speculate.rs @@ -0,0 +1,218 @@ +use super::{CommandRunner, ExecuteProcessRequest, FallibleExecuteProcessResult}; +use boxfuture::{BoxFuture, Boxable}; +use futures::future::{err, ok, Future}; +use std::sync::Arc; +use std::time::{Duration, Instant}; +use tokio_timer::Delay; +use workunit_store::WorkUnitStore; + +#[derive(Clone)] +pub struct SpeculatingCommandRunner { + primary: Arc, + secondary: Arc, + speculation_timeout: Duration, +} + +impl SpeculatingCommandRunner { + pub fn new( + primary: Box, + secondary: Box, + speculation_timeout: Duration, + ) -> SpeculatingCommandRunner { + SpeculatingCommandRunner { + primary: primary.into(), + secondary: secondary.into(), + speculation_timeout: speculation_timeout, + } + } +} + +impl CommandRunner for SpeculatingCommandRunner { + fn run( + &self, + req: ExecuteProcessRequest, + workunit_store: WorkUnitStore, + ) -> BoxFuture { + let command_runner = self.clone(); + let workunit_store_clone = workunit_store.clone(); + let req_2 = req.clone(); + let delay = Delay::new(Instant::now() + self.speculation_timeout); + self + .primary + .run(req, workunit_store) + .select(delay.then(move |_| command_runner.secondary.run(req_2, workunit_store_clone))) + .then(|raced_result| match raced_result { + Ok((successful_res, _outstanding_req)) => { + ok::(successful_res).to_boxed() + } + Err((failed_res, _outstanding_req)) => { + err::(failed_res).to_boxed() + } + }) + .to_boxed() + } +} + +#[cfg(test)] +mod tests { + use crate::remote::tests::echo_foo_request; + use boxfuture::{BoxFuture, Boxable}; + use bytes::Bytes; + use futures::future::Future; + use hashing::EMPTY_DIGEST; + use std::sync::{Arc, Mutex}; + use std::time::{Duration, Instant}; + use tokio; + use tokio_timer::Delay; + use workunit_store::WorkUnitStore; + + use super::{ + CommandRunner, ExecuteProcessRequest, FallibleExecuteProcessResult, SpeculatingCommandRunner, + }; + + #[test] + fn test_no_speculation() { + let (result, call_counter) = run_speculation_test(0, 0, 100, false, false); + assert_eq![1, *call_counter.lock().unwrap()]; + assert_eq![result.unwrap().stdout, Bytes::from("m1")]; + } + + #[test] + fn test_speculate() { + let (result, call_counter) = run_speculation_test(100, 0, 10, false, false); + assert_eq![1, *call_counter.lock().unwrap()]; + assert_eq![result.unwrap().stdout, Bytes::from("m2")] + } + + #[test] + fn first_req_slow_success() { + let (result, call_counter) = run_speculation_test(15, 10, 10, false, false); + assert_eq![1, *call_counter.lock().unwrap()]; + assert_eq![result.unwrap().stdout, Bytes::from("m1")] + } + + #[test] + fn first_req_slow_fail() { + let (result, call_counter) = run_speculation_test(100, 0, 10, true, false); + assert_eq![1, *call_counter.lock().unwrap()]; + assert_eq![result.unwrap().stdout, Bytes::from("m2")] + } + + #[test] + fn first_req_fast_fail() { + let (result, call_counter) = run_speculation_test(15, 10, 10, true, false); + assert_eq![1, *call_counter.lock().unwrap()]; + assert_eq![result.unwrap_err(), Bytes::from("m1")] + } + + #[test] + fn second_req_fast_fail() { + let (result, call_counter) = run_speculation_test(100, 0, 10, true, true); + assert_eq![1, *call_counter.lock().unwrap()]; + assert_eq![result.unwrap_err(), Bytes::from("m2")] + } + + fn run_speculation_test( + r1_latency_ms: u64, + r2_latency_ms: u64, + speculation_delay_ms: u64, + r1_is_err: bool, + r2_is_err: bool, + ) -> ( + Result, + Arc>, + ) { + let mut runtime = tokio::runtime::Runtime::new().unwrap(); + let execute_request = echo_foo_request(); + let msg1: String = "m1".into(); + let msg2: String = "m2".into(); + let workunit_store = WorkUnitStore::new(); + let call_counter = Arc::new(Mutex::new(0)); + let runner = SpeculatingCommandRunner::new( + Box::new(make_delayed_command_runner( + msg1.clone(), + r1_latency_ms, + r1_is_err, + call_counter.clone(), + )), + Box::new(make_delayed_command_runner( + msg2.clone(), + r2_latency_ms, + r2_is_err, + call_counter.clone(), + )), + Duration::from_millis(speculation_delay_ms), + ); + ( + runtime.block_on(runner.run(execute_request, workunit_store)), + call_counter, + ) + } + + fn make_delayed_command_runner( + msg: String, + delay: u64, + is_err: bool, + call_counter: Arc>, + ) -> DelayedCommandRunner { + let mut result; + if is_err { + result = Err(msg.into()); + } else { + result = Ok(FallibleExecuteProcessResult { + stdout: msg.into(), + stderr: "".into(), + exit_code: 0, + output_directory: EMPTY_DIGEST, + execution_attempts: vec![], + }) + } + DelayedCommandRunner::new(Duration::from_millis(delay), result, call_counter) + } + + #[derive(Clone)] + struct DelayedCommandRunner { + delay: Duration, + result: Result, + call_counter: Arc>, + } + + impl DelayedCommandRunner { + pub fn new( + delay: Duration, + result: Result, + call_counter: Arc>, + ) -> DelayedCommandRunner { + DelayedCommandRunner { + delay: delay, + result: result, + call_counter: call_counter, + } + } + fn incr_call_counter(&self) { + let mut calls = self.call_counter.lock().unwrap(); + *calls += 1; + } + } + + impl CommandRunner for DelayedCommandRunner { + fn run( + &self, + _req: ExecuteProcessRequest, + _workunit_store: WorkUnitStore, + ) -> BoxFuture { + let delay = Delay::new(Instant::now() + self.delay); + let exec_result = self.result.clone(); + let command_runner = self.clone(); + delay + .then(move |delay_res| match delay_res { + Ok(_) => { + command_runner.incr_call_counter(); + exec_result + } + Err(_) => Err(String::from("Timer failed during testing")), + }) + .to_boxed() + } + } +}