-
-
Notifications
You must be signed in to change notification settings - Fork 608
/
style_request.py
124 lines (98 loc) · 4.24 KB
/
style_request.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
# Copyright 2020 Pants project contributors (see CONTRIBUTORS.md).
# Licensed under the Apache License, Version 2.0 (see LICENSE).
from __future__ import annotations
import logging
import os.path
from abc import ABCMeta
from dataclasses import dataclass
from typing import Any, Callable, ClassVar, Generic, Iterable, Sequence, TypeVar
from typing_extensions import Protocol
from pants.core.util_rules.distdir import DistDir
from pants.engine.collection import Collection
from pants.engine.engine_aware import EngineAwareParameter
from pants.engine.fs import EMPTY_DIGEST, Digest, Snapshot, Workspace
from pants.engine.target import FieldSet
from pants.util.meta import frozen_after_init
from pants.util.strutil import path_safe
logger = logging.getLogger(__name__)
_FS = TypeVar("_FS", bound=FieldSet)
def style_batch_size_help(uppercase: str, lowercase: str) -> str:
    """Render the shared batch-size option help text for a fmt/lint tool.

    `uppercase` and `lowercase` are the goal/process name in the two capitalizations
    used in the prose (e.g. "Lint" / "lint").
    """
    paragraphs = [
        f"The target number of files to be included in each {lowercase} batch.",
        "",
        f"{uppercase} processes are batched for a few reasons:",
        "",
        "1. to avoid OS argument length limits (in processes which don't support argument files)",
        "2. to support more stable cache keys than would be possible if all files were operated on in a single batch.",
        (
            f"3. to allow for parallelism in {lowercase} processes which don't have internal "
            "parallelism, or -- if they do support internal parallelism -- to improve scheduling "
            "behavior when multiple processes are competing for cores and so internal "
            "parallelism cannot be used perfectly."
        ),
        "",
        (
            "In order to improve cache hit rates (see 2.), batches are created at stable "
            'boundaries, and so this value is only a "target" batch size (rather than an exact '
            "value)."
        ),
    ]
    return "\n".join(paragraphs)
@frozen_after_init
@dataclass(unsafe_hash=True)
class StyleRequest(Generic[_FS], EngineAwareParameter, metaclass=ABCMeta):
    """A request to format or lint a collection of `FieldSet`s.

    Should be subclassed for a particular style engine in order to support autoformatting or
    linting. Subclasses set `field_set_type` to the `FieldSet` subclass the tool operates on.
    """

    # ClassVar set by each concrete subclass: the FieldSet type this tool consumes.
    field_set_type: ClassVar[type[_FS]]
    # The concrete field sets to operate on, coerced to the engine's `Collection` type.
    field_sets: Collection[_FS]
    # TODO: Move this onto `FmtRequest`.
    # Output snapshot of the previously-run formatter, if any, so formatters can chain.
    prior_formatter_result: Snapshot | None = None

    def __init__(
        self,
        field_sets: Iterable[_FS],
        *,
        prior_formatter_result: Snapshot | None = None,
    ) -> None:
        # Accept any iterable, but store the `Collection` type declared on the field above
        # (the class is hashed via `dataclass(unsafe_hash=True)` over these fields).
        self.field_sets = Collection[_FS](field_sets)
        self.prior_formatter_result = prior_formatter_result

    def metadata(self) -> dict[str, Any]:
        # NOTE(review): presumably the EngineAwareParameter hook — exposes the addresses
        # under request for engine logging/UI; confirm against EngineAwareParameter docs.
        return {"addresses": [fs.address.spec for fs in self.field_sets]}
class _ResultWithReport(Protocol):
    """Structural type: a single tool result that may carry a report digest."""

    @property
    def report(self) -> Digest: ...

    @property
    def partition_description(self) -> str | None: ...
class _ResultsWithReports(Protocol):
    """Structural type: an aggregate result holding a sequence of per-partition results."""

    @property
    def results(self) -> Sequence[_ResultWithReport]: ...
# Concrete results type conforming to `_ResultsWithReports`; lets `write_reports` take a
# precisely-typed `get_tool_name` callback.
_R = TypeVar("_R", bound=_ResultsWithReports)
def write_reports(
    all_results: tuple[_ResultsWithReports, ...],
    workspace: Workspace,
    dist_dir: DistDir,
    *,
    goal_name: str,
    get_tool_name: Callable[[_R], str],
) -> None:
    """Materialize every non-empty report digest under `dist/<goal_name>/...`.

    A tool with exactly one result writes straight into `<goal_name>/<tool>`; a tool
    with several results gets one subdirectory per partition description.
    """
    seen_subdirs: set[str] = set()

    def _materialize(digest: Digest, subdir: str) -> None:
        # Two distinct partition descriptions are unlikely to collide after
        # path_safe(), but append underscores until the directory is unique anyway.
        while subdir in seen_subdirs:
            subdir += "_"
        seen_subdirs.add(subdir)
        output_dir = str(dist_dir.relpath / goal_name / subdir)
        workspace.write_digest(digest, path_prefix=output_dir)
        logger.info(f"Wrote {goal_name} report files to {output_dir}.")

    for tool_results in all_results:
        tool_name = get_tool_name(tool_results).lower()  # type: ignore[arg-type]
        reported = [r for r in tool_results.results if r.report != EMPTY_DIGEST]
        if len(tool_results.results) == 1 and reported:
            _materialize(reported[0].report, tool_name)
        else:
            for result in reported:
                _materialize(
                    result.report,
                    os.path.join(tool_name, path_safe(result.partition_description or "all")),
                )