Batching of lint and fmt invokes #14186

Merged · 5 commits · Jan 19, 2022
Changes from 4 commits
3 changes: 2 additions & 1 deletion src/python/pants/backend/terraform/lint/tffmt/tffmt.py
@@ -4,6 +4,7 @@
import textwrap

from pants.backend.terraform.style import StyleSetup, StyleSetupRequest
from pants.backend.terraform.target_types import TerraformFieldSet
from pants.backend.terraform.tool import TerraformProcess
from pants.backend.terraform.tool import rules as tool_rules
from pants.core.goals.fmt import FmtRequest, FmtResult
@@ -39,7 +40,7 @@ def register_options(cls, register):


class TffmtRequest(FmtRequest):
-    pass
+    field_set_type = TerraformFieldSet

Member:

(Not specific to this PR)

I find it somewhat noteworthy that as Pants evolves its way of doing things, little idiosyncrasies come out of the woodwork (like this one). In #14182 I noticed GofmtRequest didn't inherit from LintRequest.

It seems to me potentially hazardous that code can "seemingly work" in one dimension or another, and only a later change brings to light that it was incorrectly configured. I wonder, as third-party plugins become more standard, how we can be proactive about avoiding these idiosyncrasies.

@stuhood (author) · Jan 19, 2022:

This particular change is because mypy can't check some usages of ClassVar, unfortunately. `field_set_type` is an abstract ClassVar, and mypy failed to check that it was implemented for this subclass of FmtRequest.

> In #14182 I noticed GofmtRequest didn't inherit from LintRequest.

That is because @union doesn't actually require that a type extend any particular interface, although @kaos has prototyped changing that: #12577. There is rough consensus that we should change unions, but not yet on exactly how.
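To make the failure mode concrete, here is a minimal sketch of the pattern, using stand-in classes rather than Pants' real definitions (`FmtRequestBase` here is hypothetical). mypy accepts the subclass even when the abstract ClassVar is never assigned, so the omission surfaces only at runtime:

```python
from typing import ClassVar, Type


class FieldSet:
    """Stand-in for pants.engine.target.FieldSet (illustrative only)."""


class TerraformFieldSet(FieldSet):
    """Stand-in for the real TerraformFieldSet."""


class FmtRequestBase:
    # Annotated but never assigned: conceptually abstract. mypy does not
    # require subclasses to assign it, so forgetting the override below
    # type-checks cleanly and fails only when the attribute is first read.
    field_set_type: ClassVar[Type[FieldSet]]


class TffmtRequest(FmtRequestBase):
    field_set_type = TerraformFieldSet  # the fix made in this diff
```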



@rule(desc="Format with `terraform fmt`")
35 changes: 34 additions & 1 deletion src/python/pants/core/goals/fmt.py
@@ -18,6 +18,7 @@
from pants.engine.rules import Get, MultiGet, collect_rules, goal_rule, rule
from pants.engine.target import SourcesField, Targets
from pants.engine.unions import UnionMembership, union
from pants.util.collections import partition_sequentially
from pants.util.logging import LogLevel
from pants.util.strutil import strip_v2_chroot_path

@@ -135,6 +136,11 @@ def register_options(cls, register) -> None:
advanced=True,
type=bool,
default=False,
removal_version="2.11.0.dev0",
removal_hint=(
"Formatters are now broken into multiple batches by default using the "
"`--batch-size` argument."
),
help=(
"Rather than formatting all files in a single batch, format each file as a "
"separate process.\n\nWhy do this? You'll get many more cache hits. Why not do "
@@ -145,11 +151,35 @@
"faster than `--no-per-file-caching` for your use case."
),
)
register(
"--batch-size",
advanced=True,
type=int,
default=128,

Contributor:

We technically shouldn't change this default after we release it. Thoughts on whether it's worth trying to benchmark the optimal number? I imagine that's hard to arrive at, not least because it depends on your machine's specs.

Member:

Not just machine specs, but each tool will likely exhibit different characteristics affected by batch size 🤔

Could be fun to benchmark though 😉

@stuhood (author) · Jan 19, 2022:

I did a bit of benchmarking to settle on 128, on an integration branch with #9964 included: 128 was best by ~1%. Additional benchmarking and adjustment after both this and #9964 have landed will be good, since they interplay strongly with one another: I'll include more numbers over there.

> Not just machine specs, but each tool will likely exhibit different characteristics affected by batch size 🤔

Yea, there are potentially a lot of dimensions here. But from a complexity perspective, I don't think we'll want to expose per-tool batch-size knobs without some quantitative justification (post-landing).

Member:

OK, this one had me stumped until I tried it. My assumption was that the batches would fill all the threads, so in-tool parallelization would only over-allocate your resources. What I see, though, is that depending on the number of available threads, the number of files in the target list, and the batch size, there are many points in time where you're running fewer batches than available threads.

Of course (and I hope to show this via data), using the poor man's in-tool parallelization is likely not ideal: it isn't dynamic, and it would over-allocate resources in the "bursts" where more rules are running than there are threads.

Member:

Does this need to be a power of two?

@stuhood (author):

No.

help=(
"The target minimum number of files that will be included in each formatter batch.\n"
"\n"
"Formatter processes are batched for a few reasons:\n"
"\n"
"1. to avoid OS argument length limits (in processes which don't support argument "
"files)\n"
"2. to support more stable cache keys than would be possible if all files were "
"operated on in a single batch.\n"
"3. to allow for parallelism in formatter processes which don't have internal "
"parallelism, or -- if they do support internal parallelism -- to improve scheduling "
"behavior when multiple processes are competing for cores and so internal "
"parallelism cannot be used perfectly.\n"
),
)

@property
def per_file_caching(self) -> bool:
return cast(bool, self.options.per_file_caching)

@property
def batch_size(self) -> int:
return cast(int, self.options.batch_size)


class Fmt(Goal):
subsystem_cls = FmtSubsystem
@@ -187,9 +217,12 @@ async def fmt(
per_language_results = await MultiGet(
Get(
_LanguageFmtResults,
-            _LanguageFmtRequest(fmt_requests, Targets(targets)),
+            _LanguageFmtRequest(fmt_requests, Targets(target_batch)),
        )
        for fmt_requests, targets in targets_by_fmt_request_order.items()
+        for target_batch in partition_sequentially(
+            targets, key=lambda t: t.address.spec, size_min=fmt_subsystem.batch_size
+        )
    )

individual_results = list(
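For readers unfamiliar with `partition_sequentially` (imported from `pants.util.collections` above): the idea is to sort the items by a stable key, then slice the sorted sequence into batches of at least `size_min` items. Below is a simplified sketch under that assumption; the real implementation appears to additionally derive batch boundaries from stable properties of the keys themselves (see the `hash_prefix_zero_bits` stub added at the bottom of this diff), so that adding or removing one file perturbs only nearby batches:

```python
from typing import Callable, Iterable, Iterator, List, TypeVar

_T = TypeVar("_T")


def partition_sequentially_sketch(
    items: Iterable[_T], *, key: Callable[[_T], str], size_min: int
) -> Iterator[List[_T]]:
    """Simplified stand-in for pants.util.collections.partition_sequentially."""
    batch: List[_T] = []
    for item in sorted(items, key=key):
        batch.append(item)
        if len(batch) >= size_min:
            yield batch
            batch = []
    if batch:  # Emit the (possibly undersized) trailing batch.
        yield batch


# E.g., 5 files with size_min=2 yield batches of 2, 2, and 1.
assert [
    len(b)
    for b in partition_sequentially_sketch(
        ["e.py", "a.py", "c.py", "b.py", "d.py"], key=str, size_min=2
    )
] == [2, 2, 1]
```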
3 changes: 1 addition & 2 deletions src/python/pants/core/goals/fmt_test.py
@@ -17,7 +17,7 @@
from pants.engine.rules import Get, collect_rules, rule
from pants.engine.target import FieldSet, MultipleSourcesField, Target
from pants.engine.unions import UnionRule
-from pants.testutil.rule_runner import RuleRunner, logging
+from pants.testutil.rule_runner import RuleRunner
from pants.util.logging import LogLevel

FORTRAN_FILE = FileContent("formatted.f98", b"READ INPUT TAPE 5\n")
@@ -135,7 +135,6 @@ def run_fmt(rule_runner: RuleRunner, *, target_specs: List[str], per_file_cachin
return result.stderr


-@logging
@pytest.mark.parametrize("per_file_caching", [True, False])
def test_summary(per_file_caching: bool) -> None:
"""Tests that the final summary is correct.
90 changes: 66 additions & 24 deletions src/python/pants/core/goals/lint.py
@@ -16,8 +16,9 @@
from pants.engine.goal import Goal, GoalSubsystem
from pants.engine.process import FallibleProcessResult
from pants.engine.rules import Get, MultiGet, collect_rules, goal_rule
-from pants.engine.target import Targets
+from pants.engine.target import FieldSet, Targets
from pants.engine.unions import UnionMembership, union
from pants.util.collections import partition_sequentially
from pants.util.logging import LogLevel
from pants.util.memo import memoized_property
from pants.util.meta import frozen_after_init
@@ -153,6 +154,11 @@ def register_options(cls, register) -> None:
advanced=True,
type=bool,
default=False,
removal_version="2.11.0.dev0",
removal_hint=(
"Linters are now broken into multiple batches by default using the "
"`--batch-size` argument."
),
help=(
"Rather than linting all files in a single batch, lint each file as a "
"separate process.\n\nWhy do this? You'll get many more cache hits. Why not do "
@@ -163,11 +169,35 @@
"faster than `--no-per-file-caching` for your use case."
),
)
register(
"--batch-size",
advanced=True,
type=int,
default=128,
help=(
"The target minimum number of files that will be included in each linter batch.\n"
"\n"
"Linter processes are batched for a few reasons:\n"
"\n"
"1. to avoid OS argument length limits (in processes which don't support argument "
"files)\n"
"2. to support more stable cache keys than would be possible if all files were "
"operated on in a single batch.\n"
"3. to allow for parallelism in linter processes which don't have internal "
"parallelism, or -- if they do support internal parallelism -- to improve scheduling "
"behavior when multiple processes are competing for cores and so internal "
"parallelism cannot be used perfectly.\n"
),
)

@property
def per_file_caching(self) -> bool:
return cast(bool, self.options.per_file_caching)

@property
def batch_size(self) -> int:
return cast(int, self.options.batch_size)


class Lint(Goal):
subsystem_cls = LintSubsystem
@@ -182,7 +212,7 @@ async def lint(
union_membership: UnionMembership,
dist_dir: DistDir,
) -> Lint:
-    request_types = cast("Iterable[type[StyleRequest]]", union_membership[LintRequest])
+    request_types = cast("Iterable[type[LintRequest]]", union_membership[LintRequest])
requests = tuple(
request_type(
request_type.field_set_type.create(target)
@@ -193,36 +223,48 @@
)

if lint_subsystem.per_file_caching:
-        all_per_file_results = await MultiGet(
+        all_batch_results = await MultiGet(
            Get(LintResults, LintRequest, request.__class__([field_set]))
            for request in requests
-            for field_set in request.field_sets
+            if request.field_sets
+            for field_set in request.field_sets
        )
-
-        def key_fn(results: LintResults):
-            return results.linter_name
-
-        # NB: We must pre-sort the data for itertools.groupby() to work properly.
-        sorted_all_per_files_results = sorted(all_per_file_results, key=key_fn)
-        # We consolidate all results for each linter into a single `LintResults`.
-        all_results = tuple(
-            LintResults(
-                itertools.chain.from_iterable(
-                    per_file_results.results for per_file_results in all_linter_results
-                ),
-                linter_name=linter_name,
-            )
-            for linter_name, all_linter_results in itertools.groupby(
-                sorted_all_per_files_results, key=key_fn
-            )
-        )
-    else:
-        all_results = await MultiGet(
-            Get(LintResults, LintRequest, request) for request in requests if request.field_sets
-        )
-
-    all_results = tuple(sorted(all_results, key=lambda results: results.linter_name))
+    else:
+
+        def address_str(fs: FieldSet) -> str:
+            return fs.address.spec
+
+        all_batch_results = await MultiGet(
+            Get(LintResults, LintRequest, request.__class__(field_sets))
+            for request in requests
+            if request.field_sets
+            for field_sets in partition_sequentially(
+                request.field_sets, key=address_str, size_min=lint_subsystem.batch_size
+            )
+        )
+
+    def key_fn(results: LintResults):
+        return results.linter_name
+
+    # NB: We must pre-sort the data for itertools.groupby() to work properly.
+    sorted_all_batch_results = sorted(all_batch_results, key=key_fn)
+    # We consolidate all results for each linter into a single `LintResults`.
+    all_results = tuple(
+        sorted(
+            (
+                LintResults(
+                    itertools.chain.from_iterable(
+                        per_file_results.results for per_file_results in all_linter_results
+                    ),
+                    linter_name=linter_name,
+                )
+                for linter_name, all_linter_results in itertools.groupby(
+                    sorted_all_batch_results, key=key_fn
+                )
+            ),
+            key=lambda results: results.linter_name,
+        )
+    )

def get_tool_name(res: LintResults) -> str:
return res.linter_name
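Since each linter may now produce several `LintResults` (one per batch), the sort-then-groupby step above merges them back into one entry per linter before the summary is rendered. A self-contained illustration of that idiom, using a hypothetical stand-in for `LintResults`:

```python
import itertools
from dataclasses import dataclass
from typing import Tuple


@dataclass(frozen=True)
class FakeLintResults:  # hypothetical stand-in, not the Pants class
    results: Tuple[str, ...]
    linter_name: str


batches = [
    FakeLintResults(("a.py",), "flake8"),
    FakeLintResults(("b.py",), "black"),
    FakeLintResults(("c.py",), "flake8"),
]

# groupby() only merges *adjacent* runs, hence the pre-sort by the same key.
by_name = sorted(batches, key=lambda r: r.linter_name)
consolidated = [
    FakeLintResults(
        tuple(itertools.chain.from_iterable(r.results for r in group)),
        linter_name=name,
    )
    for name, group in itertools.groupby(by_name, key=lambda r: r.linter_name)
]
assert [(r.linter_name, r.results) for r in consolidated] == [
    ("black", ("b.py",)),
    ("flake8", ("a.py", "c.py")),
]
```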
102 changes: 62 additions & 40 deletions src/python/pants/core/goals/lint_test.py
@@ -112,7 +112,8 @@ def run_lint_rule(
*,
lint_request_types: List[Type[LintRequest]],
targets: List[Target],
-    per_file_caching: bool,
+    per_file_caching: bool = False,
+    batch_size: int = 128,
) -> Tuple[int, str]:
with mock_console(rule_runner.options_bootstrapper) as (console, stdio_reader):
union_membership = UnionMembership({LintRequest: lint_request_types})
@@ -123,7 +124,9 @@
Workspace(rule_runner.scheduler, _enforce_effects=False),
Targets(targets),
create_goal_subsystem(
-                LintSubsystem, per_file_caching=per_file_caching, per_target_caching=False
+                LintSubsystem,
+                per_file_caching=per_file_caching,
+                batch_size=batch_size,
),
union_membership,
DistDir(relpath=Path("dist")),
@@ -141,22 +144,20 @@
return result.exit_code, stdio_reader.get_stderr()


-def test_invalid_target_noops(rule_runner: RuleRunner) -> None:
-    def assert_noops(per_file_caching: bool) -> None:
-        exit_code, stderr = run_lint_rule(
-            rule_runner,
-            lint_request_types=[InvalidRequest],
-            targets=[make_target()],
-            per_file_caching=per_file_caching,
-        )
-        assert exit_code == 0
-        assert stderr == ""
-
-    assert_noops(per_file_caching=False)
-    assert_noops(per_file_caching=True)
+@pytest.mark.parametrize("per_file_caching", [True, False])
+def test_invalid_target_noops(rule_runner: RuleRunner, per_file_caching: bool) -> None:
+    exit_code, stderr = run_lint_rule(
+        rule_runner,
+        lint_request_types=[InvalidRequest],
+        targets=[make_target()],
+        per_file_caching=per_file_caching,
+    )
+    assert exit_code == 0
+    assert stderr == ""


-def test_summary(rule_runner: RuleRunner) -> None:
+@pytest.mark.parametrize("per_file_caching", [True, False])
+def test_summary(rule_runner: RuleRunner, per_file_caching: bool) -> None:
"""Test that we render the summary correctly.

This tests that we:
@@ -166,31 +167,52 @@ def test_summary(rule_runner: RuleRunner) -> None:
good_address = Address("", target_name="good")
bad_address = Address("", target_name="bad")

-    def assert_expected(*, per_file_caching: bool) -> None:
-        exit_code, stderr = run_lint_rule(
-            rule_runner,
-            lint_request_types=[
-                ConditionallySucceedsRequest,
-                FailingRequest,
-                SkippedRequest,
-                SuccessfulRequest,
-            ],
-            targets=[make_target(good_address), make_target(bad_address)],
-            per_file_caching=per_file_caching,
-        )
-        assert exit_code == FailingRequest.exit_code([bad_address])
-        assert stderr == dedent(
-            """\
-
-            𐄂 ConditionallySucceedsLinter failed.
-            𐄂 FailingLinter failed.
-            - SkippedLinter skipped.
-            ✓ SuccessfulLinter succeeded.
-            """
-        )
-
-    assert_expected(per_file_caching=False)
-    assert_expected(per_file_caching=True)
+    exit_code, stderr = run_lint_rule(
+        rule_runner,
+        lint_request_types=[
+            ConditionallySucceedsRequest,
+            FailingRequest,
+            SkippedRequest,
+            SuccessfulRequest,
+        ],
+        targets=[make_target(good_address), make_target(bad_address)],
+        per_file_caching=per_file_caching,
+    )
+    assert exit_code == FailingRequest.exit_code([bad_address])
+    assert stderr == dedent(
+        """\
+
+        𐄂 ConditionallySucceedsLinter failed.
+        𐄂 FailingLinter failed.
+        - SkippedLinter skipped.
+        ✓ SuccessfulLinter succeeded.
+        """
+    )


@pytest.mark.parametrize("batch_size", [1, 32, 128, 1024])
def test_batched(rule_runner: RuleRunner, batch_size: int) -> None:
exit_code, stderr = run_lint_rule(
rule_runner,
lint_request_types=[
ConditionallySucceedsRequest,
FailingRequest,
SkippedRequest,
SuccessfulRequest,
],
targets=[make_target(Address("", target_name=f"good{i}")) for i in range(0, 512)],
batch_size=batch_size,
)
assert exit_code == FailingRequest.exit_code([])
assert stderr == dedent(
"""\

✓ ConditionallySucceedsLinter succeeded.
𐄂 FailingLinter failed.
- SkippedLinter skipped.
✓ SuccessfulLinter succeeded.
"""
)


def test_streaming_output_skip() -> None:
1 change: 1 addition & 0 deletions src/python/pants/engine/internals/native_engine.pyi
@@ -291,6 +291,7 @@ def lease_files_in_graph(scheduler: PyScheduler, session: PySession) -> None: ..
def strongly_connected_components(
adjacency_lists: Sequence[Tuple[Any, Sequence[Any]]]
) -> Sequence[Sequence[Any]]: ...
def hash_prefix_zero_bits(item: str) -> int: ...

class PyExecutionRequest:
def __init__(
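The new `hash_prefix_zero_bits` intrinsic hints at how batch boundaries can be kept stable: a key whose hash starts with enough zero bits is a rare, evenly distributed, neighbor-independent cut point. A pure-Python approximation of what such a function might compute (an assumption; the actual intrinsic lives in the Rust engine):

```python
import hashlib


def hash_prefix_zero_bits_sketch(item: str) -> int:
    """Count the leading zero bits of a stable hash of `item`."""
    digest = hashlib.sha256(item.encode("utf-8")).digest()
    bits = 0
    for byte in digest:
        if byte == 0:
            bits += 8
            continue
        bits += 8 - byte.bit_length()  # leading zeros within this byte
        break
    return bits


# Keys clearing a zero-bit threshold can serve as batch boundaries: on
# average 1 in 2**n keys has >= n leading zero bits, and whether a key
# qualifies doesn't depend on which other keys are present.
boundaries = [
    k for k in ("src/a.py", "src/b.py", "src/c.py")
    if hash_prefix_zero_bits_sketch(k) >= 4
]
```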