Skip to content

Commit

Permalink
feat: speed up processing records in the ServiceBrowser (#1143)
Browse files Browse the repository at this point in the history
  • Loading branch information
bdraco committed Apr 2, 2023
1 parent 68871c3 commit 6a327d0
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 8 deletions.
21 changes: 13 additions & 8 deletions src/zeroconf/_services/browser.py
Expand Up @@ -25,6 +25,7 @@
import random
import threading
import warnings
from abc import abstractmethod
from collections import OrderedDict
from typing import (
TYPE_CHECKING,
Expand Down Expand Up @@ -408,21 +409,14 @@ def async_update_records(self, zc: 'Zeroconf', now: float, records: List[RecordU
for record in records:
self._async_process_record_update(now, record[0], record[1])

@abstractmethod
def async_update_records_complete(self) -> None:
"""Called when a record update has completed for all handlers.
At this point the cache will have the new records.
This method will be run in the event loop.
"""
while self._pending_handlers:
event = self._pending_handlers.popitem(False)
# If there is a queue running (ServiceBrowser)
# get fired in dedicated thread
if self.queue:
self.queue.put(event)
else:
self._fire_service_state_changed_event(event)

def _fire_service_state_changed_event(self, event: Tuple[Tuple[str, str], ServiceStateChange]) -> None:
"""Fire a service state changed event.
Expand Down Expand Up @@ -553,3 +547,14 @@ def run(self) -> None:
if event is None:
return
self._fire_service_state_changed_event(event)

def async_update_records_complete(self) -> None:
"""Called when a record update has completed for all handlers.
At this point the cache will have the new records.
This method will be run in the event loop.
"""
assert self.queue is not None
while self._pending_handlers:
self.queue.put(self._pending_handlers.popitem(False))
10 changes: 10 additions & 0 deletions src/zeroconf/asyncio.py
Expand Up @@ -82,6 +82,16 @@ async def async_cancel(self) -> None:
"""Cancel the browser."""
self._async_cancel()

def async_update_records_complete(self) -> None:
"""Called when a record update has completed for all handlers.
At this point the cache will have the new records.
This method will be run in the event loop.
"""
while self._pending_handlers:
self._fire_service_state_changed_event(self._pending_handlers.popitem(False))


class AsyncZeroconfServiceTypes(ZeroconfServiceTypes):
"""An async version of ZeroconfServiceTypes."""
Expand Down

0 comments on commit 6a327d0

Please sign in to comment.