diff --git a/src/rust/engine/fs/brfs/src/main.rs b/src/rust/engine/fs/brfs/src/main.rs
index b34bea014d3..c1140ad0cd5 100644
--- a/src/rust/engine/fs/brfs/src/main.rs
+++ b/src/rust/engine/fs/brfs/src/main.rs
@@ -26,6 +26,7 @@
 #![allow(clippy::new_without_default, clippy::new_ret_no_self)]
 // Arc<Mutex> can be more clear than needing to grok Orderings:
 #![allow(clippy::mutex_atomic)]
+#![type_length_limit = "1303884"]
 
 use std::collections::hash_map::Entry::{Occupied, Vacant};
 use std::collections::HashMap;
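A note on the two `#![type_length_limit = "..."]` attributes in this patch (here and in process_executor below): each level of generic combinator nesting multiplies the length of the monomorphized type name rustc must materialize, and async-heavy crates can exceed the default limit (1048576), at which point rustc aborts and suggests a larger value. The specific numbers ("1303884", "1076739") are whatever the compiler asked for. A minimal, self-contained illustration of the growth; any generic future-returning wrapper shows it, and `futures::executor::block_on` is assumed here only to drive the result:

#![type_length_limit = "1303884"]

use std::future::Future;

// Each call wraps the inner future in another generic layer, so the full type
// of `nested` below is roughly Layer<Layer<Layer<Layer<{async block}>>>> once
// desugared; its printed name grows with every added level of nesting.
async fn layer<F: Future<Output = u64>>(inner: F) -> u64 {
  inner.await + 1
}

fn main() {
  let nested = layer(layer(layer(layer(async { 0 }))));
  println!("{}", futures::executor::block_on(nested));
}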
diff --git a/src/rust/engine/fs/store/src/remote.rs b/src/rust/engine/fs/store/src/remote.rs
index e333c46bdbc..c15145cec11 100644
--- a/src/rust/engine/fs/store/src/remote.rs
+++ b/src/rust/engine/fs/store/src/remote.rs
@@ -13,7 +13,7 @@ use futures01::{future, Future, Sink, Stream};
 use hashing::Digest;
 use log::Level;
 use serverset::{retry, Serverset};
-use workunit_store::new_span_id;
+use workunit_store::with_workunit;
 
 use super::{BackoffConfig, EntryType};
 
@@ -119,107 +119,98 @@ impl ByteStore {
       digest.1,
     );
     let workunit_name = format!("store_bytes({})", resource_name.clone());
-    let span_id = new_span_id();
-    if let Some(workunit_state) = workunit_store::get_workunit_state() {
-      let parent_id = workunit_state.parent_id;
-      let metadata = workunit_store::WorkunitMetadata::with_level(Level::Debug);
-      workunit_state
-        .store
-        .start_workunit(span_id.clone(), workunit_name, parent_id, metadata);
-    }
+    let metadata = workunit_store::WorkunitMetadata::with_level(Level::Debug);
     let store = self.clone();
+    let result_future = self.with_byte_stream_client(move |client| {
+      let resource_name = resource_name.clone();
+      let store = store.clone();
+      async move {
+        let (sender, receiver) = client
+          .write_opt(call_option(&store.headers, None)?.timeout(store.upload_timeout))
+          .map_err(|err| {
+            format!(
+              "Error attempting to connect to upload digest {:?}: {:?}",
+              digest, err
+            )
+          })?;
 
-    let result = self
-      .with_byte_stream_client(move |client| {
+        let chunk_size_bytes = store.chunk_size_bytes;
         let resource_name = resource_name.clone();
-        let store = store.clone();
-        async move {
-          let (sender, receiver) = client
-            .write_opt(call_option(&store.headers, None)?.timeout(store.upload_timeout))
-            .map_err(|err| {
-              format!(
-                "Error attempting to connect to upload digest {:?}: {:?}",
-                digest, err
-              )
-            })?;
-
-          let chunk_size_bytes = store.chunk_size_bytes;
-          let resource_name = resource_name.clone();
-          let stream = futures01::stream::unfold::<_, _, future::FutureResult<_, grpcio::Error>, _>(
-            (0, false),
-            move |(offset, has_sent_any)| {
-              if offset >= bytes.len() && has_sent_any {
-                None
-              } else {
-                let mut req = bazel_protos::bytestream::WriteRequest::new();
-                req.set_resource_name(resource_name.clone());
-                req.set_write_offset(offset as i64);
-                let next_offset = min(offset + chunk_size_bytes, bytes.len());
-                req.set_finish_write(next_offset == bytes.len());
-                req.set_data(Bytes::from(&bytes[offset..next_offset]));
-                Some(future::ok((
-                  (req, grpcio::WriteFlags::default()),
-                  (next_offset, true),
-                )))
-              }
-            },
-          );
-
-          sender
-            .send_all(stream)
-            .map(|_| ())
-            .or_else(move |e| {
-              match e {
-                // Some implementations of the remote execution API early-return if the blob has
-                // been concurrently uploaded by another client. In this case, they return a
-                // WriteResponse with a committed_size equal to the digest's entire size before
-                // closing the stream.
-                // Because the server then closes the stream, the client gets an RpcFinished
-                // error in this case. We ignore this, and will later on verify that the
-                // committed_size we received from the server is equal to the expected one. If
-                // these are not equal, the upload will be considered a failure at that point.
-                // Whether this type of response will become part of the official API is up for
-                // discussion: see
-                // https://groups.google.com/d/topic/remote-execution-apis/NXUe3ItCw68/discussion.
-                grpcio::Error::RpcFinished(None) => Ok(()),
-                e => Err(format!(
-                  "Error attempting to upload digest {:?}: {:?}",
-                  digest, e
-                )),
-              }
-            })
-            .compat()
-            .await?;
+        let stream = futures01::stream::unfold::<_, _, future::FutureResult<_, grpcio::Error>, _>(
+          (0, false),
+          move |(offset, has_sent_any)| {
+            if offset >= bytes.len() && has_sent_any {
+              None
+            } else {
+              let mut req = bazel_protos::bytestream::WriteRequest::new();
+              req.set_resource_name(resource_name.clone());
+              req.set_write_offset(offset as i64);
+              let next_offset = min(offset + chunk_size_bytes, bytes.len());
+              req.set_finish_write(next_offset == bytes.len());
+              req.set_data(Bytes::from(&bytes[offset..next_offset]));
+              Some(future::ok((
+                (req, grpcio::WriteFlags::default()),
+                (next_offset, true),
+              )))
+            }
+          },
+        );
 
-          let received = receiver
-            .map_err(move |e| {
-              format!(
-                "Error from server when uploading digest {:?}: {:?}",
+        sender
+          .send_all(stream)
+          .map(|_| ())
+          .or_else(move |e| {
+            match e {
+              // Some implementations of the remote execution API early-return if the blob has
+              // been concurrently uploaded by another client. In this case, they return a
+              // WriteResponse with a committed_size equal to the digest's entire size before
+              // closing the stream.
+              // Because the server then closes the stream, the client gets an RpcFinished
+              // error in this case. We ignore this, and will later on verify that the
+              // committed_size we received from the server is equal to the expected one. If
+              // these are not equal, the upload will be considered a failure at that point.
+              // Whether this type of response will become part of the official API is up for
+              // discussion: see
+              // https://groups.google.com/d/topic/remote-execution-apis/NXUe3ItCw68/discussion.
+              grpcio::Error::RpcFinished(None) => Ok(()),
+              e => Err(format!(
+                "Error attempting to upload digest {:?}: {:?}",
                 digest, e
-              )
-            })
-            .compat()
-            .await?;
+              )),
+            }
+          })
+          .compat()
+          .await?;
 
-          if received.get_committed_size() == len as i64 {
-            Ok(digest)
-          } else {
-            Err(format!(
-              "Uploading file with digest {:?}: want committed size {} but got {}",
-              digest,
-              len,
-              received.get_committed_size()
-            ))
-          }
+        let received = receiver
+          .map_err(move |e| {
+            format!(
+              "Error from server when uploading digest {:?}: {:?}",
+              digest, e
+            )
+          })
+          .compat()
+          .await?;
+
+        if received.get_committed_size() == len as i64 {
+          Ok(digest)
+        } else {
+          Err(format!(
+            "Uploading file with digest {:?}: want committed size {} but got {}",
+            digest,
+            len,
+            received.get_committed_size()
+          ))
         }
-      })
-      .await;
+      }
+    });
 
     if let Some(workunit_state) = workunit_store::get_workunit_state() {
-      workunit_state.store.complete_workunit(span_id)
+      let store = workunit_state.store;
+      with_workunit(store, workunit_name, metadata, result_future, |_, md| md).await
+    } else {
+      result_future.await
     }
-
-    result
   }
 
   pub async fn load_bytes_with<
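All three `ByteStore` call sites in this file now follow the same shape: build `result_future` up front, then either hand it to `with_workunit` (when the task-local state carries a `WorkunitStore`) or await it bare. The contract those call sites rely on can be modeled as below. The names mirror the real API, but the bodies and the id type are stand-ins, and the real helper also threads a parent span id through task-local state:

use std::future::Future;

#[derive(Clone)]
pub struct WorkunitMetadata {
  pub message: Option<String>,
}

#[derive(Clone)]
pub struct WorkunitStore;

impl WorkunitStore {
  // Record the start of a span; the returned id pairs it with its completion.
  fn start_workunit(&self, _name: String, _metadata: WorkunitMetadata) -> u64 {
    0
  }
  fn complete_workunit(&self, _id: u64, _final_metadata: WorkunitMetadata) {}
}

pub async fn with_workunit<F, T, M>(
  store: WorkunitStore,
  name: String,
  metadata: WorkunitMetadata,
  result_future: F,
  final_metadata_fn: M,
) -> T
where
  F: Future<Output = T>,
  M: for<'a> FnOnce(&'a T, WorkunitMetadata) -> WorkunitMetadata,
{
  // Start the workunit, drive the wrapped future, then complete the workunit
  // with metadata derived from the result (`|_, md| md` keeps it unchanged).
  let id = store.start_workunit(name, metadata.clone());
  let result = result_future.await;
  let final_metadata = final_metadata_fn(&result, metadata);
  store.complete_workunit(id, final_metadata);
  result
}

Because start and completion live in one helper that owns the future, a panic-free path can no longer start a workunit and forget to complete it, which is what the hand-rolled start/complete pairs being deleted here made easy.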
@@ -239,71 +230,62 @@ impl ByteStore {
       digest.1
     );
     let workunit_name = format!("load_bytes_with({})", resource_name.clone());
-    let span_id = new_span_id();
-    if let Some(workunit_state) = workunit_store::get_workunit_state() {
-      let parent_id = workunit_state.parent_id;
-      let metadata = workunit_store::WorkunitMetadata::with_level(Level::Debug);
-      workunit_state
-        .store
-        .start_workunit(span_id.clone(), workunit_name, parent_id, metadata);
-    }
-
-    let result = self
-      .with_byte_stream_client(move |client| {
-        let resource_name = resource_name.clone();
-        let store = store.clone();
-        let f = f.clone();
-        async move {
-          let stream = client
-            .read_opt(
-              &{
-                let mut req = bazel_protos::bytestream::ReadRequest::new();
-                req.set_resource_name(resource_name.clone());
-                req.set_read_offset(0);
-                // 0 means no limit.
-                req.set_read_limit(0);
-                req
-              },
-              call_option(&store.headers, None)?,
-            )
-            .map_err(|err| format!("Error making CAS read request for {:?}: {:?}", digest, err))?;
+    let metadata = workunit_store::WorkunitMetadata::with_level(Level::Debug);
+    let result_future = self.with_byte_stream_client(move |client| {
+      let resource_name = resource_name.clone();
+      let store = store.clone();
+      let f = f.clone();
+      async move {
+        let stream = client
+          .read_opt(
+            &{
+              let mut req = bazel_protos::bytestream::ReadRequest::new();
+              req.set_resource_name(resource_name.clone());
+              req.set_read_offset(0);
+              // 0 means no limit.
+              req.set_read_limit(0);
+              req
+            },
+            call_option(&store.headers, None)?,
+          )
+          .map_err(|err| format!("Error making CAS read request for {:?}: {:?}", digest, err))?;
 
-          let bytes_res = stream
-            .fold(BytesMut::with_capacity(digest.1), move |mut bytes, r| {
-              bytes.extend_from_slice(&r.data);
-              future::ok::<_, grpcio::Error>(bytes)
-            })
-            .compat()
-            .await;
+        let bytes_res = stream
+          .fold(BytesMut::with_capacity(digest.1), move |mut bytes, r| {
+            bytes.extend_from_slice(&r.data);
+            future::ok::<_, grpcio::Error>(bytes)
+          })
+          .compat()
+          .await;
 
-          // We ensure that we hold onto the client until after we've consumed the stream as a
-          // workaround for https://github.com/pingcap/grpc-rs/issues/123
-          std::mem::drop(client);
+        // We ensure that we hold onto the client until after we've consumed the stream as a
+        // workaround for https://github.com/pingcap/grpc-rs/issues/123
+        std::mem::drop(client);
 
-          let maybe_bytes = match bytes_res {
-            Ok(bytes) => Some(bytes.freeze()),
-            Err(grpcio::Error::RpcFailure(grpcio::RpcStatus {
-              status: grpcio::RpcStatusCode::NOT_FOUND,
-              ..
-            })) => None,
-            Err(e) => {
-              return Err(format!(
-                "Error from server in response to CAS read request: {:?}",
-                e
-              ))
-            }
-          };
+        let maybe_bytes = match bytes_res {
+          Ok(bytes) => Some(bytes.freeze()),
+          Err(grpcio::Error::RpcFailure(grpcio::RpcStatus {
+            status: grpcio::RpcStatusCode::NOT_FOUND,
+            ..
+          })) => None,
+          Err(e) => {
+            return Err(format!(
+              "Error from server in response to CAS read request: {:?}",
+              e
+            ))
+          }
+        };
 
-          Ok(maybe_bytes.map(f))
-        }
-      })
-      .await;
+        Ok(maybe_bytes.map(f))
+      }
+    });
 
     if let Some(workunit_state) = workunit_store::get_workunit_state() {
-      workunit_state.store.complete_workunit(span_id);
+      let store = workunit_state.store;
+      with_workunit(store, workunit_name, metadata, result_future, |_, md| md).await
+    } else {
+      result_future.await
    }
-
-    result
   }
 
   ///
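Worth noting in `load_bytes_with`: a gRPC NOT_FOUND status is not treated as an error but as a cache miss, surfaced to callers as `Ok(None)`, while every other failure becomes `Err`. A hypothetical call site, to be read inside an `async fn` returning `Result<_, String>`; the handle, the digest, and the exact parameter list are assumptions here, since the signature is truncated in the hunk above:

// `f` runs against the fetched bytes; a blob absent from the CAS yields
// `Ok(None)` rather than an error, so callers can fall back to uploading.
let maybe_len: Option<usize> = byte_store
  .load_bytes_with(digest, |bytes| bytes.len())
  .await?;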
@@ -319,18 +301,10 @@ impl ByteStore {
       "list_missing_digests({})",
       store.instance_name.clone().unwrap_or_default()
     );
-    let span_id = new_span_id();
-    if let Some(workunit_state) = workunit_store::get_workunit_state() {
-      let parent_id = workunit_state.parent_id;
-      let metadata = workunit_store::WorkunitMetadata::with_level(Level::Debug);
-      workunit_state
-        .store
-        .start_workunit(span_id.clone(), workunit_name, parent_id, metadata);
-    }
-
-    async move {
+    let metadata = workunit_store::WorkunitMetadata::with_level(Level::Debug);
+    let result_future = async move {
       let store2 = store.clone();
-      let res = store2
+      store2
        .with_cas_client(move |client| {
          let request = request.clone();
          let store = store.clone();
@@ -350,11 +324,15 @@ impl ByteStore {
             .collect::<Result<HashSet<_>, _>>()
         }
       })
-        .await;
+        .await
+    };
+    async {
       if let Some(workunit_state) = workunit_store::get_workunit_state() {
-        workunit_state.store.complete_workunit(span_id);
+        let store = workunit_state.store;
+        with_workunit(store, workunit_name, metadata, result_future, |_, md| md).await
+      } else {
+        result_future.await
       }
-      res
     }
     .boxed()
     .compat()
diff --git a/src/rust/engine/process_executor/src/main.rs b/src/rust/engine/process_executor/src/main.rs
index c0992e78a61..ede4b9f0cff 100644
--- a/src/rust/engine/process_executor/src/main.rs
+++ b/src/rust/engine/process_executor/src/main.rs
@@ -26,6 +26,7 @@
 #![allow(clippy::new_without_default, clippy::new_ret_no_self)]
 // Arc<Mutex> can be more clear than needing to grok Orderings:
 #![allow(clippy::mutex_atomic)]
+#![type_length_limit = "1076739"]
 
 use clap::{value_t, App, AppSettings, Arg};
 use futures::compat::Future01CompatExt;
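`list_missing_digests` still returns a futures 0.1 future to its callers, so the new std-future `async` block is bridged back with `.boxed().compat()`. Boxing first is also what keeps the bridged type cheap to name: `Pin<Box<dyn Future>>` erases the combinator tower, the same tower the `type_length_limit` bumps elsewhere in this patch have to account for. A minimal sketch of the bridge, assuming the `futures` 0.3 crate with its `compat` feature enabled plus the `futures01` crate:

use futures::future::{FutureExt, TryFutureExt};

fn legacy() -> impl futures01::Future<Item = u64, Error = String> {
  async { Ok::<u64, String>(42) }
    .boxed()  // Pin<Box<dyn Future<Output = Result<u64, String>> + Send>>
    .compat() // adapts a std future of Result into a futures 0.1 future
}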
diff --git a/src/rust/engine/src/nodes.rs b/src/rust/engine/src/nodes.rs
index f5e65516700..260406eb59b 100644
--- a/src/rust/engine/src/nodes.rs
+++ b/src/rust/engine/src/nodes.rs
@@ -35,7 +35,7 @@ use process_execution::{
 use graph::{Entry, Node, NodeError, NodeVisualizer};
 use store::{self, StoreFileByDigest};
-use workunit_store::{new_span_id, scope_task_workunit_state, Level, WorkunitMetadata};
+use workunit_store::{with_workunit, Level, WorkunitMetadata};
 
 pub type NodeResult<T> = Result<T, Failure>;
 
@@ -1086,34 +1086,22 @@ impl Node for NodeKey {
   type Error = Failure;
 
   async fn run(self, context: Context) -> Result<NodeOutput, Failure> {
-    let mut workunit_state = workunit_store::expect_workunit_state();
-
-    let (started_workunit_id, user_facing_name, metadata) = {
-      let user_facing_name = self.user_facing_name();
-      let name = self.workunit_name();
-      let span_id = new_span_id();
-
-      // We're starting a new workunit: record our parent, and set the current parent to our span.
-      let parent_id = std::mem::replace(&mut workunit_state.parent_id, Some(span_id.clone()));
-      let metadata = WorkunitMetadata {
-        desc: user_facing_name.clone(),
-        message: None,
-        level: self.workunit_level(),
-        blocked: false,
-        stdout: None,
-        stderr: None,
-      };
-
-      let started_workunit_id =
-        context
-          .session
-          .workunit_store()
-          .start_workunit(span_id, name, parent_id, metadata.clone());
-      (started_workunit_id, user_facing_name, metadata)
+    let workunit_state = workunit_store::expect_workunit_state();
+
+    let user_facing_name = self.user_facing_name();
+    let workunit_name = self.workunit_name();
+    let metadata = WorkunitMetadata {
+      desc: user_facing_name.clone(),
+      message: None,
+      level: self.workunit_level(),
+      blocked: false,
+      stdout: None,
+      stderr: None,
     };
+    let metadata2 = metadata.clone();
 
-    scope_task_workunit_state(Some(workunit_state), async move {
-      let context2 = context.clone();
+    let result_future = async move {
+      let metadata = metadata2;
       let maybe_watch = if let Some(path) = self.fs_subject() {
         let abs_path = context.core.build_root.join(path);
         context
@@ -1181,13 +1169,18 @@ impl Node for NodeKey {
         message,
         ..metadata
       };
-      context2
-        .session
-        .workunit_store()
-        .complete_workunit_with_new_metadata(started_workunit_id, final_metadata);
-      result
-    })
+      (result, final_metadata)
+    };
+
+    with_workunit(
+      workunit_state.store,
+      workunit_name,
+      metadata,
+      result_future,
+      |result, _| result.1.clone(),
+    )
     .await
+    .0
   }
 
   fn cacheable(&self) -> bool {
diff --git a/src/rust/engine/workunit_store/src/lib.rs b/src/rust/engine/workunit_store/src/lib.rs
index dae390d3a94..bb7045e2f4b 100644
--- a/src/rust/engine/workunit_store/src/lib.rs
+++ b/src/rust/engine/workunit_store/src/lib.rs
@@ -434,7 +434,7 @@ impl WorkunitStore {
     self.heavy_hitters_data.heavy_hitters(k)
   }
 
-  pub fn start_workunit(
+  fn start_workunit(
     &self,
     span_id: SpanId,
     name: String,
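Two consequences of the nodes.rs and workunit_store changes above. First, `NodeKey::run` carries its final metadata out of the wrapped future as the second element of a `(result, final_metadata)` tuple: the projection closure publishes `result.1`, and the trailing `.0` unwraps the caller-visible value. Second, `start_workunit` loses its `pub`, so outside the crate a started workunit can no longer be leaked without its matching completion; `with_workunit` becomes the only doorway. The tuple pattern, reduced to its shape using the stand-in helper sketched earlier (`run_node` and the metadata message are hypothetical):

let (value, _final_md) = with_workunit(
  store,
  workunit_name,
  metadata,
  async move {
    let value = run_node().await;
    // Derive the completion metadata from the outcome while it is still
    // inside the span, then smuggle it out alongside the result.
    let final_md = WorkunitMetadata {
      message: Some(format!("{:?}", value)),
    };
    (value, final_md)
  },
  // Publish the metadata computed inside the future.
  |output, _initial| output.1.clone(),
)
.await;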