diff --git a/labthings/core/event.py b/labthings/core/event.py index a6fcc23b..778360a9 100644 --- a/labthings/core/event.py +++ b/labthings/core/event.py @@ -4,6 +4,7 @@ import logging from gevent.monkey import get_original +from gevent.lock import Semaphore # Guarantee that Task threads will always be proper system threads, regardless of Gevent patches Event = get_original("threading", "Event") @@ -19,6 +20,7 @@ class ClientEvent(object): def __init__(self): self.events = {} + self._setting_lock = Semaphore(value=1) def wait(self, timeout: int = 5): """Wait for the next data frame (invoked from each client's thread).""" @@ -40,23 +42,24 @@ def wait(self, timeout: int = 5): def set(self, timeout=5): """Signal that a new frame is available.""" - now = time.time() - remove = None - for ident, event in self.events.items(): - if not event[0].is_set(): - # if this client's event is not set, then set it - # also update the last set timestamp to now - event[0].set() - event[1] = now - else: - # if the client's event is already set, it means the client - # did not process a previous frame - # if the event stays set for more than `timeout` seconds, then assume - # the client is gone and remove it - if now - event[1] >= timeout: - remove = ident - if remove: - del self.events[remove] + with self._setting_lock: + now = time.time() + remove_keys = set() + for ident, event in self.events.items(): + if not event[0].is_set(): + # if this client's event is not set, then set it + # also update the last set timestamp to now + event[0].set() + event[1] = now + else: + # if the client's event is already set, it means the client + # did not process a previous frame + # if the event stays set for more than `timeout` seconds, then + # assume the client is gone and remove it + if now - event[1] >= timeout: + remove_keys.add(ident) + if remove_keys: + del self.events[ident] def clear(self): """Clear frame event, once processed.""" diff --git a/labthings/core/tasks/thread.py b/labthings/core/tasks/thread.py index 8b9b3ea8..ee1d5487 100644 --- a/labthings/core/tasks/thread.py +++ b/labthings/core/tasks/thread.py @@ -36,6 +36,13 @@ def __init__(self, target, *args, **kwargs): # Nice string representation of target function self.target_string = f"{self._target}(args={self._args}, kwargs={self._kwargs})" + # copy_current_request_context allows threads to access flask current_app + if has_request_context(): + logging.debug(f"Copying request context to {self._target}") + self._target = copy_current_request_context(self._target) + else: + logging.debug("No request context to copy") + # Private state properties self._status: str = "idle" # Task status self._return_value = None # Return value @@ -79,12 +86,7 @@ def update_data(self, data: dict): self.data.update(data) def _run(self): # pylint: disable=E0202 - # copy_current_request_context allows threads to access flask current_app - if has_request_context(): - target = copy_current_request_context(self._target) - else: - target = self._target - return self._thread_proc(target)(*self._args, **self._kwargs) + return self._thread_proc(self._target)(*self._args, **self._kwargs) def _thread_proc(self, f): """