Use speculation for remote cache reads #11429
# Building wheels and fs_util will be skipped. Delete if not intended. [ci skip-build-wheels]
Thanks! Looks good. Just needs a test... one would be fine.
The tests were originally closer to integration tests, whereas we want high-quality, focused unit tests for both cache reads and cache writes. This will allow us to do further mocking for the sake of testing speculation in #11429.
This will allow us to test speculation, along with more generally testing tarpitting.
Looks great, thanks!
I don't think that testing the metric is a blocker from my perspective.
async fn cache_read_speculation() {
  WorkunitStore::setup_for_tests();

  async fn run_process(local_delay_ms: u64, remote_delay_ms: u64, cache_hit: bool) -> (i32, usize) {
Rather than using integers with an _ms suffix, prefer Duration, which you can create with Duration::from_secs, Duration::from_millis, etc.: https://doc.rust-lang.org/std/time/struct.Duration.html
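A minimal sketch of what this suggestion could look like, assuming a simplified mock: MockRunner and the ms helper are hypothetical names for illustration, not part of the PR. Only Duration and its constructors come from the standard library.

```rust
use std::time::Duration;

/// Hypothetical, simplified mock: it only stores the delay it would wait for.
struct MockRunner {
    delay: Duration,
}

impl MockRunner {
    /// Taking a Duration means callers cannot confuse milliseconds with seconds.
    fn new(delay: Duration) -> Self {
        MockRunner { delay }
    }
}

/// Hypothetical test helper that keeps call sites short.
fn ms(millis: u64) -> Duration {
    Duration::from_millis(millis)
}
```

A call site would then read MockRunner::new(ms(20)) or MockRunner::new(Duration::from_secs(1)), so the unit is visible where the value is chosen rather than in the parameter name.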
I tried doing this but I think it made our test code more verbose and worse:
diff --git a/src/rust/engine/process_execution/src/remote_cache_tests.rs b/src/rust/engine/process_execution/src/remote_cache_tests.rs
index 28f8b055e..c1ee88110 100644
--- a/src/rust/engine/process_execution/src/remote_cache_tests.rs
+++ b/src/rust/engine/process_execution/src/remote_cache_tests.rs
@@ -36,7 +36,7 @@ impl MockLocalCommandRunner {
pub fn new(
exit_code: i32,
call_counter: Arc<AtomicUsize>,
- delay_ms: u64,
+ delay: Duration,
) -> MockLocalCommandRunner {
MockLocalCommandRunner {
result: Ok(FallibleProcessResultWithPlatform {
@@ -48,7 +48,7 @@ impl MockLocalCommandRunner {
platform: Platform::current().unwrap(),
}),
call_counter,
- delay: Duration::from_millis(delay_ms),
+ delay,
}
}
}
@@ -113,7 +113,7 @@ fn create_local_runner(
let local_runner = Box::new(MockLocalCommandRunner::new(
exit_code,
call_counter.clone(),
- delay_ms,
+ Duration::from_millis(delay_ms),
));
(local_runner, call_counter)
}
@@ -124,7 +124,7 @@ fn create_cached_runner(
read_delay_ms: u64,
eager_fetch: bool,
) -> (Box<dyn CommandRunnerTrait>, StubActionCache) {
- let action_cache = StubActionCache::new(read_delay_ms).unwrap();
+ let action_cache = StubActionCache::new(Duration::from_millis(read_delay_ms)).unwrap();
let runner = Box::new(
crate::remote_cache::CommandRunner::new(
local.into(),
@@ -503,7 +503,7 @@ async fn make_action_result_basic() {
.expect("Error saving directory");
let mock_command_runner = Arc::new(MockCommandRunner);
- let action_cache = StubActionCache::new(0).unwrap();
+ let action_cache = StubActionCache::new(Duration::from_millis(0)).unwrap();
let runner = crate::remote_cache::CommandRunner::new(
mock_command_runner.clone(),
ProcessMetadata::default(),
diff --git a/src/rust/engine/process_execution/src/remote_tests.rs b/src/rust/engine/process_execution/src/remote_tests.rs
index 2f5840042..6989874b7 100644
--- a/src/rust/engine/process_execution/src/remote_tests.rs
+++ b/src/rust/engine/process_execution/src/remote_tests.rs
@@ -1745,7 +1745,7 @@ async fn remote_workunits_are_stored() {
.file(&TestData::roland())
.directory(&TestDirectory::containing_roland())
.build();
- let action_cache = mock::StubActionCache::new(0).unwrap();
+ let action_cache = mock::StubActionCache::new(std::time::Duration::from_millis(0)).unwrap();
let (command_runner, _store) =
create_command_runner(action_cache.address(), &cas, Platform::Linux);
@@ -2301,7 +2301,8 @@ async fn extract_execute_response(
operation: Operation,
remote_platform: Platform,
) -> Result<RemoteTestResult, ExecutionError> {
- let action_cache = mock::StubActionCache::new(0).expect("failed to create action cache");
+ let action_cache =
+ mock::StubActionCache::new(Duration::from_millis(0)).expect("failed to create action cache");
let cas = mock::StubCAS::builder()
.file(&TestData::roland())
diff --git a/src/rust/engine/testutil/mock/src/action_cache.rs b/src/rust/engine/testutil/mock/src/action_cache.rs
index 234ce6327..1395acb61 100644
--- a/src/rust/engine/testutil/mock/src/action_cache.rs
+++ b/src/rust/engine/testutil/mock/src/action_cache.rs
@@ -135,13 +135,13 @@ impl ActionCache for ActionCacheResponder {
}
impl StubActionCache {
- pub fn new(read_delay_ms: u64) -> Result<Self, String> {
+ pub fn new(read_delay: Duration) -> Result<Self, String> {
let action_map = Arc::new(Mutex::new(HashMap::new()));
let always_errors = Arc::new(AtomicBool::new(false));
let responder = ActionCacheResponder {
action_map: action_map.clone(),
always_errors: always_errors.clone(),
- read_delay: Duration::from_millis(read_delay_ms),
+ read_delay,
};
let addr = "127.0.0.1:0"
Were this production code, I would agree to keep it more flexible.
Tom and I tried to get workunit testing added. I came up with this diff:
diff --git a/src/rust/engine/process_execution/src/remote_cache_tests.rs b/src/rust/engine/process_execution/src/remote_cache_tests.rs
index 28f8b055e..252ecda6f 100644
--- a/src/rust/engine/process_execution/src/remote_cache_tests.rs
+++ b/src/rust/engine/process_execution/src/remote_cache_tests.rs
@@ -16,13 +16,14 @@ use store::{BackoffConfig, Store};
use tempfile::TempDir;
use testutil::data::{TestData, TestDirectory, TestTree};
use tokio::time::delay_for;
-use workunit_store::WorkunitStore;
+use workunit_store::{with_workunit, Metric, WorkunitMetadata, WorkunitStore};
use crate::remote::{ensure_action_stored_locally, make_execute_request};
use crate::{
CommandRunner as CommandRunnerTrait, Context, FallibleProcessResultWithPlatform,
MultiPlatformProcess, Platform, Process, ProcessMetadata,
};
+use itertools::Itertools;
/// A mock of the local runner used for better hermeticity of the tests.
#[derive(Clone)]
@@ -261,9 +262,12 @@ async fn cache_read_eager_fetch() {
#[tokio::test]
async fn cache_read_speculation() {
- WorkunitStore::setup_for_tests();
-
- async fn run_process(local_delay_ms: u64, remote_delay_ms: u64, cache_hit: bool) -> (i32, usize) {
+ async fn run_process(
+ local_delay_ms: u64,
+ remote_delay_ms: u64,
+ cache_hit: bool,
+ ) -> (i32, usize, u64, u64) {
+ let mut workunit_store = WorkunitStore::setup_for_tests();
let store_setup = StoreSetup::new();
let (local_runner, local_runner_call_counter) = create_local_runner(1, local_delay_ms);
let (cache_runner, action_cache) = create_cached_runner(
@@ -279,29 +283,70 @@ async fn cache_read_speculation() {
}
assert_eq!(local_runner_call_counter.load(Ordering::SeqCst), 0);
- let remote_result = cache_runner
- .run(process.clone().into(), Context::default())
- .await
+ // NB: We use with_workunit so that we can test that we correctly incremented counters.
+ let remote_result = with_workunit(
+ workunit_store.clone(),
+ "speculation_test".to_string(),
+ WorkunitMetadata::with_level(log::Level::Info),
+ cache_runner.run(process.clone().into(), Context::default()),
+ |_, md| md,
+ )
+ .await
+ .unwrap();
+
+ let counters = workunit_store.with_latest_workunits(log::Level::Info, |_, completed| {
+ println!("{:?}", completed);
+ completed
+ .iter()
+ .filter_map(|workunit| {
+ if workunit.name == "speculation_test" {
+ Some(workunit.counters.clone())
+ } else {
+ None
+ }
+ })
+ .collect_vec()[0]
+ .clone()
+ });
+ let local_won_counter = *counters
+ .get(&Metric::RemoteCacheSpeculationLocalCompletedFirst)
+ .unwrap();
+ let remote_won_counter = *counters
+ .get(&Metric::RemoteCacheSpeculationRemoteCompletedFirst)
.unwrap();
let final_local_count = local_runner_call_counter.load(Ordering::SeqCst);
- (remote_result.exit_code, final_local_count)
+ (
+ remote_result.exit_code,
+ final_local_count,
+ local_won_counter,
+ remote_won_counter,
+ )
}
// Case 1: remote is faster than local.
- let (exit_code, local_call_count) = run_process(20, 0, true).await;
+ let (exit_code, local_call_count, local_won_counter, remote_won_counter) =
+ run_process(20, 0, true).await;
assert_eq!(exit_code, 0);
assert_eq!(local_call_count, 0);
+ assert_eq!(local_won_counter, 0);
+ assert_eq!(remote_won_counter, 1);
// Case 2: local is faster than remote.
- let (exit_code, local_call_count) = run_process(0, 20, true).await;
+ let (exit_code, local_call_count, local_won_counter, remote_won_counter) =
+ run_process(0, 20, true).await;
assert_eq!(exit_code, 1);
assert_eq!(local_call_count, 1);
+ assert_eq!(local_won_counter, 1);
+ assert_eq!(remote_won_counter, 0);
// Case 3: the remote lookup wins, but there is no cache entry so we fallback to local execution.
- let (exit_code, local_call_count) = run_process(20, 0, false).await;
+ let (exit_code, local_call_count, local_won_counter, remote_won_counter) =
+ run_process(20, 0, false).await;
assert_eq!(exit_code, 1);
assert_eq!(local_call_count, 1);
+ assert_eq!(local_won_counter, 0);
+ assert_eq!(remote_won_counter, 0);
}
#[tokio::test]
But it doesn't work properly, the counters are empty for the
I time-boxed myself, so I'm going to move forward on this PR without getting the workunits tested. But lmk if something pops out.
@@ -53,6 +60,7 @@ impl CommandRunnerTrait for MockLocalCommandRunner {
    _req: MultiPlatformProcess,
    _context: Context,
  ) -> Result<FallibleProcessResultWithPlatform, String> {
    delay_for(self.delay).await;
Using a delay to control execution of this function will be flaky. Please use a tokio::sync::Semaphore to control the function's execution.
Specifically, the MockLocalCommandRunner should take an Arc<Semaphore> and then just call .acquire to obtain a "permit" from the semaphore. The test code should call add_permits either before or after the call to run_process to set up whether the .acquire call blocks or succeeds immediately.
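A rough sketch of the gating idea, written against the standard library only so it compiles without extra crates. Gate is a hypothetical hand-rolled counting semaphore standing in for tokio::sync::Semaphore, and GatedLocalRunner is a hypothetical, synchronous simplification of MockLocalCommandRunner. The real mock is async and would await the permit instead of blocking a thread.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Condvar, Mutex};

/// Hypothetical std-only stand-in for tokio::sync::Semaphore.
struct Gate {
    permits: Mutex<usize>,
    available: Condvar,
}

impl Gate {
    fn new(permits: usize) -> Self {
        Gate {
            permits: Mutex::new(permits),
            available: Condvar::new(),
        }
    }

    /// Called by the test to let waiting callers proceed.
    fn add_permits(&self, n: usize) {
        *self.permits.lock().unwrap() += n;
        self.available.notify_all();
    }

    /// Blocks until a permit is available, then consumes it.
    fn acquire(&self) {
        let mut permits = self.permits.lock().unwrap();
        while *permits == 0 {
            permits = self.available.wait(permits).unwrap();
        }
        *permits -= 1;
    }
}

/// Hypothetical simplification of the mock local runner.
struct GatedLocalRunner {
    exit_code: i32,
    call_counter: Arc<AtomicUsize>,
    gate: Arc<Gate>,
}

impl GatedLocalRunner {
    /// Waits for the test to release the gate instead of sleeping.
    /// Assumption: the call is counted only once the gate has opened.
    fn run(&self) -> i32 {
        self.gate.acquire();
        self.call_counter.fetch_add(1, Ordering::SeqCst);
        self.exit_code
    }
}
```

One possible design, stated as an assumption rather than what the PR does: give the local runner and the action cache stub separate gates, so the test can decide the winner by releasing only one of them before the run.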
Hm, I tried doing this just now, but I have no idea how to do this via a semaphore, given that cache_runner.run possibly uses both the local runner and the StubActionCache. We want to ensure one of those happens before the other.
I was thinking of using the same Semaphore between MockLocalCommandRunner and StubActionCache, with only one permit, so that we can ensure only one happens at a time. But how do we influence which gains the lock first? I don't think we want to gain/drop the lock in the test code itself, i.e. before or after:
let remote_result = cache_runner
.run(process.clone().into(), Context::default())
.await
.unwrap();
All the interesting things happen inside that cache_runner.run().
Closes #11390.
Some processes are extremely quick to run—such as parsing a Python file's imports—and running locally can often be faster than reading from the remote cache.
We add counters so that we can get some insight into how much value remote caching adds.
Unlike remote execution's speculation, we do not use any delay: checking the remote cache is very cheap and, unlike remote execution, does not tie up workers.
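To illustrate the race described above, here is a small std-only sketch, not the PR's implementation. speculate, Outcome, and Event are hypothetical names; the real code races async futures and can drop the loser, whereas this sketch uses threads, which keep running after they lose. Both branches start immediately, with no delay, and the outcome mirrors the two counters plus the cache-miss fallback.

```rust
use std::sync::mpsc;
use std::thread;

/// Which branch decided the result (hypothetical names).
#[derive(Debug, PartialEq)]
enum Outcome {
    RemoteCompletedFirst,
    LocalCompletedFirst,
    CacheMissFellBackToLocal,
}

/// Message sent by whichever branch finishes.
enum Event {
    Local(i32),
    Remote(Option<i32>), // None models a cache miss
}

/// Starts the local run and the remote cache lookup together and
/// returns the exit code from whichever usable result arrives first.
fn speculate<L, R>(local: L, remote: R) -> (i32, Outcome)
where
    L: FnOnce() -> i32 + Send + 'static,
    R: FnOnce() -> Option<i32> + Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    let local_tx = tx.clone();
    // Send errors are ignored: the receiver may be gone once a winner is chosen.
    thread::spawn(move || {
        let _ = local_tx.send(Event::Local(local()));
    });
    thread::spawn(move || {
        let _ = tx.send(Event::Remote(remote()));
    });

    match rx.recv().expect("both branches hung up") {
        Event::Local(code) => (code, Outcome::LocalCompletedFirst),
        Event::Remote(Some(code)) => (code, Outcome::RemoteCompletedFirst),
        // The lookup finished first but found nothing: wait for the local run.
        Event::Remote(None) => loop {
            if let Event::Local(code) = rx.recv().expect("local branch hung up") {
                return (code, Outcome::CacheMissFellBackToLocal);
            }
        },
    }
}
```

In this sketch a caller would increment one counter per Outcome variant; a cache miss increments neither the local-first nor the remote-first counter, which matches the expectations in the test diff above.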