Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
215 changes: 204 additions & 11 deletions Lib/test/_test_multiprocessing.py

Large diffs are not rendered by default.

36 changes: 24 additions & 12 deletions Lib/test/support/warnings_helper.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import contextlib
import functools
import importlib
import re
import sys
import warnings



def import_deprecated(name):
"""Import *name* while suppressing DeprecationWarning."""
with warnings.catch_warnings():
Expand Down Expand Up @@ -42,20 +42,32 @@ def check_syntax_warning(testcase, statement, errtext='',
testcase.assertEqual(warns, [])


def ignore_warnings(*, category):
@contextlib.contextmanager
def ignore_warnings(*, category, message=''):
"""Decorator to suppress warnings.

Use of context managers to hide warnings make diffs
more noisy and tools like 'git blame' less useful.
Can also be used as a context manager. This is not preferred,
because it makes diffs more noisy and tools like 'git blame' less useful.
But, it's useful for async functions.
"""
def decorator(test):
@functools.wraps(test)
def wrapper(self, *args, **kwargs):
with warnings.catch_warnings():
warnings.simplefilter('ignore', category=category)
return test(self, *args, **kwargs)
return wrapper
return decorator
with warnings.catch_warnings():
warnings.filterwarnings('ignore', category=category, message=message)
yield


@contextlib.contextmanager
def ignore_fork_in_thread_deprecation_warnings():
"""Suppress deprecation warnings related to forking in multi-threaded code.

See gh-135427

Can be used as decorator (preferred) or context manager.
"""
with ignore_warnings(
message=".*fork.*may lead to deadlocks in the child.*",
category=DeprecationWarning,
):
yield


class WarningsRecorder(object):
Expand Down
120 changes: 59 additions & 61 deletions Lib/test/test_asyncio/test_unix_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from unittest import mock

from test import support
from test.support import os_helper
from test.support import os_helper, warnings_helper
from test.support import socket_helper
from test.support import wait_process
from test.support import hashlib_helper
Expand Down Expand Up @@ -1180,67 +1180,63 @@ async def runner():


@support.requires_fork()
class TestFork(unittest.TestCase):

def test_fork_not_share_current_task(self):
loop = object()
task = object()
asyncio._set_running_loop(loop)
self.addCleanup(asyncio._set_running_loop, None)
asyncio.tasks._enter_task(loop, task)
self.addCleanup(asyncio.tasks._leave_task, loop, task)
self.assertIs(asyncio.current_task(), task)
r, w = os.pipe()
self.addCleanup(os.close, r)
self.addCleanup(os.close, w)
pid = os.fork()
if pid == 0:
# child
try:
asyncio._set_running_loop(loop)
current_task = asyncio.current_task()
if current_task is None:
os.write(w, b'NO TASK')
else:
os.write(w, b'TASK:' + str(id(current_task)).encode())
except BaseException as e:
os.write(w, b'ERROR:' + ascii(e).encode())
finally:
asyncio._set_running_loop(None)
os._exit(0)
else:
# parent
result = os.read(r, 100)
self.assertEqual(result, b'NO TASK')
wait_process(pid, exitcode=0)

def test_fork_not_share_event_loop(self):
# The forked process should not share the event loop with the parent
loop = object()
asyncio._set_running_loop(loop)
self.assertIs(asyncio.get_running_loop(), loop)
self.addCleanup(asyncio._set_running_loop, None)
r, w = os.pipe()
self.addCleanup(os.close, r)
self.addCleanup(os.close, w)
pid = os.fork()
if pid == 0:
# child
try:
loop = asyncio.get_event_loop()
os.write(w, b'LOOP:' + str(id(loop)).encode())
except RuntimeError:
os.write(w, b'NO LOOP')
except BaseException as e:
os.write(w, b'ERROR:' + ascii(e).encode())
finally:
os._exit(0)
else:
# parent
result = os.read(r, 100)
self.assertEqual(result, b'NO LOOP')
wait_process(pid, exitcode=0)
class TestFork(unittest.IsolatedAsyncioTestCase):

async def test_fork_not_share_current_task(self):
with warnings_helper.ignore_fork_in_thread_deprecation_warnings():
loop = asyncio.get_running_loop()
task = asyncio.current_task()
self.assertIsNotNone(task)
r, w = os.pipe()
self.addCleanup(os.close, r)
self.addCleanup(os.close, w)
pid = os.fork()
if pid == 0:
# child
try:
asyncio._set_running_loop(loop)
current_task = asyncio.current_task()
if current_task is None:
os.write(w, b'NO TASK')
else:
os.write(w, b'TASK:' + str(id(current_task)).encode())
except BaseException as e:
os.write(w, b'ERROR:' + ascii(e).encode())
finally:
asyncio._set_running_loop(None)
os._exit(0)
else:
# parent
result = os.read(r, 100)
self.assertEqual(result, b'NO TASK')
wait_process(pid, exitcode=0)

async def test_fork_not_share_event_loop(self):
with warnings_helper.ignore_fork_in_thread_deprecation_warnings():
# The forked process should not share the event loop with the parent
loop = asyncio.get_running_loop()
r, w = os.pipe()
self.addCleanup(os.close, r)
self.addCleanup(os.close, w)
pid = os.fork()
if pid == 0:
# child
try:
loop = asyncio.get_event_loop()
os.write(w, b'LOOP:' + str(id(loop)).encode())
except RuntimeError:
os.write(w, b'NO LOOP')
except BaseException as e:
os.write(w, b'ERROR:' + ascii(e).encode())
finally:
os._exit(0)
else:
# parent
result = os.read(r, 100)
self.assertEqual(result, b'NO LOOP')
wait_process(pid, exitcode=0)

@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@hashlib_helper.requires_hashdigest('md5')
@support.skip_if_sanitizer("TSAN doesn't support threads after fork", thread=True)
def test_fork_signal_handling(self):
Expand Down Expand Up @@ -1288,6 +1284,7 @@ async def func():
self.assertFalse(parent_handled.is_set())
self.assertTrue(child_handled.is_set())

@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@hashlib_helper.requires_hashdigest('md5')
@support.skip_if_sanitizer("TSAN doesn't support threads after fork", thread=True)
def test_fork_asyncio_run(self):
Expand All @@ -1308,6 +1305,7 @@ async def child_main():

self.assertEqual(result.value, 42)

@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@hashlib_helper.requires_hashdigest('md5')
@support.skip_if_sanitizer("TSAN doesn't support threads after fork", thread=True)
def test_fork_asyncio_subprocess(self):
Expand Down
2 changes: 2 additions & 0 deletions Lib/test/test_builtin.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from test import support
from test.support import cpython_only, swap_attr
from test.support import async_yield, run_yielding_async_fn
from test.support import warnings_helper
from test.support.import_helper import import_module
from test.support.os_helper import (EnvironmentVarGuard, TESTFN, unlink)
from test.support.script_helper import assert_python_ok
Expand Down Expand Up @@ -2545,6 +2546,7 @@ def run_child(self, child, terminal_input):
finally:
signal.signal(signal.SIGHUP, old_sighup)

@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def _run_child(self, child, terminal_input):
r, w = os.pipe() # Pipe test results from child back to parent
try:
Expand Down
16 changes: 15 additions & 1 deletion Lib/test/test_concurrent_futures/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from concurrent import futures
from operator import add
from test import support
from test.support import Py_GIL_DISABLED
from test.support import Py_GIL_DISABLED, warnings_helper


def mul(x, y):
Expand Down Expand Up @@ -43,10 +43,12 @@ class ExecutorTest:

# Executor.shutdown() and context manager usage is tested by
# ExecutorShutdownTest.
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_submit(self):
future = self.executor.submit(pow, 2, 8)
self.assertEqual(256, future.result())

@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_submit_keyword(self):
future = self.executor.submit(mul, 2, y=8)
self.assertEqual(16, future.result())
Expand All @@ -57,6 +59,7 @@ def test_submit_keyword(self):
with self.assertRaises(TypeError):
self.executor.submit(arg=1)

@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_map(self):
self.assertEqual(
list(self.executor.map(pow, range(10), range(10))),
Expand All @@ -66,13 +69,15 @@ def test_map(self):
list(self.executor.map(pow, range(10), range(10), chunksize=3)),
list(map(pow, range(10), range(10))))

@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_map_exception(self):
i = self.executor.map(divmod, [1, 1, 1, 1], [2, 3, 0, 5])
self.assertEqual(i.__next__(), (0, 1))
self.assertEqual(i.__next__(), (0, 1))
with self.assertRaises(ZeroDivisionError):
i.__next__()

@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@support.requires_resource('walltime')
def test_map_timeout(self):
results = []
Expand Down Expand Up @@ -108,26 +113,30 @@ def test_map_buffersize_value_validation(self):
):
self.executor.map(str, range(4), buffersize=buffersize)

@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_map_buffersize(self):
ints = range(4)
for buffersize in (1, 2, len(ints), len(ints) * 2):
with self.subTest(buffersize=buffersize):
res = self.executor.map(str, ints, buffersize=buffersize)
self.assertListEqual(list(res), ["0", "1", "2", "3"])

@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_map_buffersize_on_multiple_iterables(self):
ints = range(4)
for buffersize in (1, 2, len(ints), len(ints) * 2):
with self.subTest(buffersize=buffersize):
res = self.executor.map(add, ints, ints, buffersize=buffersize)
self.assertListEqual(list(res), [0, 2, 4, 6])

@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_map_buffersize_on_infinite_iterable(self):
res = self.executor.map(str, itertools.count(), buffersize=2)
self.assertEqual(next(res, None), "0")
self.assertEqual(next(res, None), "1")
self.assertEqual(next(res, None), "2")

@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_map_buffersize_on_multiple_infinite_iterables(self):
res = self.executor.map(
add,
Expand All @@ -147,6 +156,7 @@ def test_map_buffersize_without_iterable(self):
res = self.executor.map(str, buffersize=2)
self.assertIsNone(next(res, None))

@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_map_buffersize_when_buffer_is_full(self):
ints = iter(range(4))
buffersize = 2
Expand All @@ -158,13 +168,15 @@ def test_map_buffersize_when_buffer_is_full(self):
msg="should have fetched only `buffersize` elements from `ints`.",
)

@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_shutdown_race_issue12456(self):
# Issue #12456: race condition at shutdown where trying to post a
# sentinel in the call queue blocks (the queue is full while processes
# have exited).
self.executor.map(str, [2] * (self.worker_count + 1))
self.executor.shutdown()

@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@support.cpython_only
def test_no_stale_references(self):
# Issue #16284: check that the executors don't unnecessarily hang onto
Expand Down Expand Up @@ -209,6 +221,7 @@ def test_max_workers_negative(self):
"than 0"):
self.executor_type(max_workers=number)

@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_free_reference(self):
# Issue #14406: Result iterator should not keep an internal
# reference to result objects.
Expand All @@ -221,6 +234,7 @@ def test_free_reference(self):
if wr() is None:
break

@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_swallows_falsey_exceptions(self):
# see gh-132063: Prevent exceptions that evaluate as falsey
# from being ignored.
Expand Down
4 changes: 4 additions & 0 deletions Lib/test/test_concurrent_futures/test_as_completed.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
CANCELLED_AND_NOTIFIED, FINISHED, Future)

from test import support
from test.support import warnings_helper

from .util import (
PENDING_FUTURE, RUNNING_FUTURE,
Expand All @@ -19,6 +20,7 @@ def mul(x, y):


class AsCompletedTests:
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_no_timeout(self):
future1 = self.executor.submit(mul, 2, 21)
future2 = self.executor.submit(mul, 7, 6)
Expand All @@ -35,6 +37,7 @@ def test_no_timeout(self):
future1, future2]),
completed)

@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_future_times_out(self):
"""Test ``futures.as_completed`` timing out before
completing it's final future."""
Expand Down Expand Up @@ -62,6 +65,7 @@ def test_future_times_out(self):
# Check that ``future`` wasn't completed.
self.assertEqual(completed_futures, already_completed)

@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_duplicate_futures(self):
# Issue 20367. Duplicate futures should not raise exceptions or give
# duplicate responses.
Expand Down
Loading
Loading