Skip to content

Commit

Permalink
Merge pull request #568 from vitaly-krugl/fix-broken-KQueuePoller-con…
Browse files Browse the repository at this point in the history
…structor

Fix KQueuePoller after changes in PR #555
  • Loading branch information
gmr committed May 7, 2015
2 parents 7ed6dcc + 287be36 commit 8dc5191
Showing 1 changed file with 19 additions and 19 deletions.
38 changes: 19 additions & 19 deletions pika/adapters/select_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ class IOLoop(object):
Passes through all operations to the loaded poller object.
"""

def __init__(self):
self._poller = self._get_poller()

Expand All @@ -112,22 +112,22 @@ def _get_poller(self):
if not SELECT_TYPE or SELECT_TYPE == 'epoll':
LOGGER.debug('Using EPollPoller')
poller = EPollPoller()

if not poller and hasattr(select, 'kqueue'):
if not SELECT_TYPE or SELECT_TYPE == 'kqueue':
LOGGER.debug('Using KQueuePoller')
poller = KQueuePoller()

if not poller and hasattr(select, 'poll') and hasattr(select.poll(),
'modify'):
if not SELECT_TYPE or SELECT_TYPE == 'poll':
LOGGER.debug('Using PollPoller')
poller = PollPoller()

if not poller:
LOGGER.debug('Using SelectPoller')
poller = SelectPoller()

return poller


Expand Down Expand Up @@ -182,7 +182,7 @@ def read_interrupt(self, interrupt_sock, events, write_only):
:param int interrupt_sock: The file descriptor to read from
:param int events: (unused) The events generated for this fd
:param bool write_only: (unused) True if poll was called to trigger a
write
write
"""
os.read(interrupt_sock, 512)

Expand Down Expand Up @@ -230,7 +230,7 @@ def get_next_deadline(self):
deadlines = [t['deadline'] for t in self._timeouts.values()]
self._next_timeout = min(deadlines)
timeout = max((self._next_timeout - time.time(), 0))

else:
timeout = SelectPoller.POLL_TIMEOUT

Expand All @@ -239,7 +239,7 @@ def get_next_deadline(self):

def process_timeouts(self):
"""Process the self._timeouts event stack"""

now = time.time()
to_run = filter(lambda t: t['deadline'] <= now,
self._timeouts.values())
Expand Down Expand Up @@ -276,11 +276,11 @@ def update_handler(self, fileno, events):
self._fd_events[ev].add(fileno)
else:
self._fd_events[ev].discard(fileno)


def remove_handler(self, fileno):
"""Remove a file descriptor from the set
:param int fileno: The file descriptor
"""
Expand All @@ -294,10 +294,10 @@ def remove_handler(self, fileno):

def start(self):
"""Start the main poller loop. It will loop here until self._stopping"""

LOGGER.debug('Starting IOLoop')
self._stopping = False

while not self._stopping:
self.poll()
self.process_timeouts()
Expand All @@ -307,7 +307,7 @@ def stop(self):

LOGGER.debug('Stopping IOLoop')
self._stopping = True

try:
# Send byte to interrupt the poll loop, use write() for consitency.
os.write(self._w_interrupt.fileno(), 'X')
Expand Down Expand Up @@ -347,8 +347,8 @@ def poll(self, write_only=False):

def _process_fd_events(self, fd_event_map, write_only):
""" Processes the callbacks for each fileno we've recieved events.
Before doing so we re-calculate the event mask based on what is
currently set in case it has been changed under our feet by a
Before doing so we re-calculate the event mask based on what is
currently set in case it has been changed under our feet by a
previous callback. We also take a store a refernce to the
fd_event_map in the class so that we can detect removal of an
fileno during processing of another callback and not generate
Expand Down Expand Up @@ -395,8 +395,8 @@ def __init__(self):
:param int events: The events to look for
"""
super(KQueuePoller, self).__init__()
self._kqueue = select.kqueue()
super(KQueuePoller, self).__init__()

def update_handler(self, fileno, events):
"""Set the events to the current events
Expand Down Expand Up @@ -462,7 +462,7 @@ def poll(self, write_only=False):
fd_event_map = defaultdict(int)
for event in kevents:
fileno = event.ident
fd_event_map[fileno] |= self._map_event(event.filter)
fd_event_map[fileno] |= self._map_event(event)

self._process_fd_events(fd_event_map, write_only)

Expand Down Expand Up @@ -511,9 +511,9 @@ def update_handler(self, fileno, events):

def remove_handler(self, fileno):
"""Remove a fileno to the set
:param int fileno: The file descriptor
"""
super(PollPoller, self).remove_handler(fileno)
self._poll.unregister(fileno)
Expand Down

0 comments on commit 8dc5191

Please sign in to comment.