
Commit 6d16079

Fix deadlock when logging from a tpool thread.
The object server runs certain IO-intensive methods outside the main
pthread for performance. If one of those methods tries to log, this can
cause a crash that eventually leads to an object server with hundreds
or thousands of greenthreads, all deadlocked.

The short version of the story is that logging.SysLogHandler has a
mutex which Eventlet monkey-patches. However, the monkey-patched mutex
sometimes breaks if used across different pthreads, and it breaks in
such a way that it is still considered held. After that happens, any
attempt to emit a log message blocks the calling greenthread forever.

The fix is to use a mutex that works across different greenlets and
across different pthreads. This patch introduces such a lock based on
an anonymous pipe.

Change-Id: I57decefaf5bbed57b97a62d0df8518b112917480
Closes-Bug: 1710328
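For illustration, here is a minimal, hypothetical sketch (not code from
this patch) of the pipe-as-mutex idea the fix relies on: a pipe holding
exactly one byte acts as a binary semaphore, and because os.read() and
os.write() are real system calls, they behave correctly no matter which
pthread or greenlet makes them:

    import os

    # Toy illustration only. One byte sitting in the pipe means "unlocked".
    rfd, wfd = os.pipe()
    os.write(wfd, b'-')   # start unlocked

    os.read(rfd, 1)       # acquire: consume the byte (waits if it's gone)
    os.write(wfd, b'-')   # release: put the byte back for the next waiter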
1 parent bf09a06 commit 6d16079

File tree

2 files changed: +307 −7 lines changed

  swift/common/utils.py
  test/unit/common/test_utils.py

swift/common/utils.py

Lines changed: 128 additions & 4 deletions
@@ -50,9 +50,12 @@
 import datetime
 
 import eventlet
+import eventlet.debug
+import eventlet.greenthread
 import eventlet.semaphore
 from eventlet import GreenPool, sleep, Timeout, tpool
 from eventlet.green import socket, threading
+from eventlet.hubs import trampoline
 import eventlet.queue
 import netifaces
 import codecs
@@ -1894,17 +1897,18 @@ def get_logger(conf, name=None, log_to_console=False, log_route=None,
     if udp_host:
         udp_port = int(conf.get('log_udp_port',
                                 logging.handlers.SYSLOG_UDP_PORT))
-        handler = SysLogHandler(address=(udp_host, udp_port),
-                                facility=facility)
+        handler = ThreadSafeSysLogHandler(address=(udp_host, udp_port),
+                                          facility=facility)
     else:
         log_address = conf.get('log_address', '/dev/log')
         try:
-            handler = SysLogHandler(address=log_address, facility=facility)
+            handler = ThreadSafeSysLogHandler(address=log_address,
+                                              facility=facility)
         except socket.error as e:
             # Either /dev/log isn't a UNIX socket or it does not exist at all
             if e.errno not in [errno.ENOTSOCK, errno.ENOENT]:
                 raise
-            handler = SysLogHandler(facility=facility)
+            handler = ThreadSafeSysLogHandler(facility=facility)
     handler.setFormatter(formatter)
     logger.addHandler(handler)
     get_logger.handler4logger[logger] = handler
@@ -4304,3 +4308,123 @@ def replace_partition_in_path(path, part_power):
     path_components[-4] = "%d" % part
 
     return os.sep.join(path_components)
+
+
+class PipeMutex(object):
+    """
+    Mutex using a pipe. Works across both greenlets and real threads, even
+    at the same time.
+    """
+
+    def __init__(self):
+        self.rfd, self.wfd = os.pipe()
+
+        # You can't create a pipe in non-blocking mode; you must set it
+        # later.
+        rflags = fcntl.fcntl(self.rfd, fcntl.F_GETFL)
+        fcntl.fcntl(self.rfd, fcntl.F_SETFL, rflags | os.O_NONBLOCK)
+        os.write(self.wfd, b'-')  # start unlocked
+
+        self.owner = None
+        self.recursion_depth = 0
+
+        # Usually, it's an error to have multiple greenthreads all waiting
+        # to read the same file descriptor. It's often a sign of inadequate
+        # concurrency control; for example, if you have two greenthreads
+        # trying to use the same memcache connection, they'll end up writing
+        # interleaved garbage to the socket or stealing part of each others'
+        # responses.
+        #
+        # In this case, we have multiple greenthreads waiting on the same
+        # file descriptor by design. This lets greenthreads in real thread A
+        # wait with greenthreads in real thread B for the same mutex.
+        # Therefore, we must turn off eventlet's multiple-reader detection.
+        #
+        # It would be better to turn off multiple-reader detection for only
+        # our calls to trampoline(), but eventlet does not support that.
+        eventlet.debug.hub_prevent_multiple_readers(False)
+
+    def acquire(self, blocking=True):
+        """
+        Acquire the mutex.
+
+        If called with blocking=False, returns True if the mutex was
+        acquired and False if it wasn't. Otherwise, blocks until the mutex
+        is acquired and returns True.
+
+        This lock is recursive; the same greenthread may acquire it as many
+        times as it wants to, though it must then release it that many times
+        too.
+        """
+        current_greenthread_id = id(eventlet.greenthread.getcurrent())
+        if self.owner == current_greenthread_id:
+            self.recursion_depth += 1
+            return True
+
+        while True:
+            try:
+                # If there is a byte available, this will read it and remove
+                # it from the pipe. If not, this will raise OSError with
+                # errno=EAGAIN.
+                os.read(self.rfd, 1)
+                self.owner = current_greenthread_id
+                return True
+            except OSError as err:
+                if err.errno != errno.EAGAIN:
+                    raise
+
+                if not blocking:
+                    return False
+
+                # Tell eventlet to suspend the current greenthread until
+                # self.rfd becomes readable. This will happen when someone
+                # else writes to self.wfd.
+                trampoline(self.rfd, read=True)
+
+    def release(self):
+        """
+        Release the mutex.
+        """
+        current_greenthread_id = id(eventlet.greenthread.getcurrent())
+        if self.owner != current_greenthread_id:
+            raise RuntimeError("cannot release un-acquired lock")
+
+        if self.recursion_depth > 0:
+            self.recursion_depth -= 1
+            return
+
+        self.owner = None
+        os.write(self.wfd, b'X')
+
+    def close(self):
+        """
+        Close the mutex. This releases its file descriptors.
+
+        You can't use a mutex after it's been closed.
+        """
+        if self.wfd is not None:
+            os.close(self.rfd)
+            self.rfd = None
+            os.close(self.wfd)
+            self.wfd = None
+        self.owner = None
+        self.recursion_depth = 0
+
+    def __del__(self):
+        # We need this so we don't leak file descriptors. Otherwise, if you
+        # call get_logger() and don't explicitly dispose of it by calling
+        # logger.logger.handlers[0].lock.close() [1], the pipe file
+        # descriptors are leaked.
+        #
+        # This only really comes up in tests. Swift processes tend to call
+        # get_logger() once and then hang on to it until they exit, but the
+        # test suite calls get_logger() a lot.
+        #
+        # [1] and that's a completely ridiculous thing to expect callers to
+        # do, so nobody does it and that's okay.
+        self.close()
+
+
+class ThreadSafeSysLogHandler(SysLogHandler):
+    def createLock(self):
+        self.lock = PipeMutex()
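
As a hypothetical usage sketch (the logger name and /dev/log address
below are illustrative, not from the patch): createLock() is the
standard logging.Handler hook for creating the lock that guards a
handler's I/O, so overriding it is all that is needed to swap in
PipeMutex. Logging from a tpool thread then no longer risks wedging the
handler's lock:

    import logging

    from eventlet import tpool
    from swift.common.utils import ThreadSafeSysLogHandler

    # Hypothetical wiring; assumes a syslog socket at /dev/log.
    logger = logging.getLogger('tpool-demo')
    logger.addHandler(ThreadSafeSysLogHandler(address='/dev/log'))
    logger.setLevel(logging.INFO)

    def io_intensive_method():
        # Runs in a real pthread via tpool. With the stock monkey-patched
        # SysLogHandler lock this is where the deadlock could begin; the
        # PipeMutex-backed handler stays usable across pthreads.
        logger.info('logging from a tpool thread')

    tpool.execute(io_intensive_method)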

test/unit/common/test_utils.py

Lines changed: 179 additions & 3 deletions
@@ -21,7 +21,9 @@
 import contextlib
 import errno
 import eventlet
+import eventlet.debug
 import eventlet.event
+import eventlet.patcher
 import functools
 import grp
 import logging
@@ -1489,7 +1491,7 @@ def test_get_logger(self):
                          'test1\ntest3\ntest4\ntest6\n')
 
     def test_get_logger_sysloghandler_plumbing(self):
-        orig_sysloghandler = utils.SysLogHandler
+        orig_sysloghandler = utils.ThreadSafeSysLogHandler
         syslog_handler_args = []
 
         def syslog_handler_catcher(*args, **kwargs):
@@ -1500,7 +1502,7 @@ def syslog_handler_catcher(*args, **kwargs):
         syslog_handler_catcher.LOG_LOCAL3 = orig_sysloghandler.LOG_LOCAL3
 
         try:
-            utils.SysLogHandler = syslog_handler_catcher
+            utils.ThreadSafeSysLogHandler = syslog_handler_catcher
             utils.get_logger({
                 'log_facility': 'LOG_LOCAL3',
             }, 'server', log_route='server')
@@ -1550,7 +1552,7 @@ def syslog_handler_catcher(*args, **kwargs):
                       'facility': orig_sysloghandler.LOG_LOCAL0})],
                 syslog_handler_args)
         finally:
-            utils.SysLogHandler = orig_sysloghandler
+            utils.ThreadSafeSysLogHandler = orig_sysloghandler
 
     @reset_logger_state
     def test_clean_logger_exception(self):
@@ -6252,5 +6254,179 @@ def test_set_swift_dir(self):
         reload_storage_policies()
         self.assertIsNotNone(POLICIES.get_by_name(self.policy_name))
 
+
+class TestPipeMutex(unittest.TestCase):
+    def setUp(self):
+        self.mutex = utils.PipeMutex()
+
+    def tearDown(self):
+        self.mutex.close()
+
+    def test_nonblocking(self):
+        evt_lock1 = eventlet.event.Event()
+        evt_lock2 = eventlet.event.Event()
+        evt_unlock = eventlet.event.Event()
+
+        def get_the_lock():
+            self.mutex.acquire()
+            evt_lock1.send('got the lock')
+            evt_lock2.wait()
+            self.mutex.release()
+            evt_unlock.send('released the lock')
+
+        eventlet.spawn(get_the_lock)
+        evt_lock1.wait()  # Now, the other greenthread has the lock.
+
+        self.assertFalse(self.mutex.acquire(blocking=False))
+        evt_lock2.send('please release the lock')
+        evt_unlock.wait()  # The other greenthread has released the lock.
+        self.assertTrue(self.mutex.acquire(blocking=False))
+
+    def test_recursive(self):
+        self.assertTrue(self.mutex.acquire(blocking=False))
+        self.assertTrue(self.mutex.acquire(blocking=False))
+
+        def try_acquire_lock():
+            return self.mutex.acquire(blocking=False)
+
+        self.assertFalse(eventlet.spawn(try_acquire_lock).wait())
+        self.mutex.release()
+        self.assertFalse(eventlet.spawn(try_acquire_lock).wait())
+        self.mutex.release()
+        self.assertTrue(eventlet.spawn(try_acquire_lock).wait())
+
+    def test_release_without_acquire(self):
+        self.assertRaises(RuntimeError, self.mutex.release)
+
+    def test_too_many_releases(self):
+        self.mutex.acquire()
+        self.mutex.release()
+        self.assertRaises(RuntimeError, self.mutex.release)
+
+    def test_wrong_releaser(self):
+        self.mutex.acquire()
+        self.assertRaises(RuntimeError,
+                          eventlet.spawn(self.mutex.release).wait)
+
+    def test_blocking(self):
+        evt = eventlet.event.Event()
+
+        sequence = []
+
+        def coro1():
+            eventlet.sleep(0)  # let coro2 go
+
+            self.mutex.acquire()
+            sequence.append('coro1 acquire')
+            evt.send('go')
+            self.mutex.release()
+            sequence.append('coro1 release')
+
+        def coro2():
+            evt.wait()  # wait for coro1 to start us
+            self.mutex.acquire()
+            sequence.append('coro2 acquire')
+            self.mutex.release()
+            sequence.append('coro2 release')
+
+        c1 = eventlet.spawn(coro1)
+        c2 = eventlet.spawn(coro2)
+
+        c1.wait()
+        c2.wait()
+
+        self.assertEqual(sequence, [
+            'coro1 acquire',
+            'coro1 release',
+            'coro2 acquire',
+            'coro2 release'])
+
+    def test_blocking_tpool(self):
+        # Note: this test's success isn't a guarantee that the mutex is
+        # working. However, this test's failure means that the mutex is
+        # definitely broken.
+        sequence = []
+
+        def do_stuff():
+            n = 10
+            while n > 0:
+                self.mutex.acquire()
+                sequence.append("<")
+                eventlet.sleep(0.0001)
+                sequence.append(">")
+                self.mutex.release()
+                n -= 1
+
+        greenthread1 = eventlet.spawn(do_stuff)
+        greenthread2 = eventlet.spawn(do_stuff)
+
+        real_thread1 = eventlet.patcher.original('threading').Thread(
+            target=do_stuff)
+        real_thread1.start()
+
+        real_thread2 = eventlet.patcher.original('threading').Thread(
+            target=do_stuff)
+        real_thread2.start()
+
+        greenthread1.wait()
+        greenthread2.wait()
+        real_thread1.join()
+        real_thread2.join()
+
+        self.assertEqual(''.join(sequence), "<>" * 40)
+
+    def test_blocking_preserves_ownership(self):
+        pthread1_event = eventlet.patcher.original('threading').Event()
+        pthread2_event1 = eventlet.patcher.original('threading').Event()
+        pthread2_event2 = eventlet.patcher.original('threading').Event()
+        thread_id = []
+        owner = []
+
+        def pthread1():
+            thread_id.append(id(eventlet.greenthread.getcurrent()))
+            self.mutex.acquire()
+            owner.append(self.mutex.owner)
+            pthread2_event1.set()
+
+            orig_os_write = utils.os.write
+
+            def patched_os_write(*a, **kw):
+                try:
+                    return orig_os_write(*a, **kw)
+                finally:
+                    pthread1_event.wait()
+
+            with mock.patch.object(utils.os, 'write', patched_os_write):
+                self.mutex.release()
+            pthread2_event2.set()
+
+        def pthread2():
+            pthread2_event1.wait()  # ensure pthread1 acquires lock first
+            thread_id.append(id(eventlet.greenthread.getcurrent()))
+            self.mutex.acquire()
+            pthread1_event.set()
+            pthread2_event2.wait()
+            owner.append(self.mutex.owner)
+            self.mutex.release()
+
+        real_thread1 = eventlet.patcher.original('threading').Thread(
+            target=pthread1)
+        real_thread1.start()
+
+        real_thread2 = eventlet.patcher.original('threading').Thread(
+            target=pthread2)
+        real_thread2.start()
+
+        real_thread1.join()
+        real_thread2.join()
+        self.assertEqual(thread_id, owner)
+        self.assertIsNone(self.mutex.owner)
+
+    @classmethod
+    def tearDownClass(cls):
+        # PipeMutex turns this off when you instantiate one
+        eventlet.debug.hub_prevent_multiple_readers(True)
+
+
 if __name__ == '__main__':
     unittest.main()
