/
dict_util.py
532 lines (425 loc) · 17.7 KB
/
dict_util.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
from builtins import str as ustr
import collections
import logging
import re
import string
import warnings
from box import Box
from future.utils import raise_from
import jmespath
from tavern.util.loader import (
ANYTHING,
ForceIncludeToken,
TypeConvertToken,
TypeSentinel,
)
from . import exceptions
logger = logging.getLogger(__name__)
class _FormattedString(object):
"""Wrapper class for things that have already been formatted
This is only used below and should not be used outside this module
"""
def __init(self, s):
super(_FormattedString, self).__init__(s)
def _check_and_format_values(to_format, box_vars):
    """Check each replacement field of a format string, then format it.

    Every field found in ``to_format`` is looked up in ``box_vars`` first so
    that a missing value is reported with a helpful log message. A warning is
    logged when a field resolves to something that is not a string or number,
    because formatting will coerce it to a string.

    Args:
        to_format (str): string which may contain ``{}`` replacement fields
        box_vars: mapping of variables; used for the field lookups and
            unpacked as keyword arguments for the final ``format`` call

    Raises:
        MissingFormatError: the lookup of a field raised KeyError or
            IndexError

    Returns:
        str: ``to_format`` with all fields replaced
    """
    formatter = string.Formatter()

    for parsed_field in formatter.parse(to_format):
        field_name = parsed_field[1]

        # A chunk of plain literal text has no field to resolve
        if field_name is None:
            continue

        try:
            lookup_result = formatter.get_field(field_name, [], box_vars)
            would_replace = lookup_result[0]
        except KeyError as e:
            logger.error(
                "Failed to resolve string [%s] with variables [%s]", to_format, box_vars
            )
            logger.error("Key(s) not found in format: %s", field_name)
            raise_from(exceptions.MissingFormatError(field_name), e)
        except IndexError as e:
            logger.error("Empty format values are invalid")
            raise_from(exceptions.MissingFormatError(field_name), e)
        else:
            is_simple_value = isinstance(would_replace, (str, ustr, int, float))
            if not is_simple_value:
                logger.warning(
                    "Formatting '%s' will result in it being coerced to a string (it is a %s)",
                    field_name,
                    type(would_replace),
                )

    return to_format.format(**box_vars)
def _attempt_find_include(to_format, box_vars):
    """Resolve a force-include string to the object it refers to.

    Unlike normal formatting, the value found in ``box_vars`` is returned
    itself (after applying any ``!r``/``!s``/``!a`` conversion) rather than
    being interpolated into a string.

    Args:
        to_format (str): string that must consist of exactly one replacement
            field, e.g. ``"{my_variable}"``
        box_vars: mapping of variables used to resolve the field

    Raises:
        InvalidFormattedJsonError: ``to_format`` does not contain exactly one
            parsed chunk, that chunk has no field, or the string does not
            start with the field

    Returns:
        object: the value the field resolved to
    """
    formatter = string.Formatter()
    would_format = list(formatter.parse(to_format))

    yaml_tag = ForceIncludeToken.yaml_tag

    if len(would_format) != 1:
        raise exceptions.InvalidFormattedJsonError(
            "When using {}, there can only be one exactly format value, but got {}".format(
                yaml_tag, len(would_format)
            )
        )

    (_, field_name, format_spec, conversion) = would_format[0]

    if field_name is None:
        raise exceptions.InvalidFormattedJsonError(
            "Invalid string used for {}".format(yaml_tag)
        )

    # The field name is user supplied text, not a regular expression. Escape
    # it so that names containing regex metacharacters (for example the
    # brackets in "a[0]") are matched literally instead of being interpreted
    # as a pattern, which made valid indexed fields fail this check.
    pattern = r"\{" + re.escape(field_name) + r".*\}"

    if not re.match(pattern, to_format):
        raise exceptions.InvalidFormattedJsonError(
            "Invalid format specifier '{}' for {}".format(to_format, yaml_tag)
        )

    if format_spec:
        logger.warning(
            "Conversion specifier '%s' will be ignored for %s", format_spec, to_format
        )

    would_replace = formatter.get_field(field_name, [], box_vars)[0]

    return formatter.convert_field(would_replace, conversion)
def format_keys(val, variables, no_double_format=True):
    """Recursively format a value using the given variables.

    Dictionaries are formatted value by value, lists and tuples item by item
    (both come back as a list), strings are formatted directly and type
    conversion tokens are formatted and then converted. Anything else is
    returned unchanged.

    Args:
        val (object): Input value to format, typically a dictionary
        variables (dict): Dictionary of keys to format it with
        no_double_format (bool): Whether to wrap formatted strings in the
            'inner formatted string' class so they are not formatted again.
            Passing False is required if the result is sent via pytest-xdist,
            such as markers:
            https://github.com/taverntesting/tavern/issues/431
            The flag is applied at every nesting level.

    Returns:
        object: recursively formatted copy of the input
    """
    formatted = val
    box_vars = Box(variables)

    def _format_nested(inner):
        # Forward the caller's no_double_format choice. Previously nested
        # calls silently fell back to the default of True, so nested strings
        # were wrapped even when the caller asked for plain strings.
        return format_keys(inner, box_vars, no_double_format=no_double_format)

    if isinstance(val, dict):
        formatted = {key: _format_nested(val[key]) for key in val}
    elif isinstance(val, (list, tuple)):
        formatted = [_format_nested(item) for item in val]
    elif isinstance(formatted, _FormattedString):
        logger.debug("Already formatted %s, not double-formatting", formatted)
    elif isinstance(val, (ustr, str)):
        formatted = _check_and_format_values(val, box_vars)

        if no_double_format:

            # Subclass the concrete string type of the input so the result
            # still behaves like that string type while carrying the marker
            class InnerFormattedString(_FormattedString, type(val)):
                """Hack for python 2"""

            formatted = InnerFormattedString(formatted)
    elif isinstance(val, TypeConvertToken):
        if isinstance(val, ForceIncludeToken):
            formatted = _attempt_find_include(val.value, box_vars)
        else:
            value = _format_nested(val.value)
            formatted = val.constructor(value)
    else:
        logger.debug("Not formatting something of type '%s'", type(formatted))

    return formatted
def recurse_access_key(data, query):
    """Look up a value in the given data using the given query.

    The query is first run as a JMES path search. If that fails to parse or
    returns None, the query is split on dots and handed to the older
    ``_recurse_access_key`` implementation, and a FutureWarning is emitted
    when that older lookup succeeds.

    Note:
        The fallback to the old _recurse_access_key will be removed in 1.0

    Args:
        data (dict, list): Data to search in
        query (str): Query to run

    Raises:
        KeySearchNotFoundError: the fallback lookup raised IndexError or
            KeyError

    Returns:
        object: Whatever was found by the search
    """
    msg = "In a future version of Tavern, selecting for values to save in nested objects will have to be done as a JMES path query - see http://jmespath.org/ for more information"

    try:
        from_jmespath = jmespath.search(query, data)
    except jmespath.exceptions.ParseError:
        # TODO: In 1.0, this will raise an error instead
        logger.debug("Error parsing JMES query - %s", msg, exc_info=True)
        from_jmespath = None

    if from_jmespath is not None:
        return from_jmespath

    # A genuine None value in the data also ends up here, which only costs a
    # second (harmless) search
    logger.debug(
        "JMES path search for '%s' was None - trying old implementation", query
    )

    try:
        found = _recurse_access_key(data, query.split("."))
    except (IndexError, KeyError) as e:
        raise_from(
            exceptions.KeySearchNotFoundError(
                "Error searching for key in given data"
            ),
            e,
        )

    # The key was found using 'old' style searching, which will be deprecated
    warnings.warn(msg, FutureWarning)

    return found
def _recurse_access_key(current_val, keys):
    """ Given a list of keys and a dictionary, recursively access the dictionary
    using the keys until we find the key it is looking for

    If a key can be converted to an integer, the integer is used for the
    lookup (so it works as a list index)

    The ``keys`` sequence passed in is not modified.

    Example:

        >>> _recurse_access_key({'a': 'b'}, ['a'])
        'b'
        >>> _recurse_access_key({'a': {'b': ['c', 'd']}}, ['a', 'b', '0'])
        'c'

    Args:
        current_val (dict): current dictionary we have recursed into
        keys (list): list of str/int of subkeys

    Raises:
        IndexError: list index not found in data
        KeyError: dict key not found in data
        TypeError: the current value could not be indexed with the key

    Returns:
        str or dict: value of subkey in dict
    """
    logger.debug("Recursively searching for '%s' in '%s'", keys, current_val)

    if not keys:
        return current_val

    # Slice rather than ``keys.pop(0)`` so the caller's list is left intact
    # and can safely be reused after the call
    current_key = keys[0]
    remaining_keys = keys[1:]

    try:
        current_key = int(current_key)
    except ValueError:
        # Not a number - use the key exactly as given
        pass

    try:
        return _recurse_access_key(current_val[current_key], remaining_keys)
    except (IndexError, KeyError, TypeError) as e:
        # Logged at every level on the way back up, then re-raised unchanged
        logger.error(
            "%s accessing data - looking for '%s' in '%s'",
            type(e).__name__,
            current_key,
            current_val,
        )
        raise
def deep_dict_merge(initial_dct, merge_dct):
    """ Recursive dict merge. Instead of updating only top-level keys,
    dict_merge recurses down into dicts nested to an arbitrary depth
    and returns the merged dict. Key values present in merge_dct take
    precedence over values in initial_dct.

    Neither input is modified: the top level is copied and every nested
    dict that gets merged is replaced by a new merged dict.

    Modified from: https://gist.github.com/angstwad/bf22d1822c38a92ec0a9

    Params:
        initial_dct: dict onto which the merge is executed
        merge_dct: dct merged into dct

    Returns:
        dict: recursively merged dict
    """
    # ``collections.Mapping`` was removed in Python 3.10; the ABC lives in
    # ``collections.abc``. Imported locally with a fallback so the function
    # keeps working on Python 2 as well.
    try:
        from collections.abc import Mapping
    except ImportError:  # Python 2
        from collections import Mapping

    dct = initial_dct.copy()

    for k in merge_dct:
        both_are_mappings = (
            k in dct
            and isinstance(dct[k], dict)
            and isinstance(merge_dct[k], Mapping)
        )

        if both_are_mappings:
            dct[k] = deep_dict_merge(dct[k], merge_dct[k])
        else:
            # Scalars, lists and type mismatches: the merged-in value wins
            dct[k] = merge_dct[k]

    return dct
def check_expected_keys(expected, actual):
    """Verify that every actual key is one of the expected keys

    Args:
        expected (list, set, dict): keys we expect
        actual (list, set, dict): keys we have got from the input

    Raises:
        UnexpectedKeysError: If actual contains a key that is not in expected
    """
    allowed = set(expected)
    received = set(actual)

    # Nothing to report when the received keys are all allowed
    if received <= allowed:
        return

    unexpected = received - allowed

    logger.debug("Valid keys = %s, actual keys = %s", allowed, received)

    msg = "Unexpected keys {}".format(unexpected)
    logger.error(msg)
    raise exceptions.UnexpectedKeysError(msg)
def yield_keyvals(block):
    """Generate split keys, keys and expected values for recursive matching

    For a dict, each key is split on dots. For anything else the input is
    enumerated and the stringified index is used as the key, so the 'keys'
    are just ['0', '1', '2', ...]. Each item produced is a 3-tuple of the
    split key, the original key and the expected value.

    Example:

        Matching a dictionary with a couple of keys:

        >>> gen = yield_keyvals({"a": {"b": "c"}})
        >>> next(gen)
        (['a'], 'a', {'b': 'c'})

        Matching nested key access:

        >>> gen = yield_keyvals({"a.b.c": "d"})
        >>> next(gen)
        (['a', 'b', 'c'], 'a.b.c', 'd')

        Matching a list of items:

        >>> gen = yield_keyvals(["a", "b", "c"])
        >>> next(gen)
        (['0'], '0', 'a')
        >>> next(gen)
        (['1'], '1', 'b')
        >>> next(gen)
        (['2'], '2', 'c')

    Args:
        block (dict, list): input matches

    Yields:
        (list, str, str): key split on dots, key, expected value
    """
    if isinstance(block, dict):
        keyed_values = block.items()
    else:
        # A stringified index never contains a dot, so splitting it below
        # always gives a single element list
        keyed_values = ((str(position), item) for position, item in enumerate(block))

    for key, expected in keyed_values:
        yield key.split("."), key, expected
def check_keys_match_recursive(expected_val, actual_val, keys, strict=True):
    """Utility to recursively check response values

    expected and actual both have to be of the same type or it will raise an
    error.

    If the two values compare equal the function returns immediately.
    Otherwise dictionaries are compared key by key and lists item by item,
    while ``None``, ``ANYTHING`` and matching type sentinels in the expected
    value are accepted.

    Example:

        >>> check_keys_match_recursive({"a": {"b": "c"}}, {"a": {"b": "c"}}, []) is None
        True
        >>> check_keys_match_recursive({"a": {"b": "c"}}, {"a": {"b": "d"}}, []) # doctest: +IGNORE_EXCEPTION_DETAIL
        Traceback (most recent call last):
          File "/home/michael/code/tavern/tavern/tavern/util/dict_util.py", line 223, in check_keys_match_recursive
        tavern.util.exceptions.KeyMismatchError: Key mismatch: (expected["a"]["b"] = 'c', actual["a"]["b"] = 'd')

    Todo:
        This could be turned into a single-dispatch function for cleaner
        code and to remove a load of the isinstance checks

    Args:
        expected_val (dict, list, str): expected value
        actual_val (dict, list, str): actual value
        keys (list): any keys which have been recursively parsed to get to this
            point. Used for debug output.
        strict (bool): Whether 'strict' key checking should be done. If this is
            False, extra dictionary keys in the actual value will not raise
            an error (but keys missing from the actual value, or a mismatch
            in value, will raise an error)

    Raises:
        KeyMismatchError: expected_val and actual_val did not match
    """

    # pylint: disable=too-many-locals,too-many-statements

    def full_err():
        """Get error in the format:

        a["b"]["c"] = 4, b["b"]["c"] = {'key': 'value'}
        """

        def _format_err(which):
            return "{}{}".format(which, "".join('["{}"]'.format(key) for key in keys))

        e_formatted = _format_err("expected")
        a_formatted = _format_err("actual")
        return "{} = '{}' (type = {}), {} = '{}' (type = {})".format(
            e_formatted,
            expected_val,
            type(expected_val),
            a_formatted,
            actual_val,
            type(actual_val),
        )

    # Check required because of python 2/3 unicode compatibility when loading yaml
    if isinstance(actual_val, ustr):
        actual_type = str
    else:
        actual_type = type(actual_val)

    if expected_val is ANYTHING:
        # Match anything. We could just early exit here but having the debug
        # logging below is useful
        expected_matches = True
    elif isinstance(expected_val, TypeSentinel):
        # If the 'expected' type is actually just a sentinel for another type,
        # then it should match
        expected_matches = expected_val.constructor == actual_type
    else:
        # Normal matching
        expected_matches = (
            # If they are the same type
            isinstance(expected_val, actual_type)
            or
            # Handles the case where, for example, the 'actual type' returned by
            # a custom backend returns an OrderedDict, which is a subclass of
            # dict but will raise a confusing error if the contents are
            # different
            issubclass(actual_type, type(expected_val))
        )

    try:
        # Raise explicitly instead of using an ``assert`` statement: asserts
        # are stripped when Python runs with -O, which would skip this
        # comparison and let every mismatch pass silently. The exception is
        # still raised and caught here so it can be used as the cause of the
        # errors below and picked up by ``exc_info=True``.
        if not actual_val == expected_val:
            raise AssertionError
    except AssertionError as e:
        # At this point, there is likely to be an error unless we're using any
        # of the type sentinels

        if not (expected_val is ANYTHING):  # pylint: disable=superfluous-parens
            # NOTE
            # Second part of this check will be removed in future - see deprecation
            # warning below for details
            if not expected_matches and expected_val is not None:
                raise_from(
                    exceptions.KeyMismatchError(
                        "Type of returned data was different than expected ({})".format(
                            full_err()
                        )
                    ),
                    e,
                )

        if isinstance(expected_val, dict):
            akeys = set(actual_val.keys())
            ekeys = set(expected_val.keys())

            if akeys != ekeys:
                extra_actual_keys = akeys - ekeys
                extra_expected_keys = ekeys - akeys

                msg = ""
                if extra_actual_keys:
                    msg += " - Extra keys in response: {}".format(extra_actual_keys)
                if extra_expected_keys:
                    msg += " - Keys missing from response: {}".format(
                        extra_expected_keys
                    )

                full_msg = "Structure of returned data was different than expected {} ({})".format(
                    msg, full_err()
                )

                # If there are more keys in 'expected' compared to 'actual',
                # this is still a hard error and we shouldn't continue
                if extra_expected_keys or strict:
                    raise_from(exceptions.KeyMismatchError(full_msg), e)
                else:
                    logger.debug(
                        "Mismatch in returned data, continuing due to strict=%s: %s",
                        strict,
                        full_msg,
                        exc_info=True,
                    )

            # If strict is True, an error will be raised above. If not, recurse
            # through both sets of keys and just ignore missing ones
            to_recurse = akeys | ekeys

            for key in to_recurse:
                try:
                    check_keys_match_recursive(
                        expected_val[key], actual_val[key], keys + [key], strict
                    )
                except KeyError:
                    logger.debug(
                        "Skipping comparing missing key %s due to strict=%s",
                        key,
                        strict,
                    )
        elif isinstance(expected_val, list):
            if len(expected_val) != len(actual_val):
                raise_from(
                    exceptions.KeyMismatchError(
                        "Length of returned list was different than expected - expected {} items, got {} ({})".format(
                            len(expected_val), len(actual_val), full_err()
                        )
                    ),
                    e,
                )

            # TODO
            # Check things in the wrong order?

            for i, (e_val, a_val) in enumerate(zip(expected_val, actual_val)):
                try:
                    check_keys_match_recursive(e_val, a_val, keys + [i], strict)
                except exceptions.KeyMismatchError as sub_e:
                    # This will still raise an error, but it will be more
                    # obvious where the error came from (in python 3 at least)
                    # and will take ANYTHING into account
                    raise_from(sub_e, e)
        elif expected_val is None:
            warnings.warn(
                "Expected value was 'null', so this check will pass - this will be removed in a future version. IF you want to check against 'any' value, use '!anything' instead.",
                FutureWarning,
            )
        elif expected_val is ANYTHING:
            logger.debug("Actual value = '%s' - matches !anything", actual_val)
        elif isinstance(expected_val, TypeSentinel) and expected_matches:
            logger.debug(
                "Actual value = '%s' - matches !any%s",
                actual_val,
                expected_val.constructor,
            )
        else:
            raise_from(
                exceptions.KeyMismatchError("Key mismatch: ({})".format(full_err())), e
            )