Skip to content

Commit

Permalink
fix test
Browse files Browse the repository at this point in the history
  • Loading branch information
tgolsson committed May 13, 2024
1 parent cad2ea4 commit 885784c
Showing 1 changed file with 82 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ use workunit_store::{Metric, ObservationMetric};

use remote_provider_traits::{ByteStoreProvider, LoadDestination, RemoteStoreOptions};

const RPC_DIGEST_SIZE: usize = 70;

pub struct Provider {
instance_name: Option<String>,
chunk_size_bytes: usize,
Expand Down Expand Up @@ -393,24 +395,49 @@ impl ByteStoreProvider for Provider {
&self,
digests: &mut (dyn Iterator<Item = Digest> + Send),
) -> Result<HashSet<Digest>, String> {
let request = remexec::FindMissingBlobsRequest {
instance_name: self.instance_name.as_ref().cloned().unwrap_or_default(),
blob_digests: digests.into_iter().map(|d| d.into()).collect::<Vec<_>>(),
};
let blob_digests = digests.into_iter().map(|d| d.into()).collect::<Vec<_>>();

let max_digests_per_request: usize = (4 * 1024 * 1024
- self
.instance_name
.as_ref()
.cloned()
.unwrap_or_default()
.len())
/ RPC_DIGEST_SIZE;

let client = self.cas_client.as_ref().clone();
let requests = blob_digests.chunks(max_digests_per_request).map(|digests| {
let msg = remexec::FindMissingBlobsRequest {
instance_name: self.instance_name.as_ref().cloned().unwrap_or_default(),
blob_digests: digests.to_vec(),
};

msg
});

workunit_store::increment_counter_if_in_workunit(Metric::RemoteStoreExistsAttempts, 1);
let result = retry_call(
client,
move |mut client, _| {
let request = request.clone();
async move { client.find_missing_blobs(request).await }
},
status_is_retryable,
)
.await
.map_err(status_to_str);

let client = self.cas_client.as_ref();
let futures = requests
.map(|request| {
let client = client.clone();
let result = retry_call(
client,
move |mut client, _| {
let request = request.clone();
async move { client.find_missing_blobs(request).await }
},
status_is_retryable,
);
result
})
.collect::<Vec<_>>();

let result = futures::future::join_all(futures)
.await
.into_iter()
.collect::<Result<Vec<_>, _>>()
.map_err(status_to_str);

let metric = match result {
Ok(_) => Metric::RemoteStoreExistsSuccesses,
Expand All @@ -421,10 +448,46 @@ impl ByteStoreProvider for Provider {
let response = result?;

response
.into_inner()
.missing_blob_digests
.iter()
.map(|digest| digest.try_into())
.into_iter()
.flat_map(|response| {
response
.into_inner()
.missing_blob_digests
.into_iter()
.map(|digest| digest.try_into())
})
.collect::<Result<HashSet<_>, _>>()
}
}

#[cfg(test)]
mod tests {

use super::RPC_DIGEST_SIZE;
use crate::remexec::FindMissingBlobsRequest;
use prost::Message;
use testutil::data::TestData;

#[test]
fn test_size_of_find_missing_blobs_request() {
let mut blobs = Vec::new();
let instance_name = "";
// NOTE[TSolberg]:
// This test is a bit of a hack, but it's the best way I could think of to ensure that the size of the
// FindMissingBlobsRequest is roughly what we expect. The only delta would be the encoding of the
for it in (0..10).into_iter().chain(1000..1010).chain(10000..10010) {
while blobs.len() < it {
blobs.push(TestData::roland().digest().into());
}

let request = FindMissingBlobsRequest {
instance_name: instance_name.to_string(),
blob_digests: blobs.clone(),
};

let size = request.encoded_len();

assert_eq!(size, RPC_DIGEST_SIZE * it);
}
}
}

0 comments on commit 885784c

Please sign in to comment.