Skip to content

Commit

Permalink
lib.cdc: Add PulseSynchronizer and Gearbox modules, and smoke tests f…
Browse files Browse the repository at this point in the history
…or these
  • Loading branch information
Wren6991 committed Mar 4, 2019
1 parent a27c13e commit 8b9f855
Show file tree
Hide file tree
Showing 2 changed files with 210 additions and 1 deletion.
132 changes: 131 additions & 1 deletion nmigen/lib/cdc.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
from .. import *
from math import gcd


__all__ = ["MultiReg", "ResetSynchronizer"]
__all__ = ["MultiReg", "ResetSynchronizer", "PulseSynchronizer", "Gearbox"]


class MultiReg:
Expand Down Expand Up @@ -107,3 +108,132 @@ def elaborate(self, platform):
ResetSignal(self.domain).eq(self._regs[-1])
]
return m


class PulseSynchronizer:
"""A one-clock pulse on the input produces a one-clock pulse on the output.
If the output clock is faster than the input clock, then the input
may be safely asserted at 100% duty cycle; otherwise some level of
sparsity is required at the input to avoid pulses being dropped.
Other than this there is no constraint on the relationship between
input and output clock frequency.
Parameters
----------
idomain : str
Name of input clock domain.
odomain : str
Name of output clock domain.
sync_stages : int
Number of synchronisation flops between the two clock domains.
2 is the default, and minimum safe value. High-frequency designs
may choose to increase this.
"""
def __init__(self, idomain, odomain, sync_stages=2):
if not isinstance(sync_stages, int) or sync_stages < 1:
raise TypeError("sync_stages must be a positive integer, not '{!r}'".format(sync_stages))

self.i = Signal()
self.o = Signal()
self.idomain = idomain
self.odomain = odomain
self.sync_stages = sync_stages

def elaborate(self, platform):
m = Module()

itoggle = Signal()
otoggle = Signal()
mreg = m.submodules.mreg = \
MultiReg(itoggle, otoggle, odomain=self.odomain, n=self.sync_stages)
otoggle_prev = Signal()

m.d[self.idomain] += itoggle.eq(itoggle ^ self.i)
m.d[self.odomain] += otoggle_prev.eq(otoggle)
m.d.comb += self.o.eq(otoggle ^ otoggle_prev)

return m


class Gearbox:
"""Adapt the width of a continous datastream.
Input: m bits wide, clock frequency f MHz.
Output: n bits wide, clock frequency m / n * f MHz.
Used to adjust width of a datastream when interfacing
system logic to a SerDes. The input and output clocks
must be derived from the same frequency reference,
to avoid long-term phase drift.
Parameters
----------
iwidth : int
Bit width of the input
idomain : str
Name of input clock domain
owidth : int
Bit width of the output
odomain : str
Name of output clock domain
Attributes
----------
i : Signal(iwidth), in
Input datastream. Sampled on every input clock.
o : Signal(owidth), out
Output datastream. Transitions on every output clock.
"""
def __init__(self, iwidth, idomain, owidth, odomain):
if not isinstance(iwidth, int) or iwidth < 1:
raise TypeError("iwidth must be a positive integer, not '{!r}'".format(iwidth))
if not isinstance(owidth, int) or owidth < 1:
raise TypeError("owidth must be a positive integer, not '{!r}'".format(owidth))

self.i = Signal(iwidth)
self.o = Signal(owidth)
self.iwidth = iwidth
self.idomain = idomain
self.owidth = owidth
self.odomain = odomain

storagesize = iwidth * owidth // gcd(iwidth, owidth)
while storagesize // iwidth < 4:
storagesize *= 2
while storagesize // owidth < 4:
storagesize *= 2

self._storagesize = storagesize
self._ichunks = storagesize // self.iwidth
self._ochunks = storagesize // self.owidth
assert(self._ichunks * self.iwidth == storagesize)
assert(self._ochunks * self.owidth == storagesize)

def elaborate(self, platform):
m = Module()

storage = Signal(self._storagesize)
i_faster = self._ichunks > self._ochunks
iptr = Signal(max=self._ichunks - 1, reset=(self._ichunks // 2 if i_faster else 0))
optr = Signal(max=self._ochunks - 1, reset=(0 if i_faster else self._ochunks // 2))


m.d[self.idomain] += iptr.eq(iptr + 1)
m.d[self.odomain] += optr.eq(optr + 1)

with m.Switch(iptr):
for n in range(self._ichunks):
s = slice(n * self.iwidth, (n + 1) * self.iwidth)
with m.Case(n):
m.d[self.idomain] += storage[s].eq(self.i)

with m.Switch(optr):
for n in range(self._ochunks):
s = slice(n * self.owidth, (n + 1) * self.owidth)
with m.Case(n):
m.d[self.odomain] += self.o.eq(storage[s])

return m
79 changes: 79 additions & 0 deletions nmigen/test/test_lib_cdc.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,3 +103,82 @@ def process():
yield Tick(); yield Delay(1e-8)
sim.add_process(process)
sim.run()


# TODO: test with distinct clocks
class PulseSynchronizerTestCase(FHDLTestCase):
def test_paramcheck(self):
with self.assertRaises(TypeError):
ps = PulseSynchronizer("w", "r", sync_stages=0)
with self.assertRaises(TypeError):
ps = PulseSynchronizer("w", "r", sync_stages="abc")
ps = PulseSynchronizer("w", "r", sync_stages = 1)

def test_smoke(self):
m = Module()
m.domains += ClockDomain("sync")
ps = m.submodules.dut = PulseSynchronizer("sync", "sync")

with Simulator(m, vcd_file = open("test.vcd", "w")) as sim:
sim.add_clock(1e-6)
def process():
yield ps.i.eq(0)
# TODO: think about reset
for n in range(5):
yield Tick()
# Make sure no pulses are generated in quiescent state
for n in range(3):
yield Tick()
self.assertEqual((yield ps.o), 0)
# Check conservation of pulses
accum = 0
for n in range(10):
yield ps.i.eq(1 if n < 4 else 0)
yield Tick()
accum += yield ps.o
self.assertEqual(accum, 4)
sim.add_process(process)
sim.run()


# TODO: test with distinct clocks
# (since we can currently only test symmetric aspect ratio)
class GearboxTestCase(FHDLTestCase):
def test_paramcheck(self):
with self.assertRaises(TypeError):
g = Gearbox(0, "i", 1, "o")
with self.assertRaises(TypeError):
g = Gearbox(1, "i", 0, "o")
with self.assertRaises(TypeError):
g = Gearbox("x", "i", 1, "o")
with self.assertRaises(TypeError):
g = Gearbox(1, "i", "x", "o")
g = Gearbox(1, "i", 1, "o")
g = Gearbox(7, "i", 1, "o")
g = Gearbox(7, "i", 3, "o")
g = Gearbox(7, "i", 7, "o")
g = Gearbox(3, "i", 7, "o")

def test_smoke_symmetric(self):
m = Module()
m.domains += ClockDomain("sync")
g = m.submodules.dut = Gearbox(8, "sync", 8, "sync")

with Simulator(m, vcd_file = open("test.vcd", "w")) as sim:
sim.add_clock(1e-6)
def process():
pipeline_filled = False
expected_out = 1
yield Tick()
for i in range(g._ichunks * 4):
yield g.i.eq(i)
if (yield g.o):
pipeline_filled = True
if pipeline_filled:
self.assertEqual((yield g.o), expected_out)
expected_out += 1
yield Tick()
self.assertEqual(pipeline_filled, True)
self.assertEqual(expected_out > g._ichunks * 2, True)
sim.add_process(process)
sim.run()

0 comments on commit 8b9f855

Please sign in to comment.