Skip to content

Commit

Permalink
replace check_processed with ignore_processed
Browse files Browse the repository at this point in the history
  • Loading branch information
smagafurov committed Jul 21, 2017
1 parent 081e846 commit 0afcb18
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 10 deletions.
17 changes: 9 additions & 8 deletions aio_pika/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ def __init__(self, channel: Channel, envelope, properties, body, no_ack: bool =
self.__processed = True

@contextmanager
def process(self, requeue=False, reject_on_redelivered=False, check_processed=False):
def process(self, requeue=False, reject_on_redelivered=False, ignore_processed=False):
""" Context manager for processing the message
>>> def on_message_received(message: IncomingMessage):
Expand All @@ -308,19 +308,20 @@ def process(self, requeue=False, reject_on_redelivered=False, check_processed=Fa
:param requeue: Requeue message when exception.
:param reject_on_redelivered: When True message will be rejected only when message was redelivered.
:param check_processed: Do not ack if message already processed
:param ignore_processed: Do nothing if message already processed
"""
try:
yield self
if not check_processed or not self.processed:
if not ignore_processed or not self.processed:
self.ack()
except:
if reject_on_redelivered and self.redelivered:
log.info("Message %r was redelivered and will be rejected.", self)
self.reject(requeue=False)
else:
self.reject(requeue=requeue)
if not ignore_processed or not self.processed:
if reject_on_redelivered and self.redelivered:
log.info("Message %r was redelivered and will be rejected.", self)
self.reject(requeue=False)
else:
self.reject(requeue=requeue)
raise

def ack(self, multiple: bool = False):
Expand Down
4 changes: 2 additions & 2 deletions tests/test_amqp.py
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ def test_context_process(self):

incoming_message = yield from queue.get(timeout=5)

with incoming_message.process(check_processed=True):
with incoming_message.process(ignore_processed=True):
incoming_message.reject(requeue=False)

self.assertEqual(incoming_message.body, body)
Expand Down Expand Up @@ -849,7 +849,7 @@ def test_connection_close(self):

routing_key = self.get_random_name()

channel = yield from client.channel() # type: Channel
channel = yield from client.channel() # type: aio_pika.Channel
exchange = yield from channel.declare_exchange('direct', auto_delete=True)

try:
Expand Down

0 comments on commit 0afcb18

Please sign in to comment.