-
-
Notifications
You must be signed in to change notification settings - Fork 789
/
stmt.py
455 lines (372 loc) · 16.5 KB
/
stmt.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
import vyper.codegen.events as events
import vyper.utils as util
from vyper import ast as vy_ast
from vyper.builtins.functions import STMT_DISPATCH_TABLE
from vyper.codegen import external_call, self_call
from vyper.codegen.context import Constancy, Context
from vyper.codegen.core import (
LOAD,
STORE,
IRnode,
append_dyn_array,
check_assign,
clamp,
dummy_node_for_type,
get_dyn_array_count,
get_element_ptr,
getpos,
is_return_from_function,
make_byte_array_copier,
make_setter,
pop_dyn_array,
zero_pad,
)
from vyper.codegen.expr import Expr
from vyper.codegen.return_ import make_return_stmt
from vyper.evm.address_space import MEMORY, STORAGE
from vyper.exceptions import CompilerPanic, StructureException, TypeCheckFailure
from vyper.semantics.types import DArrayT, MemberFunctionT
from vyper.semantics.types.shortcuts import INT256_T, UINT256_T
class Stmt:
def __init__(self, node: vy_ast.VyperNode, context: Context) -> None:
self.stmt = node
self.context = context
fn = getattr(self, f"parse_{type(node).__name__}", None)
if fn is None:
raise TypeCheckFailure(f"Invalid statement node: {type(node).__name__}")
with context.internal_memory_scope():
self.ir_node = fn()
if self.ir_node is None:
raise TypeCheckFailure("Statement node did not produce IR")
self.ir_node.annotation = self.stmt.get("node_source_code")
self.ir_node.source_pos = getpos(self.stmt)
def parse_Expr(self):
# TODO: follow analysis modules and dispatch down to expr.py
return Stmt(self.stmt.value, self.context).ir_node
def parse_Pass(self):
return IRnode.from_list("pass")
def parse_Name(self):
if self.stmt.id == "vdb":
return IRnode("debugger")
else:
raise StructureException(f"Unsupported statement type: {type(self.stmt)}", self.stmt)
def parse_AnnAssign(self):
ltyp = self.stmt.target._metadata["type"]
varname = self.stmt.target.id
alloced = self.context.new_variable(varname, ltyp)
assert self.stmt.value is not None
rhs = Expr(self.stmt.value, self.context).ir_node
lhs = IRnode.from_list(alloced, typ=ltyp, location=MEMORY)
return make_setter(lhs, rhs)
def parse_Assign(self):
# Assignment (e.g. x[4] = y)
src = Expr(self.stmt.value, self.context).ir_node
dst = self._get_target(self.stmt.target)
ret = ["seq"]
overlap = len(dst.referenced_variables & src.referenced_variables) > 0
if overlap and not dst.typ._is_prim_word:
# there is overlap between the lhs and rhs, and the type is
# complex - i.e., it spans multiple words. for safety, we
# copy to a temporary buffer before copying to the destination.
tmp = self.context.new_internal_variable(src.typ)
tmp = IRnode.from_list(tmp, typ=src.typ, location=MEMORY)
ret.append(make_setter(tmp, src))
src = tmp
ret.append(make_setter(dst, src))
return IRnode.from_list(ret)
def parse_If(self):
if self.stmt.orelse:
with self.context.block_scope():
add_on = [parse_body(self.stmt.orelse, self.context)]
else:
add_on = []
with self.context.block_scope():
test_expr = Expr.parse_value_expr(self.stmt.test, self.context)
body = ["if", test_expr, parse_body(self.stmt.body, self.context)] + add_on
ir_node = IRnode.from_list(body)
return ir_node
def parse_Log(self):
event = self.stmt._metadata["type"]
args = [Expr(arg, self.context).ir_node for arg in self.stmt.value.args]
topic_ir = []
data_ir = []
for arg, is_indexed in zip(args, event.indexed):
if is_indexed:
topic_ir.append(arg)
else:
data_ir.append(arg)
return events.ir_node_for_log(self.stmt, event, topic_ir, data_ir, self.context)
def parse_Call(self):
# TODO use expr.func.type.is_internal once type annotations
# are consistently available.
is_self_function = (
(isinstance(self.stmt.func, vy_ast.Attribute))
and isinstance(self.stmt.func.value, vy_ast.Name)
and self.stmt.func.value.id == "self"
)
if isinstance(self.stmt.func, vy_ast.Name):
funcname = self.stmt.func.id
return STMT_DISPATCH_TABLE[funcname].build_IR(self.stmt, self.context)
elif isinstance(self.stmt.func, vy_ast.Attribute) and self.stmt.func.attr in (
"append",
"pop",
):
func_type = self.stmt.func._metadata["type"]
if isinstance(func_type, MemberFunctionT):
darray = Expr(self.stmt.func.value, self.context).ir_node
args = [Expr(x, self.context).ir_node for x in self.stmt.args]
if self.stmt.func.attr == "append":
# sanity checks
assert len(args) == 1
arg = args[0]
assert isinstance(darray.typ, DArrayT)
check_assign(
dummy_node_for_type(darray.typ.value_type), dummy_node_for_type(arg.typ)
)
return append_dyn_array(darray, arg)
else:
assert len(args) == 0
return pop_dyn_array(darray, return_popped_item=False)
if is_self_function:
return self_call.ir_for_self_call(self.stmt, self.context)
else:
return external_call.ir_for_external_call(self.stmt, self.context)
def _assert_reason(self, test_expr, msg):
# from parse_Raise: None passed as the assert condition
is_raise = test_expr is None
if isinstance(msg, vy_ast.Name) and msg.id == "UNREACHABLE":
if is_raise:
return IRnode.from_list(["invalid"], error_msg="raise unreachable")
else:
return IRnode.from_list(
["assert_unreachable", test_expr], error_msg="assert unreachable"
)
# set constant so that revert reason str is well behaved
try:
tmp = self.context.constancy
self.context.constancy = Constancy.Constant
msg_ir = Expr(msg, self.context).ir_node
finally:
self.context.constancy = tmp
# TODO this is probably useful in codegen.core
# compare with eval_seq.
def _get_last(ir):
if len(ir.args) == 0:
return ir.value
return _get_last(ir.args[-1])
# TODO maybe use ensure_in_memory
if msg_ir.location != MEMORY:
buf = self.context.new_internal_variable(msg_ir.typ)
instantiate_msg = make_byte_array_copier(buf, msg_ir)
else:
buf = _get_last(msg_ir)
if not isinstance(buf, int):
raise CompilerPanic(f"invalid bytestring {buf}\n{self}")
instantiate_msg = msg_ir
# offset of bytes in (bytes,)
method_id = util.method_id_int("Error(string)")
# abi encode method_id + bytestring
assert buf >= 36, "invalid buffer"
# we don't mind overwriting other memory because we are
# getting out of here anyway.
_runtime_length = ["mload", buf]
revert_seq = [
"seq",
instantiate_msg,
zero_pad(buf),
["mstore", buf - 64, method_id],
["mstore", buf - 32, 0x20],
["revert", buf - 36, ["add", 4 + 32 + 32, ["ceil32", _runtime_length]]],
]
if is_raise:
ir_node = revert_seq
else:
ir_node = ["if", ["iszero", test_expr], revert_seq]
return IRnode.from_list(ir_node, error_msg="user revert with reason")
def parse_Assert(self):
test_expr = Expr.parse_value_expr(self.stmt.test, self.context)
if self.stmt.msg:
return self._assert_reason(test_expr, self.stmt.msg)
else:
return IRnode.from_list(["assert", test_expr], error_msg="user assert")
def parse_Raise(self):
if self.stmt.exc:
return self._assert_reason(None, self.stmt.exc)
else:
return IRnode.from_list(["revert", 0, 0], error_msg="user raise")
def _check_valid_range_constant(self, arg_ast_node):
with self.context.range_scope():
arg_expr = Expr.parse_value_expr(arg_ast_node, self.context)
return arg_expr
def _get_range_const_value(self, arg_ast_node):
arg_expr = self._check_valid_range_constant(arg_ast_node)
return arg_expr.value
def parse_For(self):
with self.context.block_scope():
if self.stmt.get("iter.func.id") == "range":
return self._parse_For_range()
else:
return self._parse_For_list()
def _parse_For_range(self):
# TODO make sure type always gets annotated
if "type" in self.stmt.target._metadata:
iter_typ = self.stmt.target._metadata["type"]
else:
iter_typ = INT256_T
# Get arg0
arg0 = self.stmt.iter.args[0]
num_of_args = len(self.stmt.iter.args)
kwargs = {
s.arg: Expr.parse_value_expr(s.value, self.context)
for s in self.stmt.iter.keywords or []
}
# Type 1 for, e.g. for i in range(10): ...
if num_of_args == 1:
n = Expr.parse_value_expr(arg0, self.context)
start = IRnode.from_list(0, typ=iter_typ)
rounds = n
rounds_bound = kwargs.get("bound", rounds)
# Type 2 for, e.g. for i in range(100, 110): ...
elif self._check_valid_range_constant(self.stmt.iter.args[1]).is_literal:
arg0_val = self._get_range_const_value(arg0)
arg1_val = self._get_range_const_value(self.stmt.iter.args[1])
start = IRnode.from_list(arg0_val, typ=iter_typ)
rounds = IRnode.from_list(arg1_val - arg0_val, typ=iter_typ)
rounds_bound = rounds
# Type 3 for, e.g. for i in range(x, x + 10): ...
else:
arg1 = self.stmt.iter.args[1]
rounds = self._get_range_const_value(arg1.right)
start = Expr.parse_value_expr(arg0, self.context)
_, hi = start.typ.int_bounds
start = clamp("le", start, hi + 1 - rounds)
rounds_bound = rounds
bound = rounds_bound if isinstance(rounds_bound, int) else rounds_bound.value
if bound < 1:
return
varname = self.stmt.target.id
i = IRnode.from_list(self.context.fresh_varname("range_ix"), typ=UINT256_T)
iptr = self.context.new_variable(varname, iter_typ)
self.context.forvars[varname] = True
loop_body = ["seq"]
# store the current value of i so it is accessible to userland
loop_body.append(["mstore", iptr, i])
loop_body.append(parse_body(self.stmt.body, self.context))
# NOTE: codegen for `repeat` inserts an assertion that
# (gt rounds_bound rounds). note this also covers the case where
# rounds < 0.
# if we ever want to remove that, we need to manually add the assertion
# where it makes sense.
ir_node = IRnode.from_list(
["repeat", i, start, rounds, rounds_bound, loop_body], error_msg="range() bounds check"
)
del self.context.forvars[varname]
return ir_node
def _parse_For_list(self):
with self.context.range_scope():
iter_list = Expr(self.stmt.iter, self.context).ir_node
target_type = self.stmt.target._metadata["type"]
assert target_type == iter_list.typ.value_type
# user-supplied name for loop variable
varname = self.stmt.target.id
loop_var = IRnode.from_list(
self.context.new_variable(varname, target_type), typ=target_type, location=MEMORY
)
i = IRnode.from_list(self.context.fresh_varname("for_list_ix"), typ=UINT256_T)
self.context.forvars[varname] = True
ret = ["seq"]
# list literal, force it to memory first
if isinstance(self.stmt.iter, vy_ast.List):
tmp_list = IRnode.from_list(
self.context.new_internal_variable(iter_list.typ),
typ=iter_list.typ,
location=MEMORY,
)
ret.append(make_setter(tmp_list, iter_list))
iter_list = tmp_list
# set up the loop variable
e = get_element_ptr(iter_list, i, array_bounds_check=False)
body = ["seq", make_setter(loop_var, e), parse_body(self.stmt.body, self.context)]
repeat_bound = iter_list.typ.count
if isinstance(iter_list.typ, DArrayT):
array_len = get_dyn_array_count(iter_list)
else:
array_len = repeat_bound
ret.append(["repeat", i, 0, array_len, repeat_bound, body])
del self.context.forvars[varname]
return IRnode.from_list(ret)
def parse_AugAssign(self):
target = self._get_target(self.stmt.target)
sub = Expr.parse_value_expr(self.stmt.value, self.context)
if not target.typ._is_prim_word:
# because of this check, we do not need to check for
# make_setter references lhs<->rhs as in parse_Assign -
# single word load/stores are atomic.
return
with target.cache_when_complex("_loc") as (b, target):
rhs = Expr.parse_value_expr(
vy_ast.BinOp(
left=IRnode.from_list(LOAD(target), typ=target.typ),
right=sub,
op=self.stmt.op,
lineno=self.stmt.lineno,
col_offset=self.stmt.col_offset,
end_lineno=self.stmt.end_lineno,
end_col_offset=self.stmt.end_col_offset,
node_source_code=self.stmt.get("node_source_code"),
),
self.context,
)
return b.resolve(STORE(target, rhs))
def parse_Continue(self):
return IRnode.from_list("continue")
def parse_Break(self):
return IRnode.from_list("break")
def parse_Return(self):
ir_val = None
if self.stmt.value is not None:
ir_val = Expr(self.stmt.value, self.context).ir_node
return make_return_stmt(ir_val, self.stmt, self.context)
def _get_target(self, target):
_dbg_expr = target
if isinstance(target, vy_ast.Name) and target.id in self.context.forvars:
raise TypeCheckFailure(f"Failed constancy check\n{_dbg_expr}")
if isinstance(target, vy_ast.Tuple):
target = Expr(target, self.context).ir_node
for node in target.args:
if (node.location == STORAGE and self.context.is_constant()) or not node.mutable:
raise TypeCheckFailure(f"Failed constancy check\n{_dbg_expr}")
return target
target = Expr.parse_pointer_expr(target, self.context)
if (target.location == STORAGE and self.context.is_constant()) or not target.mutable:
raise TypeCheckFailure(f"Failed constancy check\n{_dbg_expr}")
return target
# Parse a statement (usually one line of code but not always)
def parse_stmt(stmt, context):
    """Return the IR node produced by running ``Stmt`` on ``stmt`` in ``context``."""
    generated = Stmt(stmt, context)
    return generated.ir_node
# check if a function body is "terminated"
# a function is terminated if it ends with a return stmt, OR,
# it ends with an if/else and both branches are terminated.
# (if not, we need to insert a terminator so that the IR is well-formed)
def _is_terminated(code):
last_stmt = code[-1]
if is_return_from_function(last_stmt):
return True
if isinstance(last_stmt, vy_ast.If):
if last_stmt.orelse:
return _is_terminated(last_stmt.body) and _is_terminated(last_stmt.orelse)
return False
# codegen a list of statements
def parse_body(code, context, ensure_terminated=False):
    """Generate a ``seq`` IR node for a list of statements.

    A ``code`` value that is not a list is handed to ``parse_stmt``
    directly. With ``ensure_terminated`` set, a bare ``return`` is appended
    when the context has no return type and the body is not already
    terminated.
    """
    if not isinstance(code, list):
        return parse_stmt(code, context)

    seq = ["seq", *(parse_stmt(item, context) for item in code)]

    # force using the return routine / exit_to cleanup for end of function
    needs_terminator = (
        ensure_terminated and context.return_type is None and not _is_terminated(code)
    )
    if needs_terminator:
        implicit_return = vy_ast.Return(value=None)
        seq.append(parse_stmt(implicit_return, context))

    # force zerovalent, even last statement
    seq.append("pass")  # CMC 2022-01-16 is this necessary?
    return IRnode.from_list(seq)