-
Notifications
You must be signed in to change notification settings - Fork 15
/
marbles.py
544 lines (449 loc) · 20.2 KB
/
marbles.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
#
# Copyright (c) 2017 Two Sigma Investments, LP
# All Rights Reserved
#
# THIS IS UNPUBLISHED PROPRIETARY SOURCE CODE OF
# Two Sigma Investments, LP.
#
# The copyright notice above does not evidence any
# actual or intended publication of such source code.
#
'''Extends :mod:`unittest` functionality by augmenting the way failed
assertions are handled to provide more actionable failure information
to the test consumer.
Briefly, by inheriting from :class:`marbles.AnnotatedTestCase` rather
than :class:`unittest.TestCase`, the test author gains the ability to
provide richer failure messages in their assert statements. These
messages can be format strings which are expanded using local
variables defined within the test itself. The inclusion of this
additional information is enforced within the class.
'''
import ast
import collections.abc
import functools
import inspect
import itertools
import linecache
import logging
import re
import sys
import textwrap
import unittest
from . import log
from . import _stack
_log = logging.getLogger(__name__)
# We subclass TextWrapper (instead of just writing a wrap()
# function) because we ultimately use TextWrapper.fill() to
# return the advice as a wrapped string.
class _AdviceWrapper(textwrap.TextWrapper):
    '''A :class:`textwrap.TextWrapper` that wraps text one paragraph
    at a time, so that blank lines between paragraphs survive.
    '''

    # A run of blank line(s).  The capturing group makes ``split``
    # return the separators too, so each one becomes an empty line.
    _PARAGRAPH_BREAK = re.compile(r'(\n\s*\n)', re.MULTILINE)

    # Leading "a." / "1)" / "12." style marker of a list item.
    _LIST_MARKER = re.compile(r'\s*(?:\w|[0-9]+)[\.\)]\s+')

    def wrap(self, text, **kwargs):
        '''Wraps each paragraph in ``text`` individually.

        Paragraphs that start with a list marker get their
        continuation lines indented by the width of that marker.

        Parameters
        ----------
        text : str
        **kwargs
            Accepted but not used.

        Returns
        -------
        list of str
            The wrapped lines, with one empty string standing in for
            each paragraph separator.
        '''
        lines = []
        for chunk in self._PARAGRAPH_BREAK.split(text):
            if chunk.isspace():
                # This chunk is a paragraph separator
                lines.append('')
                continue
            # Work on a copy configured like this wrapper so that the
            # indent tweak below does not leak into other paragraphs
            paragraph_wrapper = textwrap.TextWrapper(**vars(self))
            marker = self._LIST_MARKER.match(chunk)
            if marker is not None:
                hang = ' ' * len(marker.group(0))
                paragraph_wrapper.subsequent_indent += hang
            lines += paragraph_wrapper.wrap(chunk)
        return lines
class _StatementFinder(ast.NodeVisitor):
    '''Locates the statement that contains a given source line.

    The tree is walked depth-first via :class:`ast.NodeVisitor`
    (:meth:`ast.walk` goes breadth-first, which is not what we want
    here).  While walking, a stack of the enclosing statement nodes is
    maintained.  The first node encountered whose ``lineno`` equals
    the target line sets ``self.found`` to the line number of the
    innermost enclosing statement (or of the node itself, if it is a
    statement).  ``self.found`` stays ``None`` when no node sits on
    the target line.

    Example::

        finder = _StatementFinder(target_linenumber)
        finder.visit(tree)
        containing_statement_linenumber = finder.found
    '''

    def __init__(self, target):
        self.target = target
        self.stack = []
        self.found = None

    @property
    def current_stmt(self):
        '''The innermost statement currently being visited.'''
        return self.stack[-1]

    def visit(self, node):
        '''Records a hit on ``node`` if applicable, then descends.'''
        is_statement = isinstance(node, ast.stmt)

        # Only the first hit counts; later nodes on the same line are
        # ignored.
        if self.found is None and getattr(node, 'lineno', None) == self.target:
            enclosing = node if is_statement else self.current_stmt
            self.found = enclosing.lineno

        if not is_statement:
            self.generic_visit(node)
            return

        # Statements go on the stack for the duration of their subtree
        self.stack.append(node)
        try:
            self.generic_visit(node)
        finally:
            self.stack.pop()
class AnnotationError(Exception):
    '''Signals a problem with the way an assertion was annotated.'''
class AnnotatedAssertionError(AssertionError):
    '''AnnotatedAssertionError is an :class:`AssertionError` that
    expects a mapping of additional information beyond the static
    message string accepted by :class:`AssertionError`.

    The additional information provided is formatted with the context
    of the locals where the assertion error is raised. Annotated
    assertions expect an 'advice' key describing what to do if/when
    the assertion fails.

    See :class:`marbles.AnnotatedTestCase` for example usage. To make
    annotated assertions in your tests, your tests should inherit from
    :class:`marbles.AnnotatedTestCase` instead of
    :class:`unittest.TestCase`.
    '''

    _META_FORMAT_STRING = '''{standardMsg}
Source:
{assert_stmt}
Locals:
{locals}
Advice:
{advice}
'''

    REQUIRED_KEYS = ['advice']

    # If msg and/or advice are declared in the test's scope and passed
    # as variables to the assert statement, instead of being declared
    # directly in the assert statement, we don't want to display them
    # in the Locals section of the test output because both the msg
    # and the advice will be displayed elsewhere in the output anyway
    _IGNORE_LOCALS = ['msg', 'advice', 'self']

    def __init__(self, *args):
        '''Builds the error from a packed ``(annotation, standardMsg)``.

        Parameters
        ----------
        *args
            ``args[0]`` must be a 2-tuple of

            1. the annotation provided by the test author: a mapping
               containing at least the key ``'advice'`` and
               optionally ``'msg'``, and
            2. the "standardMsg" from :mod:`unittest`, which is the
               string representation of the asserted fact that wasn't
               true.

            The advice may contain format string fields; they are
            formatted with the locals captured here.

        See the documentation for :class:`AnnotatedTestCase` to see
        what the user API looks like.
        '''
        annotation, standardMsg = args[0]
        locals_, module, filename, linenumber = _stack.get_stack_info()

        # When the wrapper in AnnotatedTestCase sees both msg and
        # advice, it bundles msg with advice in order to thread it
        # down the stack. So if the user was trying to override the
        # standard message, their value would actually be here.
        #
        # Read it with ``get`` rather than ``pop``: the annotation
        # may be any Mapping (possibly read-only) and may still be
        # referenced by the caller, so we must not mutate it.
        msg = annotation.get('msg') or standardMsg

        # These attributes are publicly exposed as properties below to
        # facilitate programmatic interactions with test failures
        # (e.g., aggregating and formatting output into a consolidated
        # report)
        self._advice = annotation['advice']
        self.standardMsg = msg
        self._locals = locals_
        self._module = module
        self._filename = filename
        self._linenumber = linenumber

        super().__init__(self.formattedMsg)

    @property
    def advice(self):
        '''The advice, formatted with the captured locals, wrapped to
        72 columns and indented with a tab.
        '''
        formatted_advice = self._advice.format(**self.locals)
        wrapper = _AdviceWrapper(width=72,
                                 break_long_words=False,
                                 initial_indent='\t',
                                 subsequent_indent='\t')
        return wrapper.fill(formatted_advice)

    @property
    def locals(self):
        '''The locals reported by ``_stack.get_stack_info()`` when
        this error was created, unfiltered.

        .. note:
            Filtering of private names and of ``self``, ``msg`` and
            ``advice`` only happens when the locals are rendered for
            display; see :meth:`_format_locals`.
        '''
        return self._locals

    @property
    def module(self):
        return self._module

    @property
    def filename(self):
        return self._filename

    @property
    def linenumber(self):
        return self._linenumber

    @property
    def assert_stmt(self):
        '''Returns a string displaying the whole statement that failed,
        with a '>' indicator on the line starting the expression.
        '''
        # This will be used by linecache to read the source of this
        # module. See the docstring for _find_assert_stmt below which
        # explains how.
        #
        # We don't have a test for this because automating the
        # creation of an egg, installation into an environment,
        # running of tests, and verification that marbles found the
        # right source and was able to print it is a lot of
        # automation. We have tested manually, and marbles works with
        # all check installation mechanisms we know of right now
        # (setup.py install, setup.py develop, pip install, bdist_egg,
        # bdist_wheel).
        module_globals = vars(sys.modules[self.module])
        line_range, lineno = self._find_assert_stmt(
            self.filename, self.linenumber, module_globals=module_globals)
        source = [linecache.getline(self.filename, x,
                                    module_globals=module_globals)
                  for x in line_range]

        # Dedent the source; splitting on '\n' leaves an empty string
        # after the final newline, which we drop
        dedented_lines = textwrap.dedent(''.join(source)).split('\n')[:-1]

        formatted_lines = []
        for i, line in zip(line_range, dedented_lines):
            prefix = '>' if i == lineno else ' '
            formatted_lines.append(' {0} {1:4d} {2}'.format(prefix, i, line))

        return '\n'.join(formatted_lines)

    @property
    def formattedMsg(self):  # mimic unittest's name for standardMsg
        '''The complete failure message: standard message, source,
        locals and advice.
        '''
        return self._META_FORMAT_STRING.format(
            standardMsg=self.standardMsg, assert_stmt=self.assert_stmt,
            advice=self.advice, locals=self._format_locals(self.locals))

    @classmethod
    def _format_locals(cls, locals_):
        '''Renders ``locals_`` as tab-indented ``name=value`` lines.

        Names listed in ``_IGNORE_LOCALS`` and names starting with an
        underscore are left out.
        '''
        return '\n'.join(
            '\t{0}={1}'.format(k, v) for k, v in locals_.items()
            if k not in cls._IGNORE_LOCALS and not k.startswith('_'))

    @staticmethod
    def _find_assert_stmt(filename, linenumber, leading=1, following=2,
                          module_globals=None):
        '''Given a filename and line number, find the lines that are
        a part of the statement containing that line.

        The line reported for a failing statement is not necessarily
        the line the statement starts on, which can be confusing if
        the statement spans multiple lines. This function helps
        reconstruct the whole statement, and is used by
        :meth:`marbles.AnnotatedAssertionError.assert_stmt`.

        Parameters
        ----------
        filename : str
        linenumber : int
            1-based line number reported for the failure.
        leading : int
            Number of context lines to include before the statement.
        following : int
            The returned range stops before ``linenumber + following``.
        module_globals : dict, optional
            We may need the ``module_globals`` in order to tell
            :mod:`linecache` how to find the file, if it comes from
            inside an egg. In that case, ``module_globals`` should
            contain a key ``__loader__`` which knows how to read from
            that file.

        Returns
        -------
        tuple
            The range of line numbers spanned by the source to
            display, and the number of the line on which the
            interesting statement starts.
        '''
        lines = linecache.getlines(
            filename, module_globals=module_globals)
        tree = ast.parse(''.join(lines))
        finder = _StatementFinder(linenumber)
        finder.visit(tree)

        # If no node sits on the reported line (or the source could
        # not be read), fall back to the reported line itself rather
        # than failing while building the failure message.
        start = finder.found if finder.found is not None else linenumber

        # linecache returns '' for line numbers below 1, which would
        # shift the line labels in assert_stmt, so never start before
        # the first line of the file.
        first = max(1, start - leading)
        line_range = range(first, linenumber + following)
        return line_range, start
class AnnotationContext(object):
    '''Context manager that validates and packs msg and advice, and
    stashes the advice on the test case for use down the stack.

    If another assertion is called inside this context without its
    own advice, the stashed advice from the earlier call is used
    instead of raising an error about missing advice. This allows
    e.g. :meth:`unittest.TestCase.assertMultiLineEqual` to make some
    additional assertions and pass its own msg without advice, without
    causing an error there.
    '''

    # Name of the attribute on the test case that holds the advice of
    # the assertion currently in progress. It is always accessed via
    # getattr/setattr with this literal string, so no name mangling
    # applies.
    _ADVICE_ATTR = '__current_advice'

    def __init__(self, case, assertion, msg, advice, args, kwargs):
        self._case = case
        self._assertion = assertion
        self._msg = msg
        self._advice = advice
        self._args = args
        self._kwargs = kwargs

    @staticmethod
    def _validate_annotation(annotation):
        '''Raises :class:`AnnotationError` unless every required key
        is present in ``annotation`` with a truthy value.
        '''
        required = set(AnnotatedAssertionError.REQUIRED_KEYS)
        present = {key for key, val in annotation.items() if val}
        missing = required - present
        if missing:
            raise AnnotationError(
                'Annotation missing required fields: {0}'.format(missing))

    def __enter__(self):
        '''Returns the annotation mapping to hand to the assertion.'''
        inherited = getattr(self._case, self._ADVICE_ATTR, None)
        effective = self._advice or inherited

        # A msg that is already a mapping is used as the annotation
        # as-is; anything else gets packed together with the advice.
        if isinstance(self._msg, collections.abc.Mapping):
            annotation = self._msg
        else:
            annotation = {'msg': self._msg, 'advice': effective}

        # Validation is skipped when advice is already stashed on the
        # case
        if not inherited:
            self._validate_annotation(annotation)

        self._old_advice = inherited
        setattr(self._case, self._ADVICE_ATTR, effective)
        return annotation

    def __exit__(self, *exc_info):
        '''Restores the previously stashed advice and, if nothing was
        stashed when this context was entered, logs the assertion.
        '''
        setattr(self._case, self._ADVICE_ATTR, self._old_advice)
        if self._old_advice is not None:
            return
        try:
            log.logger._log_assertion(self._case, self._assertion,
                                      self._args, self._kwargs, self._msg,
                                      self._advice, *exc_info)
        except Exception:
            # A logging failure must not change the test outcome
            _log.exception('Failed to log assertion')
def _find_msg_argument(signature):
    '''Works out where ``msg`` lives in a function signature.

    The wrapper needs to know at which position ``msg`` would arrive
    if it were passed positionally, so that it can be extracted.

    Parameters
    ----------
    signature : inspect.Signature

    Returns
    -------
    tuple
        The index of the ``msg`` param, the default value for it,
        and the number of non-``msg`` positional parameters we expect.
    '''
    params = signature.parameters

    if 'msg' in params:
        msg_idx = list(params).index('msg')
        default_msg = params['msg'].default
    else:
        # It's likely that this is a custom assertion that's just
        # passing all remaining args and kwargs through
        # (e.g. tests.marbles.ReversingTestCaseMixin). Unfortunately,
        # we can't inspect its code to find the assert it's wrapping,
        # so we just have to assume it's of the standard form with msg
        # in the last position with a default of None.
        msg_idx = -1
        default_msg = None

    # We also don't want to steal any actually positional arguments if
    # we can help it. Therefore, we leave the default msg if there are
    # fewer than this many args passed. Counting stops at a parameter
    # named 'msg' or at the first varargs or keyword-only parameter.
    # See https://gitlab.twosigma.com/jane/marbles/issues/10.
    positional_kinds = (inspect.Parameter.POSITIONAL_ONLY,
                        inspect.Parameter.POSITIONAL_OR_KEYWORD)
    non_msg_params = 0
    for param in params.values():
        if param.name == 'msg' or param.kind not in positional_kinds:
            break
        non_msg_params += 1

    return msg_idx, default_msg, non_msg_params
def _extract_msg(args, kwargs, msg_idx, default_msg, non_msg_params):
    '''Pulls the msg argument out of the arguments of a call.

    ``msg_idx``, ``default_msg`` and ``non_msg_params`` are the values
    computed by :func:`_find_msg_argument` for the called function.

    Returns
    -------
    tuple
        In this order: the found ``msg`` (or ``default_msg``), the
        positional args before ``msg``, any positional args after
        ``msg``, and the kwargs with ``msg`` removed.
    '''
    # Passed by keyword: the positional args are left alone
    if 'msg' in kwargs:
        return kwargs.pop('msg'), args, [], kwargs

    # Too few positional args for one of them to be msg
    if len(args) <= non_msg_params or msg_idx >= len(args):
        return default_msg, args, [], kwargs

    msg = args[msg_idx]
    # A negative index means msg was assumed to be the last argument,
    # in which case nothing can follow it
    trailing = args[msg_idx + 1:] if msg_idx >= 0 else []
    return msg, args[:msg_idx], trailing, kwargs
class AnnotatedTestCase(unittest.TestCase):
    '''AnnotatedTestCase is an extension of :class:`unittest.TestCase`.

    When writing a test class based on :class:`AnnotatedTestCase`, all
    assert statements like :meth:`unittest.TestCase.assertEqual`, in
    addition to accepting an optional string parameter ``msg``,
    expect a keyword parameter ``advice``, which should describe what
    steps should be taken when the test fails.

    The advice string is formatted with :meth:`str.format` given the
    local variables defined within the test itself.

    Every assertion is checked to make sure advice is provided, and
    if not, an :class:`AnnotationError` is raised.

    Example:

    .. literalinclude:: ../examples/sla.py
    '''

    failureException = AnnotatedAssertionError

    def _formatMessage(self, msg, standardMsg):
        # Instead of merging the two into one string, hand both on as
        # a pair; AnnotatedAssertionError unpacks it.
        return (msg, standardMsg)

    def __wrap_assertion(self, attr):
        '''Wraps an assertion method so it accepts ``advice``.'''
        msg_idx, default_msg, non_msg_params = _find_msg_argument(
            inspect.signature(attr))

        @functools.wraps(attr)
        def wrapper(*args, **kwargs):
            msg, args, rem_args, kwargs = _extract_msg(
                args, kwargs, msg_idx, default_msg, non_msg_params)
            advice = kwargs.pop('advice', None)
            context = AnnotationContext(self, attr, msg, advice,
                                        list(args) + list(rem_args), kwargs)
            with context as annotation:
                if not rem_args:
                    return attr(*args, msg=annotation, **kwargs)
                # There were positional args after msg, so the
                # annotation has to go back into msg's position
                return attr(*args, annotation, *rem_args, **kwargs)

        return wrapper

    def __wrap_fail(self, attr):
        '''Wraps :meth:`unittest.TestCase.fail` so it accepts
        ``advice``.
        '''
        msg_idx, default_msg, non_msg_params = _find_msg_argument(
            inspect.signature(attr))

        # For TestCase.fail, we're not going to call _formatMessage,
        # so we need to call the real TestCase.fail function with the
        # thing we want passed to AnnotatedAssertionError. Thus, we
        # extract msg and advice as usual, but when we call the
        # wrapped function, we do what our _formatMessage would do and
        # pass the tuple directly.
        @functools.wraps(attr)
        def wrapper(*args, **kwargs):
            msg, args, rem_args, kwargs = _extract_msg(
                args, kwargs, msg_idx, default_msg, non_msg_params)
            # TestCase.fail doesn't have args after msg
            assert len(rem_args) == 0
            advice = kwargs.pop('advice', None)
            context = AnnotationContext(self, attr, msg, advice,
                                        list(args) + list(rem_args), kwargs)
            with context as annotation:
                # Some builtin assertions (like assertIsNotNone)
                # have already called _formatMessage and pass that
                # to TestCase.fail, so if what we get is already a
                # tuple, we just pass it along.
                if not isinstance(msg, tuple):
                    msg = self._formatMessage(annotation, msg)
                return attr(*args, msg=msg, **kwargs)

        return wrapper

    def __getattribute__(self, key):
        '''Keyword argument support for assertions.

        We want AnnotatedTestCases to be able to call assertions with
        syntax like this:

            self.assertTrue(True, msg='message', advice='advice')
            self.assertTrue(True, 'message', advice='advice')

        To do so, we override __getattribute__ so that any callable
        that gets looked up and whose name starts with 'assert' (or
        is 'fail') gets wrapped so that it does what we want. We
        override __getattribute__ rather than __getattr__ because
        __getattr__ doesn't get called when the method just exists.

        To add other keyword arguments in the future, you have to make
        sure that the way the underlying assertion gets called is
        going to work with _formatMessage above, and the unpacking of
        args in AnnotatedAssertionError.__init__, and you should watch
        out for backwards compatibility with existing usage.
        '''
        attr = object.__getattribute__(self, key)
        if not callable(attr):
            return attr
        if key.startswith('assert'):
            return self.__wrap_assertion(attr)
        if key == 'fail':
            return self.__wrap_fail(attr)
        return attr