Skip to content

Commit

Permalink
Remove ThreadPool class
Browse files Browse the repository at this point in the history
With the removement of threads_per_disk there is no longer a need to use
run_in_thread() at all; it was just calling the function itself when
running with 0 threads.
Similar to force_run_in_thread() - with 0 threads it was basically doing
the same like in tpool_reraise(), therefore replacing the call and
finally removing the complete ThreadPool class.

Note that this might break external consumers that are inheriting
BaseDiskFileManager; in this case you need to adopt this change in your
codebase then.

Change-Id: I39489dd660935bdbfbc26b92af86814369369fb5
  • Loading branch information
cschwede committed Apr 29, 2016
1 parent 9d6a055 commit 4c11833
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 409 deletions.
4 changes: 0 additions & 4 deletions swift/common/exceptions.py
Expand Up @@ -145,10 +145,6 @@ class LockTimeout(MessageTimeout):
pass


class ThreadPoolDead(SwiftException):
pass


class RingBuilderError(SwiftException):
pass

Expand Down
202 changes: 1 addition & 201 deletions swift/common/utils.py
Expand Up @@ -47,8 +47,7 @@

import eventlet
import eventlet.semaphore
from eventlet import GreenPool, sleep, Timeout, tpool, greenthread, \
greenio, event
from eventlet import GreenPool, sleep, Timeout, tpool
from eventlet.green import socket, threading
import eventlet.queue
import netifaces
Expand Down Expand Up @@ -3210,205 +3209,6 @@ def inner():
return resp


class ThreadPool(object):
"""
Perform blocking operations in background threads.
Call its methods from within greenlets to green-wait for results without
blocking the eventlet reactor (hopefully).
"""

BYTE = 'a'.encode('utf-8')

def __init__(self, nthreads=2):
self.nthreads = nthreads
self._run_queue = stdlib_queue.Queue()
self._result_queue = stdlib_queue.Queue()
self._threads = []
self._alive = True

if nthreads <= 0:
return

# We spawn a greenthread whose job it is to pull results from the
# worker threads via a real Queue and send them to eventlet Events so
# that the calling greenthreads can be awoken.
#
# Since each OS thread has its own collection of greenthreads, it
# doesn't work to have the worker thread send stuff to the event, as
# it then notifies its own thread-local eventlet hub to wake up, which
# doesn't do anything to help out the actual calling greenthread over
# in the main thread.
#
# Thus, each worker sticks its results into a result queue and then
# writes a byte to a pipe, signaling the result-consuming greenlet (in
# the main thread) to wake up and consume results.
#
# This is all stuff that eventlet.tpool does, but that code can't have
# multiple instances instantiated. Since the object server uses one
# pool per disk, we have to reimplement this stuff.
_raw_rpipe, self.wpipe = os.pipe()
self.rpipe = greenio.GreenPipe(_raw_rpipe, 'rb')

for _junk in range(nthreads):
thr = stdlib_threading.Thread(
target=self._worker,
args=(self._run_queue, self._result_queue))
thr.daemon = True
thr.start()
self._threads.append(thr)

# This is the result-consuming greenthread that runs in the main OS
# thread, as described above.
self._consumer_coro = greenthread.spawn_n(self._consume_results,
self._result_queue)

def _worker(self, work_queue, result_queue):
"""
Pulls an item from the queue and runs it, then puts the result into
the result queue. Repeats forever.
:param work_queue: queue from which to pull work
:param result_queue: queue into which to place results
"""
while True:
item = work_queue.get()
if item is None:
break
ev, func, args, kwargs = item
try:
result = func(*args, **kwargs)
result_queue.put((ev, True, result))
except BaseException:
result_queue.put((ev, False, sys.exc_info()))
finally:
work_queue.task_done()
os.write(self.wpipe, self.BYTE)

def _consume_results(self, queue):
"""
Runs as a greenthread in the same OS thread as callers of
run_in_thread().
Takes results from the worker OS threads and sends them to the waiting
greenthreads.
"""
while True:
try:
self.rpipe.read(1)
except ValueError:
# can happen at process shutdown when pipe is closed
break

while True:
try:
ev, success, result = queue.get(block=False)
except stdlib_queue.Empty:
break

try:
if success:
ev.send(result)
else:
ev.send_exception(*result)
finally:
queue.task_done()

def run_in_thread(self, func, *args, **kwargs):
"""
Runs ``func(*args, **kwargs)`` in a thread. Blocks the current greenlet
until results are available.
Exceptions thrown will be reraised in the calling thread.
If the threadpool was initialized with nthreads=0, it invokes
``func(*args, **kwargs)`` directly, followed by eventlet.sleep() to
ensure the eventlet hub has a chance to execute. It is more likely the
hub will be invoked when queuing operations to an external thread.
:returns: result of calling func
:raises: whatever func raises
"""
if not self._alive:
raise swift.common.exceptions.ThreadPoolDead()

if self.nthreads <= 0:
result = func(*args, **kwargs)
sleep()
return result

ev = event.Event()
self._run_queue.put((ev, func, args, kwargs), block=False)

# blocks this greenlet (and only *this* greenlet) until the real
# thread calls ev.send().
result = ev.wait()
return result

def _run_in_eventlet_tpool(self, func, *args, **kwargs):
"""
Really run something in an external thread, even if we haven't got any
threads of our own.
"""
def inner():
try:
return (True, func(*args, **kwargs))
except (Timeout, BaseException) as err:
return (False, err)

success, result = tpool.execute(inner)
if success:
return result
else:
raise result

def force_run_in_thread(self, func, *args, **kwargs):
"""
Runs ``func(*args, **kwargs)`` in a thread. Blocks the current greenlet
until results are available.
Exceptions thrown will be reraised in the calling thread.
If the threadpool was initialized with nthreads=0, uses eventlet.tpool
to run the function. This is in contrast to run_in_thread(), which
will (in that case) simply execute func in the calling thread.
:returns: result of calling func
:raises: whatever func raises
"""
if not self._alive:
raise swift.common.exceptions.ThreadPoolDead()

if self.nthreads <= 0:
return self._run_in_eventlet_tpool(func, *args, **kwargs)
else:
return self.run_in_thread(func, *args, **kwargs)

def terminate(self):
"""
Releases the threadpool's resources (OS threads, greenthreads, pipes,
etc.) and renders it unusable.
Don't call run_in_thread() or force_run_in_thread() after calling
terminate().
"""
self._alive = False
if self.nthreads <= 0:
return

for _junk in range(self.nthreads):
self._run_queue.put(None)
for thr in self._threads:
thr.join()
self._threads = []
self.nthreads = 0

greenthread.kill(self._consumer_coro)

self.rpipe.close()
os.close(self.wpipe)


def ismount(path):
"""
Test whether a path is a mount point. This will catch any
Expand Down

0 comments on commit 4c11833

Please sign in to comment.