Dynamically choose per-process concurrency for supported processes #14184

Merged
stuhood merged 6 commits into pantsbuild:main from stuhood/process-concurrency on Jan 24, 2022

Conversation


@stuhood stuhood commented Jan 18, 2022

When tools support internal concurrency but cannot be partitioned (either because partitioning isn't supported, as in the case of a PEX resolve, or because partitioning as finely as desired has too much overhead), Pants' own concurrency currently makes it ~impossible for them to set their concurrency settings correctly.

As sketched in #9964, this change adjusts Pants' local runner to dynamically choose a concurrency value per process based on the concurrency that is currently available.

  1. When acquiring a slot on the bounded::CommandRunner, a process takes as much concurrency as it a) is capable of using, as configured by a new Process.concurrency_available field, and b) deserves for the purposes of fairness (i.e. half, when two processes are running). This results in some amount of over-commit (see the sketch just below this list).
  2. Periodically, a balancing task runs and preempts/re-schedules processes which have been running for less than a very short threshold (200ms currently) and which are the largest contributors to over/under-commit. This fixes some over/under-commit, but not all of it, because if a process becomes over/under-committed (because other processes started or finished) after it has been running a while, we will not preempt it.
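For intuition, step 1 boils down to something like the following self-contained sketch (the names here are illustrative, not the real bounded.rs API): a newly admitted process gets the total permit count divided by the number of processes that will then be running, clamped to at least one slot and to at most its own concurrency_available.

// Sketch only: illustrative names, not the actual bounded.rs implementation.
fn initial_concurrency(
  total_permits: usize,
  already_running: usize,
  concurrency_available: usize,
) -> usize {
  // Fair share of the permits, but never less than one slot.
  let fair_share = std::cmp::max(1, total_permits / (already_running + 1));
  // Never more than the process says it can actually use.
  std::cmp::min(concurrency_available, fair_share)
}

fn main() {
  // 8 total permits, and a tool that can use up to 8 threads:
  assert_eq!(initial_concurrency(8, 0, 8), 8); // first process takes everything
  assert_eq!(initial_concurrency(8, 1, 8), 4); // the next takes half: some over-commit until rebalanced
  assert_eq!(initial_concurrency(8, 7, 8), 1); // but never less than one slot
}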

Combined with #14186, this change results in an additional 2% speedup for lint and fmt. But it should also have a positive impact on PEX processes, which were the original motivation for #9964.

Fixes #9964.

[ci skip-build-wheels]

@stuhood stuhood force-pushed the stuhood/process-concurrency branch 3 times, most recently from e6280fc to 22b5198 on January 19, 2022 22:24
@stuhood stuhood requested review from illicitonion, tdyas, jsirois and Eric-Arellano and removed request for illicitonion January 19, 2022 23:17
@stuhood stuhood marked this pull request as ready for review January 19, 2022 23:18

stuhood commented Jan 19, 2022

This is ready for review. Commits are useful to look at independently: in particular, the first commit is pure reorganization, with no semantic changes.


@Eric-Arellano Eric-Arellano left a comment

Neat!

Do you know how this will impact cache keys with the argv no longer being deterministic? Iiuc, the local cache CommandRunner doesn't know about bounded.rs and simply sees the argv has changed.

#[tokio::test]
async fn balance_overcommitted() {
  // Preempt the first Task and give it one slot, without adjusting the second task.
  test_balance(2, 1, vec![(2, 2, 1), (1, 1, 1)]);
Contributor

Would this be right?

test_balance(1, 1, vec![(1, 1, 1), (1, 1, 0)]);

If so, maybe worth adding.

Sponsor Member Author

Would this be right?

It wouldn't be, because we will never give a process less than 1 concurrency slot. And because we only calculate concurrency once we have a slot on the semaphore, having more processes than there is concurrency available (i.e. #processes > #permits) shouldn't be possible.

#[tokio::test]
async fn balance_undercommitted() {
  // Should preempt both Tasks to give them more concurrency.
  test_balance(4, 2, vec![(2, 1, 2), (2, 1, 2)]);
Contributor

And is this right?

test_balance(1, 1, vec![(1, 0, 1)]);


@stuhood stuhood Jan 20, 2022

Same as above: a process shouldn't ever have 0. The minimum that we will give is 1.

Comment on lines 86 to 121
// TODO: Both of these templating cases should be implemented at the lowest possible level:
// they might currently be applied above a cache.
if let Some(ref execution_slot_env_var) = process.execution_slot_variable {
  process.env.insert(
    execution_slot_env_var.clone(),
    format!("{}", permit.concurrency_slot()),
  );
}
if process.concurrency_available > 0 {
  let concurrency = format!("{}", permit.concurrency());
  let mut matched = false;
  process.argv = std::mem::take(&mut process.argv)
    .into_iter()
    .map(
      |arg| match CONCURRENCY_TEMPLATE_RE.replace_all(&arg, &concurrency) {
        Cow::Owned(altered) => {
          matched = true;
          altered
        }
        Cow::Borrowed(_original) => arg,
      },
    )
    .collect();
  if !matched {
    return Err(format!("Process {} set `concurrency_available={}`, but did not include the `{}` template variable in its arguments.", process.description, process.concurrency_available, *CONCURRENCY_TEMPLATE_RE));
  }
}

let running_process = self.inner.run(context.clone(), workunit, process.clone());
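For readers skimming the hunk above, here is a standalone sketch of what the templating does, assuming the token matched by CONCURRENCY_TEMPLATE_RE is something like {pants_concurrency} (the real pattern is defined in bounded.rs). It requires the regex crate.

// Sketch only: a simplified, self-contained version of the argv templating above.
use regex::Regex;
use std::borrow::Cow;

fn template_argv(argv: Vec<String>, concurrency: usize) -> Result<Vec<String>, String> {
  // Assumed template token; the real one is CONCURRENCY_TEMPLATE_RE.
  let template = Regex::new(r"\{pants_concurrency\}").unwrap();
  let value = concurrency.to_string();
  let mut matched = false;
  let argv = argv
    .into_iter()
    .map(|arg| match template.replace_all(&arg, value.as_str()) {
      Cow::Owned(altered) => {
        matched = true;
        altered
      }
      Cow::Borrowed(_original) => arg,
    })
    .collect();
  if matched {
    Ok(argv)
  } else {
    Err("concurrency_available was set, but no argument contained the template".to_owned())
  }
}

fn main() {
  let argv = vec!["pytest".to_owned(), "-n".to_owned(), "{pants_concurrency}".to_owned()];
  println!("{:?}", template_argv(argv, 3)); // Ok(["pytest", "-n", "3"])
}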
Contributor

I'm trying to understand why it's safe to repeatedly mutate the Process inside the loop. Naively, I would think that would cause the Process to restart because it's a new argv -- but iiuc we're only trying to preempt/pause it, not restart.


@stuhood stuhood Jan 20, 2022

The mutation that is happening here is of a copy of the Process struct that is created on line 77: so each iteration of the loop effectively gets a fresh copy of the original argument to this function.

Naively, I would think that would cause the Process to restart because it's a new argv -- but iiuc we're only trying to preempt/pause it, not restart.

So, the terminology might be confusing here: the effect of this loop is that we will restart the process if permit.notified_concurrency_changed() fires. We cannot adjust a process once it is already running, which is why we have the preemption threshold: restarting a process once it has made a bunch of progress isn't cheap, but if it has only just started, we can be fairly sure we're not losing much progress by restarting it.

So preemption...

temporarily interrupting an executing task, with the intention of resuming it at a later time

...is relatively accurate, but an unconventional usage, because no operating system that I know of speculatively restarts tasks like this. I'm just not aware of a better word for it.
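To make the restart behavior concrete, here is a toy, self-contained sketch of the loop being described, with a tokio Notify standing in for permit.notified_concurrency_changed() and a sleep standing in for the spawned process (illustrative names, not the real bounded.rs types):

// Sketch only: simulates "restart the barely-started process when concurrency changes".
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Notify;

async fn run_process(argv: Vec<String>) -> String {
  tokio::time::sleep(Duration::from_millis(500)).await; // pretend to do the work
  format!("finished: {argv:?}")
}

async fn run_with_preemption(original_argv: Vec<String>, rebalanced: Arc<Notify>) -> String {
  loop {
    // Each iteration starts from a fresh copy of the original argv, so a new
    // concurrency value could be templated in before the (re)start.
    let argv = original_argv.clone();
    tokio::select! {
      result = run_process(argv) => return result,
      _ = rebalanced.notified() => {
        // Concurrency changed shortly after start: abandon the barely-started
        // attempt and restart it with the new value.
        continue;
      }
    }
  }
}

#[tokio::main]
async fn main() {
  let rebalanced = Arc::new(Notify::new());
  let trigger = rebalanced.clone();
  tokio::spawn(async move {
    tokio::time::sleep(Duration::from_millis(100)).await;
    trigger.notify_one(); // simulate the balancer preempting us once
  });
  println!("{}", run_with_preemption(vec!["tool".into(), "--jobs=N".into()], rebalanced).await);
}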


@stuhood stuhood Jan 20, 2022

Oh, correction: kubernetes apparently uses the word "preemption" this way too, in that it will kill and reschedule pods to make room for other pods: https://kubernetes.io/docs/concepts/scheduling-eviction/pod-priority-preemption/#preemption

Contributor

Oh, okay! That would be helpful to add as a comment, including an explicit callout that we implement pre-emption differently than the reader may expect with that term.


@stuhood stuhood left a comment

Do you know how this will impact cache keys with the argv no longer being deterministic? Iiuc, the local cache CommandRunner doesn't know about bounded.rs and simply sees the argv has changed.

@Eric-Arellano: The bounded::CommandRunner is always configured to be below all cache lookups. This is intentional, because cache lookups should always be cheaper than actually running a process, and so aren't throttled/bounded.

It definitely is fidgety though... the composition of CommandRunners is powerful, but important to stack just-so.
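As a toy illustration of that stacking (simplified names, not Pants' real constructors), a cache layer that wraps the bounded layer answers hits before the concurrency semaphore or the argv templating is ever involved, so the cache key is always computed from the original, un-templated Process:

// Sketch only: decorator-style CommandRunner stacking, greatly simplified.
trait Runner {
  fn run(&self, process: &str) -> String;
}

struct Local;
impl Runner for Local {
  fn run(&self, process: &str) -> String {
    format!("ran `{process}` locally")
  }
}

// Stand-in for bounded::CommandRunner: would acquire a slot and template argv here.
struct Bounded<R: Runner>(R);
impl<R: Runner> Runner for Bounded<R> {
  fn run(&self, process: &str) -> String {
    self.0.run(process)
  }
}

// Stand-in for a cache layer: sits above Bounded, so hits never reach it.
struct Cached<R: Runner>(R);
impl<R: Runner> Runner for Cached<R> {
  fn run(&self, process: &str) -> String {
    if process == "already-cached" {
      return "cache hit (bounded runner never involved)".to_owned();
    }
    self.0.run(process)
  }
}

fn main() {
  let runner = Cached(Bounded(Local));
  println!("{}", runner.run("already-cached"));
  println!("{}", runner.run("fresh-process"));
}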

@illicitonion illicitonion left a comment

This looks really nice!

/// of any preemptible tasks, and notify them of the changes.
///
/// Returns the number of Tasks that were preempted.
pub(crate) fn balance(concurrency_limit: usize, now: Instant, tasks: Vec<&Task>) -> usize {
Contributor

It feels like this should maybe take a &mut State rather than a Vec<&Task>? Even though the tasks aren't technically modified, it feels like they kind of are in spirit? It feels like the caller currently goes to efforts to make sure that this is done under a mutex, and it would be nice if the language instead could enforce that?

Sponsor Member Author

Yea, reasonable: I'll take a look at what it does to the tests.

let permit = self.sema.acquire().await.expect("semaphore closed");
let task = {
  let mut state = self.state.lock();
  let id = state
    .available_ids
    .pop_front()
    .expect("More permits were distributed than ids exist.");
  // TODO: Configure and calculate concurrency.

  // A Task is initially given its fair share of the available concurrency (i.e., for two
Contributor

I'm not sure the "for two tasks" piece here is accurate - I think the state.total_permits / (state.tasks.len() + 1) logic here means that for two tasks each trying to max out concurrency one would initially get the full concurrency, and the second would get half (and the third a third, and so on) until the rebalancing happens? Which I think is necessary, because we can't predict how many tasks will come in, but feels different from "for two tasks, each gets half"?

Sponsor Member Author

that for two tasks each trying to max out concurrency one would initially get the full concurrency, and the second would get half (and the third a third, and so on) until the rebalancing happens?

Yea, exactly. The example given was supposed to just indicate that the second task would take half, not that the first task would also get half. I can try to clarify.
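For concreteness, with the quoted formula (total_permits / (tasks.len() + 1), floored and never below one), eight permits would initially be handed to successive arrivals roughly like this:

// Worked example of the initial-share formula quoted above, assuming eight permits.
fn main() {
  let total_permits = 8usize;
  for already_running in 0..4usize {
    let share = std::cmp::max(1, total_permits / (already_running + 1));
    println!("arrival #{}: {} slot(s)", already_running + 1, share);
  }
  // Prints 8, 4, 2, 2: the over-commit that the periodic balancer later corrects.
}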

…ore", since it will require further specialization.

[ci skip-build-wheels]
# Building wheels and fs_util will be skipped. Delete if not intended.
[ci skip-build-wheels]
# Building wheels and fs_util will be skipped. Delete if not intended.
[ci skip-build-wheels]
@stuhood stuhood enabled auto-merge (squash) January 24, 2022 18:45
@stuhood stuhood merged commit 706e384 into pantsbuild:main Jan 24, 2022
@stuhood stuhood deleted the stuhood/process-concurrency branch January 24, 2022 20:44