Skip to content

Commit

Permalink
Merge afbc9e0 into 24332a2
Browse files Browse the repository at this point in the history
  • Loading branch information
Michael Laing committed May 14, 2014
2 parents 24332a2 + afbc9e0 commit e635bf5
Showing 1 changed file with 24 additions and 16 deletions.
40 changes: 24 additions & 16 deletions pika/adapters/libev_connection.py
Expand Up @@ -195,6 +195,28 @@ def _handle_events(self, io_watcher, libev_events, **kwargs):
self._LIBEV_TO_PIKA_ARRAY[libev_events],
**kwargs)

def _reset_io_watcher(self):
"""Reset the IO watcher; retry as necessary
"""
self._io_watcher.stop()

retries = 0
while True:
try:
self._io_watcher.set(
self._io_watcher.fd,
self._PIKA_TO_LIBEV_ARRAY[self.event_state]
)

break
except: # sometimes the stop() doesn't complete in time
if retries > 5: raise
self._io_watcher.stop() # so try it again
retries += 1

self._io_watcher.start()

def _manage_event_state(self):
"""Manage the bitmask for reading/writing/error which is used by the
io/event handler to specify when there is an event such as a read or
Expand All @@ -204,24 +226,10 @@ def _manage_event_state(self):
if self.outbound_buffer:
if not self.event_state & self.WRITE:
self.event_state |= self.WRITE
self._io_watcher.stop()

self._io_watcher.set(
self._io_watcher.fd,
self._PIKA_TO_LIBEV_ARRAY[self.event_state]
)

self._io_watcher.start()
self._reset_io_watcher()
elif self.event_state & self.WRITE:
self.event_state = self.base_events
self._io_watcher.stop()

self._io_watcher.set(
self._io_watcher.fd,
self._PIKA_TO_LIBEV_ARRAY[self.event_state]
)

self._io_watcher.start()
self._reset_io_watcher()

def _timer_callback(self, timer, libev_events):
"""Manage timer callbacks indirectly."""
Expand Down

0 comments on commit e635bf5

Please sign in to comment.