Skip to content

Commit

Permalink
feat: speed up processing incoming packets (#1352)
Browse files Browse the repository at this point in the history
  • Loading branch information
bdraco committed Dec 19, 2023
1 parent a014c7c commit 6c15325
Show file tree
Hide file tree
Showing 7 changed files with 47 additions and 47 deletions.
16 changes: 8 additions & 8 deletions src/zeroconf/_cache.pxd
Expand Up @@ -26,23 +26,23 @@ cdef class DNSCache:

cpdef bint async_add_records(self, object entries)

cpdef async_remove_records(self, object entries)
cpdef void async_remove_records(self, object entries)

@cython.locals(
store=cython.dict,
)
cpdef async_get_unique(self, DNSRecord entry)
cpdef DNSRecord async_get_unique(self, DNSRecord entry)

@cython.locals(
record=DNSRecord,
)
cpdef async_expire(self, double now)
cpdef list async_expire(self, double now)

@cython.locals(
records=cython.dict,
record=DNSRecord,
)
cpdef async_all_by_details(self, str name, object type_, object class_)
cpdef list async_all_by_details(self, str name, object type_, object class_)

cpdef cython.dict async_entries_with_name(self, str name)

Expand All @@ -51,7 +51,7 @@ cdef class DNSCache:
@cython.locals(
cached_entry=DNSRecord,
)
cpdef get_by_details(self, str name, object type_, object class_)
cpdef DNSRecord get_by_details(self, str name, object type_, object class_)

@cython.locals(
records=cython.dict,
Expand All @@ -62,12 +62,12 @@ cdef class DNSCache:
@cython.locals(
store=cython.dict,
)
cdef _async_add(self, DNSRecord record)
cdef bint _async_add(self, DNSRecord record)

cdef _async_remove(self, DNSRecord record)
cdef void _async_remove(self, DNSRecord record)

@cython.locals(
record=DNSRecord,
created_double=double,
)
cpdef async_mark_unique_records_older_than_1s_to_expire(self, cython.set unique_types, object answers, double now)
cpdef void async_mark_unique_records_older_than_1s_to_expire(self, cython.set unique_types, object answers, double now)
7 changes: 4 additions & 3 deletions src/zeroconf/_core.py
Expand Up @@ -182,16 +182,17 @@ def __init__(
self.registry = ServiceRegistry()
self.cache = DNSCache()
self.question_history = QuestionHistory()

self.out_queue = MulticastOutgoingQueue(self, 0, _AGGREGATION_DELAY)
self.out_delay_queue = MulticastOutgoingQueue(self, _ONE_SECOND, _PROTECTED_AGGREGATION_DELAY)

self.query_handler = QueryHandler(self)
self.record_manager = RecordManager(self)

self._notify_futures: Set[asyncio.Future] = set()
self.loop: Optional[asyncio.AbstractEventLoop] = None
self._loop_thread: Optional[threading.Thread] = None

self.out_queue = MulticastOutgoingQueue(self, 0, _AGGREGATION_DELAY)
self.out_delay_queue = MulticastOutgoingQueue(self, _ONE_SECOND, _PROTECTED_AGGREGATION_DELAY)

self.start()

@property
Expand Down
14 changes: 7 additions & 7 deletions src/zeroconf/_handlers/answers.pxd
Expand Up @@ -7,10 +7,10 @@ from .._protocol.outgoing cimport DNSOutgoing

cdef class QuestionAnswers:

cdef public object ucast
cdef public object mcast_now
cdef public object mcast_aggregate
cdef public object mcast_aggregate_last_second
cdef public dict ucast
cdef public dict mcast_now
cdef public dict mcast_aggregate
cdef public dict mcast_aggregate_last_second


cdef class AnswerGroup:
Expand All @@ -25,11 +25,11 @@ cdef class AnswerGroup:
cdef object _FLAGS_QR_RESPONSE_AA
cdef object NAME_GETTER

cpdef construct_outgoing_multicast_answers(cython.dict answers)
cpdef DNSOutgoing construct_outgoing_multicast_answers(cython.dict answers)

cpdef construct_outgoing_unicast_answers(
cpdef DNSOutgoing construct_outgoing_unicast_answers(
cython.dict answers, bint ucast_source, cython.list questions, object id_
)

@cython.locals(answer=DNSRecord, additionals=cython.set, additional=DNSRecord)
cdef _add_answers_additionals(DNSOutgoing out, cython.dict answers)
cdef void _add_answers_additionals(DNSOutgoing out, cython.dict answers)
6 changes: 3 additions & 3 deletions src/zeroconf/_handlers/multicast_outgoing_queue.pxd
Expand Up @@ -19,9 +19,9 @@ cdef class MulticastOutgoingQueue:
cdef object _aggregation_delay

@cython.locals(last_group=AnswerGroup, random_int=cython.uint)
cpdef async_add(self, double now, cython.dict answers)
cpdef void async_add(self, double now, cython.dict answers)

@cython.locals(pending=AnswerGroup)
cdef _remove_answers_from_queue(self, cython.dict answers)
cdef void _remove_answers_from_queue(self, cython.dict answers)

cpdef async_ready(self)
cpdef void async_ready(self)
18 changes: 9 additions & 9 deletions src/zeroconf/_handlers/query_handler.pxd
Expand Up @@ -53,12 +53,12 @@ cdef class _QueryResponse:
cdef cython.set _mcast_aggregate_last_second

@cython.locals(record=DNSRecord)
cdef add_qu_question_response(self, cython.dict answers)
cdef void add_qu_question_response(self, cython.dict answers)

cdef add_ucast_question_response(self, cython.dict answers)
cdef void add_ucast_question_response(self, cython.dict answers)

@cython.locals(answer=DNSRecord, question=DNSQuestion)
cdef add_mcast_question_response(self, cython.dict answers)
cdef void add_mcast_question_response(self, cython.dict answers)

@cython.locals(maybe_entry=DNSRecord)
cdef bint _has_mcast_within_one_quarter_ttl(self, DNSRecord record)
Expand All @@ -74,15 +74,17 @@ cdef class QueryHandler:
cdef ServiceRegistry registry
cdef DNSCache cache
cdef QuestionHistory question_history
cdef MulticastOutgoingQueue out_queue
cdef MulticastOutgoingQueue out_delay_queue

@cython.locals(service=ServiceInfo)
cdef _add_service_type_enumeration_query_answers(self, list types, cython.dict answer_set, DNSRRSet known_answers)
cdef void _add_service_type_enumeration_query_answers(self, list types, cython.dict answer_set, DNSRRSet known_answers)

@cython.locals(service=ServiceInfo)
cdef _add_pointer_answers(self, list services, cython.dict answer_set, DNSRRSet known_answers)
cdef void _add_pointer_answers(self, list services, cython.dict answer_set, DNSRRSet known_answers)

@cython.locals(service=ServiceInfo, dns_address=DNSAddress)
cdef _add_address_answers(self, list services, cython.dict answer_set, DNSRRSet known_answers, cython.uint type_)
cdef void _add_address_answers(self, list services, cython.dict answer_set, DNSRRSet known_answers, cython.uint type_)

@cython.locals(question_lower_name=str, type_=cython.uint, service=ServiceInfo)
cdef cython.dict _answer_question(self, DNSQuestion question, unsigned int strategy_type, list types, list services, DNSRRSet known_answers)
Expand All @@ -102,13 +104,11 @@ cdef class QueryHandler:
cpdef QuestionAnswers async_response(self, cython.list msgs, cython.bint unicast_source)

@cython.locals(name=str, question_lower_name=str)
cdef _get_answer_strategies(self, DNSQuestion question)
cdef list _get_answer_strategies(self, DNSQuestion question)

@cython.locals(
first_packet=DNSIncoming,
ucast_source=bint,
out_queue=MulticastOutgoingQueue,
out_delay_queue=MulticastOutgoingQueue
)
cpdef void handle_assembled_query(
self,
Expand Down
21 changes: 10 additions & 11 deletions src/zeroconf/_handlers/query_handler.py
Expand Up @@ -191,14 +191,16 @@ def _has_mcast_record_in_last_second(self, record: DNSRecord) -> bool:
class QueryHandler:
"""Query the ServiceRegistry."""

__slots__ = ("zc", "registry", "cache", "question_history")
__slots__ = ("zc", "registry", "cache", "question_history", "out_queue", "out_delay_queue")

def __init__(self, zc: 'Zeroconf') -> None:
"""Init the query handler."""
self.zc = zc
self.registry = zc.registry
self.cache = zc.cache
self.question_history = zc.question_history
self.out_queue = zc.out_queue
self.out_delay_queue = zc.out_delay_queue

def _add_service_type_enumeration_query_answers(
self, types: List[str], answer_set: _AnswerWithAdditionalsType, known_answers: DNSRRSet
Expand Down Expand Up @@ -301,7 +303,7 @@ def async_response( # pylint: disable=unused-argument
"""
strategies: List[_AnswerStrategy] = []
for msg in msgs:
for question in msg.questions:
for question in msg._questions:
strategies.extend(self._get_answer_strategies(question))

if not strategies:
Expand All @@ -311,7 +313,8 @@ def async_response( # pylint: disable=unused-argument
return None

is_probe = False
questions = msg.questions
msg = msgs[0]
questions = msg._questions
# Only decode known answers if we are not a probe and we have
# at least one answer strategy
answers: List[DNSRecord] = []
Expand All @@ -321,7 +324,6 @@ def async_response( # pylint: disable=unused-argument
else:
answers.extend(msg.answers())

msg = msgs[0]
query_res = _QueryResponse(self.cache, questions, is_probe, msg.now)
known_answers = DNSRRSet(answers)
known_answers_set: Optional[Set[DNSRecord]] = None
Expand Down Expand Up @@ -412,13 +414,12 @@ def handle_assembled_query(
packet will be in packets.
"""
first_packet = packets[0]
now = first_packet.now
ucast_source = port != _MDNS_PORT
question_answers = self.async_response(packets, ucast_source)
if not question_answers:
if question_answers is None:
return
if question_answers.ucast:
questions = first_packet.questions
questions = first_packet._questions
id_ = first_packet.id
out = construct_outgoing_unicast_answers(question_answers.ucast, ucast_source, questions, id_)
# When sending unicast, only send back the reply
Expand All @@ -428,11 +429,9 @@ def handle_assembled_query(
if question_answers.mcast_now:
self.zc.async_send(construct_outgoing_multicast_answers(question_answers.mcast_now))
if question_answers.mcast_aggregate:
out_queue = self.zc.out_queue
out_queue.async_add(now, question_answers.mcast_aggregate)
self.out_queue.async_add(first_packet.now, question_answers.mcast_aggregate)
if question_answers.mcast_aggregate_last_second:
# https://datatracker.ietf.org/doc/html/rfc6762#section-14
# If we broadcast it in the last second, we have to delay
# at least a second before we send it again
out_delay_queue = self.zc.out_delay_queue
out_delay_queue.async_add(now, question_answers.mcast_aggregate_last_second)
self.out_delay_queue.async_add(first_packet.now, question_answers.mcast_aggregate_last_second)
12 changes: 6 additions & 6 deletions src/zeroconf/_protocol/incoming.pxd
Expand Up @@ -70,7 +70,7 @@ cdef class DNSIncoming:

cpdef bint is_probe(self)

cpdef answers(self)
cpdef list answers(self)

cpdef bint is_response(self)

Expand All @@ -86,16 +86,16 @@ cdef class DNSIncoming:
cdef unsigned int _decode_labels_at_offset(self, unsigned int off, cython.list labels, cython.set seen_pointers)

@cython.locals(offset="unsigned int")
cdef _read_header(self)
cdef void _read_header(self)

cdef _initial_parse(self)
cdef void _initial_parse(self)

@cython.locals(
end="unsigned int",
length="unsigned int",
offset="unsigned int"
)
cdef _read_others(self)
cdef void _read_others(self)

@cython.locals(offset="unsigned int")
cdef _read_questions(self)
Expand Down Expand Up @@ -123,6 +123,6 @@ cdef class DNSIncoming:
i="unsigned int",
bitmap_length="unsigned int",
)
cdef _read_bitmap(self, unsigned int end)
cdef list _read_bitmap(self, unsigned int end)

cdef _read_name(self)
cdef str _read_name(self)

0 comments on commit 6c15325

Please sign in to comment.