Skip to content

Commit

Permalink
Merge pull request #559 from wjps/select-connection-ioloop
Browse files Browse the repository at this point in the history
Make SelectConnection behave like an ioloop - bug fix
  • Loading branch information
gmr committed May 6, 2015
2 parents 062ecbc + 9414153 commit 508fcbe
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 19 deletions.
19 changes: 12 additions & 7 deletions pika/adapters/select_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,9 @@ def __getattr__(self, attr):

def _get_poller(self):
"""Determine the best poller to use for this enviroment."""


poller = None

if hasattr(select, 'epoll'):
if not SELECT_TYPE or SELECT_TYPE == 'epoll':
LOGGER.debug('Using EPollPoller')
Expand Down Expand Up @@ -163,7 +165,7 @@ def get_interrupt_pair(self):
try:
return socket.socketpair()

except:
except NameError:

This comment has been minimized.

Copy link
@analytik

analytik May 14, 2015

This is possibly wrong. On Python 3.4.1 on Windows, I'm getting an AttributeError - 'module' object has no attribute 'socketpair', not a NameError.

This comment has been minimized.

Copy link
@vitaly-krugl

vitaly-krugl May 14, 2015

Member

@analytik, you're correct - it shoud be AttributeError. Would you mind submitting a pull request with the fix? Thanks!

LOGGER.debug("Using custom socketpair for interrupt")
read_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
read_sock.setblocking(0)
Expand Down Expand Up @@ -307,12 +309,16 @@ def stop(self):
self._stopping = True

try:
# Send byte to interrupt the poll loop
self._w_interrupt.send('X')
# Send byte to interrupt the poll loop, use write() for consitency.
os.write(self._w_interrupt.fileno(), 'X')
except OSError as err:
if err.errno != errno.EWOULDBLOCK:
raise
except Exception as err:
# There's nothing sensible to do here, we'll exit the interrupt
# loop after POLL_TIMEOUT secs in worst case anyway.
LOGGER.warning("Failed to send ioloop interrupt: %s", err)
raise

def poll(self, write_only=False):
"""Wait for events on interested filedescriptors.
Expand Down Expand Up @@ -367,11 +373,11 @@ def _process_fd_events(self, fd_event_map, write_only):
handler = self._fd_handlers[fileno]
handler(fileno, events, write_only=write_only)

def flush_outbound(self):
def _flush_outbound(self):
"""Call the state manager who will figure out that we need to write
then call the poller's poll function to force it to process events.
"""
self._()
self._manage_event_state()
# Force our poller to come up for air, but in write only mode
# write only mode prevents messages from coming in and kicking off
# events through the consumer
Expand Down Expand Up @@ -490,7 +496,6 @@ def add_handler(self, fileno, handler, events):
:param int events: The events to look for
"""
LOGGER.info("registering file %s", fileno)
self._poll.register(fileno, events)
super(PollPoller, self).add_handler(fileno, handler, events)

Expand Down
84 changes: 72 additions & 12 deletions tests/unit/select_connection_ioloop_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,18 @@
from pika.adapters.select_connection import READ, WRITE, ERROR
from functools import partial


class IOLoopBaseTest(unittest.TestCase):


SELECT_POLLER=None
TIMEOUT = 1.0
def setUp(self):

def setUp(self):
select_connection.SELECT_TYPE = self.SELECT_POLLER
self.ioloop = select_connection.IOLoop()

def tearDown(self):
self.ioloop.remove_timeout(self.fail_timer)
self.ioloop = None

def start(self):
self.fail_timer = self.ioloop.add_timeout(self.TIMEOUT, self.on_timeout)
Expand All @@ -40,20 +43,31 @@ def on_timeout(self):
self.ioloop.stop()
raise AssertionError('Test timed out')

class IOLoopThreadStopTest(IOLoopBaseTest):

class IOLoopThreadStopTestSelect(IOLoopBaseTest):
SELECT_POLLER='select'
def start_test(self):
t = threading.Timer(0.1, self.ioloop.stop)
t.start()
self.start()

class IOLoopThreadStopTestPoll(IOLoopThreadStopTestSelect):
SELECT_POLLER='poll'

class IOLoopThreadStopTestEPoll(IOLoopThreadStopTestSelect):
SELECT_POLLER='epoll'

class IOLoopThreadStopTestKqueue(IOLoopThreadStopTestSelect):
SELECT_POLLER='kqueue'

class IOLoopTimerTest(IOLoopBaseTest):

class IOLoopTimerTestSelect(IOLoopBaseTest):
""" Set a bunch of very short timers to fire in reverse order and check
that they fire in order of time, not
"""
NUM_TIMERS = 5
TIMER_INTERVAL = 0.02
SELECT_POLLER='select'

def set_timers(self):
self.timer_stack = list()
Expand All @@ -71,17 +85,37 @@ def on_timer(self, val):
if not self.timer_stack:
self.ioloop.stop()

class IOLoopSleepTimerTest(IOLoopTimerTest):
class IOLoopTimerTestPoll(IOLoopTimerTestSelect):
SELECT_POLLER='poll'

class IOLoopTimerTestEPoll(IOLoopTimerTestSelect):
SELECT_POLLER='epoll'

class IOLoopTimerTestKqueue(IOLoopTimerTestSelect):
SELECT_POLLER='kqueue'


class IOLoopSleepTimerTestSelect(IOLoopTimerTestSelect):
"""Sleep until all the timers should have passed and check they still
fire in deadline order"""

def start_test(self):
self.set_timers()
time.sleep(self.NUM_TIMERS * self.TIMER_INTERVAL)
self.start()

class IOLoopSocketBase(IOLoopBaseTest):
class IOLoopSleepTimerTestPoll(IOLoopSleepTimerTestSelect):
SELECT_POLLER='poll'

class IOLoopSleepTimerTestEPoll(IOLoopSleepTimerTestSelect):
SELECT_POLLER='epoll'

class IOLoopSleepTimerTestKqueue(IOLoopSleepTimerTestSelect):
SELECT_POLLER='kqueue'

class IOLoopSocketBaseSelect(IOLoopBaseTest):

SELECT_POLLER='select'
READ_SIZE = 1024

def save_sock(self, sock):
Expand All @@ -90,10 +124,17 @@ def save_sock(self, sock):
return fd

def setUp(self):
super(IOLoopSocketBase, self).setUp()
super(IOLoopSocketBaseSelect, self).setUp()
self.sock_map = dict()
self.create_accept_socket()

def tearDown(self):
for fd in self.sock_map:
self.ioloop.remove_handler(fd)
self.sock_map[fd].close()
super(IOLoopSocketBaseSelect, self).tearDown()


def create_accept_socket(self):
listen_sock = socket.socket()
listen_sock.setblocking(0)
Expand Down Expand Up @@ -135,18 +176,27 @@ def on_timeout(self):
self.ioloop.stop()
raise AssertionError('Test timed out')

class IOLoopSocketBasePoll(IOLoopSocketBaseSelect):
SELECT_POLLER='poll'

class IOLoopSocketBaseEPoll(IOLoopSocketBaseSelect):
SELECT_POLLER='epoll'

class IOLoopSimpleMessageTestCase(IOLoopSocketBase):
class IOLoopSocketBaseKqueue(IOLoopSocketBaseSelect):
SELECT_POLLER='kqueue'


class IOLoopSimpleMessageTestCaseSelect(IOLoopSocketBaseSelect):

def start(self):
self.create_write_socket(self.connected)
super(IOLoopSimpleMessageTestCase, self).start()
super(IOLoopSimpleMessageTestCaseSelect, self).start()

def connected(self, fd, events, write_only):
self.assertEqual(events, WRITE)
logging.debug("Writing to %d message: %s", fd, 'X')
os.write(fd, 'X')
self.ioloop.remove_handler(fd)
self.ioloop.update_handler(fd, 0)

def verify_message(self, msg):
self.assertEqual(msg, 'X')
Expand All @@ -155,3 +205,13 @@ def verify_message(self, msg):
def start_test(self):
self.start()

class IOLoopSimpleMessageTestCasetPoll(IOLoopSimpleMessageTestCaseSelect):
SELECT_POLLER='poll'

class IOLoopSimpleMessageTestCasetEPoll(IOLoopSimpleMessageTestCaseSelect):
SELECT_POLLER='epoll'

class IOLoopSimpleMessageTestCasetKqueue(IOLoopSimpleMessageTestCaseSelect):
SELECT_POLLER='kqueue'


0 comments on commit 508fcbe

Please sign in to comment.