Skip to content

Commit

Permalink
Dynamically choose per-process concurrency for supported processes (#14184)

Browse files Browse the repository at this point in the history

When tools support internal concurrency but their work cannot be partitioned (either because they don't support partitioning, as in the case of a PEX resolve, or because partitioning as fine-grained as desired would have too much overhead), Pants' own concurrency currently makes it nearly impossible for those tools to set their concurrency settings correctly.

As sketched in #9964, this change adjusts Pants' local runner to dynamically choose concurrency values per process based on the current concurrency.
1. When acquiring a slot on the `bounded::CommandRunner`, a process takes as much concurrency as it a) is capable of using, as configured by a new `Process.concurrency_available` field, and b) deserves for the purposes of fairness (i.e., half, if there are two processes). This results in some amount of over-commit.
2. Periodically, a balancing task runs and preempts/re-schedules processes which have been running for less than a very short threshold (`200ms` currently) and which are the largest contributors to over/under-commit. This fixes some over/under-commit, but not all of it, because if a process becomes over/under-committed after it has been running a while (because other processes started or finished), we will not preempt it.

Combined with #14186, this change results in an additional 2% speedup for `lint` and `fmt`. But it should also have a positive impact on PEX processes, which were the original motivation for #9964.

Fixes #9964. 

[ci skip-build-wheels]
  • Loading branch information
stuhood committed Jan 24, 2022
1 parent 0006cde commit 706e384
Show file tree
Hide file tree
Showing 19 changed files with 599 additions and 235 deletions.
2 changes: 2 additions & 0 deletions src/python/pants/backend/python/lint/black/rules.py
Expand Up @@ -58,6 +58,7 @@ def generate_argv(source_files: SourceFiles, black: Black, *, check_only: bool)
args.append("--check")
if black.config:
args.extend(["--config", black.config])
args.extend(["-W", "{pants_concurrency}"])
args.extend(black.args)
args.extend(source_files.files)
return tuple(args)
Expand Down Expand Up @@ -124,6 +125,7 @@ async def setup_black(
argv=generate_argv(source_files, black, check_only=setup_request.check_only),
input_digest=input_digest,
output_files=source_files_snapshot.files,
concurrency_available=len(setup_request.request.field_sets),
description=f"Run Black on {pluralize(len(setup_request.request.field_sets), 'file')}.",
level=LogLevel.DEBUG,
),
Expand Down
2 changes: 2 additions & 0 deletions src/python/pants/backend/python/lint/flake8/rules.py
Expand Up @@ -39,6 +39,7 @@ def generate_argv(source_files: SourceFiles, flake8: Flake8) -> Tuple[str, ...]:
args = []
if flake8.config:
args.append(f"--config={flake8.config}")
args.append("--jobs={pants_concurrency}")
args.extend(flake8.args)
args.extend(source_files.files)
return tuple(args)
Expand Down Expand Up @@ -90,6 +91,7 @@ async def flake8_lint_partition(
input_digest=input_digest,
output_directories=(REPORT_DIR,),
extra_env={"PEX_EXTRA_SYS_PATH": first_party_plugins.PREFIX},
concurrency_available=len(partition.field_sets),
description=f"Run Flake8 on {pluralize(len(partition.field_sets), 'file')}.",
level=LogLevel.DEBUG,
),
Expand Down
2 changes: 2 additions & 0 deletions src/python/pants/backend/python/lint/pylint/rules.py
Expand Up @@ -77,6 +77,7 @@ def generate_argv(source_files: SourceFiles, pylint: Pylint) -> Tuple[str, ...]:
args = []
if pylint.config is not None:
args.append(f"--rcfile={pylint.config}")
args.append("--jobs={pants_concurrency}")
args.extend(pylint.args)
args.extend(source_files.files)
return tuple(args)
Expand Down Expand Up @@ -180,6 +181,7 @@ async def pylint_lint_partition(
input_digest=input_digest,
output_directories=(REPORT_DIR,),
extra_env={"PEX_EXTRA_SYS_PATH": ":".join(pythonpath)},
concurrency_available=len(partition.field_sets),
description=f"Run Pylint on {pluralize(len(partition.field_sets), 'file')}.",
level=LogLevel.DEBUG,
),
Expand Down
2 changes: 2 additions & 0 deletions src/python/pants/backend/python/subsystems/setup.py
Expand Up @@ -203,6 +203,8 @@ def register_options(cls, register):
type=int,
default=CPU_COUNT // 2,
default_help_repr="#cores/2",
removal_version="2.11.0.dev0",
removal_hint="Now set automatically based on the amount of concurrency available.",
advanced=True,
help=(
"The maximum number of concurrent jobs to build wheels with.\n\nBecause Pants "
Expand Down
25 changes: 20 additions & 5 deletions src/python/pants/backend/python/util_rules/pex.py
Expand Up @@ -408,9 +408,6 @@ async def build_pex(

argv.append("--no-emit-warnings")

if python_setup.resolver_jobs:
argv.extend(["--jobs", str(python_setup.resolver_jobs)])

if python_setup.manylinux:
argv.extend(["--manylinux", python_setup.manylinux])
else:
Expand All @@ -435,6 +432,7 @@ async def build_pex(
repository_pex_digest = repository_pex.digest if repository_pex else EMPTY_DIGEST
constraints_file_digest = EMPTY_DIGEST
requirements_file_digest = EMPTY_DIGEST
requirement_count: int

# TODO(#12314): Capture the resolve name for multiple user lockfiles.
resolve_name = (
Expand All @@ -452,22 +450,26 @@ async def build_pex(
glob_match_error_behavior=GlobMatchErrorBehavior.error,
description_of_origin=request.requirements.file_path_description_of_origin,
)
requirements_file_digest = await Get(Digest, PathGlobs, globs)
requirements_file_digest_contents = await Get(
DigestContents, Digest, requirements_file_digest
)
requirement_count = len(requirements_file_digest_contents[0].content.decode().splitlines())
if python_setup.invalid_lockfile_behavior in {
InvalidLockfileBehavior.warn,
InvalidLockfileBehavior.error,
}:
requirements_file_digest_contents = await Get(DigestContents, PathGlobs, globs)
metadata = PythonLockfileMetadata.from_lockfile(
requirements_file_digest_contents[0].content,
request.requirements.file_path,
resolve_name,
)
_validate_metadata(metadata, request, request.requirements, python_setup)
requirements_file_digest = await Get(Digest, PathGlobs, globs)

elif isinstance(request.requirements, LockfileContent):
is_monolithic_resolve = True
file_content = request.requirements.file_content
requirement_count = len(file_content.content.decode().splitlines())
argv.extend(["--requirement", file_content.path])
argv.append("--no-transitive")
if python_setup.invalid_lockfile_behavior in {
Expand All @@ -482,6 +484,7 @@ async def build_pex(
else:
assert isinstance(request.requirements, PexRequirements)
is_monolithic_resolve = request.requirements.is_all_constraints_resolve
requirement_count = len(request.requirements.req_strings)

if request.requirements.constraints_strings:
constraints_file = "__constraints.txt"
Expand Down Expand Up @@ -532,6 +535,10 @@ async def build_pex(
description=_build_pex_description(request),
output_files=output_files,
output_directories=output_directories,
# TODO: This is not the best heuristic for available concurrency, since the
# requirements almost certainly have transitive deps which also need building, but it
# is better than using something hardcoded.
concurrency_available=requirement_count,
),
)

Expand Down Expand Up @@ -979,6 +986,7 @@ class PexProcess:
output_directories: tuple[str, ...] | None
timeout_seconds: int | None
execution_slot_variable: str | None
concurrency_available: int
cache_scope: ProcessCacheScope

def __init__(
Expand All @@ -995,6 +1003,7 @@ def __init__(
output_directories: Iterable[str] | None = None,
timeout_seconds: int | None = None,
execution_slot_variable: str | None = None,
concurrency_available: int = 0,
cache_scope: ProcessCacheScope = ProcessCacheScope.SUCCESSFUL,
) -> None:
self.pex = pex
Expand All @@ -1008,6 +1017,7 @@ def __init__(
self.output_directories = tuple(output_directories) if output_directories else None
self.timeout_seconds = timeout_seconds
self.execution_slot_variable = execution_slot_variable
self.concurrency_available = concurrency_available
self.cache_scope = cache_scope


Expand Down Expand Up @@ -1037,6 +1047,7 @@ async def setup_pex_process(request: PexProcess, pex_environment: PexEnvironment
append_only_caches=complete_pex_env.append_only_caches,
timeout_seconds=request.timeout_seconds,
execution_slot_variable=request.execution_slot_variable,
concurrency_available=request.concurrency_available,
cache_scope=request.cache_scope,
)

Expand All @@ -1055,6 +1066,7 @@ class VenvPexProcess:
output_directories: tuple[str, ...] | None
timeout_seconds: int | None
execution_slot_variable: str | None
concurrency_available: int
cache_scope: ProcessCacheScope

def __init__(
Expand All @@ -1071,6 +1083,7 @@ def __init__(
output_directories: Iterable[str] | None = None,
timeout_seconds: int | None = None,
execution_slot_variable: str | None = None,
concurrency_available: int = 0,
cache_scope: ProcessCacheScope = ProcessCacheScope.SUCCESSFUL,
) -> None:
self.venv_pex = venv_pex
Expand All @@ -1084,6 +1097,7 @@ def __init__(
self.output_directories = tuple(output_directories) if output_directories else None
self.timeout_seconds = timeout_seconds
self.execution_slot_variable = execution_slot_variable
self.concurrency_available = concurrency_available
self.cache_scope = cache_scope


Expand Down Expand Up @@ -1117,6 +1131,7 @@ async def setup_venv_pex_process(
).append_only_caches,
timeout_seconds=request.timeout_seconds,
execution_slot_variable=request.execution_slot_variable,
concurrency_available=request.concurrency_available,
cache_scope=request.cache_scope,
)

Expand Down
8 changes: 8 additions & 0 deletions src/python/pants/backend/python/util_rules/pex_cli.py
Expand Up @@ -71,6 +71,7 @@ class PexCliProcess:
output_directories: Optional[Tuple[str, ...]]
python: Optional[PythonExecutable]
level: LogLevel
concurrency_available: int
cache_scope: ProcessCacheScope

def __init__(
Expand All @@ -86,6 +87,7 @@ def __init__(
output_directories: Optional[Iterable[str]] = None,
python: Optional[PythonExecutable] = None,
level: LogLevel = LogLevel.INFO,
concurrency_available: int = 0,
cache_scope: ProcessCacheScope = ProcessCacheScope.SUCCESSFUL,
) -> None:
self.subcommand = tuple(subcommand)
Expand All @@ -98,6 +100,7 @@ def __init__(
self.output_directories = tuple(output_directories) if output_directories else None
self.python = python
self.level = level
self.concurrency_available = concurrency_available
self.cache_scope = cache_scope
self.__post_init__()

Expand Down Expand Up @@ -164,6 +167,10 @@ async def setup_pex_cli_process(
"--tmpdir",
tmpdir,
]

if request.concurrency_available > 0:
global_args.extend(["--jobs", "{pants_concurrency}"])

if pex_runtime_env.verbosity > 0:
global_args.append(f"-{'v' * pex_runtime_env.verbosity}")

Expand Down Expand Up @@ -200,6 +207,7 @@ async def setup_pex_cli_process(
output_directories=request.output_directories,
append_only_caches=complete_pex_env.append_only_caches,
level=request.level,
concurrency_available=request.concurrency_available,
cache_scope=request.cache_scope,
)

Expand Down
3 changes: 3 additions & 0 deletions src/python/pants/engine/process.py
Expand Up @@ -74,6 +74,7 @@ class Process:
timeout_seconds: int | float
jdk_home: str | None
execution_slot_variable: str | None
concurrency_available: int
cache_scope: ProcessCacheScope
platform: str | None

Expand All @@ -94,6 +95,7 @@ def __init__(
timeout_seconds: int | float | None = None,
jdk_home: str | None = None,
execution_slot_variable: str | None = None,
concurrency_available: int = 0,
cache_scope: ProcessCacheScope = ProcessCacheScope.SUCCESSFUL,
platform: Platform | None = None,
) -> None:
Expand Down Expand Up @@ -140,6 +142,7 @@ def __init__(
self.timeout_seconds = timeout_seconds if timeout_seconds and timeout_seconds > 0 else -1
self.jdk_home = jdk_home
self.execution_slot_variable = execution_slot_variable
self.concurrency_available = concurrency_available
self.cache_scope = cache_scope
self.platform = platform.value if platform is not None else None

Expand Down
11 changes: 0 additions & 11 deletions src/rust/engine/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 0 additions & 3 deletions src/rust/engine/Cargo.toml
Expand Up @@ -21,7 +21,6 @@ resolver = "2"
members = [
".",
"async_latch",
"async_semaphore",
"async_value",
"cache",
"client",
Expand Down Expand Up @@ -62,7 +61,6 @@ members = [
default-members = [
".",
"async_latch",
"async_semaphore",
"async_value",
"cache",
"client",
Expand Down Expand Up @@ -105,7 +103,6 @@ default = []

[dependencies]
async_latch = { path = "async_latch" }
async_semaphore = { path = "async_semaphore" }
# Pin async-trait due to https://github.com/dtolnay/async-trait/issues/144.
async-trait = "=0.1.42"
protos = { path = "protos" }
Expand Down
14 changes: 0 additions & 14 deletions src/rust/engine/async_semaphore/Cargo.toml

This file was deleted.

0 comments on commit 706e384

Please sign in to comment.