-
-
Notifications
You must be signed in to change notification settings - Fork 606
/
source_file_validator.py
337 lines (272 loc) · 11.7 KB
/
source_file_validator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
# Copyright 2019 Pants project contributors (see CONTRIBUTORS.md).
# Licensed under the Apache License, Version 2.0 (see LICENSE).
import re
import textwrap
from dataclasses import dataclass
from enum import Enum
from typing import Any, Dict, Set, Tuple, cast
from pants.base.exiter import PANTS_FAILED_EXIT_CODE, PANTS_SUCCEEDED_EXIT_CODE
from pants.engine.collection import Collection
from pants.engine.console import Console
from pants.engine.fs import Digest, DigestContents, SourcesSnapshot
from pants.engine.goal import Goal, GoalSubsystem
from pants.engine.rules import goal_rule, register_rules
from pants.engine.selectors import Get
from pants.subsystem.subsystem import Subsystem
from pants.util.frozendict import FrozenDict
from pants.util.memo import memoized_method
class DetailLevel(Enum):
    """How much detail about validation to emit to the console.

    none: Emit nothing.
    summary: Emit a summary only.
    nonmatching: Emit details for files that failed to match at least one pattern.
    names: Emit just the paths of files that failed to match at least one pattern.
    all: Emit details for all files.
    """

    # Each member's value is its own name, so the option is spelled on the command line
    # exactly as the member is named here.
    none = "none"
    summary = "summary"
    nonmatching = "nonmatching"
    names = "names"
    all = "all"
class ValidateSubsystem(GoalSubsystem):
    """Validate sources against regexes."""

    name = "validate"

    @classmethod
    def register_options(cls, register):
        """Register `--detail-level` after delegating to the parent class's registration."""
        super().register_options(register)
        detail_level_kwargs = dict(
            type=DetailLevel,
            default=DetailLevel.nonmatching,
            help="How much detail to emit to the console.",
        )
        register("--detail-level", **detail_level_kwargs)

    @property
    def detail_level(self) -> DetailLevel:
        """The value of the `--detail-level` option, typed as a `DetailLevel`."""
        configured = self.options.detail_level
        return cast(DetailLevel, configured)
class Validate(Goal):
    # Goal type returned by the `validate` rule in this file, which constructs it with the
    # exit code of the validation run.

    # Associates this goal with ValidateSubsystem (the class that registers `--detail-level`).
    subsystem_cls = ValidateSubsystem
@dataclass(frozen=True)
class PathPattern:
    # A named regex that is applied to file paths (wrapped by PathMatcher).

    # Identifier by which `required_matches` refers to this pattern.
    name: str
    # Regex source; it is searched for anywhere in the path, not anchored.
    pattern: str
    # When True, the pattern counts as matching exactly when the regex does NOT match.
    inverted: bool = False
    # Encoding used to decode the content of files whose path matches this pattern.
    content_encoding: str = "utf8"
@dataclass(frozen=True)
class ContentPattern:
    # A named regex that is applied to decoded file content (wrapped by ContentMatcher).

    # Identifier by which `required_matches` refers to this pattern.
    name: str
    # Regex source; it is searched for anywhere in the content, not anchored.
    pattern: str
    # When True, the pattern counts as matching exactly when the regex does NOT match.
    inverted: bool = False
@dataclass(frozen=True)
class ValidationConfig:
    """Immutable, typed form of the validation config dict."""

    path_patterns: Tuple[PathPattern, ...]
    content_patterns: Tuple[ContentPattern, ...]
    # path pattern name -> content pattern names.
    # The value is a variable-length tuple, hence `Tuple[str, ...]`; a bare `Tuple[str]`
    # would denote a tuple of exactly one element.
    required_matches: FrozenDict[str, Tuple[str, ...]]

    @classmethod
    def from_dict(cls, d: Dict[str, Any]) -> "ValidationConfig":
        """Build a config from its plain-dict form.

        :param d: dict with the keys `path_patterns` and `content_patterns` (each an iterable
          of keyword-argument dicts for `PathPattern` / `ContentPattern`) and
          `required_matches` (a dict from path pattern name to an iterable of content
          pattern names).
        :returns: the equivalent `ValidationConfig`.
        :raises KeyError: if one of the three top-level keys is missing.
        :raises TypeError: if a pattern dict has missing or unexpected fields.
        """
        return cls(
            path_patterns=tuple(PathPattern(**kwargs) for kwargs in d["path_patterns"]),
            content_patterns=tuple(ContentPattern(**kwargs) for kwargs in d["content_patterns"]),
            # Every value is converted with tuple(), whatever iterable was supplied.
            required_matches=FrozenDict({k: tuple(v) for k, v in d["required_matches"].items()}),
        )
class SourceFileValidation(Subsystem):
    """Configuration for source file validation."""

    options_scope = "sourcefile-validation"

    @classmethod
    def register_options(cls, register):
        """Register the `--config` option, whose help text documents the config schema."""
        schema_help = textwrap.dedent(
            """
            Config schema is as follows:

              {
                'path_patterns': [
                  {
                    'name': 'path_pattern1',
                    'pattern': <path regex pattern>,
                    'inverted': True|False (defaults to False),
                    'content_encoding': <encoding> (defaults to utf8)
                  },
                  ...
                ],
                'content_patterns': [
                  {
                    'name': 'content_pattern1',
                    'pattern': <content regex pattern>,
                    'inverted': True|False (defaults to False)
                  },
                  ...
                ],
                'required_matches': {
                  'path_pattern1': [content_pattern1, content_pattern2],
                  'path_pattern2': [content_pattern1, content_pattern3],
                  ...
                }
              }

            Meaning: if a file matches some path pattern, its content must match all
            the corresponding content patterns.
            """
        )
        super().register_options(register)
        register(
            "--config", type=dict, fromfile=True, help=schema_help,
        )

    @memoized_method
    def get_multi_matcher(self):
        """Return a `MultiMatcher` built from the `--config` option value.

        Decorated with `memoized_method`, so the config is presumably parsed and validated
        only once per subsystem instance.
        """
        return MultiMatcher(ValidationConfig.from_dict(self.options.config))
@dataclass(frozen=True)
class RegexMatchResult:
    """The result of running regex matches on a source file."""

    # Path of the file that was checked.
    path: str
    # Names of the applicable content patterns that the file's content matched.
    matching: Tuple[str, ...]
    # Names of the applicable content patterns that the file's content failed to match.
    nonmatching: Tuple[str, ...]
class RegexMatchResults(Collection[RegexMatchResult]):
    # Collection of per-file RegexMatchResult objects; the `validate` rule builds one from
    # the checked files and iterates over it to report results.
    pass
class Matcher:
    """A single regex, optionally inverted, that is tested against strings.

    The regex may match anywhere in the string, i.e. this is `re.search`, not `re.match`.
    Use the ^ anchor to pin a match to the start of the string, and combine it with the
    MULTILINE directive (?m) to pin it to the start of any line.
    """

    def __init__(self, pattern, inverted=False):
        """Compile `pattern`; if `inverted` is true, `matches` reports the opposite result."""
        self.inverted = inverted
        self.compiled_regex = re.compile(pattern)

    def matches(self, s):
        """Whether the (possibly inverted) pattern matches anywhere in the string s."""
        found = self.compiled_regex.search(s) is not None
        if self.inverted:
            return not found
        return found
class PathMatcher(Matcher):
    """Matcher for file paths, built from a `PathPattern`."""

    def __init__(self, path_pattern: PathPattern):
        super().__init__(pattern=path_pattern.pattern, inverted=path_pattern.inverted)
        # Encoding in which the content of files matching this path pattern is expected to be.
        self.content_encoding = path_pattern.content_encoding
class ContentMatcher(Matcher):
    """Matcher for file content, built from a `ContentPattern`."""

    def __init__(self, content_pattern: ContentPattern):
        regex_source = content_pattern.pattern
        is_inverted = content_pattern.inverted
        super().__init__(regex_source, is_inverted)
class MultiMatcher:
    """Checks file content against the content patterns required for the file's path."""

    def __init__(self, config: ValidationConfig):
        """Validate `config` and build the path and content matchers it describes.

        :param ValidationConfig config: Regex matching config.
        :raises ValueError: if a `required_matches` value is not a tuple or list, or if
          `required_matches` names a path or content pattern that `config` does not define.
        """
        # Validate the pattern names mentioned in required_matches.
        path_patterns_used: Set[str] = set()
        content_patterns_used: Set[str] = set()

        for k, v in config.required_matches.items():
            path_patterns_used.add(k)
            if not isinstance(v, (tuple, list)):
                raise ValueError(
                    "Value for path pattern {} in required_matches must be tuple of "
                    "content pattern names, but was {}".format(k, v)
                )
            content_patterns_used.update(v)

        unknown_path_patterns = path_patterns_used.difference(
            pp.name for pp in config.path_patterns
        )
        if unknown_path_patterns:
            raise ValueError(
                "required_matches uses unknown path pattern names: "
                "{}".format(", ".join(sorted(unknown_path_patterns)))
            )

        unknown_content_patterns = content_patterns_used.difference(
            cp.name for cp in config.content_patterns
        )
        if unknown_content_patterns:
            raise ValueError(
                "required_matches uses unknown content pattern names: "
                "{}".format(", ".join(sorted(unknown_content_patterns)))
            )

        self._path_matchers = {pp.name: PathMatcher(pp) for pp in config.path_patterns}
        self._content_matchers = {cp.name: ContentMatcher(cp) for cp in config.content_patterns}
        self._required_matches = config.required_matches

    def check_source_file(self, path, content):
        """Check one file and return a `RegexMatchResult` for it.

        :param str path: the file's path, used to select the applicable content patterns.
        :param bytes content: the file's raw content.
        :raises ValueError: if `path` matches path patterns with differing content encodings.
        """
        content_pattern_names, encoding = self.get_applicable_content_pattern_names(path)
        # The applicable names come back as a set, whose iteration order is arbitrary. Sort
        # them so the order of the names in the result is the same on every run.
        matching, nonmatching = self.check_content(
            sorted(content_pattern_names), content, encoding
        )
        return RegexMatchResult(path, matching, nonmatching)

    def check_content(self, content_pattern_names, content, encoding):
        """Check which of the named patterns matches the given content.

        Returns a pair (matching, nonmatching), in which each element is a tuple of pattern
        names, in the order in which `content_pattern_names` yields them. Both tuples are
        empty if there are no names to check or no encoding is given.

        :param iterable content_pattern_names: names of content patterns to check.
        :param bytes content: the content to check.
        :param str encoding: the expected encoding of content.
        :raises KeyError: if a name is not a known content pattern.
        :raises UnicodeDecodeError: if content cannot be decoded using encoding.
        """
        if not content_pattern_names or not encoding:
            return (), ()

        # Decode once up front instead of once per pattern.
        text = content.decode(encoding)
        matching = []
        nonmatching = []
        for content_pattern_name in content_pattern_names:
            if self._content_matchers[content_pattern_name].matches(text):
                matching.append(content_pattern_name)
            else:
                nonmatching.append(content_pattern_name)
        return tuple(matching), tuple(nonmatching)

    def get_applicable_content_pattern_names(self, path):
        """Return the content patterns applicable to a given path.

        Returns a tuple (applicable_content_pattern_names, content_encoding), where the
        names are a set.

        If path matches no path patterns, the returned content_encoding will be None (and
        applicable_content_pattern_names will be empty).

        :raises ValueError: if path matches path patterns whose content encodings differ.
        """
        encodings = set()
        applicable_content_pattern_names = set()
        for path_pattern_name, content_pattern_names in self._required_matches.items():
            m = self._path_matchers[path_pattern_name]
            if m.matches(path):
                encodings.add(m.content_encoding)
                applicable_content_pattern_names.update(content_pattern_names)

        # A file can only be decoded one way, so conflicting encodings are a config error.
        if len(encodings) > 1:
            raise ValueError(
                "Path matched patterns with multiple content encodings ({}): {}".format(
                    ", ".join(sorted(encodings)), path
                )
            )
        content_encoding = next(iter(encodings)) if encodings else None
        return applicable_content_pattern_names, content_encoding
# TODO: Consider switching this to `lint`. The main downside is that we would no longer be able to
# run on files with no owning targets, such as running on BUILD files.
@goal_rule
async def validate(
    console: Console,
    sources_snapshot: SourcesSnapshot,
    validate_subsystem: ValidateSubsystem,
    source_file_validation: SourceFileValidation,
) -> Validate:
    """Check every file in the sources snapshot against the configured patterns.

    Files are processed in path order. What is printed depends on the `--detail-level`
    option; the exit code does not. The goal fails whenever at least one file fails to
    match a required pattern, and succeeds otherwise.

    :returns: a `Validate` goal carrying the exit code.
    """
    multi_matcher = source_file_validation.get_multi_matcher()
    digest_contents = await Get(DigestContents, Digest, sources_snapshot.snapshot.digest)
    regex_match_results = RegexMatchResults(
        multi_matcher.check_source_file(file_content.path, file_content.content)
        for file_content in sorted(digest_contents, key=lambda fc: fc.path)
    )

    detail_level = validate_subsystem.detail_level
    num_matched_all = 0
    num_nonmatched_some = 0
    for rmr in regex_match_results:
        # No pattern applied to this file, so there is nothing to count or report.
        if not rmr.matching and not rmr.nonmatching:
            continue

        # Tally before any detail-level-specific reporting. If the `names` level skipped
        # the tally, failures would never be counted and the goal would always succeed.
        if rmr.nonmatching:
            icon = "X"
            num_nonmatched_some += 1
        else:
            icon = "V"
            num_matched_all += 1

        if detail_level == DetailLevel.names:
            if rmr.nonmatching:
                console.print_stdout(rmr.path)
            continue

        matched_msg = " Matched: {}".format(",".join(rmr.matching)) if rmr.matching else ""
        nonmatched_msg = (
            " Didn't match: {}".format(",".join(rmr.nonmatching)) if rmr.nonmatching else ""
        )
        if detail_level == DetailLevel.all or (
            detail_level == DetailLevel.nonmatching and nonmatched_msg
        ):
            console.print_stdout("{} {}:{}{}".format(icon, rmr.path, matched_msg, nonmatched_msg))

    # The summary is omitted for `none` (no output at all) and `names` (paths only).
    if detail_level not in (DetailLevel.none, DetailLevel.names):
        console.print_stdout("\n{} files matched all required patterns.".format(num_matched_all))
        console.print_stdout(
            "{} files failed to match at least one required pattern.".format(num_nonmatched_some)
        )

    if num_nonmatched_some:
        exit_code = PANTS_FAILED_EXIT_CODE
    else:
        exit_code = PANTS_SUCCEEDED_EXIT_CODE
    return Validate(exit_code)
def rules():
    # Module-level hook that returns whatever `register_rules()` (imported from
    # pants.engine.rules) produces. Presumably that is the set of rules defined in this
    # module, such as `validate` -- TODO confirm against pants.engine.rules.
    return register_rules()