Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Call UpdateService for SRV & A/AAAA records as well as TXT #239

Closed
wants to merge 44 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
43e1b24
Interim Commit
mattsaxon Apr 5, 2020
d314896
Call UpdateService on SRV & A/AAAA updates as well as TXT
mattsaxon Apr 6, 2020
f40649a
Tidy up tests and fix mypy issue
mattsaxon Apr 7, 2020
6a3deb7
Black reformat
mattsaxon Apr 7, 2020
4a1f7e5
adds waits to deal with possible pypy issues
mattsaxon Apr 8, 2020
2b258fd
Nasty fixes
mattsaxon Apr 8, 2020
064056b
Merge pull request #1 from jstasiak/master
mattsaxon Apr 10, 2020
cc78868
Update test.py
mattsaxon Apr 10, 2020
aa3e947
Update test.py
mattsaxon Apr 10, 2020
c1d5625
update tests
mattsaxon Apr 10, 2020
dc570c9
Update test.py
mattsaxon Apr 10, 2020
d8f3508
Add lock around handlers list
mattsaxon Apr 10, 2020
902548b
more locks and waits
mattsaxon Apr 10, 2020
f3d6bf2
Revert "more locks and waits"
mattsaxon Apr 10, 2020
53cd650
Revert "Add lock around handlers list"
mattsaxon Apr 10, 2020
700a183
Revert "Update test.py"
mattsaxon Apr 10, 2020
d2cfb86
Revert "update tests"
mattsaxon Apr 10, 2020
23aa41f
Revert "Update test.py"
mattsaxon Apr 10, 2020
2c16b31
Revert "Update test.py"
mattsaxon Apr 10, 2020
98bdbfd
return to a reasonable test place!
mattsaxon Apr 10, 2020
a3ab378
Update test.py
mattsaxon Apr 10, 2020
a42ee2c
Update test.py
mattsaxon Apr 10, 2020
5d69f91
Add lock
mattsaxon Apr 10, 2020
c2d4b0c
Tidy up code and add some comments
mattsaxon Apr 10, 2020
6831c2e
remove unstable IPv6 tests from travis
mattsaxon Apr 10, 2020
38c3ed9
Merge branch 'IPv6-travis-fix'
mattsaxon Apr 11, 2020
23e4af1
Reverse DNSCache order to ensure newest records take precedence
mattsaxon Apr 12, 2020
4bb5539
Update __init__.py
mattsaxon Apr 12, 2020
dc653d1
Update .gitignore
mattsaxon Apr 12, 2020
d086b82
remove linting errors
mattsaxon Apr 12, 2020
a6da721
linting
mattsaxon Apr 12, 2020
592c0e1
linting
mattsaxon Apr 12, 2020
982f75b
linting
mattsaxon Apr 12, 2020
4b48dd9
linting
mattsaxon Apr 12, 2020
56955ed
linting
mattsaxon Apr 12, 2020
abc4773
linting
mattsaxon Apr 13, 2020
73ae509
Strengthen type checking of handlers_to_call
mattsaxon Apr 13, 2020
f142584
code review comments (1)
mattsaxon Apr 19, 2020
aad875d
code review comments (2)
mattsaxon Apr 19, 2020
7430ca0
code review comments (3)
mattsaxon Apr 25, 2020
9fe89c6
linting
mattsaxon Apr 25, 2020
55a0026
linting
mattsaxon Apr 25, 2020
4b2c476
Delete launch.json
jstasiak Apr 25, 2020
4e6c84d
Delete settings.json
jstasiak Apr 25, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
132 changes: 85 additions & 47 deletions zeroconf/__init__.py
Expand Up @@ -35,6 +35,7 @@
import threading
import time
import warnings
from collections import OrderedDict
from typing import Dict, List, Optional, Sequence, Union, cast
from typing import Any, Callable, Set, Tuple # noqa # used in type hints

Expand Down Expand Up @@ -1121,8 +1122,9 @@ def __init__(self) -> None:

def add(self, entry: DNSRecord) -> None:
"""Adds an entry"""
# Insert first in list so get returns newest entry
self.cache.setdefault(entry.key, []).insert(0, entry)
# Insert last in list, get will return newest entry
# iteration will result in last update winning
self.cache.setdefault(entry.key, []).append(entry)
mattsaxon marked this conversation as resolved.
Show resolved Hide resolved

def remove(self, entry: DNSRecord) -> None:
"""Removes an entry"""
Expand All @@ -1142,7 +1144,7 @@ def get(self, entry: DNSEntry) -> Optional[DNSRecord]:
matching entry."""
try:
list_ = self.cache[entry.key]
for cached_entry in list_:
for cached_entry in reversed(list_):
if entry.__eq__(cached_entry):
return cached_entry
return None
Expand All @@ -1164,7 +1166,7 @@ def entries_with_name(self, name: str) -> List[DNSRecord]:

def current_entry_with_name_and_alias(self, name: str, alias: str) -> Optional[DNSRecord]:
now = current_time_millis()
for record in self.entries_with_name(name):
for record in reversed(self.entries_with_name(name)):
if (
record.type == _TYPE_PTR
and not record.is_expired(now)
Expand Down Expand Up @@ -1383,7 +1385,7 @@ def __init__(
self.services = {} # type: Dict[str, DNSRecord]
self.next_time = current_time_millis()
self.delay = delay
self._handlers_to_call = [] # type: List[Callable[[Zeroconf], None]]
self._handlers_to_call = OrderedDict() # type: OrderedDict[str, ServiceStateChange]

self._service_state_changed = Signal()

Expand Down Expand Up @@ -1428,14 +1430,30 @@ def service_state_changed(self) -> SignalRegistrationInterface:
def update_record(self, zc: 'Zeroconf', now: float, record: DNSRecord) -> None:
"""Callback invoked by Zeroconf when new information arrives.

Updates information required by browser in the Zeroconf cache."""
Updates information required by browser in the Zeroconf cache.

Ensures that there is are no unecessary duplicates in the list

"""

def enqueue_callback(state_change: ServiceStateChange, name: str) -> None:
self._handlers_to_call.append(
lambda zeroconf: self._service_state_changed.fire(
zeroconf=zeroconf, service_type=self.type, name=name, state_change=state_change

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, two things here.

Regarding the logic, I had to write myself a little table to make sure I understand things correctly. I added some notes in two cases that I believe to be worth taking a second look at. The table:

Existing enqueued state change New state change to enqueue State change on the list afterwards My notes
Added Added
Updated Updated
Removed Removed
Added Added Added
Added Updated Added
Added Removed Added Shouldn't this be just Removed, since the service is gone? When we enqueue Removed then there's Updated on the list already we switch to Removed.
Updated Added Added
Updated Updated Updated
Updated Removed Removed
Removed Added Added
Removed Updated Removed Now I'm not sure when would this happen, why wouldn't it make sense to result in Updated here?
Removed Removed Removed

Regarding the implementation details, I think _handlers_to_call could work better as an OrderedDict[str, ServiceStateChange] instead of List[Tuple[str,ServiceStateChange]]. The code below could then look more or less like (keeping the logic intact, I let myself reverse the logic and remove the early return(s), as it it wouldn't really make sense in this case):

            if (
                (state_change is ServiceStateChange.Updated and name not in self._handlers_to_call)
                or (
                    state_change is ServiceStateChange.Removed
                    and self._handlers_to_call.get(name) is ServiceStateChange.Updated
                )
                or state_change is ServiceStateChange.Added
            ):
                self._handlers_to_call[name] = state_change

This removes all that count(), remove() and append() dance which is nice I think. Then the popping code would use OrderedDict.popitem() instead of list.pop().

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thinking about it, the code could just be an unconditional self._handlers_to_call[name] = state_change. :)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I’ll have a bit more of a think about this one the coming week.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Your last suggestion won't work because then an update trumps an add. I've used a slightly amended version of your first suggestion and the code you had above didn't update removed if for example the list was empty.

Regarding your 2 queries about the truth table, yes there are some combinations that don't make sense and in a well written responded shouldn't happen. However I've taken the view that I need to make a decision on the precedence of the messages and my view is that Added always wins, so for example if we receive an add and remove message in the same batch, then add is the most likely meaning. I can hypothesise a reason this might occur if you have multiple res ponders and one goes offline. In this case Added + Removed still means there is a service there to respond. (note this is a contrived example as the messages would come in a different batch)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair enough – if it causes issues I expect it'll be reported and we'll sort it out then, having a concrete case to deal with.

# Code to ensure we only do a single update message
# Precedence is; Added, Remove, Update

if (
state_change is ServiceStateChange.Added
or (
state_change is ServiceStateChange.Removed
and (
self._handlers_to_call.get(name) is ServiceStateChange.Updated
or self._handlers_to_call.get(name) is ServiceStateChange.Added
or self._handlers_to_call.get(name) is None
)
)
)
or (state_change is ServiceStateChange.Updated and name not in self._handlers_to_call)
):
self._handlers_to_call[name] = state_change

if record.type == _TYPE_PTR and record.name == self.type:
assert isinstance(record, DNSPointer)
Expand All @@ -1459,8 +1477,20 @@ def enqueue_callback(state_change: ServiceStateChange, name: str) -> None:
if expires < self.next_time:
self.next_time = expires

elif record.type == _TYPE_TXT and record.name.endswith(self.type):
assert isinstance(record, DNSText)
elif record.type == _TYPE_A or record.type == _TYPE_AAAA:
assert isinstance(record, DNSAddress)

# Iterate through the DNSCache and callback any services that use this address
for service in zc.cache.entries():
if (
isinstance(service, DNSService)
and service.name.endswith(self.type)
and service.server == record.name
and not record.is_expired(now)
):
enqueue_callback(ServiceStateChange.Updated, service.name)

elif record.name.endswith(self.type):
expired = record.is_expired(now)
if not expired:
enqueue_callback(ServiceStateChange.Updated, record.name)
Expand Down Expand Up @@ -1492,8 +1522,11 @@ def run(self) -> None:
self.delay = min(_BROWSER_BACKOFF_LIMIT * 1000, self.delay * 2)

if len(self._handlers_to_call) > 0 and not self.zc.done:
handler = self._handlers_to_call.pop(0)
handler(self.zc)
with self.zc._handlers_lock:
handler = self._handlers_to_call.popitem(False)
self._service_state_changed.fire(
zeroconf=self.zc, service_type=self.type, name=handler[0], state_change=handler[1]
)


class ServiceInfo(RecordUpdateListener):
Expand Down Expand Up @@ -2156,6 +2189,8 @@ def __init__(

self.debug = None # type: Optional[DNSOutgoing]

self._handlers_lock = threading.Lock() # ensure we process a full message in one go

@property
def done(self) -> bool:
return self._GLOBAL_DONE
Expand Down Expand Up @@ -2432,42 +2467,45 @@ def update_record(self, now: float, rec: DNSRecord) -> None:
def handle_response(self, msg: DNSIncoming) -> None:
"""Deal with incoming response packets. All answers
are held in the cache, and listeners are notified."""
now = current_time_millis()
for record in msg.answers:

updated = True

if record.unique: # https://tools.ietf.org/html/rfc6762#section-10.2
# Since the cache format is keyed on the lower case record name
# we can avoid iterating everything in the cache and
# only look though entries for the specific name.
# entries_with_name will take care of converting to lowercase
#
# We make a copy of the list that entries_with_name returns
# since we cannot iterate over something we might remove
for entry in self.cache.entries_with_name(record.name).copy():

if entry == record:
updated = False
with self._handlers_lock:

# Check the time first because it is far cheaper
# than the __eq__
if (record.created - entry.created > 1000) and DNSEntry.__eq__(entry, record):
self.cache.remove(entry)

expired = record.is_expired(now)
maybe_entry = self.cache.get(record)
if not expired:
if maybe_entry is not None:
maybe_entry.reset_ttl(record)
now = current_time_millis()
for record in msg.answers:

updated = True

if record.unique: # https://tools.ietf.org/html/rfc6762#section-10.2
# Since the cache format is keyed on the lower case record name
# we can avoid iterating everything in the cache and
# only look though entries for the specific name.
# entries_with_name will take care of converting to lowercase
#
# We make a copy of the list that entries_with_name returns
# since we cannot iterate over something we might remove
for entry in self.cache.entries_with_name(record.name).copy():

if entry == record:
updated = False

# Check the time first because it is far cheaper
# than the __eq__
if (record.created - entry.created > 1000) and DNSEntry.__eq__(entry, record):
self.cache.remove(entry)

expired = record.is_expired(now)
maybe_entry = self.cache.get(record)
if not expired:
if maybe_entry is not None:
maybe_entry.reset_ttl(record)
else:
self.cache.add(record)
if updated:
self.update_record(now, record)
else:
self.cache.add(record)
if updated:
self.update_record(now, record)
else:
if maybe_entry is not None:
self.update_record(now, record)
self.cache.remove(maybe_entry)
if maybe_entry is not None:
self.update_record(now, record)
self.cache.remove(maybe_entry)

def handle_query(self, msg: DNSIncoming, addr: Optional[str], port: int) -> None:
"""Deal with incoming query packets. Provides a response if
Expand Down