Skip to content

Commit

Permalink
feat: reduce overhead to process incoming updates by avoiding the han…
Browse files Browse the repository at this point in the history
…dle_response shim (#1247)
  • Loading branch information
bdraco committed Sep 2, 2023
1 parent f26218d commit 5e31f0a
Show file tree
Hide file tree
Showing 9 changed files with 35 additions and 23 deletions.
1 change: 1 addition & 0 deletions src/zeroconf/_core.py
Expand Up @@ -564,6 +564,7 @@ def async_remove_listener(self, listener: RecordUpdateListener) -> None:
def handle_response(self, msg: DNSIncoming) -> None:
"""Deal with incoming response packets. All answers
are held in the cache, and listeners are notified."""
self.log_warning_once("handle_response is deprecated, use record_manager.async_updates_from_response")
self.record_manager.async_updates_from_response(msg)

def handle_assembled_query(
Expand Down
4 changes: 4 additions & 0 deletions src/zeroconf/_listener.pxd
@@ -1,6 +1,7 @@

import cython

from ._handlers.record_manager cimport RecordManager
from ._protocol.incoming cimport DNSIncoming
from ._utils.time cimport current_time_millis, millis_to_seconds

Expand All @@ -12,9 +13,12 @@ cdef object TYPE_CHECKING
cdef cython.uint _MAX_MSG_ABSOLUTE
cdef cython.uint _DUPLICATE_PACKET_SUPPRESSION_INTERVAL



cdef class AsyncListener:

cdef public object zc
cdef RecordManager _record_manager
cdef public cython.bytes data
cdef public cython.float last_time
cdef public DNSIncoming last_message
Expand Down
4 changes: 3 additions & 1 deletion src/zeroconf/_listener.py
Expand Up @@ -55,6 +55,7 @@ class AsyncListener:

__slots__ = (
'zc',
'_record_manager',
'data',
'last_time',
'last_message',
Expand All @@ -66,6 +67,7 @@ class AsyncListener:

def __init__(self, zc: 'Zeroconf') -> None:
self.zc = zc
self._record_manager = zc.record_manager
self.data: Optional[bytes] = None
self.last_time: float = 0
self.last_message: Optional[DNSIncoming] = None
Expand Down Expand Up @@ -156,7 +158,7 @@ def datagram_received(
return

if not msg.is_query():
self.zc.handle_response(msg)
self._record_manager.async_updates_from_response(msg)
return

self.handle_query_or_defer(msg, addr, port, self.transport, v6_flow_scope)
Expand Down
4 changes: 4 additions & 0 deletions src/zeroconf/_protocol/incoming.pxd
Expand Up @@ -70,6 +70,10 @@ cdef class DNSIncoming:
)
cpdef has_qu_question(self)

cpdef is_query(self)

cpdef is_response(self)

@cython.locals(
off=cython.uint,
label_idx=cython.uint,
Expand Down
2 changes: 1 addition & 1 deletion tests/__init__.py
Expand Up @@ -42,7 +42,7 @@ def _inject_responses(zc: Zeroconf, msgs: List[DNSIncoming]) -> None:

async def _wait_for_response():
for msg in msgs:
zc.handle_response(msg)
zc.record_manager.async_updates_from_response(msg)

asyncio.run_coroutine_threadsafe(_wait_for_response(), zc.loop).result()

Expand Down
30 changes: 15 additions & 15 deletions tests/services/test_info.py
Expand Up @@ -903,7 +903,7 @@ async def test_release_wait_when_new_recorded_added():
)
await aiozc.zeroconf.async_wait_for_start()
await asyncio.sleep(0)
aiozc.zeroconf.handle_response(r.DNSIncoming(generated.packets()[0]))
aiozc.zeroconf.record_manager.async_updates_from_response(r.DNSIncoming(generated.packets()[0]))
assert await asyncio.wait_for(task, timeout=2)
assert info.addresses == [b'\x7f\x00\x00\x01']
await aiozc.async_close()
Expand Down Expand Up @@ -966,7 +966,7 @@ async def test_port_changes_are_seen():
)
await aiozc.zeroconf.async_wait_for_start()
await asyncio.sleep(0)
aiozc.zeroconf.handle_response(r.DNSIncoming(generated.packets()[0]))
aiozc.zeroconf.record_manager.async_updates_from_response(r.DNSIncoming(generated.packets()[0]))

generated = r.DNSOutgoing(const._FLAGS_QR_RESPONSE)
generated.add_answer_at_time(
Expand All @@ -982,7 +982,7 @@ async def test_port_changes_are_seen():
),
0,
)
aiozc.zeroconf.handle_response(r.DNSIncoming(generated.packets()[0]))
aiozc.zeroconf.record_manager.async_updates_from_response(r.DNSIncoming(generated.packets()[0]))

info = ServiceInfo(type_, registration_name, 80, 10, 10, desc, host)
await info.async_request(aiozc.zeroconf, timeout=200)
Expand Down Expand Up @@ -1049,7 +1049,7 @@ async def test_port_changes_are_seen_with_directed_request():
)
await aiozc.zeroconf.async_wait_for_start()
await asyncio.sleep(0)
aiozc.zeroconf.handle_response(r.DNSIncoming(generated.packets()[0]))
aiozc.zeroconf.record_manager.async_updates_from_response(r.DNSIncoming(generated.packets()[0]))

generated = r.DNSOutgoing(const._FLAGS_QR_RESPONSE)
generated.add_answer_at_time(
Expand All @@ -1065,7 +1065,7 @@ async def test_port_changes_are_seen_with_directed_request():
),
0,
)
aiozc.zeroconf.handle_response(r.DNSIncoming(generated.packets()[0]))
aiozc.zeroconf.record_manager.async_updates_from_response(r.DNSIncoming(generated.packets()[0]))

info = ServiceInfo(type_, registration_name, 80, 10, 10, desc, host)
await info.async_request(aiozc.zeroconf, timeout=200, addr="127.0.0.1", port=5353)
Expand Down Expand Up @@ -1131,7 +1131,7 @@ async def test_ipv4_changes_are_seen():
)
await aiozc.zeroconf.async_wait_for_start()
await asyncio.sleep(0)
aiozc.zeroconf.handle_response(r.DNSIncoming(generated.packets()[0]))
aiozc.zeroconf.record_manager.async_updates_from_response(r.DNSIncoming(generated.packets()[0]))
info = ServiceInfo(type_, registration_name)
info.load_from_cache(aiozc.zeroconf)
assert info.addresses_by_version(IPVersion.V4Only) == [b'\x7f\x00\x00\x01']
Expand All @@ -1147,7 +1147,7 @@ async def test_ipv4_changes_are_seen():
),
0,
)
aiozc.zeroconf.handle_response(r.DNSIncoming(generated.packets()[0]))
aiozc.zeroconf.record_manager.async_updates_from_response(r.DNSIncoming(generated.packets()[0]))

info = ServiceInfo(type_, registration_name)
info.load_from_cache(aiozc.zeroconf)
Expand Down Expand Up @@ -1213,7 +1213,7 @@ async def test_ipv6_changes_are_seen():
)
await aiozc.zeroconf.async_wait_for_start()
await asyncio.sleep(0)
aiozc.zeroconf.handle_response(r.DNSIncoming(generated.packets()[0]))
aiozc.zeroconf.record_manager.async_updates_from_response(r.DNSIncoming(generated.packets()[0]))
info = ServiceInfo(type_, registration_name)
info.load_from_cache(aiozc.zeroconf)
assert info.addresses_by_version(IPVersion.V6Only) == [
Expand All @@ -1231,7 +1231,7 @@ async def test_ipv6_changes_are_seen():
),
0,
)
aiozc.zeroconf.handle_response(r.DNSIncoming(generated.packets()[0]))
aiozc.zeroconf.record_manager.async_updates_from_response(r.DNSIncoming(generated.packets()[0]))

info = ServiceInfo(type_, registration_name)
info.load_from_cache(aiozc.zeroconf)
Expand Down Expand Up @@ -1295,7 +1295,7 @@ async def test_bad_ip_addresses_ignored_in_cache():

await aiozc.zeroconf.async_wait_for_start()
await asyncio.sleep(0)
aiozc.zeroconf.handle_response(r.DNSIncoming(generated.packets()[0]))
aiozc.zeroconf.record_manager.async_updates_from_response(r.DNSIncoming(generated.packets()[0]))
info = ServiceInfo(type_, registration_name)
info.load_from_cache(aiozc.zeroconf)
assert info.addresses_by_version(IPVersion.V4Only) == [b'\x7f\x00\x00\x01']
Expand Down Expand Up @@ -1354,7 +1354,7 @@ async def test_service_name_change_as_seen_has_ip_in_cache():
)
await aiozc.zeroconf.async_wait_for_start()
await asyncio.sleep(0)
aiozc.zeroconf.handle_response(r.DNSIncoming(generated.packets()[0]))
aiozc.zeroconf.record_manager.async_updates_from_response(r.DNSIncoming(generated.packets()[0]))

info = ServiceInfo(type_, registration_name)
await info.async_request(aiozc.zeroconf, timeout=200)
Expand All @@ -1374,7 +1374,7 @@ async def test_service_name_change_as_seen_has_ip_in_cache():
),
0,
)
aiozc.zeroconf.handle_response(r.DNSIncoming(generated.packets()[0]))
aiozc.zeroconf.record_manager.async_updates_from_response(r.DNSIncoming(generated.packets()[0]))

info = ServiceInfo(type_, registration_name)
await info.async_request(aiozc.zeroconf, timeout=200)
Expand Down Expand Up @@ -1426,7 +1426,7 @@ async def test_service_name_change_as_seen_ip_not_in_cache():
)
await aiozc.zeroconf.async_wait_for_start()
await asyncio.sleep(0)
aiozc.zeroconf.handle_response(r.DNSIncoming(generated.packets()[0]))
aiozc.zeroconf.record_manager.async_updates_from_response(r.DNSIncoming(generated.packets()[0]))

info = ServiceInfo(type_, registration_name)
await info.async_request(aiozc.zeroconf, timeout=200)
Expand Down Expand Up @@ -1456,7 +1456,7 @@ async def test_service_name_change_as_seen_ip_not_in_cache():
),
0,
)
aiozc.zeroconf.handle_response(r.DNSIncoming(generated.packets()[0]))
aiozc.zeroconf.record_manager.async_updates_from_response(r.DNSIncoming(generated.packets()[0]))

info = ServiceInfo(type_, registration_name)
await info.async_request(aiozc.zeroconf, timeout=200)
Expand Down Expand Up @@ -1530,7 +1530,7 @@ async def test_release_wait_when_new_recorded_added_concurrency():
await asyncio.sleep(0)
for task in tasks:
assert not task.done()
aiozc.zeroconf.handle_response(r.DNSIncoming(generated.packets()[0]))
aiozc.zeroconf.record_manager.async_updates_from_response(r.DNSIncoming(generated.packets()[0]))
_, pending = await asyncio.wait(tasks, timeout=2)
assert not pending
assert info.addresses == [b'\x7f\x00\x00\x01']
Expand Down
1 change: 1 addition & 0 deletions tests/test_asyncio.py
Expand Up @@ -1192,6 +1192,7 @@ def update_service(self, zc, type_, name) -> None: # type: ignore[no-untyped-de
0,
)

zc.record_manager.async_updates_from_response(DNSIncoming(generated.packets()[0]))
zc.handle_response(DNSIncoming(generated.packets()[0]))

await browser.async_cancel()
Expand Down
2 changes: 1 addition & 1 deletion tests/test_core.py
Expand Up @@ -105,7 +105,7 @@ def test_launch_and_close_apple_p2p_on_mac(self):
rv = r.Zeroconf(apple_p2p=True)
rv.close()

def test_handle_response(self):
def test_async_updates_from_response(self):
def mock_incoming_msg(service_state_change: r.ServiceStateChange) -> r.DNSIncoming:
ttl = 120
generated = r.DNSOutgoing(const._FLAGS_QR_RESPONSE)
Expand Down
10 changes: 5 additions & 5 deletions tests/test_handlers.py
Expand Up @@ -1423,7 +1423,7 @@ async def test_response_aggregation_timings(run_isolated):
assert len(calls) == 1
outgoing = send_mock.call_args[0][0]
incoming = r.DNSIncoming(outgoing.packets()[0])
zc.handle_response(incoming)
zc.record_manager.async_updates_from_response(incoming)
assert info.dns_pointer() in incoming.answers
assert info2.dns_pointer() in incoming.answers
send_mock.reset_mock()
Expand All @@ -1437,7 +1437,7 @@ async def test_response_aggregation_timings(run_isolated):
assert len(calls) == 1
outgoing = send_mock.call_args[0][0]
incoming = r.DNSIncoming(outgoing.packets()[0])
zc.handle_response(incoming)
zc.record_manager.async_updates_from_response(incoming)
assert info3.dns_pointer() in incoming.answers
send_mock.reset_mock()

Expand Down Expand Up @@ -1499,7 +1499,7 @@ async def test_response_aggregation_timings_multiple(run_isolated, disable_dupli
assert len(calls) == 1
outgoing = send_mock.call_args[0][0]
incoming = r.DNSIncoming(outgoing.packets()[0])
zc.handle_response(incoming)
zc.record_manager.async_updates_from_response(incoming)
assert info2.dns_pointer() in incoming.answers

send_mock.reset_mock()
Expand All @@ -1509,7 +1509,7 @@ async def test_response_aggregation_timings_multiple(run_isolated, disable_dupli
assert len(calls) == 1
outgoing = send_mock.call_args[0][0]
incoming = r.DNSIncoming(outgoing.packets()[0])
zc.handle_response(incoming)
zc.record_manager.async_updates_from_response(incoming)
assert info2.dns_pointer() in incoming.answers

send_mock.reset_mock()
Expand All @@ -1532,7 +1532,7 @@ async def test_response_aggregation_timings_multiple(run_isolated, disable_dupli
assert len(calls) == 1
outgoing = send_mock.call_args[0][0]
incoming = r.DNSIncoming(outgoing.packets()[0])
zc.handle_response(incoming)
zc.record_manager.async_updates_from_response(incoming)
assert info2.dns_pointer() in incoming.answers


Expand Down

0 comments on commit 5e31f0a

Please sign in to comment.