Skip to content

Commit

Permalink
lib.cdc: Add ElasticBuffer and smoke test
Browse files Browse the repository at this point in the history
  • Loading branch information
Wren6991 committed Apr 7, 2019
1 parent cdba03d commit 37ca0dc
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 0 deletions.
62 changes: 62 additions & 0 deletions nmigen/lib/cdc.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
"ResetSynchronizer",
"PulseSynchronizer",
"BusSynchronizer",
"ElasticBuffer",
"Gearbox"
]

Expand Down Expand Up @@ -261,6 +262,67 @@ def elaborate(self, platform):

return m


class ElasticBuffer:
"""Pass data between two clock domains with the same frequency, and bounded phase difference.
Increasing the storage depth increases tolerance for clock wander and jitter, but still within
some bound. For less-well-behaved clocks, consider AsyncFIFO.
Parameters
----------
width : int > 0
Width of databus to be resynchronized
depth : int > 1
Number of storage elements in buffer
idomain : str
Name of input clock domain
odomain : str
Name of output clock domain
Attributes
----------
i : Signal(width)
Input data bus
o : Signal(width)
Output data bus
"""
def __init__(self, width, depth, idomain, odomain):
if not isinstance(width, int) or width < 1:
raise TypeError("width must be a positive integer, not '{!r}'".format(width))
if not isinstance(depth, int) or depth <= 1:
raise TypeError("depth must be an integer > 1, not '{!r}'".format(depth))

self.i = Signal(width)
self.o = Signal(width)
self.width = width
self.depth = depth
self.idomain = idomain
self.odomain = odomain

def elaborate(self, platform):
m = Module()

wptr = Signal(max=self.depth, reset=self.depth // 2)
rptr = Signal(max=self.depth)
m.d[self.idomain] += wptr.eq(_incr(wptr, self.depth))
m.d[self.odomain] += rptr.eq(_incr(rptr, self.depth))

storage = Memory(self.width, self.depth)
wport = m.submodules.wport = storage.write_port(domain=self.idomain)
rport = m.submodules.rport = storage.read_port(domain=self.odomain)

m.d.comb += [
wport.en.eq(1),
wport.addr.eq(wptr),
wport.data.eq(self.i),
rport.addr.eq(rptr),
self.o.eq(rport.data)
]

return m


class Gearbox:
"""Adapt the width of a continous datastream.
Expand Down
41 changes: 41 additions & 0 deletions nmigen/test/test_lib_cdc.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,47 @@ def process():
sim.add_process(process)
sim.run()

class ElasticBufferTestCase(FHDLTestCase):
def test_paramcheck(self):
with self.assertRaises(TypeError):
e = ElasticBuffer(0, 2, "i", "o")
with self.assertRaises(TypeError):
e = ElasticBuffer("i", 2, "i", "o")
with self.assertRaises(TypeError):
e = ElasticBuffer(1, 1, "i", "o")
with self.assertRaises(TypeError):
e = ElasticBuffer(1, "i", "i", "o")
e = ElasticBuffer(1, 2, "i", "o")

def test_smoke_po2(self):
self.check_smoke(8, 8)

def test_smoke_nonpo2(self):
self.check_smoke(8, 7)

def check_smoke(self, width, depth):
m = Module()
m.domains += ClockDomain("sync")
e = m.submodules.dut = ElasticBuffer(width, depth, "sync", "sync")

with Simulator(m, vcd_file = open("test.vcd", "w")) as sim:
sim.add_clock(1e-6)
def process():
pipeline_filled = False
expected_out = 1
yield Tick()
for i in range(depth * 4):
yield e.i.eq(i)
if (yield e.o):
pipeline_filled = True
if pipeline_filled:
self.assertEqual((yield e.o), expected_out)
expected_out = (expected_out + 1) % (2 ** width)
yield Tick()
self.assertEqual(pipeline_filled, True)
sim.add_process(process)
sim.run()

# TODO: test with distinct clocks
# (since we can currently only test symmetric aspect ratio)
class GearboxTestCase(FHDLTestCase):
Expand Down

0 comments on commit 37ca0dc

Please sign in to comment.