Skip to content

Commit

Permalink
Don't reply when we known that client is gone
Browse files Browse the repository at this point in the history
In case of a broker restart/failover a reply queue can be
unreachable for short period the IncomingMessage.send_reply
will block for 60 seconds in this case or until rabbit recovers.

But in case of the reply queue is unreachable because the
rpc client is really gone, we can have a ton of reply to send
waiting 60 seconds.
This leads to a starvation of connection of the pool
The rpc server take to much time to send reply, other rpc client will
raise TimeoutError because their don't receive their replies in time.

This changes introduces an object cache that stores already known gone
client to not wait 60 seconds and hold a connection of the pool
Keeping 200 last gone rpc client for 1 minute is enough
and doesn't hold to much memory.

This also don't raise anymore a frightening exception when we can't send reply
to the rpc client. But just logging a info about missing exchange and
a warning about unsend reply.

Closes-bug: #1460652

Change-Id: I928b30c9b5f9ee007532ff703e136640b0e8aaf4
  • Loading branch information
Mehdi Abaakouk authored and vikt0rs committed Jul 6, 2015
1 parent 8dc5923 commit 286659a
Show file tree
Hide file tree
Showing 5 changed files with 124 additions and 7 deletions.
4 changes: 4 additions & 0 deletions oslo_messaging/_drivers/amqp.py
Expand Up @@ -263,3 +263,7 @@ def _add_unique_id(msg):
"""Add unique_id for checking duplicate messages."""
unique_id = uuid.uuid4().hex
msg.update({UNIQUE_ID: unique_id})


class AMQPDestinationNotFound(Exception):
pass
76 changes: 73 additions & 3 deletions oslo_messaging/_drivers/amqpdriver.py
Expand Up @@ -19,6 +19,7 @@
import threading
import uuid

import cachetools
from six import moves

import oslo_messaging
Expand All @@ -27,13 +28,15 @@
from oslo_messaging._drivers import common as rpc_common
from oslo_messaging._i18n import _
from oslo_messaging._i18n import _LI
from oslo_messaging._i18n import _LW

LOG = logging.getLogger(__name__)


class AMQPIncomingMessage(base.IncomingMessage):

def __init__(self, listener, ctxt, message, unique_id, msg_id, reply_q):
def __init__(self, listener, ctxt, message, unique_id, msg_id, reply_q,
obsolete_reply_queues):
super(AMQPIncomingMessage, self).__init__(listener, ctxt,
dict(message))

Expand All @@ -42,9 +45,15 @@ def __init__(self, listener, ctxt, message, unique_id, msg_id, reply_q):
self.reply_q = reply_q
self.acknowledge_callback = message.acknowledge
self.requeue_callback = message.requeue
self._obsolete_reply_queues = obsolete_reply_queues

def _send_reply(self, conn, reply=None, failure=None,
ending=False, log_failure=True):
if (self.reply_q and
not self._obsolete_reply_queues.reply_q_valid(self.reply_q,
self.msg_id)):
return

if failure:
failure = rpc_common.serialize_remote_exception(failure,
log_failure)
Expand All @@ -60,15 +69,29 @@ def _send_reply(self, conn, reply=None, failure=None,
# Otherwise use the msg_id for backward compatibility.
if self.reply_q:
msg['_msg_id'] = self.msg_id
conn.direct_send(self.reply_q, rpc_common.serialize_msg(msg))
try:
conn.direct_send(self.reply_q, rpc_common.serialize_msg(msg))
except rpc_amqp.AMQPDestinationNotFound:
self._obsolete_reply_queues.add(self.reply_q, self.msg_id)
else:
# TODO(sileht): look at which version of oslo-incubator rpc
# send need this, but I guess this is older than icehouse
# if this is icehouse, we can drop this at M
# if this is havana, we can drop this now.
conn.direct_send(self.msg_id, rpc_common.serialize_msg(msg))

def reply(self, reply=None, failure=None, log_failure=True):
if not self.msg_id:
# NOTE(Alexei_987) not sending reply, if msg_id is empty
# because reply should not be expected by caller side
return

# NOTE(sileht): return without hold the a connection if possible
if (self.reply_q and
not self._obsolete_reply_queues.reply_q_valid(self.reply_q,
self.msg_id)):
return

with self.listener.driver._get_connection(
rpc_amqp.PURPOSE_SEND) as conn:
if self.listener.driver.send_single_reply:
Expand All @@ -92,6 +115,51 @@ def requeue(self):
self.requeue_callback()


class ObsoleteReplyQueuesCache(object):
"""Cache of reply queue id that doesn't exists anymore.
NOTE(sileht): In case of a broker restart/failover
a reply queue can be unreachable for short period
the IncomingMessage.send_reply will block for 60 seconds
in this case or until rabbit recovers.
But in case of the reply queue is unreachable because the
rpc client is really gone, we can have a ton of reply to send
waiting 60 seconds.
This leads to a starvation of connection of the pool
The rpc server take to much time to send reply, other rpc client will
raise TimeoutError because their don't receive their replies in time.
This object cache stores already known gone client to not wait 60 seconds
and hold a connection of the pool.
Keeping 200 last gone rpc client for 1 minute is enough
and doesn't hold to much memory.
"""

SIZE = 200
TTL = 60

def __init__(self):
self._lock = threading.RLock()
self._cache = cachetools.TTLCache(self.SIZE, self.TTL)

def reply_q_valid(self, reply_q, msg_id):
if reply_q in self._cache:
self._no_reply_log(reply_q, msg_id)
return False
return True

def add(self, reply_q, msg_id):
with self._lock:
self._cache.update({reply_q: msg_id})
self._no_reply_log(reply_q, msg_id)

def _no_reply_log(self, reply_q, msg_id):
LOG.warn(_LW("%(reply_queue)s doesn't exists, drop reply to "
"%(msg_id)s"), {'reply_queue': reply_q,
'msg_id': msg_id})


class AMQPListener(base.Listener):

def __init__(self, driver, conn):
Expand All @@ -100,6 +168,7 @@ def __init__(self, driver, conn):
self.msg_id_cache = rpc_amqp._MsgIdCache()
self.incoming = []
self._stopped = threading.Event()
self._obsolete_reply_queues = ObsoleteReplyQueuesCache()

def __call__(self, message):
ctxt = rpc_amqp.unpack_context(self.conf, message)
Expand All @@ -116,7 +185,8 @@ def __call__(self, message):
message,
unique_id,
ctxt.msg_id,
ctxt.reply_q))
ctxt.reply_q,
self._obsolete_reply_queues))

def poll(self, timeout=None):
while not self._stopped.is_set():
Expand Down
11 changes: 10 additions & 1 deletion oslo_messaging/_drivers/impl_rabbit.py
Expand Up @@ -1081,8 +1081,17 @@ def _publish_and_retry_on_missing_exchange(self, exchange, msg,
"retrying...") % {
'exchange': exchange.name,
'routing_key': routing_key})
time.sleep(1)
time.sleep(0.25)
continue
elif exc.code == 404:
msg = _("The exchange %(exchange)s to send to "
"%(routing_key)s still doesn't exist after "
"%(duration)s sec abandonning...") % {
'duration': duration,
'exchange': exchange.name,
'routing_key': routing_key}
LOG.info(msg)
raise rpc_amqp.AMQPDestinationNotFound(msg)
raise

def direct_send(self, msg_id, msg):
Expand Down
39 changes: 36 additions & 3 deletions oslo_messaging/tests/drivers/test_impl_rabbit.py
Expand Up @@ -355,6 +355,11 @@ class TestSendReceive(test_utils.BaseTestCase):
('zero', dict(rx_id=False, reply=0)),
]

_reply_fail = [
('reply_success', dict(reply_failure_404=False)),
('reply_failure', dict(reply_failure_404=True)),
]

_failure = [
('success', dict(failure=False)),
('failure', dict(failure=True, expected=False)),
Expand All @@ -376,11 +381,14 @@ def generate_scenarios(cls):
cls.scenarios = testscenarios.multiply_scenarios(cls._n_senders,
cls._context,
cls._reply,
cls._reply_fail,
cls._failure,
cls._timeout,
cls._reply_ending)

def test_send_receive(self):
self.config(kombu_reconnect_timeout=0.5,
group="oslo_messaging_rabbit")
self.config(heartbeat_timeout_threshold=0,
group="oslo_messaging_rabbit")
self.config(send_single_reply=self.send_single_reply,
Expand Down Expand Up @@ -409,16 +417,21 @@ def stub_error(msg, *a, **kw):

def send_and_wait_for_reply(i):
try:
if self.reply_failure_404:
timeout = 0.01
else:
timeout = self.timeout
replies.append(driver.send(target,
self.ctxt,
{'tx_id': i},
wait_for_reply=True,
timeout=self.timeout))
timeout=timeout))
self.assertFalse(self.failure)
self.assertIsNone(self.timeout)
except (ZeroDivisionError, oslo_messaging.MessagingTimeout) as e:
replies.append(e)
self.assertTrue(self.failure or self.timeout is not None)
self.assertTrue(self.failure or self.timeout is not None
or self.reply_failure_404)

while len(senders) < self.n_senders:
senders.append(threading.Thread(target=send_and_wait_for_reply,
Expand All @@ -438,6 +451,18 @@ def send_and_wait_for_reply(i):
if len(order) > 1:
order[-1], order[-2] = order[-2], order[-1]

if self.reply_failure_404:
start = time.time()
# NOTE(sileht): Simulate a rpc client restart
# By returning a ExchangeNotFound when we try to
# send reply
exc = (driver._reply_q_conn.connection.
connection.channel_errors[0]())
exc.code = 404
self.useFixture(mockpatch.Patch(
'kombu.messaging.Producer.publish',
side_effect=exc))

for i in order:
if self.timeout is None:
if self.failure:
Expand All @@ -451,11 +476,19 @@ def send_and_wait_for_reply(i):
msgs[i].reply({'rx_id': i})
else:
msgs[i].reply(self.reply)
elif self.reply_failure_404:
msgs[i].reply({})
senders[i].join()

if self.reply_failure_404:
# NOTE(sileht) all reply fail, first take
# kombu_reconnect_timeout seconds to fail
# next immediatly fail
self.assertAlmostEqual(0.5, time.time() - start, 1)

self.assertEqual(len(senders), len(replies))
for i, reply in enumerate(replies):
if self.timeout is not None:
if self.timeout is not None or self.reply_failure_404:
self.assertIsInstance(reply, oslo_messaging.MessagingTimeout)
elif self.failure:
self.assertIsInstance(reply, ZeroDivisionError)
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Expand Up @@ -13,6 +13,7 @@ stevedore>=1.5.0 # Apache-2.0

# for jsonutils
six>=1.9.0
cachetools>=1.0.0 # MIT License

# FIXME(markmc): remove this when the drivers no longer
# import eventlet
Expand Down

0 comments on commit 286659a

Please sign in to comment.