Skip to content

Commit

Permalink
Merge 1583537 into 409670b
Browse files Browse the repository at this point in the history
  • Loading branch information
markfennema committed Apr 14, 2015
2 parents 409670b + 1583537 commit 531c5f9
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 2 deletions.
11 changes: 9 additions & 2 deletions pika/adapters/blocking_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -490,7 +490,7 @@ def basic_cancel(self, consumer_tag='', nowait=False):
self._on_cancelok, replies)

def basic_get(self, queue=None, no_ack=False):
"""Get a single message from the AMQP broker. Returns a set with the
"""Get a single message from the AMQP broker. Returns a set with the
method frame, header frame and body.
:param queue: The queue to get a message from
Expand All @@ -513,7 +513,7 @@ def basic_get(self, queue=None, no_ack=False):
def basic_publish(self, exchange, routing_key, body,
properties=None, mandatory=False, immediate=False):
"""Publish to the channel with the given exchange, routing key and body.
Returns a boolean value indicating the success of the operation. For
Returns a boolean value indicating the success of the operation. For
more information on basic_publish and what the parameters do, see:
http://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.publish
Expand Down Expand Up @@ -727,6 +727,13 @@ def consume(self, queue, no_ack=False, exclusive=False):
yield self._generator_messages.pop(0)
self.connection.process_data_events()

def get_waiting_message_count(self):
"""Returns the amount of messages waiting in the generator messages list.
:rtype: int
"""
return len(self._generator_messages)

def force_data_events(self, enable):
"""Turn on and off forcing the blocking adapter to stop and look to see
if there are any frames from RabbitMQ in the read buffer. By default
Expand Down
15 changes: 15 additions & 0 deletions tests/unit/blocking_channel_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,18 @@ def test_init_initial_value_wait(self):
def test_init_open_called(self):
self._open.assert_called_once_with()


def test_init_create_generator(self):
self.obj.consume("queue")
self.assertEquals(self.obj.get_waiting_message_count(), 0)


def test_init_value_waiting_message_count(self):
self.obj.consume("queue")
self.obj._generator_messages.append("test")
self.assertEquals(self.obj.get_waiting_message_count(), 1)
self.obj._generator_messages.append("test")
self.assertEquals(self.obj.get_waiting_message_count(), 2)
self.obj._generator_messages.pop(0)
self.assertEquals(self.obj.get_waiting_message_count(), 1)

0 comments on commit 531c5f9

Please sign in to comment.