diff --git a/swift/common/exceptions.py b/swift/common/exceptions.py index 2f36ab623d..721ac3421a 100644 --- a/swift/common/exceptions.py +++ b/swift/common/exceptions.py @@ -145,10 +145,6 @@ class LockTimeout(MessageTimeout): pass -class ThreadPoolDead(SwiftException): - pass - - class RingBuilderError(SwiftException): pass diff --git a/swift/common/utils.py b/swift/common/utils.py index fb4baa4396..ec7be0bb7d 100644 --- a/swift/common/utils.py +++ b/swift/common/utils.py @@ -47,8 +47,7 @@ import eventlet import eventlet.semaphore -from eventlet import GreenPool, sleep, Timeout, tpool, greenthread, \ - greenio, event +from eventlet import GreenPool, sleep, Timeout, tpool from eventlet.green import socket, threading import eventlet.queue import netifaces @@ -3210,205 +3209,6 @@ def inner(): return resp -class ThreadPool(object): - """ - Perform blocking operations in background threads. - - Call its methods from within greenlets to green-wait for results without - blocking the eventlet reactor (hopefully). - """ - - BYTE = 'a'.encode('utf-8') - - def __init__(self, nthreads=2): - self.nthreads = nthreads - self._run_queue = stdlib_queue.Queue() - self._result_queue = stdlib_queue.Queue() - self._threads = [] - self._alive = True - - if nthreads <= 0: - return - - # We spawn a greenthread whose job it is to pull results from the - # worker threads via a real Queue and send them to eventlet Events so - # that the calling greenthreads can be awoken. - # - # Since each OS thread has its own collection of greenthreads, it - # doesn't work to have the worker thread send stuff to the event, as - # it then notifies its own thread-local eventlet hub to wake up, which - # doesn't do anything to help out the actual calling greenthread over - # in the main thread. - # - # Thus, each worker sticks its results into a result queue and then - # writes a byte to a pipe, signaling the result-consuming greenlet (in - # the main thread) to wake up and consume results. - # - # This is all stuff that eventlet.tpool does, but that code can't have - # multiple instances instantiated. Since the object server uses one - # pool per disk, we have to reimplement this stuff. - _raw_rpipe, self.wpipe = os.pipe() - self.rpipe = greenio.GreenPipe(_raw_rpipe, 'rb') - - for _junk in range(nthreads): - thr = stdlib_threading.Thread( - target=self._worker, - args=(self._run_queue, self._result_queue)) - thr.daemon = True - thr.start() - self._threads.append(thr) - - # This is the result-consuming greenthread that runs in the main OS - # thread, as described above. - self._consumer_coro = greenthread.spawn_n(self._consume_results, - self._result_queue) - - def _worker(self, work_queue, result_queue): - """ - Pulls an item from the queue and runs it, then puts the result into - the result queue. Repeats forever. - - :param work_queue: queue from which to pull work - :param result_queue: queue into which to place results - """ - while True: - item = work_queue.get() - if item is None: - break - ev, func, args, kwargs = item - try: - result = func(*args, **kwargs) - result_queue.put((ev, True, result)) - except BaseException: - result_queue.put((ev, False, sys.exc_info())) - finally: - work_queue.task_done() - os.write(self.wpipe, self.BYTE) - - def _consume_results(self, queue): - """ - Runs as a greenthread in the same OS thread as callers of - run_in_thread(). - - Takes results from the worker OS threads and sends them to the waiting - greenthreads. - """ - while True: - try: - self.rpipe.read(1) - except ValueError: - # can happen at process shutdown when pipe is closed - break - - while True: - try: - ev, success, result = queue.get(block=False) - except stdlib_queue.Empty: - break - - try: - if success: - ev.send(result) - else: - ev.send_exception(*result) - finally: - queue.task_done() - - def run_in_thread(self, func, *args, **kwargs): - """ - Runs ``func(*args, **kwargs)`` in a thread. Blocks the current greenlet - until results are available. - - Exceptions thrown will be reraised in the calling thread. - - If the threadpool was initialized with nthreads=0, it invokes - ``func(*args, **kwargs)`` directly, followed by eventlet.sleep() to - ensure the eventlet hub has a chance to execute. It is more likely the - hub will be invoked when queuing operations to an external thread. - - :returns: result of calling func - :raises: whatever func raises - """ - if not self._alive: - raise swift.common.exceptions.ThreadPoolDead() - - if self.nthreads <= 0: - result = func(*args, **kwargs) - sleep() - return result - - ev = event.Event() - self._run_queue.put((ev, func, args, kwargs), block=False) - - # blocks this greenlet (and only *this* greenlet) until the real - # thread calls ev.send(). - result = ev.wait() - return result - - def _run_in_eventlet_tpool(self, func, *args, **kwargs): - """ - Really run something in an external thread, even if we haven't got any - threads of our own. - """ - def inner(): - try: - return (True, func(*args, **kwargs)) - except (Timeout, BaseException) as err: - return (False, err) - - success, result = tpool.execute(inner) - if success: - return result - else: - raise result - - def force_run_in_thread(self, func, *args, **kwargs): - """ - Runs ``func(*args, **kwargs)`` in a thread. Blocks the current greenlet - until results are available. - - Exceptions thrown will be reraised in the calling thread. - - If the threadpool was initialized with nthreads=0, uses eventlet.tpool - to run the function. This is in contrast to run_in_thread(), which - will (in that case) simply execute func in the calling thread. - - :returns: result of calling func - :raises: whatever func raises - """ - if not self._alive: - raise swift.common.exceptions.ThreadPoolDead() - - if self.nthreads <= 0: - return self._run_in_eventlet_tpool(func, *args, **kwargs) - else: - return self.run_in_thread(func, *args, **kwargs) - - def terminate(self): - """ - Releases the threadpool's resources (OS threads, greenthreads, pipes, - etc.) and renders it unusable. - - Don't call run_in_thread() or force_run_in_thread() after calling - terminate(). - """ - self._alive = False - if self.nthreads <= 0: - return - - for _junk in range(self.nthreads): - self._run_queue.put(None) - for thr in self._threads: - thr.join() - self._threads = [] - self.nthreads = 0 - - greenthread.kill(self._consumer_coro) - - self.rpipe.close() - os.close(self.wpipe) - - def ismount(path): """ Test whether a path is a mount point. This will catch any diff --git a/swift/obj/diskfile.py b/swift/obj/diskfile.py index a074507487..5295cd5dac 100644 --- a/swift/obj/diskfile.py +++ b/swift/obj/diskfile.py @@ -55,9 +55,10 @@ from swift.common.request_helpers import is_sys_meta from swift.common.utils import mkdirs, Timestamp, \ storage_directory, hash_path, renamer, fallocate, fsync, fdatasync, \ - fsync_dir, drop_buffer_cache, ThreadPool, lock_path, write_pickle, \ + fsync_dir, drop_buffer_cache, lock_path, write_pickle, \ config_true_value, listdir, split_path, ismount, remove_file, \ - get_md5_socket, F_SETPIPE_SZ, decode_timestamps, encode_timestamps + get_md5_socket, F_SETPIPE_SZ, decode_timestamps, encode_timestamps, \ + tpool_reraise from swift.common.splice import splice, tee from swift.common.exceptions import DiskFileQuarantined, DiskFileNotExist, \ DiskFileCollision, DiskFileNoSpace, DiskFileDeviceUnavailable, \ @@ -536,7 +537,6 @@ def __init__(self, conf, logger): conf.get('replication_one_per_device', 'true')) self.replication_lock_timeout = int(conf.get( 'replication_lock_timeout', 15)) - self.threadpools = defaultdict(lambda: ThreadPool(nthreads=0)) self.use_splice = False self.pipe_size = None @@ -1115,8 +1115,7 @@ def pickle_async_update(self, device, account, container, obj, data, device_path = self.construct_dev_path(device) async_dir = os.path.join(device_path, get_async_dir(policy)) ohash = hash_path(account, container, obj) - self.threadpools[device].run_in_thread( - write_pickle, + write_pickle( data, os.path.join(async_dir, ohash[-3:], ohash + '-' + Timestamp(timestamp).internal), @@ -1139,7 +1138,7 @@ def get_diskfile(self, device, partition, account, container, obj, dev_path = self.get_dev_path(device) if not dev_path: raise DiskFileDeviceUnavailable() - return self.diskfile_cls(self, dev_path, self.threadpools[device], + return self.diskfile_cls(self, dev_path, partition, account, container, obj, policy=policy, use_splice=self.use_splice, pipe_size=self.pipe_size, **kwargs) @@ -1215,7 +1214,7 @@ def get_diskfile_from_hash(self, device, partition, object_hash, metadata.get('name', ''), 3, 3, True) except ValueError: raise DiskFileNotExist() - return self.diskfile_cls(self, dev_path, self.threadpools[device], + return self.diskfile_cls(self, dev_path, partition, account, container, obj, policy=policy, **kwargs) @@ -1235,7 +1234,7 @@ def get_hashes(self, device, partition, suffixes, policy): partition) if not os.path.exists(partition_path): mkdirs(partition_path) - _junk, hashes = self.threadpools[device].force_run_in_thread( + _junk, hashes = tpool_reraise( self._get_hashes, partition_path, recalculate=suffixes) return hashes @@ -1368,19 +1367,16 @@ class BaseDiskFileWriter(object): :param fd: open file descriptor of temporary file to receive data :param tmppath: full path name of the opened file descriptor :param bytes_per_sync: number bytes written between sync calls - :param threadpool: internal thread pool to use for disk operations :param diskfile: the diskfile creating this DiskFileWriter instance """ - def __init__(self, name, datadir, fd, tmppath, bytes_per_sync, threadpool, - diskfile): + def __init__(self, name, datadir, fd, tmppath, bytes_per_sync, diskfile): # Parameter tracking self._name = name self._datadir = datadir self._fd = fd self._tmppath = tmppath self._bytes_per_sync = bytes_per_sync - self._threadpool = threadpool self._diskfile = diskfile # Internal attributes @@ -1409,18 +1405,15 @@ def write(self, chunk): :returns: the total number of bytes written to an object """ - def _write_entire_chunk(chunk): - while chunk: - written = os.write(self._fd, chunk) - self._upload_size += written - chunk = chunk[written:] - - self._threadpool.run_in_thread(_write_entire_chunk, chunk) + while chunk: + written = os.write(self._fd, chunk) + self._upload_size += written + chunk = chunk[written:] # For large files sync every 512MB (by default) written diff = self._upload_size - self._last_sync if diff >= self._bytes_per_sync: - self._threadpool.force_run_in_thread(fdatasync, self._fd) + tpool_reraise(fdatasync, self._fd) drop_buffer_cache(self._fd, self._last_sync, diff) self._last_sync = self._upload_size @@ -1477,8 +1470,7 @@ def _put(self, metadata, cleanup=True, *a, **kw): metadata['name'] = self._name target_path = join(self._datadir, filename) - self._threadpool.force_run_in_thread( - self._finalize_put, metadata, target_path, cleanup) + tpool_reraise(self._finalize_put, metadata, target_path, cleanup) def put(self, metadata): """ @@ -1521,7 +1513,6 @@ class BaseDiskFileReader(object): :param data_file: on-disk data file name for the object :param obj_size: verified on-disk size of the object :param etag: expected metadata etag value for entire file - :param threadpool: thread pool to use for read operations :param disk_chunk_size: size of reads from disk in bytes :param keep_cache_size: maximum object size that will be kept in cache :param device_path: on-disk device path, used when quarantining an obj @@ -1532,7 +1523,7 @@ class BaseDiskFileReader(object): :param diskfile: the diskfile creating this DiskFileReader instance :param keep_cache: should resulting reads be kept in the buffer cache """ - def __init__(self, fp, data_file, obj_size, etag, threadpool, + def __init__(self, fp, data_file, obj_size, etag, disk_chunk_size, keep_cache_size, device_path, logger, quarantine_hook, use_splice, pipe_size, diskfile, keep_cache=False): @@ -1541,7 +1532,6 @@ def __init__(self, fp, data_file, obj_size, etag, threadpool, self._data_file = data_file self._obj_size = obj_size self._etag = etag - self._threadpool = threadpool self._diskfile = diskfile self._disk_chunk_size = disk_chunk_size self._device_path = device_path @@ -1580,8 +1570,7 @@ def __iter__(self): self._started_at_0 = True self._iter_etag = hashlib.md5() while True: - chunk = self._threadpool.run_in_thread( - self._fp.read, self._disk_chunk_size) + chunk = self._fp.read(self._disk_chunk_size) if chunk: if self._iter_etag: self._iter_etag.update(chunk) @@ -1634,8 +1623,8 @@ def zero_copy_send(self, wsockfd): try: while True: # Read data from disk to pipe - (bytes_in_pipe, _1, _2) = self._threadpool.run_in_thread( - splice, rfd, None, client_wpipe, None, pipe_size, 0) + (bytes_in_pipe, _1, _2) = splice( + rfd, None, client_wpipe, None, pipe_size, 0) if bytes_in_pipe == 0: self._read_to_eof = True self._drop_cache(rfd, dropped_cache, @@ -1758,9 +1747,8 @@ def _drop_cache(self, fd, offset, length): drop_buffer_cache(fd, offset, length) def _quarantine(self, msg): - self._quarantined_dir = self._threadpool.run_in_thread( - self.manager.quarantine_renamer, self._device_path, - self._data_file) + self._quarantined_dir = self.manager.quarantine_renamer( + self._device_path, self._data_file) self._logger.warning("Quarantined object %s: %s" % ( self._data_file, msg)) self._logger.increment('quarantines') @@ -1824,7 +1812,6 @@ class BaseDiskFile(object): :param mgr: associated DiskFileManager instance :param device_path: path to the target device or drive - :param threadpool: thread pool to use for blocking operations :param partition: partition on the device in which the object lives :param account: account name for the object :param container: container name for the object @@ -1837,12 +1824,11 @@ class BaseDiskFile(object): reader_cls = None # must be set by subclasses writer_cls = None # must be set by subclasses - def __init__(self, mgr, device_path, threadpool, partition, + def __init__(self, mgr, device_path, partition, account=None, container=None, obj=None, _datadir=None, policy=None, use_splice=False, pipe_size=None, **kwargs): self._manager = mgr self._device_path = device_path - self._threadpool = threadpool or ThreadPool(nthreads=0) self._logger = mgr.logger self._disk_chunk_size = mgr.disk_chunk_size self._bytes_per_sync = mgr.bytes_per_sync @@ -2043,8 +2029,8 @@ def _quarantine(self, data_file, msg): :param msg: reason for quarantining to be included in the exception :returns: DiskFileQuarantined exception object """ - self._quarantined_dir = self._threadpool.run_in_thread( - self.manager.quarantine_renamer, self._device_path, data_file) + self._quarantined_dir = self.manager.quarantine_renamer( + self._device_path, data_file) self._logger.warning("Quarantined object %s: %s" % ( data_file, msg)) self._logger.increment('quarantines') @@ -2333,7 +2319,7 @@ def reader(self, keep_cache=False, """ dr = self.reader_cls( self._fp, self._data_file, int(self._metadata['Content-Length']), - self._metadata['ETag'], self._threadpool, self._disk_chunk_size, + self._metadata['ETag'], self._disk_chunk_size, self._manager.keep_cache_size, self._device_path, self._logger, use_splice=self._use_splice, quarantine_hook=_quarantine_hook, pipe_size=self._pipe_size, diskfile=self, keep_cache=keep_cache) @@ -2378,7 +2364,6 @@ def create(self, size=None): raise dfw = self.writer_cls(self._name, self._datadir, fd, tmppath, bytes_per_sync=self._bytes_per_sync, - threadpool=self._threadpool, diskfile=self) yield dfw finally: @@ -2561,8 +2546,7 @@ def commit(self, timestamp): """ durable_file_path = os.path.join( self._datadir, timestamp.internal + '.durable') - self._threadpool.force_run_in_thread( - self._finalize_durable, durable_file_path) + tpool_reraise(self._finalize_durable, durable_file_path) def put(self, metadata): """ diff --git a/test/unit/common/test_utils.py b/test/unit/common/test_utils.py index 626043de3d..e46a018a65 100644 --- a/test/unit/common/test_utils.py +++ b/test/unit/common/test_utils.py @@ -30,7 +30,6 @@ import random import re import socket -import stat import sys import json import math @@ -43,7 +42,6 @@ import tempfile import time -import traceback import unittest import fcntl import shutil @@ -58,7 +56,7 @@ from swift.common.exceptions import Timeout, MessageTimeout, \ ConnectionTimeout, LockTimeout, ReplicationLockTimeout, \ - MimeInvalid, ThreadPoolDead + MimeInvalid from swift.common import utils from swift.common.container_sync_realms import ContainerSyncRealms from swift.common.header_key_dict import HeaderKeyDict @@ -4717,165 +4715,6 @@ def fsync(fd): self.assertEqual(called, [12345]) -class TestThreadPool(unittest.TestCase): - - def setUp(self): - self.tp = None - - def tearDown(self): - if self.tp: - self.tp.terminate() - - def _pipe_count(self): - # Counts the number of pipes that this process owns. - fd_dir = "/proc/%d/fd" % os.getpid() - - def is_pipe(path): - try: - stat_result = os.stat(path) - return stat.S_ISFIFO(stat_result.st_mode) - except OSError: - return False - - return len([fd for fd in os.listdir(fd_dir) - if is_pipe(os.path.join(fd_dir, fd))]) - - def _thread_id(self): - return threading.current_thread().ident - - def _capture_args(self, *args, **kwargs): - return {'args': args, 'kwargs': kwargs} - - def _raise_valueerror(self): - return int('fishcakes') - - def test_run_in_thread_with_threads(self): - tp = self.tp = utils.ThreadPool(1) - - my_id = self._thread_id() - other_id = tp.run_in_thread(self._thread_id) - self.assertNotEqual(my_id, other_id) - - result = tp.run_in_thread(self._capture_args, 1, 2, bert='ernie') - self.assertEqual(result, {'args': (1, 2), - 'kwargs': {'bert': 'ernie'}}) - - caught = False - try: - tp.run_in_thread(self._raise_valueerror) - except ValueError: - caught = True - self.assertTrue(caught) - - def test_force_run_in_thread_with_threads(self): - # with nthreads > 0, force_run_in_thread looks just like run_in_thread - tp = self.tp = utils.ThreadPool(1) - - my_id = self._thread_id() - other_id = tp.force_run_in_thread(self._thread_id) - self.assertNotEqual(my_id, other_id) - - result = tp.force_run_in_thread(self._capture_args, 1, 2, bert='ernie') - self.assertEqual(result, {'args': (1, 2), - 'kwargs': {'bert': 'ernie'}}) - self.assertRaises(ValueError, tp.force_run_in_thread, - self._raise_valueerror) - - def test_run_in_thread_without_threads(self): - # with zero threads, run_in_thread doesn't actually do so - tp = utils.ThreadPool(0) - - my_id = self._thread_id() - other_id = tp.run_in_thread(self._thread_id) - self.assertEqual(my_id, other_id) - - result = tp.run_in_thread(self._capture_args, 1, 2, bert='ernie') - self.assertEqual(result, {'args': (1, 2), - 'kwargs': {'bert': 'ernie'}}) - self.assertRaises(ValueError, tp.run_in_thread, - self._raise_valueerror) - - def test_force_run_in_thread_without_threads(self): - # with zero threads, force_run_in_thread uses eventlet.tpool - tp = utils.ThreadPool(0) - - my_id = self._thread_id() - other_id = tp.force_run_in_thread(self._thread_id) - self.assertNotEqual(my_id, other_id) - - result = tp.force_run_in_thread(self._capture_args, 1, 2, bert='ernie') - self.assertEqual(result, {'args': (1, 2), - 'kwargs': {'bert': 'ernie'}}) - self.assertRaises(ValueError, tp.force_run_in_thread, - self._raise_valueerror) - - def test_preserving_stack_trace_from_thread(self): - def gamma(): - return 1 / 0 # ZeroDivisionError - - def beta(): - return gamma() - - def alpha(): - return beta() - - tp = self.tp = utils.ThreadPool(1) - try: - tp.run_in_thread(alpha) - except ZeroDivisionError: - # NB: format is (filename, line number, function name, text) - tb_func = [elem[2] for elem - in traceback.extract_tb(sys.exc_info()[2])] - else: - self.fail("Expected ZeroDivisionError") - - self.assertEqual(tb_func[-1], "gamma") - self.assertEqual(tb_func[-2], "beta") - self.assertEqual(tb_func[-3], "alpha") - # omit the middle; what's important is that the start and end are - # included, not the exact names of helper methods - self.assertEqual(tb_func[1], "run_in_thread") - self.assertEqual(tb_func[0], "test_preserving_stack_trace_from_thread") - - def test_terminate(self): - initial_thread_count = threading.activeCount() - initial_pipe_count = self._pipe_count() - - tp = utils.ThreadPool(4) - # do some work to ensure any lazy initialization happens - tp.run_in_thread(os.path.join, 'foo', 'bar') - tp.run_in_thread(os.path.join, 'baz', 'quux') - - # 4 threads in the ThreadPool, plus one pipe for IPC; this also - # serves as a sanity check that we're actually allocating some - # resources to free later - self.assertEqual(initial_thread_count, threading.activeCount() - 4) - self.assertEqual(initial_pipe_count, self._pipe_count() - 2) - - tp.terminate() - self.assertEqual(initial_thread_count, threading.activeCount()) - self.assertEqual(initial_pipe_count, self._pipe_count()) - - def test_cant_run_after_terminate(self): - tp = utils.ThreadPool(0) - tp.terminate() - self.assertRaises(ThreadPoolDead, tp.run_in_thread, lambda: 1) - self.assertRaises(ThreadPoolDead, tp.force_run_in_thread, lambda: 1) - - def test_double_terminate_doesnt_crash(self): - tp = utils.ThreadPool(0) - tp.terminate() - tp.terminate() - - tp = utils.ThreadPool(1) - tp.terminate() - tp.terminate() - - def test_terminate_no_threads_doesnt_crash(self): - tp = utils.ThreadPool(0) - tp.terminate() - - class TestAuditLocationGenerator(unittest.TestCase): def test_drive_tree_access(self): diff --git a/test/unit/obj/test_diskfile.py b/test/unit/obj/test_diskfile.py index 818ad1563c..a4ca8cff26 100644 --- a/test/unit/obj/test_diskfile.py +++ b/test/unit/obj/test_diskfile.py @@ -940,7 +940,7 @@ def test_get_diskfile_from_hash(self): self.df_mgr.get_diskfile_from_hash( 'dev', '9', '9a7175077c01a23ade5956b8a2bba900', POLICIES[0]) dfclass.assert_called_once_with( - self.df_mgr, '/srv/dev/', self.df_mgr.threadpools['dev'], '9', + self.df_mgr, '/srv/dev/', '9', 'a', 'c', 'o', policy=POLICIES[0]) hclistdir.assert_called_once_with( '/srv/dev/objects/9/900/9a7175077c01a23ade5956b8a2bba900',