diff --git a/test/test_wait.py b/test/test_wait.py new file mode 100644 index 0000000000..aa7a8c9773 --- /dev/null +++ b/test/test_wait.py @@ -0,0 +1,124 @@ +import select +import signal +import socket +try: + from time import monotonic +except ImportError: + from time import time as monotonic + +import pytest + +from .socketpair_helper import socketpair +from urllib3.util.wait import ( + wait_for_read, + wait_for_write, + wait_for_socket, + select_wait_for_socket, + poll_wait_for_socket, +) + +@pytest.fixture +def spair(): + a, b = socketpair() + yield a, b + a.close() + b.close() + + +variants = [ + wait_for_socket, + select_wait_for_socket, +] +if hasattr(select, "poll"): + variants.append(poll_wait_for_socket) + + +@pytest.mark.parametrize("wfs", variants) +def test_wait_for_socket(wfs, spair): + a, b = spair + + with pytest.raises(RuntimeError): + wfs(a, read=False, write=False) + + assert not wfs(a, read=True, timeout=0) + assert wfs(a, write=True, timeout=0) + + b.send(b"x") + assert wfs(a, read=True, timeout=0) + assert wfs(a, read=True, timeout=10) + assert wfs(a, read=True, timeout=None) + + # Fill up the socket with data + a.setblocking(False) + try: + while True: + a.send(b"x" * 999999) + except (OSError, socket.error): + pass + + # Now it's not writable anymore + assert not wfs(a, write=True, timeout=0) + + # But if we ask for read-or-write, that succeeds + assert wfs(a, read=True, write=True, timeout=0) + + # Unless we read from it + assert a.recv(1) == b"x" + assert not wfs(a, read=True, write=True, timeout=0) + + +def test_wait_for_read_write(spair): + a, b = spair + + assert not wait_for_read(a, 0) + assert wait_for_write(a, 0) + + b.send(b"x") + + assert wait_for_read(a, 0) + assert wait_for_write(a, 0) + + # Fill up the socket with data + a.setblocking(False) + try: + while True: + a.send(b"x" * 999999) + except (OSError, socket.error): + pass + + # Now it's not writable anymore + assert not wait_for_write(a, 0) + + +@pytest.mark.skipif( + not 
hasattr(signal, "setitimer"), + reason="need setitimer() support" +) +@pytest.mark.parametrize("wfs", variants) +def test_eintr(wfs, spair): + a, b = spair + interrupt_count = [0] + + def handler(sig, frame): + assert sig == signal.SIGALRM + interrupt_count[0] += 1 + + old_handler = signal.signal(signal.SIGALRM, handler) + try: + assert not wfs(a, read=True, timeout=0) + start = monotonic() + try: + # Start delivering SIGALRM 10 times per second + signal.setitimer(signal.ITIMER_REAL, 0.1, 0.1) + # Sleep for 1 second (we hope!) + wfs(a, read=True, timeout=1) + finally: + # Stop delivering SIGALRM + signal.setitimer(signal.ITIMER_REAL, 0) + end = monotonic() + dur = end - start + assert 0.9 < dur < 3 + finally: + signal.signal(signal.SIGALRM, old_handler) + + assert interrupt_count[0] > 0 diff --git a/urllib3/contrib/pyopenssl.py b/urllib3/contrib/pyopenssl.py index 5263814e40..4d4b1aff97 100644 --- a/urllib3/contrib/pyopenssl.py +++ b/urllib3/contrib/pyopenssl.py @@ -273,8 +273,7 @@ def recv(self, *args, **kwargs): else: raise except OpenSSL.SSL.WantReadError: - rd = util.wait_for_read(self.socket, self.socket.gettimeout()) - if not rd: + if not util.wait_for_read(self.socket, self.socket.gettimeout()): raise timeout('The read operation timed out') else: return self.recv(*args, **kwargs) @@ -295,8 +294,7 @@ def recv_into(self, *args, **kwargs): else: raise except OpenSSL.SSL.WantReadError: - rd = util.wait_for_read(self.socket, self.socket.gettimeout()) - if not rd: + if not util.wait_for_read(self.socket, self.socket.gettimeout()): raise timeout('The read operation timed out') else: return self.recv_into(*args, **kwargs) @@ -309,8 +307,7 @@ def _send_until_done(self, data): try: return self.connection.send(data) except OpenSSL.SSL.WantWriteError: - wr = util.wait_for_write(self.socket, self.socket.gettimeout()) - if not wr: + if not util.wait_for_write(self.socket, self.socket.gettimeout()): raise timeout() continue except OpenSSL.SSL.SysCallError as e: @@ -446,8 
+443,7 @@ def wrap_socket(self, sock, server_side=False, try: cnx.do_handshake() except OpenSSL.SSL.WantReadError: - rd = util.wait_for_read(sock, sock.gettimeout()) - if not rd: + if not util.wait_for_read(sock, sock.gettimeout()): raise timeout('select timed out') continue except OpenSSL.SSL.Error as e: diff --git a/urllib3/contrib/securetransport.py b/urllib3/contrib/securetransport.py index ea54c0c79b..1eeb9d8226 100644 --- a/urllib3/contrib/securetransport.py +++ b/urllib3/contrib/securetransport.py @@ -201,8 +201,7 @@ def _read_callback(connection_id, data_buffer, data_length_pointer): try: while read_count < requested_length: if timeout is None or timeout >= 0: - readables = util.wait_for_read([base_socket], timeout) - if not readables: + if not util.wait_for_read(base_socket, timeout): raise socket.error(errno.EAGAIN, 'timed out') # We need to tell ctypes that we have a buffer that can be @@ -257,8 +256,7 @@ def _write_callback(connection_id, data_buffer, data_length_pointer): try: while sent < bytes_to_write: if timeout is None or timeout >= 0: - writables = util.wait_for_write([base_socket], timeout) - if not writables: + if not util.wait_for_write(base_socket, timeout): raise socket.error(errno.EAGAIN, 'timed out') chunk_sent = base_socket.send(data) sent += chunk_sent diff --git a/urllib3/util/connection.py b/urllib3/util/connection.py index bf699cfd0e..68b34e4942 100644 --- a/urllib3/util/connection.py +++ b/urllib3/util/connection.py @@ -1,7 +1,6 @@ from __future__ import absolute_import import socket -from .wait import wait_for_read -from .selectors import HAS_SELECT, SelectorError +from .wait import HAS_WAIT_FOR_SOCKET, wait_for_read def is_connection_dropped(conn): # Platform-specific @@ -19,14 +18,10 @@ def is_connection_dropped(conn): # Platform-specific return False if sock is None: # Connection already closed (such as by httplib). 
return True - - if not HAS_SELECT: + if not HAS_WAIT_FOR_SOCKET: # Platform-specific: AppEngine return False - - try: - return bool(wait_for_read(sock, timeout=0.0)) - except SelectorError: - return True + # Returns True if readable, which here means it's been dropped + return wait_for_read(sock, timeout=0.0) # This function is copied from socket.py in the Python 2.7 standard diff --git a/urllib3/util/wait.py b/urllib3/util/wait.py index cb396e508c..0f32f32d44 100644 --- a/urllib3/util/wait.py +++ b/urllib3/util/wait.py @@ -1,40 +1,130 @@ -from .selectors import ( - HAS_SELECT, - DefaultSelector, - EVENT_READ, - EVENT_WRITE -) - - -def _wait_for_io_events(socks, events, timeout=None): - """ Waits for IO events to be available from a list of sockets - or optionally a single socket if passed in. Returns a list of - sockets that can be interacted with immediately. """ - if not HAS_SELECT: - raise ValueError('Platform does not have a selector') - if not isinstance(socks, list): - # Probably just a single socket. - if hasattr(socks, "fileno"): - socks = [socks] - # Otherwise it might be a non-list iterable. +import errno +from functools import partial +import select +import sys +try: + from time import monotonic +except ImportError: + from time import time as monotonic + +__all__ = ["HAS_WAIT_FOR_SOCKET", "wait_for_read", "wait_for_write"] + +# How should we wait on sockets? +# +# There are two types of APIs you can use for waiting on sockets: the fancy +# modern stateful APIs like epoll/kqueue, and the older stateless APIs like +# select/poll. The stateful APIs are more efficient when you have lots of +# sockets to keep track of, because you can set them up once and then use them +# lots of times. But we only ever want to wait on a single socket at a time +# and don't want to keep track of state, so the stateless APIs are actually +# more efficient. So we want to use select() or poll(). +# +# Now, how do we choose between select() and poll()? 
On traditional Unixes, +# select() has a strange calling convention that makes it slow, or fail +# altogether, for high-numbered file descriptors. The point of poll() is to fix +# that, so on Unixes, we prefer poll(). +# +# On Windows, there is no poll() (or at least Python doesn't provide a wrapper +# for it), but that's OK, because on Windows, select() doesn't have this +# strange calling convention; plain select() works fine. +# +# So: on Windows we use select(), and everywhere else we use poll(). + +if sys.version_info >= (3, 5): + # Modern Python, that retries syscalls by default + def _retry_on_intr(fn, timeout): + return fn(timeout) +else: + # Old Pythons (< 3.5, pre-PEP 475) raise on EINTR, so retry by hand. + def _retry_on_intr(fn, timeout): + if timeout is not None and timeout <= 0: + return fn(timeout) + + if timeout is None: + deadline = float("inf") else: - socks = list(socks) - with DefaultSelector() as selector: - for sock in socks: - selector.register(sock, events) - return [key[0].fileobj for key in - selector.select(timeout) if key[1] & events] - - -def wait_for_read(socks, timeout=None): - """ Waits for reading to be available from a list of sockets - or optionally a single socket if passed in. Returns a list of - sockets that can be read from immediately. """ - return _wait_for_io_events(socks, EVENT_READ, timeout) - - -def wait_for_write(socks, timeout=None): - """ Waits for writing to be available from a list of sockets - or optionally a single socket if passed in. Returns a list of - sockets that can be written to immediately. 
""" - return _wait_for_io_events(socks, EVENT_WRITE, timeout) + deadline = monotonic() + timeout + + while True: + try: + return fn(timeout) + # OSError for 3 <= pyver < 3.5, select.error for pyver <= 2.7 + except (OSError, select.error) as e: + # 'e.args[0]' incantation works for both OSError and select.error + if e.args[0] != errno.EINTR: + raise + else: + timeout = deadline - monotonic() + if timeout < 0: # clamp before mapping inf to None + timeout = 0 + if timeout == float("inf"): + timeout = None # no deadline: keep blocking + continue + + +def select_wait_for_socket(sock, read=False, write=False, timeout=None): + if not read and not write: + raise RuntimeError("must specify at least one of read=True, write=True") + rcheck = [] + wcheck = [] + if read: + rcheck.append(sock) + if write: + wcheck.append(sock) + # When doing a non-blocking connect, most systems signal success by + # marking the socket writable. Windows, though, signals success by marking + # it as "exceptional". We paper over the difference by checking the write + # sockets for both conditions. (The stdlib selectors module does the same + # thing.) + fn = partial(select.select, rcheck, wcheck, wcheck) + rready, wready, xready = _retry_on_intr(fn, timeout) + return bool(rready or wready or xready) + + +def poll_wait_for_socket(sock, read=False, write=False, timeout=None): + if not read and not write: + raise RuntimeError("must specify at least one of read=True, write=True") + mask = 0 + if read: + mask |= select.POLLIN + if write: + mask |= select.POLLOUT + poll_obj = select.poll() + poll_obj.register(sock, mask) + + # For some reason, poll() takes timeout in milliseconds + def do_poll(t): + if t is not None: + t *= 1000 + return poll_obj.poll(t) + + return bool(_retry_on_intr(do_poll, timeout)) + + +def null_wait_for_socket(*args, **kwargs): + raise RuntimeError("no select-equivalent available") + + +if hasattr(select, "poll"): + wait_for_socket = poll_wait_for_socket + HAS_WAIT_FOR_SOCKET = True +elif hasattr(select, "select"): # Platform-specific: Windows. 
+ wait_for_socket = select_wait_for_socket + HAS_WAIT_FOR_SOCKET = True +else: # Platform-specific: Appengine. + wait_for_socket = null_wait_for_socket + HAS_WAIT_FOR_SOCKET = False + + +def wait_for_read(sock, timeout=None): + """ Waits for reading to be available on a given socket. + Returns True if the socket is readable, or False if the timeout expired. + """ + return wait_for_socket(sock, read=True, timeout=timeout) + + +def wait_for_write(sock, timeout=None): + """ Waits for writing to be available on a given socket. + Returns True if the socket is writable, or False if the timeout expired. + """ + return wait_for_socket(sock, write=True, timeout=timeout)