Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace repeated polling in Outbox.loop() with an asyncio event (see #2482) #2867

Merged
merged 14 commits into from
Apr 16, 2024
26 changes: 21 additions & 5 deletions nicegui/outbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from typing import TYPE_CHECKING, Any, Deque, Dict, Optional, Tuple

from . import background_tasks, core
from .logging import log

if TYPE_CHECKING:
from .client import Client
Expand All @@ -23,35 +24,50 @@ def __init__(self, client: Client) -> None:
self.updates: Dict[ElementId, Optional[Element]] = {}
self.messages: Deque[Message] = deque()
self._should_stop = False
self._enqueue_event = None
falkoschindler marked this conversation as resolved.
Show resolved Hide resolved
if core.app.is_started:
background_tasks.create(self.loop(), name=f'outbox loop {client.id}')
else:
core.app.on_startup(self.loop)

def _set_enqueue_event(self):
"""Set the enqueue event while accounting for lazy initialization."""
if self._enqueue_event:
self._enqueue_event.set()

def enqueue_update(self, element: Element) -> None:
"""Enqueue an update for the given element."""
self.updates[element.id] = element
self._set_enqueue_event()

def enqueue_delete(self, element: Element) -> None:
"""Enqueue a deletion for the given element."""
self.updates[element.id] = None
self._set_enqueue_event()

def enqueue_message(self, message_type: MessageType, data: Any, target_id: ClientId) -> None:
"""Enqueue a message for the given client."""
self.messages.append((target_id, message_type, data))
self._set_enqueue_event()

async def loop(self) -> None:
"""Send updates and messages to all clients in an endless loop."""
self._enqueue_event = asyncio.Event()
self._enqueue_event.set()

while not self._should_stop:
try:
await asyncio.sleep(0.01)

if not self.updates and not self.messages:
continue
await self._enqueue_event.wait()
await asyncio.sleep(0.005)
falkoschindler marked this conversation as resolved.
Show resolved Hide resolved

if not self.client.has_socket_connection:
continue
try:
await self.client.connected(timeout=60)
except TimeoutError:
log.error('Outbox.loop() is exiting because client is not connected after 60 seconds')
return

self._enqueue_event.clear()
coros = []
data = {
element_id: None if element is None else element._to_dict() # pylint: disable=protected-access
Expand Down