-
Notifications
You must be signed in to change notification settings - Fork 26
/
assertions.py
438 lines (398 loc) · 19.5 KB
/
assertions.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
from copy import copy
from pythoscope.compat import set
from pythoscope.generator.dependencies import sorted_by_timestamp,\
side_effects_before, objects_affected_by_side_effects, side_effects_of,\
older_than, newer_than, resolve_dependencies
from pythoscope.generator.method_call_context import MethodCallContext
from pythoscope.generator.lines import *
from pythoscope.generator.selector import testable_calls
from pythoscope.serializer import BuiltinException, ImmutableObject, MapObject,\
UnknownObject, SequenceObject, LibraryObject
from pythoscope.side_effect import SideEffect, GlobalRead, GlobalRebind,\
BuiltinMethodWithPositionArgsSideEffect, AttributeRebind
from pythoscope.store import Function, FunctionCall, UserObject, MethodCall,\
GeneratorObject, GeneratorObjectInvocation, Call, CallToC, Method, Callable
from pythoscope.util import all_of_type, compact, flatten, underscore
# :: Call | GeneratorObject | UserObject | Method | Function -> [Event]
def assertions_for_interaction(testable_interaction):
    """Build the final, ordered list of test events for one interaction.

    Method and Function definitions get an empty execution timeline and a
    stub test timeline; user objects and calls get a test timeline derived
    from the expanded execution timeline. The test timeline is then
    completed with its requirements, call outputs used later get a name,
    and the result is sorted by timestamp and stripped of duplicates and
    bare method call contexts.
    """
    if isinstance(testable_interaction, (Method, Function)):
        execution_timeline = []
    else:
        execution_timeline = expand_into_timeline(testable_interaction)

    if isinstance(testable_interaction, UserObject):
        draft = test_timeline_for_user_object(execution_timeline, testable_interaction)
    elif isinstance(testable_interaction, Method):
        draft = test_timeline_for_method(testable_interaction)
    elif isinstance(testable_interaction, Function):
        draft = test_timeline_for_function(testable_interaction)
    else:
        draft = test_timeline_for_call(execution_timeline, testable_interaction)

    with_requirements = include_requirements(draft, execution_timeline)
    with_named_outputs = fix_tests_using_call_outputs(with_requirements)
    ordered = sorted_by_timestamp(with_named_outputs)
    return remove_duplicates_and_bare_method_contexts(ordered)
# :: Method -> [Event]
def test_timeline_for_method(method):
    """Return a stub test timeline for a method definition.

    The timeline starts with a commented-out construction of the method's
    class, followed by an assertion stub for the method call (omitted for
    creational methods) and ends with a skip-test line.
    """
    klass = method.klass
    instance_name = underscore(klass.name)
    construction_comment = '# %s = %s' % (instance_name, class_init_stub(klass))
    events = [CommentLine(construction_comment, 1)]

    # No assertion stub is generated for creational methods.
    if not method.is_creational():
        qualified_name = "%s.%s" % (instance_name, method.name)
        call_text = call_with_args(qualified_name, method.get_call_args())
        events.append(EqualAssertionStubLine(call_text, 2))

    events.append(SkipTestLine(3))
    return events
# :: Function -> [Event]
def test_timeline_for_function(function):
    """Return a stub test timeline for a function definition.

    The timeline holds an assertion stub for a sample call to the function,
    followed by a skip-test line.
    """
    call_text = call_with_args(function.name, function.args)
    assertion_stub = EqualAssertionStubLine(call_text, 1)
    skip_line = SkipTestLine(2)
    return [assertion_stub, skip_line]
def call_with_args(callable, args):
    """Render a sample call to callable using the names of its arguments.

    Lists and tuples nested inside args are rendered as parenthesized,
    comma-separated groups; anything else is used as is.

    >>> call_with_args('fun', ['x', 'y'])
    'fun(x, y)'
    >>> call_with_args('fun', [('a', 'b'), 'c'])
    'fun((a, b), c)'
    >>> call_with_args('fun', ['a', ('b', ('c', 'd'))])
    'fun(a, (b, (c, d)))'
    """
    def render(arg):
        # Leaves are returned untouched; only sequences get parentheses.
        if not isinstance(arg, (list, tuple)):
            return arg
        rendered_items = [render(item) for item in arg]
        return "(%s)" % ', '.join(rendered_items)

    return "%s%s" % (callable, render(args))
def class_init_stub(klass):
    """Return the text of a sample object creation call for the given class.

    Arguments come from the class's creational method when it has one;
    otherwise the call is rendered with an empty argument list.
    """
    creational_method = klass.get_creational_method()
    if not creational_method:
        return call_with_args(klass.name, [])
    return call_with_args(klass.name, creational_method.get_call_args())
# :: ([Event], UserObject) -> [Event]
def test_timeline_for_user_object(execution_events, user_object):
    """Build a test timeline for a user object from the real execution timeline.

    The resulting timeline will, in most cases, contain assertions about the
    object's constructor call and its testable external calls.
    """
    constructor_call = user_object.get_init_call()
    calls_to_test = testable_calls(user_object.get_external_calls())

    # A constructor that raised is tested by a single raises-assertion.
    if constructor_call and constructor_call.raised_exception():
        raised_at = last_call_action_timestamp(constructor_call)
        raised = constructor_call.exception
        context = MethodCallContext(constructor_call, user_object)
        return [RaisesAssertionLine(raised, context, raised_at+0.25)]

    events = compact([constructor_call])
    events = events + flatten([test_timeline_for_call(execution_events, call)
                               for call in calls_to_test])
    timeline = give_context_to_method_calls(events, user_object)

    # With nothing else to assert, at least document what the test checks.
    if constructor_call and len(calls_to_test) == 0:
        timeline.append(CommentLine("# Make sure it doesn't raise any exceptions.", timeline[-1].timestamp))
    return timeline
# :: Call | GeneratorObject -> int
def last_call_action_timestamp(call):
    """Return the timestamp of the latest action that happened within a call.

    For a generator object this is the latest such timestamp among its calls.
    Otherwise it is the largest of: the call's own timestamp, the timestamp
    of its last side effect and the last action timestamp of its last
    subcall.

    >>> mc1 = MethodCall(Method('m'), {})
    >>> mc2 = MethodCall(Method('m2'), {})
    >>> se = SideEffect([], [])
    >>> mc1.timestamp = 1
    >>> mc1.add_subcall(mc2)
    >>> mc1.add_side_effect(se)
    >>> mc2.timestamp = 2
    >>> se.timestamp = 3
    >>> last_call_action_timestamp(mc1)
    3
    >>> mc2.timestamp = 3
    >>> se.timestamp = 2
    >>> last_call_action_timestamp(mc1)
    3
    """
    if isinstance(call, GeneratorObject):
        return max([last_call_action_timestamp(invocation) for invocation in call.calls])

    # Candidates, in order: last subcall action, last side effect, the call
    # itself. The first two default to 0 when the call has none of them.
    candidates = [0, 0, call.timestamp]
    if call.subcalls:
        candidates[0] = last_call_action_timestamp(call.subcalls[-1])
    if call.side_effects:
        candidates[1] = call.side_effects[-1].timestamp
    return max(candidates)
# :: ([Event], UserObject) -> [Event|MethodCallContext]
def give_context_to_method_calls(events, user_object):
    """Return events with their method calls placed in context of user_object.

    Assertion lines are modified in place: the call they refer to is wrapped
    in a MethodCallContext. Bare MethodCall events are replaced by their
    MethodCallContext wrappers. All other events are passed through.
    """
    def in_context(call):
        return MethodCallContext(call, user_object)

    contextized = []
    for event in events:
        if isinstance(event, EqualAssertionLine) and isinstance(event.actual, Call):
            event.actual = in_context(event.actual)
        elif isinstance(event, RaisesAssertionLine):
            event.call = in_context(event.call)
        elif isinstance(event, GeneratorAssertionLine):
            event.generator_call = in_context(event.generator_call)
        elif isinstance(event, MethodCall):
            event = in_context(event)
        contextized.append(event)
    return contextized
# :: ([Event], [Event], Call|GeneratorObject) -> None
def add_test_events_for_output(events, execution_events, call):
    """Append to events the test events that check the outcome of a call.

    A call that raised gets a raises-assertion and a generator object gets a
    generator assertion. Any other call gets a copy of its output (with
    copies of the side effects that affected it) and one or two equality
    assertions. The events list is modified in place.
    """
    def snapshot(obj, timestamp):
        # Immutable objects are used directly, with no side effects to copy.
        if isinstance(obj, ImmutableObject):
            return obj, []
        duplicate = event_copy(obj)
        earlier_effects = older_than(side_effects_that_affect_object(execution_events, obj), timestamp)
        return duplicate, copy_side_effects(earlier_effects, obj, duplicate)

    returned_at = last_call_action_timestamp(call)

    if call.raised_exception():
        events.extend([RaisesAssertionLine(call.exception, call, returned_at+0.25)])
        return

    if isinstance(call, GeneratorObject):
        events.extend([GeneratorAssertionLine(call, returned_at+0.25)])
        return

    # The copy should reflect the output right after the call, hence a
    # timestamp slightly bigger than the call return.
    output_copy, copied_effects = snapshot(call.output, returned_at+0.01)
    events.extend([output_copy] + copied_effects)

    if call.output.timestamp < call.timestamp and not isinstance(call.output, ImmutableObject):
        # A mutable object that existed before the call needs two
        # assertions: one for identity, the other for value.
        identity_check = EqualAssertionLine(call.output, call, returned_at+0.25)
        value_check = EqualAssertionLine(output_copy, call.output, returned_at+0.75)
        events.extend([identity_check, value_check])
    else:
        # An object created by the call needs a value assertion only.
        events.extend([EqualAssertionLine(output_copy, call, returned_at+0.75)])
# :: ([Event], [SideEffect]) -> None
def add_test_events_for_side_effects(events, side_effects):
    """Extend events with setup, teardown and assertions for side effects.

    For each global read of a known value (once per global) a setup that
    saves and overrides the global is inserted at the front of events and a
    teardown restoring it is appended. Global and attribute rebinds get an
    equality assertion appended. Timestamps are derived from the first and
    last event present on entry, so events must not be empty. The events
    list is modified in place.
    """
    configured_globals = set()
    earliest = events[0].timestamp
    latest = events[-1].timestamp

    for index, effect in enumerate(side_effects):
        # Each side effect gets its own timestamp window, 5 units apart.
        offset = index * 5
        if isinstance(effect, GlobalRead) and \
               not isinstance(effect.value, UnknownObject) and \
               effect.get_full_name() not in configured_globals:
            full_name = effect.get_full_name()
            saved_name = "old_%s_%s" % (effect.module.replace(".", "_"), effect.name)
            current_value = ModuleVariableReference(effect.module, effect.name, earliest-4.2-offset)
            # SETUP: old_module_variable = module.variable
            events.insert(0, Assign(saved_name, current_value, earliest-3.2-offset))
            # SETUP: module.variable = value
            events.insert(1, Assign(full_name, effect.value, earliest-2.2-offset))
            # TEARDOWN: module.variable = old_module_variable
            # TODO: Crazy hack, teardowns should always be at the end, I'll fix
            # that someday.
            events.append(Assign(full_name, saved_name, latest+300.2+offset))
            configured_globals.add(full_name)
        elif isinstance(effect, GlobalRebind):
            rebound_global = ModuleVariableReference(effect.module, effect.name, latest+1.1+offset)
            events.append(EqualAssertionLine(effect.value, rebound_global, latest+2.1+offset))
        elif isinstance(effect, AttributeRebind):
            rebound_attribute = ObjectAttributeReference(effect.obj, effect.name, latest+1.3+offset)
            events.append(EqualAssertionLine(effect.value, rebound_attribute, latest+2.3+offset))
# :: Call|GeneratorObject -> [SideEffect]
def side_effects_of_call(call):
    """Return side effects of a call, or of all calls of a generator object."""
    if not isinstance(call, GeneratorObject):
        return call.side_effects
    per_invocation = [invocation.side_effects for invocation in call.calls]
    return flatten(per_invocation)
# :: ([Event], Call|GeneratorObject) -> [Event]
def test_timeline_for_call(execution_events, call):
    """Build a test timeline for a call from the real execution timeline.

    Events checking the call's outcome are added first, then events related
    to the call's side effects. The resulting timeline will, in most cases,
    contain assertions.
    """
    timeline = []
    add_test_events_for_output(timeline, execution_events, call)
    caused_side_effects = side_effects_of_call(call)
    add_test_events_for_side_effects(timeline, caused_side_effects)
    return timeline
# :: Event -> Event
def event_copy(event):
    """Return a shallow copy of event with its timestamp advanced by 0.5.

    The original event is left unchanged.
    """
    duplicate = copy(event)
    duplicate.timestamp = event.timestamp + 0.5
    return duplicate
# :: (list, object, object) -> list
def replace(alist, old_element, new_element):
    """Return a new list based on alist with old_element replaced.

    Every occurrence of old_element is substituted with new_element.
    Elements are matched by identity (`is`), not by equality. The input
    list is not modified.
    """
    result = []
    for element in alist:
        if element is old_element:
            result.append(new_element)
        else:
            result.append(element)
    return result
# :: ([SideEffect], SerializedObject, SerializedObject) -> [SideEffect]
def copy_side_effects(side_effects, old_obj, new_obj):
    """Copy side effects, replacing occurrences of old_obj with new_obj.

    Each side effect is copied with event_copy and the replacement is made
    in the copy's affected and referenced objects. Returns the list of
    copies, in the original order.
    """
    def duplicate(side_effect):
        clone = event_copy(side_effect)
        clone.affected_objects = replace(clone.affected_objects, old_obj, new_obj)
        clone.referenced_objects = replace(clone.referenced_objects, old_obj, new_obj)
        if isinstance(side_effect, BuiltinMethodWithPositionArgsSideEffect):
            # Re-derive obj and args from the lists after the substitution.
            clone.obj = clone.affected_objects[0]
            clone.args = clone.referenced_objects[1:]
        return clone

    return [duplicate(side_effect) for side_effect in side_effects]
# :: (Event, ...) -> [Event]
def expand_into_timeline(*events):
    """Return a timestamp-sorted list of all events related to given events.

    Related events are gathered with enumerate_events and duplicates are
    dropped before sorting.
    """
    related = enumerate_events(list(events))
    unique = set(related)
    return sorted_by_timestamp(unique)
# :: [Event] -> [Event]
def enumerate_events(objs):
    """Return a list of all events needed for testing the given objects.

    Objects already traversed are remembered, which protects against
    infinite recursion. The result may contain duplicates.
    """
    visited = set()

    def with_contained(items):
        """Return items followed by all events contained within them."""
        return items + contained_in(items)

    def contained_in(obj):
        """Return a list of events that obj requires during testing.

        Descends recursively into composite objects. Raises TypeError for
        an object of an unsupported type.
        """
        if isinstance(obj, list):
            # Lists are unhashable, so they are never remembered as visited.
            return flatten([contained_in(item) for item in obj])

        if obj in visited:
            return []
        visited.add(obj)

        if isinstance(obj, (ImmutableObject, UnknownObject)):
            # Nothing more is collected for these objects.
            return []
        if isinstance(obj, SequenceObject):
            return with_contained(obj.contained_objects)
        if isinstance(obj, MapObject):
            return with_contained(flatten(obj.mapping))
        if isinstance(obj, LibraryObject):
            return with_contained(obj.arguments)
        if isinstance(obj, BuiltinException):
            return with_contained(obj.args)
        if isinstance(obj, UserObject):
            return contained_in(obj.get_init_and_external_calls())
        if isinstance(obj, (FunctionCall, MethodCall, GeneratorObjectInvocation)):
            collected = with_contained(obj.input.values() + list(obj.side_effects))
            if obj.caller:
                collected.extend(side_effects_before_and_affected_objects(obj))
            return collected
        if isinstance(obj, GeneratorObject):
            if not obj.is_activated():
                return []
            return with_contained(obj.args.values()) + contained_in(obj.calls)
        if isinstance(obj, SideEffect):
            return [obj] + with_contained(list(obj.affected_objects))
        if isinstance(obj, CallToC):
            return side_effects_before_and_affected_objects(obj)
        raise TypeError("Wrong argument to get_contained_events: %s." % repr(obj))

    return with_contained(objs)
def side_effects_before_and_affected_objects(call):
    """Return the result of side_effects_before(call) followed by the
    objects affected by those side effects.
    """
    preceding = side_effects_before(call)
    affected = objects_affected_by_side_effects(preceding)
    return preceding + affected
# :: [Event] -> [Event]
def remove_duplicates_and_bare_method_contexts(events):
    """Return events in their original order, with MethodCallContext
    instances and repeated events left out.
    """
    kept = []
    for event in events:
        if isinstance(event, MethodCallContext):
            continue
        if event in kept:
            continue
        kept.append(event)
    return kept
# :: ([Event], [Event]) -> [Event]
def include_requirements(test_events, execution_events):
    """Return test events preceded by the events they require.

    Requirements of each test event are gathered with objects_required_for.
    Side effects of calls that appear explicitly in the test are left out,
    except that an attribute rebind to a mutable value is turned into a
    BindingChange.
    """
    explicit_side_effects = side_effects_of(explicit_calls(test_events))
    named_objects = []
    requirements = []

    for test_event in test_events:
        required = objects_required_for(test_event, test_event.timestamp, execution_events)
        for requirement in required:
            # If a call appears explicitly in the test body we should
            # ignore all side effects caused by it.
            if requirement not in explicit_side_effects:
                requirements.append(requirement)
                continue
            if not isinstance(requirement, AttributeRebind):
                continue
            if isinstance(requirement.value, ImmutableObject):
                continue
            attribute = ObjectAttributeReference(requirement.obj, requirement.name, requirement.timestamp)
            requirements.append(BindingChange(attribute, requirement.value, requirement.timestamp))
            # We can ignore the object that already has an assigned name,
            # unless it was needed earlier.
            if requirement.value not in requirements:
                named_objects.append(requirement.value)

    return [event for event in requirements + test_events
            if event not in named_objects]
# :: (Event, [Event], int) -> bool
def used_later_than(event, timeline, timestamp):
    """Tell whether event is among dependencies of the timeline events that
    are newer than timestamp. Always false for an ImmutableObject.
    """
    if isinstance(event, ImmutableObject):
        return False
    later_events = newer_than(timeline, timestamp)
    return event in resolve_dependencies(later_events)
# :: [Event] -> [Event]
def fix_tests_using_call_outputs(timeline):
    """Give a name to a tested call's output when the test uses it later.

    This applies only to outputs created by the call. E.g. we can't just do
    this:

        assert_equal(expected, call())
        assert_equal(?call_output?, something.somewhere)

    so sometimes we need this instead:

        output = call()
        assert_equal(expected, output)
        assert_equal(output, something.somewhere)

    Yields the timeline's events in order, with an Assign yielded before
    each assertion that had to be changed. Changed assertions are modified
    in place.
    """
    def needs_named_output(event):
        if not isinstance(event, EqualAssertionLine):
            return False
        if not isinstance(event.actual, (MethodCallContext, Call)):
            return False
        # Only outputs newer than the call itself are of interest.
        if not event.actual.output.timestamp > event.actual.timestamp:
            return False
        return used_later_than(event.actual.output, timeline, event.timestamp)

    for event in timeline:
        if needs_named_output(event):
            # TODO use unique names
            yield Assign('result', event.actual, event.actual.timestamp-0.0001)
            event.actual = 'result'
        yield event
# :: [Event] -> [Call]
def explicit_calls(event):
    """Return a list of calls reachable from an event or a list of events.

    A call contributes itself, plus whatever is found in its subcalls and
    its input values. Callables contribute through their calls, assertion
    lines and method call contexts through the call they refer to.
    Anything else contributes nothing.
    """
    if isinstance(event, list):
        return flatten([explicit_calls(item) for item in event])

    if isinstance(event, Call):
        from_subcalls = explicit_calls(event.subcalls)
        from_inputs = explicit_calls(event.input.values())
        return [event] + from_subcalls + from_inputs
    if isinstance(event, Callable):
        return explicit_calls(event.calls)
    if isinstance(event, EqualAssertionLine) and isinstance(event.actual, (Call, MethodCallContext)):
        return explicit_calls(event.actual)
    if isinstance(event, GeneratorAssertionLine):
        return explicit_calls(event.generator_call)
    if isinstance(event, RaisesAssertionLine):
        return explicit_calls(event.call)
    if isinstance(event, MethodCallContext):
        return explicit_calls(event.call)
    return []
# :: (Event, int, [Event]) -> [SerializedObject|SideEffect]
def objects_required_for(test_event, timestamp, execution_events):
    """Return objects required by a test event, followed by side effects.

    Starts with the dependencies of test_event and keeps following the side
    effects older than timestamp that affect them, until no objects new to
    the result are found.
    """
    needed_objects = []
    needed_side_effects = []
    pending = resolve_dependencies(test_event)
    while pending:
        found_objects, found_side_effects = copy_events_over(pending, timestamp, execution_events)
        needed_objects.extend(found_objects)
        needed_side_effects.extend(found_side_effects)
        # Next round handles only the objects that were not collected yet.
        affected = objects_affected_by_side_effects(found_side_effects)
        pending = [obj for obj in affected if obj not in needed_objects]
    return needed_objects + needed_side_effects
# :: ([SerializedObject], int, [Event]) -> ([SerializedObject], [SideEffect])
def copy_events_over(objects, timestamp, execution_events):
    """Return a pair: list of given objects and list of their side effects.

    The side effects are those found in execution_events that affect any of
    the objects and are older than timestamp.
    """
    copied_objects = []
    copied_side_effects = []
    for obj in objects:
        copied_objects.append(obj)
        affecting = side_effects_that_affect_object(execution_events, obj)
        copied_side_effects.extend(older_than(affecting, timestamp))
    return copied_objects, copied_side_effects
def new_only(affected, so_far):
    """Yield, in order, the objects from affected that are not in so_far."""
    for candidate in affected:
        if candidate in so_far:
            continue
        yield candidate
# :: ([Event], SerializedObject) -> [SideEffect]
def side_effects_that_affect_object(events, obj):
    """Yield side effects from events that have obj among affected objects."""
    for candidate in all_of_type(events, SideEffect):
        if obj not in candidate.affected_objects:
            continue
        yield candidate