Skip to content

Commit 69c715c

Browse files
smerritttipabu
authored andcommitted
Fix deadlock when logging from a tpool thread.
The object server runs certain IO-intensive methods outside the main pthread for performance. If one of those methods tries to log, this can cause a crash that eventually leads to an object server with hundreds or thousands of greenthreads, all deadlocked. The short version of the story is that logging.SysLogHandler has a mutex which Eventlet monkey-patches. However, the monkey-patched mutex sometimes breaks if used across different pthreads, and it breaks in such a way that it is still considered held. After that happens, any attempt to emit a log message blocks the calling greenthread forever. The fix is to use a mutex that works across different greenlets and across different pthreads. This patch introduces such a lock based on an anonymous pipe. Change-Id: I57decefaf5bbed57b97a62d0df8518b112917480 Closes-Bug: 1710328 (cherry picked from commit 6d16079)
1 parent 72ed8f2 commit 69c715c

File tree

2 files changed

+307
-7
lines changed

2 files changed

+307
-7
lines changed

swift/common/utils.py

Lines changed: 128 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,9 +48,12 @@
4848
import datetime
4949

5050
import eventlet
51+
import eventlet.debug
52+
import eventlet.greenthread
5153
import eventlet.semaphore
5254
from eventlet import GreenPool, sleep, Timeout, tpool
5355
from eventlet.green import socket, threading
56+
from eventlet.hubs import trampoline
5457
import eventlet.queue
5558
import netifaces
5659
import codecs
@@ -1850,17 +1853,18 @@ def get_logger(conf, name=None, log_to_console=False, log_route=None,
18501853
if udp_host:
18511854
udp_port = int(conf.get('log_udp_port',
18521855
logging.handlers.SYSLOG_UDP_PORT))
1853-
handler = SysLogHandler(address=(udp_host, udp_port),
1854-
facility=facility)
1856+
handler = ThreadSafeSysLogHandler(address=(udp_host, udp_port),
1857+
facility=facility)
18551858
else:
18561859
log_address = conf.get('log_address', '/dev/log')
18571860
try:
1858-
handler = SysLogHandler(address=log_address, facility=facility)
1861+
handler = ThreadSafeSysLogHandler(address=log_address,
1862+
facility=facility)
18591863
except socket.error as e:
18601864
# Either /dev/log isn't a UNIX socket or it does not exist at all
18611865
if e.errno not in [errno.ENOTSOCK, errno.ENOENT]:
18621866
raise
1863-
handler = SysLogHandler(facility=facility)
1867+
handler = ThreadSafeSysLogHandler(facility=facility)
18641868
handler.setFormatter(formatter)
18651869
logger.addHandler(handler)
18661870
get_logger.handler4logger[logger] = handler
@@ -4221,3 +4225,123 @@ def md5_hash_for_file(fname):
42214225
for block in iter(lambda: f.read(MD5_BLOCK_READ_BYTES), ''):
42224226
md5sum.update(block)
42234227
return md5sum.hexdigest()
4228+
4229+
4230+
class PipeMutex(object):
4231+
"""
4232+
Mutex using a pipe. Works across both greenlets and real threads, even
4233+
at the same time.
4234+
"""
4235+
4236+
def __init__(self):
4237+
self.rfd, self.wfd = os.pipe()
4238+
4239+
# You can't create a pipe in non-blocking mode; you must set it
4240+
# later.
4241+
rflags = fcntl.fcntl(self.rfd, fcntl.F_GETFL)
4242+
fcntl.fcntl(self.rfd, fcntl.F_SETFL, rflags | os.O_NONBLOCK)
4243+
os.write(self.wfd, b'-') # start unlocked
4244+
4245+
self.owner = None
4246+
self.recursion_depth = 0
4247+
4248+
# Usually, it's an error to have multiple greenthreads all waiting
4249+
# to read the same file descriptor. It's often a sign of inadequate
4250+
# concurrency control; for example, if you have two greenthreads
4251+
# trying to use the same memcache connection, they'll end up writing
4252+
# interleaved garbage to the socket or stealing part of each others'
4253+
# responses.
4254+
#
4255+
# In this case, we have multiple greenthreads waiting on the same
4256+
# file descriptor by design. This lets greenthreads in real thread A
4257+
# wait with greenthreads in real thread B for the same mutex.
4258+
# Therefore, we must turn off eventlet's multiple-reader detection.
4259+
#
4260+
# It would be better to turn off multiple-reader detection for only
4261+
# our calls to trampoline(), but eventlet does not support that.
4262+
eventlet.debug.hub_prevent_multiple_readers(False)
4263+
4264+
def acquire(self, blocking=True):
4265+
"""
4266+
Acquire the mutex.
4267+
4268+
If called with blocking=False, returns True if the mutex was
4269+
acquired and False if it wasn't. Otherwise, blocks until the mutex
4270+
is acquired and returns True.
4271+
4272+
This lock is recursive; the same greenthread may acquire it as many
4273+
times as it wants to, though it must then release it that many times
4274+
too.
4275+
"""
4276+
current_greenthread_id = id(eventlet.greenthread.getcurrent())
4277+
if self.owner == current_greenthread_id:
4278+
self.recursion_depth += 1
4279+
return True
4280+
4281+
while True:
4282+
try:
4283+
# If there is a byte available, this will read it and remove
4284+
# it from the pipe. If not, this will raise OSError with
4285+
# errno=EAGAIN.
4286+
os.read(self.rfd, 1)
4287+
self.owner = current_greenthread_id
4288+
return True
4289+
except OSError as err:
4290+
if err.errno != errno.EAGAIN:
4291+
raise
4292+
4293+
if not blocking:
4294+
return False
4295+
4296+
# Tell eventlet to suspend the current greenthread until
4297+
# self.rfd becomes readable. This will happen when someone
4298+
# else writes to self.wfd.
4299+
trampoline(self.rfd, read=True)
4300+
4301+
def release(self):
4302+
"""
4303+
Release the mutex.
4304+
"""
4305+
current_greenthread_id = id(eventlet.greenthread.getcurrent())
4306+
if self.owner != current_greenthread_id:
4307+
raise RuntimeError("cannot release un-acquired lock")
4308+
4309+
if self.recursion_depth > 0:
4310+
self.recursion_depth -= 1
4311+
return
4312+
4313+
self.owner = None
4314+
os.write(self.wfd, b'X')
4315+
4316+
def close(self):
4317+
"""
4318+
Close the mutex. This releases its file descriptors.
4319+
4320+
You can't use a mutex after it's been closed.
4321+
"""
4322+
if self.wfd is not None:
4323+
os.close(self.rfd)
4324+
self.rfd = None
4325+
os.close(self.wfd)
4326+
self.wfd = None
4327+
self.owner = None
4328+
self.recursion_depth = 0
4329+
4330+
def __del__(self):
4331+
# We need this so we don't leak file descriptors. Otherwise, if you
4332+
# call get_logger() and don't explicitly dispose of it by calling
4333+
# logger.logger.handlers[0].lock.close() [1], the pipe file
4334+
# descriptors are leaked.
4335+
#
4336+
# This only really comes up in tests. Swift processes tend to call
4337+
# get_logger() once and then hang on to it until they exit, but the
4338+
# test suite calls get_logger() a lot.
4339+
#
4340+
# [1] and that's a completely ridiculous thing to expect callers to
4341+
# do, so nobody does it and that's okay.
4342+
self.close()
4343+
4344+
4345+
class ThreadSafeSysLogHandler(SysLogHandler):
4346+
def createLock(self):
4347+
self.lock = PipeMutex()

test/unit/common/test_utils.py

Lines changed: 179 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,9 @@
2121
import contextlib
2222
import errno
2323
import eventlet
24+
import eventlet.debug
2425
import eventlet.event
26+
import eventlet.patcher
2527
import functools
2628
import grp
2729
import logging
@@ -1454,7 +1456,7 @@ def test_get_logger(self):
14541456
'test1\ntest3\ntest4\ntest6\n')
14551457

14561458
def test_get_logger_sysloghandler_plumbing(self):
1457-
orig_sysloghandler = utils.SysLogHandler
1459+
orig_sysloghandler = utils.ThreadSafeSysLogHandler
14581460
syslog_handler_args = []
14591461

14601462
def syslog_handler_catcher(*args, **kwargs):
@@ -1465,7 +1467,7 @@ def syslog_handler_catcher(*args, **kwargs):
14651467
syslog_handler_catcher.LOG_LOCAL3 = orig_sysloghandler.LOG_LOCAL3
14661468

14671469
try:
1468-
utils.SysLogHandler = syslog_handler_catcher
1470+
utils.ThreadSafeSysLogHandler = syslog_handler_catcher
14691471
utils.get_logger({
14701472
'log_facility': 'LOG_LOCAL3',
14711473
}, 'server', log_route='server')
@@ -1515,7 +1517,7 @@ def syslog_handler_catcher(*args, **kwargs):
15151517
'facility': orig_sysloghandler.LOG_LOCAL0})],
15161518
syslog_handler_args)
15171519
finally:
1518-
utils.SysLogHandler = orig_sysloghandler
1520+
utils.ThreadSafeSysLogHandler = orig_sysloghandler
15191521

15201522
@reset_logger_state
15211523
def test_clean_logger_exception(self):
@@ -6114,5 +6116,179 @@ def test_hash_for_file_brittle(self):
61146116
self.fail('Some data did not compute expected hash:\n' +
61156117
'\n'.join(failures))
61166118

6119+
6120+
class TestPipeMutex(unittest.TestCase):
6121+
def setUp(self):
6122+
self.mutex = utils.PipeMutex()
6123+
6124+
def tearDown(self):
6125+
self.mutex.close()
6126+
6127+
def test_nonblocking(self):
6128+
evt_lock1 = eventlet.event.Event()
6129+
evt_lock2 = eventlet.event.Event()
6130+
evt_unlock = eventlet.event.Event()
6131+
6132+
def get_the_lock():
6133+
self.mutex.acquire()
6134+
evt_lock1.send('got the lock')
6135+
evt_lock2.wait()
6136+
self.mutex.release()
6137+
evt_unlock.send('released the lock')
6138+
6139+
eventlet.spawn(get_the_lock)
6140+
evt_lock1.wait() # Now, the other greenthread has the lock.
6141+
6142+
self.assertFalse(self.mutex.acquire(blocking=False))
6143+
evt_lock2.send('please release the lock')
6144+
evt_unlock.wait() # The other greenthread has released the lock.
6145+
self.assertTrue(self.mutex.acquire(blocking=False))
6146+
6147+
def test_recursive(self):
6148+
self.assertTrue(self.mutex.acquire(blocking=False))
6149+
self.assertTrue(self.mutex.acquire(blocking=False))
6150+
6151+
def try_acquire_lock():
6152+
return self.mutex.acquire(blocking=False)
6153+
6154+
self.assertFalse(eventlet.spawn(try_acquire_lock).wait())
6155+
self.mutex.release()
6156+
self.assertFalse(eventlet.spawn(try_acquire_lock).wait())
6157+
self.mutex.release()
6158+
self.assertTrue(eventlet.spawn(try_acquire_lock).wait())
6159+
6160+
def test_release_without_acquire(self):
6161+
self.assertRaises(RuntimeError, self.mutex.release)
6162+
6163+
def test_too_many_releases(self):
6164+
self.mutex.acquire()
6165+
self.mutex.release()
6166+
self.assertRaises(RuntimeError, self.mutex.release)
6167+
6168+
def test_wrong_releaser(self):
6169+
self.mutex.acquire()
6170+
self.assertRaises(RuntimeError,
6171+
eventlet.spawn(self.mutex.release).wait)
6172+
6173+
def test_blocking(self):
6174+
evt = eventlet.event.Event()
6175+
6176+
sequence = []
6177+
6178+
def coro1():
6179+
eventlet.sleep(0) # let coro2 go
6180+
6181+
self.mutex.acquire()
6182+
sequence.append('coro1 acquire')
6183+
evt.send('go')
6184+
self.mutex.release()
6185+
sequence.append('coro1 release')
6186+
6187+
def coro2():
6188+
evt.wait() # wait for coro1 to start us
6189+
self.mutex.acquire()
6190+
sequence.append('coro2 acquire')
6191+
self.mutex.release()
6192+
sequence.append('coro2 release')
6193+
6194+
c1 = eventlet.spawn(coro1)
6195+
c2 = eventlet.spawn(coro2)
6196+
6197+
c1.wait()
6198+
c2.wait()
6199+
6200+
self.assertEqual(sequence, [
6201+
'coro1 acquire',
6202+
'coro1 release',
6203+
'coro2 acquire',
6204+
'coro2 release'])
6205+
6206+
def test_blocking_tpool(self):
6207+
# Note: this test's success isn't a guarantee that the mutex is
6208+
# working. However, this test's failure means that the mutex is
6209+
# definitely broken.
6210+
sequence = []
6211+
6212+
def do_stuff():
6213+
n = 10
6214+
while n > 0:
6215+
self.mutex.acquire()
6216+
sequence.append("<")
6217+
eventlet.sleep(0.0001)
6218+
sequence.append(">")
6219+
self.mutex.release()
6220+
n -= 1
6221+
6222+
greenthread1 = eventlet.spawn(do_stuff)
6223+
greenthread2 = eventlet.spawn(do_stuff)
6224+
6225+
real_thread1 = eventlet.patcher.original('threading').Thread(
6226+
target=do_stuff)
6227+
real_thread1.start()
6228+
6229+
real_thread2 = eventlet.patcher.original('threading').Thread(
6230+
target=do_stuff)
6231+
real_thread2.start()
6232+
6233+
greenthread1.wait()
6234+
greenthread2.wait()
6235+
real_thread1.join()
6236+
real_thread2.join()
6237+
6238+
self.assertEqual(''.join(sequence), "<>" * 40)
6239+
6240+
def test_blocking_preserves_ownership(self):
6241+
pthread1_event = eventlet.patcher.original('threading').Event()
6242+
pthread2_event1 = eventlet.patcher.original('threading').Event()
6243+
pthread2_event2 = eventlet.patcher.original('threading').Event()
6244+
thread_id = []
6245+
owner = []
6246+
6247+
def pthread1():
6248+
thread_id.append(id(eventlet.greenthread.getcurrent()))
6249+
self.mutex.acquire()
6250+
owner.append(self.mutex.owner)
6251+
pthread2_event1.set()
6252+
6253+
orig_os_write = utils.os.write
6254+
6255+
def patched_os_write(*a, **kw):
6256+
try:
6257+
return orig_os_write(*a, **kw)
6258+
finally:
6259+
pthread1_event.wait()
6260+
6261+
with mock.patch.object(utils.os, 'write', patched_os_write):
6262+
self.mutex.release()
6263+
pthread2_event2.set()
6264+
6265+
def pthread2():
6266+
pthread2_event1.wait() # ensure pthread1 acquires lock first
6267+
thread_id.append(id(eventlet.greenthread.getcurrent()))
6268+
self.mutex.acquire()
6269+
pthread1_event.set()
6270+
pthread2_event2.wait()
6271+
owner.append(self.mutex.owner)
6272+
self.mutex.release()
6273+
6274+
real_thread1 = eventlet.patcher.original('threading').Thread(
6275+
target=pthread1)
6276+
real_thread1.start()
6277+
6278+
real_thread2 = eventlet.patcher.original('threading').Thread(
6279+
target=pthread2)
6280+
real_thread2.start()
6281+
6282+
real_thread1.join()
6283+
real_thread2.join()
6284+
self.assertEqual(thread_id, owner)
6285+
self.assertIsNone(self.mutex.owner)
6286+
6287+
@classmethod
6288+
def tearDownClass(cls):
6289+
# PipeMutex turns this off when you instantiate one
6290+
eventlet.debug.hub_prevent_multiple_readers(True)
6291+
6292+
61176293
if __name__ == '__main__':
61186294
unittest.main()

0 commit comments

Comments
 (0)