Skip to content

Commit

Permalink
bpo-21423: Add an initializer argument to {Process,Thread}PoolExecutor (
Browse files Browse the repository at this point in the history
#4241)

* bpo-21423: Add an initializer argument to {Process,Thread}PoolExecutor

* Fix docstring
  • Loading branch information
pitrou committed Nov 4, 2017
1 parent b838cc3 commit 63ff413
Show file tree
Hide file tree
Showing 7 changed files with 246 additions and 81 deletions.
48 changes: 42 additions & 6 deletions Doc/library/concurrent.futures.rst
Expand Up @@ -124,11 +124,17 @@ And::
executor.submit(wait_on_future)


.. class:: ThreadPoolExecutor(max_workers=None, thread_name_prefix='')
.. class:: ThreadPoolExecutor(max_workers=None, thread_name_prefix='', initializer=None, initargs=())

An :class:`Executor` subclass that uses a pool of at most *max_workers*
threads to execute calls asynchronously.

*initializer* is an optional callable that is called at the start of
each worker thread; *initargs* is a tuple of arguments passed to the
initializer. Should *initializer* raise an exception, all currently
pending jobs will raise a :exc:`~concurrent.futures.thread.BrokenThreadPool`,
as well as any attempt to submit more jobs to the pool.

.. versionchanged:: 3.5
If *max_workers* is ``None`` or
not given, it will default to the number of processors on the machine,
Expand All @@ -142,6 +148,10 @@ And::
control the threading.Thread names for worker threads created by
the pool for easier debugging.

.. versionchanged:: 3.7
Added the *initializer* and *initargs* arguments.


.. _threadpoolexecutor-example:

ThreadPoolExecutor Example
Expand Down Expand Up @@ -191,7 +201,7 @@ that :class:`ProcessPoolExecutor` will not work in the interactive interpreter.
Calling :class:`Executor` or :class:`Future` methods from a callable submitted
to a :class:`ProcessPoolExecutor` will result in deadlock.

.. class:: ProcessPoolExecutor(max_workers=None, mp_context=None)
.. class:: ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=())

An :class:`Executor` subclass that executes calls asynchronously using a pool
of at most *max_workers* processes. If *max_workers* is ``None`` or not
Expand All @@ -202,6 +212,12 @@ to a :class:`ProcessPoolExecutor` will result in deadlock.
launch the workers. If *mp_context* is ``None`` or not given, the default
multiprocessing context is used.

*initializer* is an optional callable that is called at the start of
each worker process; *initargs* is a tuple of arguments passed to the
initializer. Should *initializer* raise an exception, all currently
pending jobs will raise a :exc:`~concurrent.futures.process.BrokenProcessPool`,
as well as any attempt to submit more jobs to the pool.

.. versionchanged:: 3.3
When one of the worker processes terminates abruptly, a
:exc:`BrokenProcessPool` error is now raised. Previously, behaviour
Expand All @@ -212,6 +228,8 @@ to a :class:`ProcessPoolExecutor` will result in deadlock.
The *mp_context* argument was added to allow users to control the
start_method for worker processes created by the pool.

Added the *initializer* and *initargs* arguments.


.. _processpoolexecutor-example:

Expand Down Expand Up @@ -432,13 +450,31 @@ Exception classes

Raised when a future operation exceeds the given timeout.

.. exception:: BrokenExecutor

Derived from :exc:`RuntimeError`, this exception class is raised
when an executor is broken for some reason, and cannot be used
to submit or execute new tasks.

.. versionadded:: 3.7

.. currentmodule:: concurrent.futures.thread

.. exception:: BrokenThreadPool

Derived from :exc:`~concurrent.futures.BrokenExecutor`, this exception
class is raised when one of the workers of a :class:`ThreadPoolExecutor`
has failed initializing.

.. versionadded:: 3.7

.. currentmodule:: concurrent.futures.process

.. exception:: BrokenProcessPool

Derived from :exc:`RuntimeError`, this exception class is raised when
one of the workers of a :class:`ProcessPoolExecutor` has terminated
in a non-clean fashion (for example, if it was killed from the outside).
Derived from :exc:`~concurrent.futures.BrokenExecutor` (formerly
:exc:`RuntimeError`), this exception class is raised when one of the
workers of a :class:`ProcessPoolExecutor` has terminated in a non-clean
fashion (for example, if it was killed from the outside).

.. versionadded:: 3.3

1 change: 1 addition & 0 deletions Lib/concurrent/futures/__init__.py
Expand Up @@ -10,6 +10,7 @@
ALL_COMPLETED,
CancelledError,
TimeoutError,
BrokenExecutor,
Future,
Executor,
wait,
Expand Down
6 changes: 6 additions & 0 deletions Lib/concurrent/futures/_base.py
Expand Up @@ -610,3 +610,9 @@ def __enter__(self):
def __exit__(self, exc_type, exc_val, exc_tb):
self.shutdown(wait=True)
return False


class BrokenExecutor(RuntimeError):
"""
Raised when an executor has become non-functional after a severe failure.

Base class of the pool-specific BrokenThreadPool and BrokenProcessPool
exceptions.
"""
36 changes: 29 additions & 7 deletions Lib/concurrent/futures/process.py
Expand Up @@ -131,6 +131,7 @@ def __init__(self, work_id, fn, args, kwargs):
self.args = args
self.kwargs = kwargs


def _get_chunks(*iterables, chunksize):
""" Iterates over zip()ed iterables in chunks. """
it = zip(*iterables)
Expand All @@ -151,7 +152,7 @@ def _process_chunk(fn, chunk):
"""
return [fn(*args) for args in chunk]

def _process_worker(call_queue, result_queue):
def _process_worker(call_queue, result_queue, initializer, initargs):
"""Evaluates calls from call_queue and places the results in result_queue.
This worker is run in a separate process.
Expand All @@ -161,7 +162,17 @@ def _process_worker(call_queue, result_queue):
evaluated by the worker.
result_queue: A ctx.Queue of _ResultItems that will be written
to by the worker.
initializer: A callable initializer, or None
initargs: A tuple of args for the initializer
"""
if initializer is not None:
try:
initializer(*initargs)
except BaseException:
_base.LOGGER.critical('Exception in initializer:', exc_info=True)
# The parent will notice that the process stopped and
# mark the pool broken
return
while True:
call_item = call_queue.get(block=True)
if call_item is None:
Expand Down Expand Up @@ -277,7 +288,9 @@ def shutdown_worker():
# Mark the process pool broken so that submits fail right now.
executor = executor_reference()
if executor is not None:
executor._broken = True
executor._broken = ('A child process terminated '
'abruptly, the process pool is not '
'usable anymore')
executor._shutdown_thread = True
executor = None
# All futures in flight must be marked failed
Expand Down Expand Up @@ -372,15 +385,16 @@ def _chain_from_iterable_of_lists(iterable):
yield element.pop()


class BrokenProcessPool(RuntimeError):
class BrokenProcessPool(_base.BrokenExecutor):
"""
Raised when a process in a ProcessPoolExecutor terminated abruptly
while a future was in the running state.
"""


class ProcessPoolExecutor(_base.Executor):
def __init__(self, max_workers=None, mp_context=None):
def __init__(self, max_workers=None, mp_context=None,
initializer=None, initargs=()):
"""Initializes a new ProcessPoolExecutor instance.
Args:
Expand All @@ -389,6 +403,8 @@ def __init__(self, max_workers=None, mp_context=None):
worker processes will be created as the machine has processors.
mp_context: A multiprocessing context to launch the workers. This
object should provide SimpleQueue, Queue and Process.
initializer: A callable used to initialize worker processes.
initargs: A tuple of arguments to pass to the initializer.
"""
_check_system_limits()

Expand All @@ -403,6 +419,11 @@ def __init__(self, max_workers=None, mp_context=None):
mp_context = mp.get_context()
self._mp_context = mp_context

if initializer is not None and not callable(initializer):
raise TypeError("initializer must be a callable")
self._initializer = initializer
self._initargs = initargs

# Make the call queue slightly larger than the number of processes to
# prevent the worker processes from idling. But don't make it too big
# because futures in the call queue cannot be cancelled.
Expand Down Expand Up @@ -450,15 +471,16 @@ def _adjust_process_count(self):
p = self._mp_context.Process(
target=_process_worker,
args=(self._call_queue,
self._result_queue))
self._result_queue,
self._initializer,
self._initargs))
p.start()
self._processes[p.pid] = p

def submit(self, fn, *args, **kwargs):
with self._shutdown_lock:
if self._broken:
raise BrokenProcessPool('A child process terminated '
'abruptly, the process pool is not usable anymore')
raise BrokenProcessPool(self._broken)
if self._shutdown_thread:
raise RuntimeError('cannot schedule new futures after shutdown')

Expand Down
51 changes: 48 additions & 3 deletions Lib/concurrent/futures/thread.py
Expand Up @@ -41,6 +41,7 @@ def _python_exit():

atexit.register(_python_exit)


class _WorkItem(object):
def __init__(self, future, fn, args, kwargs):
self.future = future
Expand All @@ -61,7 +62,17 @@ def run(self):
else:
self.future.set_result(result)

def _worker(executor_reference, work_queue):

def _worker(executor_reference, work_queue, initializer, initargs):
if initializer is not None:
try:
initializer(*initargs)
except BaseException:
_base.LOGGER.critical('Exception in initializer:', exc_info=True)
executor = executor_reference()
if executor is not None:
executor._initializer_failed()
return
try:
while True:
work_item = work_queue.get(block=True)
Expand All @@ -83,18 +94,28 @@ def _worker(executor_reference, work_queue):
except BaseException:
_base.LOGGER.critical('Exception in worker', exc_info=True)


class BrokenThreadPool(_base.BrokenExecutor):
"""
Raised when a worker thread in a ThreadPoolExecutor failed initializing.

Once the pool is marked broken, submit() raises this exception and the
futures of work items still waiting in the queue have it set on them.
"""


class ThreadPoolExecutor(_base.Executor):

# Used to assign unique thread names when thread_name_prefix is not supplied.
_counter = itertools.count().__next__

def __init__(self, max_workers=None, thread_name_prefix=''):
def __init__(self, max_workers=None, thread_name_prefix='',
initializer=None, initargs=()):
"""Initializes a new ThreadPoolExecutor instance.
Args:
max_workers: The maximum number of threads that can be used to
execute the given calls.
thread_name_prefix: An optional name prefix to give our threads.
initializer: A callable used to initialize worker threads.
initargs: A tuple of arguments to pass to the initializer.
"""
if max_workers is None:
# Use this number because ThreadPoolExecutor is often
Expand All @@ -103,16 +124,25 @@ def __init__(self, max_workers=None, thread_name_prefix=''):
if max_workers <= 0:
raise ValueError("max_workers must be greater than 0")

if initializer is not None and not callable(initializer):
raise TypeError("initializer must be a callable")

self._max_workers = max_workers
self._work_queue = queue.Queue()
self._threads = set()
self._broken = False
self._shutdown = False
self._shutdown_lock = threading.Lock()
self._thread_name_prefix = (thread_name_prefix or
("ThreadPoolExecutor-%d" % self._counter()))
self._initializer = initializer
self._initargs = initargs

def submit(self, fn, *args, **kwargs):
with self._shutdown_lock:
if self._broken:
raise BrokenThreadPool(self._broken)

if self._shutdown:
raise RuntimeError('cannot schedule new futures after shutdown')

Expand All @@ -137,12 +167,27 @@ def weakref_cb(_, q=self._work_queue):
num_threads)
t = threading.Thread(name=thread_name, target=_worker,
args=(weakref.ref(self, weakref_cb),
self._work_queue))
self._work_queue,
self._initializer,
self._initargs))
t.daemon = True
t.start()
self._threads.add(t)
_threads_queues[t] = self._work_queue

def _initializer_failed(self):
# Invoked by _worker after the initializer raised. Marks the pool as broken
# and fails every work item that is still waiting in the queue.
with self._shutdown_lock:
# submit() takes the same lock and tests self._broken; the non-empty
# string is truthy and is reused as the BrokenThreadPool message.
self._broken = ('A thread initializer failed, the thread pool '
'is not usable anymore')
# Drain work queue and mark pending futures failed
while True:
try:
work_item = self._work_queue.get_nowait()
except queue.Empty:
# Queue exhausted: nothing left to fail.
break
# None entries carry no future to fail (presumably wake-up/shutdown
# sentinels -- confirm against the code that enqueues them).
if work_item is not None:
work_item.future.set_exception(BrokenThreadPool(self._broken))

def shutdown(self, wait=True):
with self._shutdown_lock:
self._shutdown = True
Expand Down

0 comments on commit 63ff413

Please sign in to comment.