From ab95fc230ec0b0a04fe5bf0a7ba56e58fb276c42 Mon Sep 17 00:00:00 2001 From: Tom Dyas Date: Thu, 30 Jun 2022 10:57:17 -0400 Subject: [PATCH] revert REAPI batch API PRs (#16016) Causes stack blowout due to the Future being too big. * Revert "[internal] test batch and stream API paths for load/store from remote store (#15996)" This reverts commit 2fa867199b76b51db87db389f3c018f241a04c1c. [ci skip-build-wheels] * Revert "[internal] use CAS BatchReadBlobs API for small blob reads (#15969)" This reverts commit 58748840eb671a27935d493951b496667babdde6. [ci skip-build-wheels] --- src/rust/engine/fs/store/src/remote.rs | 88 ++++--------------- src/rust/engine/fs/store/src/remote_tests.rs | 57 ------------ src/rust/engine/protos/src/conversions.rs | 8 -- .../engine/testutil/mock/src/cas_service.rs | 5 -- 4 files changed, 15 insertions(+), 143 deletions(-) diff --git a/src/rust/engine/fs/store/src/remote.rs b/src/rust/engine/fs/store/src/remote.rs index 42dacdff0fc..e4db49cc353 100644 --- a/src/rust/engine/fs/store/src/remote.rs +++ b/src/rust/engine/fs/store/src/remote.rs @@ -18,8 +18,8 @@ use protos::gen::build::bazel::remote::execution::v2 as remexec; use protos::gen::google::bytestream::byte_stream_client::ByteStreamClient; use remexec::{ capabilities_client::CapabilitiesClient, - content_addressable_storage_client::ContentAddressableStorageClient, BatchReadBlobsRequest, - BatchUpdateBlobsRequest, ServerCapabilities, + content_addressable_storage_client::ContentAddressableStorageClient, BatchUpdateBlobsRequest, + ServerCapabilities, }; use tonic::{Code, Request, Status}; use workunit_store::{in_workunit, ObservationMetric}; @@ -203,7 +203,16 @@ impl ByteStore { }) } - async fn len_is_allowed_for_batch_api(&self, len: usize) -> Result { + async fn store_bytes_source( + &self, + digest: Digest, + bytes: ByteSource, + ) -> Result<(), ByteStoreError> + where + ByteSource: Fn(Range) -> Bytes + Send + Sync + 'static, + { + let len = digest.size_bytes; + let max_batch_total_size_bytes = { let capabilities = self.get_capabilities().await?; @@ -217,25 +226,14 @@ impl ByteStore { let batch_api_allowed_by_local_config = len <= self.batch_api_size_limit; let batch_api_allowed_by_server_config = max_batch_total_size_bytes == 0 || len < max_batch_total_size_bytes; - Ok(batch_api_allowed_by_local_config && batch_api_allowed_by_server_config) - } - - async fn store_bytes_source( - &self, - digest: Digest, - bytes: ByteSource, - ) -> Result<(), ByteStoreError> - where - ByteSource: Fn(Range) -> Bytes + Send + Sync + 'static, - { - if self.len_is_allowed_for_batch_api(digest.size_bytes).await? { + if batch_api_allowed_by_local_config && batch_api_allowed_by_server_config { self.store_bytes_source_batch(digest, bytes).await } else { self.store_bytes_source_stream(digest, bytes).await } } - pub(crate) async fn store_bytes_source_batch( + async fn store_bytes_source_batch( &self, digest: Digest, bytes: ByteSource, @@ -259,7 +257,7 @@ impl ByteStore { Ok(()) } - pub(crate) async fn store_bytes_source_stream( + async fn store_bytes_source_stream( &self, digest: Digest, bytes: ByteSource, @@ -341,62 +339,6 @@ impl ByteStore { &self, digest: Digest, f: F, - ) -> Result, ByteStoreError> { - if self.len_is_allowed_for_batch_api(digest.size_bytes).await? { - self.load_bytes_with_batch(digest, f).await - } else { - self.load_bytes_with_stream(digest, f).await - } - } - - pub(crate) async fn load_bytes_with_batch< - T: Send + 'static, - F: Fn(Bytes) -> Result + Send + Sync + Clone + 'static, - >( - &self, - digest: Digest, - f: F, - ) -> Result, ByteStoreError> { - let request = BatchReadBlobsRequest { - instance_name: self.instance_name.clone().unwrap_or_default(), - digests: vec![digest.into()], - }; - let mut client = self.cas_client.as_ref().clone(); - let response = client - .batch_read_blobs(request) - .await - .map_err(ByteStoreError::Grpc)?; - - let response = response.into_inner(); - if response.responses.len() != 1 { - return Err(ByteStoreError::Other( - format!( - "Response from remote store for BatchReadBlobs API had inconsistent number of responses (got {}, expected 1)", - response.responses.len() - ) - )); - } - - let blob_response = response.responses.into_iter().next().unwrap(); - let rpc_status = blob_response.status.unwrap_or_default(); - let status = Status::from(rpc_status); - match status.code() { - Code::Ok => { - let result = f(blob_response.data); - result.map(Some).map_err(ByteStoreError::Other) - } - Code::NotFound => Ok(None), - _ => Err(ByteStoreError::Grpc(status)), - } - } - - pub(crate) async fn load_bytes_with_stream< - T: Send + 'static, - F: Fn(Bytes) -> Result + Send + Sync + Clone + 'static, - >( - &self, - digest: Digest, - f: F, ) -> Result, ByteStoreError> { let start = Instant::now(); let store = self.clone(); diff --git a/src/rust/engine/fs/store/src/remote_tests.rs b/src/rust/engine/fs/store/src/remote_tests.rs index 583c33c30ed..52363edc543 100644 --- a/src/rust/engine/fs/store/src/remote_tests.rs +++ b/src/rust/engine/fs/store/src/remote_tests.rs @@ -307,63 +307,6 @@ async fn list_missing_digests_error() { ); } -#[tokio::test] -async fn load_via_each_api() { - let _ = WorkunitStore::setup_for_tests(); - let cas = StubCAS::empty(); - - let data = TestData::roland(); - { - let mut blobs = cas.blobs.lock(); - blobs.insert(data.fingerprint(), data.bytes()); - } - - let store = new_byte_store(&cas); - let result_batch = store - .load_bytes_with_batch(data.digest(), |b| Ok(b)) - .await - .unwrap() - .unwrap(); - let result_stream = store - .load_bytes_with_stream(data.digest(), |b| Ok(b)) - .await - .unwrap() - .unwrap(); - assert_eq!(result_batch, data.bytes()); - assert_eq!(result_stream, data.bytes()); -} - -#[tokio::test] -async fn store_via_each_api() { - let _ = WorkunitStore::setup_for_tests(); - let cas = StubCAS::empty(); - - let data = TestData::roland(); - let store = new_byte_store(&cas); - - let bytes = data.bytes(); - let _ = store - .store_bytes_source_batch(data.digest(), move |r| bytes.slice(r)) - .await - .unwrap(); - { - let mut blobs = cas.blobs.lock(); - assert_eq!(*blobs.get(&data.digest().hash).unwrap(), data.bytes()); - blobs.clear(); - } - - let bytes = data.bytes(); - let _ = store - .store_bytes_source_stream(data.digest(), move |r| bytes.slice(r)) - .await - .unwrap(); - { - let mut blobs = cas.blobs.lock(); - assert_eq!(*blobs.get(&data.digest().hash).unwrap(), data.bytes()); - blobs.clear(); - } -} - fn new_byte_store(cas: &StubCAS) -> ByteStore { ByteStore::new( &cas.address(), diff --git a/src/rust/engine/protos/src/conversions.rs b/src/rust/engine/protos/src/conversions.rs index c7fb43da86e..1e385d8cf00 100644 --- a/src/rust/engine/protos/src/conversions.rs +++ b/src/rust/engine/protos/src/conversions.rs @@ -1,5 +1,3 @@ -use tonic::{Code, Status}; - impl<'a> From<&'a hashing::Digest> for crate::gen::build::bazel::remote::execution::v2::Digest { fn from(d: &'a hashing::Digest) -> Self { Self { @@ -53,9 +51,3 @@ pub fn require_digest< None => Err("Protocol violation: Digest missing from a Remote Execution API protobuf.".into()), } } - -impl From for Status { - fn from(rpc_status: crate::gen::google::rpc::Status) -> Self { - Status::new(Code::from_i32(rpc_status.code), rpc_status.message) - } -} diff --git a/src/rust/engine/testutil/mock/src/cas_service.rs b/src/rust/engine/testutil/mock/src/cas_service.rs index 8a95f41fbe2..de855a1ae36 100644 --- a/src/rust/engine/testutil/mock/src/cas_service.rs +++ b/src/rust/engine/testutil/mock/src/cas_service.rs @@ -446,11 +446,6 @@ impl ContentAddressableStorage for StubCASResponder { &self, request: Request, ) -> Result, Status> { - { - let mut request_count = self.read_request_count.lock(); - *request_count += 1; - } - check_auth!(self, request); if self.always_errors {