-
Notifications
You must be signed in to change notification settings - Fork 48
/
condition_parser.py
1353 lines (1173 loc) · 45.3 KB
/
condition_parser.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
import ast
import collections
import contextlib
import enum
import inspect
import re
import sys
import textwrap
import traceback
import types
from dataclasses import dataclass, replace
from functools import partial, wraps
from inspect import BoundArguments, Parameter, Signature
from itertools import chain
from typing import (
Any,
Callable,
ContextManager,
Dict,
FrozenSet,
Iterable,
Iterator,
List,
Mapping,
MutableMapping,
Optional,
Sequence,
Set,
Tuple,
Type,
cast,
)
try:
import icontract # type: ignore
except ModuleNotFoundError:
icontract = None # type: ignore
try:
import deal # type: ignore
except ModuleNotFoundError:
deal = None # type: ignore
try:
import hypothesis
from hypothesis import strategies as st
from hypothesis.control import BuildContext
from hypothesis.database import ExampleDatabase
from hypothesis.internal.conjecture.data import ConjectureData
except ImportError:
hypothesis = None # type: ignore
ExampleDatabase = object # type: ignore
from crosshair.auditwall import opened_auditwall
from crosshair.fnutil import FunctionInfo, fn_globals, set_first_arg_type
from crosshair.options import AnalysisKind
from crosshair.register_contract import get_contract
from crosshair.tracers import NoTracing
from crosshair.util import (
DynamicScopeVar,
EvalFriendlyReprContext,
IdKeyedDict,
IgnoreAttempt,
UnexploredPath,
debug,
eval_friendly_repr,
format_boundargs,
frame_summary_for_fn,
is_pure_python,
sourcelines,
test_stack,
)
class ConditionExprType(enum.Enum):
    """The role a contract expression plays: invariant, pre- or postcondition."""

    INVARIANT = "invariant"
    PRECONDITION = "precondition"
    POSTCONDITION = "postcondition"

    def __str__(self):
        # Render as the bare lowercase word rather than "ConditionExprType.X".
        return self.value
# For convenience: module-level aliases for the ConditionExprType members.
INVARIANT = ConditionExprType.INVARIANT
PRECONDITION = ConditionExprType.PRECONDITION
POSTCONDITION = ConditionExprType.POSTCONDITION
class NoEnforce:
    """
    Marker wrapper that asks for contract enforcement to be suppressed.

    The wrapper itself only forwards calls to the wrapped callable. Per the
    original note, the enforcement tracer looks for this wrapper and skips the
    conditions on `fn` when it finds one.
    """

    def __init__(self, fn):
        # The wrapped callable; calls are forwarded to it unchanged.
        self.fn = fn

    def __call__(self, *a, **kw) -> object:
        target = self.fn
        return target(*a, **kw)
def strip_comment_line(line: str) -> str:
    """
    Trim whitespace and one triple-quote delimiter from each end of `line`.

    A leading and/or trailing ''' or \"\"\" is removed (after trimming outer
    whitespace), and the remainder is whitespace-trimmed again.
    """
    triple_quotes = ("'''", '"""')
    text = line.strip()
    if text.startswith(triple_quotes):
        text = text[3:]
    if text.endswith(triple_quotes):
        text = text[:-3]
    return text.strip()
def get_doc_lines(thing: object) -> Iterable[Tuple[int, str]]:
    """
    Yield `(line_number, text)` pairs for the docstring lines of `thing`.

    The source of `thing` (as reported by `sourcelines`) is parsed; when its
    first statement is a class or function definition whose body starts with a
    string literal, each line of that literal is yielded with the opening and
    closing triple quotes removed. Nothing is yielded when the source is
    unavailable, unparsable, empty of statements, or has no leading docstring.
    """
    _filename, line_num, lines = sourcelines(thing)  # type:ignore
    if not lines:
        return
    try:
        module = ast.parse(textwrap.dedent("".join(lines)))
    except SyntaxError:
        debug(f"Unable to parse {thing} into an AST; will not detect PEP316 contracts.")
        return
    # Source consisting only of comments/blank lines parses to an empty body.
    if not module.body:
        return
    fndef = module.body[0]
    if not isinstance(fndef, (ast.ClassDef, ast.FunctionDef)):
        return
    firstnode = fndef.body[0]
    if not isinstance(firstnode, ast.Expr):
        return
    strnode = firstnode.value
    # `ast.Str` is deprecated (and removed in Python 3.14). From 3.8 onward the
    # parser produces `ast.Constant` for string literals, so only consult the
    # legacy node class on interpreters that still emit it.
    if sys.version_info >= (3, 8):
        is_docstring = isinstance(strnode, ast.Constant) and isinstance(
            strnode.value, str
        )
    else:
        is_docstring = isinstance(strnode, ast.Str)
    if not is_docstring:
        return
    end_lineno = getattr(strnode, "end_lineno", None)
    if end_lineno is not None:
        # Slice out exactly the lines the literal spans and shift the reported
        # line numbers by the literal's offset within the definition.
        candidates = enumerate(lines[strnode.lineno - 1 : end_lineno])
        line_num += strnode.lineno - 1
    else:
        # No end position available: scan from the top of the definition and
        # rely on the quote-matching below to find the docstring.
        candidates = enumerate(lines[: strnode.lineno + 1])
    OPEN_RE = re.compile("^\\s*r?('''|\"\"\")")
    CLOSE_RE = re.compile("('''|\"\"\")\\s*(#.*)?$")
    started = False
    for idx, line in candidates:
        if not started:
            (line, replaced) = OPEN_RE.subn("", line)
            if replaced:
                started = True
        if started:
            (line, replaced) = CLOSE_RE.subn("", line)
            yield (line_num + idx, line)
            if replaced:
                # The closing quotes were on this line; the docstring is done.
                return
class ImpliesTransformer(ast.NodeTransformer):
    """
    Rewrite calls to `implies(X, Y)` into the expression `Y if X else True`.

    Contracts frequently need an implication, and Y must only be evaluated
    when X holds; a conditional expression gives exactly that laziness.
    """

    def visit_Call(self, node):
        # Transform children first so nested implies() calls are rewritten too.
        self.generic_visit(node)
        callee = node.func
        if not (isinstance(callee, ast.Name) and callee.id == "implies"):
            return node
        if len(node.args) != 2:
            raise SyntaxError("implies() must have exactly two arguments")
        antecedent, consequent = node.args
        # Reuse the call's position for the synthesized nodes.
        location = dict(lineno=node.lineno, col_offset=node.col_offset)
        always_true = ast.Constant(True, **location)
        return ast.IfExp(antecedent, consequent, always_true, **location)
def compile_expr(src: str) -> types.CodeType:
    """Compile `src` as an eval-mode expression after rewriting `implies()` calls."""
    tree = ImpliesTransformer().visit(ast.parse(src, "<string>", "eval"))
    return compile(tree, "<string>", "eval")
# Build the default textual description of a counterexample.
# Returns a pair: the call rendered as "<fn_name>(<formatted arguments>)" and
# the eval-friendly repr of the returned value.
def default_counterexample(
fn_name: str,
bound_args: BoundArguments,
return_val: object,
repr_overrides: IdKeyedDict,
) -> Tuple[str, str]:
# Imported when the function is called rather than at module import time.
from crosshair.tracers import ResumedTracing
with ResumedTracing(), EvalFriendlyReprContext(repr_overrides) as ctx:
args_string = format_boundargs(bound_args)
# The formatted arguments are passed through ctx.cleanup() before embedding.
call_desc = f"{fn_name}({ctx.cleanup(args_string)})"
return (call_desc, eval_friendly_repr(return_val))
@dataclass
class ConditionSyntaxMessage:
    """A problem found while parsing a contract, tied to a source location."""

    # File containing the offending contract text.
    filename: str
    # Line number the message refers to.
    line_num: int
    # Human-readable description of the problem.
    message: str
@dataclass
class ConditionExpr:
    """A single contract expression together with where it came from."""

    # Whether this is an invariant, a precondition, or a postcondition.
    condition_type: ConditionExprType
    # Callable evaluating the condition against a mapping of names to values;
    # None when no evaluator could be produced.
    evaluate: Optional[Callable[[Mapping[str, object]], bool]]
    # Source location of the expression.
    filename: str
    line: int
    # Text of the expression, used in reports.
    expr_source: str
    # Set when the expression could not be compiled.
    compile_err: Optional[ConditionSyntaxMessage] = None

    def __repr__(self):
        # `condition_type` and `evaluate` are deliberately left out of the repr.
        shown = ("filename", "line", "expr_source", "compile_err")
        rendered = ", ".join(f"{name}={getattr(self, name)!r}" for name in shown)
        return f"ConditionExpr({rendered})"
@dataclass(frozen=True)
class Conditions:
    """Describe the contract of a function."""

    # The body of the function to analyze. Ideally this is just the function
    # body, without pre-/postcondition checking (not always possible).
    fn: Callable

    # The function to use for error reporting. Usually the same as `fn`, but
    # sometimes the original is wrapped in a shell for exception handling or
    # other reasons.
    src_fn: Callable

    # The preconditions of the function.
    pre: Sequence[ConditionExpr]

    # The postconditions of the function.
    post: Sequence[ConditionExpr]

    # Exception types that are expected. Subtypes of expected exceptions are
    # also considered expected. CrossHair will attempt to report when the
    # function raises an unexpected exception.
    raises: FrozenSet[Type[BaseException]]

    # The signature of the function. Argument and return type annotations
    # should be resolved to real python types when possible.
    sig: inspect.Signature

    # TODO: can mutation checking be implemented as just another kind of postcondition?
    # Argument names used for mutation checking. None means no assertion about
    # mutability is made; an empty set asserts that the function does not
    # mutate any argument.
    # NOTE(review): the original text called these "deeply immutable"
    # arguments, which looks inverted relative to the field name -- confirm.
    mutable_args: Optional[FrozenSet[str]]

    # Errors produced while parsing the contract. In general, conditions
    # should not be checked when such messages exist.
    fn_syntax_messages: Sequence[ConditionSyntaxMessage]

    # Optional callback that formats a counterexample invocation as text. It
    # receives the example arguments, the returned value and the repr
    # overrides, and returns string forms of the invocation and return value.
    counterexample_description_maker: Optional[
        Callable[[BoundArguments, object, IdKeyedDict], Tuple[str, str]]
    ] = None

    def has_any(self) -> bool:
        """Return True when there is any condition or any parse message."""
        return any((self.pre, self.post, self.fn_syntax_messages))

    def syntax_messages(self) -> Iterator[ConditionSyntaxMessage]:
        """Yield compile errors of all conditions, then function-level messages."""
        yield from (
            cond.compile_err
            for cond in chain(self.pre, self.post)
            if cond.compile_err is not None
        )
        yield from self.fn_syntax_messages

    def format_counterexample(
        self, args: BoundArguments, return_val: object, repr_overrides: IdKeyedDict
    ) -> Tuple[str, str]:
        """Describe a counterexample, preferring the custom maker when present."""
        maker = self.counterexample_description_maker
        if maker is None:
            return default_counterexample(
                self.src_fn.__name__, args, return_val, repr_overrides
            )
        return maker(args, return_val, repr_overrides)
@dataclass(frozen=True)
class ClassConditions:
    """The invariants of a class plus the per-method conditions derived for it."""

    # Invariants declared explicitly on the class (superclass invariants are
    # not included).
    inv: List[ConditionExpr]

    # Maps member names to the conditions for that member. These reflect not
    # only what is declared directly on the method, but also:
    #   * conditions from superclass implementations of the same method,
    #   * conditions inferred from class invariants,
    #   * conditions inferred from superclass invariants.
    methods: Mapping[str, Conditions]

    def has_any(self) -> bool:
        """Return True when an invariant exists or any method has conditions."""
        if self.inv:
            return True
        return any(method.has_any() for method in self.methods.values())
def merge_fn_conditions(
    sub_conditions: Conditions, super_conditions: Conditions
) -> Conditions:
    """
    Combine a method's own conditions with those of the method it overrides.

    Preconditions come from the subclass when it declares any, otherwise from
    the superclass. Postconditions are the superclass ones followed by the
    subclass ones. Expected exceptions are unioned. The subclass's
    `mutable_args` wins unless it is None. The function, signature and
    function-level syntax messages are taken from the subclass.
    """
    # TODO: resolve the warning below:
    #   (1) the type of self always changes
    #   (2) parameter renames (or *a, **kws) could result in varied bindings
    sub_sig = sub_conditions.sig
    if sub_sig is not None and sub_sig != super_conditions.sig:
        debug("WARNING: inconsistent signatures", sub_sig, super_conditions.sig)

    own_mutable = sub_conditions.mutable_args
    if own_mutable is None:
        own_mutable = super_conditions.mutable_args

    body = sub_conditions.fn
    return Conditions(
        body,
        body,
        sub_conditions.pre or super_conditions.pre,
        [*super_conditions.post, *sub_conditions.post],
        sub_conditions.raises | super_conditions.raises,
        sub_sig,
        own_mutable,
        sub_conditions.fn_syntax_messages,
    )
def merge_method_conditions(
    class_conditions: List[ClassConditions],
) -> Dict[str, Conditions]:
    """
    Flatten the method conditions of several base classes into one mapping.

    When a method name occurs in more than one entry, the entry that appears
    earliest in `class_conditions` wins.
    """
    # Walk the list backwards so earlier entries overwrite later ones; the MRO
    # searches the left-most base first.
    return {
        name: conditions
        for entry in reversed(class_conditions)
        for name, conditions in entry.methods.items()
    }
# Matches a docstring line that opens a contract section, e.g. "pre: x > 0",
# ":post[a, b]:" or "inv::". Capture groups, in order:
#   1. the leading whitespace,
#   2. the section keyword (post, pre, raises or inv),
#   3. the text inside optional square brackets (None when there are none),
#   4. any expression written on the same line after the colon(s).
_HEADER_LINE = re.compile(
r"""^(\s*)\:? # whitespace with optional leading colon
((?:post)|(?:pre)|(?:raises)|(?:inv)) # noncapturing keywords
(?:\[([\w\s\,\.]*)\])? # optional params in square brackets
\:\:?\s* # single or double colons
(.*?) # The (non-greedy) content
\s*$""",
re.VERBOSE,
)
# Splits a line into its leading whitespace (group 1) and its content with
# trailing whitespace removed (group 2).
_SECTION_LINE = re.compile(r"^(\s*)(.*?)\s*$")
@dataclass(init=False)
class SectionParse:
    """Accumulated result of scanning docstring lines for contract sections."""

    # Problems encountered while scanning.
    syntax_messages: List[ConditionSyntaxMessage]
    # Section name -> list of (line number, expression text).
    sections: Dict[str, List[Tuple[int, str]]]
    # Raw text found inside the square brackets of a section header, if any.
    mutable_expr: Optional[str] = None

    def __init__(self):
        self.syntax_messages = []
        # Missing sections read as empty lists.
        self.sections = collections.defaultdict(list)
def has_expr(line: str) -> bool:
    """Return True unless `line` is blank or is only a `#` comment."""
    stripped = line.strip()
    return stripped != "" and not stripped.startswith("#")
def parse_sections(
    lines: List[Tuple[int, str]], sections: Tuple[str, ...], filename: str
) -> SectionParse:
    """
    Scan numbered docstring lines and collect the expressions of each section.

    A section starts at a header line (see `_HEADER_LINE`). Its expressions
    are either written inline on the header line, or on the following lines
    that are indented more deeply than the header. Only section names listed
    in `sections` are collected. Bracketed parameters are accepted on a single
    "post" header only; other uses are recorded as syntax messages against
    `filename`.
    """
    result = SectionParse()
    # (section name, header indent width) while inside a multi-line section.
    open_section: Optional[Tuple[str, int]] = None
    for line_num, line in lines:
        if not line.strip():
            continue
        if open_section:
            open_name, header_indent = open_section
            body_match = _SECTION_LINE.match(line)
            if body_match and len(body_match.group(1)) > header_indent:
                body_text = body_match.group(2)
                if has_expr(body_text):
                    result.sections[open_name].append((line_num, body_text))
                # Still in the current section; move to the next line.
                continue
            # Not indented past the header: the section has ended.
            open_section = None
        header_match = _HEADER_LINE.match(line)
        if not header_match:
            continue
        indentstr, section, bracketed, inline_expr = header_match.groups()
        if section not in sections:
            continue
        if bracketed is not None:
            if section != "post":
                result.syntax_messages.append(
                    ConditionSyntaxMessage(
                        filename,
                        line_num,
                        f"brackets not allowed in {section} section",
                    )
                )
                continue
            if result.mutable_expr is not None:
                result.syntax_messages.append(
                    ConditionSyntaxMessage(
                        filename, line_num, "duplicate post section"
                    )
                )
                continue
            result.mutable_expr = bracketed
        if has_expr(inline_expr):
            result.sections[section].append((line_num, inline_expr))
        else:
            # Header without an inline expression: following, deeper-indented
            # lines belong to this section.
            open_section = (section, len(indentstr))
    return result
class ConditionParser:
    """Interface implemented by every contract parser; all methods are abstract."""

    def get_fn_conditions(self, fn: FunctionInfo) -> Optional[Conditions]:
        """
        Return conditions declared (directly) on a function.

        Does not include conditions inferred from invariants or superclasses.
        Return None if it is impossible for this method to have conditions, even if
        gained via subclass invariants. (i.e. `fn` is not a function or has no
        signature)
        """
        raise NotImplementedError

    def get_class_conditions(self, cls: type) -> ClassConditions:
        """Return the invariants and per-method conditions for `cls`."""
        raise NotImplementedError

    def class_can_have_conditions(self, cls: type) -> bool:
        """Return whether conditions can be determined for `cls` at all."""
        # The receiver was previously misspelled `sel`; it is only ever bound
        # implicitly, so the rename does not affect callers.
        raise NotImplementedError
class ConcreteConditionParser(ConditionParser):
    """
    Shared class-level logic for parsers of one particular contract syntax.

    Subclasses supply `get_fn_conditions` and `get_class_invariants`; this
    class combines them with superclass conditions into `ClassConditions`.
    """

    def __init__(self, toplevel_parser: Optional[ConditionParser] = None):
        """
        :param toplevel_parser: Parser used for recursive lookups (base classes
            and methods). Defaults to this parser itself.
        """
        if toplevel_parser is None:
            toplevel_parser = self
        self._toplevel_parser = toplevel_parser

    def get_toplevel_parser(self):
        """Return the parser used for recursive lookups."""
        return self._toplevel_parser

    def get_class_invariants(self, cls: type) -> List[ConditionExpr]:
        """
        Return invariants declared explicitly on the given class.

        Does not include invariants of superclasses.
        """
        raise NotImplementedError

    def class_can_have_conditions(self, cls: type) -> bool:
        """Return True only for classes implemented in pure Python."""
        # We can't get conditions/line numbers for classes written in C.
        # (The receiver was previously misspelled `sel`.)
        return is_pure_python(cls)

    def get_class_conditions(self, cls: type) -> ClassConditions:
        """
        Compute the invariants of `cls` and the effective conditions of its methods.

        For every name defined on `cls` or carrying conditions in a base class,
        the directly declared conditions are merged with the inherited ones and
        the class invariants are attached as extra pre-/postconditions where
        appropriate. Methods that end up without any condition are omitted.
        """
        if not self.class_can_have_conditions(cls):
            return ClassConditions([], {})

        toplevel_parser = self.get_toplevel_parser()
        methods: Dict[str, Conditions] = {}
        super_methods = merge_method_conditions(
            [toplevel_parser.get_class_conditions(base) for base in cls.__bases__]
        )
        inv = self.get_class_invariants(cls)
        # TODO: consider the case where superclass defines methods w/o contracts and
        # then subclass adds an invariant.
        method_names = set(cls.__dict__.keys()) | super_methods.keys()
        for method_name in method_names:
            method = cls.__dict__.get(method_name, None)
            super_method_conditions = super_methods.get(method_name)
            if super_method_conditions is not None:
                # Inherited conditions are re-targeted at this class.
                revised_sig = set_first_arg_type(super_method_conditions.sig, cls)
                super_method_conditions = replace(
                    super_method_conditions, sig=revised_sig
                )

            conditions: Conditions
            if method is None:
                # Not (re)defined on this class: inherit, if there is anything.
                if super_method_conditions is None:
                    continue
                conditions = super_method_conditions
            else:
                parsed_conditions = toplevel_parser.get_fn_conditions(
                    FunctionInfo.from_class(cls, method_name)
                )
                if parsed_conditions is None:
                    # debug(f'Skipping "{method_name}": Unable to determine the function signature.')
                    continue
                if super_method_conditions is None:
                    conditions = parsed_conditions
                else:
                    conditions = merge_fn_conditions(
                        parsed_conditions, super_method_conditions
                    )

            # Selectively add conditions inferred from invariants:
            if method_name in ("__new__", "__repr__"):
                # __new__ isn't passed a concrete instance.
                # __repr__ is itself required for reporting problems with invariants.
                inv_before, inv_after = False, False
            elif method_name == "__replace__":
                # TODO: remove this case when fixed in 3.13
                # see https://github.com/python/cpython/issues/114198
                inv_before, inv_after = False, False
            elif method_name == "__del__":
                inv_before, inv_after = True, False
            elif method_name == "__init__":
                inv_before, inv_after = False, True
            elif method_name.startswith("__") and method_name.endswith("__"):
                inv_before, inv_after = True, True
            elif method_name.startswith("_"):
                # Private methods get no invariant checks.
                inv_before, inv_after = False, False
            else:
                inv_before, inv_after = True, True

            final_pre = list(conditions.pre)
            final_post = list(conditions.post)
            if inv_before:
                final_pre.extend(inv)
            if inv_after:
                final_post.extend(inv)
            conditions = replace(conditions, pre=final_pre, post=final_post)
            if conditions.has_any():
                methods[method_name] = conditions

        if inv and "__init__" not in methods:
            # We assume that the default methods on `object` won't break invariants.
            # Except `__init__`! That's what this conditional is for.
            # Note that we don't check contracts on __init__ directly (but we do check
            # them in while checking other contracts). Therefore, we're a little loose
            # with the parameters (like signature) because many of them don't really
            # matter.
            initfn = getattr(cls, "__init__")
            init_sig = inspect.signature(initfn)
            methods["__init__"] = Conditions(
                initfn, initfn, [], inv[:], frozenset(), init_sig, None, [], None
            )
        return ClassConditions(inv, methods)
class CompositeConditionParser(ConditionParser):
    """Try several parsers in order and use the first that finds a contract."""

    def __init__(self):
        # Parsers to consult, in priority order.
        self.parsers = []
        # Memoized results of get_class_conditions, keyed by class.
        self.class_cache: Dict[type, ClassConditions] = {}

    def get_toplevel_parser(self) -> ConditionParser:
        """Return this parser; the composite is its own top level."""
        return self

    def get_fn_conditions(self, fn: FunctionInfo) -> Optional[Conditions]:
        """
        Return the first parser result that actually has conditions.

        When no parser finds conditions, the last non-None result is returned
        (or None when every parser returned None).
        """
        fallback = None
        for parser in self.parsers:
            found = parser.get_fn_conditions(fn)
            if found is None:
                continue
            if found.has_any():
                return found
            fallback = found
        return fallback

    def get_class_conditions(self, cls: type) -> ClassConditions:
        """Return (and cache) the first non-empty class conditions for `cls`."""
        cached = self.class_cache.get(cls)
        if cached is not None:
            return cached
        result = ClassConditions([], {})
        # We skip the "typing" module because class condition computation fails
        # for some typing classes:
        if cls.__module__ != "typing":
            for parser in self.parsers:
                candidate = parser.get_class_conditions(cls)
                if candidate.has_any():
                    result = candidate
                    break
        self.class_cache[cls] = result
        return result
def condition_from_source_text(
    condition_type: ConditionExprType,
    filename: str,
    line: int,
    expr_source: str,
    namespace: Dict[str, object],
) -> ConditionExpr:
    """
    Build a ConditionExpr from the text of a Python expression.

    The expression is compiled once. On success the resulting condition
    evaluates it with `namespace` as globals, overlaid with the bindings given
    at evaluation time. On failure the condition has no evaluator and carries
    the compile error as a ConditionSyntaxMessage at `filename`:`line`.
    """
    evaluate = None
    compile_err = None
    try:
        compiled = compile_expr(expr_source)
    except Exception as exc:
        compile_err = ConditionSyntaxMessage(filename, line, str(exc))
    else:

        def evaluatefn(bindings: Mapping[str, object]) -> bool:
            # TODO: eval() is oddly expensive when tracing is on.
            # Consider eval()ing this as an entire function.
            return eval(compiled, {**namespace, **bindings})

        evaluate = evaluatefn
    return ConditionExpr(
        condition_type=condition_type,
        evaluate=evaluate,
        filename=filename,
        line=line,
        expr_source=expr_source,
        compile_err=compile_err,
    )
# Finds "raises" declarations at the start of a docstring line. Two forms are
# recognized, each filling one capture group (the other is left empty):
#   1. ":raises ExcName:"   -> group 1 is the single (possibly dotted) name;
#   2. "raises: A, B" with an optional leading colon -> group 2 is the rest of
#      the line up to a "#" or the line end.
_RAISE_SPHINX_RE = re.compile(
r"""
(?: ^ \s* \: raises \s+ ( [\w\.]+ ) \: ) |
(?: ^ \s* \:? raises \s* \: ( [^\r\n#]+ ) )
""",
re.MULTILINE | re.VERBOSE,
)
def parse_sphinx_raises(fn: Callable) -> Set[Type[BaseException]]:
    """
    Collect the exception types named in "raises" lines of `fn`'s docstring.

    Each name is evaluated in the globals of `fn`; names that fail to evaluate,
    or that do not evaluate to a BaseException subclass, are ignored.
    """
    found: Set[Type[BaseException]] = set()
    doc = getattr(fn, "__doc__", None)
    if doc is None:
        return found
    for sphinx_name, name_list in _RAISE_SPHINX_RE.findall(doc):
        names = [sphinx_name] if sphinx_name else name_list.split(",")
        for name in names:
            # NOTE: this eval()s text taken from the function's own docstring.
            try:
                candidate = eval(name, fn_globals(fn))
            except Exception:
                continue
            if isinstance(candidate, type) and issubclass(candidate, BaseException):
                found.add(candidate)
    return found
class Pep316Parser(ConcreteConditionParser):
    """Parser for contracts written as pre/post/inv sections in docstrings."""

    def get_fn_conditions(self, ctxfn: FunctionInfo) -> Optional[Conditions]:
        """
        Parse the "pre" and "post" docstring sections of a function.

        Returns None when no callable can be obtained from `ctxfn`. Builtin
        functions get empty conditions. A function with preconditions but no
        postcondition receives a trivially true postcondition.
        """
        fn_and_sig = ctxfn.get_callable()
        if fn_and_sig is None:
            return None
        fn, sig = fn_and_sig
        filename, first_fn_lineno, _lines = sourcelines(fn)
        if isinstance(fn, types.BuiltinFunctionType):
            return Conditions(fn, fn, [], [], frozenset(), sig, frozenset(), [])
        parse = parse_sections(list(get_doc_lines(fn)), ("pre", "post"), filename)

        mutable_args: Optional[FrozenSet[str]] = None
        if parse.mutable_expr is not None:
            # Keep only the leading name of each comma-separated entry
            # (e.g. "self.items" -> "self").
            mutable_args = frozenset(
                expr.strip().split(".")[0]
                for expr in parse.mutable_expr.split(",")
                if expr != ""
            )

        pre: List[ConditionExpr] = [
            condition_from_source_text(
                PRECONDITION, filename, line_num, expr, fn_globals(fn)
            )
            for line_num, expr in parse.sections["pre"]
        ]
        post_conditions: List[ConditionExpr] = [
            condition_from_source_text(
                POSTCONDITION, filename, line_num, expr, fn_globals(fn)
            )
            for line_num, expr in parse.sections["post"]
        ]
        if pre and not post_conditions:
            # Add a postcondition that always holds so there is one to check.
            post_conditions.append(
                ConditionExpr(
                    POSTCONDITION, lambda vars: True, filename, first_fn_lineno, ""
                )
            )
        return Conditions(
            fn,
            fn,
            pre,
            post_conditions,
            frozenset(parse_sphinx_raises(fn)),
            sig,
            mutable_args,
            parse.syntax_messages,
        )

    def get_class_invariants(self, cls: type) -> List[ConditionExpr]:
        """Parse the "inv" section of the class docstring into invariants."""
        try:
            filename = inspect.getsourcefile(cls)
        except TypeError:  # raises TypeError for builtins
            filename = None
        if filename is None:
            return []
        # Invariant expressions are evaluated in the class's module namespace.
        namespace = sys.modules[cls.__module__].__dict__
        parse = parse_sections(list(get_doc_lines(cls)), ("inv",), filename)
        return [
            condition_from_source_text(INVARIANT, filename, line_num, text, namespace)
            for line_num, text in parse.sections["inv"]
        ]
class IcontractParser(ConcreteConditionParser):
    """Parser for contracts declared with the `icontract` library."""

    def __init__(self, toplevel_parser: Optional[ConditionParser] = None):
        super().__init__(toplevel_parser)

    def contract_text(self, contract) -> str:
        """Return the source text of the contract's condition, or "" if unknown."""
        represented = icontract._represent.inspect_lambda_condition(
            condition=contract.condition
        )
        return represented.text if represented else ""

    def get_fn_conditions(self, ctxfn: FunctionInfo) -> Optional[Conditions]:
        """
        Translate the icontract checker attached to a function into Conditions.

        Returns None when icontract is not installed or no callable can be
        obtained from `ctxfn`. The analyzed function is the innermost wrapped
        function, with icontract's own checking wrappers peeled off.
        """
        if icontract is None:
            return None
        fn_and_sig = ctxfn.get_callable()
        if fn_and_sig is None:
            return None
        fn, sig = fn_and_sig

        checker = icontract._checkers.find_checker(func=fn)  # type: ignore
        wrapper_markers = (
            "__is_invariant_check__",
            "__preconditions__",
            "__postconditions__",
        )
        contractless_fn = fn  # type: ignore
        while any(hasattr(contractless_fn, marker) for marker in wrapper_markers):
            contractless_fn = contractless_fn.__wrapped__  # type: ignore
        if checker is None:
            return Conditions(
                contractless_fn, contractless_fn, [], [], frozenset(), sig, None, []
            )

        pre: List[ConditionExpr] = []
        post: List[ConditionExpr] = []

        def eval_contract(contract, kwargs: Mapping) -> bool:
            # Pass the condition only the arguments it declares.
            condition_kwargs = icontract._checkers.select_condition_kwargs(
                contract=contract, resolved_kwargs=kwargs
            )
            return contract.condition(**condition_kwargs)

        # Preconditions form a disjunction of conjunctions of contracts.
        disjunction = checker.__preconditions__  # type: ignore
        if len(disjunction) == 1:
            # A single group: report each contract as its own precondition.
            for contract in disjunction[0]:
                evalfn = partial(eval_contract, contract)
                filename, line_num, _lines = sourcelines(contract.condition)
                pre.append(
                    ConditionExpr(
                        PRECONDITION,
                        evalfn,
                        filename,
                        line_num,
                        self.contract_text(contract),
                    )
                )
        elif len(disjunction) > 1:

            def eval_disjunction(disjunction, kwargs: Mapping) -> bool:
                # True as soon as every contract of some group holds.
                for conjunction in disjunction:
                    for contract in conjunction:
                        if not eval_contract(contract, kwargs):
                            break
                    else:
                        return True
                return False

            evalfn = partial(eval_disjunction, disjunction)
            filename, line_num, _lines = sourcelines(contractless_fn)
            groups = [
                " and ".join([self.contract_text(c) for c in conj])
                for conj in disjunction
            ]
            source = "(" + ") or (".join(groups) + ")"
            pre.append(ConditionExpr(PRECONDITION, evalfn, filename, line_num, source))

        snapshots = checker.__postcondition_snapshots__  # type: ignore

        def take_snapshots(**kwargs):
            # Capture every declared snapshot from the given argument values.
            old_as_mapping: MutableMapping[str, Any] = {}
            for snap in snapshots:
                snap_kwargs = icontract._checkers.select_capture_kwargs(
                    a_snapshot=snap, resolved_kwargs=kwargs
                )
                old_as_mapping[snap.name] = snap.capture(**snap_kwargs)
            return icontract._checkers.Old(mapping=old_as_mapping)

        def post_eval(contract, orig_kwargs: Mapping) -> bool:
            # Rename the incoming bindings to the names icontract conditions
            # use: "OLD" for snapshots and "result" for the return value.
            kwargs = dict(orig_kwargs)
            _old = kwargs.pop("__old__")
            kwargs["OLD"] = take_snapshots(**_old.__dict__)
            kwargs["result"] = kwargs.pop("__return__")
            del kwargs["_"]
            condition_kwargs = icontract._checkers.select_condition_kwargs(
                contract=contract, resolved_kwargs=kwargs
            )
            return contract.condition(**condition_kwargs)

        for postcondition in checker.__postconditions__:  # type: ignore
            evalfn = partial(post_eval, postcondition)
            filename, line_num, _lines = sourcelines(postcondition.condition)
            post.append(
                ConditionExpr(
                    POSTCONDITION,
                    evalfn,
                    filename,
                    line_num,
                    self.contract_text(postcondition),
                )
            )
        if pre and not post:
            # Add a postcondition that always holds so there is one to check.
            filename, line_num, _lines = sourcelines(contractless_fn)
            post.append(
                ConditionExpr(POSTCONDITION, lambda vars: True, filename, line_num, "")
            )
        return Conditions(
            contractless_fn,
            contractless_fn,
            pre,
            post,
            raises=frozenset(parse_sphinx_raises(fn)),
            sig=sig,
            mutable_args=None,
            fn_syntax_messages=[],
        )

    def get_class_invariants(self, cls: type) -> List[ConditionExpr]:
        """Translate the entries of `cls.__invariants__` (if any) into invariants."""

        def inv_eval(contract, kwargs):
            # Invariant conditions receive only `self`.
            return contract.condition(self=kwargs["self"])

        def to_condition(contract) -> ConditionExpr:
            filename, line_num, _lines = sourcelines(contract.condition)
            return ConditionExpr(
                INVARIANT,
                partial(inv_eval, contract),
                filename,
                line_num,
                self.contract_text(contract),
            )

        invariants = getattr(cls, "__invariants__", ())  # type: ignore
        return [to_condition(contract) for contract in invariants]
# Marker names for which DealParser declines to return conditions: when a
# function's `Has` contract lists any of these, get_fn_conditions logs a debug
# message and returns None.
_DEALL_MARKERS_TO_SKIP = frozenset(
[
# NOTE: These are (re-)enumerated in kinds_of_contracts.rst
# TODO: Make this list customizable?
"write",
"network",
"stdin",
"syscall",
]
)
class DealParser(ConcreteConditionParser):
def _contract_validates(
    self,
    contract: "deal.introspection.ValidatedContract",
    args: Sequence,
    kwargs: Mapping[str, object],
) -> bool:
    """
    Return whether `contract` accepts the given arguments.

    The contract's `validate` is called with `args`/`kwargs`; raising the
    contract's own `exception_type` means "does not hold". Any other exception
    propagates.
    """
    try:
        contract.validate(*args, **kwargs)
    except contract.exception_type:
        return False
    return True
def _extract_a_and_kw(
    self, bindings: Mapping[str, object], sig: Signature
) -> Tuple[List[object], Dict[str, object]]:
    """
    Split bound values into positional and keyword arguments following `sig`.

    :param bindings: Maps each parameter name of `sig` to its value.
    :param sig: The signature whose parameters decide how values are passed.
    :return: `(positional_args, keyword_args)`. Keyword-only parameters go into
        the keyword dict; every other parameter is appended, in signature
        order, to the positional list.
    :raises KeyError: If `bindings` lacks a parameter of `sig`.
    """
    positional_args: List[object] = []
    keyword_args: Dict[str, object] = {}
    for param in sig.parameters.values():
        value = bindings[param.name]
        if param.kind == inspect.Parameter.KEYWORD_ONLY:
            keyword_args[param.name] = value
        else:
            # Previously keyword-only values were appended here as well, so
            # they were handed over twice (positionally and by keyword).
            positional_args.append(value)
    return (positional_args, keyword_args)
def _make_pre_expr(
    self, contract: "deal.introspection.Pre", sig: Signature
) -> Callable[[Mapping[str, object]], bool]:
    """Return an evaluator that checks `contract` against a function's arguments."""

    def evaluatefn(bindings: Mapping[str, object]) -> bool:
        positional, keyword = self._extract_a_and_kw(bindings, sig)
        return self._contract_validates(contract, positional, keyword)

    return evaluatefn
def _make_post_expr(
    self, contract: "deal.introspection.Post", sig: Signature
) -> Callable[[Mapping[str, object]], bool]:
    """Return an evaluator that checks `contract` against the return value only."""

    def evaluatefn(b: Mapping[str, object]) -> bool:
        # The return value is passed as the single positional argument.
        return self._contract_validates(contract, (b["__return__"],), {})

    return evaluatefn
def _make_ensure_expr(
    self, contract: "deal.introspection.Ensure", sig: Signature
) -> Callable[[Mapping[str, object]], bool]:
    """
    Return an evaluator that checks `contract` against arguments and result.

    The function's return value is passed to the contract as keyword `result`.
    """

    def evaluatefn(bindings: Mapping[str, object]) -> bool:
        positional, keyword = self._extract_a_and_kw(bindings, sig)
        keyword["result"] = bindings["__return__"]
        return self._contract_validates(contract, positional, keyword)

    return evaluatefn
def get_fn_conditions(self, ctxfn: FunctionInfo) -> Optional[Conditions]:
if deal is None:
return None
fn_and_sig = ctxfn.get_callable()
if fn_and_sig is None:
return None
(fn, sig) = fn_and_sig
contracts = list(deal.introspection.get_contracts(fn))
if not contracts:
return None
deal.introspection.init_all(fn)
pre: List[ConditionExpr] = []
post: List[ConditionExpr] = []
exceptions: List[Type[Exception]] = []
for contract in contracts:
if isinstance(contract, deal.introspection.Raises):
exceptions.extend(contract.exceptions)
continue
if isinstance(contract, deal.introspection.Has):
for marker in contract.markers:
if marker in _DEALL_MARKERS_TO_SKIP:
debug(
f"Skipping analysis of {fn.__name__} because it is marked with '{marker}'"
)
return None
if not isinstance(contract, deal.introspection.ValidatedContract):
continue
fname, lineno, _lines = sourcelines(fn)
exprsrc = contract.source
if isinstance(contract, deal.introspection.Pre):
expr = self._make_pre_expr(contract, sig)
pre.append(ConditionExpr(PRECONDITION, expr, fname, lineno, exprsrc))
elif isinstance(contract, deal.introspection.Post):
expr = self._make_post_expr(contract, sig)
post.append(ConditionExpr(POSTCONDITION, expr, fname, lineno, exprsrc))
elif isinstance(contract, deal.introspection.Ensure):
expr = self._make_ensure_expr(contract, sig)
post.append(ConditionExpr(POSTCONDITION, expr, fname, lineno, exprsrc))
if pre and not post:
filename, line_num, _lines = sourcelines(fn)
post.append(
ConditionExpr(POSTCONDITION, lambda vars: True, filename, line_num, "")
)
raw_fn = deal.introspection.unwrap(fn)
return Conditions(