New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
bpo-30931: Asyncore alternative fix #2764
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -131,33 +131,30 @@ def poll(timeout=0.0, map=None): | |
is_r = obj.readable() | ||
is_w = obj.writable() | ||
if is_r: | ||
r.append(fd) | ||
r.append(obj) | ||
# accepting sockets should not be writable | ||
if is_w and not obj.accepting: | ||
w.append(fd) | ||
w.append(obj) | ||
if is_r or is_w: | ||
e.append(fd) | ||
e.append(obj) | ||
if [] == r == w == e: | ||
time.sleep(timeout) | ||
return | ||
|
||
r, w, e = select.select(r, w, e, timeout) | ||
|
||
for fd in r: | ||
obj = map.get(fd) | ||
if obj is None: | ||
for obj in r: | ||
if obj.closing: | ||
continue | ||
read(obj) | ||
|
||
for fd in w: | ||
obj = map.get(fd) | ||
if obj is None: | ||
for obj in w: | ||
if obj.closing: | ||
continue | ||
write(obj) | ||
|
||
for fd in e: | ||
obj = map.get(fd) | ||
if obj is None: | ||
for obj in e: | ||
if obj.closing: | ||
continue | ||
_exception(obj) | ||
|
||
|
@@ -181,11 +178,12 @@ def poll2(timeout=0.0, map=None): | |
pollster.register(fd, flags) | ||
|
||
r = pollster.poll(timeout) | ||
for fd, flags in r: | ||
obj = map.get(fd) | ||
if obj is None: | ||
continue | ||
readwrite(obj, flags) | ||
ready = [(map[fd], flags) for fd, flags in r] | ||
|
||
for obj, flags in ready: | ||
if not obj.closing: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Would you mind to first address the first bug in a first PR, map modified in this loop, and then work on a new PR (once the first PR is merged) to add closing? Again, I prefer very small changes. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. adding closing is part of fixing the bug, the point of it is not to called readwrite on a socket that has been closed by another socket on a previous readwrite. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Using "ready = [...]", I don't see how you could have such race condition. Again, let's move slowly, step by step. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. But if you store all the objects on ready you can't call just call all of them because an object previous in the list might have closed a posterior one. |
||
readwrite(obj, flags) | ||
|
||
|
||
poll3 = poll2 # Alias for backward compatibility | ||
|
||
|
@@ -388,6 +386,10 @@ def recv(self, buffer_size): | |
raise | ||
|
||
def close(self): | ||
if self.closing: | ||
return | ||
self.closing = True | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You separated this to make closing state change more clear, right? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, visually is what I would do for my code |
||
self.connected = False | ||
self.accepting = False | ||
self.connecting = False | ||
|
@@ -399,6 +401,11 @@ def close(self): | |
if why.args[0] not in (ENOTCONN, EBADF): | ||
raise | ||
|
||
def fileno(self): | ||
if self.socket is None: | ||
raise socket.error(EBADF, 'Bad file descriptor') | ||
return self.socket.fileno() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please remove this method. |
||
|
||
# log and log_info may be overridden to provide more sophisticated | ||
# logging and warning methods. In general, log is for 'hit' logging | ||
# and 'log_info' is for informational, warning and error logging. | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -499,6 +499,12 @@ def handle_connect(self): | |
pass | ||
|
||
|
||
class SendHandler(BaseTestHandler): | ||
def __init__(self, conn): | ||
BaseTestHandler.__init__(self, conn) | ||
self.send(b'x' * 1024) | ||
|
||
|
||
class BaseTestAPI: | ||
|
||
def tearDown(self): | ||
|
@@ -576,12 +582,7 @@ class TestClient(BaseClient): | |
def handle_read(self): | ||
self.flag = True | ||
|
||
class TestHandler(BaseTestHandler): | ||
def __init__(self, conn): | ||
BaseTestHandler.__init__(self, conn) | ||
self.send(b'x' * 1024) | ||
|
||
server = BaseServer(self.family, self.addr, TestHandler) | ||
server = BaseServer(self.family, self.addr, SendHandler) | ||
client = TestClient(self.family, server.address) | ||
self.loop_waiting_for_flag(client) | ||
|
||
|
@@ -800,6 +801,44 @@ def test_quick_connect(self): | |
if t.is_alive(): | ||
self.fail("join() timed out") | ||
|
||
def test_map_altered_in_loop(self): | ||
family = self.family | ||
fail = self.fail | ||
|
||
class PoisonedClient(BaseClient): | ||
""" | ||
This dispatcher is created after closing a writable one | ||
""" | ||
def handle_write(self): | ||
fail("Attempt to call handle_write on the wrong client") | ||
|
||
# The dispatcher is not writable and therefore handle_write shouldn't be called | ||
def writable(self): | ||
return False | ||
|
||
class ManagerClient(BaseClient): | ||
|
||
def __init__(self, family, address): | ||
BaseClient.__init__(self, family, address) | ||
self.old_client = BaseClient(family, address) | ||
|
||
def handle_write(self): | ||
old_fd = self.old_client._fileno | ||
self.old_client.close() | ||
# This trusts that the fd of this client new | ||
# will be the same to the one just closed | ||
new_client = PoisonedClient(family, server.address) | ||
if new_client._fileno != old_fd: | ||
raise unittest.SkipTest("The test is meaningful only if the fd for the old and " | ||
"the new dispatcher are the same") | ||
|
||
self.flag = True | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sorry, I don't understand why we need so much complicated code to test an altered map. Can't we just have two readable objects and the first one clears the map, so the second loop iteartion is supposed to fail? Just make sure that we called the read handler of the two objects even if the map was cleared? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think I understand this, if the first readable object closes the second and removes it from the map then it will hit the comparison There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Using "ready = [...]", you can remove "if obj is None:" from the loop. Problem solved. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same as before, some check must be done you have to check if the object has been closed by a previous one in while readwrite was called. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sorry, we are talking about two different bugs. I'm talking about a first class of bugs. I didn't say that a fix PR using "ready=[...]" would fix all bugs. Just that it would be easier to review, easy to merge, backport, etc. And then it would become easier to discuss solutions for more complex bugs. |
||
|
||
server = BaseServer(self.family, self.addr, SendHandler) | ||
manager = ManagerClient(self.family, server.address) | ||
self.loop_waiting_for_flag(manager) | ||
|
||
|
||
class TestAPI_UseIPv4Sockets(BaseTestAPI): | ||
family = socket.AF_INET | ||
addr = (support.HOST, 0) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
asyncore is close to its death. I'm not able to accept such large changes. I would prefer changes as small as possible, so please use the same method than for poll():
The thing is that asyncore code base is old and error-prone, the test suite is very small. I cannot affort the risk of introducing a regression.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It can be done like this but I think the
closing
variable should remainThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just write a second PR when the first one is merged. I would just be easier to review it. IMHO you are fixing two bugs at once, it makes the review harder.