Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: speed up outgoing multicast queue #1277

Merged
merged 2 commits into from
Sep 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions build_ext.py
Expand Up @@ -31,6 +31,7 @@ def build(setup_kwargs: Any) -> None:
"src/zeroconf/_protocol/outgoing.py",
"src/zeroconf/_handlers/answers.py",
"src/zeroconf/_handlers/record_manager.py",
"src/zeroconf/_handlers/multicast_outgoing_queue.py",
"src/zeroconf/_handlers/query_handler.py",
"src/zeroconf/_services/browser.py",
"src/zeroconf/_services/info.py",
Expand Down
6 changes: 3 additions & 3 deletions src/zeroconf/_handlers/answers.pxd
Expand Up @@ -15,9 +15,9 @@ cdef class QuestionAnswers:

cdef class AnswerGroup:

cdef public object send_after
cdef public object send_before
cdef public object answers
cdef public float send_after
cdef public float send_before
cdef public cython.dict answers



Expand Down
25 changes: 25 additions & 0 deletions src/zeroconf/_handlers/multicast_outgoing_queue.pxd
@@ -0,0 +1,25 @@

import cython

from .._utils.time cimport current_time_millis, millis_to_seconds
from .answers cimport AnswerGroup, construct_outgoing_multicast_answers


cdef object TYPE_CHECKING
cdef tuple MULTICAST_DELAY_RANDOM_INTERVAL
cdef object RAND_INT

cdef class MulticastOutgoingQueue:

cdef object zc
cdef object queue
cdef cython.uint additional_delay
cdef cython.uint aggregation_delay

@cython.locals(last_group=AnswerGroup, random_int=cython.uint, random_delay=float, send_after=float, send_before=float)
cpdef async_add(self, float now, cython.dict answers)

@cython.locals(pending=AnswerGroup)
cdef _remove_answers_from_queue(self, cython.dict answers)

cpdef async_ready(self)
26 changes: 18 additions & 8 deletions src/zeroconf/_handlers/multicast_outgoing_queue.py
Expand Up @@ -32,9 +32,13 @@
construct_outgoing_multicast_answers,
)

RAND_INT = random.randint

if TYPE_CHECKING:
from .._core import Zeroconf

_float = float


class MulticastOutgoingQueue:
"""An outgoing queue used to aggregate multicast responses."""
Expand All @@ -50,10 +54,13 @@ def __init__(self, zeroconf: 'Zeroconf', additional_delay: int, max_aggregation_
self.additional_delay = additional_delay
self.aggregation_delay = max_aggregation_delay

def async_add(self, now: float, answers: _AnswerWithAdditionalsType) -> None:
def async_add(self, now: _float, answers: _AnswerWithAdditionalsType) -> None:
"""Add a group of answers with additionals to the outgoing queue."""
assert self.zc.loop is not None
random_delay = random.randint(*MULTICAST_DELAY_RANDOM_INTERVAL) + self.additional_delay
loop = self.zc.loop
if TYPE_CHECKING:
assert loop is not None
random_int = RAND_INT(*MULTICAST_DELAY_RANDOM_INTERVAL)
random_delay = random_int + self.additional_delay
send_after = now + random_delay
send_before = now + self.aggregation_delay + self.additional_delay
if len(self.queue):
Expand All @@ -66,7 +73,7 @@ def async_add(self, now: float, answers: _AnswerWithAdditionalsType) -> None:
last_group.answers.update(answers)
return
else:
self.zc.loop.call_later(millis_to_seconds(random_delay), self.async_ready)
loop.call_at(loop.time() + millis_to_seconds(random_delay), self.async_ready)
self.queue.append(AnswerGroup(send_after, send_before, answers))

def _remove_answers_from_queue(self, answers: _AnswerWithAdditionalsType) -> None:
Expand All @@ -77,13 +84,16 @@ def _remove_answers_from_queue(self, answers: _AnswerWithAdditionalsType) -> Non

def async_ready(self) -> None:
"""Process anything in the queue that is ready."""
assert self.zc.loop is not None
zc = self.zc
loop = zc.loop
if TYPE_CHECKING:
assert loop is not None
now = current_time_millis()

if len(self.queue) > 1 and self.queue[0].send_before > now:
# There is more than one answer in the queue,
# delay until we have to send it (first answer group reaches send_before)
self.zc.loop.call_later(millis_to_seconds(self.queue[0].send_before - now), self.async_ready)
loop.call_at(loop.time() + millis_to_seconds(self.queue[0].send_before - now), self.async_ready)
return

answers: _AnswerWithAdditionalsType = {}
Expand All @@ -94,9 +104,9 @@ def async_ready(self) -> None:
if len(self.queue):
# If there are still groups in the queue that are not ready to send
# be sure we schedule them to go out later
self.zc.loop.call_later(millis_to_seconds(self.queue[0].send_after - now), self.async_ready)
loop.call_at(loop.time() + millis_to_seconds(self.queue[0].send_after - now), self.async_ready)

if answers:
# If we have the same answer scheduled to go out, remove them
self._remove_answers_from_queue(answers)
self.zc.async_send(construct_outgoing_multicast_answers(answers))
zc.async_send(construct_outgoing_multicast_answers(answers))