Skip to content

Commit

Permalink
sim.core: add bench processes that always settle implicitly.
Browse files Browse the repository at this point in the history
  • Loading branch information
whitequark authored and zyp committed Dec 6, 2023
1 parent 120375d commit ef1f8f7
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 0 deletions.
20 changes: 20 additions & 0 deletions amaranth/sim/core.py
Expand Up @@ -91,6 +91,26 @@ def wrapper():
yield from process()
self._engine.add_coroutine_process(wrapper, default_cmd=Tick(domain))

def add_bench_process(self, process, *, domain="sync"):
process = self._check_process(process)
def wrapper():
generator = process()
# Only start a bench process after power-on reset settles.
try:
yield Settle()
command = generator.send(None)
while True:
try:
result = yield command
except Exception as e:
generator.throw(e)
continue
yield Settle()
command = generator.send(result)
except StopIteration:
pass
self._engine.add_coroutine_process(wrapper, default_cmd=Tick(domain))

def add_clock(self, period, *, phase=None, domain="sync", if_exists=False):
"""Add a clock process.
Expand Down
56 changes: 56 additions & 0 deletions tests/test_sim.py
Expand Up @@ -690,6 +690,19 @@ def process():
sim.add_process(process)
self.assertTrue(survived)

def test_bench_command_wrong(self):
survived = False
with self.assertSimulation(Module()) as sim:
def process():
nonlocal survived
with self.assertRaisesRegex(TypeError,
r"Received unsupported command 1 from process .+?"):
yield 1
yield Settle()
survived = True
sim.add_bench_process(process)
self.assertTrue(survived)

def setUp_memory(self, rd_synchronous=True, rd_transparent=True, wr_granularity=None):
self.m = Module()
self.memory = Memory(width=8, depth=4, init=[0xaa, 0x55])
Expand Down Expand Up @@ -911,6 +924,49 @@ def process():
sim.add_clock(1e-6)
sim.add_sync_process(process)

def test_comb_bench_process(self):
m = Module()
a = Signal(reset=1)
b = Signal()
m.d.comb += b.eq(a)
with self.assertSimulation(m) as sim:
def process():
self.assertEqual((yield a), 1)
self.assertEqual((yield b), 1)
yield a.eq(0)
self.assertEqual((yield a), 0)
self.assertEqual((yield b), 0)
sim.add_bench_process(process)

def test_sync_bench_process(self):
m = Module()
a = Signal(reset=1)
b = Signal()
m.d.sync += b.eq(a)
t = Signal()
m.d.sync += t.eq(~t)
with self.assertSimulation(m) as sim:
def process():
self.assertEqual((yield a), 1)
self.assertEqual((yield b), 0)
self.assertEqual((yield t), 0)
yield
self.assertEqual((yield a), 1)
self.assertEqual((yield b), 1)
self.assertEqual((yield t), 1)
yield
self.assertEqual((yield a), 1)
self.assertEqual((yield b), 1)
self.assertEqual((yield t), 0)
yield a.eq(0)
self.assertEqual((yield a), 0)
self.assertEqual((yield b), 1)
yield
self.assertEqual((yield a), 0)
self.assertEqual((yield b), 0)
sim.add_clock(1e-6)
sim.add_bench_process(process)

@_ignore_deprecated
def test_sample_helpers(self):
m = Module()
Expand Down

0 comments on commit ef1f8f7

Please sign in to comment.