-
-
Notifications
You must be signed in to change notification settings - Fork 32.2k
bpo-30931: Ensure the right socket is retrieved in asyncore #2707
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please explain why this is needed, and add a failing test demonstrating this issue.
Lib/asyncore.py
Outdated
@@ -127,7 +127,8 @@ def poll(timeout=0.0, map=None): | |||
map = socket_map | |||
if map: | |||
r = []; w = []; e = [] | |||
for fd, obj in list(map.items()): | |||
map_copy = dict(map) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you explain why this is needed? asyncore is not thread safe, and cannot be used by different threads, so the map cannot change while this code is iterating on the map.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for the review,
If this is the case then we are misusing asyncore, however that would surprise me since I guess the list
in list(map.items())
is there precisely because map
may change during the iteration.
Also, looks like there are tests with threads:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can use threads to send and receive from file descriptors watched by asyncore
dispatchers, and this is how you can test asyncore. But you cannot call dispatcher
methods from another thread, or modify asyncore map from other threads. asyncore
does not have builtin support for threading.
To use threads with asyncore you will have to extend it to have a queue of functions
scheduled to run when the event loop wakes up, and a way to wake up the event loop,
and a new asyncore.loop function integrating these features. Such code exists in the
old meduza project:
http://www.nightmare.com/medusa/
If you are using python 2.7, you may find this useful - backport of python 3 event loop
to 2.7 and integration with asyncore. This add threading and timeouts support.
https://github.com/oVirt/vdsm/blob/master/lib/vdsm/storage/asyncevent.py
If you are using python 3, I think it is better to use asyncio, it is very hard to get error
handling right with asyncore.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if it is better to copy only the readable and writable objects, instead of the whole map.
We can do:
pending = {}
for fd, obj in map.items():
...
if is_r or is_w:
pending[fd] = obj
And later handle events only on objects from the pending dict. This may be faster if only some of the objects are active.
Lib/asyncore.py
Outdated
obj = map.get(fd) | ||
if obj is None: | ||
obj = map_copy.get(fd) | ||
if fd not in map or map.get(fd) != obj: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same, the map cannot change while the single thread that may access the map is block on select().
And, getting the fd from the map is good check in case the fd was removed from the map by a dispatcher.
I haven't been able to write a test for this due to that it only happens if a very particular sequence of events occurs, I can however provide a POC by coordinating the asyncore loop thread with the main thread, if asyncore is supposed to be thread safe, otherwise I don't think this is a bug. |
@bjmb there may be a real bug here, even if you are not using threads. r, w, e = select.select(r, w, e, timeout)
for fd in r:
obj = map.get(fd)
if obj is None:
continue
read(obj) read close obj, removing fd from map, then creates a new socket for fd in w:
obj = map.get(fd) this get the new object from the map, instead of the closed one. if obj is None:
continue
write(obj) invoke write on the wrong socket, which is not writable for fd in e:
obj = map.get(fd) same issue here if obj is None:
continue
_exception(obj) And poll2 has same issue: r = pollster.poll(timeout)
for fd, flags in r:
obj = map.get(fd)
if obj is None:
continue
readwrite(obj, flags) An fd may have been closed and recreated by in a previous iteration of the loop. |
Yes, that's what's happening I believe is happening.
who creates a new socket with the same fd? It must be another thread. We are using a very similar approach to the one you linked: https://github.com/datastax/python-driver/blob/master/cassandra/io/asyncorereactor.py , init is called from a different thread so I assume this shouldn't be done by what you are saying. Still it wouldn't explain why right now a copy of the list is made before iterating if it shouldn't change the size. |
It can be the same thread : def handle_read(self):
...
self.close()
do_stuff() If do_stuff creates a new dispatcher, there is a good chance that it will get the same
The list(map.items()) in https://github.com/python/cpython/blob/master/Lib/asyncore.py#L130 is not needed, the map cannot be modified from another thread and is not modified in that loop. It was added in d74900e as part of very big python 3 port. I think it was safe way to convert python 2 map.items() to python 3 map.items(), keeping the old behavior, but the python 3 version could avoid the unneeded copy and use just map.items(). |
OK got it, that makes sense, should be possible to write a test this way, I'll do it later if nobody gets ahead of me. |
Now that I was checking to write a test I'm not sure it can happen with poll2 @nirs , since the list poll is returning is |
@bjmb I think it can happen:
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Cool test! it fails in both poll2 and poll.
BaseTestHandler.__init__(self, conn) | ||
self.send(b'x' * 1024) | ||
|
||
|
||
class BaseTestAPI: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this change needed? does it break the new test?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's not necessary but this was the TestHandler
before defined inside test_handle_read
, I pulled it out and renamed it to use it in this new test as well.
Lib/test/test_asyncore.py
Outdated
family = self.family | ||
fail = self.fail | ||
|
||
class NoWriteClient(BaseClient): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure that people will understand why this client is not writable. Maybe call this BateClient or PoisonedClient?
And add docstring explaining why we need this.
Lib/test/test_asyncore.py
Outdated
class NoWriteClient(BaseClient): | ||
|
||
def handle_write(self): | ||
fail("This is a non writable socket") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we need to explain the issue in the error message, something like:
Attempt to call handle_write on the wrong client
def handle_write(self): | ||
fail("This is a non writable socket") | ||
|
||
def writable(self): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would comment here that this is not writable to prove that the event loop is calling handle_write on the wrong client.
Lib/test/test_asyncore.py
Outdated
self.flag = True | ||
|
||
server = BaseServer(self.family, self.addr, SendHandler) | ||
client = ManagerClient(self.family, server.address) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe call this manager instead of client?
Lib/test/test_asyncore.py
Outdated
|
||
def __init__(self, family, address): | ||
BaseClient.__init__(self, family, address) | ||
self.to_close = BaseClient(family, address) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe old_client, for consistency with new_client?
Lib/test/test_asyncore.py
Outdated
self.to_close = BaseClient(family, address) | ||
|
||
def handle_write(self): | ||
self.to_close.close() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Lest keep the fd here:
old_fd = self.old_client.fileno()
Lib/test/test_asyncore.py
Outdated
self.to_close.close() | ||
# This trusts that the fd of this client new | ||
# will be the same to the one just closed | ||
new_client = NoWriteClient(family, server.address) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If there is another thread running, we may not get the same fd - in this case we should skip the test with a warning.
The comment should explain that this test is relevant only if the new client got the same fd of the old client, and in the code we can verify this:
new_client = ...
if new_client.fileno() != old_fd:
skip test with a warning...
@@ -800,6 +801,36 @@ def test_quick_connect(self): | |||
if t.is_alive(): | |||
self.fail("join() timed out") | |||
|
|||
def test_map_altered_in_loop(self): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To help other understand this issue, I would move this patch before the actual fix, and mark it as @unittest.expectedFailure so it can be merged even if we don't have a good fix.
Lib/asyncore.py
Outdated
obj = map.get(fd) | ||
if obj is None: | ||
obj = map_copy.get(fd) | ||
if fd not in map or map.get(fd) != obj: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We are looking up fd in map twice - we can simplify this to:
if map.get(fd) is not obj:
continue
We should handle events on obj only if it is still in the map under the same fd. If it is not in the map (None), or another object is using this fd, we should ignore this object.
Lib/asyncore.py
Outdated
@@ -127,7 +127,8 @@ def poll(timeout=0.0, map=None): | |||
map = socket_map | |||
if map: | |||
r = []; w = []; e = [] | |||
for fd, obj in list(map.items()): | |||
map_copy = dict(map) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if it is better to copy only the readable and writable objects, instead of the whole map.
We can do:
pending = {}
for fd, obj in map.items():
...
if is_r or is_w:
pending[fd] = obj
And later handle events only on objects from the pending dict. This may be faster if only some of the objects are active.
@@ -182,8 +184,8 @@ def poll2(timeout=0.0, map=None): | |||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can copy only the active objects here:
if flags:
pending[fd] = obj
pollster.register(fd, flags)
@giampaolo, do you want to take a look at this? |
Thank you for the review and suggestions @nirs, I've tried to follow them and I'm rebasing and pushing |
@bjmb I'm worried about the performance of copying the entire map for every poll. Maybe we can solve this differently, by actually closing fds only at the end of the event loop cycle? This way there is no way to create a new fd conflicting with existing fd, and no copies are needed. For example, closing a channel can add the channel to socket_to_close set, and the loop can clean up sockets in this set when all events were done. |
So that would mean @nirs that when We wouldn't run in problems by doing this? Like maybe calling close in |
@bjmb, If we close only after iterating all the ready fds, there could be no handle_read/write call. Of course one dispatcher may close other dispatchers in close(). I guess it will be more complicated than the fix you suggest. |
But we may call |
This direction requires that close() will mark the socket as closed, and handle_write will do nothing in this case, or the dispatcher.socket will be replaced with something like socket._closedsocket. |
Another thought is that we could use something different than the fd as the dictionary key, since this seems to be where all the problems come from, maybe just id(dispatcher). |
Great idea - and it works with select: >>> class wrapper(object):
... def __init__(self, sock):
... self.sock = sock
... def fileno(self):
... return self.sock.fileno()
>>> select.select([wrapper(s1), wrapper(s2)], [], [], 0)
([<__main__.wrapper object at 0x7f0c91997d10>, <__main__.wrapper object at 0x7f0c91997d90>], [], []) So we can do select on dispatchers, and iterate on them, never accessing the map: for obj in r:
if obj.fileno() is not None:
_read(obj)
for obj in w:
if obj.fileno() is not None:
_write(obj)
for obj in e:
if obj.fileno() is not None:
_exception(obj) Poll is harder: >>> pollster = select.poll()
>>> w1 = wrapper(s1)
>>> w2 = wrapper(s2)
>>> pollster.register(w1, select.POLLIN)
>>> pollster.register(w2, select.POLLIN)
>>> pollster.poll(0)
[(3, 1), (4, 1)] But we can get the objects from the map before calling readwrite(): ready = []
for fd, flags in pollster.poll(timeout):
obj = map.get(fd)
if obj:
ready.append(obj, flags) And call the ready objects: for obj, flags in ready:
if obj.fileno() is not None:
readwrite(obj, flags) Now we cannot access the wrong sockets. @bjmb, what do you think? |
@nirs both look good to me with your fixes! However I'm not sure I understand why the wrapper is necessary with select, works the same for me using the socket list. |
@bjmb, right the wrapper is not needed, this was just a quick way to test this in the shell. I would create an alternative patch and test if this approach performs better than copying the map on each poll. |
I pushed this branch with the alternative fix, did a quick performance tests with the following results: Select sockets, put polled in ready fix, use_poll: True Select sockets, put polled in ready fix, use_poll: False ======================================================================= map_copy fix, use_poll: True map_copy fix, use_poll: False I used a echo server and this script. Surprising, I always thought poll was much faster. I guess the bulk of the time is spent sending or receiving the data. |
Removing the calls Select sockets, put polled in ready fix, use_poll: True Select sockets, put polled in ready fix, use_poll: False ======================================================================= map_copy fix, use_poll, use_poll: True map_copy fix, use_poll, use_poll: False Looks like the second choice is a bit quicker |
@bjmb, I added some comments in https://github.com/bjmb/cpython/tree/asyncore_fix_select_sockets. I think you should post another pull request for this change, so make it easier to review. |
@bjmb, regarding poll, the advantage is supporting file descriptors values larger than 1024. Select blows up in this case. |
I wrote a similar but different change: see my PR #2854. I tried to compare them: https://bugs.python.org/issue30931#msg299164 |
Closed under bpo-45552. |
https://bugs.python.org/issue30931