Skip to content

Commit

Permalink
[internal] test batch and stream API paths for load/store from remote…
Browse files Browse the repository at this point in the history
… store (#15996)

Add tests that guarantee to invoke both the batch and stream API variants for loads and stores from a remote CAS. The other tests rely on the wrapper methods for load and store which select the API to use based on blob size. For full test coverage, both paths need to be tested, so make them `pub(crate)` (since the tests are in a sibling module) and invoke directly.

[ci skip-build-wheels]
  • Loading branch information
Tom Dyas committed Jun 29, 2022
1 parent 7382ad4 commit 2fa8671
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 4 deletions.
8 changes: 4 additions & 4 deletions src/rust/engine/fs/store/src/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ impl ByteStore {
}
}

async fn store_bytes_source_batch<ByteSource>(
pub(crate) async fn store_bytes_source_batch<ByteSource>(
&self,
digest: Digest,
bytes: ByteSource,
Expand All @@ -259,7 +259,7 @@ impl ByteStore {
Ok(())
}

async fn store_bytes_source_stream<ByteSource>(
pub(crate) async fn store_bytes_source_stream<ByteSource>(
&self,
digest: Digest,
bytes: ByteSource,
Expand Down Expand Up @@ -349,7 +349,7 @@ impl ByteStore {
}
}

async fn load_bytes_with_batch<
pub(crate) async fn load_bytes_with_batch<
T: Send + 'static,
F: Fn(Bytes) -> Result<T, String> + Send + Sync + Clone + 'static,
>(
Expand Down Expand Up @@ -390,7 +390,7 @@ impl ByteStore {
}
}

async fn load_bytes_with_stream<
pub(crate) async fn load_bytes_with_stream<
T: Send + 'static,
F: Fn(Bytes) -> Result<T, String> + Send + Sync + Clone + 'static,
>(
Expand Down
57 changes: 57 additions & 0 deletions src/rust/engine/fs/store/src/remote_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,63 @@ async fn list_missing_digests_error() {
);
}

#[tokio::test]
async fn load_via_each_api() {
let _ = WorkunitStore::setup_for_tests();
let cas = StubCAS::empty();

let data = TestData::roland();
{
let mut blobs = cas.blobs.lock();
blobs.insert(data.fingerprint(), data.bytes());
}

let store = new_byte_store(&cas);
let result_batch = store
.load_bytes_with_batch(data.digest(), |b| Ok(b))
.await
.unwrap()
.unwrap();
let result_stream = store
.load_bytes_with_stream(data.digest(), |b| Ok(b))
.await
.unwrap()
.unwrap();
assert_eq!(result_batch, data.bytes());
assert_eq!(result_stream, data.bytes());
}

#[tokio::test]
async fn store_via_each_api() {
let _ = WorkunitStore::setup_for_tests();
let cas = StubCAS::empty();

let data = TestData::roland();
let store = new_byte_store(&cas);

let bytes = data.bytes();
let _ = store
.store_bytes_source_batch(data.digest(), move |r| bytes.slice(r))
.await
.unwrap();
{
let mut blobs = cas.blobs.lock();
assert_eq!(*blobs.get(&data.digest().hash).unwrap(), data.bytes());
blobs.clear();
}

let bytes = data.bytes();
let _ = store
.store_bytes_source_stream(data.digest(), move |r| bytes.slice(r))
.await
.unwrap();
{
let mut blobs = cas.blobs.lock();
assert_eq!(*blobs.get(&data.digest().hash).unwrap(), data.bytes());
blobs.clear();
}
}

fn new_byte_store(cas: &StubCAS) -> ByteStore {
ByteStore::new(
&cas.address(),
Expand Down

0 comments on commit 2fa8671

Please sign in to comment.