Skip to content

Commit bf2e7e5

Browse files
bpo-40692: Run more test_concurrent_futures tests (pythonGH-20239)
In the case of multiprocessing.synchronize() being missing, the test_concurrent_futures test suite now skips only the tests that require multiprocessing.synchronize(). Validate that multiprocessing.synchronize exists as part of _check_system_limits(), allowing ProcessPoolExecutor to raise NotImplementedError during __init__, rather than crashing with ImportError during __init__ when creating a lock imported from multiprocessing.synchronize. Use _check_system_limits() to disable tests of ProcessPoolExecutor on systems without multiprocessing.synchronize. Running the test suite without multiprocessing.synchronize reveals that Lib/compileall.py crashes when it uses a ProcessPoolExecutor. Therefore, change Lib/compileall.py to call _check_system_limits() before creating the ProcessPoolExecutor. Note that both Lib/compileall.py and Lib/test/test_compileall.py were attempting to sanity-check ProcessPoolExecutor by expecting ImportError. In multiprocessing.resource_tracker, sem_unlink() is also absent on platforms where POSIX semaphores aren't available. Avoid using sem_unlink() if it, too, does not exist. Co-authored-by: Pablo Galindo <Pablogsal@gmail.com>
1 parent 30a8b28 commit bf2e7e5

File tree

6 files changed

+49
-10
lines changed

6 files changed

+49
-10
lines changed

Lib/compileall.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -84,12 +84,14 @@ def compile_dir(dir, maxlevels=None, ddir=None, force=False,
8484
if workers < 0:
8585
raise ValueError('workers must be greater or equal to 0')
8686
if workers != 1:
87+
# Check if this is a system where ProcessPoolExecutor can function.
88+
from concurrent.futures.process import _check_system_limits
8789
try:
88-
# Only import when needed, as low resource platforms may
89-
# fail to import it
90-
from concurrent.futures import ProcessPoolExecutor
91-
except ImportError:
90+
_check_system_limits()
91+
except NotImplementedError:
9292
workers = 1
93+
else:
94+
from concurrent.futures import ProcessPoolExecutor
9395
if maxlevels is None:
9496
maxlevels = sys.getrecursionlimit()
9597
files = _walk_dir(dir, quiet=quiet, maxlevels=maxlevels)

Lib/concurrent/futures/process.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -532,6 +532,14 @@ def _check_system_limits():
532532
if _system_limited:
533533
raise NotImplementedError(_system_limited)
534534
_system_limits_checked = True
535+
try:
536+
import multiprocessing.synchronize
537+
except ImportError:
538+
_system_limited = (
539+
"This Python build lacks multiprocessing.synchronize, usually due "
540+
"to named semaphores being unavailable on this platform."
541+
)
542+
raise NotImplementedError(_system_limited)
535543
try:
536544
nsems_max = os.sysconf("SC_SEM_NSEMS_MAX")
537545
except (AttributeError, ValueError):

Lib/multiprocessing/resource_tracker.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,16 @@
3737
import _multiprocessing
3838
import _posixshmem
3939

40+
# Use sem_unlink() to clean up named semaphores.
41+
#
42+
# sem_unlink() may be missing if the Python build process detected the
43+
# absence of POSIX named semaphores. In that case, no named semaphores were
44+
# ever opened, so no cleanup would be necessary.
45+
if hasattr(_multiprocessing, 'sem_unlink'):
46+
_CLEANUP_FUNCS.update({
47+
'semaphore': _multiprocessing.sem_unlink,
48+
})
4049
_CLEANUP_FUNCS.update({
41-
'semaphore': _multiprocessing.sem_unlink,
4250
'shared_memory': _posixshmem.shm_unlink,
4351
})
4452

Lib/test/test_compileall.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,14 @@
1616
import unittest
1717

1818
from unittest import mock, skipUnless
19+
from concurrent.futures import ProcessPoolExecutor
1920
try:
20-
from concurrent.futures import ProcessPoolExecutor
21+
# compileall relies on ProcessPoolExecutor if ProcessPoolExecutor exists
22+
# and it can function.
23+
from concurrent.futures.process import _check_system_limits
24+
_check_system_limits()
2125
_have_multiprocessing = True
22-
except ImportError:
26+
except NotImplementedError:
2327
_have_multiprocessing = False
2428

2529
from test import support
@@ -188,6 +192,7 @@ def test_compile_dir_pathlike(self):
188192
self.assertRegex(line, r'Listing ([^WindowsPath|PosixPath].*)')
189193
self.assertTrue(os.path.isfile(self.bc_path))
190194

195+
@skipUnless(_have_multiprocessing, "requires multiprocessing")
191196
@mock.patch('concurrent.futures.ProcessPoolExecutor')
192197
def test_compile_pool_called(self, pool_mock):
193198
compileall.compile_dir(self.directory, quiet=True, workers=5)
@@ -198,11 +203,13 @@ def test_compile_workers_non_positive(self):
198203
"workers must be greater or equal to 0"):
199204
compileall.compile_dir(self.directory, workers=-1)
200205

206+
@skipUnless(_have_multiprocessing, "requires multiprocessing")
201207
@mock.patch('concurrent.futures.ProcessPoolExecutor')
202208
def test_compile_workers_cpu_count(self, pool_mock):
203209
compileall.compile_dir(self.directory, quiet=True, workers=0)
204210
self.assertEqual(pool_mock.call_args[1]['max_workers'], None)
205211

212+
@skipUnless(_have_multiprocessing, "requires multiprocessing")
206213
@mock.patch('concurrent.futures.ProcessPoolExecutor')
207214
@mock.patch('compileall.compile_file')
208215
def test_compile_one_worker(self, compile_file_mock, pool_mock):

Lib/test/test_concurrent_futures.py

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,6 @@
44

55
# Skip tests if _multiprocessing wasn't built.
66
import_helper.import_module('_multiprocessing')
7-
# Skip tests if sem_open implementation is broken.
8-
support.skip_if_broken_multiprocessing_synchronize()
97

108
from test.support import hashlib_helper
119
from test.support.script_helper import assert_python_ok
@@ -27,7 +25,7 @@
2725
from concurrent.futures._base import (
2826
PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future,
2927
BrokenExecutor)
30-
from concurrent.futures.process import BrokenProcessPool
28+
from concurrent.futures.process import BrokenProcessPool, _check_system_limits
3129
from multiprocessing import get_context
3230

3331
import multiprocessing.process
@@ -161,6 +159,10 @@ class ProcessPoolForkMixin(ExecutorMixin):
161159
ctx = "fork"
162160

163161
def get_context(self):
162+
try:
163+
_check_system_limits()
164+
except NotImplementedError:
165+
self.skipTest("ProcessPoolExecutor unavailable on this system")
164166
if sys.platform == "win32":
165167
self.skipTest("require unix system")
166168
return super().get_context()
@@ -170,12 +172,23 @@ class ProcessPoolSpawnMixin(ExecutorMixin):
170172
executor_type = futures.ProcessPoolExecutor
171173
ctx = "spawn"
172174

175+
def get_context(self):
176+
try:
177+
_check_system_limits()
178+
except NotImplementedError:
179+
self.skipTest("ProcessPoolExecutor unavailable on this system")
180+
return super().get_context()
181+
173182

174183
class ProcessPoolForkserverMixin(ExecutorMixin):
175184
executor_type = futures.ProcessPoolExecutor
176185
ctx = "forkserver"
177186

178187
def get_context(self):
188+
try:
189+
_check_system_limits()
190+
except NotImplementedError:
191+
self.skipTest("ProcessPoolExecutor unavailable on this system")
179192
if sys.platform == "win32":
180193
self.skipTest("require unix system")
181194
return super().get_context()
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
In the :class:`concurrent.futures.ProcessPoolExecutor`, validate that :func:`multiprocess.synchronize` is available on a given platform and rely on that check in the :mod:`concurrent.futures` test suite so we can run tests that are unrelated to :class:`ProcessPoolExecutor` on those platforms.

0 commit comments

Comments
 (0)