Skip to content

Commit

Permalink
mitogen: Refactor Poller classes and Latch.poller_class selection
Browse files Browse the repository at this point in the history
This
- Clarifies and corrects docstrings and comments based on investigation for mitogen-hq#957
- Removes unused `Poller*._repr` attributes
- De-duplicates data lookups in `Poller._poll()` et al
- Introduces `mitogen.parent.POLLERS` & `mitogen.parent.POLLER_LIGHTWEIGHT`

```
@@ -1,7 +1,7 @@
 SSH command size: 759
-Bootstrap (mitogen.core) size: 17862 (17.44KiB)
+Bootstrap (mitogen.core) size: 17899 (17.48KiB)

                               Original          Minimized           Compressed
-mitogen.parent            98171 95.9KiB  50569 49.4KiB 51.5%  12742 12.4KiB 13.0%
+mitogen.parent            96988 94.7KiB  49853 48.7KiB 51.4%  12708 12.4KiB 13.1%
 mitogen.fork               8436  8.2KiB   4130  4.0KiB 49.0%   1648  1.6KiB 19.5%
 mitogen.ssh               10892 10.6KiB   6952  6.8KiB 63.8%   2113  2.1KiB 19.4%
```
  • Loading branch information
moreati committed Apr 18, 2024
1 parent bb9c51b commit 6c68394
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 88 deletions.
41 changes: 15 additions & 26 deletions mitogen/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -2523,8 +2523,7 @@ class Poller(object):
"""
A poller manages OS file descriptors the user is waiting to become
available for IO. The :meth:`poll` method blocks the calling thread
until one or more become ready. The default implementation is based on
:func:`select.poll`.
until one or more become ready.
Each descriptor has an associated `data` element, which is unique for each
readiness type, and defaults to being the same as the file descriptor. The
Expand All @@ -2546,19 +2545,13 @@ class Poller(object):
a resource leak.
Pollers may only be used by one thread at a time.
This implementation uses :func:`select.select` for wider platform support.
That is considered an implementation detail. Previous versions have used
:func:`select.poll`. Future versions may decide at runtime.
"""
SUPPORTED = True

# This changed from select() to poll() in Mitogen 0.2.4. Since poll() has
# no upper FD limit, it is suitable for use with Latch, which must handle
# FDs larger than select's limit during many-host runs. We want this
# because poll() requires no setup and teardown: just a single system call,
# which is important because Latch.get() creates a Poller on each
# invocation. In a microbenchmark, poll() vs. epoll_ctl() is 30% faster in
# this scenario. If select() must return in future, it is important
# Latch.poller_class is set from parent.py to point to the industrial
# strength poller for the OS, otherwise Latch will fail randomly.

#: Increments on every poll(). Used to version _rfds and _wfds.
_generation = 1

Expand Down Expand Up @@ -2637,16 +2630,13 @@ def _poll(self, timeout):
self._wfds,
(), timeout
)
return self._data(((fd, self._rfds, 'IN') for fd in rfds),
((fd, self._wfds, 'OUT') for fd in wfds))

for fd in rfds:
_vv and IOLOG.debug('%r: POLLIN for %r', self, fd)
data, gen = self._rfds.get(fd, (None, None))
if gen and gen < self._generation:
yield data

for fd in wfds:
_vv and IOLOG.debug('%r: POLLOUT for %r', self, fd)
data, gen = self._wfds.get(fd, (None, None))
def _data(self, *iters):
for fd, fds, label in itertools.chain(*iters):
_vv and IOLOG.debug('%r: %s for %r', self, label, fd)
data, gen = fds.get(fd, (None, None))
if gen and gen < self._generation:
yield data

Expand Down Expand Up @@ -2681,11 +2671,10 @@ class Latch(object):
See :ref:`waking-sleeping-threads` for further discussion.
"""
#: The :class:`Poller` implementation to use for waiting. Since the poller
#: will be very short-lived, we prefer :class:`mitogen.parent.PollPoller`
#: if it is available, or :class:`mitogen.core.Poller` otherwise, since
#: these implementations require no system calls to create, configure or
#: destroy.
#: The :class:`Poller` implementation to use. Instances are short lived so
#: prefer :class:`mitogen.parent.PollPoller` if it's available, otherwise
#: :class:`mitogen.core.Poller`. They don't need syscalls to create,
#: configure, or destroy. Replaced during import of :mod:`mitogen.parent`.
poller_class = Poller

#: If not :data:`None`, a function invoked as `notify(latch)` after a
Expand Down
97 changes: 35 additions & 62 deletions mitogen/parent.py
Original file line number Diff line number Diff line change
Expand Up @@ -745,8 +745,7 @@ def _upgrade_broker(broker):
broker.timers = TimerList()
LOG.debug('upgraded %r with %r (new: %d readers, %d writers; '
'old: %d readers, %d writers)', old, new,
len(new.readers), len(new.writers),
len(old.readers), len(old.writers))
len(new._rfds), len(new._wfds), len(old._rfds), len(old._wfds))


@mitogen.core.takes_econtext
Expand Down Expand Up @@ -902,22 +901,18 @@ def __repr__(self):
class PollPoller(mitogen.core.Poller):
"""
Poller based on the POSIX :linux:man2:`poll` interface. Not available on
some versions of OS X, otherwise it is the preferred poller for small FD
counts, as there is no setup/teardown/configuration system call overhead.
some Python/OS X combinations. Otherwise the preferred poller for small
FD counts; or if many pollers are created, used once, then closed.
There is no setup/teardown/configuration system call overhead.
"""
SUPPORTED = hasattr(select, 'poll')
_repr = 'PollPoller()'
_readmask = SUPPORTED and select.POLLIN | select.POLLHUP

def __init__(self):
super(PollPoller, self).__init__()
self._pollobj = select.poll()

# TODO: no proof we dont need writemask too
_readmask = (
getattr(select, 'POLLIN', 0) |
getattr(select, 'POLLHUP', 0)
)

def _update(self, fd):
mask = (((fd in self._rfds) and self._readmask) |
((fd in self._wfds) and select.POLLOUT))
Expand All @@ -932,32 +927,29 @@ def _update(self, fd):
def _poll(self, timeout):
if timeout:
timeout *= 1000

events, _ = mitogen.core.io_op(self._pollobj.poll, timeout)
for fd, event in events:
if event & self._readmask:
IOLOG.debug('%r: POLLIN|POLLHUP for %r', self, fd)
data, gen = self._rfds.get(fd, (None, None))
if gen and gen < self._generation:
yield data
if event & select.POLLOUT:
IOLOG.debug('%r: POLLOUT for %r', self, fd)
data, gen = self._wfds.get(fd, (None, None))
if gen and gen < self._generation:
yield data
return self._data(self._items(events))

def _items(self, events):
for fd, mask in events:
if mask & self._readmask: yield fd, self._rfds, 'IN|HUP'
if mask & select.POLLOUT: yield fd, self._wfds, 'OUT'


class KqueuePoller(mitogen.core.Poller):
"""
Poller based on the FreeBSD/Darwin :freebsd:man2:`kqueue` interface.
"""
SUPPORTED = hasattr(select, 'kqueue')
_repr = 'KqueuePoller()'

def __init__(self):
super(KqueuePoller, self).__init__()
self._kqueue = select.kqueue()
self._changelist = []
self._kq_filters = {
select.KQ_FILTER_READ: (self._rfds, 'IN'),
select.KQ_FILTER_WRITE: (self._wfds, 'OUT'),
}

def close(self):
super(KqueuePoller, self).close()
Expand Down Expand Up @@ -1007,30 +999,25 @@ def _poll(self, timeout):
self._changelist = []
events, _ = mitogen.core.io_op(self._kqueue.control,
changelist, 32, timeout)
return self._data(self._items(events))

def _items(self, events):
for event in events:
fd = event.ident
if event.flags & select.KQ_EV_ERROR:
LOG.debug('ignoring stale event for fd %r: errno=%d: %s',
fd, event.data, errno.errorcode.get(event.data))
elif event.filter == select.KQ_FILTER_READ:
data, gen = self._rfds.get(fd, (None, None))
# Events can still be read for an already-discarded fd.
if gen and gen < self._generation:
mitogen.core._vv and IOLOG.debug('%r: POLLIN: %r', self, fd)
yield data
elif event.filter == select.KQ_FILTER_WRITE and fd in self._wfds:
data, gen = self._wfds.get(fd, (None, None))
if gen and gen < self._generation:
mitogen.core._vv and IOLOG.debug('%r: POLLOUT: %r', self, fd)
yield data
continue
fds_lookup, label = self._kq_filters[event.filter]
yield fd, fds_lookup, label


class EpollPoller(mitogen.core.Poller):
"""
Poller based on the Linux :linux:man7:`epoll` interface.
"""
SUPPORTED = hasattr(select, 'epoll')
_repr = 'EpollPoller()'
_inmask = SUPPORTED and select.EPOLLIN | select.EPOLLHUP

def __init__(self):
super(EpollPoller, self).__init__()
Expand Down Expand Up @@ -1077,41 +1064,27 @@ def stop_transmit(self, fd):
self._wfds.pop(fd, None)
self._control(fd)

_inmask = (getattr(select, 'EPOLLIN', 0) |
getattr(select, 'EPOLLHUP', 0))

def _poll(self, timeout):
the_timeout = -1
if timeout is not None:
the_timeout = timeout

events, _ = mitogen.core.io_op(self._epoll.poll, the_timeout, 32)
for fd, event in events:
if event & self._inmask:
data, gen = self._rfds.get(fd, (None, None))
if gen and gen < self._generation:
# Events can still be read for an already-discarded fd.
mitogen.core._vv and IOLOG.debug('%r: POLLIN: %r', self, fd)
yield data
if event & select.EPOLLOUT:
data, gen = self._wfds.get(fd, (None, None))
if gen and gen < self._generation:
mitogen.core._vv and IOLOG.debug('%r: POLLOUT: %r', self, fd)
yield data


# 2.4 and 2.5 only had select.select() and select.poll().
for _klass in mitogen.core.Poller, PollPoller, KqueuePoller, EpollPoller:
if _klass.SUPPORTED:
PREFERRED_POLLER = _klass
return self._data(self._items(events))

def _items(self, events):
for fd, mask in events:
if mask & self._inmask: yield fd, self._rfds, 'IN'
if mask & select.EPOLLOUT: yield fd, self._wfds, 'OUT'


POLLERS = (EpollPoller, KqueuePoller, PollPoller, mitogen.core.Poller)
PREFERRED_POLLER = next(cls for cls in POLLERS if cls.SUPPORTED)


# For processes that start many threads or connections, it's possible Latch
# will also get high-numbered FDs, and so select() becomes useless there too.
# So swap in our favourite poller.
if PollPoller.SUPPORTED:
mitogen.core.Latch.poller_class = PollPoller
else:
mitogen.core.Latch.poller_class = PREFERRED_POLLER
POLLER_LIGHTWEIGHT = PollPoller if PollPoller.SUPPORTED else PREFERRED_POLLER
mitogen.core.Latch.poller_class = POLLER_LIGHTWEIGHT


class LineLoggingProtocolMixin(object):
Expand Down

0 comments on commit 6c68394

Please sign in to comment.