/
base.py
446 lines (353 loc) · 14.6 KB
/
base.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
# SPDX-License-Identifier: LGPL-2.1-or-later
#
# Copyright (C) 2018-2023 Collabora Limited
# Author: Guillaume Tucker <guillaume.tucker@collabora.com>
# mypy: ignore-errors
"""Common classes for all YAML pipeline config types"""
import copy
import re
import yaml
# Translation tables used by get_system_arch().  Each one maps an architecture
# name to the name a particular system uses for it; architectures that are
# not listed are passed through unchanged by get_system_arch().

# Buildroot ('brarch')
BUILDROOT_ARCH = {
    'arm': 'armel',
    'x86_64': 'x86',
}

# ChromeOS ('crosarch')
CROS_ARCH = {
    'arm': 'armel',
}

# Debian ('debarch')
DEB_ARCH = {
    'arm': 'armhf',
    'riscv': 'riscv64',
    'x86_64': 'amd64',
}

# Linux kernel ('karch')
KERNEL_ARCH = {
    'armel': 'arm',
    'x86_64': 'x86',
}
def get_system_arch(system: str, arch: str) -> str:
    """Get system-dependent architecture string

    Depending on the OS and/or build system, the string used to identify a
    given hardware architecture can vary. For example, `amd64` (used in Debian)
    is `x86` in the Linux kernel and Buildroot, and `x86_64` for ChromeOS.

    This function returns the string used by the selected system depending on
    the architecture defined in the KernelCI configuration. The `system`
    argument can have one of the following values:

    * `brarch` (Buildroot)
    * `crosarch` (ChromeOS)
    * `debarch` (Debian)
    * `karch` (Linux)

    If *arch* has no entry in the table for the selected system it is
    returned unchanged.  If *system* is not one of the values above, a
    message naming the unknown *system* is printed and *arch* is returned
    unchanged.
    """
    if system == 'brarch':
        arch_map = BUILDROOT_ARCH
    elif system == 'crosarch':
        arch_map = CROS_ARCH
    elif system == 'debarch':
        arch_map = DEB_ARCH
    elif system == 'karch':
        arch_map = KERNEL_ARCH
    else:
        # The value that could not be recognised is the system field, so
        # that is what the diagnostic has to show (not the architecture).
        print(f"Unknown system-specific architecture field '{system}'")
        return arch
    return arch_map.get(arch) or arch
def _format_dict_strings(param, fmap):
    """Expand format placeholders in a string or in the strings of a dict

    When *param* is a string, it is expanded with `str.format_map()` using
    *fmap* as the source of values; if a placeholder names a key that is
    missing from *fmap*, the string is returned untouched.

    When *param* is a dict, each of its values is processed recursively and
    stored back under the same key, so the dict is modified in place and then
    returned.

    Any other kind of object is returned as-is.  This is typically used to
    write generic configs with placeholders for job/platform-specific
    attributes, e.g. to share one config between platforms of different
    architectures.
    """
    if isinstance(param, dict):
        for key, value in param.items():
            param[key] = _format_dict_strings(value, fmap)
        return param

    if not isinstance(param, str):
        return param

    try:
        return param.format_map(fmap)
    except KeyError:
        # Unknown placeholder: hand the string back exactly as received.
        return param
class YAMLConfigObject(yaml.YAMLObject):
    """Base class for configuration objects built from YAML data

    Provides helpers to turn a YAML dictionary into constructor keyword
    arguments, a default `to_yaml()` representer that dumps the attributes
    named by `_get_yaml_attributes()` as a plain mapping, and helpers to
    expand format placeholders using the object's own attributes.
    Subclasses declare their attribute names by overriding
    `_get_yaml_attributes()`.
    """

    @classmethod
    def load_from_yaml(cls, config, **kwargs):
        """Create an instance of *cls* from a YAML *config* dictionary

        The attributes listed by `_get_yaml_attributes()` are looked up in
        *config* and merged into *kwargs* (values from *config* take
        precedence over same-named *kwargs*), then passed to the class
        constructor.
        """
        from_config = cls._kw_from_yaml(config, cls._get_yaml_attributes())
        kwargs.update(from_config)
        return cls(**kwargs)

    @classmethod
    def _kw_from_yaml(cls, data, attributes):
        """Build constructor keyword arguments from a YAML dictionary

        Look up each name in *attributes* in the YAML *data* and return a
        dictionary with the ones that are present with a value other than
        None.  Missing keys are left out so that the constructor defaults
        apply.  An empty dictionary is returned when *data* is empty or None.
        """
        if not data:
            return {}
        found = {}
        for name in attributes:
            value = data.get(name)
            if value is not None:
                found[name] = value
        return found

    @classmethod
    def _get_yaml_attributes(cls):
        """Get the set of YAML attribute names for this class

        The names are used both to pick constructor keyword arguments out of
        YAML data and to serialise the object back to YAML.  The base class
        has none.
        """
        return set()

    @classmethod
    def to_yaml(cls, dumper, data):
        """Represent *data* as a YAML mapping of its YAML attributes"""
        mapping = {}
        for name in cls._get_yaml_attributes():
            mapping[name] = getattr(data, name)
        return dumper.represent_mapping('tag:yaml.org,2002:map', mapping)

    def _get_format_map(self):
        """Get the object attributes as a format map

        Return a dictionary associating every YAML attribute name, except
        'params' and 'rules', with its current value on this object.  The
        result can be used to expand strings that contain attribute names as
        placeholders.
        """
        excluded = ('params', 'rules')
        format_map = {}
        for name in self._get_yaml_attributes():
            if name in excluded:
                continue
            format_map[name] = getattr(self, name)
        return format_map

    def format_params(self, param, fmap=None):
        """Expand placeholders in *param* using the object attributes

        The format arguments are the object attributes, overridden by the
        optional *fmap* entries.  When they include a non-empty 'arch', the
        system-specific names 'brarch', 'crosarch', 'debarch' and 'karch'
        derived from it are added as well.  *param* is then processed by
        `_format_dict_strings()` and the result is returned.
        """
        args = self._get_format_map()
        if fmap:
            args.update(fmap)
        arch = args.get('arch')
        if arch:
            for system in ('brarch', 'crosarch', 'debarch', 'karch'):
                args[system] = get_system_arch(system, arch)
        return _format_dict_strings(param, args)
class _YAMLObject:
    """Base class with helpers to build objects from YAML data and back."""

    @classmethod
    def from_yaml(cls, config, **kwargs):
        """Create an instance of *cls* from a YAML *config* dictionary

        The attributes listed by `_get_yaml_attributes()` are looked up in
        *config* and merged into *kwargs* (values from *config* take
        precedence over same-named *kwargs*), then passed to the class
        constructor.
        """
        from_config = cls._kw_from_yaml(config, cls._get_yaml_attributes())
        kwargs.update(from_config)
        return cls(**kwargs)

    @classmethod
    def _kw_from_yaml(cls, data, attributes):
        """Build constructor keyword arguments from a YAML dictionary

        Look up each name in *attributes* in the YAML *data* and return a
        dictionary with the ones that are present with a value other than
        None.  Missing keys are left out so that the constructor defaults
        apply.  An empty dictionary is returned when *data* is empty or None.
        """
        if not data:
            return {}
        found = {}
        for name in attributes:
            value = data.get(name)
            if value is not None:
                found[name] = value
        return found

    @classmethod
    def _get_yaml_attributes(cls):
        """Get the set of YAML attribute names for this class

        The names are used both to pick constructor keyword arguments out of
        YAML data and to serialise the object back to YAML.  The base class
        has none.
        """
        return set()

    def to_dict(self):
        """Create a dictionary with the configuration data

        Return a dictionary with one entry per YAML attribute whose value on
        this object is not None.  This may not include non-serialisable
        objects such as Filters.
        """
        config = {}
        for name in self._get_yaml_attributes():
            value = getattr(self, name)
            if value is not None:
                config[name] = value
        return config

    def to_yaml(self):
        """Recreate a YAML representation of the configuration data

        Dump the dictionary made by `to_dict()` as YAML text.  This can be
        used to serialise a configuration object and load it again without
        access to the full original YAML files.
        """
        config = self.to_dict()
        return yaml.dump(config)
class Filter(YAMLConfigObject):
    """Base class to implement arbitrary configuration filters."""

    def __init__(self, items):
        """Store *items*, which can be any data used to filter configs."""
        self._items = items

    @property
    def items(self):
        """Filter items, as given to the constructor"""
        return self._items

    @classmethod
    def to_yaml(cls, dumper, data):
        """Represent the filter *data* as a YAML mapping of its items"""
        mapping = {key: value for key, value in data.items.items()}
        return dumper.represent_mapping('tag:yaml.org,2002:map', mapping)

    def match(self, **kw):
        """Return True if the given *kw* keywords match the filter.

        This has to be implemented by each subclass; the base class always
        raises NotImplementedError.
        """
        raise NotImplementedError("Filter.match() is not implemented")

    def combine(self, items):  # pylint: disable=unused-argument
        """Try to merge new *items* into this filter

        This avoids making a new filter when a single one can match both the
        existing data and the new *items*, which can be any data used to
        filter configurations.

        Return True if the items were combined, False otherwise.  The base
        class never combines.
        """
        return False
def _merge_filter_lists(old, update):
    """Merge the items of a second Blocklist or Passlist into the first.

    *old* holds the items of a filter list that has already been created and
    *update* holds the items of another filter list of the same type, loaded
    from another configuration source.  Rather than keeping a second object
    around, the values of *update* are appended to the list stored under the
    same key in *old*, creating a new empty list first for keys that *old*
    does not have yet.  *old* is modified in place and nothing is returned.
    """
    for key, extra_values in update.items():
        if key not in old:
            old[key] = []
        old[key].extend(extra_values)
class Blocklist(Filter):
    """Blocklist filter to discard certain configurations.

    Blocklist *items* are a dictionary associating keys with lists of values.
    Any configuration with a key-value pair present in these lists will be
    rejected.
    """

    yaml_tag = '!BlockList'
    name = 'blocklist'

    def match(self, **kwargs):
        """Return False if any keyword value contains a blocklisted item

        For every keyword that has a non-empty list in the filter items, each
        listed item is tested with `item in value`, i.e. a substring test
        when the value is a string and a membership test when it is a
        collection.  Keywords whose value is None are skipped since there is
        nothing a blocklisted item could be found in.
        """
        for key, value in kwargs.items():
            blocklist = self._items.get(key)
            # `item in None` would raise TypeError, and a missing value
            # cannot be blocklisted anyway.
            if not blocklist or value is None:
                continue
            if any(item in value for item in blocklist):
                return False
        return True

    def combine(self, items):
        """Merge the new *items* into this blocklist and return True"""
        _merge_filter_lists(self._items, items)
        return True
class Passlist(Filter):
    """Passlist filter to only accept certain configurations.

    Passlist *items* are a dictionary associating keys with lists of values.
    For a configuration to be accepted, there must be a value found in each of
    these lists.
    """

    yaml_tag = '!PassList'
    name = 'passlist'

    def match(self, **kwargs):
        """Return True if every passlist key is satisfied by the keywords

        For each key of the filter items, the keyword of the same name must
        have a non-empty value and at least one item of the list must be
        found in that value (`item in value`).
        """
        for key, passlist in self._items.items():
            value = kwargs.get(key)
            if not value or not any(item in value for item in passlist):
                return False
        return True

    def combine(self, items):
        """Merge the new *items* into this passlist and return True"""
        _merge_filter_lists(self._items, items)
        return True
class Regex(Filter):
    """Regex filter to only accept certain configurations.

    Regex *items* are a dictionary associating keys with regular expressions.
    There should be one regular expression for each key, not a list of them.
    For a configuration to be accepted, its value must match the regular
    expression for each key specified in the filter items.
    """

    yaml_tag = '!Regex'
    name = 'regex'

    def __init__(self, *args, **kwargs):
        """Store the filter items and compile each regular expression"""
        super().__init__(*args, **kwargs)
        self._re_items = {k: re.compile(v) for k, v in self._items.items()}

    def match(self, **kwargs):
        """Return True if the keywords match the regex of every filter key

        For each key of the filter items, the keyword of the same name must
        have a non-empty value which matches the regular expression from its
        start (`re.Pattern.match()`).  The first key that fails rejects the
        configuration.  A filter without any key accepts everything.
        """
        for key, regex in self._re_items.items():
            value = kwargs.get(key)
            # Only bail out on a failure: returning the result of the first
            # key unconditionally would leave all the other keys unchecked.
            if not value or not regex.match(value):
                return False
        return True
class Combination(Filter):
    """Combination filter to only accept some combined configurations.

    Combination *items* are a dictionary with 'keys' and 'values'.  The 'keys'
    are a list of keywords to look for, and 'values' are a list of combined
    values for the given keys.  The length of each 'values' item must therefore
    match the length of the 'keys' list, and the order of the values must match
    the order of the keys.
    """

    yaml_tag = '!Combination'
    name = 'combination'

    def __init__(self, items):
        """Store *items* and keep the keys and value combinations as tuples"""
        super().__init__(items)
        self._keys = tuple(items['keys'])
        self._values = [tuple(combo) for combo in items['values']]

    def match(self, **kwargs):
        """Return True if the keyword values form an accepted combination

        The values of the keywords named by the filter keys are gathered, in
        key order and with None for missing keywords, and the resulting tuple
        has to be one of the accepted value combinations.
        """
        candidate = tuple(kwargs.get(key) for key in self._keys)
        return candidate in self._values

    def combine(self, items):
        """Add the value combinations of *items* if they use the same keys

        Return False, leaving this filter unchanged, when the 'keys' of
        *items* differ from the keys of this filter.  Otherwise append the
        'values' of *items* to the accepted combinations and return True.
        """
        if tuple(items['keys']) != self._keys:
            return False
        for combo in items['values']:
            self._values.append(tuple(combo))
        return True
class FilterFactory:
    """Factory to create filters from YAML data."""

    # Filter classes indexed by the name used for them in the YAML data
    _classes = {
        filter_cls.name: filter_cls
        for filter_cls in (Blocklist, Passlist, Regex, Combination)
    }

    @classmethod
    def load_from_yaml(cls, filter_params):
        """Iterate through the YAML filters and return Filter objects.

        *filter_params* is a list of dictionaries associating filter type
        names with filter items.  Each entry is first offered to the filters
        of the same type created so far, in creation order; if none of them
        can combine it, a new filter is created.  The filters are returned as
        a list in creation order.
        """
        filter_list = []
        by_type = {}
        for entry in filter_params:
            for filter_type, items in entry.items():
                candidates = by_type.get(filter_type, [])
                # any() stops at the first filter that absorbs the items
                if any(known.combine(items) for known in candidates):
                    continue
                # The new filter gets its own copy of the items so that later
                # merges into it can never alter the dictionary we were
                # passed.  Otherwise filter terms could bleed through and
                # start being applied in other places unexpectedly.
                new_filter = cls._classes[filter_type](copy.deepcopy(items))
                by_type.setdefault(filter_type, []).append(new_filter)
                filter_list.append(new_filter)
        return filter_list

    @classmethod
    def from_data(cls, data, default_filters=None):
        """Look for filters in YAML *data* or return *default_filters*.

        If the YAML *data* dictionary has a non-empty 'filters' element,
        return the list of Filter objects made from it.  Otherwise, return
        *default_filters*.
        """
        params = data.get('filters')
        if not params:
            return default_filters
        return cls.load_from_yaml(params)
def default_filters_from_yaml(data):
    """Load the default YAML filters

    Return a dictionary associating each entry type found under the
    'default_filters' element of the YAML *data* with the list of Filter
    objects created from its filter definitions.  An empty dictionary is
    returned when the element is missing or empty.
    """
    # A key with no content is loaded from YAML as None, which the default
    # value of dict.get() does not cover.
    default_filters = data.get('default_filters') or {}
    return {
        entry_type: FilterFactory.load_from_yaml(filters_data)
        for entry_type, filters_data in default_filters.items()
    }