Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Minimum viable speculating command runner #7992

Merged
merged 10 commits into from Jul 12, 2019
2 changes: 1 addition & 1 deletion src/rust/engine/process_execution/Cargo.toml
Expand Up @@ -27,10 +27,10 @@ tokio-codec = "0.1"
tokio-process = "0.2.1"
tokio-timer = "0.2"
workunit_store = { path = "../workunit_store" }
tokio = "0.1"

[dev-dependencies]
maplit = "1.0.1"
mock = { path = "../testutil/mock" }
tempfile = "3"
testutil = { path = "../testutil" }
tokio = "0.1"
4 changes: 4 additions & 0 deletions src/rust/engine/process_execution/src/lib.rs
Expand Up @@ -43,6 +43,7 @@ use async_semaphore::AsyncSemaphore;

pub mod local;
pub mod remote;
pub mod speculate;

///
/// A process to be executed.
Expand Down Expand Up @@ -133,6 +134,9 @@ impl AddAssign<UploadSummary> for ExecutionStats {
}
}

// TODO(pantsbuild/pants#8039) Need to impl Drop on command runner so that when the BoxFuture goes out of scope
// we cancel a potential RPC. So we need to distinguish local vs. remote
hrfuller marked this conversation as resolved.
Show resolved Hide resolved
// requests and save enough state to BoxFuture or another abstraction around our execution results
pub trait CommandRunner: Send + Sync {
fn run(
&self,
Expand Down
184 changes: 184 additions & 0 deletions src/rust/engine/process_execution/src/speculate.rs
@@ -0,0 +1,184 @@
use std::time::{Duration, Instant};
use std::sync::Arc;
use tokio_timer::Delay;
use futures::future::{Future, ok, err};
use boxfuture::{BoxFuture, Boxable};
use workunit_store::WorkUnitStore;
use super::{
CommandRunner,
ExecuteProcessRequest,
FallibleExecuteProcessResult
};


///
/// A `CommandRunner` that wraps two other runners and races them against each other.
///
/// Its `run` implementation starts the request on `primary` right away and schedules the same
/// request on `secondary` behind a timer of length `speculation_timeout`.
///
#[derive(Clone)]
pub struct SpeculatingCommandRunner {
// Runner that receives every request as soon as `run` is called.
primary: Arc<dyn CommandRunner>,
// Runner that receives a copy of the request once the speculation timer completes.
secondary: Arc<dyn CommandRunner>,
// Length of the timer that gates the start of the secondary request.
speculation_timeout: Duration,
}

impl SpeculatingCommandRunner {
pub fn new(primary: Box<dyn CommandRunner>, secondary: Box<dyn CommandRunner>, speculation_timeout: Duration) -> SpeculatingCommandRunner {
SpeculatingCommandRunner {
primary: primary.into(),
secondary: secondary.into(),
speculation_timeout: speculation_timeout
}
}
}

impl CommandRunner for SpeculatingCommandRunner {
fn run(
&self, req: ExecuteProcessRequest, workunit_store: WorkUnitStore
) -> BoxFuture<FallibleExecuteProcessResult, String> {
let command_runner = self.clone();
let workunit_store_clone = workunit_store.clone();
let req_2 = req.clone();
let delay = Delay::new(Instant::now() + self.speculation_timeout);
self.primary.run(req, workunit_store).select(
delay.then(move |_| command_runner.secondary.run(req_2, workunit_store_clone))
).then(
|raced_result| {
match raced_result {
Ok((successful_res, _outstanding_req)) => ok::<FallibleExecuteProcessResult, String>(successful_res).to_boxed(),
Err((failed_res, _outstanding_req)) => err::<FallibleExecuteProcessResult, String>(failed_res).to_boxed(),
}
}
).to_boxed()
}
}

#[cfg(test)]
mod tests {
use bytes::Bytes;
use futures::future::Future;
use hashing::{EMPTY_DIGEST};
use boxfuture::{BoxFuture, Boxable};
use std::time::{Duration, Instant};
use tokio_timer::Delay;
use tokio;
use testutil::owned_string_vec;
use std::collections::{BTreeMap, BTreeSet};
use workunit_store::WorkUnitStore;

use super::{
SpeculatingCommandRunner,
CommandRunner,
ExecuteProcessRequest,
FallibleExecuteProcessResult,
};


#[test]
fn test_no_speculation() {
  // Primary latency is 0ms against a 100ms speculation delay, so the primary's "m1" is expected.
  let outcome = run_speculation_test(0, 0, 100, false, false);
  let process_result = outcome.unwrap();
  assert_eq!(process_result.stdout, Bytes::from("m1"));
}

#[test]
fn test_speculate() {
  // Primary needs 100ms; the secondary starts after 10ms and answers with 0ms latency, so the
  // secondary's "m2" is expected.
  let outcome = run_speculation_test(100, 0, 10, false, false);
  let process_result = outcome.unwrap();
  assert_eq!(process_result.stdout, Bytes::from("m2"));
}

#[test]
fn first_req_slow_success() {
  // Primary finishes at ~15ms; the secondary starts at 10ms and needs a further 10ms, so the
  // primary's "m1" is still expected even though speculation kicked in.
  let outcome = run_speculation_test(15, 10, 10, false, false);
  let process_result = outcome.unwrap();
  assert_eq!(process_result.stdout, Bytes::from("m1"));
}

#[test]
fn first_req_slow_fail() {
  // Primary would fail, but only after 100ms; the secondary succeeds shortly after the 10ms
  // speculation delay, so its "m2" is expected.
  let outcome = run_speculation_test(100, 0, 10, true, false);
  let process_result = outcome.unwrap();
  assert_eq!(process_result.stdout, Bytes::from("m2"));
}

#[test]
fn first_req_fast_fail() {
  // Primary fails at ~15ms, before the secondary (started at 10ms, 10ms latency) can finish, so
  // the primary's error is what the race reports.
  let result = run_speculation_test(15, 10, 10, true, false);
  // The error type is `String`, so compare against a string rather than a `Bytes` value.
  assert_eq!(result.unwrap_err(), "m1");
}

#[test]
fn second_req_fast_fail() {
  // Both runners fail, but the secondary (started at 10ms, 0ms latency) fails long before the
  // primary's 100ms elapse, so the secondary's error is what the race reports.
  let result = run_speculation_test(100, 0, 10, true, true);
  // The error type is `String`, so compare against a string rather than a `Bytes` value.
  assert_eq!(result.unwrap_err(), "m2");
}

/// Races two `DelayedCommandRunner`s through a `SpeculatingCommandRunner` and blocks on the
/// outcome.
///
/// The first runner reports "m1" after `r1_latency_ms`, the second reports "m2" after
/// `r2_latency_ms`; `r1_is_err` / `r2_is_err` select whether each message is delivered as an
/// error or as the stdout of a successful result. `speculation_delay_ms` is the speculation
/// timeout handed to the runner under test.
fn run_speculation_test(
  r1_latency_ms: u64,
  r2_latency_ms: u64,
  speculation_delay_ms: u64,
  r1_is_err: bool,
  r2_is_err: bool,
) -> Result<FallibleExecuteProcessResult, String> {
  let mut runtime = tokio::runtime::Runtime::new().unwrap();

  let primary = make_delayed_command_runner(String::from("m1"), r1_latency_ms, r1_is_err);
  let secondary = make_delayed_command_runner(String::from("m2"), r2_latency_ms, r2_is_err);
  let speculating_runner = SpeculatingCommandRunner::new(
    Box::new(primary),
    Box::new(secondary),
    Duration::from_millis(speculation_delay_ms),
  );

  let pending = speculating_runner.run(echo_foo_request(), WorkUnitStore::new());
  runtime.block_on(pending)
}

/// Builds a `DelayedCommandRunner` that resolves after `delay` milliseconds.
///
/// When `is_err` is true the runner yields `Err(msg)`; otherwise it yields a successful result
/// whose stdout is `msg`, with empty stderr and exit code 0.
fn make_delayed_command_runner(msg: String, delay: u64, is_err: bool) -> DelayedCommandRunner {
  // Initialise in a single expression so no `mut` binding or deferred assignment is needed.
  let result = if is_err {
    Err(msg)
  } else {
    Ok(FallibleExecuteProcessResult {
      stdout: msg.into(),
      stderr: "".into(),
      exit_code: 0,
      output_directory: EMPTY_DIGEST,
      execution_attempts: vec![],
    })
  };
  DelayedCommandRunner::new(Duration::from_millis(delay), result)
}

fn echo_foo_request() -> ExecuteProcessRequest {
hrfuller marked this conversation as resolved.
Show resolved Hide resolved
ExecuteProcessRequest {
argv: owned_string_vec(&["/bin/echo", "-n", "foo"]),
env: BTreeMap::new(),
input_files: EMPTY_DIGEST,
output_files: BTreeSet::new(),
output_directories: BTreeSet::new(),
timeout: Duration::from_millis(5000),
description: "echo a foo".to_string(),
jdk_home: None,
}
}

/// Test double for `CommandRunner`: its `run` ignores the request, waits for `delay`, and then
/// yields a clone of `result`.
struct DelayedCommandRunner {
// How long `run` waits before resolving.
delay: Duration,
// Canned outcome returned by every `run` call once the delay has passed.
result: Result<FallibleExecuteProcessResult, String>,
}

impl DelayedCommandRunner {
  /// Creates a runner that waits `delay` and then reports `result`.
  pub fn new(
    delay: Duration,
    result: Result<FallibleExecuteProcessResult, String>,
  ) -> DelayedCommandRunner {
    DelayedCommandRunner { delay, result }
  }
}

impl CommandRunner for DelayedCommandRunner {
  /// Ignores the request and resolves with the canned result once the configured delay has
  /// elapsed. A timer failure is reported as an error string instead of the canned result.
  fn run(
    &self,
    _req: ExecuteProcessRequest,
    _workunit_store: WorkUnitStore,
  ) -> BoxFuture<FallibleExecuteProcessResult, String> {
    let timer = Delay::new(Instant::now() + self.delay);
    let canned_result = self.result.clone();
    timer
      .then(move |timer_outcome| {
        if timer_outcome.is_ok() {
          canned_result
        } else {
          Err(String::from("Timer failed during testing"))
        }
      })
      .to_boxed()
  }
}
}
10 changes: 6 additions & 4 deletions src/rust/engine/src/context.rs
Expand Up @@ -19,7 +19,7 @@ use boxfuture::{BoxFuture, Boxable};
use core::clone::Clone;
use fs::{safe_create_dir_all_ioerror, PosixFS};
use graph::{EntryId, Graph, NodeContext};
use process_execution::{self, BoundedCommandRunner};
use process_execution::{self, BoundedCommandRunner, ExecuteProcessRequestMetadata};
use rand::seq::SliceRandom;
use reqwest;
use rule_graph::RuleGraph;
Expand Down Expand Up @@ -127,11 +127,13 @@ impl Core {
Some(ref address) if remote_execution => BoundedCommandRunner::new(
Box::new(process_execution::remote::CommandRunner::new(
address,
remote_execution_process_cache_namespace.clone(),
remote_instance_name.clone(),
ExecuteProcessRequestMetadata {
instance_name: remote_instance_name.clone(),
cache_key_gen_version: remote_execution_process_cache_namespace.clone(),
platform_properties: remote_execution_extra_platform_properties.clone(),
},
root_ca_certs.clone(),
oauth_bearer_token.clone(),
remote_execution_extra_platform_properties.clone(),
store.clone(),
)),
process_execution_remote_parallelism,
Expand Down
2 changes: 1 addition & 1 deletion src/rust/engine/src/nodes.rs
Expand Up @@ -26,7 +26,7 @@ use fs::{
PathGlobs, PathStat, StrictGlobMatching, VFS,
};
use hashing;
use process_execution::{self, CommandRunner};
use process_execution;
use rule_graph;

use graph::{Entry, Node, NodeError, NodeTracer, NodeVisualizer};
Expand Down