Permalink
Browse files

libev_connection: reset_io_watcher

  • Loading branch information...
1 parent 24332a2 commit afbc9e01e5eefd59e27a0c5974caecd4ceaa1704 @michaelplaing michaelplaing committed May 14, 2014
Showing with 24 additions and 16 deletions.
  1. +24 −16 pika/adapters/libev_connection.py
View
40 pika/adapters/libev_connection.py
@@ -195,6 +195,28 @@ def _handle_events(self, io_watcher, libev_events, **kwargs):
self._LIBEV_TO_PIKA_ARRAY[libev_events],
**kwargs)
+ def _reset_io_watcher(self):
+ """Reset the IO watcher; retry as necessary
+
+ """
+ self._io_watcher.stop()
+
+ retries = 0
+ while True:
+ try:
+ self._io_watcher.set(
+ self._io_watcher.fd,
+ self._PIKA_TO_LIBEV_ARRAY[self.event_state]
+ )
+
+ break
+ except: # sometimes the stop() doesn't complete in time
+ if retries > 5: raise
+ self._io_watcher.stop() # so try it again
+ retries += 1
+
+ self._io_watcher.start()
+
def _manage_event_state(self):
"""Manage the bitmask for reading/writing/error which is used by the
io/event handler to specify when there is an event such as a read or
@@ -204,24 +226,10 @@ def _manage_event_state(self):
if self.outbound_buffer:
if not self.event_state & self.WRITE:
self.event_state |= self.WRITE
- self._io_watcher.stop()
-
- self._io_watcher.set(
- self._io_watcher.fd,
- self._PIKA_TO_LIBEV_ARRAY[self.event_state]
- )
-
- self._io_watcher.start()
+ self._reset_io_watcher()
elif self.event_state & self.WRITE:
self.event_state = self.base_events
- self._io_watcher.stop()
-
- self._io_watcher.set(
- self._io_watcher.fd,
- self._PIKA_TO_LIBEV_ARRAY[self.event_state]
- )
-
- self._io_watcher.start()
+ self._reset_io_watcher()
def _timer_callback(self, timer, libev_events):
"""Manage timer callbacks indirectly."""

0 comments on commit afbc9e0

Please sign in to comment.