close write stream for CAS writes via ByteStream API (#10395)
### Problem

The gRPC library used by the Rust engine cancels a stream when it is dropped, so the stream must be closed explicitly. Because the write stream was never closed, BuildGrid's CAS daemon was [generating errors on writes](https://gitlab.com/BuildGrid/buildgrid/-/blob/master/buildgrid/server/cas/instance.py#L392) via the ByteStream API when the stream was cancelled.

### Solution

Explicitly close the stream to the ByteStream Write RPC.
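
The difference between dropping and closing can be illustrated with a minimal, standard-library-only sketch. `WriteStream`, `Outcome`, and `upload` are hypothetical names invented for this illustration; they are not part of grpcio or of this change, and the sketch only models the "cancel on drop unless closed" behaviour described above.

```rust
use std::cell::Cell;
use std::rc::Rc;

// What the (hypothetical) server observes for a write stream.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Outcome {
    Open,
    Closed,
    Cancelled,
}

// Hypothetical stand-in for the client side of a streaming write call.
struct WriteStream {
    outcome: Rc<Cell<Outcome>>,
}

impl WriteStream {
    fn new(outcome: Rc<Cell<Outcome>>) -> Self {
        outcome.set(Outcome::Open);
        WriteStream { outcome }
    }

    // Explicit close: the stream finishes cleanly.
    fn close(&mut self) {
        self.outcome.set(Outcome::Closed);
    }
}

impl Drop for WriteStream {
    // Dropping a stream that is still open is modelled as a cancellation.
    fn drop(&mut self) {
        if self.outcome.get() == Outcome::Open {
            self.outcome.set(Outcome::Cancelled);
        }
    }
}

// Runs one upload and reports what the server side would have observed.
fn upload(explicit_close: bool) -> Outcome {
    let observed = Rc::new(Cell::new(Outcome::Open));
    {
        let mut stream = WriteStream::new(Rc::clone(&observed));
        if explicit_close {
            stream.close();
        }
        // `stream` is dropped at the end of this scope.
    }
    observed.get()
}

fn main() {
    println!("without close: {:?}", upload(false));
    println!("with close:    {:?}", upload(true));
}
```

In this model, skipping `close()` leaves the drop handler to cancel the stream, which corresponds to the condition the BuildGrid CAS server was reporting as an error.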

### Result

With this PR, I did not see the CAS write errors any more in the log of the BuildGrid CAS server.
Tom Dyas committed Jul 20, 2020
1 parent 669b5f3 commit 1f6267f
Showing 1 changed file with 56 additions and 46 deletions.
102 changes: 56 additions & 46 deletions src/rust/engine/fs/store/src/remote.rs
@@ -7,9 +7,10 @@ use std::time::Duration;

 use bazel_protos::{self, call_option};
 use bytes::{Bytes, BytesMut};
-use futures::compat::Future01CompatExt;
+use futures::compat::{Future01CompatExt, Sink01CompatExt};
 use futures::future::{FutureExt, TryFutureExt};
-use futures01::{future, Future, Sink, Stream};
+use futures::sink::SinkExt;
+use futures01::{future, Future, Stream};
 use hashing::Digest;
 use log::Level;
 use serverset::{retry, Serverset};
@@ -134,53 +135,62 @@ impl ByteStore {
     )
   })?;

+let mut sender = sender.sink_compat();
+
 let chunk_size_bytes = store.chunk_size_bytes;
 let resource_name = resource_name.clone();
-let stream = futures01::stream::unfold::<_, _, future::FutureResult<_, grpcio::Error>, _>(
-  (0, false),
-  move |(offset, has_sent_any)| {
-    if offset >= bytes.len() && has_sent_any {
-      None
-    } else {
-      let mut req = bazel_protos::bytestream::WriteRequest::new();
-      req.set_resource_name(resource_name.clone());
-      req.set_write_offset(offset as i64);
-      let next_offset = min(offset + chunk_size_bytes, bytes.len());
-      req.set_finish_write(next_offset == bytes.len());
-      req.set_data(Bytes::from(&bytes[offset..next_offset]));
-      Some(future::ok((
-        (req, grpcio::WriteFlags::default()),
-        (next_offset, true),
-      )))
-    }
-  },
-);
+let mut stream = futures::stream::unfold((0, false), move |(offset, has_sent_any)| {
+  if offset >= bytes.len() && has_sent_any {
+    futures::future::ready(None)
+  } else {
+    let mut req = bazel_protos::bytestream::WriteRequest::new();
+    req.set_resource_name(resource_name.clone());
+    req.set_write_offset(offset as i64);
+    let next_offset = min(offset + chunk_size_bytes, bytes.len());
+    req.set_finish_write(next_offset == bytes.len());
+    req.set_data(Bytes::from(&bytes[offset..next_offset]));
+    futures::future::ready(Some((
+      Ok((req, grpcio::WriteFlags::default())),
+      (next_offset, true),
+    )))
+  }
+});

-sender
-  .send_all(stream)
-  .map(|_| ())
-  .or_else(move |e| {
-    match e {
-      // Some implementations of the remote execution API early-return if the blob has
-      // been concurrently uploaded by another client. In this case, they return a
-      // WriteResponse with a committed_size equal to the digest's entire size before
-      // closing the stream.
-      // Because the server then closes the stream, the client gets an RpcFinished
-      // error in this case. We ignore this, and will later on verify that the
-      // committed_size we received from the server is equal to the expected one. If
-      // these are not equal, the upload will be considered a failure at that point.
-      // Whether this type of response will become part of the official API is up for
-      // discussion: see
-      // https://groups.google.com/d/topic/remote-execution-apis/NXUe3ItCw68/discussion.
-      grpcio::Error::RpcFinished(None) => Ok(()),
-      e => Err(format!(
-        "Error attempting to upload digest {:?}: {:?}",
-        digest, e
-      )),
-    }
-  })
-  .compat()
-  .await?;
+sender.send_all(&mut stream).await.or_else(move |e| {
+  match e {
+    // Some implementations of the remote execution API early-return if the blob has
+    // been concurrently uploaded by another client. In this case, they return a
+    // WriteResponse with a committed_size equal to the digest's entire size before
+    // closing the stream.
+    // Because the server then closes the stream, the client gets an RpcFinished
+    // error in this case. We ignore this, and will later on verify that the
+    // committed_size we received from the server is equal to the expected one. If
+    // these are not equal, the upload will be considered a failure at that point.
+    // Whether this type of response will become part of the official API is up for
+    // discussion: see
+    // https://groups.google.com/d/topic/remote-execution-apis/NXUe3ItCw68/discussion.
+    grpcio::Error::RpcFinished(None) => Ok(()),
+    e => Err(format!(
+      "Error attempting to upload digest {:?}: {:?}",
+      digest, e
+    )),
+  }
+})?;
+
+// The gRPC library cancels streams on drop; closes must be explicit. Not closing
+// the stream caused the BuildGrid CAS server to generate errors on writes
+// when the stream was cancelled because it was not closed explicitly.
+sender.close().await.or_else(|err| {
+  match err {
+    // Some servers (e.g., RBE) may have already closed the stream for the early
+    // return reason identified previously. Treat this condition as a successful close.
+    grpcio::Error::RpcFinished(None) => Ok(()),
+    e => Err(format!(
+      "Error from server when uploading digest {:?}: {:?}",
+      digest, e
+    )),
+  }
+})?;

 let received = receiver
   .map_err(move |e| {
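
The chunking performed by the `unfold` closure in the diff can be sketched without any gRPC or futures dependency. `Chunk` and `plan_chunks` are hypothetical names used only for this illustration; the real code builds `bazel_protos::bytestream::WriteRequest` values lazily as a stream rather than collecting a `Vec`. The sketch assumes a non-zero chunk size.

```rust
use std::cmp::min;

// Hypothetical, simplified view of the fields set on each WriteRequest.
#[derive(Debug, PartialEq)]
struct Chunk {
    write_offset: usize,
    finish_write: bool,
    data: Vec<u8>,
}

// Mirrors the loop condition in the diff: keep producing chunks until the
// whole blob is covered, but always produce at least one chunk so that an
// empty blob still results in a single request with finish_write set.
fn plan_chunks(bytes: &[u8], chunk_size_bytes: usize) -> Vec<Chunk> {
    // Assumption for this sketch: a zero chunk size is a caller error.
    assert!(chunk_size_bytes > 0, "chunk size must be non-zero");

    let mut chunks = Vec::new();
    let mut offset = 0;
    let mut has_sent_any = false;

    while offset < bytes.len() || !has_sent_any {
        let next_offset = min(offset + chunk_size_bytes, bytes.len());
        chunks.push(Chunk {
            write_offset: offset,
            // Only the chunk that reaches the end of the blob finishes the write.
            finish_write: next_offset == bytes.len(),
            data: bytes[offset..next_offset].to_vec(),
        });
        offset = next_offset;
        has_sent_any = true;
    }
    chunks
}

fn main() {
    for chunk in plan_chunks(b"abcde", 2) {
        println!("{:?}", chunk);
    }
}
```

The `has_sent_any` flag is what distinguishes "nothing sent yet" from "everything sent" when the blob is empty, since in both cases the offset equals the blob length.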
