diff --git a/src/rust/engine/process_execution/src/lib.rs b/src/rust/engine/process_execution/src/lib.rs index 183ba2b2088..b9fbc8613b5 100644 --- a/src/rust/engine/process_execution/src/lib.rs +++ b/src/rust/engine/process_execution/src/lib.rs @@ -134,7 +134,11 @@ impl AddAssign for ExecutionStats { } pub trait CommandRunner: Send + Sync { - fn run(&self, req: ExecuteProcessRequest, workunit_store: WorkUnitStore) -> BoxFuture; + fn run( + &self, + req: ExecuteProcessRequest, + workunit_store: WorkUnitStore, + ) -> BoxFuture; } /// @@ -154,9 +158,16 @@ impl BoundedCommandRunner { } impl CommandRunner for BoundedCommandRunner { - fn run(&self, req: ExecuteProcessRequest, workunit_store: WorkUnitStore) -> BoxFuture { + fn run( + &self, + req: ExecuteProcessRequest, + workunit_store: WorkUnitStore, + ) -> BoxFuture { let inner = self.inner.clone(); - self.inner.1.with_acquired(move || inner.0.run(req, workunit_store)) + self + .inner + .1 + .with_acquired(move || inner.0.run(req, workunit_store)) } } diff --git a/src/rust/engine/process_execution/src/local.rs b/src/rust/engine/process_execution/src/local.rs index 68144d05cd8..d62288b1885 100644 --- a/src/rust/engine/process_execution/src/local.rs +++ b/src/rust/engine/process_execution/src/local.rs @@ -208,7 +208,11 @@ impl super::CommandRunner for CommandRunner { /// /// TODO: start to create workunits for local process execution /// - fn run(&self, req: ExecuteProcessRequest, _workunit_store: WorkUnitStore) -> BoxFuture { + fn run( + &self, + req: ExecuteProcessRequest, + _workunit_store: WorkUnitStore, + ) -> BoxFuture { let workdir = try_future!(tempfile::Builder::new() .prefix("process-execution") .tempdir_in(&self.work_dir) diff --git a/src/rust/engine/process_execution/src/remote.rs b/src/rust/engine/process_execution/src/remote.rs index 25cf159c8dd..c32c073e339 100644 --- a/src/rust/engine/process_execution/src/remote.rs +++ b/src/rust/engine/process_execution/src/remote.rs @@ -24,7 +24,7 @@ use 
super::{ExecuteProcessRequest, ExecutionStats, FallibleExecuteProcessResult} use std; use std::cmp::min; use std::collections::btree_map::BTreeMap; -use workunit_store::{WorkUnit, WorkUnitStore, generate_random_64bit_string, get_parent_id}; +use workunit_store::{generate_random_64bit_string, get_parent_id, WorkUnit, WorkUnitStore}; // Environment variable which is exclusively used for cache key invalidation. // This may be not specified in an ExecuteProcessRequest, and may be populated only by the @@ -128,7 +128,11 @@ impl super::CommandRunner for CommandRunner { /// /// TODO: Request jdk_home be created if set. /// - fn run(&self, req: ExecuteProcessRequest, workunit_store: WorkUnitStore) -> BoxFuture { + fn run( + &self, + req: ExecuteProcessRequest, + workunit_store: WorkUnitStore, + ) -> BoxFuture { let operations_client = self.operations_client.clone(); let store = self.store.clone(); @@ -189,7 +193,11 @@ impl super::CommandRunner for CommandRunner { let operations_client = operations_client.clone(); let command_runner2 = command_runner2.clone(); let command_runner3 = command_runner3.clone(); - let f = command_runner2.extract_execute_response(operation, &mut history, workunit_store.clone()); + let f = command_runner2.extract_execute_response( + operation, + &mut history, + workunit_store.clone(), + ); f.map(future::Loop::Break).or_else(move |value| { match value { ExecutionError::Fatal(err) => future::err(err).to_boxed(), @@ -415,29 +423,57 @@ impl CommandRunner { match (worker_start - enqueued).to_std() { Ok(duration) => { attempts.current_attempt.remote_queue = Some(duration); - maybe_add_workunit(result_cached, "remote execution action scheduling", &enqueued, &worker_start, parent_id.clone(), &workunit_store); - }, + maybe_add_workunit( + result_cached, + "remote execution action scheduling", + enqueued, + worker_start, + parent_id.clone(), + &workunit_store, + ); + } Err(err) => warn!("Got negative remote queue time: {}", err), } match 
(input_fetch_completed - input_fetch_start).to_std() { Ok(duration) => { attempts.current_attempt.remote_input_fetch = Some(duration); - maybe_add_workunit(result_cached, "remote execution worker input fetching", &input_fetch_start, &input_fetch_completed, parent_id.clone(), &workunit_store); - }, + maybe_add_workunit( + result_cached, + "remote execution worker input fetching", + input_fetch_start, + input_fetch_completed, + parent_id.clone(), + &workunit_store, + ); + } Err(err) => warn!("Got negative remote input fetch time: {}", err), } match (execution_completed - execution_start).to_std() { Ok(duration) => { attempts.current_attempt.remote_execution = Some(duration); - maybe_add_workunit(result_cached, "remote execution worker command executing", &execution_start, &execution_completed, parent_id.clone(), &workunit_store); - }, + maybe_add_workunit( + result_cached, + "remote execution worker command executing", + execution_start, + execution_completed, + parent_id.clone(), + &workunit_store, + ); + } Err(err) => warn!("Got negative remote execution time: {}", err), } match (output_upload_completed - output_upload_start).to_std() { Ok(duration) => { attempts.current_attempt.remote_output_store = Some(duration); - maybe_add_workunit(result_cached, "remote execution worker output uploading", &output_upload_start, &output_upload_completed, parent_id, &workunit_store); - }, + maybe_add_workunit( + result_cached, + "remote execution worker output uploading", + output_upload_start, + output_upload_completed, + parent_id, + &workunit_store, + ); + } Err(err) => warn!("Got negative remote output store time: {}", err), } attempts.current_attempt.was_cache_hit = execute_response.cached_result; @@ -747,14 +783,21 @@ impl CommandRunner { } } -fn maybe_add_workunit(result_cached: bool, name: &str, start_time: &Timespec, end_time: &Timespec, parent_id: Option, workunit_store: &WorkUnitStore) { -// TODO: workunits for scheduling, fetching, executing and uploading should be 
recorded -// only if '--reporting-zipkin-trace-v2' is set +fn maybe_add_workunit( + result_cached: bool, + name: &str, + start_time: Timespec, + end_time: Timespec, + parent_id: Option, + workunit_store: &WorkUnitStore, +) { + // TODO: workunits for scheduling, fetching, executing and uploading should be recorded + // only if '--reporting-zipkin-trace-v2' is set if !result_cached { let workunit = WorkUnit { name: String::from(name), - start_timestamp: start_time.clone(), - end_timestamp: end_time.clone(), + start_timestamp: start_time, + end_timestamp: end_time, span_id: generate_random_64bit_string(), parent_id, }; @@ -961,7 +1004,7 @@ mod tests { use std::path::PathBuf; use std::time::Duration; use time::Timespec; - use workunit_store::{WorkUnit, WorkUnitStore, got_workunits}; + use workunit_store::{workunits_with_constant_span_id, WorkUnit, WorkUnitStore}; #[derive(Debug, PartialEq)] enum StdoutType { @@ -2522,7 +2565,7 @@ mod tests { #[test] fn remote_workunits_are_stored() { - let workunit_store =WorkUnitStore::new(); + let workunit_store = WorkUnitStore::new(); let op_name = "gimme-foo".to_string(); let testdata = TestData::roland(); let testdata_empty = TestData::empty(); @@ -2533,23 +2576,27 @@ mod tests { 0, ); let cas = mock::StubCAS::builder() - .file(&TestData::roland()) - .directory(&TestDirectory::containing_roland()) - .build(); + .file(&TestData::roland()) + .directory(&TestDirectory::containing_roland()) + .build(); let command_runner = create_command_runner("".to_owned(), &cas); let mut runtime = tokio::runtime::Runtime::new().unwrap(); let workunit_store_2 = workunit_store.clone(); - runtime.block_on(futures::future::lazy(move || command_runner.extract_execute_response( - super::OperationOrStatus::Operation(operation), - &mut ExecutionHistory::default(), - workunit_store_2, - ))).unwrap(); + runtime + .block_on(futures::future::lazy(move || { + command_runner.extract_execute_response( + super::OperationOrStatus::Operation(operation), + &mut 
ExecutionHistory::default(), + workunit_store_2, + ) + })) + .unwrap(); - let got_workunits = got_workunits(workunit_store); + let got_workunits = workunits_with_constant_span_id(&workunit_store); - let want_workunits = hashset!{ + let want_workunits = hashset! { WorkUnit { name: String::from("remote execution action scheduling"), start_timestamp: Timespec::new(0, 0), @@ -2680,7 +2727,7 @@ mod tests { stdout, stderr, exit_code, - None + None, ); MockOperation::new(op) } diff --git a/src/rust/engine/src/lib.rs b/src/rust/engine/src/lib.rs index 6d833810e7b..913482d6aed 100644 --- a/src/rust/engine/src/lib.rs +++ b/src/rust/engine/src/lib.rs @@ -350,7 +350,8 @@ pub extern "C" fn scheduler_metrics( .flat_map(|(metric, value)| vec![externs::store_utf8(metric), externs::store_i64(value)]) .collect::>(); if session.should_record_zipkin_spans() { - let workunits = session.workunit_store() + let workunits = session + .workunit_store() .get_workunits() .lock() .iter() @@ -383,7 +384,7 @@ pub extern "C" fn scheduler_metrics( fn timespec_as_float_secs(timespec: &Timespec) -> f64 { // Reverting time from Timespec to f64 decreases precision. 
let whole_secs = timespec.sec as f64; - let fract_part_in_nanos = timespec.nsec as f64; + let fract_part_in_nanos = timespec.nsec as f64; whole_secs + fract_part_in_nanos / 1_000_000_000.0 } diff --git a/src/rust/engine/src/nodes.rs b/src/rust/engine/src/nodes.rs index fb6c1794c27..519a806b920 100644 --- a/src/rust/engine/src/nodes.rs +++ b/src/rust/engine/src/nodes.rs @@ -31,7 +31,7 @@ use rule_graph; use graph::{Entry, Node, NodeError, NodeTracer, NodeVisualizer}; use store::{self, StoreFileByDigest}; -use workunit_store::{WorkUnit, generate_random_64bit_string, set_parent_id}; +use workunit_store::{generate_random_64bit_string, set_parent_id, WorkUnit}; pub type NodeFuture = BoxFuture; @@ -1134,7 +1134,7 @@ impl Node for NodeKey { start_timestamp, end_timestamp, span_id, -// TODO: set parent_id with the proper value, issue #7969 + // TODO: set parent_id with the proper value, issue #7969 parent_id: None, }; context2.session.workunit_store().add_workunit(workunit) diff --git a/src/rust/engine/src/scheduler.rs b/src/rust/engine/src/scheduler.rs index 737fa0832b8..df61260ff0d 100644 --- a/src/rust/engine/src/scheduler.rs +++ b/src/rust/engine/src/scheduler.rs @@ -83,7 +83,7 @@ impl Session { } pub fn workunit_store(&self) -> WorkUnitStore { - self.0.workunit_store.clone() + self.0.workunit_store.clone() } } diff --git a/src/rust/engine/workunit_store/src/lib.rs b/src/rust/engine/workunit_store/src/lib.rs index c797316ae97..210da3f1f7f 100644 --- a/src/rust/engine/workunit_store/src/lib.rs +++ b/src/rust/engine/workunit_store/src/lib.rs @@ -34,7 +34,7 @@ use std::collections::HashSet; use std::sync::Arc; use time::Timespec; -#[derive (Clone, Debug, PartialEq, Eq, Hash)] +#[derive(Clone, Debug, PartialEq, Eq, Hash)] pub struct WorkUnit { pub name: String, pub start_timestamp: Timespec, @@ -43,7 +43,7 @@ pub struct WorkUnit { pub parent_id: Option, } -#[derive (Clone)] +#[derive(Clone)] pub struct WorkUnitStore { workunits: Arc>>, } @@ -65,18 +65,23 @@ impl 
WorkUnitStore { } pub fn generate_random_64bit_string() -> String { - let mut rng = thread_rng(); - let random_u64: u64 = rng.gen(); - format!("{:16.x}", random_u64) + let mut rng = thread_rng(); + let random_u64: u64 = rng.gen(); + format!("{:16.x}", random_u64) } -pub fn got_workunits(workunit_store: WorkUnitStore) -> HashSet { -// This function is for the test purpose. +pub fn workunits_with_constant_span_id(workunit_store: &WorkUnitStore) -> HashSet { + // This function exists only for testing purposes. - workunit_store.get_workunits().lock().iter().cloned().map(|mut workunit| { - workunit.span_id = String::from("ignore"); - workunit - }).collect() + workunit_store + .get_workunits() + .lock() + .iter() + .map(|workunit| WorkUnit { + span_id: String::from("ignore"), + ..workunit.clone() + }) + .collect() } task_local! {