Skip to content

Commit

Permalink
issue #542: return of select poller, new selection logic
Browse files Browse the repository at this point in the history
  • Loading branch information
dw committed Feb 14, 2019
1 parent 0aa4c9d commit 9bcd2ec
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 44 deletions.
54 changes: 26 additions & 28 deletions mitogen/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -1912,6 +1912,8 @@ class Poller(object):
Pollers may only be used by one thread at a time.
"""
SUPPORTED = True

# This changed from select() to poll() in Mitogen 0.2.4. Since poll() has
# no upper FD limit, it is suitable for use with Latch, which must handle
# FDs larger than select's limit during many-host runs. We want this
Expand All @@ -1928,11 +1930,16 @@ class Poller(object):
def __init__(self):
self._rfds = {}
self._wfds = {}
self._pollobj = select.poll()

def __repr__(self):
return '%s(%#x)' % (type(self).__name__, id(self))

def _update(self, fd):
"""
Required by PollPoller subclass.
"""
pass

@property
def readers(self):
"""
Expand All @@ -1955,20 +1962,6 @@ def close(self):
"""
pass

_readmask = select.POLLIN | select.POLLHUP
# TODO: no proof we dont need writemask too

def _update(self, fd):
mask = (((fd in self._rfds) and self._readmask) |
((fd in self._wfds) and select.POLLOUT))
if mask:
self._pollobj.register(fd, mask)
else:
try:
self._pollobj.unregister(fd)
except KeyError:
pass

def start_receive(self, fd, data=None):
"""
Cause :meth:`poll` to yield `data` when `fd` is readable.
Expand Down Expand Up @@ -2004,22 +1997,27 @@ def stop_transmit(self, fd):
self._update(fd)

def _poll(self, timeout):
(rfds, wfds, _), _ = io_op(select.select,
self._rfds,
self._wfds,
(), timeout
)

for fd in rfds:
_vv and IOLOG.debug('%r: POLLIN for %r', self, fd)
data, gen = self._rfds.get(fd, (None, None))
if gen and gen < self._generation:
yield data

for fd in wfds:
_vv and IOLOG.debug('%r: POLLOUT for %r', self, fd)
data, gen = self._wfds.get(fd, (None, None))
if gen and gen < self._generation:
yield data

if timeout:
timeout *= 1000

events, _ = io_op(self._pollobj.poll, timeout)
for fd, event in events:
if event & self._readmask:
_vv and IOLOG.debug('%r: POLLIN|POLLHUP for %r', self, fd)
data, gen = self._rfds.get(fd, (None, None))
if gen and gen < self._generation:
yield data
if event & select.POLLOUT:
_vv and IOLOG.debug('%r: POLLOUT for %r', self, fd)
data, gen = self._wfds.get(fd, (None, None))
if gen and gen < self._generation:
yield data

def poll(self, timeout=None):
"""
Block the calling thread until one or more FDs are ready for IO.
Expand Down
73 changes: 60 additions & 13 deletions mitogen/parent.py
Original file line number Diff line number Diff line change
Expand Up @@ -890,10 +890,58 @@ def __repr__(self):
)


class PollPoller(mitogen.core.Poller):
"""
Poller based on the POSIX poll(2) interface. Not available on some versions
of OS X, otherwise it is the preferred poller for small FD counts.
"""
SUPPORTED = hasattr(select, 'poll')
_repr = 'PollPoller()'

def __init__(self):
super(PollPoller, self).__init__()
self._pollobj = select.poll()

# TODO: no proof we dont need writemask too
_readmask = (
getattr(select, 'POLLIN', 0) |
getattr(select, 'POLLHUP', 0)
)

def _update(self, fd):
mask = (((fd in self._rfds) and self._readmask) |
((fd in self._wfds) and select.POLLOUT))
if mask:
self._pollobj.register(fd, mask)
else:
try:
self._pollobj.unregister(fd)
except KeyError:
pass

def _poll(self, timeout):
if timeout:
timeout *= 1000

events, _ = mitogen.core.io_op(self._pollobj.poll, timeout)
for fd, event in events:
if event & self._readmask:
IOLOG.debug('%r: POLLIN|POLLHUP for %r', self, fd)
data, gen = self._rfds.get(fd, (None, None))
if gen and gen < self._generation:
yield data
if event & select.POLLOUT:
IOLOG.debug('%r: POLLOUT for %r', self, fd)
data, gen = self._wfds.get(fd, (None, None))
if gen and gen < self._generation:
yield data


class KqueuePoller(mitogen.core.Poller):
"""
Poller based on the FreeBSD/Darwin kqueue(2) interface.
"""
SUPPORTED = hasattr(select, 'kqueue')
_repr = 'KqueuePoller()'

def __init__(self):
Expand Down Expand Up @@ -971,6 +1019,7 @@ class EpollPoller(mitogen.core.Poller):
"""
Poller based on the Linux epoll(2) interface.
"""
SUPPORTED = hasattr(select, 'epoll')
_repr = 'EpollPoller()'

def __init__(self):
Expand Down Expand Up @@ -1041,20 +1090,18 @@ def _poll(self, timeout):
yield data


if sys.version_info < (2, 6):
# 2.4 and 2.5 only had select.select() and select.poll().
POLLER_BY_SYSNAME = {}
else:
POLLER_BY_SYSNAME = {
'Darwin': KqueuePoller,
'FreeBSD': KqueuePoller,
'Linux': EpollPoller,
}
# 2.4 and 2.5 only had select.select() and select.poll().
for _klass in mitogen.core.Poller, PollPoller, KqueuePoller, EpollPoller:
if _klass.SUPPORTED:
PREFERRED_POLLER = _klass

PREFERRED_POLLER = POLLER_BY_SYSNAME.get(
os.uname()[0],
mitogen.core.Poller,
)
# For apps that start threads dynamically, it's possible Latch will also get
# very high-numbered wait fds when there are many connections, and so select()
# becomes useless there too. So swap in our favourite poller.
if PollPoller.SUPPORTED:
mitogen.core.Latch.poller_class = PollPoller
else:
mitogen.core.Latch.poller_class = PREFERRED_POLLER


class DiagLogStream(mitogen.core.BasicStream):
Expand Down
15 changes: 12 additions & 3 deletions tests/poller_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -401,16 +401,25 @@ class SelectTest(AllMixin, testlib.TestCase):
klass = mitogen.core.Poller

SelectTest = unittest2.skipIf(
condition=not hasattr(select, 'select'),
condition=(not SelectTest.klass.SUPPORTED),
reason='select.select() not supported'
)(SelectTest)


class PollTest(AllMixin, testlib.TestCase):
klass = mitogen.parent.PollPoller

PollTest = unittest2.skipIf(
condition=(not PollTest.klass.SUPPORTED),
reason='select.poll() not supported'
)(PollTest)


class KqueueTest(AllMixin, testlib.TestCase):
klass = mitogen.parent.KqueuePoller

KqueueTest = unittest2.skipIf(
condition=not hasattr(select, 'kqueue'),
condition=(not KqueueTest.klass.SUPPORTED),
reason='select.kqueue() not supported'
)(KqueueTest)

Expand All @@ -419,7 +428,7 @@ class EpollTest(AllMixin, testlib.TestCase):
klass = mitogen.parent.EpollPoller

EpollTest = unittest2.skipIf(
condition=not hasattr(select, 'epoll'),
condition=(not EpollTest.klass.SUPPORTED),
reason='select.epoll() not supported'
)(EpollTest)

Expand Down

0 comments on commit 9bcd2ec

Please sign in to comment.