Skip to content

Commit

Permalink
Merge pull request #49 from smagafurov/feature/message-process-contex…
Browse files Browse the repository at this point in the history
…t-check

ability to process message yourself under message.process context
  • Loading branch information
mosquito committed Jul 21, 2017
2 parents 8dafa39 + 5651cba commit 57c2977
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 8 deletions.
26 changes: 19 additions & 7 deletions aio_pika/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ def __init__(self, channel: Channel, envelope, properties, body, no_ack: bool =
self.__processed = True

@contextmanager
def process(self, requeue=False, reject_on_redelivered=False):
def process(self, requeue=False, reject_on_redelivered=False, ignore_processed=False):
""" Context manager for processing the message
>>> def on_message_received(message: IncomingMessage):
Expand All @@ -306,19 +306,31 @@ def process(self, requeue=False, reject_on_redelivered=False):
... # the message will be rejected
... print(message.body)
Example with ignore_processed=True
>>> def on_message_received(message: IncomingMessage):
... with message.process(ignore_processed=True):
... # Now (with ignore_processed=True) you may reject (or ack) message manually too
... if True: # some reasonable condition here
... message.reject()
... print(message.body)
:param requeue: Requeue message when exception.
:param reject_on_redelivered: When True message will be rejected only when message was redelivered.
:param ignore_processed: Do nothing if message already processed
"""
try:
yield self
self.ack()
if not ignore_processed or not self.processed:
self.ack()
except:
if reject_on_redelivered and self.redelivered:
log.info("Message %r was redelivered and will be rejected.", self)
self.reject(requeue=False)
else:
self.reject(requeue=requeue)
if not ignore_processed or not self.processed:
if reject_on_redelivered and self.redelivered:
log.info("Message %r was redelivered and will be rejected.", self)
self.reject(requeue=False)
else:
self.reject(requeue=requeue)
raise

def ack(self, multiple: bool = False):
Expand Down
25 changes: 24 additions & 1 deletion tests/test_amqp.py
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,29 @@ def test_context_process(self):
routing_key
)

incoming_message = yield from queue.get(timeout=5)

with self.assertRaises(MessageProcessError):
with incoming_message.process():
incoming_message.reject(requeue=True)

self.assertEqual(incoming_message.locked, True)

incoming_message = yield from queue.get(timeout=5)

with incoming_message.process(ignore_processed=True):
incoming_message.reject(requeue=False)

self.assertEqual(incoming_message.body, body)

yield from exchange.publish(
Message(
body, content_type='text/plain',
headers={'foo': 'bar'}
),
routing_key
)

incoming_message = yield from queue.get(timeout=5)
with self.assertRaises(AssertionError):
with incoming_message.process(requeue=True, reject_on_redelivered=True):
Expand Down Expand Up @@ -859,7 +882,7 @@ def test_connection_close(self):

routing_key = self.get_random_name()

channel = yield from client.channel() # type: Channel
channel = yield from client.channel() # type: aio_pika.Channel
exchange = yield from channel.declare_exchange('direct', auto_delete=True)

try:
Expand Down

0 comments on commit 57c2977

Please sign in to comment.