Looping request with backoff period #5714

Merged
wisechengyi merged 20 commits into pantsbuild:master from wisechengyi:remote on Apr 25, 2018

Conversation
5 participants
wisechengyi (Contributor) commented Apr 17, 2018

Addresses #5503

// TODO: Add a timeout of some kind.
// https://github.com/pantsbuild/pants/issues/5504

.and_then(move |(execute_request, operation)| {
future::loop_fn(operation, move |operation| {
match extract_execute_response(operation) {
Ok(value) => {
// backoff_per_operation.remove(operation.clone().get_name());

wisechengyi (Contributor) commented Apr 17, 2018

The goal is to remove the backoff-period tracking once the operation is Ok'ed or Err'ed, but I'm getting:

error[E0382]: use of moved value: `operation`
   --> process_execution/src/remote.rs:103:49
    |
101 |               match extract_execute_response(operation) {
    |                                              --------- value moved here
102 |                 Ok(value) => {
103 |                    backoff_per_operation.remove(operation.clone().get_name());
    |                                                 ^^^^^^^^^ value used here after move
    |
    = note: move occurs because `operation` has type `bazel_protos::operations::Operation`, which does not implement the `Copy` trait

I wonder if there's a good way to access operation.name in this context.

wisechengyi (Contributor) commented Apr 17, 2018

Or maybe it's not so important to reclaim the memory, if run_command_remote's context is small enough?

illicitonion (Contributor) commented Apr 17, 2018

If you wanted to do this, what I'd suggest is adding a let operation_name = operation.get_name().clone(); on the line before the match, and using operation_name here; the problem is that extract_execute_response takes ownership of the Operation's memory. An alternative would be to make extract_execute_response return an Ok((name, value)) instead of just an Ok(value), which you could extract from the match.
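
For illustration, a minimal, self-contained sketch of that first suggestion (not the PR's code; Operation and extract_execute_response below are toy stand-ins). It assumes get_name() returns a &str, as in the generated protobuf code, and so takes an owned copy with to_string() before the value is moved:

struct Operation {
  name: String,
}

impl Operation {
  fn get_name(&self) -> &str {
    &self.name
  }
}

// Toy stand-in: like the real extract_execute_response, it takes ownership.
fn extract_execute_response(op: Operation) -> Result<usize, String> {
  Ok(op.name.len())
}

fn main() {
  let operation = Operation {
    name: "operations/1234".to_string(),
  };

  // Take an owned copy while `operation` can still be borrowed.
  let operation_name = operation.get_name().to_string();

  match extract_execute_response(operation) {
    // `operation` has been moved, but `operation_name` is still usable here.
    Ok(value) => println!("{} finished with {}", operation_name, value),
    Err(e) => println!("{} failed: {}", operation_name, e),
  }
}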

@wisechengyi wisechengyi requested a review from illicitonion Apr 17, 2018

illicitonion (Contributor) left a comment

Looks good, thanks for putting this together! I think we can avoid the HashMap entirely, which would probably be worthwhile :)

@dgassaway What kind of polling interval do you think we can get away with for GetOperation requests?
This currently implements subsequent requests having intervals of: 0.5s, 1s, 1.5s, ..., 4s, 4.5s, 5s, 5s, 5s, ...
I think aiming for something like 100ms x10, 1s x10, 2s x10, 5s, 5s, 5s, ... would probably be a decent balance of responsiveness. WDYT?
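
For reference, a quick illustration (not part of the review) of the first schedule: it is what min(5000, (1 + iter_num) * 500) milliseconds produces, the formula a later revision of the PR adopts.

fn main() {
  // 500, 1000, 1500, ..., 4500, 5000, 5000, 5000 ms between polls.
  let waits: Vec<u64> = (0..12u64)
    .map(|iter_num| std::cmp::min(5000, (1 + iter_num) * 500))
    .collect();
  println!("{:?}", waits);
}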

@@ -76,6 +80,9 @@ impl CommandRunner {
let store = self.store.clone();
let execute_request_result = make_execute_request(&req);

// A map that stores how much time an operation has waited last time
let mut backoff_per_operation: HashMap<String, u64> = HashMap::new();

illicitonion (Contributor) commented Apr 17, 2018

If we want to keep this information in a HashMap, it's going to need to be an Arc<Mutex<HashMap>> which we clone as we move into closures, so that the modifications are guarded by a Mutex, which makes them thread-safe. The idea here is that the Mutex makes modification safe, the Arc makes it shareable across threads, and the clone() means several threads can each hold a reference to it.
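
For illustration, a minimal, self-contained sketch of that Arc<Mutex<HashMap>> pattern (not the PR's code):

use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
  let backoff_per_operation: Arc<Mutex<HashMap<String, u64>>> =
    Arc::new(Mutex::new(HashMap::new()));

  // Each closure that needs the map gets its own Arc handle via clone().
  let for_thread = backoff_per_operation.clone();
  let handle = thread::spawn(move || {
    // The Mutex guards the mutation; the Arc lets threads share ownership.
    for_thread
      .lock()
      .unwrap()
      .insert("operations/1234".to_string(), 500);
  });
  handle.join().unwrap();

  assert_eq!(
    backoff_per_operation.lock().unwrap().get("operations/1234"),
    Some(&500)
  );
}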

However, I think there's something neater we can do :) Right now, the information we pass around our loop (the first argument to future::loop_fn, and the argument we call future::Loop::Continue with) is an Operation. If we made it a tuple (Operation, usize), where the usize is the iteration number, that would avoid this central synchronisation point, and we'd just calculate the expected delay based on the iteration number. (Alternatively, we could use an (Operation, Duration) tuple to pass around the "next wait Duration", if we wanted to just keep increasing the Duration value rather than implementing "calculate duration from iteration" logic.) What do you think?
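
For illustration, a minimal, self-contained sketch of that tuple idea with futures 0.1's future::loop_fn (not the PR's code): the backoff is computed purely from the iteration count carried in the loop state, so no shared HashMap is needed. poll_once is a hypothetical stand-in for the GetOperation call, and the sketch omits the actual delay.

extern crate futures;

use futures::{future, Future};
use std::cmp::min;

const BACKOFF_INCR_WAIT_MILLIS: u64 = 500;
const BACKOFF_MAX_WAIT_MILLIS: u64 = 5000;

// Hypothetical stand-in for the GetOperation poll: finishes on the third try.
fn poll_once(iter_num: u64) -> Option<String> {
  if iter_num >= 2 {
    Some("done".to_string())
  } else {
    None
  }
}

fn main() {
  let result = future::loop_fn(("operations/1234".to_string(), 0u64), |(name, iter_num)| {
    match poll_once(iter_num) {
      // Finished: break out of the loop with the result.
      Some(value) => future::ok::<_, String>(future::Loop::Break(value)),
      // Not finished: derive the next wait from iter_num alone and loop again.
      None => {
        let backoff_period =
          min(BACKOFF_MAX_WAIT_MILLIS, (1 + iter_num) * BACKOFF_INCR_WAIT_MILLIS);
        // A real implementation would wait backoff_period ms here before polling again.
        let _ = backoff_period;
        future::ok(future::Loop::Continue((name, iter_num + 1)))
      }
    }
  }).wait();

  assert_eq!(result, Ok("done".to_string()));
}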

wisechengyi (Contributor) commented Apr 17, 2018

Yeah, I was trying to find a way to pass the iteration number around, so thanks for the pointer! It's also nice not having to clean up the HashMap when futures are finished.

wisechengyi (Contributor) commented Apr 20, 2018

Implemented.

#[test]
fn wait_between_request() {
// wait at least 500 ms for one retry
{

illicitonion (Contributor) commented Apr 17, 2018

Rather than making separate scopes within the one test, let's just make these two separate tests

};
let start_time = SystemTime::now();
let result = run_command_remote(&mock_server.address(), execute_request).unwrap();
assert!(start_time.elapsed().unwrap() >= Duration::from_millis(3000));

illicitonion (Contributor) commented Apr 17, 2018

It could be nice to make the mock server record the time it gets requests, and assert on the gaps between them, rather than counting the total time. But I can also believe it's not worth the complexity :)

// Each time increment wait time by default amount, but capped by max wait time.
backoff_per_operation.insert(operation_name.clone(), min(max_wait, backoff_period + default));

thread::sleep(Duration::from_millis(backoff_period.clone()));

stuhood (Member) commented Apr 17, 2018

Using thread::sleep in an async context is a no-no. This needs to use a Future-aware timing library... probably this one: https://tokio.rs/blog/2018-03-timers/

wisechengyi (Contributor) commented Apr 20, 2018

tokio-timer = "0.2.0" or "0.2.1" gives:

error: failed to select a version for `futures` (required by `tokio-timer`):
all possible versions conflict with previously selected versions of `futures`
  version 0.1.18 in use by futures v0.1.18
  possible versions to select: 0.1.21, 0.1.20

Does that mean we need to bump the futures library first?

illicitonion (Contributor) commented Apr 20, 2018

I suspect that if you delete the Cargo.lock file, everything will Just Work.

Maybe also try making futures specify version 0.1.21 in every Cargo.toml file recursively in src/rust/engine.

dgassaway commented Apr 17, 2018

@illicitonion I'd start with a longer interval and go exponential or near to it; it's doubtful much will be done within a 100ms window, and pinging at that interval is probably too aggressive. I'd start at ~1s and back off from there, maybe even 1s, 2s x2, 5s. This is something we'll have to feel out, but I prefer to start conservatively.

illicitonion (Contributor) commented Apr 17, 2018

@dgassaway In that case, the timings currently written in this PR sound good as a starting point :) Thanks!

@wisechengyi wisechengyi force-pushed the wisechengyi:remote branch from aef0301 to 9d044e1 Apr 20, 2018

@@ -105,8 +108,8 @@ impl CommandRunner {
// TODO: Add a timeout of some kind.
// https://github.com/pantsbuild/pants/issues/5504

.and_then(move |(execute_request, operation)| {
future::loop_fn(operation, move |operation| {
.and_then(move |(execute_request, operation, iter_num)| {

illicitonion (Contributor) commented Apr 20, 2018

Nit: no need to pass iter_num across the futures; we can just use 0 in the tuple in the first arg to loop_fn.

@@ -134,19 +137,24 @@ impl CommandRunner {
)
)
})
.map(|operation| future::Loop::Continue(operation))
.map(|operation| future::Loop::Continue((operation, 0))) // iter_num does not matter for `MissingDigests`

illicitonion (Contributor) commented Apr 20, 2018

I'd maybe phrase this as "Reset iter_num on MissingDigests" rather than saying that it does not matter.


let max_wait = 5000;
let backoff_period = min(max_wait, ((1 + iter_num) * 500));
thread::sleep(Duration::from_millis(backoff_period.clone()));

illicitonion (Contributor) commented Apr 20, 2018

I don't think that clone should be necessary - what happens if you remove it?

wisechengyi (Contributor) commented Apr 23, 2018

Getting an error on

thread 'remote::tests::wait_between_request_1_retry' panicked at 'called `Result::unwrap()` on an `Err` value: "timer is shutdown"', src/libcore/result.rs:916:5

at https://github.com/pantsbuild/pants/pull/5714/files#diff-6facb150e905df7a00a027527582fadaR164

According to the doc, this means the timer has been dropped. https://github.com/tokio-rs/tokio/blob/372400ed34a04428ee4e16d53a5d77cda40ee771/tokio-timer/src/error.rs#L10-L11

How can I prevent it from being dropped?

stuhood (Member) commented Apr 23, 2018

@wisechengyi and I talked offline: I expect that this is tokio being badly behaved and "assuming" that a tokio runtime has been started. We looked around, and it looks like there is another simple futures_timer library that doesn't have the tokio dependency. We should try that instead.
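
For illustration, a minimal sketch (not the PR's code), assuming futures_timer 0.1's Delay, which is an ordinary futures 0.1 Future and does not require a tokio runtime to have been started:

extern crate futures;
extern crate futures_timer;

use futures::Future;
use futures_timer::Delay;
use std::time::{Duration, Instant};

fn main() {
  let start = Instant::now();

  // Resolves after the wait without blocking an executor thread the way
  // thread::sleep would; the timer error is mapped to a String, roughly as
  // the PR does.
  Delay::new(Duration::from_millis(500))
    .map_err(|e| format!("Timer error: {}", e))
    .wait()
    .unwrap();

  assert!(start.elapsed() >= Duration::from_millis(500));
}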

baroquebobcat (Contributor) commented Apr 23, 2018

That looks like a handy library, @stuhood. I might also want something like it for the timeout patch I've got.

wisechengyi added some commits Apr 24, 2018

Revert "dep bump"
This reverts commit 12b4c7a.
fmt
wisechengyi (Contributor) commented Apr 24, 2018

This is good for review.

I will make another PR to follow up:

It could be nice to make the mock server record the time it gets requests, and assert on the gaps between them, rather than counting the total time. But I can also believe it's not worth the complexity :)

illicitonion (Contributor) left a comment

Looks great! A couple of clean-up comments and we're good to go :)

(try_future!(grpc_result), iter_num + 1)))
.to_boxed() as BoxFuture<_, _>
}
Err(e) => future::err(e.to_string()).to_boxed()

illicitonion (Contributor) commented Apr 24, 2018

Let's adorn the error with a little more information here; rather than e.to_string() let's do something like format!("Error from Future Delay when polling for execution result for operation {}: {}", operation_name, e)

wisechengyi (Contributor) commented Apr 25, 2018

Done.

wisechengyi (Contributor) commented Apr 25, 2018

Also had to simplify because fmt isn't happy with long strings.

future::Loop::Continue(
try_future!(grpc_result))).to_boxed() as BoxFuture<_, _>
Delay::new(Duration::from_millis(backoff_period))
.then(move |res| {

illicitonion (Contributor) commented Apr 24, 2018

Rather than using an unconditional .then, with a match in it:

.then(|res| match res {
  Ok(_) => ...,
  Err(e) => ...,
})

it may be a little cleaner to use map/map_err. Because our success case may fail (that's what the try_future! is doing - returning a failure case if grpc_result is an error), we need to use and_then rather than map:

Delay::new(...)
  .map_err(|err| err.to_string())
  .and_then(|_| future::ok(future::Loop::Continue((try_future!(grpc_result), iter_num + 1))).to_boxed() as BoxFuture<_, _>)

Then, noting that Future::and_then accepts something which returns an IntoFuture, we can actually just return a Result, and not need to worry about all of the Future-specific nonsense of try_future!, to_boxed() and so on:

Delay::new(...)
  .map_err(|err| err.to_string())
  .and_then(|_| Ok(future::Loop::Continue((grpc_result?, iter_num + 1))))

wisechengyi (Contributor) commented Apr 25, 2018
Used the new pattern. However,

.and_then(|_| Ok(future::Loop::Continue((grpc_result?, iter_num + 1))))

doesn't seem to work, so I left try_future! untouched.

stuhood (Member) left a comment

Thanks.

...it should have occurred to me earlier, but introducing a crate that explicitly supports retrying might have been a better step. But it can happen in a follow-up: https://docs.rs/tokio-retry/0.2.0/tokio_retry/

.to_boxed()
},
ExecutionError::NotFinished(operation_name) => {
let mut operation_request =
bazel_protos::operations::GetOperationRequest::new();
operation_request.set_name(operation_name);

let max_wait = 5000;

stuhood (Member) commented Apr 24, 2018

This should be extracted into a constant.

wisechengyi (Contributor) commented Apr 25, 2018

Done.

.to_boxed()
},
ExecutionError::NotFinished(operation_name) => {
let mut operation_request =
bazel_protos::operations::GetOperationRequest::new();
operation_request.set_name(operation_name);

let max_wait = 5000;
let backoff_period = min(max_wait, ((1 + iter_num) * 500));

stuhood (Member) commented Apr 24, 2018

And the 500 here.

wisechengyi (Contributor) commented Apr 25, 2018

Done.

wisechengyi (Contributor) commented Apr 25, 2018

Looks like rustfmt isn't happy with code that I did not touch, due to recent fmt changes. Shall I put up a separate PR to do the formatting first, then land this? @stuhood

illicitonion (Contributor) left a comment

Looks great, thanks!

When I ran rustfmt locally, it was just telling me you need to remove the newline in the middle of

 impl CommandRunner {

   const BACKOFF_INCR_WAIT_MILLIS: u64 = 500;

If you're seeing something significantly different, it's possible you have an old version of rustfmt confusing things. Try deleting ${HOME}/.cache/pants/rust/cargo/{.crates.toml,bin}

wisechengyi (Contributor) commented Apr 25, 2018

Indeed. Thanks for the tip, @illicitonion !

@wisechengyi wisechengyi merged commit f46be3a into pantsbuild:master Apr 25, 2018

1 check passed

continuous-integration/travis-ci/pr: The Travis CI build passed

@wisechengyi wisechengyi deleted the wisechengyi:remote branch Apr 25, 2018
