enhancement(sinks): Adaptive Request Concurrency #3094
Conversation
The implementation is complicated by the fact that we can only decrease concurrency by acquiring permits and forgetting them. If there aren't enough free permits, this requires waiting for them when polling for readiness.
Signed-off-by: Bruce Guenter <bruce@timber.io>
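Roughly, the mechanism looks like this (a sketch assuming tokio's `Semaphore`; the wrapper type and method names are illustrative, not the PR's actual code):

```rust
use std::sync::Arc;
use tokio::sync::Semaphore;

/// Illustrative wrapper around a semaphore whose permit count can
/// both grow and shrink.
struct ShrinkableSemaphore {
    semaphore: Arc<Semaphore>,
}

impl ShrinkableSemaphore {
    /// Growing is easy: hand the semaphore more permits.
    fn expand(&self, n: usize) {
        self.semaphore.add_permits(n);
    }

    /// Shrinking has no direct API. Instead, acquire `n` permits and
    /// `forget` them so they are never returned to the pool. If fewer
    /// than `n` permits are currently free, the `acquire` calls wait
    /// for in-flight requests to release theirs, which is why polling
    /// for readiness may also have to wait.
    async fn contract(&self, n: usize) {
        for _ in 0..n {
            let permit = self.semaphore.acquire().await.expect("semaphore closed");
            permit.forget();
        }
    }
}
```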
Has issues that need correcting:
* has recursive locking via `fn contract` and `fn expand`
* adjusts on every measurement, not once per RTT
* adjusts for any response, does not differentiate

Signed-off-by: Bruce Guenter <bruce@timber.io>
I think the problem is that the Timeout layer needs to be inside the AutoConcurrencyLimit layer, but the Error type in the Timeout layer is a `Box<Error>`, which differs from the RetryLogic Error type.
Signed-off-by: Bruce Guenter <bruce@timber.io>
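For illustration, one possible bridge is to downcast the boxed error before the retry logic classifies it (a sketch only; `was_timeout` is hypothetical, while `Elapsed` is the error type tower's Timeout layer boxes up):

```rust
use tower::timeout::error::Elapsed;

type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;

/// Hypothetical bridge between the two error types: the Timeout layer
/// yields a `BoxError`, so inspect the box where the RetryLogic's
/// concrete Error type would otherwise be expected.
fn was_timeout(error: &BoxError) -> bool {
    // `Elapsed` is what the Timeout layer produces when a request
    // exceeds its deadline.
    error.downcast_ref::<Elapsed>().is_some()
}
```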
```rust
let mut inner = self.inner.lock().expect("Controller mutex is poisoned");
// ...
#[cfg(test)]
let mut stats = self.stats.lock().expect("Stats mutex is poisoned");
```
Might be useful to switch to `try_lock` here, and in the rest of the places if possible, to avoid the additional locking affecting test results.
So, if we use `lock`, then the stats may not reflect how the sink will operate in practice because there is additional lock contention. If we use `try_lock`, then the stats may not reflect how the sink actually did operate because it sometimes didn't take the lock and so couldn't report stats, resulting in incomplete data. Either way, stats can't always reflect actual operation, which I agree is unfortunate. Using `try_lock` also requires additional conditionals when trying to report stats. I think I'll stick with `lock` unless there is another consideration.
Now, I can reorganize the code a bit to make the time the mutex is held even shorter, which will reduce the effect. Do you have any thoughts on whether it is better to take the mutex once and hold it for a bit longer in `adjust_to_back_pressure`, or to take it once for each place it is needed and drop it immediately?
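For concreteness, the two variants look roughly like this (a sketch; `Stats` and the method names here are illustrative, not the PR's actual code):

```rust
use std::sync::Mutex;

#[derive(Default)]
struct Stats {
    observations: usize,
}

struct Controller {
    stats: Mutex<Stats>,
}

impl Controller {
    /// `lock`: always records, but may block briefly under contention,
    /// perturbing the very timing the stats are trying to measure.
    fn record_with_lock(&self) {
        let mut stats = self.stats.lock().expect("Stats mutex is poisoned");
        stats.observations += 1;
    }

    /// `try_lock`: never blocks, but silently drops an observation
    /// whenever the mutex is busy, so the data is incomplete, and
    /// every call site grows an extra conditional.
    fn record_with_try_lock(&self) {
        if let Ok(mut stats) = self.stats.try_lock() {
            stats.observations += 1;
        }
    }
}
```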
Yeah, I see the issue with the try_lock access to stats.
I had some thoughts on using lock-free ring buffers (channels?) to collect the stats for test purposes, and then to aggregate them separately. But it's very early - I'm still reading through the code and wrapping my head around what's going on.
A bit preliminary, but to me, implementing channel-based mocks to tightly control the execution in lock-step looks promising here. This would allow us to assert the states in all the right places, and eliminate the need for manual locking and the associated race conditions.
The idea is we use a channel to send notifications that we reached a certain state to the test controlling logic along the way, and we then wait on another rx channel to continue. The test logic waits for a particular notification, does the assertion, and sends a message that lets the tested code continue the execution. We might even be able to use the exact values in the tests and make the tests deterministic.
Again, this is very preliminary - I'll have to take a deeper look to see if this has any actual benefits over what we have already - but I'm very curious to hear your thoughts on this.
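A rough sketch of that notify-and-resume flow (all names hypothetical; plain `std::sync::mpsc` channels and a thread stand in for the real async machinery):

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical checkpoints the tested code reports as it runs.
#[derive(Debug, PartialEq)]
enum Checkpoint {
    PermitAcquired,
    ResponseMeasured,
}

fn main() {
    // notify: tested code -> test logic; resume: test logic -> tested code.
    let (notify_tx, notify_rx) = mpsc::channel();
    let (resume_tx, resume_rx) = mpsc::channel::<()>();

    let worker = thread::spawn(move || {
        // ... tested code reaches the first checkpoint ...
        notify_tx.send(Checkpoint::PermitAcquired).unwrap();
        resume_rx.recv().unwrap(); // block until the test has asserted
        // ... tested code continues to the next checkpoint ...
        notify_tx.send(Checkpoint::ResponseMeasured).unwrap();
        resume_rx.recv().unwrap();
    });

    // Test side: wait for each checkpoint, assert, then allow progress.
    assert_eq!(notify_rx.recv().unwrap(), Checkpoint::PermitAcquired);
    resume_tx.send(()).unwrap();
    assert_eq!(notify_rx.recv().unwrap(), Checkpoint::ResponseMeasured);
    resume_tx.send(()).unwrap();
    worker.join().unwrap();
}
```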
I had considered using channels for reporting test metrics, but couldn't see a path that was as straightforward as using locking. It is worth considering, though.
I see there being roughly three levels of testing: low level (does it take the steps that the code says it should), behavior (do the steps give it the expected high-level behavior), and performance (does it interoperate optimally with live servers). It sounds like you are describing some fairly low-level tests, covering some of the steps tested in `auto_concurrency/service.rs` using tokio's timers to manually advance steps. It might be worth covering other paths, though.
Yes, you're right, the approach I'm thinking about is just briefly above - and very similar to - tracing the code along its execution in lock-step, and asserting that exactly what's expected happens.
I'm thinking of tokio timers as just one of the ways to advance.
Consider having state at the test end rather than as part of the struct, and only passing it into the Controller code via a channel. That channel will pass ownership of and access to the state, and the state will naturally have to be passed back and forth between the logical threads, interleaving the updates and assertions.
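A minimal sketch of that ownership-passing idea (all types and names hypothetical):

```rust
use std::sync::mpsc;

/// Hypothetical state owned by the test rather than the Controller.
struct TestState {
    in_flight: usize,
}

fn main() {
    let (to_controller, controller_rx) = mpsc::channel();
    let (to_test, test_rx) = mpsc::channel();

    // The test owns the state and lends it to the controller side.
    to_controller.send(TestState { in_flight: 0 }).unwrap();

    // Controller side: receive the state, update it, hand it back.
    // Moving ownership through the channel serializes access, so no
    // Mutex (and no lock contention) is involved.
    let mut state = controller_rx.recv().unwrap();
    state.in_flight += 1;
    to_test.send(state).unwrap();

    // Test side: take the state back and assert on it.
    assert_eq!(test_rx.recv().unwrap().in_flight, 1);
}
```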
I ran tests locally and got some failures (output: log.txt). I encourage others to run the tests locally to gather more stats (literally).
Hmm, I can definitely confirm the stats issues you are having, as I am seeing them now too. When I added the tests, though, I ran them many times to ensure the boundaries were reasonable. While it would be good to track down what changed, and to adjust the boundaries to ensure tests now pass, this makes me wonder if these tests are reliable enough to keep around.
This restores the previous defaults for the fixed concurrency limits, and enables the automatic limiter if `request.in_flight_limit = "auto"`. This fixes the self tests that were accidentally broken by commit 8605d64 changing the behavior of specifying an `in_flight_limit` number in the settings.
Signed-off-by: Bruce Guenter <bruce@timber.io>
The issue is that commit 8605d64 switched the behavior of specifying a number for `in_flight_limit`.
MOZGIII left a comment
Looks good overall! We might want to revisit tests though.
The previous cheap fix to make `in_flight_limit` an opt-in parameter did not properly keep the concurrency limit fixed. In the case of increasing RTT, it would still decrease the concurrency at some point, and so return to the variable-concurrency behavior. This change completely turns off the varying mechanism when `in_flight_limit` is not `"auto"`.
Signed-off-by: Bruce Guenter <bruce@timber.io>
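To make the intended semantics concrete (a sketch; the enum and function here are illustrative, not the actual implementation):

```rust
/// Illustrative representation of the parsed setting.
#[derive(Clone, Copy)]
enum InFlightLimit {
    /// `in_flight_limit = "auto"`: the adaptive controller is enabled.
    Auto,
    /// `in_flight_limit = N`: concurrency is pinned at N, and the
    /// varying mechanism stays off entirely, even under rising RTT.
    Fixed(usize),
}

fn next_limit(setting: InFlightLimit, rtt_rising: bool, current: usize) -> usize {
    match setting {
        // Only "auto" ever changes the limit.
        InFlightLimit::Auto if rtt_rising => current.saturating_sub(1).max(1),
        InFlightLimit::Auto => current + 1,
        // A fixed limit is never adjusted.
        InFlightLimit::Fixed(n) => n,
    }
}
```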
lukesteensen left a comment
Looks good! Excited to get this in and collect some real-world feedback.
Ideally these should run with exact timing, but I haven't worked that in yet.
Signed-off-by: Bruce Guenter <bruce@timber.io>
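For reference, tokio's test-util timer facilities would allow exact timing along these lines (a sketch; assumes tokio with the `test-util` and `macros` features and a 1.x-style API):

```rust
use std::time::Duration;

#[tokio::test]
async fn deterministic_rtt_window() {
    // Freeze the clock: timers now only fire when we advance time.
    tokio::time::pause();

    let start = tokio::time::Instant::now();
    let pending = tokio::time::sleep(Duration::from_millis(250));

    // Advance virtual time instantly: no real waiting, no timing jitter.
    tokio::time::advance(Duration::from_millis(250)).await;
    pending.await;

    assert!(start.elapsed() >= Duration::from_millis(250));
}
```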
* Simplify retry logic with `matches!`
* Import base concurrency limiter from tower
* Use new stub AutoConcurrencyLimit layer in service2-based sinks
* Drop unused Load impl on AutoConcurrencyLimit
* Adjust pub markers and drop more unused methods
* Introduce Controller wrapper for the semaphore. The implementation is complicated by the fact that we can only decrease concurrency by acquiring permits and forgetting them. If there aren't enough free permits, this requires waiting for them when polling for readiness.
* Link the semaphore controller and start time into the response future
* Add function to controller to calculate average RTT
* Initial implementation of RTT-based concurrency adjustment
* Limit adjustments to once per RTT
* Introduce new EWMA abstraction (sketched after this list)
* Wrap the shrinkable semaphore in its own module
* Aggregate responses for each RTT interval
* Handle back pressure through applying RetryLogic
* Fixup ;)
* Add statistics-based behavior tests
* Move creation of Metric from a Measurement into `src/event/metric.rs`
* Move `get_controller` and `capture_metrics` into `src/metrics.rs`
* Introduce an `assert_within` macro for range matches
* Add and test internal metrics for the controller
* Add get_ref and get_mut methods to access internals of our service layers
* Make test_util into a module
* Update Cargo for new get_ref methods in tokio03
* Enhance the test_util stats
* Make the automatic limit opt-in by default. This restores the previous defaults for the fixed concurrency limits, and enables the automatic limiter if `request.in_flight_limit = "auto"`.
* Only run the tests on unix systems, due to timing variability problems. Ideally these should run with exact timing, but I haven't worked that in yet.

Signed-off-by: Bruce Guenter <bruce@timber.io>
Co-authored-by: Ana Hobden <operator@hoverbear.org>
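The EWMA abstraction in the list above works roughly like this (a sketch; field and method names are illustrative, not the PR's exact API):

```rust
/// Illustrative exponentially weighted moving average used to smooth
/// observed round-trip times.
#[derive(Clone, Copy)]
struct Ewma {
    average: Option<f64>,
    alpha: f64, // weight of each new observation, 0 < alpha <= 1
}

impl Ewma {
    fn new(alpha: f64) -> Self {
        Self { average: None, alpha }
    }

    /// Fold one observation into the running average and return it.
    fn update(&mut self, point: f64) -> f64 {
        let next = match self.average {
            None => point, // the first sample seeds the average
            Some(avg) => avg * (1.0 - self.alpha) + point * self.alpha,
        };
        self.average = Some(next);
        next
    }
}
```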
Here is my implementation of the automatic concurrency management, which is in use in all "service2"-based sinks (apparently everything except prometheus and aws_cloudwatch_logs).
I will continue to work on the tests started in `auto_concurrency/service.rs`, but otherwise I believe this is feature complete. Closes #2529