# Utils

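> Asyncio helpers (task creation with exception reporting, waiting on events) plus small attribute-container, event and state-handling classes.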

In [None]:
#| default_exp utils

In [None]:
#| hide
from nbdev.showdoc import *; 

In [None]:
#| hide
import nbdev; nbdev.nbdev_export()

In [None]:
#|export
import asyncio
from typing import Optional, Type, Union
from types import MappingProxyType
import copy
import traceback

import fbdev

In [None]:
#|hide
show_doc(fbdev.utils.create_task_with_exception_handler)

---

### create_task_with_exception_handler

>      create_task_with_exception_handler (coroutine)

In [None]:
#|exporti
def handle_exception(task):
    """Done-callback that reports an exception raised inside `task`.

    Intended for `task.add_done_callback`. If the task finished with an
    exception, a one-line summary is printed to stdout and the traceback is
    printed via `traceback.print_exc()`. Nothing is re-raised.

    Args:
        task: A finished `asyncio.Task` / `asyncio.Future`.
    """
    # `task.result()` raises CancelledError for a cancelled task. Since
    # Python 3.8 that is a BaseException, so it would escape the handler
    # below and surface as an error *in this callback*. Cancellation is not
    # a failure, so skip it.
    if task.cancelled():
        return
    try:
        task.result()
    except Exception as e:
        print(f"Caught exception: {e}")
        traceback.print_exc()

In [None]:
#|export
def create_task_with_exception_handler(coroutine):
    """Schedule `coroutine` as a task and attach `handle_exception` to it.

    Returns the created task so the caller can await or cancel it.
    """
    scheduled = asyncio.create_task(coroutine)
    # The callback runs once the task is done.
    scheduled.add_done_callback(handle_exception)
    return scheduled

In [None]:
#|hide
show_doc(fbdev.utils.await_multiple_events)

---

### await_multiple_events

>      await_multiple_events (*events)

In [None]:
#|export
async def await_multiple_events(*events):
    """Wait until every event in `events` is set at the same time.

    Args:
        *events: Objects exposing `is_set()` and an awaitable `wait()`
            (e.g. `asyncio.Event`).

    Returns immediately when called with no events or when all are already set.
    """
    # Re-check after each pass: an event may have been cleared again while we
    # were still waiting on another one.
    while not all(event.is_set() for event in events):
        # Await each event directly. The previous version created one task per
        # event from a closure over the loop variable, so every task actually
        # waited on the *last* event. Sequential waits need no tasks and still
        # finish only once each event has been set.
        for event in events:
            await event.wait()

In [None]:
#|hide
show_doc(fbdev.utils.await_any_event)

---

### await_any_event

>      await_any_event (*events)

In [None]:
#|export
async def await_any_event(*events):
    """Wait until at least one of `events` is set.

    Args:
        *events: Objects exposing an awaitable `wait()` (e.g. `asyncio.Event`).

    Raises:
        ValueError: If called with no events (raised by `asyncio.wait`).
    """
    # Plain tasks are used on purpose: the losers are cancelled below, and
    # cancellation must not be routed through an exception-reporting callback.
    waiters = [asyncio.create_task(event.wait()) for event in events]
    try:
        done, _ = await asyncio.wait(waiters, return_when=asyncio.FIRST_COMPLETED)
        for finished in done:
            # Surface an error raised by a custom `wait()` instead of leaving
            # it unretrieved on the task.
            finished.result()
    finally:
        # Without this the waiters for events that never fire stay pending
        # (and keep their events referenced) indefinitely.
        for waiter in waiters:
            if not waiter.done():
                waiter.cancel()

In [None]:
# Demo cell: two events set after different delays; `await_any_event` should
# return as soon as the first one (ev1) is set.
ev1 = asyncio.Event()
async def foo1():
    # Sets ev1 after 0.5 s.
    await asyncio.sleep(0.5)
    ev1.set()
    
ev2 = asyncio.Event()
async def foo2():
    # Sets ev2 after 1 s, i.e. later than ev1.
    await asyncio.sleep(1)
    ev2.set()

# Start both setters in the background, then wait for whichever event fires first.
asyncio.create_task(foo1())
asyncio.create_task(foo2())
# Top-level `await`: this cell is meant to run inside the notebook.
await await_any_event(ev1, ev2)
print("Done waiting")
# Per the recorded output below, ev1 is set and ev2 is still unset here.
print("ev1 is set:", ev1.is_set())
print("ev2 is set:", ev2.is_set())

Done waiting
ev1 is set: True
ev2 is set: False


In [None]:
#|hide
show_doc(fbdev.utils.AttrContainer)

---

### AttrContainer

>      AttrContainer (_attrs=None, obj_name='AttrContainer',
>                     dtype:Optional[Type]=None)

*Initialize self.  See help(type(self)) for accurate signature.*

In [None]:
#|export
class AttrContainer:
    """Dictionary-backed container whose entries can be read as attributes or items.

    Entries live in the private `_attrs` dict. They are read with
    `container.key` or `container["key"]` and written only through `_set`.

    Args:
        _attrs: Optional mapping (or iterable of pairs) of initial entries; it
            is copied into a new dict.
        obj_name: Name used in error messages and in `__str__`.
        dtype: If given, `_set` only accepts values whose type is exactly `dtype`.
    """

    # Attributes assigned in `__init__`. `__getattr__` must never look these up
    # in the entry dict: on an instance whose `__init__` has not run (such as
    # the blank object `copy.copy` creates) that lookup would recurse without bound.
    _INTERNAL_NAMES = ("_attrs", "_obj_name", "_dtype")

    def __init__(self, _attrs=None, obj_name="AttrContainer", dtype:Optional[Type]=None):
        self.idx = ()
        self._attrs = dict(_attrs) if _attrs is not None else {}
        self._obj_name = obj_name
        self._dtype = dtype

    def __getattr__(self, key):
        """Resolve `container.key` from the stored entries.

        Only called when normal attribute lookup fails.

        Raises:
            AttributeError: For dunder names, for the container's own internal
                attributes, and for keys that are not stored. Raising
                `AttributeError` keeps `hasattr` and `getattr(obj, name, default)`
                working.
        """
        if (key.startswith("__") and key.endswith("__")) or key in self._INTERNAL_NAMES:
            # Read `_obj_name` straight from the instance dict so building the
            # message cannot re-enter `__getattr__`.
            obj_name = self.__dict__.get("_obj_name", type(self).__name__)
            raise AttributeError(f"'{type(self).__name__}' object has no attribute '{key}' (in {obj_name})")
        if key not in self._attrs:
            raise AttributeError(f"'{key}' does not exist (in {self._obj_name})")
        return self[key]

    def __getitem__(self, key):
        """Return the entry stored under `key`.

        Raises:
            KeyError: If `key` is a missing dunder-style name.
            Exception: If any other `key` is missing.
        """
        if key in self._attrs:
            return self._attrs[key]
        elif isinstance(key, str) and key.startswith("__") and key.endswith("__"):
            raise KeyError(f"'{type(self).__name__}' object has no key '{key}' (in {self._obj_name})")
        else:
            raise Exception(f"'{key}' does not exist (in {self._obj_name})")

    def _set(self, key, value):
        """Store `value` under `key`.

        Raises:
            TypeError: If a `dtype` was configured and `type(value)` is not
                exactly that type.
        """
        if self._dtype is not None and type(value) != self._dtype:
            raise TypeError(f"Value {value} is not of type {self._dtype} (in {self._obj_name}).")
        self._attrs[key] = value

    def keys(self):
        """Return a view of the stored keys."""
        return self._attrs.keys()

    def values(self):
        """Return a view of the stored values."""
        return self._attrs.values()

    def items(self):
        """Return a view of the stored `(key, value)` pairs."""
        return self._attrs.items()

    def as_readonly_dict(self):
        """Return a read-only live view (`MappingProxyType`) of the entries."""
        return MappingProxyType(self._attrs)

    def as_list(self):
        """Return the stored values as a new list."""
        return list(self.values())

    def __iter__(self):
        return iter(self._attrs)

    def __contains__(self, key):
        return key in self._attrs

    def __len__(self):
        return len(self._attrs)

    def __str__(self):
        return f'{self._obj_name}: {", ".join([f"{k}: {v}" for k,v in self._attrs.items()])}'

    def copy(self):
        """Return a copy with its own entry dict.

        Nested `AttrContainer` values (including subclasses) are copied
        recursively. All other values are shared with the original.
        """
        # The local must not be called `copy`: that would shadow the `copy`
        # module and make `copy.copy` fail with UnboundLocalError.
        duplicate = copy.copy(self)
        # `copy.copy` is shallow, so give the duplicate a dict of its own.
        # Otherwise writes to the copy would also change the original.
        duplicate._attrs = {
            key: value.copy() if isinstance(value, AttrContainer) else value
            for key, value in self._attrs.items()
        }
        return duplicate

In [None]:
#|hide
show_doc(fbdev.utils.ReadonlyEvent)

---

### ReadonlyEvent

>      ReadonlyEvent (event:asyncio.locks.Event)

*Initialize self.  See help(type(self)) for accurate signature.*

In [None]:
#|export
class ReadonlyEvent:
    """Wrapper around an `asyncio.Event` that exposes only `is_set` and `wait`."""

    def __init__(self, event: asyncio.Event):
        # Hold the live event; this wrapper offers no way to set or clear it.
        self._event = event

    def is_set(self):
        """Return whether the wrapped event is currently set."""
        wrapped = self._event
        return wrapped.is_set()

    async def wait(self):
        """Wait until the wrapped event is set. Returns `None`."""
        wrapped = self._event
        await wrapped.wait()

In [None]:
#|hide
show_doc(fbdev.utils.EventHandler)

---

### EventHandler

>      EventHandler (name)

*Subscribable events*

In [None]:
#|export
class EventHandler:
    """Subscribable events

    Each `subscribe()` call hands out a new one-shot `asyncio.Event`.
    `_trigger()` sets every event handed out so far and then forgets them.
    """

    def __init__(self, name):
        self.name = name
        # Events handed out by `subscribe` that have not been triggered yet.
        self._events = []

    def subscribe(self):
        """Create, register and return a new event for the next trigger."""
        fresh = asyncio.Event()
        self._events.append(fresh)
        return fresh

    def _trigger(self):
        """Set all registered events, then empty the registry in place."""
        registered = self._events
        for pending in registered:
            pending.set()
        del registered[:]

    def __str__(self):
        return "EventHandler(name='{}')".format(self.name)

    def __repr__(self):
        return str(self)

In [None]:
#|hide
show_doc(fbdev.utils.EventCollection)

---

### EventCollection

>      EventCollection ()

*Initialize self.  See help(type(self)) for accurate signature.*

In [None]:
#|export
class EventCollection(AttrContainer):
    """`AttrContainer` named "EventCollection" whose entries are keyed by handler name."""

    def __init__(self) -> None:
        super().__init__({}, obj_name="EventCollection")

    def _add_event(self, event_handler: EventHandler):
        """Store `event_handler` under its own `name`."""
        key = event_handler.name
        self._set(key, event_handler)

In [None]:
#|hide
show_doc(fbdev.utils.StateHandler)

---

### StateHandler

>      StateHandler (name, current_state, state_vals=[True, False])

*Initialize self.  See help(type(self)) for accurate signature.*

In [None]:
#|export
class StateHandler:
    """Tracks one current state out of a fixed set and exposes it through events.

    For every possible state there is an "on" event (set while that state is
    the current one) and an "off" event (set while it is not).

    Args:
        name: Name used in `__str__` / `__repr__`.
        current_state: Initial state; must be one of `state_vals`.
        state_vals: Iterable of unique, hashable state values (enums work).
            The default list is copied and never mutated.

    Raises:
        ValueError: If `state_vals` contains duplicates or does not contain
            `current_state`.
    """

    def __init__(self, name, current_state, state_vals=[True, False]):
        self.name = name
        state_vals = list(state_vals) # Can be enums
        self._state_vals = state_vals
        if len(state_vals) != len(set(state_vals)): raise ValueError("`state_vals` must have all unique elements.")
        if current_state not in state_vals: raise ValueError("`current_state` must be in `state_vals`.")
        self.__state_is_on = {state : asyncio.Event() for state in state_vals}
        self.__state_is_on[current_state].set()
        self.__state_is_off = {state : asyncio.Event() for state in state_vals}
        self._current_state = current_state
        # Every state other than the current one starts out "off".
        for state in self.__state_is_off:
            if state != current_state: self.__state_is_off[state].set()

    def check(self, state):
        """Return True if `state` is the current state."""
        return self.__state_is_on[state].is_set()

    def get(self):
        """Return the current state."""
        return self._current_state

    def set(self, state):
        """Make `state` the current state and update all on/off events.

        Raises:
            ValueError: If `state` is not one of the configured states.
        """
        if state not in self._state_vals:
            # States may be bools, enums, etc., so stringify them before
            # joining. A bare `join` raises TypeError on non-strings and
            # would hide this error.
            possible = ', '.join(map(str, self._state_vals))
            raise ValueError(f"Invalid state: {state}. Possible states: {possible}")
        self._current_state = state
        for _state in self.__state_is_on:
            if _state == state:
                self.__state_is_on[_state].set()
                self.__state_is_off[_state].clear()
            else:
                self.__state_is_on[_state].clear()
                self.__state_is_off[_state].set()

    def wait(self, state, target_value=True):
        """Return an awaitable that completes once `state` is on (or off).

        Args:
            state: The state to watch.
            target_value: True to wait until `state` is current, False to wait
                until it is not.
        """
        if target_value: return self.__state_is_on[state].wait()
        else: return self.__state_is_off[state].wait()

    async def __event_func(self, state, event):
        # `state` is one of the internal on/off events; mirror it onto `event`
        # the first time it is set.
        await state.wait()
        event.set()

    def get_state_event(self, state, target_value=True):
        """Return a `ReadonlyEvent` view of the live on (or off) event for `state`."""
        if target_value: return ReadonlyEvent(self.__state_is_on[state])
        else: return ReadonlyEvent(self.__state_is_off[state])

    def get_state_toggle_event(self, state, target_value=True):
        """Return a new event that is set once `state` is next on (or off).

        A background task waits on the internal event and then sets the
        returned one. The returned event is not cleared again afterwards.
        """
        event = asyncio.Event()
        if target_value: create_task_with_exception_handler(self.__event_func(self.__state_is_on[state], event))
        else: create_task_with_exception_handler(self.__event_func(self.__state_is_off[state], event))
        return event

    def __str__(self):
        return f"State {self.name}: {self._current_state}"

    def __repr__(self):
        return self.__str__()

In [None]:
#|hide
show_doc(fbdev.utils.StateView)

---

### StateView

>      StateView (state_handler)

*Initialize self.  See help(type(self)) for accurate signature.*

In [None]:
#|export
class StateView:
    """Wrapper around a state handler that forwards only its read/wait methods.

    No `set` method is exposed, so holders of a view cannot change the state
    through it.
    """

    def __init__(self, state_handler):
        self._state_handler: StateHandler = state_handler

    def check(self, state):
        """Forward to the handler's `check`."""
        handler = self._state_handler
        return handler.check(state)

    def get(self):
        """Return the handler's current state."""
        handler = self._state_handler
        return handler._current_state

    def wait(self, state, state_value=True):
        """Forward to the handler's `wait`."""
        handler = self._state_handler
        return handler.wait(state, state_value)

    def get_state_event(self, state, state_value=True):
        """Forward to the handler's `get_state_event`."""
        handler = self._state_handler
        return handler.get_state_event(state, state_value)

    def get_state_toggle_event(self, state, state_value=True):
        """Forward to the handler's `get_state_toggle_event`."""
        handler = self._state_handler
        return handler.get_state_toggle_event(state, state_value)

    def __str__(self):
        handler = self._state_handler
        return str(handler)

    def __repr__(self):
        return self.__str__()

In [None]:
#|hide
show_doc(fbdev.utils.StateCollection)

---

### StateCollection

>      StateCollection ()

*Initialize self.  See help(type(self)) for accurate signature.*

In [None]:
#|export
class StateCollection(AttrContainer):
    """`AttrContainer` named "StateCollection" that stores state handlers by name."""

    def __init__(self) -> None:
        super().__init__({}, obj_name="StateCollection")

    def _add_state(self, state_handler: StateHandler, readonly=False):
        """Register `state_handler` under two keys.

        The handler itself is always stored under `_<name>`. Under `<name>`
        the collection stores a `StateView` wrapper when `readonly` is true,
        otherwise the handler itself.
        """
        # Underscore-prefixed entry: always the real handler.
        self._set(f"_{state_handler.name}", state_handler)
        if not readonly:
            self._set(state_handler.name, state_handler)
            return
        # Read-only entry: expose a view instead of the handler.
        self._set(f"{state_handler.name}", StateView(state_handler))