Skip to content

Commit

Permalink
rabbit: make ack/requeue thread-safe
Browse files Browse the repository at this point in the history
ack/requeue messages are currently done in the
MessageHandlingServer._process_incoming().

But _process_incoming() in run by a futurist Executor.
That can be a threading or an eventlet executor.

With eventlet, we don't really share the socket between threads.

But with threading executor and expecialy ssl, this can't work, if you
write data with two different threads to the socket.

This change moves back the message ack/requeue to the polling threads
that handle the connection, instead of the threads we spawn for the
application.

Oslo Messaging now always use a connection in the same thread.

Change-Id: I5c0e6def6b34f4d195fb1f8dbb26eda0f21ff34e
  • Loading branch information
Mehdi Abaakouk committed Feb 22, 2017
1 parent 296d93d commit 7e71ac8
Showing 1 changed file with 61 additions and 14 deletions.
75 changes: 61 additions & 14 deletions oslo_messaging/_drivers/amqpdriver.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,18 +35,35 @@

LOG = logging.getLogger(__name__)

# Minimum/Maximum sleep between a poll and ack/requeue
# Maximum should be small enough to not get rejected ack,
# minimum should be big enough to not burn the CPU.
ACK_REQUEUE_EVERY_SECONDS_MIN = 0.001
ACK_REQUEUE_EVERY_SECONDS_MAX = 1.0


def do_pending_tasks(tasks):
while True:
try:
task = tasks.get(block=False)
except moves.queue.Empty:
break
else:
task()


class AMQPIncomingMessage(base.RpcIncomingMessage):

def __init__(self, listener, ctxt, message, unique_id, msg_id, reply_q,
obsolete_reply_queues):
obsolete_reply_queues, pending_message_actions):
super(AMQPIncomingMessage, self).__init__(ctxt, message)
self.listener = listener

self.unique_id = unique_id
self.msg_id = msg_id
self.reply_q = reply_q
self._obsolete_reply_queues = obsolete_reply_queues
self._pending_tasks = pending_message_actions
self.stopwatch = timeutils.StopWatch()
self.stopwatch.start()

Expand Down Expand Up @@ -116,7 +133,7 @@ def reply(self, reply=None, failure=None):
return

def acknowledge(self):
self.message.acknowledge()
self._pending_tasks.put(self.message.acknowledge)
self.listener.msg_id_cache.add(self.unique_id)

def requeue(self):
Expand All @@ -126,7 +143,7 @@ def requeue(self):
# msg_id_cache, the message will be reconsumed, the only difference is
# the message stay at the beginning of the queue instead of moving to
# the end.
self.message.requeue()
self._pending_tasks.put(self.message.requeue)


class ObsoleteReplyQueuesCache(object):
Expand Down Expand Up @@ -184,6 +201,8 @@ def __init__(self, driver, conn):
self.incoming = []
self._stopped = threading.Event()
self._obsolete_reply_queues = ObsoleteReplyQueuesCache()
self._pending_tasks = moves.queue.Queue()
self._current_timeout = ACK_REQUEUE_EVERY_SECONDS_MIN

def __call__(self, message):
ctxt = rpc_amqp.unpack_context(message)
Expand All @@ -194,27 +213,45 @@ def __call__(self, message):
'msg_id': ctxt.msg_id})
else:
LOG.debug("received message with unique_id: %s", unique_id)
self.incoming.append(AMQPIncomingMessage(self,
ctxt.to_dict(),
message,
unique_id,
ctxt.msg_id,
ctxt.reply_q,
self._obsolete_reply_queues))

self.incoming.append(AMQPIncomingMessage(
self,
ctxt.to_dict(),
message,
unique_id,
ctxt.msg_id,
ctxt.reply_q,
self._obsolete_reply_queues,
self._pending_tasks))

@base.batch_poll_helper
def poll(self, timeout=None):
stopwatch = timeutils.StopWatch(duration=timeout).start()

while not self._stopped.is_set():
do_pending_tasks(self._pending_tasks)

if self.incoming:
return self.incoming.pop(0)

left = stopwatch.leftover(return_none=True)
if left is None:
left = self._current_timeout
if left <= 0:
return None

try:
self.conn.consume(timeout=timeout)
self.conn.consume(timeout=min(self._current_timeout, left))
except rpc_common.Timeout:
return None
self._current_timeout = max(self._current_timeout * 2,
ACK_REQUEUE_EVERY_SECONDS_MAX)
else:
self._current_timeout = ACK_REQUEUE_EVERY_SECONDS_MIN

def stop(self):
self._stopped.set()
self.conn.stop_consuming()
do_pending_tasks(self._pending_tasks)

def cleanup(self):
# Closes listener connection
Expand Down Expand Up @@ -269,6 +306,7 @@ def __init__(self, reply_q, conn, allowed_remote_exmods):
self.allowed_remote_exmods = allowed_remote_exmods
self.msg_id_cache = rpc_amqp._MsgIdCache()
self.waiters = ReplyWaiters()
self._pending_tasks = moves.queue.Queue()

self.conn.declare_direct_consumer(reply_q, self)

Expand All @@ -283,17 +321,26 @@ def stop(self):
self.conn.stop_consuming()
self._thread.join()
self._thread = None
do_pending_tasks(self._pending_tasks)

def poll(self):
current_timeout = ACK_REQUEUE_EVERY_SECONDS_MIN
while not self._thread_exit_event.is_set():
do_pending_tasks(self._pending_tasks)
try:
self.conn.consume()
# ack every ACK_REQUEUE_EVERY_SECONDS_MAX seconds
self.conn.consume(timeout=current_timeout)
except rpc_common.Timeout:
current_timeout = max(current_timeout * 2,
ACK_REQUEUE_EVERY_SECONDS_MAX)
except Exception:
LOG.exception(_LE("Failed to process incoming message, "
"retrying..."))
else:
current_timeout = ACK_REQUEUE_EVERY_SECONDS_MIN

def __call__(self, message):
message.acknowledge()
self._pending_tasks.put(message.acknowledge)
incoming_msg_id = message.pop('_msg_id', None)
if message.get('ending'):
LOG.debug("received reply msg_id: %s", incoming_msg_id)
Expand Down

0 comments on commit 7e71ac8

Please sign in to comment.