diff --git a/src/zeroconf/_cache.pxd b/src/zeroconf/_cache.pxd index 84107957..af27a1d5 100644 --- a/src/zeroconf/_cache.pxd +++ b/src/zeroconf/_cache.pxd @@ -26,23 +26,23 @@ cdef class DNSCache: cpdef bint async_add_records(self, object entries) - cpdef async_remove_records(self, object entries) + cpdef void async_remove_records(self, object entries) @cython.locals( store=cython.dict, ) - cpdef async_get_unique(self, DNSRecord entry) + cpdef DNSRecord async_get_unique(self, DNSRecord entry) @cython.locals( record=DNSRecord, ) - cpdef async_expire(self, double now) + cpdef list async_expire(self, double now) @cython.locals( records=cython.dict, record=DNSRecord, ) - cpdef async_all_by_details(self, str name, object type_, object class_) + cpdef list async_all_by_details(self, str name, object type_, object class_) cpdef cython.dict async_entries_with_name(self, str name) @@ -51,7 +51,7 @@ cdef class DNSCache: @cython.locals( cached_entry=DNSRecord, ) - cpdef get_by_details(self, str name, object type_, object class_) + cpdef DNSRecord get_by_details(self, str name, object type_, object class_) @cython.locals( records=cython.dict, @@ -62,12 +62,12 @@ cdef class DNSCache: @cython.locals( store=cython.dict, ) - cdef _async_add(self, DNSRecord record) + cdef bint _async_add(self, DNSRecord record) - cdef _async_remove(self, DNSRecord record) + cdef void _async_remove(self, DNSRecord record) @cython.locals( record=DNSRecord, created_double=double, ) - cpdef async_mark_unique_records_older_than_1s_to_expire(self, cython.set unique_types, object answers, double now) + cpdef void async_mark_unique_records_older_than_1s_to_expire(self, cython.set unique_types, object answers, double now) diff --git a/src/zeroconf/_core.py b/src/zeroconf/_core.py index 156e0b1a..4b29717a 100644 --- a/src/zeroconf/_core.py +++ b/src/zeroconf/_core.py @@ -182,6 +182,10 @@ def __init__( self.registry = ServiceRegistry() self.cache = DNSCache() self.question_history = QuestionHistory() + + self.out_queue = MulticastOutgoingQueue(self, 0, _AGGREGATION_DELAY) + self.out_delay_queue = MulticastOutgoingQueue(self, _ONE_SECOND, _PROTECTED_AGGREGATION_DELAY) + self.query_handler = QueryHandler(self) self.record_manager = RecordManager(self) @@ -189,9 +193,6 @@ def __init__( self.loop: Optional[asyncio.AbstractEventLoop] = None self._loop_thread: Optional[threading.Thread] = None - self.out_queue = MulticastOutgoingQueue(self, 0, _AGGREGATION_DELAY) - self.out_delay_queue = MulticastOutgoingQueue(self, _ONE_SECOND, _PROTECTED_AGGREGATION_DELAY) - self.start() @property diff --git a/src/zeroconf/_handlers/answers.pxd b/src/zeroconf/_handlers/answers.pxd index 5a3010ad..25b3c1a1 100644 --- a/src/zeroconf/_handlers/answers.pxd +++ b/src/zeroconf/_handlers/answers.pxd @@ -7,10 +7,10 @@ from .._protocol.outgoing cimport DNSOutgoing cdef class QuestionAnswers: - cdef public object ucast - cdef public object mcast_now - cdef public object mcast_aggregate - cdef public object mcast_aggregate_last_second + cdef public dict ucast + cdef public dict mcast_now + cdef public dict mcast_aggregate + cdef public dict mcast_aggregate_last_second cdef class AnswerGroup: @@ -25,11 +25,11 @@ cdef class AnswerGroup: cdef object _FLAGS_QR_RESPONSE_AA cdef object NAME_GETTER -cpdef construct_outgoing_multicast_answers(cython.dict answers) +cpdef DNSOutgoing construct_outgoing_multicast_answers(cython.dict answers) -cpdef construct_outgoing_unicast_answers( +cpdef DNSOutgoing construct_outgoing_unicast_answers( cython.dict answers, bint ucast_source, cython.list questions, object id_ ) @cython.locals(answer=DNSRecord, additionals=cython.set, additional=DNSRecord) -cdef _add_answers_additionals(DNSOutgoing out, cython.dict answers) +cdef void _add_answers_additionals(DNSOutgoing out, cython.dict answers) diff --git a/src/zeroconf/_handlers/multicast_outgoing_queue.pxd b/src/zeroconf/_handlers/multicast_outgoing_queue.pxd index 1a8d6741..88cfdaa0 100644 --- a/src/zeroconf/_handlers/multicast_outgoing_queue.pxd +++ b/src/zeroconf/_handlers/multicast_outgoing_queue.pxd @@ -19,9 +19,9 @@ cdef class MulticastOutgoingQueue: cdef object _aggregation_delay @cython.locals(last_group=AnswerGroup, random_int=cython.uint) - cpdef async_add(self, double now, cython.dict answers) + cpdef void async_add(self, double now, cython.dict answers) @cython.locals(pending=AnswerGroup) - cdef _remove_answers_from_queue(self, cython.dict answers) + cdef void _remove_answers_from_queue(self, cython.dict answers) - cpdef async_ready(self) + cpdef void async_ready(self) diff --git a/src/zeroconf/_handlers/query_handler.pxd b/src/zeroconf/_handlers/query_handler.pxd index bb7198be..89a1f2b2 100644 --- a/src/zeroconf/_handlers/query_handler.pxd +++ b/src/zeroconf/_handlers/query_handler.pxd @@ -53,12 +53,12 @@ cdef class _QueryResponse: cdef cython.set _mcast_aggregate_last_second @cython.locals(record=DNSRecord) - cdef add_qu_question_response(self, cython.dict answers) + cdef void add_qu_question_response(self, cython.dict answers) - cdef add_ucast_question_response(self, cython.dict answers) + cdef void add_ucast_question_response(self, cython.dict answers) @cython.locals(answer=DNSRecord, question=DNSQuestion) - cdef add_mcast_question_response(self, cython.dict answers) + cdef void add_mcast_question_response(self, cython.dict answers) @cython.locals(maybe_entry=DNSRecord) cdef bint _has_mcast_within_one_quarter_ttl(self, DNSRecord record) @@ -74,15 +74,17 @@ cdef class QueryHandler: cdef ServiceRegistry registry cdef DNSCache cache cdef QuestionHistory question_history + cdef MulticastOutgoingQueue out_queue + cdef MulticastOutgoingQueue out_delay_queue @cython.locals(service=ServiceInfo) - cdef _add_service_type_enumeration_query_answers(self, list types, cython.dict answer_set, DNSRRSet known_answers) + cdef void _add_service_type_enumeration_query_answers(self, list types, cython.dict answer_set, DNSRRSet known_answers) @cython.locals(service=ServiceInfo) - cdef _add_pointer_answers(self, list services, cython.dict answer_set, DNSRRSet known_answers) + cdef void _add_pointer_answers(self, list services, cython.dict answer_set, DNSRRSet known_answers) @cython.locals(service=ServiceInfo, dns_address=DNSAddress) - cdef _add_address_answers(self, list services, cython.dict answer_set, DNSRRSet known_answers, cython.uint type_) + cdef void _add_address_answers(self, list services, cython.dict answer_set, DNSRRSet known_answers, cython.uint type_) @cython.locals(question_lower_name=str, type_=cython.uint, service=ServiceInfo) cdef cython.dict _answer_question(self, DNSQuestion question, unsigned int strategy_type, list types, list services, DNSRRSet known_answers) @@ -102,13 +104,11 @@ cdef class QueryHandler: cpdef QuestionAnswers async_response(self, cython.list msgs, cython.bint unicast_source) @cython.locals(name=str, question_lower_name=str) - cdef _get_answer_strategies(self, DNSQuestion question) + cdef list _get_answer_strategies(self, DNSQuestion question) @cython.locals( first_packet=DNSIncoming, ucast_source=bint, - out_queue=MulticastOutgoingQueue, - out_delay_queue=MulticastOutgoingQueue ) cpdef void handle_assembled_query( self, diff --git a/src/zeroconf/_handlers/query_handler.py b/src/zeroconf/_handlers/query_handler.py index 8349b584..ba9c9e31 100644 --- a/src/zeroconf/_handlers/query_handler.py +++ b/src/zeroconf/_handlers/query_handler.py @@ -191,7 +191,7 @@ def _has_mcast_record_in_last_second(self, record: DNSRecord) -> bool: class QueryHandler: """Query the ServiceRegistry.""" - __slots__ = ("zc", "registry", "cache", "question_history") + __slots__ = ("zc", "registry", "cache", "question_history", "out_queue", "out_delay_queue") def __init__(self, zc: 'Zeroconf') -> None: """Init the query handler.""" @@ -199,6 +199,8 @@ def __init__(self, zc: 'Zeroconf') -> None: self.registry = zc.registry self.cache = zc.cache self.question_history = zc.question_history + self.out_queue = zc.out_queue + self.out_delay_queue = zc.out_delay_queue def _add_service_type_enumeration_query_answers( self, types: List[str], answer_set: _AnswerWithAdditionalsType, known_answers: DNSRRSet @@ -301,7 +303,7 @@ def async_response( # pylint: disable=unused-argument """ strategies: List[_AnswerStrategy] = [] for msg in msgs: - for question in msg.questions: + for question in msg._questions: strategies.extend(self._get_answer_strategies(question)) if not strategies: @@ -311,7 +313,8 @@ def async_response( # pylint: disable=unused-argument return None is_probe = False - questions = msg.questions + msg = msgs[0] + questions = msg._questions # Only decode known answers if we are not a probe and we have # at least one answer strategy answers: List[DNSRecord] = [] @@ -321,7 +324,6 @@ def async_response( # pylint: disable=unused-argument else: answers.extend(msg.answers()) - msg = msgs[0] query_res = _QueryResponse(self.cache, questions, is_probe, msg.now) known_answers = DNSRRSet(answers) known_answers_set: Optional[Set[DNSRecord]] = None @@ -412,13 +414,12 @@ def handle_assembled_query( packet will be in packets. """ first_packet = packets[0] - now = first_packet.now ucast_source = port != _MDNS_PORT question_answers = self.async_response(packets, ucast_source) - if not question_answers: + if question_answers is None: return if question_answers.ucast: - questions = first_packet.questions + questions = first_packet._questions id_ = first_packet.id out = construct_outgoing_unicast_answers(question_answers.ucast, ucast_source, questions, id_) # When sending unicast, only send back the reply @@ -428,11 +429,9 @@ def handle_assembled_query( if question_answers.mcast_now: self.zc.async_send(construct_outgoing_multicast_answers(question_answers.mcast_now)) if question_answers.mcast_aggregate: - out_queue = self.zc.out_queue - out_queue.async_add(now, question_answers.mcast_aggregate) + self.out_queue.async_add(first_packet.now, question_answers.mcast_aggregate) if question_answers.mcast_aggregate_last_second: # https://datatracker.ietf.org/doc/html/rfc6762#section-14 # If we broadcast it in the last second, we have to delay # at least a second before we send it again - out_delay_queue = self.zc.out_delay_queue - out_delay_queue.async_add(now, question_answers.mcast_aggregate_last_second) + self.out_delay_queue.async_add(first_packet.now, question_answers.mcast_aggregate_last_second) diff --git a/src/zeroconf/_protocol/incoming.pxd b/src/zeroconf/_protocol/incoming.pxd index a8c0dbdb..bb438303 100644 --- a/src/zeroconf/_protocol/incoming.pxd +++ b/src/zeroconf/_protocol/incoming.pxd @@ -70,7 +70,7 @@ cdef class DNSIncoming: cpdef bint is_probe(self) - cpdef answers(self) + cpdef list answers(self) cpdef bint is_response(self) @@ -86,16 +86,16 @@ cdef class DNSIncoming: cdef unsigned int _decode_labels_at_offset(self, unsigned int off, cython.list labels, cython.set seen_pointers) @cython.locals(offset="unsigned int") - cdef _read_header(self) + cdef void _read_header(self) - cdef _initial_parse(self) + cdef void _initial_parse(self) @cython.locals( end="unsigned int", length="unsigned int", offset="unsigned int" ) - cdef _read_others(self) + cdef void _read_others(self) @cython.locals(offset="unsigned int") cdef _read_questions(self) @@ -123,6 +123,6 @@ cdef class DNSIncoming: i="unsigned int", bitmap_length="unsigned int", ) - cdef _read_bitmap(self, unsigned int end) + cdef list _read_bitmap(self, unsigned int end) - cdef _read_name(self) + cdef str _read_name(self)