Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Minimum viable speculating command runner #7992

Merged
merged 10 commits into from Jul 12, 2019
2 changes: 1 addition & 1 deletion src/rust/engine/process_execution/Cargo.toml
Expand Up @@ -28,10 +28,10 @@ tokio-codec = "0.1"
tokio-process = "0.2.1"
tokio-timer = "0.2"
workunit_store = { path = "../workunit_store" }
tokio = "0.1"

[dev-dependencies]
maplit = "1.0.1"
mock = { path = "../testutil/mock" }
tempfile = "3"
testutil = { path = "../testutil" }
tokio = "0.1"
1 change: 1 addition & 0 deletions src/rust/engine/process_execution/src/lib.rs
Expand Up @@ -44,6 +44,7 @@ use async_semaphore::AsyncSemaphore;
pub mod cache;
pub mod local;
pub mod remote;
pub mod speculate;

///
/// A process to be executed.
Expand Down
7 changes: 5 additions & 2 deletions src/rust/engine/process_execution/src/remote.rs
Expand Up @@ -108,6 +108,9 @@ impl CommandRunner {
}
}

// TODO(pantsbuild/pants#8039) Need to impl Drop on command runner so that when the BoxFuture goes out of scope
// we cancel a potential RPC. So we need to distinguish local vs. remote
// requests and save enough state to BoxFuture or another abstraction around our execution results
impl super::CommandRunner for CommandRunner {
///
/// Runs a command via a gRPC service implementing the Bazel Remote Execution API
Expand Down Expand Up @@ -970,7 +973,7 @@ fn timespec_from(timestamp: &protobuf::well_known_types::Timestamp) -> time::Tim
}

#[cfg(test)]
mod tests {
pub mod tests {
use bazel_protos;
use bazel_protos::operations::Operation;
use bazel_protos::remote_execution::ExecutedActionMetadata;
Expand Down Expand Up @@ -2636,7 +2639,7 @@ mod tests {
assert_eq!(got_workunits, want_workunits);
}

fn echo_foo_request() -> ExecuteProcessRequest {
pub fn echo_foo_request() -> ExecuteProcessRequest {
ExecuteProcessRequest {
argv: owned_string_vec(&["/bin/echo", "-n", "foo"]),
env: BTreeMap::new(),
Expand Down
218 changes: 218 additions & 0 deletions src/rust/engine/process_execution/src/speculate.rs
@@ -0,0 +1,218 @@
use super::{CommandRunner, ExecuteProcessRequest, FallibleExecuteProcessResult};
use boxfuture::{BoxFuture, Boxable};
use futures::future::{err, ok, Future};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio_timer::Delay;
use workunit_store::WorkUnitStore;

#[derive(Clone)]
pub struct SpeculatingCommandRunner {
primary: Arc<dyn CommandRunner>,
secondary: Arc<dyn CommandRunner>,
speculation_timeout: Duration,
}

impl SpeculatingCommandRunner {
pub fn new(
primary: Box<dyn CommandRunner>,
secondary: Box<dyn CommandRunner>,
speculation_timeout: Duration,
) -> SpeculatingCommandRunner {
SpeculatingCommandRunner {
primary: primary.into(),
secondary: secondary.into(),
speculation_timeout: speculation_timeout,
}
}
}

impl CommandRunner for SpeculatingCommandRunner {
fn run(
&self,
req: ExecuteProcessRequest,
workunit_store: WorkUnitStore,
) -> BoxFuture<FallibleExecuteProcessResult, String> {
let command_runner = self.clone();
let workunit_store_clone = workunit_store.clone();
let req_2 = req.clone();
let delay = Delay::new(Instant::now() + self.speculation_timeout);
self
.primary
.run(req, workunit_store)
.select(delay.then(move |_| command_runner.secondary.run(req_2, workunit_store_clone)))
.then(|raced_result| match raced_result {
Ok((successful_res, _outstanding_req)) => {
ok::<FallibleExecuteProcessResult, String>(successful_res).to_boxed()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because Result implements IntoFuture, you should actually be able to just:
Ok(successful_res)
and
Err(failed_res)
in this closure :) Not particularly important, but good to know about :)

}
Err((failed_res, _outstanding_req)) => {
err::<FallibleExecuteProcessResult, String>(failed_res).to_boxed()
}
})
.to_boxed()
}
}

#[cfg(test)]
mod tests {
use crate::remote::tests::echo_foo_request;
use boxfuture::{BoxFuture, Boxable};
use bytes::Bytes;
use futures::future::Future;
use hashing::EMPTY_DIGEST;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use tokio;
use tokio_timer::Delay;
use workunit_store::WorkUnitStore;

use super::{
CommandRunner, ExecuteProcessRequest, FallibleExecuteProcessResult, SpeculatingCommandRunner,
};

#[test]
fn test_no_speculation() {
let (result, call_counter) = run_speculation_test(0, 0, 100, false, false);
assert_eq![1, *call_counter.lock().unwrap()];
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the no speculation case, it may actually be nice to have two counters; a "started" counter and a "finished" counter, to show that we didn't even start the second attempt. But also understand if you don't think it's worthwhile :)

assert_eq![result.unwrap().stdout, Bytes::from("m1")];
}

#[test]
fn test_speculate() {
let (result, call_counter) = run_speculation_test(100, 0, 10, false, false);
assert_eq![1, *call_counter.lock().unwrap()];
assert_eq![result.unwrap().stdout, Bytes::from("m2")]
}

#[test]
fn first_req_slow_success() {
let (result, call_counter) = run_speculation_test(15, 10, 10, false, false);
assert_eq![1, *call_counter.lock().unwrap()];
assert_eq![result.unwrap().stdout, Bytes::from("m1")]
}

#[test]
fn first_req_slow_fail() {
let (result, call_counter) = run_speculation_test(100, 0, 10, true, false);
assert_eq![1, *call_counter.lock().unwrap()];
assert_eq![result.unwrap().stdout, Bytes::from("m2")]
}

#[test]
fn first_req_fast_fail() {
let (result, call_counter) = run_speculation_test(15, 10, 10, true, false);
assert_eq![1, *call_counter.lock().unwrap()];
assert_eq![result.unwrap_err(), Bytes::from("m1")]
}

#[test]
fn second_req_fast_fail() {
let (result, call_counter) = run_speculation_test(100, 0, 10, true, true);
assert_eq![1, *call_counter.lock().unwrap()];
assert_eq![result.unwrap_err(), Bytes::from("m2")]
}

fn run_speculation_test(
r1_latency_ms: u64,
r2_latency_ms: u64,
speculation_delay_ms: u64,
r1_is_err: bool,
r2_is_err: bool,
) -> (
Result<FallibleExecuteProcessResult, String>,
Arc<Mutex<u32>>,
) {
let mut runtime = tokio::runtime::Runtime::new().unwrap();
let execute_request = echo_foo_request();
let msg1: String = "m1".into();
let msg2: String = "m2".into();
let workunit_store = WorkUnitStore::new();
let call_counter = Arc::new(Mutex::new(0));
let runner = SpeculatingCommandRunner::new(
Box::new(make_delayed_command_runner(
msg1.clone(),
r1_latency_ms,
r1_is_err,
call_counter.clone(),
)),
Box::new(make_delayed_command_runner(
msg2.clone(),
r2_latency_ms,
r2_is_err,
call_counter.clone(),
)),
Duration::from_millis(speculation_delay_ms),
);
(
runtime.block_on(runner.run(execute_request, workunit_store)),
call_counter,
)
}

fn make_delayed_command_runner(
msg: String,
delay: u64,
is_err: bool,
call_counter: Arc<Mutex<u32>>,
) -> DelayedCommandRunner {
let mut result;
if is_err {
result = Err(msg.into());
} else {
result = Ok(FallibleExecuteProcessResult {
stdout: msg.into(),
stderr: "".into(),
exit_code: 0,
output_directory: EMPTY_DIGEST,
execution_attempts: vec![],
})
}
DelayedCommandRunner::new(Duration::from_millis(delay), result, call_counter)
}

#[derive(Clone)]
struct DelayedCommandRunner {
delay: Duration,
result: Result<FallibleExecuteProcessResult, String>,
call_counter: Arc<Mutex<u32>>,
}

impl DelayedCommandRunner {
pub fn new(
delay: Duration,
result: Result<FallibleExecuteProcessResult, String>,
call_counter: Arc<Mutex<u32>>,
) -> DelayedCommandRunner {
DelayedCommandRunner {
delay: delay,
result: result,
call_counter: call_counter,
}
}
fn incr_call_counter(&self) {
let mut calls = self.call_counter.lock().unwrap();
*calls += 1;
}
}

impl CommandRunner for DelayedCommandRunner {
fn run(
&self,
_req: ExecuteProcessRequest,
_workunit_store: WorkUnitStore,
) -> BoxFuture<FallibleExecuteProcessResult, String> {
let delay = Delay::new(Instant::now() + self.delay);
let exec_result = self.result.clone();
let command_runner = self.clone();
delay
.then(move |delay_res| match delay_res {
Ok(_) => {
command_runner.incr_call_counter();
exec_result
}
Err(_) => Err(String::from("Timer failed during testing")),
})
.to_boxed()
}
}
}