In [1]:
from myhdl import block, delay, always_seq, instance, always, Signal, ResetSignal, traceSignals, now
from dataclasses import dataclass
from itertools import product, tee
from typing import Callable, Generator
from copy import deepcopy
from abc import ABC, abstractmethod, abstractproperty
import logging
from sys import version
# Emit DEBUG-level records so the debug messages written by the stream
# descriptors' ``done`` setter appear in the cell output.
logging.basicConfig(level=logging.DEBUG)
# Record the interpreter version string (``sys.version``) next to the results.
logging.info(version)

INFO:root:3.9.5 (default, Aug 31 2021, 00:10:23) 
[GCC 9.3.0]


In [2]:
@ block
def counter(clk, reset, count):
    """Counter block: schedules ``count + 1`` on every rising edge of ``clk``.

    Parameters
    ----------
    clk
        Signal whose ``posedge`` triggers the increment.
    reset
        Reset signal passed to ``always_seq``; how the reset is applied is
        left entirely to that decorator.
    count
        Signal holding the running value; it is read through ``.val`` and
        written through ``.next``.

    Returns
    -------
    The ``increment`` process created by ``always_seq``.
    """
    @always_seq(clk.posedge, reset=reset)
    def increment():
        # The new value is scheduled through ``.next`` rather than assigned directly.
        count.next = count.val + 1
    return increment

@block
def clk_driver(clk, enable, period=20):
    """Gated clock source driving ``clk`` with a cycle of ``period`` time steps.

    Each cycle waits ``int(period / 2)`` steps, drives ``clk`` to 1, waits the
    remaining ``period - int(period / 2)`` steps and drives ``clk`` back to 0.
    If ``enable`` is falsy when a cycle is about to start, the driver first
    yields on ``enable`` before continuing.

    Parameters
    ----------
    clk
        Signal that is driven to 1 and 0 alternately.
    enable
        Signal tested at the start of every cycle.
    period
        Length of one full clock cycle in simulation time steps.

    Returns
    -------
    The ``drive_clk`` generator instance.
    """
    low_phase = int(period / 2)
    high_phase = period - low_phase

    @instance
    def drive_clk():
        while True:
            # Gate: suspend on the enable signal while it is falsy.
            if not enable:
                yield enable
            # Low half of the cycle, followed by the rising edge.
            yield delay(low_phase)
            clk.next = 1
            # High half of the cycle, followed by the falling edge.
            yield delay(high_phase)
            clk.next = 0

    return drive_clk

In [7]:
class AddressStreamDescriptor(ABC):
    """Abstract base class for resettable address streams.

    Concrete subclasses must provide ``__post_init__``, ``reset``, ``__iter__``
    and ``__next__``.  The base class stores the ``done`` flag and writes a
    debug log record, stamped with the simulation time from ``now()``, every
    time that flag is assigned.
    """

    def __init__(self):
        # Written directly so that construction itself does not log.
        self._done = False

    @abstractmethod
    def __post_init__(self):
        """Subclass set-up hook; must be overridden."""

    @abstractmethod
    def reset(self):
        """Rewind the stream; must be overridden."""

    @abstractmethod
    def __iter__(self):
        """Return an iterator over the stream; must be overridden."""

    @abstractmethod
    def __next__(self):
        """Produce the next address; must be overridden."""

    @property
    def done(self):
        """Current value of the completion flag."""
        return self._done

    @done.setter
    def done(self, value):
        # A truthy assignment is reported as the end of the stream, a falsy
        # one as an initialisation.
        template = "{} has concluded @T={}" if value else "{} initialized @T={}"
        logging.debug(template.format(self, now()))
        self._done = value

@dataclass
class HighLevelAddressStreamDescriptor(AddressStreamDescriptor):
    """Address stream that replays the indices produced by an iterator.

    The supplied iterator is split with ``itertools.tee`` so that one branch
    is consumed by ``__next__`` while the other stays untouched and serves as
    the starting point for the next ``reset``.
    """
    # Iterator of indices; replaced by the consumable tee branch after set-up.
    index_generator_fn: Generator

    def __post_init__(self):
        super().__init__()
        self.done = False
        working, pristine = tee(self.index_generator_fn)
        self.index_generator_fn = working
        self.initial_index_generator_fn = pristine

    def reset(self):
        """Clear ``done`` and restart the stream from its first index."""
        self.done = False
        # Split the untouched branch again: one half is consumed from now on,
        # the other half is kept for the following reset.
        working, pristine = tee(self.initial_index_generator_fn)
        self.index_generator_fn = working
        self.initial_index_generator_fn = pristine

    def __iter__(self):
        return self

    def __next__(self):
        """Return the next index, or 0 (and set ``done``) once exhausted."""
        try:
            return next(self.index_generator_fn)
        except StopIteration:
            self.done = True
            return 0

@dataclass
class LowLevelAddressStreamDescriptor(AddressStreamDescriptor):
    """Address stream described by an iteration domain, an access map and a guard.

    For every iteration vector drawn from ``iteration_domain`` the stream
    emits ``access_map(vector)`` when ``condition(vector)`` is truthy and 0
    otherwise.  Once the domain is exhausted, ``__next__`` returns 0 and sets
    ``done``.
    """
    # Iterator of iteration vectors; replaced by the consumable tee branch after set-up.
    iteration_domain: Generator
    # Maps an iteration vector to the index to emit.
    access_map: Callable
    # Predicate deciding whether an iteration vector produces an access.
    condition: Callable

    def __post_init__(self):
        super().__init__()
        self.done = False
        # tee() is used instead of deepcopy(): generator objects cannot be
        # deep-copied (TypeError), whereas tee() accepts any iterator.  One
        # branch is consumed by __next__, the other is kept untouched for
        # reset().  Note that tee() buffers the items the untouched branch
        # has not yet seen.
        self.iteration_domain, self.initial_iteration_domain = tee(self.iteration_domain)

    def reset(self):
        """Clear ``done`` and restart the stream from the first iteration vector."""
        self.done = False
        # Split the untouched branch again so the following reset can rewind too.
        self.iteration_domain, self.initial_iteration_domain = tee(self.initial_iteration_domain)

    def __iter__(self):
        return self

    def __next__(self):
        """Return the next index, or 0 when the guard rejects the vector or the domain is exhausted."""
        # Only the draw from the domain is guarded, so a StopIteration raised
        # inside ``condition`` or ``access_map`` is not mistaken for the end
        # of the stream.
        try:
            next_iteration_vector = next(self.iteration_domain)
        except StopIteration:
            self.done = True
            return 0
        logging.debug("iteration vector: %s", next_iteration_vector)
        if self.condition(next_iteration_vector):
            return self.access_map(next_iteration_vector)
        return 0


In [8]:
@block
def stream_generator(clk, enable, reset, stream, stream_out):
    """Drive ``stream_out`` with successive values taken from ``stream``.

    The process runs on every rising edge of ``clk`` and of ``reset``:

    * while ``reset`` is truthy the stream is rewound and ``stream_out`` is
      scheduled to 0;
    * otherwise, when ``enable`` is truthy and the stream is not ``done``,
      the next stream value is scheduled onto ``stream_out``;
    * in every other case ``stream_out`` is left untouched.

    Returns the ``generate`` process.
    """
    @always(clk.posedge, reset.posedge)
    def generate():
        if reset:
            # Reset takes priority over enable.
            stream.reset()
            stream_out.next = 0
        elif enable and not stream.done:
            stream_out.next = next(stream)
    return generate


@block
def top():
    """Testbench wiring the counter, the gated clock and the stream generator.

    Creates the signals, instantiates ``counter``, ``clk_driver`` (period 10)
    and ``stream_generator`` around a ``HighLevelAddressStreamDescriptor``,
    and adds a stimulus process that applies two reset phases separated by
    40 time steps of enabled operation.

    Returns
    -------
    Tuple of the clock driver, counter, stimulus and stream generator
    instances.
    """

    clk = Signal(bool(0))
    enable = Signal(bool(0))
    count = Signal(0)
    # Active-high reset, declared asynchronous.
    reset = ResetSignal(bool(0), active=1, isasync=True)
    counter_inst = counter(clk, reset, count)
    clk_driver_inst = clk_driver(clk, enable, period=10)
    def high_level_stream_descriptor():
        """Yield ``i*10 + j`` for i in 5..9 and j in 0..9, i.e. 50 through 99 in order."""
        for i in range(5, 10):
            for j in range(10):
                # Always true for these ranges; mirrors the ``condition``
                # of the commented-out low-level descriptor below.
                if (i>=0 and j>=0):
                    yield i*10+j
    stream = HighLevelAddressStreamDescriptor(high_level_stream_descriptor())
    # Alternative stream kept for reference (disabled): a low-level descriptor
    # over a 10x10 iteration domain.
    # stream = LowLevelAddressStreamDescriptor(iteration_domain=product(*([range(10)]*2)), 
    #                                          access_map = lambda iter_vect: iter_vect[0]*10+iter_vect[1], 
    #                                          condition = lambda iter_vect: iter_vect[0] >= 0 and iter_vect[1] >= 0)
    stream_out = Signal(0)
    stream_generator_inst = stream_generator(clk, enable, reset, stream, stream_out) 

    @instance
    def start_sim():
        # First reset phase: reset asserted with enable low for 10 time steps.
        enable.next = 0
        reset.next = 1
        yield delay(10)
        # Release reset and enable the design for 40 time steps.
        enable.next = 1
        reset.next = 0
        yield delay(40)
        # Second reset phase: 10 time steps with reset asserted and enable low.
        enable.next = 0
        reset.next = 1
        yield delay(10)
        # Release reset and re-enable; the stimulus process ends here.
        enable.next = 1
        reset.next = 0
        
    return clk_driver_inst, counter_inst, start_sim, stream_generator_inst

In [9]:
# Build the testbench and wrap it with myhdl's ``traceSignals``
# (presumably this enables waveform tracing; confirm against the myhdl docs).
inst = traceSignals(top())
# Simulate 100 time steps, then shut the simulation down.
inst.run_sim(100)
inst.quit_sim()

DEBUG:root:HighLevelAddressStreamDescriptor(index_generator_fn=<generator object top.<locals>.high_level_stream_descriptor at 0x7f5a9496e7b0>) initialized @T=0
DEBUG:root:HighLevelAddressStreamDescriptor(index_generator_fn=<itertools._tee object at 0x7f5a9443bac0>) initialized @T=0
DEBUG:root:HighLevelAddressStreamDescriptor(index_generator_fn=<itertools._tee object at 0x7f5a9443b040>) initialized @T=50
DEBUG:root:HighLevelAddressStreamDescriptor(index_generator_fn=<itertools._tee object at 0x7f5a9443bac0>) initialized @T=55
<class 'myhdl._SuspendSimulation'>: Simulated 100 timesteps
