/
dsl.py
488 lines (415 loc) · 18.3 KB
/
dsl.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
from collections import OrderedDict, namedtuple
from collections.abc import Iterable
from contextlib import contextmanager
from enum import Enum
import warnings
from ..tools import flatten, bits_for, deprecated
from .. import tracer
from .ast import *
from .ir import *
from .xfrm import *
__all__ = ["SyntaxError", "SyntaxWarning", "Module"]
class SyntaxError(Exception):
pass
class SyntaxWarning(Warning):
pass
class _ModuleBuilderProxy:
def __init__(self, builder, depth):
object.__setattr__(self, "_builder", builder)
object.__setattr__(self, "_depth", depth)
class _ModuleBuilderDomainExplicit(_ModuleBuilderProxy):
def __init__(self, builder, depth, domain):
super().__init__(builder, depth)
self._domain = domain
def __iadd__(self, assigns):
self._builder._add_statement(assigns, domain=self._domain, depth=self._depth)
return self
class _ModuleBuilderDomainImplicit(_ModuleBuilderProxy):
def __getattr__(self, name):
if name == "comb":
domain = None
else:
domain = name
return _ModuleBuilderDomainExplicit(self._builder, self._depth, domain)
def __getitem__(self, name):
return self.__getattr__(name)
def __setattr__(self, name, value):
if name == "_depth":
object.__setattr__(self, name, value)
elif not isinstance(value, _ModuleBuilderDomainExplicit):
raise AttributeError("Cannot assign 'd.{}' attribute; did you mean 'd.{} +='?"
.format(name, name))
def __setitem__(self, name, value):
return self.__setattr__(name, value)
class _ModuleBuilderRoot:
def __init__(self, builder, depth):
self._builder = builder
self.domain = self.d = _ModuleBuilderDomainImplicit(builder, depth)
def __getattr__(self, name):
if name in ("comb", "sync"):
raise AttributeError("'{}' object has no attribute '{}'; did you mean 'd.{}'?"
.format(type(self).__name__, name, name))
raise AttributeError("'{}' object has no attribute '{}'"
.format(type(self).__name__, name))
class _ModuleBuilderSubmodules:
def __init__(self, builder):
object.__setattr__(self, "_builder", builder)
def __iadd__(self, modules):
for module in flatten([modules]):
self._builder._add_submodule(module)
return self
def __setattr__(self, name, submodule):
self._builder._add_submodule(submodule, name)
def __setitem__(self, name, value):
return self.__setattr__(name, value)
def __getattr__(self, name):
return self._builder._get_submodule(name)
def __getitem__(self, name):
return self.__getattr__(name)
class _ModuleBuilderDomainSet:
def __init__(self, builder):
object.__setattr__(self, "_builder", builder)
def __iadd__(self, domains):
for domain in flatten([domains]):
self._builder._add_domain(domain)
return self
def __setattr__(self, name, domain):
self._builder._add_domain(domain)
class FSM:
def __init__(self, state, encoding, decoding):
self.state = state
self.encoding = encoding
self.decoding = decoding
def ongoing(self, name):
if name not in self.encoding:
self.encoding[name] = len(self.encoding)
return Operator("==", [self.state, self.encoding[name]], src_loc_at=0)
class Module(_ModuleBuilderRoot, Elaboratable):
def __init__(self):
_ModuleBuilderRoot.__init__(self, self, depth=0)
self.submodules = _ModuleBuilderSubmodules(self)
self.domains = _ModuleBuilderDomainSet(self)
self._statements = Statement.wrap([])
self._ctrl_context = None
self._ctrl_stack = []
self._driving = SignalDict()
self._named_submodules = {}
self._anon_submodules = []
self._domains = []
self._generated = {}
def _check_context(self, construct, context):
if self._ctrl_context != context:
if self._ctrl_context is None:
raise SyntaxError("{} is not permitted outside of {}"
.format(construct, context))
else:
if self._ctrl_context == "Switch":
secondary_context = "Case"
if self._ctrl_context == "FSM":
secondary_context = "State"
raise SyntaxError("{} is not permitted directly inside of {}; it is permitted "
"inside of {} {}"
.format(construct, self._ctrl_context,
self._ctrl_context, secondary_context))
def _get_ctrl(self, name):
if self._ctrl_stack:
top_name, top_data = self._ctrl_stack[-1]
if top_name == name:
return top_data
def _flush_ctrl(self):
while len(self._ctrl_stack) > self.domain._depth:
self._pop_ctrl()
def _set_ctrl(self, name, data):
self._flush_ctrl()
self._ctrl_stack.append((name, data))
return data
def _check_signed_cond(self, cond):
cond = Value.wrap(cond)
bits, sign = cond.shape()
if sign:
warnings.warn("Signed values in If/Elif conditions usually result from inverting "
"Python booleans with ~, which leads to unexpected results: ~True is "
"-2, which is truthful. Replace `~flag` with `not flag`. (If this is "
"a false positive, silence this warning with "
"`m.If(x)` → `m.If(x.bool())`.)",
SyntaxWarning, stacklevel=4)
return cond
@contextmanager
def If(self, cond):
self._check_context("If", context=None)
cond = self._check_signed_cond(cond)
src_loc = tracer.get_src_loc(src_loc_at=1)
if_data = self._set_ctrl("If", {
"tests": [],
"bodies": [],
"src_loc": src_loc,
"src_locs": [],
})
try:
_outer_case, self._statements = self._statements, []
self.domain._depth += 1
yield
self._flush_ctrl()
if_data["tests"].append(cond)
if_data["bodies"].append(self._statements)
if_data["src_locs"].append(src_loc)
finally:
self.domain._depth -= 1
self._statements = _outer_case
@contextmanager
def Elif(self, cond):
self._check_context("Elif", context=None)
cond = self._check_signed_cond(cond)
src_loc = tracer.get_src_loc(src_loc_at=1)
if_data = self._get_ctrl("If")
if if_data is None:
raise SyntaxError("Elif without preceding If")
try:
_outer_case, self._statements = self._statements, []
self.domain._depth += 1
yield
self._flush_ctrl()
if_data["tests"].append(cond)
if_data["bodies"].append(self._statements)
if_data["src_locs"].append(src_loc)
finally:
self.domain._depth -= 1
self._statements = _outer_case
@contextmanager
def Else(self):
self._check_context("Else", context=None)
src_loc = tracer.get_src_loc(src_loc_at=1)
if_data = self._get_ctrl("If")
if if_data is None:
raise SyntaxError("Else without preceding If/Elif")
try:
_outer_case, self._statements = self._statements, []
self.domain._depth += 1
yield
self._flush_ctrl()
if_data["bodies"].append(self._statements)
if_data["src_locs"].append(src_loc)
finally:
self.domain._depth -= 1
self._statements = _outer_case
self._pop_ctrl()
@contextmanager
def Switch(self, test):
self._check_context("Switch", context=None)
switch_data = self._set_ctrl("Switch", {
"test": Value.wrap(test),
"cases": OrderedDict(),
"src_loc": tracer.get_src_loc(src_loc_at=1),
"case_src_locs": {},
})
try:
self._ctrl_context = "Switch"
self.domain._depth += 1
yield
finally:
self.domain._depth -= 1
self._ctrl_context = None
self._pop_ctrl()
@contextmanager
def Case(self, *patterns):
self._check_context("Case", context="Switch")
src_loc = tracer.get_src_loc(src_loc_at=1)
switch_data = self._get_ctrl("Switch")
new_patterns = ()
for pattern in patterns:
if not isinstance(pattern, (int, str, Enum)):
raise SyntaxError("Case pattern must be an integer, a string, or an enumeration, "
"not {!r}"
.format(pattern))
if isinstance(pattern, str) and any(bit not in "01-" for bit in pattern):
raise SyntaxError("Case pattern '{}' must consist of 0, 1, and - (don't care) bits"
.format(pattern))
if isinstance(pattern, str) and len(pattern) != len(switch_data["test"]):
raise SyntaxError("Case pattern '{}' must have the same width as switch value "
"(which is {})"
.format(pattern, len(switch_data["test"])))
if isinstance(pattern, int) and bits_for(pattern) > len(switch_data["test"]):
warnings.warn("Case pattern '{:b}' is wider than switch value "
"(which has width {}); comparison will never be true"
.format(pattern, len(switch_data["test"])),
SyntaxWarning, stacklevel=3)
continue
new_patterns = (*new_patterns, pattern)
try:
_outer_case, self._statements = self._statements, []
self._ctrl_context = None
yield
self._flush_ctrl()
# If none of the provided cases can possibly be true, omit this branch completely.
# This needs to be differentiated from no cases being provided in the first place,
# which means the branch will always match.
if not (patterns and not new_patterns):
switch_data["cases"][new_patterns] = self._statements
switch_data["case_src_locs"][new_patterns] = src_loc
finally:
self._ctrl_context = "Switch"
self._statements = _outer_case
def Default(self):
return self.Case()
@contextmanager
def FSM(self, reset=None, domain="sync", name="fsm"):
self._check_context("FSM", context=None)
if domain == "comb":
raise ValueError("FSM may not be driven by the '{}' domain".format(domain))
fsm_data = self._set_ctrl("FSM", {
"name": name,
"signal": Signal(name="{}_state".format(name), src_loc_at=2),
"reset": reset,
"domain": domain,
"encoding": OrderedDict(),
"decoding": OrderedDict(),
"states": OrderedDict(),
"src_loc": tracer.get_src_loc(src_loc_at=1),
"state_src_locs": {},
})
self._generated[name] = fsm = \
FSM(fsm_data["signal"], fsm_data["encoding"], fsm_data["decoding"])
try:
self._ctrl_context = "FSM"
self.domain._depth += 1
yield fsm
finally:
self.domain._depth -= 1
self._ctrl_context = None
self._pop_ctrl()
@contextmanager
def State(self, name):
self._check_context("FSM State", context="FSM")
src_loc = tracer.get_src_loc(src_loc_at=1)
fsm_data = self._get_ctrl("FSM")
if name in fsm_data["states"]:
raise SyntaxError("FSM state '{}' is already defined".format(name))
if name not in fsm_data["encoding"]:
fsm_data["encoding"][name] = len(fsm_data["encoding"])
try:
_outer_case, self._statements = self._statements, []
self._ctrl_context = None
yield
self._flush_ctrl()
fsm_data["states"][name] = self._statements
fsm_data["state_src_locs"][name] = src_loc
finally:
self._ctrl_context = "FSM"
self._statements = _outer_case
@property
def next(self):
raise SyntaxError("Only assignment to `m.next` is permitted")
@next.setter
def next(self, name):
if self._ctrl_context != "FSM":
for level, (ctrl_name, ctrl_data) in enumerate(reversed(self._ctrl_stack)):
if ctrl_name == "FSM":
if name not in ctrl_data["encoding"]:
ctrl_data["encoding"][name] = len(ctrl_data["encoding"])
self._add_statement(
assigns=[ctrl_data["signal"].eq(ctrl_data["encoding"][name])],
domain=ctrl_data["domain"],
depth=len(self._ctrl_stack))
return
raise SyntaxError("`m.next = <...>` is only permitted inside an FSM state")
def _pop_ctrl(self):
name, data = self._ctrl_stack.pop()
src_loc = data["src_loc"]
if name == "If":
if_tests, if_bodies = data["tests"], data["bodies"]
if_src_locs = data["src_locs"]
tests, cases = [], OrderedDict()
for if_test, if_case in zip(if_tests + [None], if_bodies):
if if_test is not None:
if_test = Value.wrap(if_test)
if len(if_test) != 1:
if_test = if_test.bool()
tests.append(if_test)
if if_test is not None:
match = ("1" + "-" * (len(tests) - 1)).rjust(len(if_tests), "-")
else:
match = None
cases[match] = if_case
self._statements.append(Switch(Cat(tests), cases,
src_loc=src_loc, case_src_locs=dict(zip(cases, if_src_locs))))
if name == "Switch":
switch_test, switch_cases = data["test"], data["cases"]
switch_case_src_locs = data["case_src_locs"]
self._statements.append(Switch(switch_test, switch_cases,
src_loc=src_loc, case_src_locs=switch_case_src_locs))
if name == "FSM":
fsm_signal, fsm_reset, fsm_encoding, fsm_decoding, fsm_states = \
data["signal"], data["reset"], data["encoding"], data["decoding"], data["states"]
fsm_state_src_locs = data["state_src_locs"]
if not fsm_states:
return
fsm_signal.nbits = bits_for(len(fsm_encoding) - 1)
if fsm_reset is None:
fsm_signal.reset = fsm_encoding[next(iter(fsm_states))]
else:
fsm_signal.reset = fsm_encoding[fsm_reset]
# The FSM is encoded such that the state with encoding 0 is always the reset state.
fsm_decoding.update((n, s) for s, n in fsm_encoding.items())
fsm_signal.decoder = lambda n: "{}/{}".format(fsm_decoding[n], n)
self._statements.append(Switch(fsm_signal,
OrderedDict((fsm_encoding[name], stmts) for name, stmts in fsm_states.items()),
src_loc=src_loc, case_src_locs={fsm_encoding[name]: fsm_state_src_locs[name]
for name in fsm_states}))
def _add_statement(self, assigns, domain, depth, compat_mode=False):
def domain_name(domain):
if domain is None:
return "comb"
else:
return domain
while len(self._ctrl_stack) > self.domain._depth:
self._pop_ctrl()
for assign in Statement.wrap(assigns):
if not compat_mode and not isinstance(assign, (Assign, Assert, Assume, Cover)):
raise SyntaxError(
"Only assignments and property checks may be appended to d.{}"
.format(domain_name(domain)))
assign = SampleDomainInjector(domain)(assign)
for signal in assign._lhs_signals():
if signal not in self._driving:
self._driving[signal] = domain
elif self._driving[signal] != domain:
cd_curr = self._driving[signal]
raise SyntaxError(
"Driver-driver conflict: trying to drive {!r} from d.{}, but it is "
"already driven from d.{}"
.format(signal, domain_name(domain), domain_name(cd_curr)))
self._statements.append(assign)
def _add_submodule(self, submodule, name=None):
if not hasattr(submodule, "elaborate"):
raise TypeError("Trying to add '{!r}', which does not implement .elaborate(), as "
"a submodule".format(submodule))
if name == None:
self._anon_submodules.append(submodule)
else:
if name in self._named_submodules:
raise NameError("Submodule named '{}' already exists".format(name))
self._named_submodules[name] = submodule
def _get_submodule(self, name):
if name in self._named_submodules:
return self._named_submodules[name]
else:
raise AttributeError("No submodule named '{}' exists".format(name))
def _add_domain(self, cd):
self._domains.append(cd)
def _flush(self):
while self._ctrl_stack:
self._pop_ctrl()
def elaborate(self, platform):
self._flush()
fragment = Fragment()
for name in self._named_submodules:
fragment.add_subfragment(Fragment.get(self._named_submodules[name], platform), name)
for submodule in self._anon_submodules:
fragment.add_subfragment(Fragment.get(submodule, platform), None)
statements = SampleDomainInjector("sync")(self._statements)
fragment.add_statements(statements)
for signal, domain in self._driving.items():
fragment.add_driver(signal, domain)
fragment.add_domains(self._domains)
fragment.generated.update(self._generated)
return fragment