diff --git a/lambdalib/cores/i2c/init.py b/lambdalib/cores/i2c/init.py index 2f8ed10..91fa8dd 100644 --- a/lambdalib/cores/i2c/init.py +++ b/lambdalib/cores/i2c/init.py @@ -8,15 +8,25 @@ class I2CRegisterInit(Elaboratable): + """Sends an init sequence to an I2C device. I2C writer + can be exposed for further usage (eg. readjusting + parameters after init).""" def __init__(self, sys_clk_freq, i2c_addr, regs_data, - i2c_freq=400e3, i2c_pins=None): + i2c_freq=400e3, i2c_pins=None, + expose_writer=False): self.sys_clk_freq = sys_clk_freq self.i2c_addr = i2c_addr self.regs_data = regs_data self.i2c_freq = i2c_freq self.i2c_pins = i2c_pins + self.done = Signal() # Asserted when init sequence has been sent + + if expose_writer: + self.writer = stream.Endpoint([("data", 8)]) + else: + self.writer = None def regs_data_to_mem(self, regs_data): mem = [] @@ -37,6 +47,21 @@ def elaborate(self, platform): i2c_period = (self.sys_clk_freq // self.i2c_freq) m.submodules.writer = writer = I2CWriterStream(self.i2c_pins, i2c_period) - m.d.comb += mem.source.connect(writer.sink) + with m.If(mem.source.valid & mem.source.ready & mem.source.last): + m.d.sync += self.done.eq(1) + + if self.writer is None: + m.d.comb += mem.source.connect(writer.sink) + else: + with m.If(~self.done): + m.d.comb += [ + mem.source.connect(writer.sink), + self.writer.ready.eq(0), + ] + with m.Else(): + m.d.comb += [ + self.writer.connect(writer.sink), + mem.source.ready.eq(0), + ] return m diff --git a/lambdalib/cores/i2c/stream.py b/lambdalib/cores/i2c/stream.py index c24baba..cd196bd 100644 --- a/lambdalib/cores/i2c/stream.py +++ b/lambdalib/cores/i2c/stream.py @@ -9,6 +9,7 @@ __all__ = [ "I2CStream", "I2CWriterStream", + "I2CRegStream", "i2c_writer_description", ] @@ -226,3 +227,65 @@ def elaborate(self, platform): m.next = "IDLE" return m + + +class I2CRegStream(Elaboratable): + """Converts addr/val stream into an I2C stream. + Currently only supports 8bit reg addresses and values.""" + def __init__(self, i2c_addr, addr_width=8, val_width=8): + assert addr_width == 8 + assert val_width == 8 + + self._i2c_addr = i2c_addr + self._addr_width = addr_width + self._val_width = val_width + + self.sink = stream.Endpoint([ + ("addr", addr_width), + ("val", val_width) + ]) + self.source = stream.Endpoint([ + ("data", 8), + ]) + + def elaborate(self, platform): + m = Module() + + # Latch addr/val + addr_d = Signal(self._addr_width) + val_d = Signal(self._val_width) + + with m.FSM(): + with m.State("IDLE"): + m.d.comb += self.sink.ready.eq(1) + with m.If(self.sink.valid): + m.d.sync += [ + self.source.valid.eq(1), + self.source.data.eq(self._i2c_addr << 1), + addr_d.eq(self.sink.addr), + val_d.eq(self.sink.val), + ] + m.next = "PUSH-SLAVE-ADDR" + + with m.State("PUSH-SLAVE-ADDR"): + with m.If(self.source.ready): + m.d.sync += self.source.data.eq(addr_d) + m.next = "PUSH-REG-ADDR" + + with m.State("PUSH-REG-ADDR"): + with m.If(self.source.ready): + m.d.sync += [ + self.source.data.eq(val_d), + self.source.last.eq(1), + ] + m.next = "PUSH-REG-VAL" + + with m.State("PUSH-REG-VAL"): + with m.If(self.source.ready): + m.d.sync += [ + self.source.valid.eq(0), + self.source.last.eq(0), + ] + m.next = "IDLE" + + return m diff --git a/lambdalib/tests/test_cores_i2c.py b/lambdalib/tests/test_cores_i2c.py index e347175..88aaed3 100644 --- a/lambdalib/tests/test_cores_i2c.py +++ b/lambdalib/tests/test_cores_i2c.py @@ -103,7 +103,41 @@ def test_i2c_proto(): sim.run() +def test_i2c_reg_stream(): + dut = I2CRegStream(0x2C, 8, 8) + sim = Simulator(dut) + + datas = { + "addr": [0xAB, 0xCD], + "val": [0xEF, 0x01], + } + + out_stream = [ + 0x2C << 1, + 0xAB, + 0xEF, + + 0x2C << 1, + 0xCD, + 0x01, + ] + + sender = StreamSimSender(dut.sink, datas, speed=0.9, + verbose=True, strname="sender") + receiver = StreamSimReceiver(dut.source, length=len(out_stream), speed=1, + verbose=True, strname="receiver") + + sim.add_clock(1e-6) + sim.add_sync_process(sender.sync_process) + sim.add_sync_process(receiver.sync_process) + with sim.write_vcd("tests/test_i2c_reg_command_generator.vcd"): + sim.run() + + assert receiver.data["data"] == out_stream + + if __name__ == "__main__": test_i2c_stream_writer() test_i2c_stream() test_i2c_proto() + test_i2c_reg_stream()