Skip to content

Commit

Permalink
rabbit: heartbeat implementation
Browse files Browse the repository at this point in the history
AMQP offers a heartbeat feature to ensure that the application layer
promptly finds out about disrupted connections (and also completely
unresponsive peers). If the client requests heartbeats on connection, rabbit
server will regularly send messages to each connections with the expectation of
a response.

To acheive this, each driver connection object spawn a thread that
send/retrieve heartbeat packets exchanged between the server and the
client.

To protect the concurrency access to the kombu connection between the
driver and this thread use a lock that always prioritize the
heartbeat thread. So when the heartbeat thread wakes up it will acquire the
lock quickly, to ensure we have no heartbeat starvation when the driver
sends a lot of messages.

Also when we are polling the broker, the lock can be held for a long
time by the 'consume' method, so this one does the heartbeat stuffs itself.

DocImpact: 2 new configuration options for Rabbit driver

Co-Authored-By: Oleksii Zamiatin <ozamiatin@mirantis.com>
Co-Authored-By: Ilya Pekelny <ipekelny@mirantis.com>

Related-Bug: #1371723
Closes-Bug: #856764

Change-Id: I1d3a635f3853bc13ffc14034468f1ac6262c11a3
  • Loading branch information
Mehdi Abaakouk committed Mar 18, 2015
1 parent ec68634 commit b9e134d
Show file tree
Hide file tree
Showing 8 changed files with 424 additions and 39 deletions.
32 changes: 28 additions & 4 deletions oslo_messaging/_drivers/amqp.py
Expand Up @@ -55,6 +55,26 @@
UNIQUE_ID = '_unique_id'
LOG = logging.getLogger(__name__)

# NOTE(sileht): Even if rabbit/qpid have only one Connection class,
# this connection can be used for two purposes:
# * wait and receive amqp messages (only do read stuffs on the socket)
# * send messages to the broker (only do write stuffs on the socket)
# The code inside a connection class is not concurrency safe.
# Using one Connection class instance for doing both, will result
# of eventlet complaining of multiple greenthreads that read/write the
# same fd concurrently... because 'send' and 'listen' run in different
# greenthread.
# So, a connection cannot be shared between thread/greenthread and
# this two variables permit to define the purpose of the connection
# to allow drivers to add special handling if needed (like heatbeat).
# amqp drivers create 3 kind of connections:
# * driver.listen*(): each call create a new 'PURPOSE_LISTEN' connection
# * driver.send*(): a pool of 'PURPOSE_SEND' connections is used
# * driver internally have another 'PURPOSE_LISTEN' connection dedicated
# to wait replies of rpc call
PURPOSE_LISTEN = 'listen'
PURPOSE_SEND = 'send'


class ConnectionPool(pool.Pool):
"""Class that implements a Pool of Connections."""
Expand All @@ -66,9 +86,11 @@ def __init__(self, conf, rpc_conn_pool_size, url, connection_cls):
self.reply_proxy = None

# TODO(comstud): Timeout connections not used in a while
def create(self):
def create(self, purpose=None):
if purpose is None:
purpose = PURPOSE_SEND
LOG.debug('Pool creating new connection')
return self.connection_cls(self.conf, self.url)
return self.connection_cls(self.conf, self.url, purpose)

def empty(self):
for item in self.iter_free():
Expand All @@ -87,16 +109,18 @@ class ConnectionContext(rpc_common.Connection):
If possible the function makes sure to return a connection to the pool.
"""

def __init__(self, connection_pool, pooled=True):
def __init__(self, connection_pool, purpose):
"""Create a new connection, or get one from the pool."""
self.connection = None
self.connection_pool = connection_pool
pooled = purpose == PURPOSE_SEND
if pooled:
self.connection = connection_pool.get()
else:
# a non-pooled connection is requested, so create a new connection
self.connection = connection_pool.create()
self.connection = connection_pool.create(purpose)
self.pooled = pooled
self.connection.pooled = pooled

def __enter__(self):
"""When with ConnectionContext() is used, return self."""
Expand Down
15 changes: 8 additions & 7 deletions oslo_messaging/_drivers/amqpdriver.py
Expand Up @@ -69,7 +69,8 @@ def reply(self, reply=None, failure=None, log_failure=True):
# NOTE(Alexei_987) not sending reply, if msg_id is empty
# because reply should not be expected by caller side
return
with self.listener.driver._get_connection() as conn:
with self.listener.driver._get_connection(
rpc_amqp.PURPOSE_SEND) as conn:
self._send_reply(conn, reply, failure, log_failure=log_failure)
self._send_reply(conn, ending=True)

Expand Down Expand Up @@ -268,9 +269,9 @@ def __init__(self, conf, url, connection_pool,
def _get_exchange(self, target):
return target.exchange or self._default_exchange

def _get_connection(self, pooled=True):
def _get_connection(self, purpose=rpc_amqp.PURPOSE_SEND):
return rpc_amqp.ConnectionContext(self._connection_pool,
pooled=pooled)
purpose=purpose)

def _get_reply_q(self):
with self._reply_q_lock:
Expand All @@ -279,7 +280,7 @@ def _get_reply_q(self):

reply_q = 'reply_' + uuid.uuid4().hex

conn = self._get_connection(pooled=False)
conn = self._get_connection(rpc_amqp.PURPOSE_LISTEN)

self._waiter = ReplyWaiter(reply_q, conn,
self._allowed_remote_exmods)
Expand Down Expand Up @@ -320,7 +321,7 @@ def to_dict(self):
self._waiter.listen(msg_id)

try:
with self._get_connection() as conn:
with self._get_connection(rpc_amqp.PURPOSE_SEND) as conn:
if notify:
conn.notify_send(self._get_exchange(target),
target.topic, msg, retry=retry)
Expand Down Expand Up @@ -353,7 +354,7 @@ def send_notification(self, target, ctxt, message, version, retry=None):
envelope=(version == 2.0), notify=True, retry=retry)

def listen(self, target):
conn = self._get_connection(pooled=False)
conn = self._get_connection(rpc_amqp.PURPOSE_LISTEN)

listener = AMQPListener(self, conn)

Expand All @@ -369,7 +370,7 @@ def listen(self, target):
return listener

def listen_for_notifications(self, targets_and_priorities, pool):
conn = self._get_connection(pooled=False)
conn = self._get_connection(rpc_amqp.PURPOSE_LISTEN)

listener = AMQPListener(self, conn)
for target, priority in targets_and_priorities:
Expand Down
2 changes: 1 addition & 1 deletion oslo_messaging/_drivers/impl_qpid.py
Expand Up @@ -462,7 +462,7 @@ class Connection(object):

pools = {}

def __init__(self, conf, url):
def __init__(self, conf, url, purpose):
if not qpid_messaging:
raise ImportError("Failed to import qpid.messaging")

Expand Down

0 comments on commit b9e134d

Please sign in to comment.