Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 20 additions & 17 deletions labthings/core/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import logging

from gevent.monkey import get_original
from gevent.lock import Semaphore

# Guarantee that Task threads will always be proper system threads, regardless of Gevent patches
Event = get_original("threading", "Event")
Expand All @@ -19,6 +20,7 @@ class ClientEvent(object):

def __init__(self):
self.events = {}
self._setting_lock = Semaphore(value=1)

def wait(self, timeout: int = 5):
"""Wait for the next data frame (invoked from each client's thread)."""
Expand All @@ -40,23 +42,24 @@ def wait(self, timeout: int = 5):

def set(self, timeout=5):
"""Signal that a new frame is available."""
now = time.time()
remove = None
for ident, event in self.events.items():
if not event[0].is_set():
# if this client's event is not set, then set it
# also update the last set timestamp to now
event[0].set()
event[1] = now
else:
# if the client's event is already set, it means the client
# did not process a previous frame
# if the event stays set for more than `timeout` seconds, then assume
# the client is gone and remove it
if now - event[1] >= timeout:
remove = ident
if remove:
del self.events[remove]
with self._setting_lock:
now = time.time()
remove_keys = set()
for ident, event in self.events.items():
if not event[0].is_set():
# if this client's event is not set, then set it
# also update the last set timestamp to now
event[0].set()
event[1] = now
else:
# if the client's event is already set, it means the client
# did not process a previous frame
# if the event stays set for more than `timeout` seconds, then
# assume the client is gone and remove it
if now - event[1] >= timeout:
remove_keys.add(ident)
if remove_keys:
del self.events[ident]

def clear(self):
"""Clear frame event, once processed."""
Expand Down
14 changes: 8 additions & 6 deletions labthings/core/tasks/thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,13 @@ def __init__(self, target, *args, **kwargs):
# Nice string representation of target function
self.target_string = f"{self._target}(args={self._args}, kwargs={self._kwargs})"

# copy_current_request_context allows threads to access flask current_app
if has_request_context():
logging.debug(f"Copying request context to {self._target}")
self._target = copy_current_request_context(self._target)
else:
logging.debug("No request context to copy")

# Private state properties
self._status: str = "idle" # Task status
self._return_value = None # Return value
Expand Down Expand Up @@ -79,12 +86,7 @@ def update_data(self, data: dict):
self.data.update(data)

def _run(self): # pylint: disable=E0202
# copy_current_request_context allows threads to access flask current_app
if has_request_context():
target = copy_current_request_context(self._target)
else:
target = self._target
return self._thread_proc(target)(*self._args, **self._kwargs)
return self._thread_proc(self._target)(*self._args, **self._kwargs)

def _thread_proc(self, f):
"""
Expand Down