From daddb82788918296f8b34d6cdeb40d01620fb183 Mon Sep 17 00:00:00 2001 From: Mehdi Abaakouk Date: Wed, 2 Dec 2015 11:38:27 +0100 Subject: [PATCH] Don't hold the connection when reply fail This change moves the reply retry code to upper layer to be able to release the connection while we wait between two retries. In the worse scenario, a client waits for more than 30 replies and died/restart, the server tries to send this 30 replies to this this client and can wait too 60s per replies. During this replies for other clients are just stuck. This change fixes that. Related-bug: #1477914 Closes-bug: #1521958 Change-Id: I0d3c16ea6d2c1da143de4924b3be41d1cea159bd --- oslo_messaging/_drivers/amqpdriver.py | 69 +++++++++++++++++++------- oslo_messaging/_drivers/impl_rabbit.py | 58 +++++++--------------- 2 files changed, 67 insertions(+), 60 deletions(-) diff --git a/oslo_messaging/_drivers/amqpdriver.py b/oslo_messaging/_drivers/amqpdriver.py index 420587c45..e95edfc2e 100644 --- a/oslo_messaging/_drivers/amqpdriver.py +++ b/oslo_messaging/_drivers/amqpdriver.py @@ -17,6 +17,7 @@ import logging import threading +import time import uuid import cachetools @@ -70,16 +71,13 @@ def _send_reply(self, conn, reply=None, failure=None, # Otherwise use the msg_id for backward compatibility. if self.reply_q: msg['_msg_id'] = self.msg_id - try: - if ending: - LOG.debug("sending reply msg_id: %(msg_id)s " - "reply queue: %(reply_q)s" % { - 'msg_id': self.msg_id, - 'unique_id': unique_id, - 'reply_q': self.reply_q}) - conn.direct_send(self.reply_q, rpc_common.serialize_msg(msg)) - except rpc_amqp.AMQPDestinationNotFound: - self._obsolete_reply_queues.add(self.reply_q, self.msg_id) + if ending: + LOG.debug("sending reply msg_id: %(msg_id)s " + "reply queue: %(reply_q)s" % { + 'msg_id': self.msg_id, + 'unique_id': unique_id, + 'reply_q': self.reply_q}) + conn.direct_send(self.reply_q, rpc_common.serialize_msg(msg)) else: # TODO(sileht): look at which version of oslo-incubator rpc # send need this, but I guess this is older than icehouse @@ -93,20 +91,52 @@ def reply(self, reply=None, failure=None, log_failure=True): # because reply should not be expected by caller side return - # NOTE(sileht): return without hold the a connection if possible + # NOTE(sileht): return without using a connection if possible if (self.reply_q and not self._obsolete_reply_queues.reply_q_valid(self.reply_q, self.msg_id)): return - with self.listener.driver._get_connection( - rpc_common.PURPOSE_SEND) as conn: - if self.listener.driver.send_single_reply: - self._send_reply(conn, reply, failure, log_failure=log_failure, - ending=True) - else: - self._send_reply(conn, reply, failure, log_failure=log_failure) - self._send_reply(conn, ending=True) + # NOTE(sileht): we read the configuration value from the driver + # to be able to backport this change in previous version that + # still have the qpid driver + duration = self.listener.driver.missing_destination_retry_timeout + timer = rpc_common.DecayingTimer(duration=duration) + timer.start() + + first_reply_sent = False + while True: + try: + with self.listener.driver._get_connection( + rpc_common.PURPOSE_SEND) as conn: + if self.listener.driver.send_single_reply: + self._send_reply(conn, reply, failure, + log_failure=log_failure, + ending=True) + else: + if not first_reply_sent: + self._send_reply(conn, reply, failure, + log_failure=log_failure) + first_reply_sent = True + self._send_reply(conn, ending=True) + return + except rpc_amqp.AMQPDestinationNotFound: + if timer.check_return() > 0: + LOG.info(_LI("The reply %(msg_id)s cannot be sent " + "%(reply_q)s reply queue don't exist, " + "retrying...") % { + 'msg_id': self.msg_id, + 'reply_q': self.reply_q}) + time.sleep(0.25) + else: + self._obsolete_reply_queues.add(self.reply_q, self.msg_id) + LOG.info(_LI("The reply %(msg_id)s cannot be sent " + "%(reply_q)s reply queue don't exist after " + "%(duration)s sec abandoning...") % { + 'msg_id': self.msg_id, + 'reply_q': self.reply_q, + 'duration': duration}) + return def acknowledge(self): self.acknowledge_callback() @@ -345,6 +375,7 @@ def wait(self, msg_id, timeout): class AMQPDriverBase(base.BaseDriver): + missing_destination_retry_timeout = 0 def __init__(self, conf, url, connection_pool, default_exchange=None, allowed_remote_exmods=None, diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py index d96a39838..cdd642ea6 100644 --- a/oslo_messaging/_drivers/impl_rabbit.py +++ b/oslo_messaging/_drivers/impl_rabbit.py @@ -1043,32 +1043,20 @@ def _publish_and_creates_default_queue(self, exchange, msg, self._publish(exchange, msg, routing_key=routing_key, timeout=timeout) - def _publish_and_retry_on_missing_exchange(self, exchange, msg, - routing_key=None, timeout=None): - """Publisher that retry if the exchange is missing. - """ - + def _publish_and_raises_on_missing_exchange(self, exchange, msg, + routing_key=None, + timeout=None): + """Publisher that raises exception if exchange is missing.""" if not exchange.passive: raise RuntimeError("_publish_and_retry_on_missing_exchange() must " "be called with an passive exchange.") - # TODO(sileht): use @retrying - # NOTE(sileht): no need to wait the application expect a response - # before timeout is exshauted - duration = ( - timeout if timeout is not None - else self.kombu_reconnect_timeout - ) - - timer = rpc_common.DecayingTimer(duration=duration) - timer.start() - - while True: - try: - self._publish(exchange, msg, routing_key=routing_key, - timeout=timeout) - return - except self.connection.channel_errors as exc: + try: + self._publish(exchange, msg, routing_key=routing_key, + timeout=timeout) + return + except self.connection.channel_errors as exc: + if exc.code == 404: # NOTE(noelbk/sileht): # If rabbit dies, the consumer can be disconnected before the # publisher sends, and if the consumer hasn't declared the @@ -1077,24 +1065,9 @@ def _publish_and_retry_on_missing_exchange(self, exchange, msg, # So we set passive=True to the publisher exchange and catch # the 404 kombu ChannelError and retry until the exchange # appears - if exc.code == 404 and timer.check_return() > 0: - LOG.info(_LI("The exchange %(exchange)s to send to " - "%(routing_key)s doesn't exist yet, " - "retrying...") % { - 'exchange': exchange.name, - 'routing_key': routing_key}) - time.sleep(0.25) - continue - elif exc.code == 404: - msg = _("The exchange %(exchange)s to send to " - "%(routing_key)s still doesn't exist after " - "%(duration)s sec abandoning...") % { - 'duration': duration, - 'exchange': exchange.name, - 'routing_key': routing_key} - LOG.info(msg) - raise rpc_amqp.AMQPDestinationNotFound(msg) - raise + raise rpc_amqp.AMQPDestinationNotFound( + "exchange %s doesn't exists" % exchange.name) + raise def direct_send(self, msg_id, msg): """Send a 'direct' message.""" @@ -1104,7 +1077,7 @@ def direct_send(self, msg_id, msg): auto_delete=True, passive=True) - self._ensure_publishing(self._publish_and_retry_on_missing_exchange, + self._ensure_publishing(self._publish_and_raises_on_missing_exchange, exchange, msg, routing_key=msg_id) def topic_send(self, exchange_name, topic, msg, timeout=None, retry=None): @@ -1160,6 +1133,9 @@ def __init__(self, conf, url, conf.register_opts(rpc_amqp.amqp_opts, group=opt_group) conf.register_opts(base.base_opts, group=opt_group) + self.missing_destination_retry_timeout = ( + conf.oslo_messaging_rabbit.kombu_reconnect_timeout) + connection_pool = pool.ConnectionPool( conf, conf.oslo_messaging_rabbit.rpc_conn_pool_size, url, Connection)