Skip to content

Commit

Permalink
fix(core): use selectors to poll connections instead of raw select in…
Browse files Browse the repository at this point in the history
… threading,gevent,eventlet (#656)

Co-authored-by: lawrentwang <lawrentwang@tencent.com>

Solve the select limitation on a maximum file handler value and dynamic choose the best poller in the system.
  • Loading branch information
JetDrag committed Feb 2, 2022
1 parent f585d60 commit 4042a85
Show file tree
Hide file tree
Showing 10 changed files with 245 additions and 98 deletions.
8 changes: 5 additions & 3 deletions kazoo/handlers/eventlet.py
Expand Up @@ -5,15 +5,15 @@
import logging

import eventlet
from eventlet.green import select as green_select
from eventlet.green import socket as green_socket
from eventlet.green import time as green_time
from eventlet.green import threading as green_threading
from eventlet.green import selectors as green_selectors
from eventlet import queue as green_queue

from kazoo.handlers import utils
import kazoo.python2atexit as python2atexit

from kazoo.handlers.utils import selector_select

LOG = logging.getLogger(__name__)

Expand Down Expand Up @@ -41,6 +41,7 @@ class TimeoutError(Exception):

class AsyncResult(utils.AsyncResult):
"""A one-time event that stores a value or an exception"""

def __init__(self, handler):
super(AsyncResult, self).__init__(handler,
green_threading.Condition,
Expand Down Expand Up @@ -164,7 +165,8 @@ def create_connection(self, *args, **kwargs):

def select(self, *args, **kwargs):
with _yield_before_after():
return green_select.select(*args, **kwargs)
return selector_select(*args, selectors_module=green_selectors,
**kwargs)

def async_result(self):
return AsyncResult(self)
Expand Down
9 changes: 7 additions & 2 deletions kazoo/handlers/gevent.py
Expand Up @@ -9,6 +9,10 @@
import gevent.queue
import gevent.select
import gevent.thread
import gevent.selectors

from kazoo.handlers.utils import selector_select

try:
from gevent.lock import Semaphore, RLock
except ImportError:
Expand All @@ -17,7 +21,6 @@
from kazoo.handlers import utils
from kazoo import python2atexit


_using_libevent = gevent.__version__.startswith('0.')

log = logging.getLogger(__name__)
Expand Down Expand Up @@ -84,6 +87,7 @@ def greenlet_worker():
del func # release before possible idle
except self.queue_empty:
continue

return gevent.spawn(greenlet_worker)

def start(self):
Expand Down Expand Up @@ -122,7 +126,8 @@ def stop(self):
python2atexit.unregister(self.stop)

def select(self, *args, **kwargs):
return gevent.select.select(*args, **kwargs)
return selector_select(*args, selectors_module=gevent.selectors,
**kwargs)

def socket(self, *args, **kwargs):
return utils.create_tcp_socket(socket)
Expand Down
87 changes: 4 additions & 83 deletions kazoo/handlers/threading.py
Expand Up @@ -12,11 +12,7 @@
"""
from __future__ import absolute_import

from collections import defaultdict
import errno
from itertools import chain
import logging
import select
import socket
import threading
import time
Expand All @@ -25,20 +21,18 @@

import kazoo.python2atexit as python2atexit
from kazoo.handlers import utils
from kazoo.handlers.utils import selector_select

try:
import Queue
except ImportError: # pragma: nocover
import queue as Queue


# sentinel objects
_STOP = object()

log = logging.getLogger(__name__)

_HAS_EPOLL = hasattr(select, "epoll")


def _to_fileno(obj):
if isinstance(obj, six.integer_types):
Expand All @@ -65,6 +59,7 @@ class KazooTimeoutError(Exception):

class AsyncResult(utils.AsyncResult):
"""A one-time event that stores a value or an exception"""

def __init__(self, handler):
super(AsyncResult, self).__init__(handler,
threading.Condition,
Expand Down Expand Up @@ -133,6 +128,7 @@ def _thread_worker(): # pragma: nocover
del func # release before possible idle
except self.queue_empty:
continue

t = self.spawn(_thread_worker)
return t

Expand Down Expand Up @@ -173,82 +169,7 @@ def stop(self):
python2atexit.unregister(self.stop)

def select(self, *args, **kwargs):
# if we have epoll, and select is not expected to work
# use an epoll-based "select". Otherwise don't touch
# anything to minimize changes
if _HAS_EPOLL:
# if the highest fd we've seen is > 1023
if max(map(_to_fileno, chain.from_iterable(args[:3]))) > 1023:
return self._epoll_select(*args, **kwargs)
return self._select(*args, **kwargs)

def _select(self, *args, **kwargs):
timeout = kwargs.pop('timeout', None)
# either the time to give up, or None
end = (time.time() + timeout) if timeout else None
while end is None or time.time() < end:
if end is not None:
# make a list, since tuples aren't mutable
args = list(args)

# set the timeout to the remaining time
args[3] = end - time.time()
try:
return select.select(*args, **kwargs)
except select.error as ex:
# if the system call was interrupted, we'll retry until timeout
# in Python 3, system call interruptions are a native exception
# in Python 2, they are not
errnum = ex.errno if isinstance(ex, OSError) else ex[0]
if errnum == errno.EINTR:
continue
raise
# if we hit our timeout, lets return as a timeout
return ([], [], [])

def _epoll_select(self, rlist, wlist, xlist, timeout=None):
"""epoll-based drop-in replacement for select to overcome select
limitation on a maximum filehandle value
"""
if timeout is None:
timeout = -1
eventmasks = defaultdict(int)
rfd2obj = defaultdict(list)
wfd2obj = defaultdict(list)
xfd2obj = defaultdict(list)
read_evmask = select.EPOLLIN | select.EPOLLPRI # Just in case

def store_evmasks(obj_list, evmask, fd2obj):
for obj in obj_list:
fileno = _to_fileno(obj)
eventmasks[fileno] |= evmask
fd2obj[fileno].append(obj)

store_evmasks(rlist, read_evmask, rfd2obj)
store_evmasks(wlist, select.EPOLLOUT, wfd2obj)
store_evmasks(xlist, select.EPOLLERR, xfd2obj)

poller = select.epoll()

for fileno in eventmasks:
poller.register(fileno, eventmasks[fileno])

try:
events = poller.poll(timeout)
revents = []
wevents = []
xevents = []
for fileno, event in events:
if event & read_evmask:
revents += rfd2obj.get(fileno, [])
if event & select.EPOLLOUT:
wevents += wfd2obj.get(fileno, [])
if event & select.EPOLLERR:
xevents += xfd2obj.get(fileno, [])
finally:
poller.close()

return revents, wevents, xevents
return selector_select(*args, **kwargs)

def socket(self):
return utils.create_tcp_socket(socket)
Expand Down
89 changes: 89 additions & 0 deletions kazoo/handlers/utils.py
Expand Up @@ -2,11 +2,21 @@

import errno
import functools
import os
import select
import ssl
import socket
import time

from collections import defaultdict

import six

if six.PY34:
import selectors
else:
import selectors2 as selectors

HAS_FNCTL = True
try:
import fcntl
Expand All @@ -19,6 +29,7 @@

class AsyncResult(object):
"""A one-time event that stores a value or an exception"""

def __init__(self, handler, condition_factory, timeout_factory):
self._handler = handler
self._exception = _NONE
Expand Down Expand Up @@ -126,6 +137,7 @@ def _do_callbacks(self):
else:
functools.partial(callback, self)()


def _set_fd_cloexec(fd):
flags = fcntl.fcntl(fd, fcntl.F_GETFD)
fcntl.fcntl(fd, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC)
Expand Down Expand Up @@ -272,14 +284,17 @@ def capture_exceptions(async_result):
:param async_result: An async result implementing :class:`IAsyncResult`
"""

def capture(function):
@functools.wraps(function)
def captured_function(*args, **kwargs):
try:
return function(*args, **kwargs)
except Exception as exc:
async_result.set_exception(exc)

return captured_function

return capture


Expand All @@ -291,12 +306,86 @@ def wrap(async_result):
:param async_result: An async result implementing :class:`IAsyncResult`
"""

def capture(function):
@capture_exceptions(async_result)
def captured_function(*args, **kwargs):
value = function(*args, **kwargs)
if value is not None:
async_result.set(value)
return value

return captured_function

return capture


def fileobj_to_fd(fileobj):
"""Return a file descriptor from a file object.
Parameters:
fileobj -- file object or file descriptor
Returns:
corresponding file descriptor
Raises:
TypeError if the object is invalid
"""
if isinstance(fileobj, int):
fd = fileobj
else:
try:
fd = int(fileobj.fileno())
except (AttributeError, TypeError, ValueError):
raise TypeError("Invalid file object: "
"{!r}".format(fileobj))
if fd < 0:
raise TypeError("Invalid file descriptor: {}".format(fd))
os.fstat(fd)
return fd


def selector_select(rlist, wlist, xlist, timeout=None,
selectors_module=selectors):
"""Selector-based drop-in replacement for select to overcome select
limitation on a maximum filehandle value.
Need backport selectors2 package in python 2.
"""
if timeout is not None:
if not (isinstance(timeout, six.integer_types) or isinstance(
timeout, float)):
raise TypeError('timeout must be a number')
if timeout < 0:
raise ValueError('timeout must be non-negative')

events_mapping = {selectors_module.EVENT_READ: rlist,
selectors_module.EVENT_WRITE: wlist}
fd_events = defaultdict(int)
fd_fileobjs = defaultdict(list)

for event, fileobjs in events_mapping.items():
for fileobj in fileobjs:
fd = fileobj_to_fd(fileobj)
fd_events[fd] |= event
fd_fileobjs[fd].append(fileobj)

selector = selectors_module.DefaultSelector()
for fd, events in fd_events.items():
selector.register(fd, events)

revents, wevents, xevents = [], [], []
try:
ready = selector.select(timeout)
finally:
selector.close()

for info in ready:
k, events = info
if events & selectors.EVENT_READ:
revents.extend(fd_fileobjs[k.fd])
elif events & selectors.EVENT_WRITE:
wevents.extend(fd_fileobjs[k.fd])

return revents, wevents, xevents
22 changes: 22 additions & 0 deletions kazoo/tests/test_eventlet_handler.py
Expand Up @@ -130,6 +130,28 @@ def broken():
with pytest.raises(IOError):
r.get()

def test_huge_file_descriptor(self):
import resource
from eventlet.green import socket
from kazoo.handlers.utils import create_tcp_socket

try:
resource.setrlimit(resource.RLIMIT_NOFILE, (4096, 4096))
except (ValueError, resource.error):
self.skipTest('couldnt raise fd limit high enough')
fd = 0
socks = []
while fd < 4000:
sock = create_tcp_socket(socket)
fd = sock.fileno()
socks.append(sock)
with start_stop_one() as h:
h.start()
h.select(socks, [], [], 0)
h.stop()
for sock in socks:
sock.close()


class TestEventletClient(test_client.TestClient):
def setUp(self):
Expand Down

0 comments on commit 4042a85

Please sign in to comment.