Use with_workunit everywhere (#10300)
This commit changes every call site that previously used the public `start_workunit`/`complete_workunit` method pair to use the `with_workunit` method instead, so that the former two methods can be made private.
gshuflin committed Jul 9, 2020
1 parent 8d4e8a0 commit 9366ac9
Showing 5 changed files with 171 additions and 198 deletions.
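In essence, every converted call site collapses to the same before/after shape. The following is a condensed sketch assembled from the diffs below — a pattern summary, not a compilable excerpt; the name `result_future` is taken from the new code, and the body being timed is elided:

// Before: the workunit was opened and closed by hand, in two separate
// calls that every call site had to keep correctly paired.
let span_id = new_span_id();
if let Some(workunit_state) = workunit_store::get_workunit_state() {
  let parent_id = workunit_state.parent_id;
  let metadata = workunit_store::WorkunitMetadata::with_level(Level::Debug);
  workunit_state
    .store
    .start_workunit(span_id.clone(), workunit_name, parent_id, metadata);
}
let result = result_future.await;
if let Some(workunit_state) = workunit_store::get_workunit_state() {
  workunit_state.store.complete_workunit(span_id);
}

// After: a single call owns the open/await/close lifecycle. The final
// closure may rewrite the workunit's metadata based on the result; these
// call sites keep it unchanged with `|_, md| md`.
if let Some(workunit_state) = workunit_store::get_workunit_state() {
  let store = workunit_state.store;
  with_workunit(store, workunit_name, metadata, result_future, |_, md| md).await
} else {
  result_future.await
}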
1 change: 1 addition & 0 deletions src/rust/engine/fs/brfs/src/main.rs
@@ -26,6 +26,7 @@
 #![allow(clippy::new_without_default, clippy::new_ret_no_self)]
 // Arc<Mutex> can be more clear than needing to grok Orderings:
 #![allow(clippy::mutex_atomic)]
+#![type_length_limit = "1303884"]
 
 use std::collections::hash_map::Entry::{Occupied, Vacant};
 use std::collections::HashMap;
306 changes: 142 additions & 164 deletions src/rust/engine/fs/store/src/remote.rs
@@ -13,7 +13,7 @@ use futures01::{future, Future, Sink, Stream};
 use hashing::Digest;
 use log::Level;
 use serverset::{retry, Serverset};
-use workunit_store::new_span_id;
+use workunit_store::with_workunit;
 
 use super::{BackoffConfig, EntryType};
 
Expand Down Expand Up @@ -119,107 +119,98 @@ impl ByteStore {
digest.1,
);
let workunit_name = format!("store_bytes({})", resource_name.clone());
let span_id = new_span_id();
if let Some(workunit_state) = workunit_store::get_workunit_state() {
let parent_id = workunit_state.parent_id;
let metadata = workunit_store::WorkunitMetadata::with_level(Level::Debug);
workunit_state
.store
.start_workunit(span_id.clone(), workunit_name, parent_id, metadata);
}
let metadata = workunit_store::WorkunitMetadata::with_level(Level::Debug);
let store = self.clone();
let result_future = self.with_byte_stream_client(move |client| {
let resource_name = resource_name.clone();
let store = store.clone();
async move {
let (sender, receiver) = client
.write_opt(call_option(&store.headers, None)?.timeout(store.upload_timeout))
.map_err(|err| {
format!(
"Error attempting to connect to upload digest {:?}: {:?}",
digest, err
)
})?;

let result = self
.with_byte_stream_client(move |client| {
let chunk_size_bytes = store.chunk_size_bytes;
let resource_name = resource_name.clone();
let store = store.clone();
async move {
let (sender, receiver) = client
.write_opt(call_option(&store.headers, None)?.timeout(store.upload_timeout))
.map_err(|err| {
format!(
"Error attempting to connect to upload digest {:?}: {:?}",
digest, err
)
})?;

let chunk_size_bytes = store.chunk_size_bytes;
let resource_name = resource_name.clone();
let stream = futures01::stream::unfold::<_, _, future::FutureResult<_, grpcio::Error>, _>(
(0, false),
move |(offset, has_sent_any)| {
if offset >= bytes.len() && has_sent_any {
None
} else {
let mut req = bazel_protos::bytestream::WriteRequest::new();
req.set_resource_name(resource_name.clone());
req.set_write_offset(offset as i64);
let next_offset = min(offset + chunk_size_bytes, bytes.len());
req.set_finish_write(next_offset == bytes.len());
req.set_data(Bytes::from(&bytes[offset..next_offset]));
Some(future::ok((
(req, grpcio::WriteFlags::default()),
(next_offset, true),
)))
}
},
);

sender
.send_all(stream)
.map(|_| ())
.or_else(move |e| {
match e {
// Some implementations of the remote execution API early-return if the blob has
// been concurrently uploaded by another client. In this case, they return a
// WriteResponse with a committed_size equal to the digest's entire size before
// closing the stream.
// Because the server then closes the stream, the client gets an RpcFinished
// error in this case. We ignore this, and will later on verify that the
// committed_size we received from the server is equal to the expected one. If
// these are not equal, the upload will be considered a failure at that point.
// Whether this type of response will become part of the official API is up for
// discussion: see
// https://groups.google.com/d/topic/remote-execution-apis/NXUe3ItCw68/discussion.
grpcio::Error::RpcFinished(None) => Ok(()),
e => Err(format!(
"Error attempting to upload digest {:?}: {:?}",
digest, e
)),
}
})
.compat()
.await?;
let stream = futures01::stream::unfold::<_, _, future::FutureResult<_, grpcio::Error>, _>(
(0, false),
move |(offset, has_sent_any)| {
if offset >= bytes.len() && has_sent_any {
None
} else {
let mut req = bazel_protos::bytestream::WriteRequest::new();
req.set_resource_name(resource_name.clone());
req.set_write_offset(offset as i64);
let next_offset = min(offset + chunk_size_bytes, bytes.len());
req.set_finish_write(next_offset == bytes.len());
req.set_data(Bytes::from(&bytes[offset..next_offset]));
Some(future::ok((
(req, grpcio::WriteFlags::default()),
(next_offset, true),
)))
}
},
);

let received = receiver
.map_err(move |e| {
format!(
"Error from server when uploading digest {:?}: {:?}",
sender
.send_all(stream)
.map(|_| ())
.or_else(move |e| {
match e {
// Some implementations of the remote execution API early-return if the blob has
// been concurrently uploaded by another client. In this case, they return a
// WriteResponse with a committed_size equal to the digest's entire size before
// closing the stream.
// Because the server then closes the stream, the client gets an RpcFinished
// error in this case. We ignore this, and will later on verify that the
// committed_size we received from the server is equal to the expected one. If
// these are not equal, the upload will be considered a failure at that point.
// Whether this type of response will become part of the official API is up for
// discussion: see
// https://groups.google.com/d/topic/remote-execution-apis/NXUe3ItCw68/discussion.
grpcio::Error::RpcFinished(None) => Ok(()),
e => Err(format!(
"Error attempting to upload digest {:?}: {:?}",
digest, e
)
})
.compat()
.await?;
)),
}
})
.compat()
.await?;

if received.get_committed_size() == len as i64 {
Ok(digest)
} else {
Err(format!(
"Uploading file with digest {:?}: want committed size {} but got {}",
digest,
len,
received.get_committed_size()
))
}
let received = receiver
.map_err(move |e| {
format!(
"Error from server when uploading digest {:?}: {:?}",
digest, e
)
})
.compat()
.await?;

if received.get_committed_size() == len as i64 {
Ok(digest)
} else {
Err(format!(
"Uploading file with digest {:?}: want committed size {} but got {}",
digest,
len,
received.get_committed_size()
))
}
})
.await;
}
});

if let Some(workunit_state) = workunit_store::get_workunit_state() {
workunit_state.store.complete_workunit(span_id)
let store = workunit_state.store;
with_workunit(store, workunit_name, metadata, result_future, |_, md| md).await
} else {
result_future.await
}

result
}

pub async fn load_bytes_with<
@@ -239,71 +230,62 @@ impl ByteStore {
       digest.1
     );
     let workunit_name = format!("load_bytes_with({})", resource_name.clone());
-    let span_id = new_span_id();
-    if let Some(workunit_state) = workunit_store::get_workunit_state() {
-      let parent_id = workunit_state.parent_id;
-      let metadata = workunit_store::WorkunitMetadata::with_level(Level::Debug);
-      workunit_state
-        .store
-        .start_workunit(span_id.clone(), workunit_name, parent_id, metadata);
-    }
-
-    let result = self
-      .with_byte_stream_client(move |client| {
-        let resource_name = resource_name.clone();
-        let store = store.clone();
-        let f = f.clone();
-        async move {
-          let stream = client
-            .read_opt(
-              &{
-                let mut req = bazel_protos::bytestream::ReadRequest::new();
-                req.set_resource_name(resource_name.clone());
-                req.set_read_offset(0);
-                // 0 means no limit.
-                req.set_read_limit(0);
-                req
-              },
-              call_option(&store.headers, None)?,
-            )
-            .map_err(|err| format!("Error making CAS read request for {:?}: {:?}", digest, err))?;
-
-          let bytes_res = stream
-            .fold(BytesMut::with_capacity(digest.1), move |mut bytes, r| {
-              bytes.extend_from_slice(&r.data);
-              future::ok::<_, grpcio::Error>(bytes)
-            })
-            .compat()
-            .await;
-
-          // We ensure that we hold onto the client until after we've consumed the stream as a
-          // workaround for https://github.com/pingcap/grpc-rs/issues/123
-          std::mem::drop(client);
-
-          let maybe_bytes = match bytes_res {
-            Ok(bytes) => Some(bytes.freeze()),
-            Err(grpcio::Error::RpcFailure(grpcio::RpcStatus {
-              status: grpcio::RpcStatusCode::NOT_FOUND,
-              ..
-            })) => None,
-            Err(e) => {
-              return Err(format!(
-                "Error from server in response to CAS read request: {:?}",
-                e
-              ))
-            }
-          };
-
-          Ok(maybe_bytes.map(f))
-        }
-      })
-      .await;
+    let metadata = workunit_store::WorkunitMetadata::with_level(Level::Debug);
+    let result_future = self.with_byte_stream_client(move |client| {
+      let resource_name = resource_name.clone();
+      let store = store.clone();
+      let f = f.clone();
+      async move {
+        let stream = client
+          .read_opt(
+            &{
+              let mut req = bazel_protos::bytestream::ReadRequest::new();
+              req.set_resource_name(resource_name.clone());
+              req.set_read_offset(0);
+              // 0 means no limit.
+              req.set_read_limit(0);
+              req
+            },
+            call_option(&store.headers, None)?,
+          )
+          .map_err(|err| format!("Error making CAS read request for {:?}: {:?}", digest, err))?;
+
+        let bytes_res = stream
+          .fold(BytesMut::with_capacity(digest.1), move |mut bytes, r| {
+            bytes.extend_from_slice(&r.data);
+            future::ok::<_, grpcio::Error>(bytes)
+          })
+          .compat()
+          .await;
+
+        // We ensure that we hold onto the client until after we've consumed the stream as a
+        // workaround for https://github.com/pingcap/grpc-rs/issues/123
+        std::mem::drop(client);
+
+        let maybe_bytes = match bytes_res {
+          Ok(bytes) => Some(bytes.freeze()),
+          Err(grpcio::Error::RpcFailure(grpcio::RpcStatus {
+            status: grpcio::RpcStatusCode::NOT_FOUND,
+            ..
+          })) => None,
+          Err(e) => {
+            return Err(format!(
+              "Error from server in response to CAS read request: {:?}",
+              e
+            ))
+          }
+        };
+
+        Ok(maybe_bytes.map(f))
+      }
+    });
 
     if let Some(workunit_state) = workunit_store::get_workunit_state() {
-      workunit_state.store.complete_workunit(span_id);
+      let store = workunit_state.store;
+      with_workunit(store, workunit_name, metadata, result_future, |_, md| md).await
+    } else {
+      result_future.await
     }
-
-    result
   }

///
Expand All @@ -319,18 +301,10 @@ impl ByteStore {
"list_missing_digests({})",
store.instance_name.clone().unwrap_or_default()
);
let span_id = new_span_id();
if let Some(workunit_state) = workunit_store::get_workunit_state() {
let parent_id = workunit_state.parent_id;
let metadata = workunit_store::WorkunitMetadata::with_level(Level::Debug);
workunit_state
.store
.start_workunit(span_id.clone(), workunit_name, parent_id, metadata);
}

async move {
let metadata = workunit_store::WorkunitMetadata::with_level(Level::Debug);
let result_future = async move {
let store2 = store.clone();
let res = store2
store2
.with_cas_client(move |client| {
let request = request.clone();
let store = store.clone();
@@ -350,11 +324,15 @@ impl ByteStore {
             .collect::<Result<HashSet<_>, _>>()
           }
         })
-        .await;
+        .await
+    };
+    async {
       if let Some(workunit_state) = workunit_store::get_workunit_state() {
-        workunit_state.store.complete_workunit(span_id);
+        let store = workunit_state.store;
+        with_workunit(store, workunit_name, metadata, result_future, |_, md| md).await
+      } else {
+        result_future.await
       }
-      res
     }
     .boxed()
     .compat()
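For reference, the helper these call sites now share has roughly the following shape. This is an inferred sketch, not the definition from the `workunit_store` crate: the parameter names and trait bounds are assumptions reconstructed from the five-argument calls above.

// Hypothetical shape only (names are assumptions; the real definition
// lives in the workunit_store crate). It starts a workunit, awaits `f`,
// then completes the workunit with metadata possibly rewritten by
// `final_metadata_fn` — `|_, md| md` leaves it unchanged.
pub async fn with_workunit<F, M>(
  workunit_store: WorkunitStore,
  name: String,
  initial_metadata: WorkunitMetadata,
  f: F,
  final_metadata_fn: M,
) -> F::Output
where
  F: std::future::Future,
  M: for<'a> FnOnce(&'a F::Output, WorkunitMetadata) -> WorkunitMetadata,
{
  // start the workunit, await `f`, complete the workunit, return the result
  unimplemented!()
}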
1 change: 1 addition & 0 deletions src/rust/engine/process_executor/src/main.rs
@@ -26,6 +26,7 @@
 #![allow(clippy::new_without_default, clippy::new_ret_no_self)]
 // Arc<Mutex> can be more clear than needing to grok Orderings:
 #![allow(clippy::mutex_atomic)]
+#![type_length_limit = "1076739"]
 
 use clap::{value_t, App, AppSettings, Arg};
 use futures::compat::Future01CompatExt;
