Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 27 additions & 2 deletions lambdalib/cores/i2c/init.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,25 @@


class I2CRegisterInit(Elaboratable):
"""Sends an init sequence to an I2C device. I2C writer
can be exposed for further usage (eg. readjusting
parameters after init)."""
def __init__(self,
sys_clk_freq, i2c_addr, regs_data,
i2c_freq=400e3, i2c_pins=None):
i2c_freq=400e3, i2c_pins=None,
expose_writer=False):

self.sys_clk_freq = sys_clk_freq
self.i2c_addr = i2c_addr
self.regs_data = regs_data
self.i2c_freq = i2c_freq
self.i2c_pins = i2c_pins
self.done = Signal() # Asserted when init sequence has been sent

if expose_writer:
self.writer = stream.Endpoint([("data", 8)])
else:
self.writer = None

def regs_data_to_mem(self, regs_data):
mem = []
Expand All @@ -37,6 +47,21 @@ def elaborate(self, platform):
i2c_period = (self.sys_clk_freq // self.i2c_freq)
m.submodules.writer = writer = I2CWriterStream(self.i2c_pins, i2c_period)

m.d.comb += mem.source.connect(writer.sink)
with m.If(mem.source.valid & mem.source.ready & mem.source.last):
m.d.sync += self.done.eq(1)

if self.writer is None:
m.d.comb += mem.source.connect(writer.sink)
else:
with m.If(~self.done):
m.d.comb += [
mem.source.connect(writer.sink),
self.writer.ready.eq(0),
]
with m.Else():
m.d.comb += [
self.writer.connect(writer.sink),
mem.source.ready.eq(0),
]

return m
63 changes: 63 additions & 0 deletions lambdalib/cores/i2c/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
__all__ = [
"I2CStream",
"I2CWriterStream",
"I2CRegStream",
"i2c_writer_description",
]

Expand Down Expand Up @@ -226,3 +227,65 @@ def elaborate(self, platform):
m.next = "IDLE"

return m


class I2CRegStream(Elaboratable):
"""Converts addr/val stream into an I2C stream.
Currently only supports 8bit reg addresses and values."""
def __init__(self, i2c_addr, addr_width=8, val_width=8):
assert addr_width == 8
assert val_width == 8

self._i2c_addr = i2c_addr
self._addr_width = addr_width
self._val_width = val_width

self.sink = stream.Endpoint([
("addr", addr_width),
("val", val_width)
])
self.source = stream.Endpoint([
("data", 8),
])

def elaborate(self, platform):
m = Module()

# Latch addr/val
addr_d = Signal(self._addr_width)
val_d = Signal(self._val_width)

with m.FSM():
with m.State("IDLE"):
m.d.comb += self.sink.ready.eq(1)
with m.If(self.sink.valid):
m.d.sync += [
self.source.valid.eq(1),
self.source.data.eq(self._i2c_addr << 1),
addr_d.eq(self.sink.addr),
val_d.eq(self.sink.val),
]
m.next = "PUSH-SLAVE-ADDR"

with m.State("PUSH-SLAVE-ADDR"):
with m.If(self.source.ready):
m.d.sync += self.source.data.eq(addr_d)
m.next = "PUSH-REG-ADDR"

with m.State("PUSH-REG-ADDR"):
with m.If(self.source.ready):
m.d.sync += [
self.source.data.eq(val_d),
self.source.last.eq(1),
]
m.next = "PUSH-REG-VAL"

with m.State("PUSH-REG-VAL"):
with m.If(self.source.ready):
m.d.sync += [
self.source.valid.eq(0),
self.source.last.eq(0),
]
m.next = "IDLE"

return m
34 changes: 34 additions & 0 deletions lambdalib/tests/test_cores_i2c.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,41 @@ def test_i2c_proto():
sim.run()


def test_i2c_reg_stream():
dut = I2CRegStream(0x2C, 8, 8)
sim = Simulator(dut)

datas = {
"addr": [0xAB, 0xCD],
"val": [0xEF, 0x01],
}

out_stream = [
0x2C << 1,
0xAB,
0xEF,

0x2C << 1,
0xCD,
0x01,
]

sender = StreamSimSender(dut.sink, datas, speed=0.9,
verbose=True, strname="sender")
receiver = StreamSimReceiver(dut.source, length=len(out_stream), speed=1,
verbose=True, strname="receiver")

sim.add_clock(1e-6)
sim.add_sync_process(sender.sync_process)
sim.add_sync_process(receiver.sync_process)
with sim.write_vcd("tests/test_i2c_reg_command_generator.vcd"):
sim.run()

assert receiver.data["data"] == out_stream


if __name__ == "__main__":
test_i2c_stream_writer()
test_i2c_stream()
test_i2c_proto()
test_i2c_reg_stream()
Loading