Skip to content

Commit

Permalink
Don't hold the connection when a reply fails
Browse files Browse the repository at this point in the history
This change moves the reply retry code to the upper layer
so that the connection can be released while we wait between
two retries.

In the worst-case scenario, a client waiting for more than 30 replies
dies or restarts; the server then tries to send these 30 replies to
this client and can wait up to 60s per reply. During this time,
replies for other clients are just stuck.

This change fixes that.

Related-bug: #1477914
Closes-bug: #1521958

Change-Id: I0d3c16ea6d2c1da143de4924b3be41d1cea159bd
  • Loading branch information
Mehdi Abaakouk committed Dec 2, 2015
1 parent ba42571 commit daddb82
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 60 deletions.
69 changes: 50 additions & 19 deletions oslo_messaging/_drivers/amqpdriver.py
Expand Up @@ -17,6 +17,7 @@

import logging
import threading
import time
import uuid

import cachetools
Expand Down Expand Up @@ -70,16 +71,13 @@ def _send_reply(self, conn, reply=None, failure=None,
# Otherwise use the msg_id for backward compatibility.
if self.reply_q:
msg['_msg_id'] = self.msg_id
try:
if ending:
LOG.debug("sending reply msg_id: %(msg_id)s "
"reply queue: %(reply_q)s" % {
'msg_id': self.msg_id,
'unique_id': unique_id,
'reply_q': self.reply_q})
conn.direct_send(self.reply_q, rpc_common.serialize_msg(msg))
except rpc_amqp.AMQPDestinationNotFound:
self._obsolete_reply_queues.add(self.reply_q, self.msg_id)
if ending:
LOG.debug("sending reply msg_id: %(msg_id)s "
"reply queue: %(reply_q)s" % {
'msg_id': self.msg_id,
'unique_id': unique_id,
'reply_q': self.reply_q})
conn.direct_send(self.reply_q, rpc_common.serialize_msg(msg))
else:
# TODO(sileht): look at which version of oslo-incubator rpc
# send need this, but I guess this is older than icehouse
Expand All @@ -93,20 +91,52 @@ def reply(self, reply=None, failure=None, log_failure=True):
# because reply should not be expected by caller side
return

# NOTE(sileht): return without hold the a connection if possible
# NOTE(sileht): return without using a connection if possible
if (self.reply_q and
not self._obsolete_reply_queues.reply_q_valid(self.reply_q,
self.msg_id)):
return

with self.listener.driver._get_connection(
rpc_common.PURPOSE_SEND) as conn:
if self.listener.driver.send_single_reply:
self._send_reply(conn, reply, failure, log_failure=log_failure,
ending=True)
else:
self._send_reply(conn, reply, failure, log_failure=log_failure)
self._send_reply(conn, ending=True)
# NOTE(sileht): we read the configuration value from the driver
# to be able to backport this change in previous version that
# still have the qpid driver
duration = self.listener.driver.missing_destination_retry_timeout
timer = rpc_common.DecayingTimer(duration=duration)
timer.start()

first_reply_sent = False
while True:
try:
with self.listener.driver._get_connection(
rpc_common.PURPOSE_SEND) as conn:
if self.listener.driver.send_single_reply:
self._send_reply(conn, reply, failure,
log_failure=log_failure,
ending=True)
else:
if not first_reply_sent:
self._send_reply(conn, reply, failure,
log_failure=log_failure)
first_reply_sent = True
self._send_reply(conn, ending=True)
return
except rpc_amqp.AMQPDestinationNotFound:
if timer.check_return() > 0:
LOG.info(_LI("The reply %(msg_id)s cannot be sent "
"%(reply_q)s reply queue don't exist, "
"retrying...") % {
'msg_id': self.msg_id,
'reply_q': self.reply_q})
time.sleep(0.25)
else:
self._obsolete_reply_queues.add(self.reply_q, self.msg_id)
LOG.info(_LI("The reply %(msg_id)s cannot be sent "
"%(reply_q)s reply queue don't exist after "
"%(duration)s sec abandoning...") % {
'msg_id': self.msg_id,
'reply_q': self.reply_q,
'duration': duration})
return

def acknowledge(self):
self.acknowledge_callback()
Expand Down Expand Up @@ -345,6 +375,7 @@ def wait(self, msg_id, timeout):


class AMQPDriverBase(base.BaseDriver):
missing_destination_retry_timeout = 0

def __init__(self, conf, url, connection_pool,
default_exchange=None, allowed_remote_exmods=None,
Expand Down
58 changes: 17 additions & 41 deletions oslo_messaging/_drivers/impl_rabbit.py
Expand Up @@ -1043,32 +1043,20 @@ def _publish_and_creates_default_queue(self, exchange, msg,

self._publish(exchange, msg, routing_key=routing_key, timeout=timeout)

def _publish_and_retry_on_missing_exchange(self, exchange, msg,
routing_key=None, timeout=None):
"""Publisher that retry if the exchange is missing.
"""

def _publish_and_raises_on_missing_exchange(self, exchange, msg,
routing_key=None,
timeout=None):
"""Publisher that raises exception if exchange is missing."""
if not exchange.passive:
raise RuntimeError("_publish_and_retry_on_missing_exchange() must "
"be called with an passive exchange.")

# TODO(sileht): use @retrying
# NOTE(sileht): no need to wait the application expect a response
# before timeout is exshauted
duration = (
timeout if timeout is not None
else self.kombu_reconnect_timeout
)

timer = rpc_common.DecayingTimer(duration=duration)
timer.start()

while True:
try:
self._publish(exchange, msg, routing_key=routing_key,
timeout=timeout)
return
except self.connection.channel_errors as exc:
try:
self._publish(exchange, msg, routing_key=routing_key,
timeout=timeout)
return
except self.connection.channel_errors as exc:
if exc.code == 404:
# NOTE(noelbk/sileht):
# If rabbit dies, the consumer can be disconnected before the
# publisher sends, and if the consumer hasn't declared the
Expand All @@ -1077,24 +1065,9 @@ def _publish_and_retry_on_missing_exchange(self, exchange, msg,
# So we set passive=True to the publisher exchange and catch
# the 404 kombu ChannelError and retry until the exchange
# appears
if exc.code == 404 and timer.check_return() > 0:
LOG.info(_LI("The exchange %(exchange)s to send to "
"%(routing_key)s doesn't exist yet, "
"retrying...") % {
'exchange': exchange.name,
'routing_key': routing_key})
time.sleep(0.25)
continue
elif exc.code == 404:
msg = _("The exchange %(exchange)s to send to "
"%(routing_key)s still doesn't exist after "
"%(duration)s sec abandoning...") % {
'duration': duration,
'exchange': exchange.name,
'routing_key': routing_key}
LOG.info(msg)
raise rpc_amqp.AMQPDestinationNotFound(msg)
raise
raise rpc_amqp.AMQPDestinationNotFound(
"exchange %s doesn't exists" % exchange.name)
raise

def direct_send(self, msg_id, msg):
"""Send a 'direct' message."""
Expand All @@ -1104,7 +1077,7 @@ def direct_send(self, msg_id, msg):
auto_delete=True,
passive=True)

self._ensure_publishing(self._publish_and_retry_on_missing_exchange,
self._ensure_publishing(self._publish_and_raises_on_missing_exchange,
exchange, msg, routing_key=msg_id)

def topic_send(self, exchange_name, topic, msg, timeout=None, retry=None):
Expand Down Expand Up @@ -1160,6 +1133,9 @@ def __init__(self, conf, url,
conf.register_opts(rpc_amqp.amqp_opts, group=opt_group)
conf.register_opts(base.base_opts, group=opt_group)

self.missing_destination_retry_timeout = (
conf.oslo_messaging_rabbit.kombu_reconnect_timeout)

connection_pool = pool.ConnectionPool(
conf, conf.oslo_messaging_rabbit.rpc_conn_pool_size,
url, Connection)
Expand Down

0 comments on commit daddb82

Please sign in to comment.