Allow users run the rabbitmq heartbeat inside a standard pthread.
This is an experimental feature.

The proposed changes will fix related issues when we run
heartbeat under apache/httpd enviornment with the apache MPM `prefork`
[1] engine and mod_wsgi or uwsgi in a monkey patched environment.

Propose changes to allow user to choose to run the rabbitmq health check
heartbeat in a standard python thread.


We facing an issue with the rabbitmq driver heartbeat
under apache MPM `prefork` module and mod_wsgi when nova_api monkey
patched the stdlib by using eventlet.

nova_api calling eventlet.monkey_patch() [2] when it runs under mod_wsgi.

This impacts the AMQP heartbeat thread,
which is meant to be a native thread. Instead of checking AMQP sockets
every 15s, It is now suspended and resumed by eventlet. However,
resuming greenthreads can take a very long time if mod_wsgi isn't
processing traffic regularly, which can cause rabbitmq to close the AMQP

Root Cause

The oslo.messaging RabbitMQ driver and especially the heartbeat
suffer to inherit the execution model of the service which consume him.

In this scenario nova_api need green threads to manage cells and edge
features so nova_api monkey patch the stdlib to obtain async features,
and the oslo.messaging rabbitmq driver endure these changes.

I think the main issue here is that nova_api want async and use eventlet green
threads to obtain it.


We want to allow user to isolate the heartbeat execution model
from the parent process inherited execution model by passing the
`heartbeat_in_pthread` option through the driver config.

While we use MPM `prefork` we want to avoid to use libevent and epoll.

If the `heartbeat_in_pthread` option is given we want to force to use the
python stdlib threading module to run the
rabbitmq heartbeat to avoid issue related to a non "standard"
environment. I mean "standard" because async features isn't the default
config in mostly case, starting by apache which define `prefork` is the
default engine.

This is an experimental feature, we can help us to ensure to run heartbeat
through a classical python thread



[2] openstack/nova@3c5e2b0

Change-Id: If8846599efc48fe18ecfb99c04e2c38f9a45b9ed
4383 committed Aug 7, 2019
1 parent fe0ac31 commit 22f240b82fffbd62be8568a7d0d3369134596ace
Showing 2 changed files with 57 additions and 1 deletion.
@@ -33,6 +33,7 @@
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import eventletutils
from oslo_utils import importutils
import six
import six.moves
from six.moves.urllib import parse
@@ -46,6 +47,18 @@
from oslo_messaging import _utils
from oslo_messaging import exceptions

eventlet = importutils.try_import('eventlet')
if eventlet and eventletutils.is_monkey_patched("thread"):
# Here we initialize module with the native python threading module
# if it was already monkey patched by eventlet/greenlet.
stdlib_threading = eventlet.patcher.original('threading')
# Manage the case where we run this driver in a non patched environment
# and where user even so configure the driver to run heartbeat through
# a python thread, if we don't do that when the heartbeat will start
# we will facing an issue by trying to override the threading module.
stdlib_threading = threading

# NOTE(sileht): don't exists in py2 socket module

@@ -76,6 +89,15 @@
help='SSL certification authority file '
'(valid only if SSL enabled).'),
help="EXPERIMENTAL: Run the health check heartbeat thread"
"through a native python thread. By default if this"
"option isn't provided the health check heartbeat will"
"inherit the execution model from the parent process. By"
"example if the parent process have monkey patched the"
"stdlib by using eventlet/greenlet then the heartbeat"
"will be run through a green thread."),
@@ -443,6 +465,34 @@ def __init__(self, conf, url, purpose):
self.kombu_failover_strategy = driver_conf.kombu_failover_strategy
self.kombu_compression = driver_conf.kombu_compression
self.heartbeat_in_pthread = driver_conf.heartbeat_in_pthread

if self.heartbeat_in_pthread:
# NOTE(hberaud): Experimental: threading module is in use to run
# the rabbitmq health check heartbeat. in some situation like
# with nova-api, nova need green threads to run the cells
# mechanismes in an async mode, so they used eventlet and
# greenlet to monkey patch the python stdlib and get green threads.
# The issue here is that nova-api run under the apache MPM prefork
# module and mod_wsgi. The apache prefork module doesn't support
# epoll and recent kernel features, and evenlet is built over epoll
# and libevent, so when we run the rabbitmq heartbeat we inherit
# from the execution model of the parent process (nova-api), and
# in this case we will run the heartbeat through a green thread.
# We want to allow users to choose between pthread and
# green threads if needed in some specific situations.
# This experimental feature allow user to use pthread in an env
# that doesn't support eventlet without forcing the parent process
# to stop to use eventlet if they need monkey patching for some
# specific reasons.
# If users want to use pthread we need to make sure that we
# will use the *native* threading module for
# initialize the heartbeat thread.
# Here we override globaly the previously imported
# threading module with the native python threading module
# if it was already monkey patched by eventlet/greenlet.
global threading
threading = stdlib_threading

if self.ssl:
self.ssl_version = driver_conf.ssl_version
@@ -896,7 +946,7 @@ def _heartbeat_check(self):

def _heartbeat_start(self):
if self._heartbeat_supported_and_enabled():
self._heartbeat_exit_event = eventletutils.Event()
self._heartbeat_exit_event = threading.Event()
self._heartbeat_thread = threading.Thread(
self._heartbeat_thread.daemon = True
@@ -89,6 +89,11 @@ def test_test_heartbeat_sent_connection_fail(self):
info='A recoverable connection/channel error occurred, '
'trying to reconnect: %s')

def test_run_heartbeat_in_pthread(self):

class TestRabbitQos(test_utils.BaseTestCase):

@@ -997,6 +1002,7 @@ def test_ensure_no_retry(self):

class ConnectionLockTestCase(test_utils.BaseTestCase):
def _thread(self, lock, sleep, heartbeat=False):

def thread_task():
if heartbeat:
with lock.for_heartbeat():

