In [None]:
import asyncio
import scipp as sc
import plopp as pp
from queue import Queue, Empty

%matplotlib widget
from beamlime.resources.generated import load_static_default_config

In [None]:
# Module-level image buffer: a 64x64 array of zeros that `RealtimePlot.process`
# overwrites in place and `get_data` hands to the plot.
# Each coordinate has 65 points for 64 data values along its dimension,
# i.e. one more than the data (bin-edge style coordinates).
# NOTE(review): linspace(0, 65, 65) yields a spacing of 65/64 mm rather than
# exactly 1 mm -- confirm whether the stop value was meant to be 64.
global_da = sc.DataArray(data=sc.zeros(sizes={'y': 64, 'x': 64}),
                         coords={'x': sc.linspace('x', 0, 65, 65, unit='mm'),
                                 'y': sc.linspace('y', 0, 65, 65, unit='mm')})

In [None]:
def get_data():
    """Return the module-level ``global_da`` data array (no copy is made)."""
    # Reading a module-level name needs no ``global`` declaration.
    return global_da

# Wrap `get_data` in a plopp node; `RealtimePlot.process` calls
# `notify_children('update')` on this node after it rewrites `global_da`.
stream_node = pp.Node(get_data)

In [None]:
class RealtimePlot:
    """Application that consumes frames from an input queue and refreshes the plot.

    The input/output channels are assigned from outside through the
    ``input_channel`` / ``output_channel`` properties. Both default to ``None``
    until they are connected.
    """

    _input_ch = None
    _output_ch = None

    def __init__(self) -> None:
        pass

    @property
    def input_channel(self):
        """Channel this application reads new data from (``None`` if unset)."""
        return self._input_ch

    @input_channel.setter
    def input_channel(self, input_channel):
        self._input_ch = input_channel

    @property
    def output_channel(self):
        """Channel this application would write to (``None`` if unset)."""
        return self._output_ch

    @output_channel.setter
    def output_channel(self, output_channel):
        self._output_ch = output_channel

    def set_stream_node(self, stream_node):
        """Store ``stream_node`` on the instance.

        Note that ``process`` is a static method and notifies the module-level
        ``stream_node``, not the attribute stored here.
        """
        self.stream_node = stream_node

    @staticmethod
    async def retrieve_new_data(queue, timeout=2, poll_interval=0.05):
        """Wait up to ``timeout`` seconds for an item on ``queue``.

        ``queue.get(block=False, timeout=...)`` ignores the timeout and raises
        ``Empty`` immediately, while a blocking ``get`` would stall the event
        loop. The queue is therefore polled without blocking, yielding to the
        event loop between attempts.

        Parameters
        ----------
        queue:
            Object with a ``get_nowait()`` method raising ``queue.Empty``
            when no item is available (e.g. ``queue.Queue``).
        timeout:
            Maximum number of seconds to wait. ``0`` means a single attempt.
        poll_interval:
            Seconds to sleep between two attempts.

        Returns
        -------
        The retrieved item, or ``None`` if nothing arrived within ``timeout``.
        """
        loop = asyncio.get_running_loop()
        deadline = loop.time() + timeout
        while True:
            try:
                return queue.get_nowait()
            except Empty:
                remaining = deadline - loop.time()
                if remaining <= 0:
                    return None
                # Never sleep past the deadline.
                await asyncio.sleep(min(poll_interval, remaining))

    @staticmethod
    def process(new_data):
        """Copy the new heatmap into ``global_da`` and notify the plot node.

        ``new_data`` must provide ``new_data['heatmap'].values`` and the key
        ``'frame-number-counting'``. Returns a status message containing the
        frame number.
        """
        global global_da
        global_da.values = new_data['heatmap'].values
        global stream_node
        stream_node.notify_children('update')
        return f"value updated with frame number {new_data['frame-number-counting']}"

    @staticmethod
    async def _run(ear: Queue, mouth: Queue):
        """Consume ``ear`` until no new data arrives within the retrieval timeout.

        ``mouth`` is accepted for interface symmetry but is not used here.
        """
        await asyncio.sleep(2)
        new_data = await RealtimePlot.retrieve_new_data(ear)
        while new_data is not None:
            result = RealtimePlot.process(new_data)
            # Report right away instead of after waiting for the next frame.
            print(result)
            await asyncio.sleep(0.5)
            new_data = await RealtimePlot.retrieve_new_data(ear)

    def create_task(self):
        """Schedule ``_run`` on the running event loop and return the task.

        The caller should keep a reference to the returned task; the event
        loop itself only holds a weak reference to it.
        """
        return asyncio.create_task(
            self._run(ear=self.input_channel,
                      mouth=self.output_channel)
        )


In [None]:
def build_instances(config: dict):
    """Instantiate the data handler of every configured application.

    For each entry of ``config["data-stream"]["applications"]`` the class named
    by its ``"data-handler"`` (a dotted path, or a bare class name looked up in
    the current module) is instantiated and stored under the entry's
    ``"instance"`` key. The entries of ``config`` are modified in place.

    Returns a dict mapping each application's ``"name"`` to its entry.
    """
    from importlib import import_module

    apps_by_name = {}
    for app in config["data-stream"]["applications"]:
        apps_by_name[app["name"]] = app

        # Split "pkg.module.Class" into its module path and class name.
        module_path, _, class_name = app["data-handler"].rpartition(".")
        owner = import_module(module_path if module_path else __name__)
        handler_cls = getattr(owner, class_name)

        # These branches will be replaced by arguments parsed from the
        # configuration.
        if class_name == "BeamLimeDataReductionApplication":
            kwargs = {"config": config["data-reduction"]}
        elif class_name == "Fake2dDetectorImageFeeder":
            kwargs = {"config": {"num-frame": 10, "noise-range": 0.5}}
        else:
            kwargs = {}
        app["instance"] = handler_cls(**kwargs)
    return apps_by_name

In [None]:
def connect_instances(config: dict, itf_map: dict, queue_maxsize: int = 100):
    """Wire sender and receiver instances together as described by the mapping.

    Parameters
    ----------
    config:
        Configuration whose ``["data-stream"]["applications-mapping"]`` lists
        ``{"from": <name>, "to": <name>}`` pairs.
    itf_map:
        Mapping of application name to its configuration entry. Each entry
        needs ``"input-channel"``, ``"output-channel"`` and ``"instance"``.
    queue_maxsize:
        Capacity of every queue created for a ``"QUEUE"`` link.

    Raises
    ------
    ValueError
        If the sender's ``output-channel`` differs from the receiver's
        ``input-channel``.
    RuntimeError
        If a ``"QUEUE"`` link is requested but the two instances are already
        attached to different channels.
    """
    for mapping in config["data-stream"]["applications-mapping"]:
        sender_config = itf_map[mapping["from"]]
        receiver_config = itf_map[mapping["to"]]
        link_type = sender_config["output-channel"]
        if link_type != receiver_config["input-channel"]:
            raise ValueError(
                "`output-channel` of the `from` interface"
                " and the `input-channel` of the `to` interface"
                " should have the same option."
            )

        sender = sender_config["instance"]
        receiver = receiver_config["instance"]

        # The link between the two is defined by the sender's output / the
        # receiver's input (validated equal above), not the receiver's output.
        if link_type == "QUEUE":
            if receiver.input_channel is None and sender.output_channel is None:
                shared_queue = Queue(maxsize=queue_maxsize)
                sender.output_channel = shared_queue
                receiver.input_channel = shared_queue
            elif receiver.input_channel != sender.output_channel:
                raise RuntimeError("There's a problem in the mapping")


In [None]:
# Load the packaged default configuration, instantiate every configured
# application, then connect senders to receivers per the mapping.
# `build_instances` stores each instance inside its entry of `config`.
config = load_static_default_config()
itf_map = build_instances(config)
connect_instances(config, itf_map)

In [None]:
def main(instances: list):
    """Create one asyncio task per instance and return the tasks.

    Each element of ``instances`` must provide a ``create_task()`` method.
    The tasks are returned so the caller can keep a strong reference to them:
    the event loop only holds weak references, so a task nobody else references
    may be garbage-collected before it finishes.
    """
    return [inst.create_task() for inst in instances]

In [None]:
# Build a 2-D figure fed by `stream_node`.
# Presumably it redraws when the node notifies its children -- confirm against plopp.
fig = pp.figure2d(stream_node)

In [None]:
# Show the figure as this cell's output.
fig

In [None]:
# Start every application. `create_task` needs a running event loop
# (presumably the one the notebook kernel provides -- confirm for other frontends).
main([inst["instance"] for inst in itf_map.values()])