Skip to content

Commit

Permalink
pythonGH-109978: Allow multiprocessing finalizers to run on a separat…
Browse files Browse the repository at this point in the history
…e thread
  • Loading branch information
pitrou committed Oct 7, 2023
1 parent de2a403 commit a7d3581
Show file tree
Hide file tree
Showing 15 changed files with 264 additions and 73 deletions.
15 changes: 11 additions & 4 deletions Lib/concurrent/futures/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -561,9 +561,9 @@ def shutdown_workers(self):
except queue.Full:
break

def join_executor_internals(self):
def join_executor_internals(self, broken=False):
with self.shutdown_lock:
self._join_executor_internals()
self._join_executor_internals(broken)

def _join_executor_internals(self, broken=False):
# If broken, call_queue was closed and so can no longer be used.
Expand Down Expand Up @@ -759,7 +759,11 @@ def _start_executor_manager_thread(self):
if not self._safe_to_dynamically_spawn_children: # ie, using fork.
self._launch_processes()
self._executor_manager_thread = _ExecutorManagerThread(self)
self._executor_manager_thread.start()
try:
self._executor_manager_thread.start()
except RuntimeError:
self._broken = "Executor manager thread could not be started"
raise BrokenProcessPool(self._broken)
_threads_wakeups[self._executor_manager_thread] = \
self._executor_manager_thread_wakeup

Expand Down Expand Up @@ -860,7 +864,10 @@ def shutdown(self, wait=True, *, cancel_futures=False):
self._executor_manager_thread_wakeup.wakeup()

if self._executor_manager_thread is not None and wait:
self._executor_manager_thread.join()
try:
self._executor_manager_thread.join()
except RuntimeError:
self._executor_manager_thread.join_executor_internals(broken=True)
# To reduce the risk of opening too many files, remove references to
# objects that use file descriptors.
self._executor_manager_thread = None
Expand Down
4 changes: 4 additions & 0 deletions Lib/multiprocessing/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,10 @@ def close(self):
self._listener = None
listener.close()

@property
def closed(self):
return self._listener is None

@property
def address(self):
return self._listener._address
Expand Down
43 changes: 6 additions & 37 deletions Lib/multiprocessing/heap.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,9 +142,6 @@ def __init__(self, size=mmap.PAGESIZE):
self._allocated_blocks = defaultdict(set)
self._arenas = []

# List of pending blocks to free - see comment in free() below
self._pending_free_blocks = []

# Statistics
self._n_mallocs = 0
self._n_frees = 0
Expand Down Expand Up @@ -255,43 +252,16 @@ def _remove_allocated_block(self, block):
# Arena is entirely free, discard it from this process
self._discard_arena(arena)

def _free_pending_blocks(self):
# Free all the blocks in the pending list - called with the lock held.
while True:
try:
block = self._pending_free_blocks.pop()
except IndexError:
break
self._add_free_block(block)
self._remove_allocated_block(block)

def free(self, block):
# free a block returned by malloc()
# Since free() can be called asynchronously by the GC, it could happen
# that it's called while self._lock is held: in that case,
# self._lock.acquire() would deadlock (issue #12352). To avoid that, a
# trylock is used instead, and if the lock can't be acquired
# immediately, the block is added to a list of blocks to be freed
# synchronously sometimes later from malloc() or free(), by calling
# _free_pending_blocks() (appending and retrieving from a list is not
# strictly thread-safe but under CPython it's atomic thanks to the GIL).
if os.getpid() != self._lastpid:
raise ValueError(
"My pid ({0:n}) is not last pid {1:n}".format(
os.getpid(),self._lastpid))
if not self._lock.acquire(False):
# can't acquire the lock right now, add the block to the list of
# pending blocks to free
self._pending_free_blocks.append(block)
else:
# we hold the lock
try:
self._n_frees += 1
self._free_pending_blocks()
self._add_free_block(block)
self._remove_allocated_block(block)
finally:
self._lock.release()
with self._lock:
self._n_frees += 1
self._add_free_block(block)
self._remove_allocated_block(block)

def malloc(self, size):
# return a block of right size (possibly rounded up)
Expand All @@ -303,8 +273,6 @@ def malloc(self, size):
self.__init__() # reinitialize after fork
with self._lock:
self._n_mallocs += 1
# allow pending blocks to be marked available
self._free_pending_blocks()
size = self._roundup(max(size, 1), self._alignment)
(arena, start, stop) = self._malloc(size)
real_stop = start + size
Expand All @@ -330,7 +298,8 @@ def __init__(self, size):
raise OverflowError("Size {0:n} too large".format(size))
block = BufferWrapper._heap.malloc(size)
self._state = (block, size)
util.Finalize(self, BufferWrapper._heap.free, args=(block,))
util.Finalize(self, BufferWrapper._heap.free, args=(block,),
reentrant=False)

def create_memoryview(self):
(arena, start, stop), size = self._state
Expand Down
8 changes: 4 additions & 4 deletions Lib/multiprocessing/managers.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,14 +180,15 @@ def serve_forever(self):
except (KeyboardInterrupt, SystemExit):
pass
finally:
self.listener.close()
if sys.stdout != sys.__stdout__: # what about stderr?
util.debug('resetting stdout, stderr')
sys.stdout = sys.__stdout__
sys.stderr = sys.__stderr__
sys.exit(0)

def accepter(self):
while True:
while not self.listener.closed:
try:
c = self.listener.accept()
except OSError:
Expand Down Expand Up @@ -575,7 +576,7 @@ def start(self, initializer=None, initargs=()):
self, type(self)._finalize_manager,
args=(self._process, self._address, self._authkey, self._state,
self._Client, self._shutdown_timeout),
exitpriority=0
exitpriority=0, reentrant=False
)

@classmethod
Expand Down Expand Up @@ -859,12 +860,11 @@ def _incref(self):
self._idset.add(self._id)

state = self._manager and self._manager._state

self._close = util.Finalize(
self, BaseProxy._decref,
args=(self._token, self._authkey, state,
self._tls, self._idset, self._Client),
exitpriority=10
exitpriority=10, reentrant=False
)

@staticmethod
Expand Down
7 changes: 4 additions & 3 deletions Lib/multiprocessing/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ def __init__(self, processes=None, initializer=None, initargs=(),
p.terminate()
for p in self._pool:
p.join()
self._pool.clear()
raise

sentinels = self._get_sentinels()
Expand Down Expand Up @@ -257,7 +258,7 @@ def __init__(self, processes=None, initializer=None, initargs=(),
args=(self._taskqueue, self._inqueue, self._outqueue, self._pool,
self._change_notifier, self._worker_handler, self._task_handler,
self._result_handler, self._cache),
exitpriority=15
exitpriority=15, reentrant=False
)
self._state = RUN

Expand Down Expand Up @@ -665,8 +666,8 @@ def join(self):
self._worker_handler.join()
self._task_handler.join()
self._result_handler.join()
for p in self._pool:
p.join()
while self._pool:
self._pool.pop().join()

@staticmethod
def _help_stuff_finish(inqueue, task_handler, size):
Expand Down
4 changes: 2 additions & 2 deletions Lib/multiprocessing/queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,14 +201,14 @@ def _start_thread(self):
self._jointhread = Finalize(
self._thread, Queue._finalize_join,
[weakref.ref(self._thread)],
exitpriority=-5
exitpriority=-5, reentrant=False
)

# Send sentinel to the thread queue object when garbage collected
self._close = Finalize(
self, Queue._finalize_close,
[self._buffer, self._notempty],
exitpriority=10
exitpriority=10, reentrant=False
)

@staticmethod
Expand Down
2 changes: 1 addition & 1 deletion Lib/multiprocessing/synchronize.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ def _after_fork(obj):
from .resource_tracker import register
register(self._semlock.name, "semaphore")
util.Finalize(self, SemLock._cleanup, (self._semlock.name,),
exitpriority=0)
exitpriority=0, reentrant=False)

@staticmethod
def _cleanup(name):
Expand Down

0 comments on commit a7d3581

Please sign in to comment.