Permalink
Browse files

PEP8 cleanup

  • Loading branch information...
1 parent 9b97956 commit fea24767e2897a687f4b2e8cd3b6fcac6a83ad73 @gmr gmr committed May 13, 2014
Showing with 103 additions and 107 deletions.
  1. +103 −107 pika/adapters/libev_connection.py
View
210 pika/adapters/libev_connection.py
@@ -9,18 +9,19 @@
from pika.adapters.base_connection import BaseConnection
LOGGER = logging.getLogger(__name__)
-global global_sigint_watcher, global_sigterm_watcher
-global_sigint_watcher = global_sigterm_watcher = None
+
+global_sigint_watcher, global_sigterm_watcher = None, None
+
class LibevConnection(BaseConnection):
"""The LibevConnection runs on the libev IOLoop. If you're running the
connection in a web app, make sure you set stop_ioloop_on_close to False,
which is the default behavior for this adapter, otherwise the web app
will stop taking requests.
-
+
You should be familiar with pyev and libev to use this adapter, esp.
with regard to the use of libev ioloops.
-
+
If an on_signal_callback method is provided, the adapter creates signal
watchers the first time; subsequent instantiations with a provided method
reuse the same watchers but will call the new method upon receiving a
@@ -39,56 +40,47 @@ class LibevConnection(BaseConnection):
"""
WARN_ABOUT_IOLOOP = True
-
+
# use static arrays to translate masks between pika and libev
- _PIKA_TO_LIBEV_ARRAY = array.array(
- 'i',
- [0] * (
- (BaseConnection.READ|BaseConnection.WRITE|BaseConnection.ERROR) + 1
- )
- )
-
+ _PIKA_TO_LIBEV_ARRAY = array.array('i',
+ [0] * ((BaseConnection.READ |
+ BaseConnection.WRITE |
+ BaseConnection.ERROR) + 1))
+
_PIKA_TO_LIBEV_ARRAY[BaseConnection.READ] = pyev.EV_READ
_PIKA_TO_LIBEV_ARRAY[BaseConnection.WRITE] = pyev.EV_WRITE
-
- _PIKA_TO_LIBEV_ARRAY[
- BaseConnection.READ|BaseConnection.WRITE
- ] = pyev.EV_READ|pyev.EV_WRITE
-
- _PIKA_TO_LIBEV_ARRAY[
- BaseConnection.READ|BaseConnection.ERROR
- ] = pyev.EV_READ
-
- _PIKA_TO_LIBEV_ARRAY[
- BaseConnection.WRITE|BaseConnection.ERROR
- ] = pyev.EV_WRITE
-
- _PIKA_TO_LIBEV_ARRAY[
- BaseConnection.READ|BaseConnection.WRITE|BaseConnection.ERROR
- ] = pyev.EV_READ|pyev.EV_WRITE
-
- _LIBEV_TO_PIKA_ARRAY = array.array(
- 'i',
- [0] * ((pyev.EV_READ|pyev.EV_WRITE) + 1)
- )
-
+
+ _PIKA_TO_LIBEV_ARRAY[BaseConnection.READ |
+ BaseConnection.WRITE] = pyev.EV_READ | pyev.EV_WRITE
+
+ _PIKA_TO_LIBEV_ARRAY[BaseConnection.READ |
+ BaseConnection.ERROR] = pyev.EV_READ
+
+ _PIKA_TO_LIBEV_ARRAY[BaseConnection.WRITE |
+ BaseConnection.ERROR] = pyev.EV_WRITE
+
+ _PIKA_TO_LIBEV_ARRAY[BaseConnection.READ |
+ BaseConnection.WRITE |
+ BaseConnection.ERROR] = pyev.EV_READ | pyev.EV_WRITE
+
+ _LIBEV_TO_PIKA_ARRAY = array.array('i',
+ [0] * ((pyev.EV_READ |
+ pyev.EV_WRITE) + 1))
+
_LIBEV_TO_PIKA_ARRAY[pyev.EV_READ] = BaseConnection.READ
_LIBEV_TO_PIKA_ARRAY[pyev.EV_WRITE] = BaseConnection.WRITE
-
- _LIBEV_TO_PIKA_ARRAY[
- pyev.EV_READ|pyev.EV_WRITE
- ] = BaseConnection.READ|BaseConnection.WRITE
-
- def __init__(
- self,
- parameters=None,
- on_open_callback=None,
- on_open_error_callback=None,
- on_close_callback=None,
- stop_ioloop_on_close=False,
- custom_ioloop=None,
- on_signal_callback=None
- ):
+
+ _LIBEV_TO_PIKA_ARRAY[pyev.EV_READ | pyev.EV_WRITE] = \
+ BaseConnection.READ | BaseConnection.WRITE
+
+ def __init__(self,
+ parameters=None,
+ on_open_callback=None,
+ on_open_error_callback=None,
+ on_close_callback=None,
+ stop_ioloop_on_close=False,
+ custom_ioloop=None,
+ on_signal_callback=None):
"""Create a new instance of the LibevConnection class, connecting
to RabbitMQ automatically
@@ -110,21 +102,19 @@ def __init__(
with warnings.catch_warnings():
warnings.simplefilter("ignore", RuntimeWarning)
self.ioloop = pyev.default_loop()
-
+
self.async = None
self._on_signal_callback = on_signal_callback
self._io_watcher = None
self._active_timers = {}
self._stopped_timers = deque()
- super(LibevConnection, self).__init__(
- parameters,
- on_open_callback,
- on_open_error_callback,
- on_close_callback,
- self.ioloop,
- stop_ioloop_on_close
- )
+ super(LibevConnection, self).__init__(parameters,
+ on_open_callback,
+ on_open_error_callback,
+ on_close_callback,
+ self.ioloop,
+ stop_ioloop_on_close)
def _adapter_connect(self):
"""Connect to the remote socket, adding the socket to the IOLoop if
@@ -134,35 +124,33 @@ def _adapter_connect(self):
"""
LOGGER.debug('init io and signal watchers if any')
- # reuse existing signal watchers as they can only be declared for 1 ioloop
+ # reuse existing signal watchers, can only be declared for 1 ioloop
global global_sigint_watcher, global_sigterm_watcher
error = super(LibevConnection, self)._adapter_connect()
if not error:
if self._on_signal_callback and not global_sigterm_watcher:
- global_sigterm_watcher = self.ioloop.signal(
- signal.SIGTERM,
- self._handle_sigterm
- )
-
+ global_sigterm_watcher = \
+ self.ioloop.signal(signal.SIGTERM,
+ self._handle_sigterm)
+
if self._on_signal_callback and not global_sigint_watcher:
- global_sigint_watcher = self.ioloop.signal(
- signal.SIGINT,
- self._handle_sigint
- )
+ global_sigint_watcher = self.ioloop.signal(signal.SIGINT,
+ self._handle_sigint)
if not self._io_watcher:
- self._io_watcher = self.ioloop.io(
- self.socket.fileno(),
- self._PIKA_TO_LIBEV_ARRAY[self.event_state],
- self._handle_events
- )
-
+ self._io_watcher = \
+ self.ioloop.io(self.socket.fileno(),
+ self._PIKA_TO_LIBEV_ARRAY[self.event_state],
+ self._handle_events)
+
self.async = pyev.Async(self.ioloop, self._handle_events)
- if self._on_signal_callback: global_sigterm_watcher.start()
- if self._on_signal_callback: global_sigint_watcher.start()
+ if self._on_signal_callback:
+ global_sigterm_watcher.start()
+ if self._on_signal_callback:
+ global_sigint_watcher.start()
self._io_watcher.start()
-
+
return error
def _init_connection_state(self):
@@ -171,10 +159,14 @@ def _init_connection_state(self):
be wiped.
"""
- for timer in self._active_timers: self.remove_timeout(timer)
- if global_sigint_watcher: global_sigint_watcher.stop()
- if global_sigterm_watcher: global_sigterm_watcher.stop()
- if self._io_watcher: self._io_watcher.stop()
+ for timer in self._active_timers:
+ self.remove_timeout(timer)
+ if global_sigint_watcher:
+ global_sigint_watcher.stop()
+ if global_sigterm_watcher:
+ global_sigterm_watcher.stop()
+ if self._io_watcher:
+ self._io_watcher.stop()
super(LibevConnection, self)._init_connection_state()
def _handle_sigint(self, signal_watcher, libev_events):
@@ -184,7 +176,7 @@ def _handle_sigint(self, signal_watcher, libev_events):
"""
LOGGER.debug('SIGINT')
self._on_signal_callback('SIGINT')
-
+
def _handle_sigterm(self, signal_watcher, libev_events):
"""If an on_signal_callback has been defined, call it returning the
string 'SIGTERM'.
@@ -198,20 +190,19 @@ def _handle_events(self, io_watcher, libev_events, **kwargs):
events and calling super.
"""
- super(LibevConnection, self)._handle_events(
- io_watcher.fd,
- self._LIBEV_TO_PIKA_ARRAY[libev_events],
- **kwargs
- )
+ super(LibevConnection,
+ self)._handle_events(io_watcher.fd,
+ self._LIBEV_TO_PIKA_ARRAY[libev_events],
+ **kwargs)
def _manage_event_state(self):
"""Manage the bitmask for reading/writing/error which is used by the
io/event handler to specify when there is an event such as a read or
write.
-
+
"""
if self.outbound_buffer:
- if not self.event_state & self.WRITE:
+ if not self.event_state & self.WRITE:
self.event_state |= self.WRITE
self._io_watcher.stop()
@@ -221,7 +212,7 @@ def _manage_event_state(self):
)
self._io_watcher.start()
- elif self.event_state & self.WRITE:
+ elif self.event_state & self.WRITE:
self.event_state = self.base_events
self._io_watcher.stop()
@@ -235,61 +226,66 @@ def _manage_event_state(self):
def _timer_callback(self, timer, libev_events):
"""Manage timer callbacks indirectly."""
if timer in self._active_timers:
- callback_method, callback_timeout, kwargs = self._active_timers[timer]
-
+ (callback_method,
+ callback_timeout,
+ kwargs) = self._active_timers[timer]
+
if callback_timeout:
callback_method(timeout=timer, **kwargs)
else:
callback_method(**kwargs)
-
+
self.remove_timeout(timer)
else:
LOGGER.warning('Timer callback_method not found')
-
+
def _get_timer(self, deadline):
"""Get a timer from the pool or allocate a new one."""
if self._stopped_timers:
timer = self._stopped_timers.pop()
timer.set(deadline, 0.0)
else:
timer = self.ioloop.timer(deadline, 0.0, self._timer_callback)
-
+
return timer
- def add_timeout(self, deadline, callback_method, callback_timeout=False, **callback_kwargs):
+ def add_timeout(self, deadline, callback_method,
+ callback_timeout=False, **callback_kwargs):
"""Add the callback_method indirectly to the IOLoop timer to fire
after deadline seconds. Returns the timer handle.
-
+
:param int deadline: The number of seconds to wait to call callback
:param method callback_method: The callback method
- :param boolean callback_timeout: Whether timeout kwarg should be passed on callback
+ :param callback_timeout: Whether timeout kwarg is passed on callback
+ :type callback_timeout: boolean
:param kwargs callback_kwargs: additional kwargs to pass on callback
:rtype: timer instance handle.
"""
LOGGER.debug('deadline: {0}'.format(deadline))
timer = self._get_timer(deadline)
- self._active_timers[timer] = (callback_method, callback_timeout, callback_kwargs)
+ self._active_timers[timer] = (callback_method,
+ callback_timeout,
+ callback_kwargs)
timer.start()
return timer
-
+
def remove_timeout(self, timer):
"""Remove the timer from the IOLoop using the handle returned from
add_timeout.
-
+
param: timer instance handle
"""
LOGGER.debug('stop')
self._active_timers.pop(timer, None)
timer.stop()
self._stopped_timers.append(timer)
-
+
def _create_and_connect_to_socket(self, sock_addr_tuple):
"""Call super and then set the socket to nonblocking."""
- result = super(LibevConnection, self)._create_and_connect_to_socket(
- sock_addr_tuple
- )
-
- if result: self.socket.setblocking(0)
+ result = super(LibevConnection,
+ self)._create_and_connect_to_socket(sock_addr_tuple)
+ if result:
+ self.socket.setblocking(0)
return result

0 comments on commit fea2476

Please sign in to comment.