From 706e3844ac22b32fbb506c4c2f4a8121c04dd5dd Mon Sep 17 00:00:00 2001
From: Stu Hood
Date: Mon, 24 Jan 2022 11:48:03 -0800
Subject: [PATCH] Dynamically choose per-process concurrency for supported processes (#14184)

When tools support internal concurrency and cannot be partitioned (either because they don't support it, such as in the case of a PEX resolve, or because of overhead to partitioning as fine-grained as desired), Pants' own concurrency currently makes it ~impossible for them to set their concurrency settings correctly.

As sketched in #9964, this change adjusts Pants' local runner to dynamically choose concurrency values per process based on the current concurrency.

1. When acquiring a slot on the `bounded::CommandRunner`, a process takes as much concurrency as it a) is capable of, as configured by a new `Process.concurrency_available` field, and b) deserves for the purposes of fairness (i.e. half, for two processes). This results in some amount of over-commit.
2. Periodically, a balancing task runs and preempts/re-schedules processes which have been running for less than a very short threshold (`200ms` currently) and which are the largest contributors to over/under-commit. This fixes some over/under-commit, but not all of it, because if a process becomes over/under-committed after it has been running a while (because other processes started or finished), we will not preempt it.

Combined with #14186, this change results in an additional 2% speedup for `lint` and `fmt`. But it should also have a positive impact on PEX processes, which were the original motivation for #9964.

Fixes #9964.

[ci skip-build-wheels]
---
 .../pants/backend/python/lint/black/rules.py  |   2 +
 .../pants/backend/python/lint/flake8/rules.py |   2 +
 .../pants/backend/python/lint/pylint/rules.py |   2 +
 .../pants/backend/python/subsystems/setup.py  |   2 +
 .../pants/backend/python/util_rules/pex.py    |  25 +-
 .../backend/python/util_rules/pex_cli.py      |   8 +
 src/python/pants/engine/process.py            |   3 +
 src/rust/engine/Cargo.lock                    |  11 -
 src/rust/engine/Cargo.toml                    |   3 -
 src/rust/engine/async_semaphore/Cargo.toml    |  14 -
 src/rust/engine/async_semaphore/src/lib.rs    | 115 -----
 src/rust/engine/process_execution/Cargo.toml  |   1 -
 .../engine/process_execution/src/bounded.rs   | 426 ++++++++++++++++++
 .../src/bounded_tests.rs}                     |  99 +++-
 src/rust/engine/process_execution/src/lib.rs  |  86 +---
 .../process_execution/src/remote_tests.rs     |   5 +
 src/rust/engine/process_executor/src/main.rs  |   5 +
 src/rust/engine/src/context.rs                |  22 +-
 src/rust/engine/src/nodes.rs                  |   3 +
 19 files changed, 599 insertions(+), 235 deletions(-)
 delete mode 100644 src/rust/engine/async_semaphore/Cargo.toml
 delete mode 100644 src/rust/engine/async_semaphore/src/lib.rs
 create mode 100644 src/rust/engine/process_execution/src/bounded.rs
 rename src/rust/engine/{async_semaphore/src/tests.rs => process_execution/src/bounded_tests.rs} (78%)

diff --git a/src/python/pants/backend/python/lint/black/rules.py b/src/python/pants/backend/python/lint/black/rules.py
index 5e681ce2775..6c15a09baf3 100644
--- a/src/python/pants/backend/python/lint/black/rules.py
+++ b/src/python/pants/backend/python/lint/black/rules.py
@@ -58,6 +58,7 @@ def generate_argv(source_files: SourceFiles, black: Black, *, check_only: bool)
         args.append("--check")
     if black.config:
         args.extend(["--config", black.config])
+    args.extend(["-W", "{pants_concurrency}"])
     args.extend(black.args)
     args.extend(source_files.files)
     return tuple(args)
@@ -124,6 +125,7 @@ async def setup_black(
             argv=generate_argv(source_files, black, 
check_only=setup_request.check_only), input_digest=input_digest, output_files=source_files_snapshot.files, + concurrency_available=len(setup_request.request.field_sets), description=f"Run Black on {pluralize(len(setup_request.request.field_sets), 'file')}.", level=LogLevel.DEBUG, ), diff --git a/src/python/pants/backend/python/lint/flake8/rules.py b/src/python/pants/backend/python/lint/flake8/rules.py index f96e70051e2..647f74ada6d 100644 --- a/src/python/pants/backend/python/lint/flake8/rules.py +++ b/src/python/pants/backend/python/lint/flake8/rules.py @@ -39,6 +39,7 @@ def generate_argv(source_files: SourceFiles, flake8: Flake8) -> Tuple[str, ...]: args = [] if flake8.config: args.append(f"--config={flake8.config}") + args.append("--jobs={pants_concurrency}") args.extend(flake8.args) args.extend(source_files.files) return tuple(args) @@ -90,6 +91,7 @@ async def flake8_lint_partition( input_digest=input_digest, output_directories=(REPORT_DIR,), extra_env={"PEX_EXTRA_SYS_PATH": first_party_plugins.PREFIX}, + concurrency_available=len(partition.field_sets), description=f"Run Flake8 on {pluralize(len(partition.field_sets), 'file')}.", level=LogLevel.DEBUG, ), diff --git a/src/python/pants/backend/python/lint/pylint/rules.py b/src/python/pants/backend/python/lint/pylint/rules.py index b4b01d6e5cb..72dfc0d27f2 100644 --- a/src/python/pants/backend/python/lint/pylint/rules.py +++ b/src/python/pants/backend/python/lint/pylint/rules.py @@ -77,6 +77,7 @@ def generate_argv(source_files: SourceFiles, pylint: Pylint) -> Tuple[str, ...]: args = [] if pylint.config is not None: args.append(f"--rcfile={pylint.config}") + args.append("--jobs={pants_concurrency}") args.extend(pylint.args) args.extend(source_files.files) return tuple(args) @@ -180,6 +181,7 @@ async def pylint_lint_partition( input_digest=input_digest, output_directories=(REPORT_DIR,), extra_env={"PEX_EXTRA_SYS_PATH": ":".join(pythonpath)}, + concurrency_available=len(partition.field_sets), description=f"Run Pylint on {pluralize(len(partition.field_sets), 'file')}.", level=LogLevel.DEBUG, ), diff --git a/src/python/pants/backend/python/subsystems/setup.py b/src/python/pants/backend/python/subsystems/setup.py index 67159587fbb..d06f2ae26a9 100644 --- a/src/python/pants/backend/python/subsystems/setup.py +++ b/src/python/pants/backend/python/subsystems/setup.py @@ -203,6 +203,8 @@ def register_options(cls, register): type=int, default=CPU_COUNT // 2, default_help_repr="#cores/2", + removal_version="2.11.0.dev0", + removal_hint="Now set automatically based on the amount of concurrency available.", advanced=True, help=( "The maximum number of concurrent jobs to build wheels with.\n\nBecause Pants " diff --git a/src/python/pants/backend/python/util_rules/pex.py b/src/python/pants/backend/python/util_rules/pex.py index bb77fdf49f1..409035b3978 100644 --- a/src/python/pants/backend/python/util_rules/pex.py +++ b/src/python/pants/backend/python/util_rules/pex.py @@ -408,9 +408,6 @@ async def build_pex( argv.append("--no-emit-warnings") - if python_setup.resolver_jobs: - argv.extend(["--jobs", str(python_setup.resolver_jobs)]) - if python_setup.manylinux: argv.extend(["--manylinux", python_setup.manylinux]) else: @@ -435,6 +432,7 @@ async def build_pex( repository_pex_digest = repository_pex.digest if repository_pex else EMPTY_DIGEST constraints_file_digest = EMPTY_DIGEST requirements_file_digest = EMPTY_DIGEST + requirement_count: int # TODO(#12314): Capture the resolve name for multiple user lockfiles. 
resolve_name = ( @@ -452,22 +450,26 @@ async def build_pex( glob_match_error_behavior=GlobMatchErrorBehavior.error, description_of_origin=request.requirements.file_path_description_of_origin, ) + requirements_file_digest = await Get(Digest, PathGlobs, globs) + requirements_file_digest_contents = await Get( + DigestContents, Digest, requirements_file_digest + ) + requirement_count = len(requirements_file_digest_contents[0].content.decode().splitlines()) if python_setup.invalid_lockfile_behavior in { InvalidLockfileBehavior.warn, InvalidLockfileBehavior.error, }: - requirements_file_digest_contents = await Get(DigestContents, PathGlobs, globs) metadata = PythonLockfileMetadata.from_lockfile( requirements_file_digest_contents[0].content, request.requirements.file_path, resolve_name, ) _validate_metadata(metadata, request, request.requirements, python_setup) - requirements_file_digest = await Get(Digest, PathGlobs, globs) elif isinstance(request.requirements, LockfileContent): is_monolithic_resolve = True file_content = request.requirements.file_content + requirement_count = len(file_content.content.decode().splitlines()) argv.extend(["--requirement", file_content.path]) argv.append("--no-transitive") if python_setup.invalid_lockfile_behavior in { @@ -482,6 +484,7 @@ async def build_pex( else: assert isinstance(request.requirements, PexRequirements) is_monolithic_resolve = request.requirements.is_all_constraints_resolve + requirement_count = len(request.requirements.req_strings) if request.requirements.constraints_strings: constraints_file = "__constraints.txt" @@ -532,6 +535,10 @@ async def build_pex( description=_build_pex_description(request), output_files=output_files, output_directories=output_directories, + # TODO: This is not the best heuristic for available concurrency, since the + # requirements almost certainly have transitive deps which also need building, but it + # is better than using something hardcoded. + concurrency_available=requirement_count, ), ) @@ -979,6 +986,7 @@ class PexProcess: output_directories: tuple[str, ...] | None timeout_seconds: int | None execution_slot_variable: str | None + concurrency_available: int cache_scope: ProcessCacheScope def __init__( @@ -995,6 +1003,7 @@ def __init__( output_directories: Iterable[str] | None = None, timeout_seconds: int | None = None, execution_slot_variable: str | None = None, + concurrency_available: int = 0, cache_scope: ProcessCacheScope = ProcessCacheScope.SUCCESSFUL, ) -> None: self.pex = pex @@ -1008,6 +1017,7 @@ def __init__( self.output_directories = tuple(output_directories) if output_directories else None self.timeout_seconds = timeout_seconds self.execution_slot_variable = execution_slot_variable + self.concurrency_available = concurrency_available self.cache_scope = cache_scope @@ -1037,6 +1047,7 @@ async def setup_pex_process(request: PexProcess, pex_environment: PexEnvironment append_only_caches=complete_pex_env.append_only_caches, timeout_seconds=request.timeout_seconds, execution_slot_variable=request.execution_slot_variable, + concurrency_available=request.concurrency_available, cache_scope=request.cache_scope, ) @@ -1055,6 +1066,7 @@ class VenvPexProcess: output_directories: tuple[str, ...] 
| None timeout_seconds: int | None execution_slot_variable: str | None + concurrency_available: int cache_scope: ProcessCacheScope def __init__( @@ -1071,6 +1083,7 @@ def __init__( output_directories: Iterable[str] | None = None, timeout_seconds: int | None = None, execution_slot_variable: str | None = None, + concurrency_available: int = 0, cache_scope: ProcessCacheScope = ProcessCacheScope.SUCCESSFUL, ) -> None: self.venv_pex = venv_pex @@ -1084,6 +1097,7 @@ def __init__( self.output_directories = tuple(output_directories) if output_directories else None self.timeout_seconds = timeout_seconds self.execution_slot_variable = execution_slot_variable + self.concurrency_available = concurrency_available self.cache_scope = cache_scope @@ -1117,6 +1131,7 @@ async def setup_venv_pex_process( ).append_only_caches, timeout_seconds=request.timeout_seconds, execution_slot_variable=request.execution_slot_variable, + concurrency_available=request.concurrency_available, cache_scope=request.cache_scope, ) diff --git a/src/python/pants/backend/python/util_rules/pex_cli.py b/src/python/pants/backend/python/util_rules/pex_cli.py index 7a67a2b1628..b2826a465d9 100644 --- a/src/python/pants/backend/python/util_rules/pex_cli.py +++ b/src/python/pants/backend/python/util_rules/pex_cli.py @@ -71,6 +71,7 @@ class PexCliProcess: output_directories: Optional[Tuple[str, ...]] python: Optional[PythonExecutable] level: LogLevel + concurrency_available: int cache_scope: ProcessCacheScope def __init__( @@ -86,6 +87,7 @@ def __init__( output_directories: Optional[Iterable[str]] = None, python: Optional[PythonExecutable] = None, level: LogLevel = LogLevel.INFO, + concurrency_available: int = 0, cache_scope: ProcessCacheScope = ProcessCacheScope.SUCCESSFUL, ) -> None: self.subcommand = tuple(subcommand) @@ -98,6 +100,7 @@ def __init__( self.output_directories = tuple(output_directories) if output_directories else None self.python = python self.level = level + self.concurrency_available = concurrency_available self.cache_scope = cache_scope self.__post_init__() @@ -164,6 +167,10 @@ async def setup_pex_cli_process( "--tmpdir", tmpdir, ] + + if request.concurrency_available > 0: + global_args.extend(["--jobs", "{pants_concurrency}"]) + if pex_runtime_env.verbosity > 0: global_args.append(f"-{'v' * pex_runtime_env.verbosity}") @@ -200,6 +207,7 @@ async def setup_pex_cli_process( output_directories=request.output_directories, append_only_caches=complete_pex_env.append_only_caches, level=request.level, + concurrency_available=request.concurrency_available, cache_scope=request.cache_scope, ) diff --git a/src/python/pants/engine/process.py b/src/python/pants/engine/process.py index 363bba7c7d7..35a24316c8a 100644 --- a/src/python/pants/engine/process.py +++ b/src/python/pants/engine/process.py @@ -74,6 +74,7 @@ class Process: timeout_seconds: int | float jdk_home: str | None execution_slot_variable: str | None + concurrency_available: int cache_scope: ProcessCacheScope platform: str | None @@ -94,6 +95,7 @@ def __init__( timeout_seconds: int | float | None = None, jdk_home: str | None = None, execution_slot_variable: str | None = None, + concurrency_available: int = 0, cache_scope: ProcessCacheScope = ProcessCacheScope.SUCCESSFUL, platform: Platform | None = None, ) -> None: @@ -140,6 +142,7 @@ def __init__( self.timeout_seconds = timeout_seconds if timeout_seconds and timeout_seconds > 0 else -1 self.jdk_home = jdk_home self.execution_slot_variable = execution_slot_variable + self.concurrency_available = concurrency_available 
self.cache_scope = cache_scope self.platform = platform.value if platform is not None else None diff --git a/src/rust/engine/Cargo.lock b/src/rust/engine/Cargo.lock index 070f83ff544..7722a66c5fd 100644 --- a/src/rust/engine/Cargo.lock +++ b/src/rust/engine/Cargo.lock @@ -108,15 +108,6 @@ dependencies = [ "tokio", ] -[[package]] -name = "async_semaphore" -version = "0.0.1" -dependencies = [ - "futures", - "parking_lot", - "tokio", -] - [[package]] name = "async_value" version = "0.0.1" @@ -608,7 +599,6 @@ dependencies = [ "async-oncecell", "async-trait", "async_latch", - "async_semaphore", "bytes", "cache", "concrete_time", @@ -2187,7 +2177,6 @@ dependencies = [ "async-lock", "async-oncecell", "async-trait", - "async_semaphore", "bincode", "bytes", "cache", diff --git a/src/rust/engine/Cargo.toml b/src/rust/engine/Cargo.toml index 82c5635375f..b6d65475661 100644 --- a/src/rust/engine/Cargo.toml +++ b/src/rust/engine/Cargo.toml @@ -21,7 +21,6 @@ resolver = "2" members = [ ".", "async_latch", - "async_semaphore", "async_value", "cache", "client", @@ -62,7 +61,6 @@ members = [ default-members = [ ".", "async_latch", - "async_semaphore", "async_value", "cache", "client", @@ -105,7 +103,6 @@ default = [] [dependencies] async_latch = { path = "async_latch" } -async_semaphore = { path = "async_semaphore" } # Pin async-trait due to https://github.com/dtolnay/async-trait/issues/144. async-trait = "=0.1.42" protos = { path = "protos" } diff --git a/src/rust/engine/async_semaphore/Cargo.toml b/src/rust/engine/async_semaphore/Cargo.toml deleted file mode 100644 index a5fea7ea2ed..00000000000 --- a/src/rust/engine/async_semaphore/Cargo.toml +++ /dev/null @@ -1,14 +0,0 @@ -[package] -version = "0.0.1" -edition = "2021" -name = "async_semaphore" -authors = [ "Pants Build " ] -publish = false - -[dependencies] -parking_lot = "0.11" -tokio = { version = "1.4", features = ["sync"] } - -[dev-dependencies] -futures = "0.3" -tokio = { version = "1.4", features = ["rt", "macros", "time"] } diff --git a/src/rust/engine/async_semaphore/src/lib.rs b/src/rust/engine/async_semaphore/src/lib.rs deleted file mode 100644 index 70f39c918cd..00000000000 --- a/src/rust/engine/async_semaphore/src/lib.rs +++ /dev/null @@ -1,115 +0,0 @@ -// Copyright 2018 Pants project contributors (see CONTRIBUTORS.md). -// Licensed under the Apache License, Version 2.0 (see LICENSE). - -#![deny(warnings)] -// Enable all clippy lints except for many of the pedantic ones. It's a shame this needs to be copied and pasted across crates, but there doesn't appear to be a way to include inner attributes from a common source. -#![deny( - clippy::all, - clippy::default_trait_access, - clippy::expl_impl_clone_on_copy, - clippy::if_not_else, - clippy::needless_continue, - clippy::unseparated_literal_suffix, - clippy::used_underscore_binding -)] -// It is often more clear to show that nothing is being moved. -#![allow(clippy::match_ref_pats)] -// Subjective style. -#![allow( - clippy::len_without_is_empty, - clippy::redundant_field_names, - clippy::too_many_arguments -)] -// Default isn't as big a deal as people seem to think it is. 
-#![allow(clippy::new_without_default, clippy::new_ret_no_self)]
-// Arc can be more clear than needing to grok Orderings:
-#![allow(clippy::mutex_atomic)]
-
-use std::collections::VecDeque;
-use std::future::Future;
-use std::sync::Arc;
-
-use parking_lot::Mutex;
-use tokio::sync::{Semaphore, SemaphorePermit};
-
-struct Inner {
-  sema: Semaphore,
-  available_ids: Mutex<VecDeque<usize>>,
-}
-
-#[derive(Clone)]
-pub struct AsyncSemaphore {
-  inner: Arc<Inner>,
-}
-
-impl AsyncSemaphore {
-  pub fn new(permits: usize) -> AsyncSemaphore {
-    let mut available_ids = VecDeque::new();
-    for id in 1..=permits {
-      available_ids.push_back(id);
-    }
-
-    AsyncSemaphore {
-      inner: Arc::new(Inner {
-        sema: Semaphore::new(permits),
-        available_ids: Mutex::new(available_ids),
-      }),
-    }
-  }
-
-  pub fn available_permits(&self) -> usize {
-    self.inner.sema.available_permits()
-  }
-
-  ///
-  /// Runs the given Future-creating function (and the Future it returns) under the semaphore.
-  ///
-  pub async fn with_acquired<F, B, O>(self, f: F) -> O
-  where
-    F: FnOnce(usize) -> B,
-    B: Future<Output = O>,
-  {
-    let permit = self.acquire().await;
-    let res = f(permit.id).await;
-    drop(permit);
-    res
-  }
-
-  pub async fn acquire(&self) -> Permit<'_> {
-    let permit = self.inner.sema.acquire().await.expect("semaphore closed");
-    let id = {
-      let mut available_ids = self.inner.available_ids.lock();
-      available_ids
-        .pop_front()
-        .expect("More permits were distributed than ids exist.")
-    };
-    Permit {
-      inner: self.inner.clone(),
-      _permit: permit,
-      id,
-    }
-  }
-}
-
-pub struct Permit<'a> {
-  inner: Arc<Inner>,
-  // NB: Kept for its `Drop` impl.
-  _permit: SemaphorePermit<'a>,
-  id: usize,
-}
-
-impl Permit<'_> {
-  pub fn concurrency_slot(&self) -> usize {
-    self.id
-  }
-}
-
-impl<'a> Drop for Permit<'a> {
-  fn drop(&mut self) {
-    let mut available_ids = self.inner.available_ids.lock();
-    available_ids.push_back(self.id);
-  }
-}
-
-#[cfg(test)]
-mod tests;
diff --git a/src/rust/engine/process_execution/Cargo.toml b/src/rust/engine/process_execution/Cargo.toml
index 4cd7cda6fc2..5b6a83cac8c 100644
--- a/src/rust/engine/process_execution/Cargo.toml
+++ b/src/rust/engine/process_execution/Cargo.toml
@@ -10,7 +10,6 @@ publish = false
 async-trait = "=0.1.42"
 async-lock = "2.4"
 walkdir = "2"
-async_semaphore = { path = "../async_semaphore" }
 protos = { path = "../protos" }
 bytes = "1.0"
 cache = { path = "../cache" }
diff --git a/src/rust/engine/process_execution/src/bounded.rs b/src/rust/engine/process_execution/src/bounded.rs
new file mode 100644
index 00000000000..a58353d4709
--- /dev/null
+++ b/src/rust/engine/process_execution/src/bounded.rs
@@ -0,0 +1,426 @@
+use std::borrow::Cow;
+use std::cmp::{max, min, Ordering, Reverse};
+use std::collections::VecDeque;
+use std::future::Future;
+use std::sync::{atomic, Arc};
+use std::time::{Duration, Instant};
+
+use async_trait::async_trait;
+use lazy_static::lazy_static;
+use log::Level;
+use parking_lot::Mutex;
+use regex::Regex;
+use task_executor::Executor;
+use tokio::sync::{Notify, Semaphore, SemaphorePermit};
+use tokio::time::sleep;
+use workunit_store::{in_workunit, RunningWorkunit, WorkunitMetadata};
+
+use crate::{Context, FallibleProcessResultWithPlatform, Process};
+
+lazy_static! {
+  // TODO: Runtime formatting is unstable in Rust, so we imitate it.
+  static ref CONCURRENCY_TEMPLATE_RE: Regex = Regex::new(r"\{pants_concurrency\}").unwrap();
+}
+
+///
+/// A CommandRunner wrapper which limits the number of concurrent requests and which provides
+/// concurrency information to the process being executed.
+///
+/// If a Process sets a non-zero `concurrency_available` value, it may be preempted (i.e. canceled
+/// and restarted) with a new concurrency value for a short period after starting.
+///
+#[derive(Clone)]
+pub struct CommandRunner {
+  inner: Arc<dyn crate::CommandRunner>,
+  sema: AsyncSemaphore,
+}
+
+impl CommandRunner {
+  pub fn new(
+    executor: &Executor,
+    inner: Box<dyn crate::CommandRunner>,
+    bound: usize,
+  ) -> CommandRunner {
+    CommandRunner {
+      inner: inner.into(),
+      sema: AsyncSemaphore::new(
+        executor,
+        bound,
+        // TODO: Make configurable.
+        Duration::from_millis(200),
+      ),
+    }
+  }
+}
+
+#[async_trait]
+impl crate::CommandRunner for CommandRunner {
+  async fn run(
+    &self,
+    context: Context,
+    workunit: &mut RunningWorkunit,
+    process: Process,
+  ) -> Result<FallibleProcessResultWithPlatform, String> {
+    let semaphore_acquisition = self.sema.acquire(process.concurrency_available);
+    let permit = in_workunit!(
+      context.workunit_store.clone(),
+      "acquire_command_runner_slot".to_owned(),
+      WorkunitMetadata {
+        level: Level::Trace,
+        ..WorkunitMetadata::default()
+      },
+      |workunit| async move {
+        let _blocking_token = workunit.blocking();
+        semaphore_acquisition.await
+      }
+    )
+    .await;
+
+    loop {
+      let mut process = process.clone();
+      let concurrency_available = permit.concurrency();
+      log::debug!(
+        "Running {} under semaphore with concurrency id: {}, and concurrency: {}",
+        process.description,
+        permit.concurrency_slot(),
+        concurrency_available,
+      );
+
+      // TODO: Both of these templating cases should be implemented at the lowest possible level:
+      // they might currently be applied above a cache.
+      if let Some(ref execution_slot_env_var) = process.execution_slot_variable {
+        process.env.insert(
+          execution_slot_env_var.clone(),
+          format!("{}", permit.concurrency_slot()),
+        );
+      }
+      if process.concurrency_available > 0 {
+        let concurrency = format!("{}", permit.concurrency());
+        let mut matched = false;
+        process.argv = std::mem::take(&mut process.argv)
+          .into_iter()
+          .map(
+            |arg| match CONCURRENCY_TEMPLATE_RE.replace_all(&arg, &concurrency) {
+              Cow::Owned(altered) => {
+                matched = true;
+                altered
+              }
+              Cow::Borrowed(_original) => arg,
+            },
+          )
+          .collect();
+        if !matched {
+          return Err(format!(
+            "Process {} set `concurrency_available={}`, but did not include \
+             the `{}` template variable in its arguments.",
+            process.description, process.concurrency_available, *CONCURRENCY_TEMPLATE_RE
+          ));
+        }
+      }
+
+      let running_process = self.inner.run(context.clone(), workunit, process.clone());
+      tokio::select! {
+        _ = permit.notified_concurrency_changed() => {
+          log::debug!(
+            "Process {} was preempted, and went from concurrency {} to concurrency {}",
+            process.description,
+            concurrency_available,
+            permit.concurrency(),
+          );
+          continue;
+        },
+        res = running_process => {
+          // The process completed.
+          return res;
+        }
+      }
+    }
+  }
+}
+
+/// A wrapped Semaphore which adds concurrency metadata which supports overcommit.
+#[derive(Clone)]
+pub(crate) struct AsyncSemaphore {
+  sema: Arc<Semaphore>,
+  state: Arc<Mutex<State>>,
+  preemptible_duration: Duration,
+}
+
+pub(crate) struct State {
+  total_concurrency: usize,
+  available_ids: VecDeque<usize>,
+  tasks: Vec<Arc<Task>>,
+}
+
+impl State {
+  #[cfg(test)]
+  pub(crate) fn new_for_tests(total_concurrency: usize, tasks: Vec<Arc<Task>>) -> Self {
+    Self {
+      total_concurrency,
+      available_ids: VecDeque::new(),
+      tasks,
+    }
+  }
+}
+
+impl AsyncSemaphore {
+  pub fn new(
+    executor: &Executor,
+    permits: usize,
+    preemptible_duration: Duration,
+  ) -> AsyncSemaphore {
+    let mut available_ids = VecDeque::new();
+    for id in 1..=permits {
+      available_ids.push_back(id);
+    }
+
+    let state = Arc::new(Mutex::new(State {
+      total_concurrency: permits,
+      available_ids,
+      tasks: Vec::new(),
+    }));
+
+    // Spawn a task which will periodically balance Tasks.
+    let _balancer_task = {
+      let state = Arc::downgrade(&state);
+      executor.spawn(async move {
+        loop {
+          sleep(preemptible_duration / 4).await;
+          if let Some(state) = state.upgrade() {
+            // Balance tasks.
+            let mut state = state.lock();
+            balance(Instant::now(), &mut state);
+          } else {
+            // The AsyncSemaphore was torn down.
+            break;
+          }
+        }
+      })
+    };
+
+    AsyncSemaphore {
+      sema: Arc::new(Semaphore::new(permits)),
+      state,
+      preemptible_duration,
+    }
+  }
+
+  #[cfg(test)]
+  pub(crate) fn available_permits(&self) -> usize {
+    self.sema.available_permits()
+  }
+
+  ///
+  /// Runs the given Future-creating function (and the Future it returns) under the semaphore.
+  ///
+  /// NB: This method does not support preemption, or controlling concurrency.
+  ///
+  // TODO: https://github.com/rust-lang/rust/issues/46379
+  #[allow(dead_code)]
+  pub(crate) async fn with_acquired<F, B, O>(self, f: F) -> O
+  where
+    F: FnOnce(usize) -> B,
+    B: Future<Output = O>,
+  {
+    let permit = self.acquire(1).await;
+    let res = f(permit.task.id).await;
+    drop(permit);
+    res
+  }
+
+  ///
+  /// Acquire a slot on the semaphore when it becomes available. Additionally, attempt to acquire
+  /// the given amount of concurrency. The amount actually acquired will be reported on the
+  /// returned Permit.
+  ///
+  pub async fn acquire(&self, concurrency_desired: usize) -> Permit<'_> {
+    let permit = self.sema.acquire().await.expect("semaphore closed");
+    let task = {
+      let mut state = self.state.lock();
+      let id = state
+        .available_ids
+        .pop_front()
+        .expect("More permits were distributed than ids exist.");
+
+      // A Task is initially given its fair share of the available concurrency: i.e., the first
+      // arriving task gets all of the slots, and the second arriving gets half, even though that
+      // means that we overcommit. Balancing will adjust concurrency later, to the extent that it
+      // can given preemption timeouts.
+      //
+      // This is because we cannot anticipate the number of inbound processes, and we never want to
+      // delay a process from starting.
+      let concurrency_desired = max(concurrency_desired, 1);
+      let concurrency_actual = min(
+        concurrency_desired,
+        state.total_concurrency / (state.tasks.len() + 1),
+      );
+      let task = Arc::new(Task::new(
+        id,
+        concurrency_desired,
+        concurrency_actual,
+        Instant::now() + self.preemptible_duration,
+      ));
+      state.tasks.push(task.clone());
+      task
+    };
+    Permit {
+      state: self.state.clone(),
+      _permit: permit,
+      task,
+    }
+  }
+}
+
+pub struct Permit<'a> {
+  state: Arc<Mutex<State>>,
+  // NB: Kept for its `Drop` impl.
+  _permit: SemaphorePermit<'a>,
+  task: Arc<Task>,
+}
+
+impl Permit<'_> {
+  pub fn concurrency_slot(&self) -> usize {
+    self.task.id
+  }
+
+  pub fn concurrency(&self) -> usize {
+    self.task.concurrency()
+  }
+
+  pub async fn notified_concurrency_changed(&self) {
+    self.task.notify_concurrency_changed.notified().await
+  }
+}
+
+impl<'a> Drop for Permit<'a> {
+  fn drop(&mut self) {
+    let mut state = self.state.lock();
+    state.available_ids.push_back(self.task.id);
+    let tasks_position = state
+      .tasks
+      .iter()
+      .position(|t| t.id == self.task.id)
+      .unwrap();
+    state.tasks.swap_remove(tasks_position);
+  }
+}
+
+pub(crate) struct Task {
+  id: usize,
+  concurrency_desired: usize,
+  pub(crate) concurrency_actual: atomic::AtomicUsize,
+  notify_concurrency_changed: Notify,
+  preemptible_until: Instant,
+}
+
+impl Task {
+  pub(crate) fn new(
+    id: usize,
+    concurrency_desired: usize,
+    concurrency_actual: usize,
+    preemptible_until: Instant,
+  ) -> Self {
+    assert!(concurrency_actual <= concurrency_desired);
+    Self {
+      id,
+      concurrency_desired,
+      concurrency_actual: atomic::AtomicUsize::new(concurrency_actual),
+      notify_concurrency_changed: Notify::new(),
+      preemptible_until,
+    }
+  }
+
+  pub(crate) fn concurrency(&self) -> usize {
+    self.concurrency_actual.load(atomic::Ordering::SeqCst)
+  }
+
+  fn preemptible(&self, now: Instant) -> bool {
+    self.preemptible_until > now
+  }
+}
+
+/// Given a set of Tasks with their desired and actual concurrency, balance the concurrency levels
+/// of any preemptible tasks, and notify them of the changes. Returns the number of Tasks that were
+/// preempted.
+///
+/// This method only internally mutates tasks (by adjusting their concurrency levels and notifying
+/// them), but takes State as mutable in order to guarantee that it gets an atomic view of the
+/// tasks.
+pub(crate) fn balance(now: Instant, state: &mut State) -> usize {
+  let concurrency_used: usize = state.tasks.iter().map(|t| t.concurrency()).sum();
+  let mut desired_change_in_commitment =
+    state.total_concurrency as isize - concurrency_used as isize;
+  let mut preempted = 0;
+
+  // To reduce the number of tasks that we preempt, we preempt them in order by the amount of
+  // concurrency that they desire or can relinquish.
+  match desired_change_in_commitment.cmp(&0) {
+    Ordering::Equal => {
+      // Nothing to do! Although some tasks might not have their desired concurrency levels, it's
+      // probably not worth preempting any tasks to fix that.
+    }
+    Ordering::Less => {
+      // We're overcommitted: order by the amount that they can relinquish.
+      let mut preemptible_tasks = state
+        .tasks
+        .iter()
+        .filter_map(|t| {
+          // A task may never have less than one slot.
+          let relinquishable = t.concurrency() - 1;
+          if relinquishable > 0 && t.preemptible(now) {
+            Some((relinquishable, t))
+          } else {
+            None
+          }
+        })
+        .collect::<Vec<_>>();
+      preemptible_tasks.sort_by_key(|(relinquishable, _)| Reverse(*relinquishable));
+
+      for (relinquishable, task) in preemptible_tasks {
+        if desired_change_in_commitment == 0 {
+          break;
+        }
+
+        let relinquish = min(relinquishable, (-desired_change_in_commitment) as usize);
+        desired_change_in_commitment += relinquish as isize;
+        task
+          .concurrency_actual
+          .fetch_sub(relinquish, atomic::Ordering::SeqCst);
+        task.notify_concurrency_changed.notify_one();
+        preempted += 1;
+      }
+    }
+    Ordering::Greater => {
+      // We're undercommitted: order by the amount that they are owed.
+      let mut preemptible_tasks = state
+        .tasks
+        .iter()
+        .filter_map(|t| {
+          let desired = t.concurrency_desired - t.concurrency();
+          if desired > 0 && t.preemptible(now) {
+            Some((desired, t))
+          } else {
+            None
+          }
+        })
+        .collect::<Vec<_>>();
+      preemptible_tasks.sort_by_key(|(desired, _)| Reverse(*desired));
+
+      for (desired, task) in preemptible_tasks {
+        if desired_change_in_commitment == 0 {
+          break;
+        }
+
+        let acquire = min(desired, desired_change_in_commitment as usize);
+        desired_change_in_commitment -= acquire as isize;
+        task
+          .concurrency_actual
+          .fetch_add(acquire, atomic::Ordering::SeqCst);
+        task.notify_concurrency_changed.notify_one();
+        preempted += 1;
+      }
+    }
+  }
+
+  preempted
+}
diff --git a/src/rust/engine/async_semaphore/src/tests.rs b/src/rust/engine/process_execution/src/bounded_tests.rs
similarity index 78%
rename from src/rust/engine/async_semaphore/src/tests.rs
rename to src/rust/engine/process_execution/src/bounded_tests.rs
index f7ecac87313..b1c9d291b90 100644
--- a/src/rust/engine/async_semaphore/src/tests.rs
+++ b/src/rust/engine/process_execution/src/bounded_tests.rs
@@ -1,21 +1,34 @@
-use std::time::Duration;
+use std::sync::Arc;
+use std::time::{Duration, Instant};
 
 use futures::channel::oneshot;
 use futures::future::{self, FutureExt};
 use tokio::time::{sleep, timeout};
 
-use crate::AsyncSemaphore;
+use crate::bounded::{balance, AsyncSemaphore, State, Task};
+
+fn mk_semaphore(permits: usize) -> AsyncSemaphore {
+  mk_semaphore_with_preemptible_duration(permits, Duration::from_millis(200))
+}
+
+fn mk_semaphore_with_preemptible_duration(
+  permits: usize,
+  preemptible_duration: Duration,
+) -> AsyncSemaphore {
+  let executor = task_executor::Executor::new();
+  AsyncSemaphore::new(&executor, permits, preemptible_duration)
+}
 
 #[tokio::test]
 async fn acquire_and_release() {
-  let sema = AsyncSemaphore::new(1);
+  let sema = mk_semaphore(1);
 
   sema.with_acquired(|_id| future::ready(())).await;
 }
 
 #[tokio::test]
 async fn correct_semaphore_slot_ids() {
-  let sema = AsyncSemaphore::new(2);
+  let sema = mk_semaphore(2);
   let (tx1, rx1) = oneshot::channel();
   let (tx2, rx2) = oneshot::channel();
   let (tx3, rx3) = oneshot::channel();
@@ -67,7 +80,7 @@ async fn correct_semaphore_slot_ids() {
 
 #[tokio::test]
 async fn correct_semaphore_slot_ids_2() {
-  let sema = AsyncSemaphore::new(4);
+  let sema = mk_semaphore(4);
   let (tx1, rx1) = oneshot::channel();
   let (tx2, rx2) = oneshot::channel();
   let (tx3, rx3) = oneshot::channel();
@@ -145,7 +158,7 @@ async fn correct_semaphore_slot_ids_2() {
 
 #[tokio::test]
 async fn at_most_n_acquisitions() {
-  let sema = AsyncSemaphore::new(1);
+  let sema = mk_semaphore(1);
   let handle1 = sema.clone();
   let handle2 = sema.clone();
 
@@ -208,7 +221,7 @@ async fn drop_while_waiting() {
   // If the SECOND future was not removed from the waiters queue we would not get a signal
   // that thread3 acquired the lock because the 2nd task would be blocking the queue trying to
   // poll a non existent future.
-  let sema = AsyncSemaphore::new(1);
+  let sema = mk_semaphore(1);
   let handle1 = sema.clone();
   let handle2 = sema.clone();
   let handle3 = sema.clone();
@@ -235,7 +248,7 @@ async fn drop_while_waiting() {
   // thread2 will wait for a little while, but then drop its PermitFuture to give up on waiting.
   tokio::spawn(async move {
-    let permit_future = handle2.acquire().boxed();
+    let permit_future = handle2.acquire(1).boxed();
     let delay_future = sleep(Duration::from_millis(100)).boxed();
     let raced_result = future::select(delay_future, permit_future).await;
     // We expect to have timed out, because the other Future will not resolve until asked.
@@ -270,7 +283,7 @@ async fn drop_while_waiting() {
 
 #[tokio::test]
 async fn dropped_future_is_removed_from_queue() {
-  let sema = AsyncSemaphore::new(1);
+  let sema = mk_semaphore(1);
   let handle1 = sema.clone();
   let handle2 = sema.clone();
 
@@ -327,3 +340,71 @@ async fn dropped_future_is_removed_from_queue() {
   }
   assert_eq!(1, sema.available_permits());
 }
+
+#[tokio::test]
+async fn preemption() {
+  let ten_secs = Duration::from_secs(10);
+  let sema = mk_semaphore_with_preemptible_duration(2, ten_secs);
+
+  // Acquire a permit which will take all concurrency, and confirm that it doesn't get preempted.
+  let permit1 = sema.acquire(2).await;
+  assert_eq!(2, permit1.concurrency());
+  if let Ok(_) = timeout(ten_secs / 100, permit1.notified_concurrency_changed()).await {
+    panic!("permit1 should not have been preempted.");
+  }
+
+  // Acquire another permit, and confirm that it doesn't get preempted.
+  let permit2 = sema.acquire(2).await;
+  if let Ok(_) = timeout(ten_secs / 100, permit2.notified_concurrency_changed()).await {
+    panic!("permit2 should not have been preempted.");
+  }
+
+  // But that permit1 does get preempted.
+  if let Err(_) = timeout(ten_secs, permit1.notified_concurrency_changed()).await {
+    panic!("permit1 should have been preempted.");
+  }
+
+  assert_eq!(1, permit1.concurrency());
+  assert_eq!(1, permit2.concurrency());
+}
+
+/// Given Tasks as triples of desired, actual, and expected concurrency (all of which are
+/// assumed to be preemptible), assert that the expected concurrency is applied.
+fn test_balance(
+  total_concurrency: usize,
+  expected_preempted: usize,
+  task_defs: Vec<(usize, usize, usize)>,
+) {
+  let ten_minutes_from_now = Instant::now() + Duration::from_secs(10 * 60);
+  let tasks = task_defs
+    .iter()
+    .enumerate()
+    .map(|(id, (desired, actual, _))| {
+      Arc::new(Task::new(id, *desired, *actual, ten_minutes_from_now))
+    })
+    .collect::<Vec<_>>();
+
+  let mut state = State::new_for_tests(total_concurrency, tasks.clone());
+
+  assert_eq!(expected_preempted, balance(Instant::now(), &mut state));
+  for (task, (_, _, expected)) in tasks.iter().zip(task_defs.into_iter()) {
+    assert_eq!(expected, task.concurrency());
+  }
+}
+
+#[tokio::test]
+async fn balance_noop() {
+  test_balance(2, 0, vec![(1, 1, 1), (1, 1, 1)]);
+}
+
+#[tokio::test]
+async fn balance_overcommitted() {
+  // Preempt the first Task and give it one slot, without adjusting the second task.
+  test_balance(2, 1, vec![(2, 2, 1), (1, 1, 1)]);
+}
+
+#[tokio::test]
+async fn balance_undercommitted() {
+  // Should preempt both Tasks to give them more concurrency.
+  test_balance(4, 2, vec![(2, 1, 2), (2, 1, 2)]);
+}
diff --git a/src/rust/engine/process_execution/src/lib.rs b/src/rust/engine/process_execution/src/lib.rs
index cdf4b674d35..02dde3285f8 100644
--- a/src/rust/engine/process_execution/src/lib.rs
+++ b/src/rust/engine/process_execution/src/lib.rs
@@ -30,21 +30,22 @@ extern crate derivative;
 use std::collections::{BTreeMap, BTreeSet};
 use std::convert::{TryFrom, TryInto};
 use std::path::PathBuf;
-use std::sync::Arc;
 
-use async_semaphore::AsyncSemaphore;
 use async_trait::async_trait;
 use concrete_time::{Duration, TimeSpan};
 use fs::RelativePath;
 use futures::future::try_join_all;
 use futures::try_join;
 use hashing::Digest;
-use log::Level;
 use protos::gen::build::bazel::remote::execution::v2 as remexec;
 use remexec::ExecutedActionMetadata;
 use serde::{Deserialize, Serialize};
 use store::{SnapshotOps, SnapshotOpsError, Store};
-use workunit_store::{in_workunit, RunId, RunningWorkunit, WorkunitMetadata, WorkunitStore};
+use workunit_store::{RunId, RunningWorkunit, WorkunitStore};
+
+pub mod bounded;
+#[cfg(test)]
+mod bounded_tests;
 
 pub mod cache;
 #[cfg(test)]
@@ -367,9 +368,21 @@ pub struct Process {
   pub timeout: Option<std::time::Duration>,
 
-  /// If not None, then if a BoundedCommandRunner executes this Process
+  /// If not None, then a bounded::CommandRunner executing this Process will set an environment
+  /// variable with this name containing a unique execution slot number.
   pub execution_slot_variable: Option<String>,
 
+  /// If non-zero, the amount of parallelism that this process is capable of given its inputs. This
+  /// value does not directly set the number of cores allocated to the process: that is computed
+  /// based on availability, and provided as a template value in the arguments of the process.
+  ///
+  /// When set, a `{pants_concurrency}` variable will be templated into the `argv` of the process.
+  ///
+  /// Processes which set this value may be preempted (i.e. canceled and restarted) for a short
+  /// period after starting if available resources have changed (because other processes have
+  /// started or finished).
+  pub concurrency_available: usize,
+
   #[derivative(PartialEq = "ignore", Hash = "ignore")]
   pub description: String,
 
@@ -433,6 +446,7 @@ impl Process {
       jdk_home: None,
       platform_constraint: None,
      execution_slot_variable: None,
+      concurrency_available: 0,
       cache_scope: ProcessCacheScope::Successful,
     }
  }
@@ -669,67 +683,5 @@ pub fn digest(process: &Process, metadata: &ProcessMetadata) -> Digest {
   execute_request.action_digest.unwrap().try_into().unwrap()
 }
 
-///
-/// A CommandRunner wrapper that limits the number of concurrent requests.
-///
-#[derive(Clone)]
-pub struct BoundedCommandRunner {
-  inner: Arc<(Box<dyn CommandRunner>, AsyncSemaphore)>,
-}
-
-impl BoundedCommandRunner {
-  pub fn new(inner: Box<dyn CommandRunner>, bound: usize) -> BoundedCommandRunner {
-    BoundedCommandRunner {
-      inner: Arc::new((inner, AsyncSemaphore::new(bound))),
-    }
-  }
-}
-
-#[async_trait]
-impl CommandRunner for BoundedCommandRunner {
-  async fn run(
-    &self,
-    context: Context,
-    workunit: &mut RunningWorkunit,
-    mut process: Process,
-  ) -> Result<FallibleProcessResultWithPlatform, String> {
-    let semaphore_acquisition = self.inner.1.acquire();
-    let permit = in_workunit!(
-      context.workunit_store.clone(),
-      "acquire_command_runner_slot".to_owned(),
-      WorkunitMetadata {
-        level: Level::Trace,
-        ..WorkunitMetadata::default()
-      },
-      |workunit| async move {
-        let _blocking_token = workunit.blocking();
-        semaphore_acquisition.await
-      }
-    )
-    .await;
-
-    log::debug!(
-      "Running {} under semaphore with concurrency id: {}",
-      process.description,
-      permit.concurrency_slot()
-    );
-
-    if let Some(ref execution_slot_env_var) = process.execution_slot_variable {
-      process.env.insert(
-        execution_slot_env_var.clone(),
-        format!("{}", permit.concurrency_slot()),
-      );
-    }
-
-    self.inner.0.run(context, workunit, process).await
-  }
-}
-
-impl From<Box<BoundedCommandRunner>> for Arc<dyn CommandRunner> {
-  fn from(command_runner: Box<BoundedCommandRunner>) -> Arc<dyn CommandRunner> {
-    Arc::new(*command_runner)
-  }
-}
-
 #[cfg(test)]
 mod tests;
diff --git a/src/rust/engine/process_execution/src/remote_tests.rs b/src/rust/engine/process_execution/src/remote_tests.rs
index 4c295b88c6a..4a7a1f63241 100644
--- a/src/rust/engine/process_execution/src/remote_tests.rs
+++ b/src/rust/engine/process_execution/src/remote_tests.rs
@@ -89,6 +89,7 @@ async fn make_execute_request() {
     jdk_home: None,
     platform_constraint: None,
     execution_slot_variable: None,
+    concurrency_available: 0,
     cache_scope: ProcessCacheScope::Always,
   };
 
@@ -165,6 +166,7 @@ async fn make_execute_request_with_instance_name() {
     jdk_home: None,
     platform_constraint: None,
     execution_slot_variable: None,
+    concurrency_available: 0,
     cache_scope: ProcessCacheScope::Always,
   };
 
@@ -254,6 +256,7 @@ async fn make_execute_request_with_cache_key_gen_version() {
     jdk_home: None,
     platform_constraint: None,
     execution_slot_variable: None,
+    concurrency_available: 0,
     cache_scope: ProcessCacheScope::Always,
   };
 
@@ -490,6 +493,7 @@ async fn make_execute_request_with_timeout() {
     jdk_home: None,
     platform_constraint: None,
     execution_slot_variable: None,
+    concurrency_available: 0,
     cache_scope: ProcessCacheScope::Always,
   };
 
@@ -590,6 +594,7 @@ async fn make_execute_request_using_immutable_inputs() {
     jdk_home: None,
     platform_constraint: None,
     execution_slot_variable: None,
+    concurrency_available: 0,
     cache_scope: ProcessCacheScope::Always,
   };
 
diff --git a/src/rust/engine/process_executor/src/main.rs b/src/rust/engine/process_executor/src/main.rs
index 731b0065456..94722be7220 100644
--- a/src/rust/engine/process_executor/src/main.rs
+++ b/src/rust/engine/process_executor/src/main.rs
@@ -85,6 +85,9 @@ struct CommandSpec {
   #[structopt(long)]
   working_directory: Option<PathBuf>,
 
+  #[structopt(long)]
+  concurrency_available: Option<usize>,
+
   #[structopt(long)]
   cache_key_gen_version: Option<String>,
 }
 
@@ -433,6 +436,7 @@ async fn make_request_from_flat_args(
     jdk_home: args.command.jdk.clone(),
     platform_constraint: None,
     execution_slot_variable: None,
+    concurrency_available: args.command.concurrency_available.unwrap_or(0),
     cache_scope: ProcessCacheScope::Always,
   };
 
@@ -514,6 +518,7 @@ async fn extract_request_from_action_digest(
       std::time::Duration::from_nanos(timeout.nanos as u64 + timeout.seconds as u64 * 
1000000000)
     }),
     execution_slot_variable: None,
+    concurrency_available: 0,
     description: "".to_string(),
     level: log::Level::Error,
     append_only_caches: BTreeMap::new(),
diff --git a/src/rust/engine/src/context.rs b/src/rust/engine/src/context.rs
index 0b8553966bf..70044b988a2 100644
--- a/src/rust/engine/src/context.rs
+++ b/src/rust/engine/src/context.rs
@@ -24,8 +24,8 @@ use graph::{self, EntryId, Graph, InvalidationResult, NodeContext};
 use log::info;
 use parking_lot::Mutex;
 use process_execution::{
-  self, BoundedCommandRunner, CommandRunner, ImmutableInputs, NamedCaches, Platform,
-  ProcessMetadata, RemoteCacheWarningsBehavior,
+  self, bounded, local, nailgun, remote, remote_cache, CommandRunner, ImmutableInputs, NamedCaches,
+  Platform, ProcessMetadata, RemoteCacheWarningsBehavior,
 };
 use protos::gen::build::bazel::remote::execution::v2::ServerCapabilities;
 use regex::Regex;
@@ -59,7 +59,7 @@ pub struct Core {
   pub intrinsics: Intrinsics,
   pub executor: Executor,
   store: Store,
-  pub command_runner: Box<dyn process_execution::CommandRunner>,
+  pub command_runner: Box<dyn CommandRunner>,
   pub http_client: reqwest::Client,
   pub local_cache: PersistentCache,
   pub vfs: PosixFS,
@@ -170,7 +170,7 @@ impl Core {
     exec_strategy_opts: &ExecutionStrategyOptions,
   ) -> Result<Box<dyn CommandRunner>, String> {
     let immutable_inputs = ImmutableInputs::new(store.clone(), local_execution_root_dir)?;
-    let local_command_runner = process_execution::local::CommandRunner::new(
+    let local_command_runner = local::CommandRunner::new(
       store.clone(),
       executor.clone(),
       local_execution_root_dir.to_path_buf(),
@@ -179,9 +179,9 @@ impl Core {
       exec_strategy_opts.local_cleanup,
     );
 
-    let maybe_nailgunnable_local_command_runner: Box<dyn process_execution::CommandRunner> =
+    let maybe_nailgunnable_local_command_runner: Box<dyn CommandRunner> =
       if exec_strategy_opts.local_enable_nailgun {
-        Box::new(process_execution::nailgun::CommandRunner::new(
+        Box::new(nailgun::CommandRunner::new(
          local_command_runner,
          local_execution_root_dir.to_path_buf(),
          store.clone(),
@@ -197,7 +197,8 @@ impl Core {
       Box::new(local_command_runner)
     };
 
-    Ok(Box::new(BoundedCommandRunner::new(
+    Ok(Box::new(bounded::CommandRunner::new(
+      executor,
       maybe_nailgunnable_local_command_runner,
       exec_strategy_opts.local_parallelism,
     )))
@@ -240,8 +241,9 @@ impl Core {
     // `global_options.py` already validates that both are not set at the same time.
     let maybe_remote_enabled_command_runner: Box<dyn CommandRunner> =
       if remoting_opts.execution_enable {
-        Box::new(BoundedCommandRunner::new(
-          Box::new(process_execution::remote::CommandRunner::new(
+        Box::new(bounded::CommandRunner::new(
+          executor,
+          Box::new(remote::CommandRunner::new(
            // We unwrap because global_options.py will have already validated these are defined.
            remoting_opts.execution_address.as_ref().unwrap(),
            remoting_opts.store_address.as_ref().unwrap(),
@@ -262,7 +264,7 @@ impl Core {
           exec_strategy_opts.remote_parallelism,
         ))
       } else if remote_caching_used {
-        Box::new(process_execution::remote_cache::CommandRunner::new(
+        Box::new(remote_cache::CommandRunner::new(
          local_command_runner.into(),
          process_execution_metadata.clone(),
          executor.clone(),
diff --git a/src/rust/engine/src/nodes.rs b/src/rust/engine/src/nodes.rs
index 28b09b86a11..4b52864941c 100644
--- a/src/rust/engine/src/nodes.rs
+++ b/src/rust/engine/src/nodes.rs
@@ -356,6 +356,8 @@ impl ExecuteProcess {
     let execution_slot_variable =
       externs::getattr_as_optional_string(value, "execution_slot_variable");
 
+    let concurrency_available: usize = externs::getattr(value, "concurrency_available").unwrap();
+
     let cache_scope: ProcessCacheScope = {
       let cache_scope_enum = externs::getattr(value, "cache_scope").unwrap();
       externs::getattr::<String>(cache_scope_enum, "name")
        .unwrap()
        .try_into()?
     };
@@ -384,6 +386,7 @@ impl ExecuteProcess {
       jdk_home,
       platform_constraint,
       execution_slot_variable,
+      concurrency_available,
       cache_scope,
     })
  }
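
As a usage sketch of the mechanism above (illustrative only: `my-tool` and its `--jobs` flag are hypothetical, and omitted `Process` fields are assumed to keep their defaults), a plugin rule opts in by declaring `concurrency_available` and templating the literal `{pants_concurrency}` placeholder into `argv`; the `bounded::CommandRunner` substitutes the dynamically chosen value, and may restart the process with a new value during roughly its first 200ms:

    from pants.engine.fs import EMPTY_DIGEST
    from pants.engine.process import Process

    files = ("src/a.py", "src/b.py", "src/c.py")
    process = Process(
        argv=("my-tool", "--jobs", "{pants_concurrency}", *files),
        input_digest=EMPTY_DIGEST,
        description=f"Run my-tool on {len(files)} files",
        # Upper bound on useful parallelism; the value substituted for
        # {pants_concurrency} is chosen at run time from the cores actually available.
        concurrency_available=len(files),
    )

The Flake8 and Pylint changes in this patch follow exactly this pattern, passing `--jobs={pants_concurrency}` and `concurrency_available=len(partition.field_sets)`.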