Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

gh-89240: Limit multiprocessing.Pool to 61 workers on Windows #102920

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 4 additions & 0 deletions Doc/library/multiprocessing.rst
Original file line number Diff line number Diff line change
Expand Up @@ -2209,6 +2209,10 @@ with the :class:`Pool` class.

*processes* is the number of worker processes to use. If *processes* is
``None`` then the number returned by :func:`os.cpu_count` is used.
On Windows, *processes* must be equal or lower than ``61``. If it is not
then :exc:`ValueError` will be raised. If *processes* is ``None``, then
the default chosen will be at most ``61``, even if more processors are
available.

If *initializer* is not ``None`` then each worker process will call
``initializer(*initargs)`` when it starts.
Expand Down
10 changes: 10 additions & 0 deletions Lib/multiprocessing/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import itertools
import os
import queue
import sys
import threading
import time
import traceback
Expand Down Expand Up @@ -175,6 +176,10 @@ class Pool(object):
Class which supports an async version of applying functions to arguments.
'''
_wrap_exception = True
# On Windows, WaitForMultipleObjects is used to wait for processes to
# finish. It can wait on, at most, 64 objects. There is an overhead of three
# objects.
_MAX_WINDOWS_WORKERS = 64 - 3

@staticmethod
def Process(ctx, *args, **kwds):
Expand All @@ -201,8 +206,12 @@ def __init__(self, processes=None, initializer=None, initargs=(),

if processes is None:
processes = os.cpu_count() or 1
if sys.platform == 'win32':
processes = min(processes, self._MAX_WINDOWS_WORKERS)
if processes < 1:
raise ValueError("Number of processes must be at least 1")
if sys.platform == 'win32' and processes > self._MAX_WINDOWS_WORKERS:
raise ValueError(f"max_workers must be <= {self._MAX_WINDOWS_WORKERS}")
if maxtasksperchild is not None:
if not isinstance(maxtasksperchild, int) or maxtasksperchild <= 0:
raise ValueError("maxtasksperchild must be a positive int or None")
Expand Down Expand Up @@ -920,6 +929,7 @@ def _set(self, i, obj):

class ThreadPool(Pool):
_wrap_exception = False
_MAX_WINDOWS_WORKERS = float("inf")

@staticmethod
def Process(ctx, *args, **kwds):
Expand Down
12 changes: 12 additions & 0 deletions Lib/test/_test_multiprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -2772,6 +2772,18 @@ def test_resource_warning(self):
pool = None
support.gc_collect()

class TestPoolMaxWorkers(unittest.TestCase):
@unittest.skipUnless(sys.platform=='win32', 'Windows-only process limit')
def test_max_workers_too_large(self):
with self.assertRaisesRegex(ValueError, "max_workers must be <= 61"):
multiprocessing.pool.Pool(62)

# ThreadPool have no limit.
p = multiprocessing.pool.ThreadPool(62)
p.close()
p.join()


def raising():
raise KeyError("key")

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Limit ``processes`` in :class:`multiprocessing.Pool` to 61 to work around a
WaitForMultipleObjects limitation.