Use with_workunit everywhere #10300

Merged (2 commits, Jul 9, 2020)
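Every call site in this PR changes in the same way: the manual start_workunit / complete_workunit bookkeeping around a future is replaced by a single call to with_workunit, which runs the future inside the workunit and completes it itself. A minimal self-contained sketch of that wrapper pattern (the toy WorkunitStore and the timing below are assumptions for illustration, not the real workunit_store API, which also threads metadata through):

use std::time::Instant;

// Toy stand-in for the real WorkunitStore (an assumption, for illustration only).
struct WorkunitStore;

impl WorkunitStore {
  fn start_workunit(&self, name: &str) -> Instant {
    println!("start {}", name);
    Instant::now()
  }

  fn complete_workunit(&self, name: &str, started: Instant) {
    println!("complete {} after {:?}", name, started.elapsed());
  }
}

// The wrapper shape this PR switches to: open the workunit, await the
// wrapped future, then complete the workunit, so call sites cannot forget
// the completion half of the pair.
async fn with_workunit<T, F: std::future::Future<Output = T>>(
  store: &WorkunitStore,
  name: &str,
  future: F,
) -> T {
  let started = store.start_workunit(name);
  let result = future.await;
  store.complete_workunit(name, started);
  result
}

fn main() {
  let store = WorkunitStore;
  // block_on from the `futures` crate keeps the sketch runnable without a full runtime.
  let out = futures::executor::block_on(with_workunit(&store, "store_bytes", async { 42 }));
  assert_eq!(out, 42);
}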
1 change: 1 addition & 0 deletions src/rust/engine/fs/brfs/src/main.rs
@@ -26,6 +26,7 @@
 #![allow(clippy::new_without_default, clippy::new_ret_no_self)]
 // Arc<Mutex> can be more clear than needing to grok Orderings:
 #![allow(clippy::mutex_atomic)]
+#![type_length_limit = "1303884"]
 
 use std::collections::hash_map::Entry::{Occupied, Vacant};
 use std::collections::HashMap;
306 changes: 142 additions & 164 deletions src/rust/engine/fs/store/src/remote.rs
@@ -13,7 +13,7 @@ use futures01::{future, Future, Sink, Stream};
 use hashing::Digest;
 use log::Level;
 use serverset::{retry, Serverset};
-use workunit_store::new_span_id;
+use workunit_store::with_workunit;
 
 use super::{BackoffConfig, EntryType};
@@ -119,107 +119,98 @@ impl ByteStore {
       digest.1,
     );
     let workunit_name = format!("store_bytes({})", resource_name.clone());
-    let span_id = new_span_id();
-    if let Some(workunit_state) = workunit_store::get_workunit_state() {
-      let parent_id = workunit_state.parent_id;
-      let metadata = workunit_store::WorkunitMetadata::with_level(Level::Debug);
-      workunit_state
-        .store
-        .start_workunit(span_id.clone(), workunit_name, parent_id, metadata);
-    }
+    let metadata = workunit_store::WorkunitMetadata::with_level(Level::Debug);
     let store = self.clone();
-    let result = self
-      .with_byte_stream_client(move |client| {
+    let result_future = self.with_byte_stream_client(move |client| {
       let resource_name = resource_name.clone();
       let store = store.clone();
       async move {
         let (sender, receiver) = client
           .write_opt(call_option(&store.headers, None)?.timeout(store.upload_timeout))
           .map_err(|err| {
             format!(
               "Error attempting to connect to upload digest {:?}: {:?}",
               digest, err
             )
           })?;
 
         let chunk_size_bytes = store.chunk_size_bytes;
         let resource_name = resource_name.clone();
         let stream = futures01::stream::unfold::<_, _, future::FutureResult<_, grpcio::Error>, _>(
           (0, false),
           move |(offset, has_sent_any)| {
             if offset >= bytes.len() && has_sent_any {
               None
             } else {
               let mut req = bazel_protos::bytestream::WriteRequest::new();
               req.set_resource_name(resource_name.clone());
               req.set_write_offset(offset as i64);
               let next_offset = min(offset + chunk_size_bytes, bytes.len());
               req.set_finish_write(next_offset == bytes.len());
               req.set_data(Bytes::from(&bytes[offset..next_offset]));
               Some(future::ok((
                 (req, grpcio::WriteFlags::default()),
                 (next_offset, true),
               )))
             }
           },
         );
 
         sender
           .send_all(stream)
           .map(|_| ())
           .or_else(move |e| {
             match e {
               // Some implementations of the remote execution API early-return if the blob has
               // been concurrently uploaded by another client. In this case, they return a
               // WriteResponse with a committed_size equal to the digest's entire size before
               // closing the stream.
               // Because the server then closes the stream, the client gets an RpcFinished
               // error in this case. We ignore this, and will later on verify that the
               // committed_size we received from the server is equal to the expected one. If
               // these are not equal, the upload will be considered a failure at that point.
               // Whether this type of response will become part of the official API is up for
               // discussion: see
               // https://groups.google.com/d/topic/remote-execution-apis/NXUe3ItCw68/discussion.
               grpcio::Error::RpcFinished(None) => Ok(()),
               e => Err(format!(
                 "Error attempting to upload digest {:?}: {:?}",
                 digest, e
               )),
             }
           })
           .compat()
           .await?;
 
         let received = receiver
           .map_err(move |e| {
             format!(
               "Error from server when uploading digest {:?}: {:?}",
               digest, e
             )
           })
           .compat()
           .await?;
 
         if received.get_committed_size() == len as i64 {
           Ok(digest)
         } else {
           Err(format!(
             "Uploading file with digest {:?}: want committed size {} but got {}",
             digest,
             len,
             received.get_committed_size()
           ))
         }
       }
-      })
-      .await;
+    });
 
     if let Some(workunit_state) = workunit_store::get_workunit_state() {
-      workunit_state.store.complete_workunit(span_id)
+      let store = workunit_state.store;
+      with_workunit(store, workunit_name, metadata, result_future, |_, md| md).await
+    } else {
+      result_future.await
     }
-
-    result
   }
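The stream::unfold above drives the chunked upload: offsets advance by chunk_size_bytes, the final chunk sets finish_write, and the has_sent_any flag ensures that even an empty payload sends exactly one (empty) write. The same arithmetic extracted into a self-contained check (a plain sketch, not pants code):

use std::cmp::min;

// Walk the same (offset, next_offset, finish_write) sequence the upload
// stream produces, including the single empty chunk for an empty payload.
fn chunks(len: usize, chunk_size_bytes: usize) -> Vec<(usize, usize, bool)> {
  let mut out = Vec::new();
  let mut offset = 0;
  loop {
    let next_offset = min(offset + chunk_size_bytes, len);
    let finish_write = next_offset == len;
    out.push((offset, next_offset, finish_write));
    if finish_write {
      break;
    }
    offset = next_offset;
  }
  out
}

fn main() {
  // A 10-byte payload in 4-byte chunks: [0, 4), [4, 8), [8, 10).
  assert_eq!(
    chunks(10, 4),
    vec![(0, 4, false), (4, 8, false), (8, 10, true)]
  );
  // An empty payload still sends exactly one finishing chunk.
  assert_eq!(chunks(0, 4), vec![(0, 0, true)]);
}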

pub async fn load_bytes_with<
@@ -239,71 +230,62 @@ impl ByteStore {
       digest.1
     );
     let workunit_name = format!("load_bytes_with({})", resource_name.clone());
-    let span_id = new_span_id();
-    if let Some(workunit_state) = workunit_store::get_workunit_state() {
-      let parent_id = workunit_state.parent_id;
-      let metadata = workunit_store::WorkunitMetadata::with_level(Level::Debug);
-      workunit_state
-        .store
-        .start_workunit(span_id.clone(), workunit_name, parent_id, metadata);
-    }
-
-    let result = self
-      .with_byte_stream_client(move |client| {
+    let metadata = workunit_store::WorkunitMetadata::with_level(Level::Debug);
+    let result_future = self.with_byte_stream_client(move |client| {
       let resource_name = resource_name.clone();
       let store = store.clone();
       let f = f.clone();
       async move {
         let stream = client
           .read_opt(
             &{
               let mut req = bazel_protos::bytestream::ReadRequest::new();
               req.set_resource_name(resource_name.clone());
               req.set_read_offset(0);
               // 0 means no limit.
               req.set_read_limit(0);
               req
             },
             call_option(&store.headers, None)?,
           )
           .map_err(|err| format!("Error making CAS read request for {:?}: {:?}", digest, err))?;
 
         let bytes_res = stream
           .fold(BytesMut::with_capacity(digest.1), move |mut bytes, r| {
             bytes.extend_from_slice(&r.data);
             future::ok::<_, grpcio::Error>(bytes)
           })
           .compat()
           .await;
 
         // We ensure that we hold onto the client until after we've consumed the stream as a
         // workaround for https://github.com/pingcap/grpc-rs/issues/123
         std::mem::drop(client);
 
         let maybe_bytes = match bytes_res {
           Ok(bytes) => Some(bytes.freeze()),
           Err(grpcio::Error::RpcFailure(grpcio::RpcStatus {
             status: grpcio::RpcStatusCode::NOT_FOUND,
             ..
           })) => None,
           Err(e) => {
             return Err(format!(
               "Error from server in response to CAS read request: {:?}",
               e
             ))
           }
         };
 
         Ok(maybe_bytes.map(f))
       }
-      })
-      .await;
+    });
 
     if let Some(workunit_state) = workunit_store::get_workunit_state() {
-      workunit_state.store.complete_workunit(span_id);
+      let store = workunit_state.store;
+      with_workunit(store, workunit_name, metadata, result_future, |_, md| md).await
+    } else {
+      result_future.await
     }
-
-    result
   }
Comment on lines 283 to 288:

In a followup, it might be useful to expose with_workunit as a static method or a macro that would do the equivalent of these 6 lines in one line.
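A sketch of the macro the reviewer describes (hypothetical: the in_workunit! name and shape are assumptions, not part of this PR or of workunit_store), collapsing the six-line if/else into one expression at each call site; it relies only on items already used in this file:

// Hypothetical convenience macro: wrap a future in a workunit when a
// workunit store is available on the current task, otherwise await the
// future directly.
macro_rules! in_workunit {
  ($name:expr, $metadata:expr, $future:expr) => {
    async {
      if let Some(workunit_state) = workunit_store::get_workunit_state() {
        let store = workunit_state.store;
        with_workunit(store, $name, $metadata, $future, |_, md| md).await
      } else {
        $future.await
      }
    }
  };
}

// A call site would then read:
//   in_workunit!(workunit_name, metadata, result_future).await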

///
@@ -319,18 +301,10 @@ impl ByteStore {
       "list_missing_digests({})",
       store.instance_name.clone().unwrap_or_default()
     );
-    let span_id = new_span_id();
-    if let Some(workunit_state) = workunit_store::get_workunit_state() {
-      let parent_id = workunit_state.parent_id;
-      let metadata = workunit_store::WorkunitMetadata::with_level(Level::Debug);
-      workunit_state
-        .store
-        .start_workunit(span_id.clone(), workunit_name, parent_id, metadata);
-    }
-
-    async move {
+    let metadata = workunit_store::WorkunitMetadata::with_level(Level::Debug);
+    let result_future = async move {
       let store2 = store.clone();
-      let res = store2
+      store2
         .with_cas_client(move |client| {
           let request = request.clone();
           let store = store.clone();
@@ -350,11 +324,15 @@ impl ByteStore {
             .collect::<Result<HashSet<_>, _>>()
         }
       })
-      .await;
+      .await
+    };
+    async {
       if let Some(workunit_state) = workunit_store::get_workunit_state() {
-        workunit_state.store.complete_workunit(span_id);
+        let store = workunit_state.store;
+        with_workunit(store, workunit_name, metadata, result_future, |_, md| md).await
+      } else {
+        result_future.await
       }
-      res
     }
     .boxed()
     .compat()
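The trailing .boxed().compat() exists because list_missing_digests still returns a futures 0.1 future to its callers while its body is written against std futures. A self-contained sketch of that bridge (zero_one_style is an assumed name for illustration; it needs the futures crate with its "compat" feature plus the futures01 crate):

use futures::compat::Future01CompatExt; // adapts futures 0.1 -> std future
use futures::future::{FutureExt, TryFutureExt}; // .boxed() and std -> 0.1 .compat()

// Expose a std future through the futures 0.1 interface, as the diff does
// with .boxed().compat().
fn zero_one_style() -> Box<dyn futures01::Future<Item = u64, Error = String> + Send> {
  Box::new(async { Ok::<u64, String>(7) }.boxed().compat())
}

fn main() {
  // Adapt the 0.1 future back into a std future so block_on can run it.
  let value = futures::executor::block_on(zero_one_style().compat()).unwrap();
  assert_eq!(value, 7);
}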
1 change: 1 addition & 0 deletions src/rust/engine/process_executor/src/main.rs
@@ -26,6 +26,7 @@
 #![allow(clippy::new_without_default, clippy::new_ret_no_self)]
 // Arc<Mutex> can be more clear than needing to grok Orderings:
 #![allow(clippy::mutex_atomic)]
+#![type_length_limit = "1076739"]
 
 use clap::{value_t, App, AppSettings, Arg};
 use futures::compat::Future01CompatExt;