_sock_connect_cb can be called twice resulting in InvalidStateError #69779
asyncio.selector_events.BaseSelectorEventLoop._sock_connect_cb is a callback driven by the selector for a socket. In certain situations the selector triggers twice, calling this callback twice and resulting in an InvalidStateError when it tries to set the Future's result (None) a second time. The way I triggered this was by having several parallel connections to the same host in a multiprocessing script. I suggest analyzing why this callback can be called twice and figuring out what the correct fix is. I monkey patched it by adding a fut.done() check at the top. If this information is not enough I can try to provide a sample script. It's currently reproducing in a fairly involved multiprocessing script.
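For context, the error itself just reflects a Future having its result set twice. A minimal, self-contained illustration (not the reporter's code):

```python
import asyncio

loop = asyncio.new_event_loop()
fut = asyncio.Future(loop=loop)
fut.set_result(None)           # first invocation of the callback succeeds
try:
    fut.set_result(None)       # a second invocation hits an already-resolved Future
except asyncio.InvalidStateError as exc:
    print("InvalidStateError:", exc)
loop.close()
```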
Please show us how to repro -- there's no way we can figure out how this "impossible" event could happen in your code without understanding your code. Is it possible that multiprocessing forked your event loop or something similarly esoteric?
Sorry for being obscure before, it was hard to pinpoint. I think I just figured it out! I had code like this in a subprocess:

```python
def worker():
    while True:
        obj = self.queue.get()
        # do work with obj using asyncio http module

def producer():
    nonlocal self
    obj2 = self.queue.get()
    return obj2

workers = []
for i in range(FILE_OP_WORKERS):
    t = asyncio.ensure_future(worker())
    t.add_done_callback(op_finished)
    workers.append(t)

while True:
    f = loop.run_in_executor(None, producer)
    obj = loop.run_until_complete(f)
    t = async_queue.put(obj)
    loop.run_until_complete(t)

loop.run_until_complete(asyncio.wait(workers))
```

where self.queue is a multiprocessing.Queue and async_queue is an asyncio queue. The idea is that I have a process populating a multiprocessing queue, and I want to transfer its items to an asyncio queue while letting the workers do their thing. Without knowing the underlying behavior, my theory is that when Python blocks on the multiprocessing queue lock, it releases socket events to the async http module's selectors, and then when the async loop gets to the selectors they're released again. If I switch the producer to instead use queue.get_nowait() and busy-wait with asyncio.sleep() I don't get the error... however this is not ideal, as we're busy waiting. Thanks!
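For reference, a rough sketch of the get_nowait() + asyncio.sleep() workaround described above, written in the same pre-async/await style as the rest of the thread; transfer, mp_queue and async_queue are illustrative names, not from the original code:

```python
import asyncio
import queue  # multiprocessing.Queue.get_nowait() raises queue.Empty

@asyncio.coroutine
def transfer(mp_queue, async_queue, poll_interval=0.05):
    # Poll the multiprocessing queue without blocking the event loop and hand
    # items over to the asyncio queue, sleeping briefly when nothing is ready.
    while True:
        try:
            obj = mp_queue.get_nowait()
        except queue.Empty:
            yield from asyncio.sleep(poll_interval)   # the busy wait mentioned above
            continue
        yield from async_queue.put(obj)
```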
I'm going to close this as I've found a work-around; if I find a better test case I'll open a new bug.
Actually, I just realized I had fixed it locally by changing the callback to the following […] so a fix is still needed, and I also verified this happens with Python 3.4 as well.
Clarification: adding the fut.done() check, or monkey patching: […]
Sorry, the code you posted is still incomprehensible. E.g. I suppose your […] but rather something like […] But in the end, even with that hypothesis, I can't explain what you're […]
self.queue is not an async queue; as I stated above, it's a multiprocessing queue. This code is to multiplex a multiprocessing queue to an async queue.
I believe I'm seeing this bug in a non-threaded and non-forked env. System: I'm using aiohttp to create several dozens of HTTP connections to the same server (an async tornado web server). Nothing special is being done around the event loop creation (standard get_event_loop()). However, in my system the event loop is frequently stopped, via ioloop.stop(), and restarted via ioloop.run_forever(). I'm not sure this is related to the issue yet, but it's worth mentioning. I can't provide simplified test code just yet, but I can reproduce in my env with nearly 100% odds when doing a full system test. Attached is a sample backtrace.
Perhaps I'm doing something really stupid, but I was able to reproduce the two issues I'm having with the following sample script. If you leave the monkey patch disabled, you get the InvalidStateError; if you enable it, you get the ServerDisconnect errors that I'm currently seeing, which I work around with retries. Ideas?

```python
import asyncio
import aiohttp
import multiprocessing
import aiohttp.server
import logging
import traceback

# Monkey patching
import asyncio.selector_events

# http://bugs.python.org/issue25593
# Flip to True to enable the fut.done() work-around.
if False:
    orig_sock_connect_cb = asyncio.selector_events.BaseSelectorEventLoop._sock_connect_cb

    def _sock_connect_cb(self, fut, sock, address):
        if fut.done():
            return
        return orig_sock_connect_cb(self, fut, sock, address)

    asyncio.selector_events.BaseSelectorEventLoop._sock_connect_cb = _sock_connect_cb


class HttpRequestHandler(aiohttp.server.ServerHttpProtocol):

    @asyncio.coroutine
    def handle_request(self, message, payload):
        response = aiohttp.Response(self.writer, 200, http_version=message.version)
        response.add_header('Content-Type', 'text/html')
        response.add_header('Content-Length', '18')
        response.send_headers()
        yield from asyncio.sleep(0.5)
        response.write(b'<h1>It Works!</h1>')
        yield from response.write_eof()


# Client process: pulls URLs off the multiprocessing queue and fetches them.
def process_worker(q):
    loop = asyncio.get_event_loop()
    #loop.set_debug(True)
    connector = aiohttp.TCPConnector(force_close=False, keepalive_timeout=8, use_dns_cache=True)
    session = aiohttp.ClientSession(connector=connector)
    async_queue = asyncio.Queue(100)

    @asyncio.coroutine
    def async_worker(session, async_queue):
        while True:
            try:
                print("blocking on asyncio queue get")
                url = yield from async_queue.get()
                print("unblocking on asyncio queue get")
                print("get aqueue size:", async_queue.qsize())
                response = yield from session.request('GET', url)
                try:
                    data = yield from response.read()
                    print(data)
                finally:
                    yield from response.wait_for_close()
            except:
                traceback.print_exc()

    def producer(q):
        print("blocking on multiprocessing queue get")
        obj2 = q.get()
        print("unblocking on multiprocessing queue get")
        print("get qempty:", q.empty())
        return obj2

    def worker_done(f):
        try:
            f.result()
            print("worker exited")
        except:
            traceback.print_exc()

    workers = []
    for i in range(100):
        t = asyncio.ensure_future(async_worker(session, async_queue))
        t.add_done_callback(worker_done)
        workers.append(t)

    @asyncio.coroutine
    def doit():
        print("start producer")
        obj = yield from loop.run_in_executor(None, producer, q)
        print("finish producer")
        print("blocking on asyncio queue put")
        yield from async_queue.put(obj)
        print("unblocking on asyncio queue put")
        print("put aqueue size:", async_queue.qsize())

    while True:
        loop.run_until_complete(doit())


# Server process: a dummy aiohttp server to connect to.
def server():
    loop = asyncio.get_event_loop()
    #loop.set_debug(True)
    f = loop.create_server(lambda: HttpRequestHandler(debug=True, keep_alive=75), '0.0.0.0', '8080')
    srv = loop.run_until_complete(f)
    loop.run_forever()


if __name__ == '__main__':
    q = multiprocessing.Queue(100)
    log_proc = multiprocessing.log_to_stderr()
    log_proc.setLevel(logging.DEBUG)
    p = multiprocessing.Process(target=process_worker, args=(q,))
    p.start()
    p2 = multiprocessing.Process(target=server)
    p2.start()
    while True:
        print("blocking on multiprocessing queue put")
        q.put("http://0.0.0.0:8080")
        print("unblocking on multiprocessing queue put")
        print("put qempty:", q.empty())
```
I wonder if the bug is in aiohttp? The code you show is still too complex […]
Attaching simplified test setup. It does take some doing to repro so the local async server is required to make it happen (for me). When I tried just pointing to python.org it would not repro in 100 iterations, but using a local dummy server repros 100% for me.
Attached server side of repro.
This code repros without aiohttp when pitted against the previously attached web server (again on OS X 10.11, mid-2012 MBPr). Admittedly this may seem very arbitrary, but I have better reasons in my production code for stopping an IOLoop and starting it again (which seems to be important to the reproduction steps).

```python
import asyncio

loop = asyncio.get_event_loop()

def batch_open():
    # Open many parallel connections; each done-callback stops the loop.
    for i in range(100):
        c = asyncio.ensure_future(asyncio.open_connection('127.0.0.1', 8080))
        c.add_done_callback(on_resp)

def on_resp(task):
    task.result()
    loop.stop()

loop.call_soon(batch_open)
while True:
    # Repeatedly restart the loop after each stop().
    loop.run_forever()
```
Just reproduced on Linux, Fedora Core 23.
Attaching my simplified testcase and logged an aiohttp bug: aio-libs/aiohttp#633
Justin's repro provides a clue: when the event loop is stopped before all […] I'm not sure how this explains Alexander's issue but it's probably […]
Guido, shouldn't this not be the case for level-triggered polling? From looking at selectors it looks like these are always level-triggered, which means they should only event once.
I'm not an expert on this terminology but don't you have that backwards?
Never mind, in the case of writability it won't matter either way. -- So in looking at tornado's ioloop, they run the ready callbacks before calling poll(), so the callbacks can modify the poll set.
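For readers following along, a toy model of the two orderings being compared; the class and method names here are made up for illustration and are not the actual asyncio or tornado internals:

```python
import selectors

class ToyLoop:
    """Toy model of a single event-loop pass; not asyncio's real implementation."""

    def __init__(self):
        self.selector = selectors.DefaultSelector()
        self.ready = []                            # callbacks scheduled via call_soon()

    def call_soon(self, callback):
        self.ready.append(callback)

    def run_once_select_first(self, timeout=0):
        # asyncio-style: poll the selector, turn I/O events into ready callbacks,
        # then drain the ready queue.
        for key, _ in self.selector.select(timeout):
            self.ready.append(key.data)            # key.data holds the I/O callback
        todo, self.ready = self.ready, []
        for callback in todo:
            callback()

    def run_once_callbacks_first(self, timeout=0):
        # tornado-style: drain the ready queue first, so callbacks from the
        # previous pass can still add/remove FDs before the selector is polled.
        todo, self.ready = self.ready, []
        for callback in todo:
            callback()
        for key, _ in self.selector.select(timeout):
            self.ready.append(key.data)            # will run on the next pass
```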
I'm attaching a patch that runs the ready callbacks before polling the selector.
Thanks, but I don't like the idea of that patch. It feels like a hack that makes it less likely that the issue occurs, but I don't feel we should rely on the callbacks being called before checking the selector again. There may be other reasons (perhaps a future modification to the code) why we might occasionally check the selector redundantly. IOW I think we should really ensure that all I/O callbacks are properly idempotent.
I don't believe this is a case of nonidempotent callbacks, unless you are referring to Future.set_result(), which by design can't be called twice. The callbacks are given an inconsistent opportunity to modify the poll set because of indeterminacy in the ioloop. That being said, I understand your reluctance given the amount of turmoil this has caused, but would argue that consistency with tornado is a powerful ally and that a model where any callback using call_soon will be guaranteed the opportunity to modify the poll set is a good thing.
I thought some more about this. The problem is due to stopping and […] I recall that I thought a LOT about whether to run callbacks […] and then I worry about a scenario where a callback does something like this:

```python
def evil():
    loop.call_soon(evil)
    loop.stop()
```

Then the following code would never poll the selector with your fix:

```python
evil()
while True:
    loop.run_forever()
```

Using the existing strategy it would still poll the selector. Also, several tests fail with your patch -- I have to investigate those. All in all I think replacing fut.cancelled() with fut.done() may be […]
Interesting. I was going to do an analysis of what using _ready.appendleft() for adding selector events would do for that scenario, the idea being to consistently juxtapose existing callbacks, selector events and new callbacks. However, I think this just moves the pawn in this ioloop-halting problem. Is it worth investigating a change to the stop mechanism instead? Instead of raising an exception in the middle of run_once, it could set a flag to be seen by run_forever(). This may avoid this class of problem altogether and ensure run_once is fairly simple and predictable.
Yeah, I've thought about changing the stop() mechanism too. It might […] A less intrusive change to stop() would be to somehow mark the point […] But I still think those callbacks should be fixed (Alexander's […])
Yes, that's what I was suggesting. Looking at tornado, they do the stop between callbacks/matured-scheduled calls and events. That approach seems somewhat arbitrary to me at first glance, but tornado is very mature and they usually have good reasons for what they do. The notion of always completing a cycle seems more apt to me; i.e. your first design.

A compelling thought experiment for allowing stop() to be lazy is whether a user could somehow know when stop() was going to run or when it had been run. The nature of ioloop programming prevents you from knowing when it will run, and because stop() has no return handle/future/task a user can't actually know when it did run, i.e. there is no way to await/add_done_callback on it. So, barring hacks that bookend a stop() with marker callbacks, it should be, as you said, sufficiently vague to justify a (more) lazy effect.

I more or less agree on the s/cancelled/done/ changes. I'm using a similar monkey patch in my libraries to dance around this issue right now. I still don't exactly like the idea that code is written with an explicit expectation that it could be pending or cancelled, but then must also be inherently prepared for spurious done callbacks. This seems like a borderline contract violation by add_writer() and co. I suppose that add_writer() is primarily targeted at streams and the case of an EINTR in a socket connect() is more of a one-shot. Tough call.
btw want to thank you guys for actively looking into this, I'm very grateful!
Thinking about this more, I believe it's possible for any of the FD callbacks in selector_events.py to be placed into loop._ready multiple times if the loop is stopped after the FD is ready (and the callback is scheduled) but before the callback is called. In all cases such a scenario results in the same callback (with the same future) being scheduled twice; the first call will call fut.set_result() and then the second call, if the FD is (still, or again) ready, will fail calling fut.set_result() on the same Future.

The reason we've only seen reports of this for _sock_connect_cb() is probably that the other calls are all uncommon -- you have to explicitly call loop.sock_accept(), loop.sock_recv(), or loop.sock_sendall(), which is not the usual (or recommended) idiom. Instead, most people use Transports and Protocols, which use a different API, and create_server() doesn't use sock_accept(). But create_connection() *does* call sock_connect(), so that's used by everybody's code.

I think the discussed change to stop() to set a flag that is only checked after the whole ready for-loop is done might work here -- it guarantees that all I/O callbacks get to run before the selector is polled again. However, it requires that an I/O callback that wants to modify the selector in order to prevent itself from being called must do so itself, not schedule some other call that modifies the selector. That's fine for the set of I/O callbacks I've looked at.

I just don't feel comfortable running the ready queue before polling the selector, since a worst-case scenario could starve the selector completely (as I sketched before -- and the proposed modification to stop() doesn't directly change this).
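A rough sketch of the flag-based stop() being discussed; the _stopping name and structure are assumptions for illustration, not a quote of the eventual patch:

```python
class FlagStopLoop:
    """Illustrative only: stop() sets a flag that run_forever() checks after a
    full pass, instead of raising out of the middle of the run-once step."""

    def __init__(self):
        self._stopping = False

    def stop(self):
        self._stopping = True          # takes effect only after the current pass

    def run_forever(self):
        try:
            while True:
                self._run_once()       # poll selector + drain the whole ready queue
                if self._stopping:
                    break
        finally:
            self._stopping = False

    def _run_once(self):
        pass                           # placeholder for selector poll + callback drain
```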
+1 Let me know what I can do to help.
@justin: Do you want to come up with a PR for the stop() changes?
You bet.
Attached patch submission for the stop flag proposal. I assume you didn't mean a GitHub PR since the dev docs seem to indicate that is for read-only usage. This passes all the tests on my OS X box but it should obviously be run by a lot more folks.
I'm going to fix up the patch and apply it so this can make 3.5.1 rc1.
Here's a better patch.
Please try it out!!
Here's the file.
New patch. Update test_utils.run_once() to use the recommended idiom. On second thought I don't like issuing a warning when stop() is called before the loop runs -- a warning seems overkill for something so minor. But I'm okay with no longer recommending the idiom.
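For anyone unfamiliar with the idiom being referred to, it is presumably the "run one full pass" pattern, sketched here under that assumption:

```python
import asyncio

loop = asyncio.get_event_loop()
# One full pass: poll the selector once, run every callback scheduled in
# response, then return; with the lazier stop() the flag is only honoured
# after the whole ready queue has been drained.
loop.call_soon(loop.stop)
loop.run_forever()
```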
I should have commented more on the run_once removal. The depiction given in its docstring seemed inconsistent with the new way stop() works, and I found no callers, so it seemed like it was best left out to avoid confusion. No worries though, I didn't get to know that test module very well before messing with it; it just came up in my scan for stop() callers. Looks good, I've applied it to a 3.5.0 build and will include it in my testing from now on. Thanks Guido.
Ha, email race. Regarding rev 2, the updated docstring and scheduled stop look good, along with alleviating the confusion I mentioned. I'm not sure about your warning comment; perhaps that's a patch I didn't lay eyes on. Cheers.
No, I mentioned the idea of a warning in the thread on the […]
I see. Seems like good discussion over there. I joined up.
OK, here's another revision of the patch, setting the timeout passed to the selector to 0 when the loop is pre-stopped.
OK, another revision, keep the mock selector.
Whoops. Hopefully this one's right.
New changeset 9b3144716d17 by Guido van Rossum in branch '3.4':
New changeset 158cc5701488 by Guido van Rossum in branch '3.5':
New changeset 2ebe03a94f8f by Guido van Rossum in branch 'default':
Hopefully this is it!
I'm not sure if you guys are still listening on this closed bug, but I think I've found another issue ;) I'm using Python 3.5.1 + asyncio 3.4.3 with the latest aiobotocore (which uses aiohttp 0.21.0) and had two sessions (two TCPConnectors), one doing a multitude of GetObjects via HTTP/1.1 and the other doing PutObject, and the PutObject session returns error 61 (connection refused) from the same _sock_connect_cb. It feels like a similar issue to the original. I'll see if I can get a small test case.
Update: it's unrelated to the number of sessions or SSL, but instead to the number of concurrent aiohttp requests. When set to 500, I get the error; when set to 100 I do not.
Alexander, that sounds unrelated. I'd treat it as a new issue until you have concrete evidence to the contrary. Also, on face value it sounds like it might just be your operating system's open file limit. On OS X I think the default open file limit is in the hundreds (256 on my box). Generally on unix-oid platforms it can be checked and changed with the ulimit command. Cheers
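If it helps, the current limit can also be inspected from Python itself (a quick check on Unix-like systems):

```python
import resource  # Unix-only module

# RLIMIT_NOFILE is the per-process limit on open file descriptors (sockets included).
soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
print("open file limit: soft=%s hard=%s" % (soft, hard))
```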
Sorry for the disruption! Turns out our router seems to be doing some kind of QoS limiting on the number of connections :(