Permalink
Browse files

Fix whitespace

  • Loading branch information...
1 parent ef89492 commit 222629d4ddce181b7dd202808bb514943f12d83a @labisso labisso committed Dec 18, 2012
Showing with 9 additions and 8 deletions.
  1. +7 −6 dashi/__init__.py
  2. +2 −2 dashi/tests/test_dashi.py
View
13 dashi/__init__.py
@@ -38,7 +38,7 @@ def __init__(self, name, uri, exchange, durable=False, auto_delete=True,
@param transport_options: custom parameter dict for the transport backend
"""
- self._conn = BrokerConnection(uri, transport_options=transport_options,ssl=ssl)
+ self._conn = BrokerConnection(uri, transport_options=transport_options, ssl=ssl)
self._name = name
self._exchange_name = exchange
self._exchange = Exchange(name=exchange, type='direct',
@@ -76,7 +76,7 @@ def fire(self, name, operation, args=None, **kwargs):
args = kwargs
d = dict(op=operation, args=args)
- headers = {'sender' : self.name}
+ headers = {'sender': self.name}
with producers[self._conn].acquire(block=True) as producer:
maybe_declare(self._exchange, producer.channel)
@@ -100,7 +100,6 @@ def call(self, name, operation, timeout=5, args=None, **kwargs):
else:
args = kwargs
-
# create a direct exchange and queue for the reply. This may end up
# being a bottleneck for performance: each rpc call gets a brand new
# direct exchange and exclusive queue. However this approach is used
@@ -128,7 +127,7 @@ def _callback(body, message):
consumer.declare()
d = dict(op=operation, args=args)
- headers = {'reply-to' : msg_id, 'sender' : self.name}
+ headers = {'reply-to': msg_id, 'sender': self.name}
with producers[self._conn].acquire(block=True) as producer:
maybe_declare(self._exchange, producer.channel)
@@ -202,8 +201,10 @@ def link_exceptions(self, custom_exception=None, dashi_exception=None):
self._linked_exceptions[custom_exception] = dashi_exception
+
_OpSpec = namedtuple('_OpSpec', ['function', 'sender_kwarg'])
+
class DashiConsumer(object):
def __init__(self, dashi, connection, name, exchange):
self._dashi = dashi
@@ -219,6 +220,7 @@ def __init__(self, dashi, connection, name, exchange):
self.connect()
def connect(self):
+
self._channel = self._conn.channel()
self._queue = Queue(channel=self._channel, name=self._name,
@@ -282,7 +284,6 @@ def _consume_one(self, timeout=None):
if elapsed + inner_timeout > timeout:
inner_timeout = timeout - elapsed
-
def cancel(self, block=True):
self._cancelled = True
if block:
@@ -360,7 +361,7 @@ def _callback(self, body, message):
def add_op(self, name, fun, sender_kwarg=None):
if not callable(fun):
raise ValueError("operation function must be callable")
-
+
self._ops[name] = _OpSpec(fun, sender_kwarg)
View
4 dashi/tests/test_dashi.py
@@ -18,6 +18,7 @@
_NO_EXCEPTION = object()
_NO_REPLY = object()
+
class TestReceiver(object):
consume_timeout = 5
@@ -144,7 +145,7 @@ def test_fire(self):
def test_call(self):
receiver = TestReceiver(uri=self.uri, exchange="x1",
transport_options=self.transport_options)
- replies = [5,4,3,2,1]
+ replies = [5, 4, 3, 2, 1]
receiver.handle("test", replies.pop)
receiver.consume_in_thread(1)
@@ -406,4 +407,3 @@ def test_pool_problems(self):
receiver.wait(pred=pred)
self.assertEqual(len(receiver.received), 100)
-

0 comments on commit 222629d

Please sign in to comment.