Skip to content

Commit

Permalink
Merge 0421fea into 6018be3
Browse files Browse the repository at this point in the history
  • Loading branch information
jerith committed Nov 3, 2015
2 parents 6018be3 + 0421fea commit a0c57bb
Show file tree
Hide file tree
Showing 8 changed files with 88 additions and 20 deletions.
9 changes: 6 additions & 3 deletions vumi/application/tests/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,15 +177,17 @@ def test_application_prefetch_count_custom(self):
'amqp_prefetch_count': 10,
})
for consumer in self.get_app_consumers(app):
self.assertEqual(consumer.channel.qos_prefetch_count, 10)
fake_channel = consumer.channel._fake_channel
self.assertEqual(fake_channel.qos_prefetch_count, 10)

@inlineCallbacks
def test_application_prefetch_count_default(self):
app = yield self.app_helper.get_application({
'transport_name': 'test',
})
for consumer in self.get_app_consumers(app):
self.assertEqual(consumer.channel.qos_prefetch_count, 20)
fake_channel = consumer.channel._fake_channel
self.assertEqual(fake_channel.qos_prefetch_count, 20)

@inlineCallbacks
def test_application_prefetch_count_none(self):
Expand All @@ -194,7 +196,8 @@ def test_application_prefetch_count_none(self):
'amqp_prefetch_count': None,
})
for consumer in self.get_app_consumers(app):
self.assertFalse(consumer.channel.qos_prefetch_count)
fake_channel = consumer.channel._fake_channel
self.assertEqual(fake_channel.qos_prefetch_count, 0)

def assertNotRaises(self, error_class, f, *args, **kw):
try:
Expand Down
9 changes: 6 additions & 3 deletions vumi/dispatchers/tests/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,21 +219,24 @@ def test_consumer_prefetch_count_default(self):
dp = yield self.get_dispatcher()
consumers = self.get_dispatcher_consumers(dp)
for consumer in consumers:
self.assertEqual(consumer.channel.qos_prefetch_count, 20)
fake_channel = consumer.channel._fake_channel
self.assertEqual(fake_channel.qos_prefetch_count, 20)

@inlineCallbacks
def test_consumer_prefetch_count_custom(self):
dp = yield self.get_dispatcher(amqp_prefetch_count=10)
consumers = self.get_dispatcher_consumers(dp)
for consumer in consumers:
self.assertEqual(consumer.channel.qos_prefetch_count, 10)
fake_channel = consumer.channel._fake_channel
self.assertEqual(fake_channel.qos_prefetch_count, 10)

@inlineCallbacks
def test_consumer_prefetch_count_none(self):
dp = yield self.get_dispatcher(amqp_prefetch_count=None)
consumers = self.get_dispatcher_consumers(dp)
for consumer in consumers:
self.assertFalse(consumer.channel.qos_prefetch_count)
fake_channel = consumer.channel._fake_channel
self.assertEqual(fake_channel.qos_prefetch_count, 0)


class TestToAddrRouter(VumiTestCase):
Expand Down
9 changes: 6 additions & 3 deletions vumi/dispatchers/tests/test_endpoint_dispatchers.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,18 +219,21 @@ def test_consumer_prefetch_count_default(self):
dp = yield self.get_dispatcher()
consumers = self.get_dispatcher_consumers(dp)
for consumer in consumers:
self.assertEqual(consumer.channel.qos_prefetch_count, 20)
fake_channel = consumer.channel._fake_channel
self.assertEqual(fake_channel.qos_prefetch_count, 20)

@inlineCallbacks
def test_consumer_prefetch_count_custom(self):
dp = yield self.get_dispatcher(amqp_prefetch_count=10)
consumers = self.get_dispatcher_consumers(dp)
for consumer in consumers:
self.assertEqual(consumer.channel.qos_prefetch_count, 10)
fake_channel = consumer.channel._fake_channel
self.assertEqual(fake_channel.qos_prefetch_count, 10)

@inlineCallbacks
def test_consumer_prefetch_count_none(self):
dp = yield self.get_dispatcher(amqp_prefetch_count=None)
consumers = self.get_dispatcher_consumers(dp)
for consumer in consumers:
self.assertFalse(consumer.channel.qos_prefetch_count)
fake_channel = consumer.channel._fake_channel
self.assertEqual(fake_channel.qos_prefetch_count, 0)
6 changes: 3 additions & 3 deletions vumi/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,9 +259,9 @@ class Consumer(object):

def __init__(self, channel):
self.channel = channel
self._fake_channel = getattr(self.channel, '_fake_channel', None)
self._notify_paused_and_quiet = []
self.keep_consuming = False
self._testing = hasattr(self.channel, 'message_processed')
self.queue = None
self._consumer_tag = None

Expand Down Expand Up @@ -341,8 +341,8 @@ def consume(self, message):
# broken, but we still decrement the _in_progress counter so we
# don't wait forever for it during shutdown.
self._in_progress -= 1
if self._testing:
self.channel.message_processed()
if self._fake_channel is not None:
self._fake_channel.message_processed()
if result is not False:
yield self.channel.basic_ack(message.delivery_tag, False)
else:
Expand Down
57 changes: 56 additions & 1 deletion vumi/tests/fake_amqp.py
Original file line number Diff line number Diff line change
Expand Up @@ -488,8 +488,63 @@ def channel(self, id):
try:
ch = self.channels[id]
except KeyError:
ch = FakeAMQPChannel(id, self)
ch = FakeAMQPChannelWrapper(id, self)
self.channels[id] = ch
finally:
self.channelLock.release()
returnValue(ch)


class FakeAMQPChannelWrapper(object):
"""
Wrapper around a FakeAMQPChannel to make it look more like a real channel
object.
"""

def __init__(self, id, client):
self._fake_channel = FakeAMQPChannel(id, client)
self.client = client

def __repr__(self):
return '<FakeAMQPChannelWrapper: id=%s>' % (
self._fake_channel.channel_id,)

def channel_open(self):
return self._fake_channel.channel_open()

def channel_close(self):
return self._fake_channel.channel_close()

def channel_flow(self, active):
return self._fake_channel.channel_flow(active)

def close(self, _reason):
pass

def basic_qos(self, prefetch_size, prefetch_count, is_global):
return self._fake_channel.basic_qos(
prefetch_size, prefetch_count, is_global)

def exchange_declare(self, exchange, type, durable=None):
return self._fake_channel.exchange_declare(exchange, type, durable)

def queue_declare(self, queue, durable=None):
return self._fake_channel.queue_declare(queue, durable)

def queue_bind(self, queue, exchange, routing_key):
return self._fake_channel.queue_bind(queue, exchange, routing_key)

def basic_consume(self, queue, tag=None):
return self._fake_channel.basic_consume(queue, tag)

def basic_cancel(self, tag):
return self._fake_channel.basic_cancel(tag)

def basic_publish(self, exchange, routing_key, content):
return self._fake_channel.basic_publish(exchange, routing_key, content)

def basic_ack(self, delivery_tag, multiple):
return self._fake_channel.basic_ack(delivery_tag, multiple)

def basic_get(self, queue):
return self._fake_channel.basic_get(queue)
5 changes: 3 additions & 2 deletions vumi/tests/test_connectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,10 @@ def test_middlewares_publish(self):
for i in range(2, -1, -1)])

@inlineCallbacks
def test_pretech_count(self):
def test_prefetch_count(self):
conn, consumer = yield self.mk_consumer(prefetch_count=10)
self.assertEqual(consumer.channel.qos_prefetch_count, 10)
fake_channel = consumer.channel._fake_channel
self.assertEqual(fake_channel.qos_prefetch_count, 10)

@inlineCallbacks
def test_setup_raises(self):
Expand Down
7 changes: 4 additions & 3 deletions vumi/tests/test_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ def test_consume_with_prefetch(self):
# them to the log
consumer = yield worker.consume(
'test.routing.key', lambda msg: log.append(msg), prefetch_count=10)
self.assertEqual(10, consumer.channel._get_consumer_prefetch(
fake_channel = consumer.channel._fake_channel
self.assertEqual(10, fake_channel._get_consumer_prefetch(
consumer._consumer_tag))

@inlineCallbacks
Expand Down Expand Up @@ -93,11 +94,11 @@ def consume_func(msg):
@inlineCallbacks
def test_start_publisher(self):
"""The publisher should publish"""
worker = WorkerHelper.get_worker_raw(Worker, {})
worker = yield self.worker_helper.get_worker(Worker, {}, start=False)
publisher = yield worker.publish_to('test.routing.key')
self.assertEquals(publisher.routing_key, 'test.routing.key')
publisher.publish_message(Message(key="value"))
[published_msg] = publisher.channel.broker.get_dispatched(
[published_msg] = self.worker_helper.broker.get_dispatched(
'vumi', 'test.routing.key')

self.assertEquals(published_msg.body, '{"key": "value"}')
Expand Down
6 changes: 4 additions & 2 deletions vumi/transports/tests/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,15 +94,17 @@ def test_transport_prefetch_count_custom(self):
consumers = list(self.get_tx_consumers(transport))
self.assertEqual(1, len(consumers))
for consumer in consumers:
self.assertEqual(consumer.channel.qos_prefetch_count, 1)
fake_channel = consumer.channel._fake_channel
self.assertEqual(fake_channel.qos_prefetch_count, 1)

@inlineCallbacks
def test_transport_prefetch_count_default(self):
transport = yield self.tx_helper.get_transport({})
consumers = list(self.get_tx_consumers(transport))
self.assertEqual(1, len(consumers))
for consumer in consumers:
self.assertEqual(consumer.channel.qos_prefetch_count, 20)
fake_channel = consumer.channel._fake_channel
self.assertEqual(fake_channel.qos_prefetch_count, 20)

@inlineCallbacks
def test_add_outbound_handler(self):
Expand Down

0 comments on commit a0c57bb

Please sign in to comment.