fixes in async_semaphore for concurrency ID calculation #10436
As an illustration of the changes this patch makes, here is some log output from pants running. Old: New:
Thanks!
There was also an issue with always waking up the task in the front position in the queue. In the case where a large number of processes were all scheduled at once, they might all call wake_by_ref on the same waiting task, meaning that any other waiting tasks after the first wouldn't get awoken until either the first task, or a newly-scheduled task, completed and woke up tasks in the queue beyond the first.
Good fix. But IMO, we should really, really switch to tokio semaphore. I'm fine with not doing it in this PR as long as you're confident in the tests.
@@ -158,7 +166,8 @@ impl Future for PermitFuture {
 // queue, so we don't have to waste time searching for it in the Drop
 // handler.
 self.waiter_id = None;
-Some(inner.available_permits)
+let id = inner.available_ids.pop_front().unwrap_or(0); // The unwrap_or case should never happen.
It would probably be less error prone for this to either:
- replace the `inner.available_permits` counter entirely, so that we can be sure that we don't promise a permit unless there is an `available_id`.
- be decoupled from the semaphore and completely independent (such that it generated tokens lazily if it was empty, counting upward or something).

But if you trust that the test is sufficient, it's not a blocker per se.
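The second option in the list above could look roughly like the following. This is only a sketch of the idea, not code from the PR: `LazyIdPool`, `take`, and `give_back` are hypothetical names, and starting fresh IDs at 1 is an assumption.

```rust
use std::collections::VecDeque;

/// Hypothetical ID pool that is independent of the semaphore's permit count.
/// It never fails to produce an ID: if no returned ID is available, it mints
/// a fresh one by counting upward.
struct LazyIdPool {
    /// IDs that were handed back by finished tasks and can be reused.
    free: VecDeque<usize>,
    /// The next never-before-used ID (assumed to start at 1).
    next_fresh: usize,
}

impl LazyIdPool {
    fn new() -> Self {
        LazyIdPool {
            free: VecDeque::new(),
            next_fresh: 1,
        }
    }

    /// Reuse a returned ID if there is one, otherwise generate a new one.
    fn take(&mut self) -> usize {
        match self.free.pop_front() {
            Some(id) => id,
            None => {
                let id = self.next_fresh;
                self.next_fresh += 1;
                id
            }
        }
    }

    /// Hand an ID back so that a later task can reuse it.
    fn give_back(&mut self, id: usize) {
        self.free.push_back(id);
    }
}
```

Because the pool mints IDs on demand, there is no "should never happen" fallback: the number of distinct IDs in circulation is bounded only by how many tasks the semaphore lets run at once.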
Problem

This is an attempt to fix an issue where the concurrency ID passed into processes executed under the `BoundedCommandRunner` is being limited to a smaller set of IDs than the limit of the `BoundedCommandRunner`.

We were previously using the (`usize`) value of `available_permits` on the inner, locked portion of the `AsyncSemaphore` data structure that `BoundedCommandRunner` uses internally, as the value for the concurrency ID. However, this resulted in undesirable behavior where IDs passed to processes would count down from the local concurrency limit value (e.g. 16) to 1, and then never rise above 1 again. Every time a task finished, it would increment `available_permits` from 0 to 1, and then wake up the next process waiting on the queue, which would decrement `available_permits` again when it began to run and use `1` as its ID.

There was also an issue with always waking up the task in the `front` position in the queue. In the case where a large number of processes were all scheduled at once, they might all call `wake_by_ref` on the same waiting task, meaning that any other waiting tasks after the first wouldn't get awoken until either the first task, or a newly-scheduled task, completed and woke up tasks in the queue beyond the first.

Solution
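To solve the concurrency ID issue, we add a new queue data structure to `Inner` that keeps track of discrete IDs. When a permit to run is issued (in `PermitFuture::poll`), an ID is popped from the front of this queue, and when a task completes (in `Permit::drop`) that ID is pushed onto the back of the queue. So, a limited number of IDs will always be cycling through the `AsyncSemaphore` as long as it is running, and we won't see the countdown-to-1 behavior described above. This change also has the effect of causing IDs to count up from 1 rather than down from the local concurrency limit, which is only an implementation detail, since these IDs are meant to be opaque identifiers.

When an asynchronous task completes, instead of calling `wake_by_ref()` on the waiter at the front of the queue, we use `available_permits` to index into the `waiters` queue and wake up a task. This ensures that each completing task will wake up at least one yet-unwoken waiting task (if there are any such waiting tasks), which will cause those waiting tasks to get scheduled immediately and take up a concurrency slot.

Also, a debug logger with the concurrency ID is added in this commit, to facilitate debugging any further concurrency-ID-related issues we may run into.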
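To make the difference in ID assignment concrete, here is a small synchronous model of the two schemes. It is a sketch under assumptions, not the code in this PR: `CounterIds` and `QueueIds` are hypothetical stand-ins for the locked inner state, there is no locking or task waking, and `acquire` returning `None` stands in for a task having to wait.

```rust
use std::collections::VecDeque;

/// Old scheme: the concurrency ID is read off the permit counter.
struct CounterIds {
    available_permits: usize,
}

impl CounterIds {
    fn acquire(&mut self) -> Option<usize> {
        if self.available_permits == 0 {
            return None; // the real semaphore would queue the task here
        }
        let id = self.available_permits;
        self.available_permits -= 1;
        Some(id)
    }

    fn release(&mut self) {
        self.available_permits += 1;
    }
}

/// New scheme: discrete IDs cycle through a queue.
struct QueueIds {
    available_ids: VecDeque<usize>,
}

impl QueueIds {
    /// Seed the queue with IDs 1..=permits (assumed starting value).
    fn new(permits: usize) -> Self {
        QueueIds {
            available_ids: (1..=permits).collect(),
        }
    }

    /// Issuing a permit pops an ID from the front of the queue.
    fn acquire(&mut self) -> Option<usize> {
        self.available_ids.pop_front()
    }

    /// Completing a task pushes its ID onto the back of the queue.
    fn release(&mut self, id: usize) {
        self.available_ids.push_back(id);
    }
}
```

With a limit of 3 and all permits taken, the counter model hands out 3, 2, 1 and then 1 to every task that starts after a single release, while the queue model hands out 1, 2, 3 and then re-issues whichever ID was just returned.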
wake_by_ref()
on the waiter at the front of the queue, useavailable_permits
to index into thewaiters
queue and wake up a task. This ensures that each completing task will wake up at least one yet-unwoken waiting task (if there are any such waiting tasks), which will cause those waiting tasks to get scheduled immediately and take up a concurrency slot.Also, a debug logger with the concurrency ID is added in this commit, to facilitate debugging any further concurrency-ID-related issues we may run into.