-
Notifications
You must be signed in to change notification settings - Fork 4
/
yaml_check.py
189 lines (160 loc) · 6.59 KB
/
yaml_check.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
from __future__ import print_function
import sys
import re
from operator import attrgetter
from collections import OrderedDict
import yaml
from amf_check_writer.exceptions import InvalidRowError
from amf_check_writer.base_file import AmfFile
from amf_check_writer.cvs.base import StripWhitespaceReader
class YamlCheck(AmfFile):
    """
    Base class for files that can be rendered as a YAML check suite for use
    with cc-yaml
    """

    def to_yaml_check(self):
        """
        Build the check suite from `self.namespace`, `self.facets` and the
        checks produced by `get_yaml_checks`, and serialise it with
        `yaml.dump`

        :return: the YAML document as a string
        """
        suite = {
            "suite_name": "{}_checks".format(self.namespace),
            "description": "Check '{}' in AMF files".format(" ".join(self.facets)),
            # Materialise the iterable so it is dumped as a YAML sequence
            "checks": list(self.get_yaml_checks()),
        }
        return yaml.dump(suite)

    def get_yaml_checks(self):
        """
        Produce the check dictionaries that make up the cc-yaml check suite.
        This base implementation always raises; child classes must override
        it and return an iterable of dictionaries.

        :raises NotImplementedError: always, unless overridden
        """
        raise NotImplementedError
class WrapperYamlCheck(YamlCheck):
    """
    Check suite that contains no checks of its own: every entry is an
    `__INCLUDE__` directive pointing at the file of another check
    """

    def __init__(self, child_checks, *args, **kwargs):
        """
        :param child_checks: iterable of check objects to include; each must
                             provide a `namespace` attribute and a
                             `get_filename` method

        Remaining arguments are forwarded unchanged to the parent constructor
        """
        # Stored before delegating to the parent constructor
        self.child_checks = child_checks
        super(WrapperYamlCheck, self).__init__(*args, **kwargs)

    def get_yaml_checks(self):
        """
        Yield one `{"__INCLUDE__": <filename>}` dictionary per child check,
        ordered by the child's namespace
        """
        ordered_children = sorted(self.child_checks, key=attrgetter("namespace"))
        for child in ordered_children:
            include_target = child.get_filename("yml")
            yield {"__INCLUDE__": include_target}
class FileInfoCheck(YamlCheck):
    """
    Checks for general properties of files. The checks are entirely static:
    nothing here is derived from the spreadsheets
    """

    def get_yaml_checks(self):
        """
        Yield two file size limit checks (soft and hard) followed by one
        filename structure check
        """
        register = "checklib.register.file_checks_register"

        # Each entry is (strictness, threshold, check level)
        size_limits = (
            ("soft", 2, "LOW"),
            ("hard", 4, "HIGH"),
        )
        for strictness, threshold, check_level in size_limits:
            size_params = {"strictness": strictness, "threshold": threshold}
            yield {
                "check_id": "check_{}_file_size_limit".format(strictness),
                "check_name": register + ".FileSizeCheck",
                "check_level": check_level,
                "parameters": size_params,
            }

        filename_check = {
            "check_id": "check_filename_structure",
            "check_name": register + ".FileNameStructureCheck",
            "check_level": "HIGH",
            "parameters": {"delimiter": "_", "extension": ".nc"},
        }
        yield filename_check
class FileStructureCheck(YamlCheck):
    """
    Check that a dataset is a valid NetCDF4 file. The check is entirely
    static: nothing here is derived from the spreadsheets
    """

    def get_yaml_checks(self):
        """
        Yield the single NetCDF format check, parameterised with the
        `NETCDF4_CLASSIC` format
        """
        netcdf_check = {
            "check_id": "check_valid_netcdf4_file",
            "check_name": "checklib.register.nc_file_checks_register.NetCDFFormatCheck",
            "parameters": {"format": "NETCDF4_CLASSIC"},
        }
        yield netcdf_check
class GlobalAttrCheck(YamlCheck):
    """
    Check that value of global attributes match given regular expressions
    """

    def __init__(self, tsv_file, facets):
        """
        Parse TSV file and construct regexes

        Rows that cannot be parsed are skipped silently; rows whose
        compliance rule is not recognised are skipped with a warning printed
        to stderr.

        :param tsv_file: file object for the input TSV file
        :param facets:   filename facets
        """
        super(GlobalAttrCheck, self).__init__(facets)
        reader = StripWhitespaceReader(tsv_file, delimiter="\t")
        # Ordered so that checks are emitted in spreadsheet row order
        self.regexes = OrderedDict()
        for row in reader:
            try:
                attr, regex = GlobalAttrCheck.parse_row(row)
                self.regexes[attr] = regex
            except InvalidRowError:
                # Deliberate best-effort: rows without a name/rule are not
                # attribute definitions, so they are ignored
                pass
            except ValueError as ex:
                print("WARNING: {}".format(ex), file=sys.stderr)

    def get_yaml_checks(self):
        """
        Yield one regex check dictionary per parsed global attribute, in the
        order the attributes were read
        """
        check_name = "checklib.register.nc_file_checks_register.GlobalAttrRegexCheck"
        for attr, regex in self.regexes.items():
            yield {
                "check_id": "check_{}_global_attribute".format(attr),
                "check_name": check_name,
                "parameters": {"attribute": attr, "regex": regex}
            }

    @classmethod
    def parse_row(cls, row):
        """
        Parse a row of the spreadsheet to get the attribute name and a regex to
        check the attribute value

        The rule is resolved in three steps: an exact match against a table
        of fixed rules, then a match against parameterised rule patterns, and
        finally the 'exact match' rules which take the value from the
        "Fixed Value" column.

        :param row: Row from spreadsheet as a dict indexed by column name
        :return:    A tuple (attr, regex) where regex is a python regex as a
                    string
        :raises ValueError:      if compliance checking rule is not recognised
        :raises InvalidRowError: if the row could not be parsed (a required
                                 column is missing or empty)
        """
        try:
            attr = row["Name"]
            rule = row["Compliance checking rules"]
        except KeyError:
            raise InvalidRowError()
        # Explicit test rather than `assert`: assertions are removed when
        # Python runs with -O, which would let empty values through
        if not (attr and rule):
            raise InvalidRowError()

        # Alternatives accepted wherever a rule allows 'N/A'
        not_applicable = (
            "(N/A)|(NA)|(N A)|(n/a)|(na)|(n a)|"
            "(Not Applicable)|(Not applicable)|(Not available)|(Not Available)|"
            "(not applicable)|(not available)"
        )
        # Shared by the plain and '_or_ N/A' variants so the two cannot drift
        # apart; the path suffix is optional in both, so every value accepted
        # by "Valid URL" is also accepted by "Valid URL _or_ N/A"
        valid_url = r"https?://[^\s]+\.[^\s]*[^\s\.](/[^\s]+)?"
        timestamp = r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(\.\d+)?"

        # Regexes for exact matches in the rule column. Raw strings are used
        # wherever a backslash appears to avoid invalid escape sequences.
        static_rules = {
            "Integer": r"-?\d+",
            "Valid email": r"[^@\s]+@[^@\s]+\.[^\s@]+",
            "Valid URL": valid_url,
            "Valid URL _or_ N/A": "(" + valid_url + ")|" + not_applicable,
            "Match: vN.M": r"v\d\.\d",
            r"Match: YYYY-MM-DDThh:mm:ss\.\d+": timestamp,
            r"Match: YYYY-MM-DDThh:mm:ss\.\d+ _or_ N/A":
                "(" + timestamp + ")|" + not_applicable,
            "Exact match: <number> m": r"-?\d+(\.\d+)? m",
        }

        # Regexes based on a regex in the rule column: each pattern maps to a
        # function building the value regex from the match object
        regex_rules = {
            r"String: min (?P<count>\d+) characters?":
                lambda m: r".{" + str(m.group("count")) + r",}"
        }

        regex = static_rules.get(rule)
        if regex is None:
            for rule_regex, func in regex_rules.items():
                match = re.match(rule_regex, rule)
                if match:
                    regex = func(match)
                    break

        if regex is None:
            # Handle 'exact match' case where need to look at other columns
            fixed_val_col = "Fixed Value"
            exact_match_rules = ("exact match", "exact match of text to the left")
            if fixed_val_col in row and rule.lower() in exact_match_rules:
                # Escaped so the fixed value is matched literally
                regex = re.escape(row[fixed_val_col])
            else:
                raise ValueError(
                    "Unrecognised global attribute check rule: {}".format(rule)
                )
        return attr, regex