Skip to content

Commit

Permalink
feat(core): use epoll when available to support fds > 1023
Browse files Browse the repository at this point in the history
When epoll is available and the highest fd being watched is > 1023 (which select() cannot handle), route the call through epoll.
Otherwise, keep the existing select() behavior, so in the common case nothing changes.

Closes #266, #171
  • Loading branch information
packysauce committed Jun 4, 2017
1 parent 95b2185 commit 267e61b
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 2 deletions.
82 changes: 80 additions & 2 deletions kazoo/handlers/threading.py
Expand Up @@ -18,6 +18,10 @@
import socket
import threading
import time
import six

from collections import defaultdict
from itertools import chain

import kazoo.python2atexit as python2atexit

Expand All @@ -33,6 +37,27 @@

log = logging.getLogger(__name__)

# True when this platform's ``select`` module exposes ``epoll``; checked once
# at import time and used to decide whether the epoll code path may be taken.
_HAS_EPOLL = hasattr(select, "epoll")


def _to_fileno(obj):
    """Return the integer file descriptor represented by ``obj``.

    ``obj`` is either an integer, or any object with a ``fileno()`` method
    that returns an integer.

    Raises ``TypeError`` when ``obj`` is neither, or when ``fileno()``
    returns a non-integer; raises ``ValueError`` for a negative descriptor.
    """
    if isinstance(obj, six.integer_types):
        descriptor = int(obj)
    else:
        if not hasattr(obj, "fileno"):
            raise TypeError(
                "argument must be an int, or have a fileno() method."
            )
        raw = obj.fileno()
        if not isinstance(raw, six.integer_types):
            raise TypeError("fileno() returned a non-integer")
        descriptor = int(raw)

    # Negative values are never valid descriptors, whatever their origin.
    if descriptor < 0:
        raise ValueError(
            "file descriptor cannot be a negative integer (%d)"
            % (descriptor,)
        )

    return descriptor


class KazooTimeoutError(Exception):
    """Timeout exception type defined by this handler module.

    NOTE(review): the places that raise it are outside this excerpt.
    """
Expand Down Expand Up @@ -143,8 +168,17 @@ def stop(self):
python2atexit.unregister(self.stop)

def select(self, *args, **kwargs):
    """Wait for I/O readiness, switching to epoll for large descriptors.

    Arguments mirror ``select.select``: ``rlist``, ``wlist``, ``xlist`` and
    an optional ``timeout``. They are forwarded unchanged to whichever
    implementation is picked.

    If epoll is available and any watched descriptor is greater than 1023,
    the call is routed to ``_epoll_select``; in every other case the
    pre-existing ``_select`` path is used so behavior is unchanged.

    When epoll is available, an entry that ``_to_fileno`` rejects raises
    ``TypeError`` or ``ValueError``.
    """
    if _HAS_EPOLL:
        # any() instead of max(): max() raises ValueError on an empty
        # sequence, which broke calls that pass three empty lists and
        # only a timeout. Entries skipped by the short-circuit are still
        # validated by _epoll_select itself.
        watched = chain.from_iterable(args[:3])
        if any(_to_fileno(obj) > 1023 for obj in watched):
            return self._epoll_select(*args, **kwargs)
    return self._select(*args, **kwargs)

def _select(self, *args, **kwargs):
timeout = kwargs.pop('timeout', None)
# either the time to give up, or None
end = (time.time() + timeout) if timeout else None
while end is None or time.time() < end:
Expand All @@ -167,6 +201,50 @@ def select(self, *args, **kwargs):
# if we hit our timeout, lets return as a timeout
return ([], [], [])

def _epoll_select(self, rlist, wlist, xlist, timeout=None):
    """epoll-based drop-in replacement for select to overcome select
    limitation on a maximum filehandle value

    ``rlist``, ``wlist`` and ``xlist`` hold integers or objects with a
    ``fileno()`` method, to be watched for readability, writability and
    error conditions respectively. ``timeout`` is passed to
    ``epoll.poll``; ``None`` means block until an event arrives.

    Returns a ``(readable, writable, exceptional)`` tuple of lists holding
    the originally supplied objects whose descriptor reported a matching
    event. Raises whatever ``_to_fileno`` raises for an invalid entry.
    """
    if timeout is None:
        timeout = -1  # epoll's spelling of "wait forever"

    eventmasks = defaultdict(int)
    rfd2obj = defaultdict(list)
    wfd2obj = defaultdict(list)
    xfd2obj = defaultdict(list)
    read_evmask = select.EPOLLIN | select.EPOLLPRI  # Just in case

    def store_evmasks(obj_list, evmask, fd2obj):
        # Merge the mask per descriptor (epoll takes one registration per
        # fd) and remember which caller objects map to that descriptor.
        for obj in obj_list:
            fileno = _to_fileno(obj)
            eventmasks[fileno] |= evmask
            fd2obj[fileno].append(obj)

    store_evmasks(rlist, read_evmask, rfd2obj)
    store_evmasks(wlist, select.EPOLLOUT, wfd2obj)
    store_evmasks(xlist, select.EPOLLERR, xfd2obj)

    poller = select.epoll()
    try:
        # Registration happens inside the try block so that a failing
        # register() (e.g. a closed descriptor) cannot leak the epoll fd.
        for fileno, evmask in eventmasks.items():
            poller.register(fileno, evmask)
        events = poller.poll(timeout)
    finally:
        poller.close()

    revents = []
    wevents = []
    xevents = []
    # NOTE(review): a descriptor reporting only EPOLLHUP lands in none of
    # the lists, whereas select() is expected to flag it readable --
    # confirm callers tolerate that difference.
    for fileno, event in events:
        if event & read_evmask:
            revents += rfd2obj.get(fileno, [])
        if event & select.EPOLLOUT:
            wevents += wfd2obj.get(fileno, [])
        if event & select.EPOLLERR:
            xevents += xfd2obj.get(fileno, [])

    return revents, wevents, xevents

def socket(self):
    """Return the object built by ``utils.create_tcp_socket``.

    The standard-library ``socket`` module is handed to the helper as the
    implementation to build from.
    """
    tcp_sock = utils.create_tcp_socket(socket)
    return tcp_sock

Expand Down
25 changes: 25 additions & 0 deletions kazoo/tests/test_threading_handler.py
Expand Up @@ -46,6 +46,31 @@ def test_double_start_stop(self):
h.stop()
self.assertFalse(h._running)

def test_huge_file_descriptor(self):
    """select() must cope with descriptors that plain select() rejects.

    Opens sockets until one has a descriptor of at least 4000, then checks
    that ``select`` and ``_epoll_select`` accept them while ``_select``
    raises ``ValueError``.
    """
    from kazoo.handlers.threading import _HAS_EPOLL
    if not _HAS_EPOLL:
        self.skipTest('only run on systems with epoll()')
    import resource
    import socket
    from kazoo.handlers.utils import create_tcp_socket

    # Raise only the soft limit and remember the old values: overwriting
    # the hard limit would cap it for the rest of the test process.
    soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
    raised = soft != resource.RLIM_INFINITY and soft < 4096
    if raised:
        try:
            resource.setrlimit(resource.RLIMIT_NOFILE, (4096, hard))
        except (ValueError, resource.error):
            self.skipTest('couldnt raise fd limit high enough')

    socks = []
    try:
        fd = 0
        while fd < 4000:
            sock = create_tcp_socket(socket)
            fd = sock.fileno()
            socks.append(sock)
        h = self._makeOne()
        h.start()
        try:
            h.select(socks, [], [])
            with self.assertRaises(ValueError):
                h._select(socks, [], [])
            h._epoll_select(socks, [], [])
        finally:
            # Stop the handler even when an assertion above fails.
            h.stop()
    finally:
        # Release the thousands of descriptors opened for this test so
        # later tests do not run against an exhausted fd table.
        for sock in socks:
            sock.close()
        if raised:
            resource.setrlimit(resource.RLIMIT_NOFILE, (soft, hard))


class TestThreadingAsync(unittest.TestCase):
def _makeOne(self, *args):
Expand Down

0 comments on commit 267e61b

Please sign in to comment.