Skip to content

Commit

Permalink
Don't allow use of subsequent basic_get calls. Closes #709
Browse files Browse the repository at this point in the history
  • Loading branch information
dschep committed Apr 3, 2016
1 parent 9d2d33b commit 8d970e1
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 2 deletions.
9 changes: 7 additions & 2 deletions pika/channel.py
Expand Up @@ -336,8 +336,9 @@ def basic_get(self, callback=None, queue='', no_ack=False):
"""Get a single message from the AMQP broker. If you want to
be notified of Basic.GetEmpty, use the Channel.add_callback method
adding your Basic.GetEmpty callback which should expect only one
parameter, frame. For more information on basic_get and its
parameters, see:
parameter, frame. Due to implementation details, this cannot be called
a second time until the callback is executed. For more information on
basic_get and its parameters, see:
http://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.get
Expand All @@ -354,6 +355,8 @@ def basic_get(self, callback=None, queue='', no_ack=False):
"""
self._validate_channel_and_callback(callback)
# TODO Is basic_get meaningful when callback is None?
if self._on_getok_callback is not None:
raise exceptions.DuplicateGetOkCallback()
self._on_getok_callback = callback
# TODO Strangely, not using _rpc for the synchronous Basic.Get. Would
# need to extend _rpc to handle Basic.GetOk method, header, and body
Expand Down Expand Up @@ -1156,6 +1159,8 @@ def _on_getempty(self, method_frame):
"""
LOGGER.debug('Received Basic.GetEmpty: %r', method_frame)
if self._on_getok_callback is not None:
self._on_getok_callback = None

def _on_getok(self, method_frame, header_frame, body):
"""Called in reply to a Basic.Get when there is a message.
Expand Down
7 changes: 7 additions & 0 deletions pika/exceptions.py
Expand Up @@ -248,3 +248,10 @@ class ShortStringTooLong(AMQPError):
def __repr__(self):
return ('AMQP Short String can contain up to 255 bytes: '
'%.300s' % self.args[0])


class DuplicateGetOkCallback(ChannelError):

def __repr__(self):
return ('basic_get can only be called again after the callback for the'
'previous basic_get is executed')
32 changes: 32 additions & 0 deletions tests/acceptance/enforce_one_basicget_test.py
@@ -0,0 +1,32 @@
try:
import unittest2 as unittest
except ImportError:
import unittest

from mock import MagicMock
from pika.frame import Method, Header
from pika.exceptions import DuplicateGetOkCallback
from pika.channel import Channel
from pika.connection import Connection


class OnlyOneBasicGetTestCase(unittest.TestCase):
def setUp(self):
self.channel = Channel(MagicMock(Connection)(), 0, None)
self.channel._state = Channel.OPEN
self.callback = MagicMock()

def test_two_basic_get_with_callback(self):
self.channel.basic_get(self.callback)
self.channel._on_getok(MagicMock(Method)(), MagicMock(Header)(), '')
self.channel.basic_get(self.callback)
self.channel._on_getok(MagicMock(Method)(), MagicMock(Header)(), '')
self.assertEqual(self.callback.call_count, 2)

def test_two_basic_get_without_callback(self):
self.channel.basic_get(self.callback)
with self.assertRaises(DuplicateGetOkCallback):
self.channel.basic_get(self.callback)

if __name__ == '__main__':
unittest.main()

0 comments on commit 8d970e1

Please sign in to comment.