Skip to content

Commit

Permalink
Implement wait_for_[read,write} directly on top of 'select' module
Browse files Browse the repository at this point in the history
Our vendored backport of the 'selectors' module is huge, its tests are
flaky, and the only we thing we use it for is to... turn around and
implement some trivial select() operations. It lets urllib3 use epoll
or kqueue... but the way urllib3 uses them, they're actually *less*
efficient than just using poll or select.

This commit removes the dependency on 'selectors', by implementing
urllib3's wait_for_{read,write} helpers directly on top of
poll/select. Because I'm sneaky, it does this in terms of a more
generic wait_for_socket(...) operation, which is exactly the primitive
that we need for the bleach-spike branch (see urllib3gh-1323), so this should
also help keep the diff down.
  • Loading branch information
njsmith committed Apr 4, 2018
1 parent 7bab7ae commit 03d7753
Show file tree
Hide file tree
Showing 5 changed files with 263 additions and 60 deletions.
124 changes: 124 additions & 0 deletions test/test_wait.py
@@ -0,0 +1,124 @@
import select
import signal
import socket
try:
from time import monotonic
except ImportError:
from time import time as monotonic

import pytest

from .socketpair_helper import socketpair
from urllib3.util.wait import (
wait_for_read,
wait_for_write,
wait_for_socket,
select_wait_for_socket,
poll_wait_for_socket,
)

@pytest.fixture
def spair():
a, b = socketpair()
yield a, b
a.close()
b.close()


variants = [
wait_for_socket,
select_wait_for_socket,
]
if hasattr(select, "poll"):
variants.append(poll_wait_for_socket)


@pytest.mark.parametrize("wfs", variants)
def test_wait_for_socket(wfs, spair):
a, b = spair

with pytest.raises(RuntimeError):
wfs(a, read=False, write=False)

assert not wfs(a, read=True, timeout=0)
assert wfs(a, write=True, timeout=0)

b.send(b"x")
assert wfs(a, read=True, timeout=0)
assert wfs(a, read=True, timeout=10)
assert wfs(a, read=True, timeout=None)

# Fill up the socket with data
a.setblocking(False)
try:
while True:
a.send(b"x" * 999999)
except (OSError, socket.error):
pass

# Now it's not writable anymore
assert not wfs(a, write=True, timeout=0)

# But if we ask for read-or-write, that succeeds
assert wfs(a, read=True, write=True, timeout=0)

# Unless we read from it
assert a.recv(1) == b"x"
assert not wfs(a, read=True, write=True, timeout=0)


def test_wait_for_read_write(spair):
a, b = spair

assert not wait_for_read(a, 0)
assert wait_for_write(a, 0)

b.send(b"x")

assert wait_for_read(a, 0)
assert wait_for_write(a, 0)

# Fill up the socket with data
a.setblocking(False)
try:
while True:
a.send(b"x" * 999999)
except (OSError, socket.error):
pass

# Now it's not writable anymore
assert not wait_for_write(a, 0)


@pytest.mark.skipif(
not hasattr(signal, "setitimer"),
reason="need setitimer() support"
)
@pytest.mark.parametrize("wfs", variants)
def test_eintr(wfs, spair):
a, b = spair
interrupt_count = [0]

def handler(sig, frame):
assert sig == signal.SIGALRM
interrupt_count[0] += 1

old_handler = signal.signal(signal.SIGALRM, handler)
try:
assert not wfs(a, read=True, timeout=0)
start = monotonic()
try:
# Start delivering SIGALRM 10 times per second
signal.setitimer(signal.ITIMER_REAL, 0.1, 0.1)
# Sleep for 1 second (we hope!)
wfs(a, read=True, timeout=1)
finally:
# Stop delivering SIGALRM
signal.setitimer(signal.ITIMER_REAL, 0)
end = monotonic()
dur = end - start
assert 0.9 < dur < 3
finally:
signal.signal(signal.SIGALRM, old_handler)

assert interrupt_count[0] > 0
12 changes: 4 additions & 8 deletions urllib3/contrib/pyopenssl.py
Expand Up @@ -273,8 +273,7 @@ def recv(self, *args, **kwargs):
else:
raise
except OpenSSL.SSL.WantReadError:
rd = util.wait_for_read(self.socket, self.socket.gettimeout())
if not rd:
if not util.wait_for_read(self.socket, self.socket.gettimeout()):
raise timeout('The read operation timed out')
else:
return self.recv(*args, **kwargs)
Expand All @@ -295,8 +294,7 @@ def recv_into(self, *args, **kwargs):
else:
raise
except OpenSSL.SSL.WantReadError:
rd = util.wait_for_read(self.socket, self.socket.gettimeout())
if not rd:
if not util.wait_for_read(self.socket, self.socket.gettimeout()):
raise timeout('The read operation timed out')
else:
return self.recv_into(*args, **kwargs)
Expand All @@ -309,8 +307,7 @@ def _send_until_done(self, data):
try:
return self.connection.send(data)
except OpenSSL.SSL.WantWriteError:
wr = util.wait_for_write(self.socket, self.socket.gettimeout())
if not wr:
if not util.wait_for_write(self.socket, self.socket.gettimeout()):
raise timeout()
continue
except OpenSSL.SSL.SysCallError as e:
Expand Down Expand Up @@ -446,8 +443,7 @@ def wrap_socket(self, sock, server_side=False,
try:
cnx.do_handshake()
except OpenSSL.SSL.WantReadError:
rd = util.wait_for_read(sock, sock.gettimeout())
if not rd:
if not util.wait_for_read(sock, sock.gettimeout()):
raise timeout('select timed out')
continue
except OpenSSL.SSL.Error as e:
Expand Down
6 changes: 2 additions & 4 deletions urllib3/contrib/securetransport.py
Expand Up @@ -201,8 +201,7 @@ def _read_callback(connection_id, data_buffer, data_length_pointer):
try:
while read_count < requested_length:
if timeout is None or timeout >= 0:
readables = util.wait_for_read([base_socket], timeout)
if not readables:
if not util.wait_for_read(base_socket, timeout):
raise socket.error(errno.EAGAIN, 'timed out')

# We need to tell ctypes that we have a buffer that can be
Expand Down Expand Up @@ -257,8 +256,7 @@ def _write_callback(connection_id, data_buffer, data_length_pointer):
try:
while sent < bytes_to_write:
if timeout is None or timeout >= 0:
writables = util.wait_for_write([base_socket], timeout)
if not writables:
if not util.wait_for_write(base_socket, timeout):
raise socket.error(errno.EAGAIN, 'timed out')
chunk_sent = base_socket.send(data)
sent += chunk_sent
Expand Down
13 changes: 4 additions & 9 deletions urllib3/util/connection.py
@@ -1,7 +1,6 @@
from __future__ import absolute_import
import socket
from .wait import wait_for_read
from .selectors import HAS_SELECT, SelectorError
from .wait import HAS_WAIT_FOR_SOCKET, wait_for_read


def is_connection_dropped(conn): # Platform-specific
Expand All @@ -19,14 +18,10 @@ def is_connection_dropped(conn): # Platform-specific
return False
if sock is None: # Connection already closed (such as by httplib).
return True

if not HAS_SELECT:
if not HAS_WAIT_FOR_SOCKET: # Platform-specific: AppEngine
return False

try:
return bool(wait_for_read(sock, timeout=0.0))
except SelectorError:
return True
# Returns True if readable, which here means it's been dropped
return wait_for_read(sock, timeout=0.0)


# This function is copied from socket.py in the Python 2.7 standard
Expand Down
168 changes: 129 additions & 39 deletions urllib3/util/wait.py
@@ -1,40 +1,130 @@
from .selectors import (
HAS_SELECT,
DefaultSelector,
EVENT_READ,
EVENT_WRITE
)


def _wait_for_io_events(socks, events, timeout=None):
""" Waits for IO events to be available from a list of sockets
or optionally a single socket if passed in. Returns a list of
sockets that can be interacted with immediately. """
if not HAS_SELECT:
raise ValueError('Platform does not have a selector')
if not isinstance(socks, list):
# Probably just a single socket.
if hasattr(socks, "fileno"):
socks = [socks]
# Otherwise it might be a non-list iterable.
import errno
from functools import partial
import select
import sys
try:
from time import monotonic
except ImportError:
from time import time as monotonic

__all__ = ["HAS_WAIT_FOR_SOCKET", "wait_for_read", "wait_for_write"]

# How should we wait on sockets?
#
# There are two types of APIs you can use for waiting on sockets: the fancy
# modern stateful APIs like epoll/kqueue, and the older stateless APIs like
# select/poll. The stateful APIs are more efficient when you have a lots of
# sockets to keep track of, because you can set them up once and then use them
# lots of times. But we only ever want to wait on a single socket at a time
# and don't want to keep track of state, so the stateless APIs are actually
# more efficient. So we want to use select() or poll().
#
# Now, how do we choose between select() and poll()? On traditional Unixes,
# select() has a strange calling convention that makes it slow, or fail
# altogether, for high-numbered file descriptors. The point of poll() is to fix
# that, so on Unixes, we prefer poll().
#
# On Windows, there is no poll() (or at least Python doesn't provide a wrapper
# for it), but that's OK, because on Windows, select() doesn't have this
# strange calling convention; plain select() works fine.
#
# So: on Windows we use select(), and everywhere else we use poll().

if sys.version_info >= (3, 5):
# Modern Python, that retries syscalls by default
def _retry_on_intr(fn, timeout):
return fn(timeout)
else:
# Old and broken Pythons.
def _retry_on_intr(fn, timeout):
if timeout is not None and timeout <= 0:
return fn(timeout)

if timeout is None:
deadline = float("inf")
else:
socks = list(socks)
with DefaultSelector() as selector:
for sock in socks:
selector.register(sock, events)
return [key[0].fileobj for key in
selector.select(timeout) if key[1] & events]


def wait_for_read(socks, timeout=None):
""" Waits for reading to be available from a list of sockets
or optionally a single socket if passed in. Returns a list of
sockets that can be read from immediately. """
return _wait_for_io_events(socks, EVENT_READ, timeout)


def wait_for_write(socks, timeout=None):
""" Waits for writing to be available from a list of sockets
or optionally a single socket if passed in. Returns a list of
sockets that can be written to immediately. """
return _wait_for_io_events(socks, EVENT_WRITE, timeout)
deadline = monotonic() + timeout

while True:
try:
return fn(timeout)
# OSError for 3 <= pyver < 3.5, select.error for pyver <= 2.7
except (OSError, select.error) as e:
# 'e.args[0]' incantation works for both OSError and select.error
if e.args[0] != errno.EINTR:
raise
else:
timeout = deadline - monotonic()
if timeout == float("inf"):
timeout = None
if timeout < 0:
timeout = 0
continue


def select_wait_for_socket(sock, read=False, write=False, timeout=None):
if not read and not write:
raise RuntimeError("must specify at least one of read=True, write=True")
rcheck = []
wcheck = []
if read:
rcheck.append(sock)
if write:
wcheck.append(sock)
# When doing a non-blocking connect, most systems signal success by
# marking the socket writable. Windows, though, signals success by marked
# it as "exceptional". We paper over the difference by checking the write
# sockets for both conditions. (The stdlib selectors module does the same
# thing.)
fn = partial(select.select, rcheck, wcheck, wcheck)
rready, wready, xready = _retry_on_intr(fn, timeout)
return bool(rready or wready or xready)


def poll_wait_for_socket(sock, read=False, write=False, timeout=None):
if not read and not write:
raise RuntimeError("must specify at least one of read=True, write=True")
mask = 0
if read:
mask |= select.POLLIN
if write:
mask |= select.POLLOUT
poll_obj = select.poll()
poll_obj.register(sock, mask)

# For some reason, poll() takes timeout in milliseconds
def do_poll(t):
if t is not None:
t *= 1000
return poll_obj.poll(t)

return bool(_retry_on_intr(do_poll, timeout))


def null_wait_for_socket(*args, **kwargs):
raise RuntimeError("no select-equivalent available")


if hasattr(select, "poll"):
wait_for_socket = poll_wait_for_socket
HAS_WAIT_FOR_SOCKET = True
elif hasattr(select, "select"): # Platform-specific: Windows.
wait_for_socket = select_wait_for_socket
HAS_WAIT_FOR_SOCKET = True
else: # Platform-specific: Appengine.
wait_for_socket = null_wait_for_socket
HAS_WAIT_FOR_SOCKET = False


def wait_for_read(sock, timeout=None):
""" Waits for reading to be available on a given socket.
Returns True if the socket is readable, or False if the timeout expired.
"""
return wait_for_socket(sock, read=True, timeout=timeout)


def wait_for_write(sock, timeout=None):
""" Waits for writing to be available on a given socket.
Returns True if the socket is readable, or False if the timeout expired.
"""
return wait_for_socket(sock, write=True, timeout=timeout)

0 comments on commit 03d7753

Please sign in to comment.