Skip to content

Commit

Permalink
Address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
cattibrie committed Jul 9, 2019
1 parent 4a7ea42 commit b54225f
Show file tree
Hide file tree
Showing 7 changed files with 117 additions and 49 deletions.
17 changes: 14 additions & 3 deletions src/rust/engine/process_execution/src/lib.rs
Expand Up @@ -134,7 +134,11 @@ impl AddAssign<UploadSummary> for ExecutionStats {
}

pub trait CommandRunner: Send + Sync {
fn run(&self, req: ExecuteProcessRequest, workunit_store: WorkUnitStore) -> BoxFuture<FallibleExecuteProcessResult, String>;
fn run(
&self,
req: ExecuteProcessRequest,
workunit_store: WorkUnitStore,
) -> BoxFuture<FallibleExecuteProcessResult, String>;
}

///
Expand All @@ -154,9 +158,16 @@ impl BoundedCommandRunner {
}

impl CommandRunner for BoundedCommandRunner {
fn run(&self, req: ExecuteProcessRequest, workunit_store: WorkUnitStore) -> BoxFuture<FallibleExecuteProcessResult, String> {
fn run(
&self,
req: ExecuteProcessRequest,
workunit_store: WorkUnitStore,
) -> BoxFuture<FallibleExecuteProcessResult, String> {
let inner = self.inner.clone();
self.inner.1.with_acquired(move || inner.0.run(req, workunit_store))
self
.inner
.1
.with_acquired(move || inner.0.run(req, workunit_store))
}
}

Expand Down
6 changes: 5 additions & 1 deletion src/rust/engine/process_execution/src/local.rs
Expand Up @@ -208,7 +208,11 @@ impl super::CommandRunner for CommandRunner {
///
/// TODO: start to create workunits for local process execution
///
fn run(&self, req: ExecuteProcessRequest, _workunit_store: WorkUnitStore) -> BoxFuture<FallibleExecuteProcessResult, String> {
fn run(
&self,
req: ExecuteProcessRequest,
_workunit_store: WorkUnitStore,
) -> BoxFuture<FallibleExecuteProcessResult, String> {
let workdir = try_future!(tempfile::Builder::new()
.prefix("process-execution")
.tempdir_in(&self.work_dir)
Expand Down
105 changes: 76 additions & 29 deletions src/rust/engine/process_execution/src/remote.rs
Expand Up @@ -24,7 +24,7 @@ use super::{ExecuteProcessRequest, ExecutionStats, FallibleExecuteProcessResult}
use std;
use std::cmp::min;
use std::collections::btree_map::BTreeMap;
use workunit_store::{WorkUnit, WorkUnitStore, generate_random_64bit_string, get_parent_id};
use workunit_store::{generate_random_64bit_string, get_parent_id, WorkUnit, WorkUnitStore};

// Environment variable which is exclusively used for cache key invalidation.
// This may be not specified in an ExecuteProcessRequest, and may be populated only by the
Expand Down Expand Up @@ -128,7 +128,11 @@ impl super::CommandRunner for CommandRunner {
///
/// TODO: Request jdk_home be created if set.
///
fn run(&self, req: ExecuteProcessRequest, workunit_store: WorkUnitStore) -> BoxFuture<FallibleExecuteProcessResult, String> {
fn run(
&self,
req: ExecuteProcessRequest,
workunit_store: WorkUnitStore,
) -> BoxFuture<FallibleExecuteProcessResult, String> {
let operations_client = self.operations_client.clone();

let store = self.store.clone();
Expand Down Expand Up @@ -189,7 +193,11 @@ impl super::CommandRunner for CommandRunner {
let operations_client = operations_client.clone();
let command_runner2 = command_runner2.clone();
let command_runner3 = command_runner3.clone();
let f = command_runner2.extract_execute_response(operation, &mut history, workunit_store.clone());
let f = command_runner2.extract_execute_response(
operation,
&mut history,
workunit_store.clone(),
);
f.map(future::Loop::Break).or_else(move |value| {
match value {
ExecutionError::Fatal(err) => future::err(err).to_boxed(),
Expand Down Expand Up @@ -415,29 +423,57 @@ impl CommandRunner {
match (worker_start - enqueued).to_std() {
Ok(duration) => {
attempts.current_attempt.remote_queue = Some(duration);
maybe_add_workunit(result_cached, "remote execution action scheduling", &enqueued, &worker_start, parent_id.clone(), &workunit_store);
},
maybe_add_workunit(
result_cached,
"remote execution action scheduling",
enqueued,
worker_start,
parent_id.clone(),
&workunit_store,
);
}
Err(err) => warn!("Got negative remote queue time: {}", err),
}
match (input_fetch_completed - input_fetch_start).to_std() {
Ok(duration) => {
attempts.current_attempt.remote_input_fetch = Some(duration);
maybe_add_workunit(result_cached, "remote execution worker input fetching", &input_fetch_start, &input_fetch_completed, parent_id.clone(), &workunit_store);
},
maybe_add_workunit(
result_cached,
"remote execution worker input fetching",
input_fetch_start,
input_fetch_completed,
parent_id.clone(),
&workunit_store,
);
}
Err(err) => warn!("Got negative remote input fetch time: {}", err),
}
match (execution_completed - execution_start).to_std() {
Ok(duration) => {
attempts.current_attempt.remote_execution = Some(duration);
maybe_add_workunit(result_cached, "remote execution worker command executing", &execution_start, &execution_completed, parent_id.clone(), &workunit_store);
},
maybe_add_workunit(
result_cached,
"remote execution worker command executing",
execution_start,
execution_completed,
parent_id.clone(),
&workunit_store,
);
}
Err(err) => warn!("Got negative remote execution time: {}", err),
}
match (output_upload_completed - output_upload_start).to_std() {
Ok(duration) => {
attempts.current_attempt.remote_output_store = Some(duration);
maybe_add_workunit(result_cached, "remote execution worker output uploading", &output_upload_start, &output_upload_completed, parent_id, &workunit_store);
},
maybe_add_workunit(
result_cached,
"remote execution worker output uploading",
output_upload_start,
output_upload_completed,
parent_id,
&workunit_store,
);
}
Err(err) => warn!("Got negative remote output store time: {}", err),
}
attempts.current_attempt.was_cache_hit = execute_response.cached_result;
Expand Down Expand Up @@ -747,14 +783,21 @@ impl CommandRunner {
}
}

fn maybe_add_workunit(result_cached: bool, name: &str, start_time: &Timespec, end_time: &Timespec, parent_id: Option<String>, workunit_store: &WorkUnitStore) {
// TODO: workunits for scheduling, fetching, executing and uploading should be recorded
// only if '--reporting-zipkin-trace-v2' is set
fn maybe_add_workunit(
result_cached: bool,
name: &str,
start_time: Timespec,
end_time: Timespec,
parent_id: Option<String>,
workunit_store: &WorkUnitStore,
) {
// TODO: workunits for scheduling, fetching, executing and uploading should be recorded
// only if '--reporting-zipkin-trace-v2' is set
if !result_cached {
let workunit = WorkUnit {
name: String::from(name),
start_timestamp: start_time.clone(),
end_timestamp: end_time.clone(),
start_timestamp: start_time,
end_timestamp: end_time,
span_id: generate_random_64bit_string(),
parent_id,
};
Expand Down Expand Up @@ -961,7 +1004,7 @@ mod tests {
use std::path::PathBuf;
use std::time::Duration;
use time::Timespec;
use workunit_store::{WorkUnit, WorkUnitStore, got_workunits};
use workunit_store::{workunits_with_constant_span_id, WorkUnit, WorkUnitStore};

#[derive(Debug, PartialEq)]
enum StdoutType {
Expand Down Expand Up @@ -2522,7 +2565,7 @@ mod tests {

#[test]
fn remote_workunits_are_stored() {
let workunit_store =WorkUnitStore::new();
let workunit_store = WorkUnitStore::new();
let op_name = "gimme-foo".to_string();
let testdata = TestData::roland();
let testdata_empty = TestData::empty();
Expand All @@ -2533,23 +2576,27 @@ mod tests {
0,
);
let cas = mock::StubCAS::builder()
.file(&TestData::roland())
.directory(&TestDirectory::containing_roland())
.build();
.file(&TestData::roland())
.directory(&TestDirectory::containing_roland())
.build();
let command_runner = create_command_runner("".to_owned(), &cas);

let mut runtime = tokio::runtime::Runtime::new().unwrap();

let workunit_store_2 = workunit_store.clone();
runtime.block_on(futures::future::lazy(move || command_runner.extract_execute_response(
super::OperationOrStatus::Operation(operation),
&mut ExecutionHistory::default(),
workunit_store_2,
))).unwrap();
runtime
.block_on(futures::future::lazy(move || {
command_runner.extract_execute_response(
super::OperationOrStatus::Operation(operation),
&mut ExecutionHistory::default(),
workunit_store_2,
)
}))
.unwrap();

let got_workunits = got_workunits(workunit_store);
let got_workunits = workunits_with_constant_span_id(&workunit_store);

let want_workunits = hashset!{
let want_workunits = hashset! {
WorkUnit {
name: String::from("remote execution action scheduling"),
start_timestamp: Timespec::new(0, 0),
Expand Down Expand Up @@ -2680,7 +2727,7 @@ mod tests {
stdout,
stderr,
exit_code,
None
None,
);
MockOperation::new(op)
}
Expand Down
5 changes: 3 additions & 2 deletions src/rust/engine/src/lib.rs
Expand Up @@ -350,7 +350,8 @@ pub extern "C" fn scheduler_metrics(
.flat_map(|(metric, value)| vec![externs::store_utf8(metric), externs::store_i64(value)])
.collect::<Vec<_>>();
if session.should_record_zipkin_spans() {
let workunits = session.workunit_store()
let workunits = session
.workunit_store()
.get_workunits()
.lock()
.iter()
Expand Down Expand Up @@ -383,7 +384,7 @@ pub extern "C" fn scheduler_metrics(
/// Converts a `Timespec` to seconds represented as an `f64`.
fn timespec_as_float_secs(timespec: &Timespec) -> f64 {
  // Reverting time from Timespec to f64 decreases precision.
  let whole_secs = timespec.sec as f64;
  let fract_part_in_nanos = timespec.nsec as f64;
  whole_secs + fract_part_in_nanos / 1_000_000_000.0
}

Expand Down
4 changes: 2 additions & 2 deletions src/rust/engine/src/nodes.rs
Expand Up @@ -31,7 +31,7 @@ use rule_graph;

use graph::{Entry, Node, NodeError, NodeTracer, NodeVisualizer};
use store::{self, StoreFileByDigest};
use workunit_store::{WorkUnit, generate_random_64bit_string, set_parent_id};
use workunit_store::{generate_random_64bit_string, set_parent_id, WorkUnit};

pub type NodeFuture<T> = BoxFuture<T, Failure>;

Expand Down Expand Up @@ -1134,7 +1134,7 @@ impl Node for NodeKey {
start_timestamp,
end_timestamp,
span_id,
// TODO: set parent_id with the proper value, issue #7969
// TODO: set parent_id with the proper value, issue #7969
parent_id: None,
};
context2.session.workunit_store().add_workunit(workunit)
Expand Down
2 changes: 1 addition & 1 deletion src/rust/engine/src/scheduler.rs
Expand Up @@ -83,7 +83,7 @@ impl Session {
}

/// Returns a clone of this session's `WorkUnitStore` handle.
pub fn workunit_store(&self) -> WorkUnitStore {
  self.0.workunit_store.clone()
}
}

Expand Down
27 changes: 16 additions & 11 deletions src/rust/engine/workunit_store/src/lib.rs
Expand Up @@ -34,7 +34,7 @@ use std::collections::HashSet;
use std::sync::Arc;
use time::Timespec;

#[derive (Clone, Debug, PartialEq, Eq, Hash)]
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct WorkUnit {
pub name: String,
pub start_timestamp: Timespec,
Expand All @@ -43,7 +43,7 @@ pub struct WorkUnit {
pub parent_id: Option<String>,
}

#[derive (Clone)]
#[derive(Clone)]
pub struct WorkUnitStore {
workunits: Arc<Mutex<HashSet<WorkUnit>>>,
}
Expand All @@ -65,18 +65,23 @@ impl WorkUnitStore {
}

pub fn generate_random_64bit_string() -> String {
let mut rng = thread_rng();
let random_u64: u64 = rng.gen();
format!("{:16.x}", random_u64)
let mut rng = thread_rng();
let random_u64: u64 = rng.gen();
format!("{:16.x}", random_u64)
}

pub fn got_workunits(workunit_store: WorkUnitStore) -> HashSet<WorkUnit> {
// This function is for the test purpose.
pub fn workunits_with_constant_span_id(workunit_store: &WorkUnitStore) -> HashSet<WorkUnit> {
// This function is for the test purpose.

workunit_store.get_workunits().lock().iter().cloned().map(|mut workunit| {
workunit.span_id = String::from("ignore");
workunit
}).collect()
workunit_store
.get_workunits()
.lock()
.iter()
.map(|workunit| WorkUnit {
span_id: String::from("ignore"),
..workunit.clone()
})
.collect()
}

task_local! {
Expand Down

0 comments on commit b54225f

Please sign in to comment.