diff --git a/Lib/test/libregrtest/cmdline.py b/Lib/test/libregrtest/cmdline.py index 7cd85bf2803aa1..cb09ee0e03b30c 100644 --- a/Lib/test/libregrtest/cmdline.py +++ b/Lib/test/libregrtest/cmdline.py @@ -226,8 +226,9 @@ def _create_parser(): '(instead of the Python stdlib test suite)') group = parser.add_argument_group('Special runs') - group.add_argument('-l', '--findleaks', action='store_true', - help='if GC is available detect tests that leak memory') + group.add_argument('-l', '--findleaks', action='store_const', const=2, + default=1, + help='deprecated alias to --fail-env-changed') group.add_argument('-L', '--runleaks', action='store_true', help='run the leaks(1) command just before exit.' + more_details) @@ -309,7 +310,7 @@ def _parse_args(args, **kwargs): # Defaults ns = argparse.Namespace(testdir=None, verbose=0, quiet=False, exclude=False, single=False, randomize=False, fromfile=None, - findleaks=False, use_resources=None, trace=False, coverdir='coverage', + findleaks=1, use_resources=None, trace=False, coverdir='coverage', runleaks=False, huntrleaks=False, verbose2=False, print_slow=False, random_seed=None, use_mp=None, verbose3=False, forever=False, header=False, failfast=False, match_tests=None, pgo=False) @@ -330,12 +331,13 @@ def _parse_args(args, **kwargs): parser.error("unrecognized arguments: %s" % arg) sys.exit(1) + if ns.findleaks > 1: + # --findleaks implies --fail-env-changed + ns.fail_env_changed = True if ns.single and ns.fromfile: parser.error("-s and -f don't go together!") if ns.use_mp is not None and ns.trace: parser.error("-T and -j don't go together!") - if ns.use_mp is not None and ns.findleaks: - parser.error("-l and -j don't go together!") if ns.failfast and not (ns.verbose or ns.verbose3): parser.error("-G/--failfast needs either -v or -W") if ns.pgo and (ns.verbose or ns.verbose2 or ns.verbose3): diff --git a/Lib/test/libregrtest/main.py b/Lib/test/libregrtest/main.py index 32ac44029bc3a4..c19ea44db9b287 100644 --- a/Lib/test/libregrtest/main.py +++ b/Lib/test/libregrtest/main.py @@ -20,10 +20,6 @@ from test.libregrtest.setup import setup_tests from test.libregrtest.utils import removepy, count, format_duration, printlist from test import support -try: - import gc -except ImportError: - gc = None # When tests are run from the Python build directory, it is best practice @@ -79,8 +75,8 @@ def __init__(self): self.skipped = [] self.resource_denieds = [] self.environment_changed = [] - self.rerun = [] self.run_no_tests = [] + self.rerun = [] self.first_result = None self.interrupted = False @@ -90,9 +86,6 @@ def __init__(self): # used by --coverage, trace.Trace instance self.tracer = None - # used by --findleaks, store for gc.garbage - self.found_garbage = [] - # used to display the progress bar "[ 3/100]" self.start_time = time.monotonic() self.test_count = '' @@ -105,26 +98,43 @@ def __init__(self): # used by --junit-xml self.testsuite_xml = None - def accumulate_result(self, test, result): - ok, test_time, xml_data = result - if ok not in (CHILD_ERROR, INTERRUPTED): - self.test_times.append((test_time, test)) + self.win_load_tracker = None + + def get_executed(self): + return (set(self.good) | set(self.bad) | set(self.skipped) + | set(self.resource_denieds) | set(self.environment_changed) + | set(self.run_no_tests)) + + def accumulate_result(self, result, rerun=False): + test_name = result.test_name + ok = result.result + + if ok not in (CHILD_ERROR, INTERRUPTED) and not rerun: + self.test_times.append((result.test_time, test_name)) + if ok == PASSED: - self.good.append(test) + self.good.append(test_name) elif ok in (FAILED, CHILD_ERROR): - self.bad.append(test) + if not rerun: + self.bad.append(test_name) elif ok == ENV_CHANGED: - self.environment_changed.append(test) + self.environment_changed.append(test_name) elif ok == SKIPPED: - self.skipped.append(test) + self.skipped.append(test_name) elif ok == RESOURCE_DENIED: - self.skipped.append(test) - self.resource_denieds.append(test) + self.skipped.append(test_name) + self.resource_denieds.append(test_name) elif ok == TEST_DID_NOT_RUN: - self.run_no_tests.append(test) - elif ok != INTERRUPTED: + self.run_no_tests.append(test_name) + elif ok == INTERRUPTED: + self.interrupted = True + else: raise ValueError("invalid test result: %r" % ok) + if rerun and ok not in {FAILED, CHILD_ERROR, INTERRUPTED}: + self.bad.remove(test_name) + + xml_data = result.xml_data if xml_data: import xml.etree.ElementTree as ET for e in xml_data: @@ -134,7 +144,7 @@ def accumulate_result(self, test, result): print(xml_data, file=sys.__stderr__) raise - def display_progress(self, test_index, test): + def display_progress(self, test_index, text): if self.ns.quiet: return @@ -143,12 +153,12 @@ def display_progress(self, test_index, test): fails = len(self.bad) + len(self.environment_changed) if fails and not self.ns.pgo: line = f"{line}/{fails}" - line = f"[{line}] {test}" + line = f"[{line}] {text}" # add the system load prefix: "load avg: 1.80 " - if hasattr(os, 'getloadavg'): - load_avg_1min = os.getloadavg()[0] - line = f"load avg: {load_avg_1min:.2f} {line}" + load_avg = self.getloadavg() + if load_avg is not None: + line = f"load avg: {load_avg:.2f} {line}" # add the timestamp prefix: "0:01:05 " test_time = time.monotonic() - self.start_time @@ -164,22 +174,6 @@ def parse_args(self, kwargs): "faulthandler.dump_traceback_later", file=sys.stderr) ns.timeout = None - if ns.threshold is not None and gc is None: - print('No GC available, ignore --threshold.', file=sys.stderr) - ns.threshold = None - - if ns.findleaks: - if gc is not None: - # Uncomment the line below to report garbage that is not - # freeable by reference counting alone. By default only - # garbage that is not collectable by the GC is reported. - pass - #gc.set_debug(gc.DEBUG_SAVEALL) - else: - print('No GC available, disabling --findleaks', - file=sys.stderr) - ns.findleaks = False - if ns.xmlpath: support.junit_xml_list = self.testsuite_xml = [] @@ -275,13 +269,13 @@ def list_cases(self): support.verbose = False support.set_match_tests(self.ns.match_tests) - for test in self.selected: - abstest = get_abs_module(self.ns, test) + for test_name in self.selected: + abstest = get_abs_module(self.ns, test_name) try: suite = unittest.defaultTestLoader.loadTestsFromName(abstest) self._list_cases(suite) except unittest.SkipTest: - self.skipped.append(test) + self.skipped.append(test_name) if self.skipped: print(file=sys.stderr) @@ -298,23 +292,19 @@ def rerun_failed_tests(self): print() print("Re-running failed tests in verbose mode") self.rerun = self.bad[:] - for test in self.rerun: - print("Re-running test %r in verbose mode" % test, flush=True) - try: - self.ns.verbose = True - ok = runtest(self.ns, test) - except KeyboardInterrupt: - self.interrupted = True - # print a newline separate from the ^C - print() + for test_name in self.rerun: + print(f"Re-running {test_name} in verbose mode", flush=True) + self.ns.verbose = True + result = runtest(self.ns, test_name) + + self.accumulate_result(result, rerun=True) + + if result.result == INTERRUPTED: break - else: - if ok[0] in {PASSED, ENV_CHANGED, SKIPPED, RESOURCE_DENIED}: - self.bad.remove(test) - else: - if self.bad: - print(count(len(self.bad), 'test'), "failed again:") - printlist(self.bad) + + if self.bad: + print(count(len(self.bad), 'test'), "failed again:") + printlist(self.bad) self.display_result() @@ -327,11 +317,11 @@ def display_result(self): print("== Tests result: %s ==" % self.get_tests_result()) if self.interrupted: - print() - # print a newline after ^C print("Test suite interrupted by signal SIGINT.") - executed = set(self.good) | set(self.bad) | set(self.skipped) - omitted = set(self.selected) - executed + + omitted = set(self.selected) - self.get_executed() + if omitted: + print() print(count(len(omitted), "test"), "omitted:") printlist(omitted) @@ -348,8 +338,8 @@ def display_result(self): self.test_times.sort(reverse=True) print() print("10 slowest tests:") - for time, test in self.test_times[:10]: - print("- %s: %s" % (test, format_duration(time))) + for test_time, test in self.test_times[:10]: + print("- %s: %s" % (test, format_duration(test_time))) if self.bad: print() @@ -387,10 +377,10 @@ def run_tests_sequential(self): print("Run tests sequentially") previous_test = None - for test_index, test in enumerate(self.tests, 1): + for test_index, test_name in enumerate(self.tests, 1): start_time = time.monotonic() - text = test + text = test_name if previous_test: text = '%s -- %s' % (text, previous_test) self.display_progress(test_index, text) @@ -398,22 +388,19 @@ def run_tests_sequential(self): if self.tracer: # If we're tracing code coverage, then we don't exit with status # if on a false return value from main. - cmd = ('result = runtest(self.ns, test); ' - 'self.accumulate_result(test, result)') + cmd = ('result = runtest(self.ns, test_name); ' + 'self.accumulate_result(result)') ns = dict(locals()) self.tracer.runctx(cmd, globals=globals(), locals=ns) result = ns['result'] else: - try: - result = runtest(self.ns, test) - except KeyboardInterrupt: - self.interrupted = True - self.accumulate_result(test, (INTERRUPTED, None, None)) - break - else: - self.accumulate_result(test, result) - - previous_test = format_test_result(test, result[0]) + result = runtest(self.ns, test_name) + self.accumulate_result(result) + + if result.result == INTERRUPTED: + break + + previous_test = format_test_result(result) test_time = time.monotonic() - start_time if test_time >= PROGRESS_MIN_TIME: previous_test = "%s in %s" % (previous_test, format_duration(test_time)) @@ -421,16 +408,6 @@ def run_tests_sequential(self): # be quiet: say nothing if the test passed shortly previous_test = None - if self.ns.findleaks: - gc.collect() - if gc.garbage: - print("Warning: test created", len(gc.garbage), end=' ') - print("uncollectable object(s).") - # move the uncollectable objects somewhere so we don't see - # them again - self.found_garbage.extend(gc.garbage) - del gc.garbage[:] - # Unload the newly imported modules (best effort finalization) for module in sys.modules.keys(): if module not in save_modules and module.startswith("test."): @@ -441,8 +418,8 @@ def run_tests_sequential(self): def _test_forever(self, tests): while True: - for test in tests: - yield test + for test_name in tests: + yield test_name if self.bad: return if self.ns.fail_env_changed and self.environment_changed: @@ -515,6 +492,10 @@ def run_tests(self): self.run_tests_sequential() def finalize(self): + if self.win_load_tracker is not None: + self.win_load_tracker.close() + self.win_load_tracker = None + if self.next_single_filename: if self.next_single_test: with open(self.next_single_filename, 'w') as fp: @@ -585,6 +566,15 @@ def main(self, tests=None, **kwargs): with support.temp_cwd(test_cwd, quiet=True): self._main(tests, kwargs) + def getloadavg(self): + if self.win_load_tracker is not None: + return self.win_load_tracker.getloadavg() + + if hasattr(os, 'getloadavg'): + return os.getloadavg()[0] + + return None + def _main(self, tests, kwargs): if self.ns.huntrleaks: warmup, repetitions, _ = self.ns.huntrleaks @@ -616,6 +606,18 @@ def _main(self, tests, kwargs): self.list_cases() sys.exit(0) + # If we're on windows and this is the parent runner (not a worker), + # track the load average. + if sys.platform == 'win32' and (self.ns.worker_args is None): + from test.libregrtest.win_utils import WindowsLoadTracker + + try: + self.win_load_tracker = WindowsLoadTracker() + except FileNotFoundError as error: + # Windows IoT Core and Windows Nano Server do not provide + # typeperf.exe for x64, x86 or ARM + print(f'Failed to create WindowsLoadTracker: {error}') + self.run_tests() self.display_result() diff --git a/Lib/test/libregrtest/refleak.py b/Lib/test/libregrtest/refleak.py index 235d6bfd3af621..8d221232eb6ce7 100644 --- a/Lib/test/libregrtest/refleak.py +++ b/Lib/test/libregrtest/refleak.py @@ -1,4 +1,3 @@ -import errno import os import re import sys @@ -18,7 +17,7 @@ def _get_dump(cls): cls._abc_negative_cache, cls._abc_negative_cache_version) -def dash_R(ns, the_module, test_name, test_func): +def dash_R(ns, test_name, test_func): """Run a test multiple times, looking for reference leaks. Returns: diff --git a/Lib/test/libregrtest/runtest.py b/Lib/test/libregrtest/runtest.py index 99486c72db3e94..a9574929a4cda6 100644 --- a/Lib/test/libregrtest/runtest.py +++ b/Lib/test/libregrtest/runtest.py @@ -1,4 +1,7 @@ +import collections import faulthandler +import functools +import gc import importlib import io import os @@ -6,9 +9,11 @@ import time import traceback import unittest + from test import support from test.libregrtest.refleak import dash_R, clear_caches from test.libregrtest.save_env import saved_test_environment +from test.libregrtest.utils import print_warning # Test result constants. @@ -55,9 +60,17 @@ NOTTESTS = set() -def format_test_result(test_name, result): - fmt = _FORMAT_TEST_RESULT.get(result, "%s") - return fmt % test_name +# used by --findleaks, store for gc.garbage +FOUND_GARBAGE = [] + + +def format_test_result(result): + fmt = _FORMAT_TEST_RESULT.get(result.result, "%s") + return fmt % result.test_name + + +def findtestdir(path=None): + return path or os.path.dirname(os.path.dirname(__file__)) or os.curdir def findtests(testdir=None, stdtests=STDTESTS, nottests=NOTTESTS): @@ -73,48 +86,34 @@ def findtests(testdir=None, stdtests=STDTESTS, nottests=NOTTESTS): return stdtests + sorted(tests) -def get_abs_module(ns, test): - if test.startswith('test.') or ns.testdir: - return test +def get_abs_module(ns, test_name): + if test_name.startswith('test.') or ns.testdir: + return test_name else: - # Always import it from the test package - return 'test.' + test - - -def runtest(ns, test): - """Run a single test. + # Import it from the test package + return 'test.' + test_name - ns -- regrtest namespace of options - test -- the name of the test - Returns the tuple (result, test_time, xml_data), where result is one - of the constants: +TestResult = collections.namedtuple('TestResult', + 'test_name result test_time xml_data') - INTERRUPTED KeyboardInterrupt when run under -j - RESOURCE_DENIED test skipped because resource denied - SKIPPED test skipped for some other reason - ENV_CHANGED test failed because it changed the execution environment - FAILED test failed - PASSED test passed - EMPTY_TEST_SUITE test ran no subtests. - - If ns.xmlpath is not None, xml_data is a list containing each - generated testsuite element. - """ +def _runtest(ns, test_name): + # Handle faulthandler timeout, capture stdout+stderr, XML serialization + # and measure time. output_on_failure = ns.verbose3 use_timeout = (ns.timeout is not None) if use_timeout: faulthandler.dump_traceback_later(ns.timeout, exit=True) + + start_time = time.perf_counter() try: support.set_match_tests(ns.match_tests) - # reset the environment_altered flag to detect if a test altered - # the environment - support.environment_altered = False support.junit_xml_list = xml_list = [] if ns.xmlpath else None if ns.failfast: support.failfast = True + if output_on_failure: support.verbose = True @@ -124,8 +123,9 @@ def runtest(ns, test): try: sys.stdout = stream sys.stderr = stream - result = runtest_inner(ns, test, display_failure=False) - if result[0] != PASSED: + result = _runtest_inner(ns, test_name, + display_failure=False) + if result != PASSED: output = stream.getvalue() orig_stderr.write(output) orig_stderr.flush() @@ -133,98 +133,164 @@ def runtest(ns, test): sys.stdout = orig_stdout sys.stderr = orig_stderr else: - support.verbose = ns.verbose # Tell tests to be moderately quiet - result = runtest_inner(ns, test, display_failure=not ns.verbose) + # Tell tests to be moderately quiet + support.verbose = ns.verbose + + result = _runtest_inner(ns, test_name, + display_failure=not ns.verbose) if xml_list: import xml.etree.ElementTree as ET xml_data = [ET.tostring(x).decode('us-ascii') for x in xml_list] else: xml_data = None - return result + (xml_data,) + + test_time = time.perf_counter() - start_time + + return TestResult(test_name, result, test_time, xml_data) finally: if use_timeout: faulthandler.cancel_dump_traceback_later() - cleanup_test_droppings(test, ns.verbose) support.junit_xml_list = None -def post_test_cleanup(): +def runtest(ns, test_name): + """Run a single test. + + ns -- regrtest namespace of options + test_name -- the name of the test + + Returns the tuple (result, test_time, xml_data), where result is one + of the constants: + + INTERRUPTED KeyboardInterrupt + RESOURCE_DENIED test skipped because resource denied + SKIPPED test skipped for some other reason + ENV_CHANGED test failed because it changed the execution environment + FAILED test failed + PASSED test passed + EMPTY_TEST_SUITE test ran no subtests. + + If ns.xmlpath is not None, xml_data is a list containing each + generated testsuite element. + """ + try: + return _runtest(ns, test_name) + except: + if not ns.pgo: + msg = traceback.format_exc() + print(f"test {test_name} crashed -- {msg}", + file=sys.stderr, flush=True) + return TestResult(test_name, FAILED, 0.0, None) + + +def _test_module(the_module): + loader = unittest.TestLoader() + tests = loader.loadTestsFromModule(the_module) + for error in loader.errors: + print(error, file=sys.stderr) + if loader.errors: + raise Exception("errors while loading tests") + support.run_unittest(tests) + + +def _runtest_inner2(ns, test_name): + # Load the test function, run the test function, handle huntrleaks + # and findleaks to detect leaks + + abstest = get_abs_module(ns, test_name) + + # remove the module from sys.module to reload it if it was already imported + support.unload(abstest) + + the_module = importlib.import_module(abstest) + + # If the test has a test_main, that will run the appropriate + # tests. If not, use normal unittest test loading. + test_runner = getattr(the_module, "test_main", None) + if test_runner is None: + test_runner = functools.partial(_test_module, the_module) + + try: + if ns.huntrleaks: + # Return True if the test leaked references + refleak = dash_R(ns, test_name, test_runner) + else: + test_runner() + refleak = False + finally: + cleanup_test_droppings(test_name, ns.verbose) + + support.gc_collect() + + if gc.garbage: + support.environment_altered = True + print_warning(f"{test_name} created {len(gc.garbage)} " + f"uncollectable object(s).") + + # move the uncollectable objects somewhere, + # so we don't see them again + FOUND_GARBAGE.extend(gc.garbage) + gc.garbage.clear() + support.reap_children() + return refleak + + +def _runtest_inner(ns, test_name, display_failure=True): + # Detect environment changes, handle exceptions. -def runtest_inner(ns, test, display_failure=True): - support.unload(test) + # Reset the environment_altered flag to detect if a test altered + # the environment + support.environment_altered = False + + if ns.pgo: + display_failure = False - test_time = 0.0 - refleak = False # True if the test leaked references. try: - abstest = get_abs_module(ns, test) clear_caches() - with saved_test_environment(test, ns.verbose, ns.quiet, pgo=ns.pgo) as environment: - start_time = time.perf_counter() - the_module = importlib.import_module(abstest) - # If the test has a test_main, that will run the appropriate - # tests. If not, use normal unittest test loading. - test_runner = getattr(the_module, "test_main", None) - if test_runner is None: - def test_runner(): - loader = unittest.TestLoader() - tests = loader.loadTestsFromModule(the_module) - for error in loader.errors: - print(error, file=sys.stderr) - if loader.errors: - raise Exception("errors while loading tests") - support.run_unittest(tests) - if ns.huntrleaks: - refleak = dash_R(ns, the_module, test, test_runner) - else: - test_runner() - test_time = time.perf_counter() - start_time - post_test_cleanup() + + with saved_test_environment(test_name, ns.verbose, ns.quiet, pgo=ns.pgo) as environment: + refleak = _runtest_inner2(ns, test_name) except support.ResourceDenied as msg: if not ns.quiet and not ns.pgo: - print(test, "skipped --", msg, flush=True) - return RESOURCE_DENIED, test_time + print(f"{test_name} skipped -- {msg}", flush=True) + return RESOURCE_DENIED except unittest.SkipTest as msg: if not ns.quiet and not ns.pgo: - print(test, "skipped --", msg, flush=True) - return SKIPPED, test_time - except KeyboardInterrupt: - raise - except support.TestFailed as msg: - if not ns.pgo: - if display_failure: - print("test", test, "failed --", msg, file=sys.stderr, - flush=True) - else: - print("test", test, "failed", file=sys.stderr, flush=True) - return FAILED, test_time + print(f"{test_name} skipped -- {msg}", flush=True) + return SKIPPED + except support.TestFailed as exc: + msg = f"test {test_name} failed" + if display_failure: + msg = f"{msg} -- {exc}" + print(msg, file=sys.stderr, flush=True) + return FAILED except support.TestDidNotRun: - return TEST_DID_NOT_RUN, test_time + return TEST_DID_NOT_RUN + except KeyboardInterrupt: + print() + return INTERRUPTED except: - msg = traceback.format_exc() if not ns.pgo: - print("test", test, "crashed --", msg, file=sys.stderr, - flush=True) - return FAILED, test_time - else: - if refleak: - return FAILED, test_time - if environment.changed: - return ENV_CHANGED, test_time - return PASSED, test_time + msg = traceback.format_exc() + print(f"test {test_name} crashed -- {msg}", + file=sys.stderr, flush=True) + return FAILED + if refleak: + return FAILED + if environment.changed: + return ENV_CHANGED + return PASSED -def cleanup_test_droppings(testname, verbose): - import shutil - import stat - import gc +def cleanup_test_droppings(test_name, verbose): # First kill any dangling references to open files etc. # This can also issue some ResourceWarnings which would otherwise get # triggered during the following test run, and possibly produce failures. - gc.collect() + support.gc_collect() # Try to clean up junk commonly left behind. While tests shouldn't leave # any files or directories behind, when a test fails that can be tedious @@ -239,25 +305,23 @@ def cleanup_test_droppings(testname, verbose): continue if os.path.isdir(name): + import shutil kind, nuker = "directory", shutil.rmtree elif os.path.isfile(name): kind, nuker = "file", os.unlink else: - raise SystemError("os.path says %r exists but is neither " - "directory nor file" % name) + raise RuntimeError(f"os.path says {name!r} exists but is neither " + f"directory nor file") if verbose: - print("%r left behind %s %r" % (testname, kind, name)) + print_warning("%r left behind %s %r" % (test_name, kind, name)) + support.environment_altered = True + try: - # if we have chmod, fix possible permissions problems - # that might prevent cleanup - if (hasattr(os, 'chmod')): - os.chmod(name, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO) + import stat + # fix possible permissions problems that might prevent cleanup + os.chmod(name, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO) nuker(name) - except Exception as msg: - print(("%r left behind %s %r and it couldn't be " - "removed: %s" % (testname, kind, name, msg)), file=sys.stderr) - - -def findtestdir(path=None): - return path or os.path.dirname(os.path.dirname(__file__)) or os.curdir + except Exception as exc: + print_warning(f"{test_name} left behind {kind} {name!r} " + f"and it couldn't be removed: {exc}") diff --git a/Lib/test/libregrtest/runtest_mp.py b/Lib/test/libregrtest/runtest_mp.py index 6190574afdf8b3..dbab6954de867f 100644 --- a/Lib/test/libregrtest/runtest_mp.py +++ b/Lib/test/libregrtest/runtest_mp.py @@ -1,7 +1,9 @@ +import collections import faulthandler import json import os import queue +import subprocess import sys import threading import time @@ -11,7 +13,7 @@ from test.libregrtest.runtest import ( runtest, INTERRUPTED, CHILD_ERROR, PROGRESS_MIN_TIME, - format_test_result) + format_test_result, TestResult) from test.libregrtest.setup import setup_tests from test.libregrtest.utils import format_duration @@ -19,20 +21,12 @@ # Display the running tests if nothing happened last N seconds PROGRESS_UPDATE = 30.0 # seconds -# If interrupted, display the wait progress every N seconds -WAIT_PROGRESS = 2.0 # seconds +def must_stop(result): + return result.result in (INTERRUPTED, CHILD_ERROR) -def run_test_in_subprocess(testname, ns): - """Run the given test in a subprocess with --worker-args. - - ns is the option Namespace parsed from command-line arguments. regrtest - is invoked in a subprocess with the --worker-args argument; when the - subprocess exits, its return code, stdout and stderr are returned as a - 3-tuple. - """ - from subprocess import Popen, PIPE +def run_test_in_subprocess(testname, ns): ns_dict = vars(ns) worker_args = (ns_dict, testname) worker_args = json.dumps(worker_args) @@ -47,15 +41,12 @@ def run_test_in_subprocess(testname, ns): # Running the child from the same working directory as regrtest's original # invocation ensures that TEMPDIR for the child is the same when # sysconfig.is_python_build() is true. See issue 15300. - popen = Popen(cmd, - stdout=PIPE, stderr=PIPE, - universal_newlines=True, - close_fds=(os.name != 'nt'), - cwd=support.SAVEDCWD) - with popen: - stdout, stderr = popen.communicate() - retcode = popen.wait() - return retcode, stdout, stderr + return subprocess.Popen(cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + universal_newlines=True, + close_fds=(os.name != 'nt'), + cwd=support.SAVEDCWD) def run_tests_worker(worker_args): @@ -64,14 +55,7 @@ def run_tests_worker(worker_args): setup_tests(ns) - try: - result = runtest(ns, testname) - except KeyboardInterrupt: - result = INTERRUPTED, '', None - except BaseException as e: - traceback.print_exc() - result = CHILD_ERROR, str(e) - + result = runtest(ns, testname) print() # Force a newline (just in case) print(json.dumps(result), flush=True) sys.exit(0) @@ -83,7 +67,6 @@ class MultiprocessIterator: """A thread-safe iterator over tests for multiprocess mode.""" def __init__(self, tests): - self.interrupted = False self.lock = threading.Lock() self.tests = tests @@ -92,152 +75,213 @@ def __iter__(self): def __next__(self): with self.lock: - if self.interrupted: - raise StopIteration('tests interrupted') return next(self.tests) +MultiprocessResult = collections.namedtuple('MultiprocessResult', + 'result stdout stderr error_msg') + class MultiprocessThread(threading.Thread): def __init__(self, pending, output, ns): super().__init__() self.pending = pending self.output = output self.ns = ns - self.current_test = None + self.current_test_name = None self.start_time = None + self._popen = None - def _runtest(self): - try: - test = next(self.pending) - except StopIteration: - self.output.put((None, None, None, None)) - return True + def kill(self): + if not self.is_alive(): + return + if self._popen is not None: + self._popen.kill() + def _runtest(self, test_name): try: self.start_time = time.monotonic() - self.current_test = test - - retcode, stdout, stderr = run_test_in_subprocess(test, self.ns) + self.current_test_name = test_name + + popen = run_test_in_subprocess(test_name, self.ns) + self._popen = popen + with popen: + try: + stdout, stderr = popen.communicate() + except: + popen.kill() + popen.wait() + raise + + retcode = popen.wait() finally: - self.current_test = None + self.current_test_name = None + self._popen = None - if retcode != 0: - result = (CHILD_ERROR, "Exit code %s" % retcode, None) - self.output.put((test, stdout.rstrip(), stderr.rstrip(), - result)) - return False - - stdout, _, result = stdout.strip().rpartition("\n") - if not result: - self.output.put((None, None, None, None)) - return True + stdout = stdout.strip() + stderr = stderr.rstrip() - result = json.loads(result) - assert len(result) == 3, f"Invalid result tuple: {result!r}" - self.output.put((test, stdout.rstrip(), stderr.rstrip(), - result)) - return False + err_msg = None + if retcode != 0: + err_msg = "Exit code %s" % retcode + else: + stdout, _, result = stdout.rpartition("\n") + stdout = stdout.rstrip() + if not result: + err_msg = "Failed to parse worker stdout" + else: + try: + # deserialize run_tests_worker() output + result = json.loads(result) + result = TestResult(*result) + except Exception as exc: + err_msg = "Failed to parse worker JSON: %s" % exc + + if err_msg is not None: + test_time = time.monotonic() - self.start_time + result = TestResult(test_name, CHILD_ERROR, test_time, None) + + return MultiprocessResult(result, stdout, stderr, err_msg) def run(self): - try: - stop = False - while not stop: - stop = self._runtest() - except BaseException: - self.output.put((None, None, None, None)) - raise + while True: + try: + try: + test_name = next(self.pending) + except StopIteration: + break + mp_result = self._runtest(test_name) + self.output.put((False, mp_result)) -def run_tests_multiprocess(regrtest): - output = queue.Queue() - pending = MultiprocessIterator(regrtest.tests) - test_timeout = regrtest.ns.timeout - use_timeout = (test_timeout is not None) - - workers = [MultiprocessThread(pending, output, regrtest.ns) - for i in range(regrtest.ns.use_mp)] - print("Run tests in parallel using %s child processes" - % len(workers)) + if must_stop(mp_result.result): + break + except BaseException: + self.output.put((True, traceback.format_exc())) + break + + +def get_running(workers): + running = [] for worker in workers: - worker.start() - - def get_running(workers): - running = [] - for worker in workers: - current_test = worker.current_test - if not current_test: - continue - dt = time.monotonic() - worker.start_time - if dt >= PROGRESS_MIN_TIME: - text = '%s (%s)' % (current_test, format_duration(dt)) - running.append(text) - return running - - finished = 0 - test_index = 1 - get_timeout = max(PROGRESS_UPDATE, PROGRESS_MIN_TIME) - try: - while finished < regrtest.ns.use_mp: - if use_timeout: - faulthandler.dump_traceback_later(test_timeout, exit=True) + current_test_name = worker.current_test_name + if not current_test_name: + continue + dt = time.monotonic() - worker.start_time + if dt >= PROGRESS_MIN_TIME: + text = '%s (%s)' % (current_test_name, format_duration(dt)) + running.append(text) + return running + + +class MultiprocessRunner: + def __init__(self, regrtest): + self.regrtest = regrtest + self.ns = regrtest.ns + self.output = queue.Queue() + self.pending = MultiprocessIterator(self.regrtest.tests) + if self.ns.timeout is not None: + self.test_timeout = self.ns.timeout * 1.5 + else: + self.test_timeout = None + self.workers = None + + def start_workers(self): + self.workers = [MultiprocessThread(self.pending, self.output, self.ns) + for _ in range(self.ns.use_mp)] + print("Run tests in parallel using %s child processes" + % len(self.workers)) + for worker in self.workers: + worker.start() + + def wait_workers(self): + for worker in self.workers: + worker.kill() + for worker in self.workers: + worker.join() + + def _get_result(self): + if not any(worker.is_alive() for worker in self.workers): + # all worker threads are done: consume pending results + try: + return self.output.get(timeout=0) + except queue.Empty: + return None + + while True: + if self.test_timeout is not None: + faulthandler.dump_traceback_later(self.test_timeout, exit=True) + # wait for a thread + timeout = max(PROGRESS_UPDATE, PROGRESS_MIN_TIME) try: - item = output.get(timeout=get_timeout) + return self.output.get(timeout=timeout) except queue.Empty: - running = get_running(workers) - if running and not regrtest.ns.pgo: - print('running: %s' % ', '.join(running), flush=True) - continue - - test, stdout, stderr, result = item - if test is None: - finished += 1 - continue - regrtest.accumulate_result(test, result) - - # Display progress - ok, test_time, xml_data = result - text = format_test_result(test, ok) - if (ok not in (CHILD_ERROR, INTERRUPTED) - and test_time >= PROGRESS_MIN_TIME - and not regrtest.ns.pgo): - text += ' (%s)' % format_duration(test_time) - elif ok == CHILD_ERROR: - text = '%s (%s)' % (text, test_time) - running = get_running(workers) - if running and not regrtest.ns.pgo: - text += ' -- running: %s' % ', '.join(running) - regrtest.display_progress(test_index, text) - - # Copy stdout and stderr from the child process - if stdout: - print(stdout, flush=True) - if stderr and not regrtest.ns.pgo: - print(stderr, file=sys.stderr, flush=True) - - if result[0] == INTERRUPTED: - raise KeyboardInterrupt - test_index += 1 - except KeyboardInterrupt: - regrtest.interrupted = True - pending.interrupted = True - print() - finally: - if use_timeout: - faulthandler.cancel_dump_traceback_later() - - # If tests are interrupted, wait until tests complete - wait_start = time.monotonic() - while True: - running = [worker.current_test for worker in workers] - running = list(filter(bool, running)) - if not running: - break - - dt = time.monotonic() - wait_start - line = "Waiting for %s (%s tests)" % (', '.join(running), len(running)) - if dt >= WAIT_PROGRESS: - line = "%s since %.0f sec" % (line, dt) - print(line, flush=True) - for worker in workers: - worker.join(WAIT_PROGRESS) + pass + + # display progress + running = get_running(self.workers) + if running and not self.ns.pgo: + print('running: %s' % ', '.join(running), flush=True) + + def display_result(self, mp_result): + result = mp_result.result + + text = format_test_result(result) + if mp_result.error_msg is not None: + # CHILD_ERROR + text += ' (%s)' % mp_result.error_msg + elif (result.test_time >= PROGRESS_MIN_TIME and not self.ns.pgo): + text += ' (%s)' % format_duration(result.test_time) + running = get_running(self.workers) + if running and not self.ns.pgo: + text += ' -- running: %s' % ', '.join(running) + self.regrtest.display_progress(self.test_index, text) + + def _process_result(self, item): + if item[0]: + # Thread got an exception + format_exc = item[1] + print(f"regrtest worker thread failed: {format_exc}", + file=sys.stderr, flush=True) + return True + + self.test_index += 1 + mp_result = item[1] + self.regrtest.accumulate_result(mp_result.result) + self.display_result(mp_result) + + if mp_result.stdout: + print(mp_result.stdout, flush=True) + if mp_result.stderr and not self.ns.pgo: + print(mp_result.stderr, file=sys.stderr, flush=True) + + if must_stop(mp_result.result): + return True + + return False + + def run_tests(self): + self.start_workers() + + self.test_index = 0 + try: + while True: + item = self._get_result() + if item is None: + break + + stop = self._process_result(item) + if stop: + break + except KeyboardInterrupt: + print() + self.regrtest.interrupted = True + finally: + if self.test_timeout is not None: + faulthandler.cancel_dump_traceback_later() + + self.wait_workers() + + +def run_tests_multiprocess(regrtest): + MultiprocessRunner(regrtest).run_tests() diff --git a/Lib/test/libregrtest/save_env.py b/Lib/test/libregrtest/save_env.py index 45b365d456336a..e133c3f1c765a9 100644 --- a/Lib/test/libregrtest/save_env.py +++ b/Lib/test/libregrtest/save_env.py @@ -8,6 +8,7 @@ import threading import warnings from test import support +from test.libregrtest.utils import print_warning try: import _multiprocessing, multiprocessing.process except ImportError: @@ -276,8 +277,7 @@ def __exit__(self, exc_type, exc_val, exc_tb): self.changed = True restore(original) if not self.quiet and not self.pgo: - print(f"Warning -- {name} was modified by {self.testname}", - file=sys.stderr, flush=True) + print_warning(f"{name} was modified by {self.testname}") print(f" Before: {original}\n After: {current} ", file=sys.stderr, flush=True) return False diff --git a/Lib/test/libregrtest/utils.py b/Lib/test/libregrtest/utils.py index d36bf9196626db..fb9971a64f66c8 100644 --- a/Lib/test/libregrtest/utils.py +++ b/Lib/test/libregrtest/utils.py @@ -1,5 +1,6 @@ -import os.path import math +import os.path +import sys import textwrap @@ -54,3 +55,7 @@ def printlist(x, width=70, indent=4, file=None): print(textwrap.fill(' '.join(str(elt) for elt in sorted(x)), width, initial_indent=blanks, subsequent_indent=blanks), file=file) + + +def print_warning(msg): + print(f"Warning -- {msg}", file=sys.stderr, flush=True) diff --git a/Lib/test/libregrtest/win_utils.py b/Lib/test/libregrtest/win_utils.py new file mode 100644 index 00000000000000..adfe278ba39bf2 --- /dev/null +++ b/Lib/test/libregrtest/win_utils.py @@ -0,0 +1,105 @@ +import _winapi +import msvcrt +import os +import subprocess +import uuid +from test import support + + +# Max size of asynchronous reads +BUFSIZE = 8192 +# Exponential damping factor (see below) +LOAD_FACTOR_1 = 0.9200444146293232478931553241 +# Seconds per measurement +SAMPLING_INTERVAL = 5 +COUNTER_NAME = r'\System\Processor Queue Length' + + +class WindowsLoadTracker(): + """ + This class asynchronously interacts with the `typeperf` command to read + the system load on Windows. Mulitprocessing and threads can't be used + here because they interfere with the test suite's cases for those + modules. + """ + + def __init__(self): + self.load = 0.0 + self.start() + + def start(self): + # Create a named pipe which allows for asynchronous IO in Windows + pipe_name = r'\\.\pipe\typeperf_output_' + str(uuid.uuid4()) + + open_mode = _winapi.PIPE_ACCESS_INBOUND + open_mode |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE + open_mode |= _winapi.FILE_FLAG_OVERLAPPED + + # This is the read end of the pipe, where we will be grabbing output + self.pipe = _winapi.CreateNamedPipe( + pipe_name, open_mode, _winapi.PIPE_WAIT, + 1, BUFSIZE, BUFSIZE, _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL + ) + # The write end of the pipe which is passed to the created process + pipe_write_end = _winapi.CreateFile( + pipe_name, _winapi.GENERIC_WRITE, 0, _winapi.NULL, + _winapi.OPEN_EXISTING, 0, _winapi.NULL + ) + # Open up the handle as a python file object so we can pass it to + # subprocess + command_stdout = msvcrt.open_osfhandle(pipe_write_end, 0) + + # Connect to the read end of the pipe in overlap/async mode + overlap = _winapi.ConnectNamedPipe(self.pipe, overlapped=True) + overlap.GetOverlappedResult(True) + + # Spawn off the load monitor + command = ['typeperf', COUNTER_NAME, '-si', str(SAMPLING_INTERVAL)] + self.p = subprocess.Popen(command, stdout=command_stdout, cwd=support.SAVEDCWD) + + # Close our copy of the write end of the pipe + os.close(command_stdout) + + def close(self): + if self.p is None: + return + self.p.kill() + self.p.wait() + self.p = None + + def __del__(self): + self.close() + + def read_output(self): + import _winapi + + overlapped, _ = _winapi.ReadFile(self.pipe, BUFSIZE, True) + bytes_read, res = overlapped.GetOverlappedResult(False) + if res != 0: + return + + return overlapped.getbuffer().decode() + + def getloadavg(self): + typeperf_output = self.read_output() + # Nothing to update, just return the current load + if not typeperf_output: + return self.load + + # Process the backlog of load values + for line in typeperf_output.splitlines(): + # typeperf outputs in a CSV format like this: + # "07/19/2018 01:32:26.605","3.000000" + toks = line.split(',') + # Ignore blank lines and the initial header + if line.strip() == '' or (COUNTER_NAME in line) or len(toks) != 2: + continue + + load = float(toks[1].replace('"', '')) + # We use an exponentially weighted moving average, imitating the + # load calculation on Unix systems. + # https://en.wikipedia.org/wiki/Load_(computing)#Unix-style_load_calculation + new_load = self.load * LOAD_FACTOR_1 + load * (1.0 - LOAD_FACTOR_1) + self.load = new_load + + return self.load diff --git a/Lib/test/test_regrtest.py b/Lib/test/test_regrtest.py index a67458313addb2..4c6152153668d5 100644 --- a/Lib/test/test_regrtest.py +++ b/Lib/test/test_regrtest.py @@ -21,7 +21,7 @@ from test.libregrtest import utils -Py_DEBUG = hasattr(sys, 'getobjects') +Py_DEBUG = hasattr(sys, 'gettotalrefcount') ROOT_DIR = os.path.join(os.path.dirname(__file__), '..', '..') ROOT_DIR = os.path.abspath(os.path.normpath(ROOT_DIR)) @@ -109,7 +109,7 @@ def test_quiet(self): self.assertTrue(ns.quiet) self.assertEqual(ns.verbose, 0) - def test_slow(self): + def test_slowest(self): for opt in '-o', '--slowest': with self.subTest(opt=opt): ns = libregrtest._parse_args([opt]) @@ -255,9 +255,7 @@ def test_multiprocess(self): self.checkError([opt], 'expected one argument') self.checkError([opt, 'foo'], 'invalid int value') self.checkError([opt, '2', '-T'], "don't go together") - self.checkError([opt, '2', '-l'], "don't go together") self.checkError([opt, '0', '-T'], "don't go together") - self.checkError([opt, '0', '-l'], "don't go together") def test_coverage(self): for opt in '-T', '--coverage': @@ -454,8 +452,8 @@ def list_regex(line_format, tests): regex = list_regex('%s re-run test%s', rerun) self.check_line(output, regex) self.check_line(output, "Re-running failed tests in verbose mode") - for name in rerun: - regex = "Re-running test %r in verbose mode" % name + for test_name in rerun: + regex = f"Re-running {test_name} in verbose mode" self.check_line(output, regex) if no_test_ran: @@ -487,7 +485,7 @@ def list_regex(line_format, tests): result.append('SUCCESS') result = ', '.join(result) if rerun: - self.check_line(output, 'Tests result: %s' % result) + self.check_line(output, 'Tests result: FAILURE') result = 'FAILURE then %s' % result self.check_line(output, 'Tests result: %s' % result) @@ -781,22 +779,23 @@ def test_slowest(self): % (self.TESTNAME_REGEX, len(tests))) self.check_line(output, regex) - def test_slow_interrupted(self): + def test_slowest_interrupted(self): # Issue #25373: test --slowest with an interrupted test code = TEST_INTERRUPTED test = self.create_test("sigint", code=code) for multiprocessing in (False, True): - if multiprocessing: - args = ("--slowest", "-j2", test) - else: - args = ("--slowest", test) - output = self.run_tests(*args, exitcode=130) - self.check_executed_tests(output, test, - omitted=test, interrupted=True) - - regex = ('10 slowest tests:\n') - self.check_line(output, regex) + with self.subTest(multiprocessing=multiprocessing): + if multiprocessing: + args = ("--slowest", "-j2", test) + else: + args = ("--slowest", test) + output = self.run_tests(*args, exitcode=130) + self.check_executed_tests(output, test, + omitted=test, interrupted=True) + + regex = ('10 slowest tests:\n') + self.check_line(output, regex) def test_coverage(self): # test --coverage @@ -915,13 +914,13 @@ def test_method2(self): testname) self.assertEqual(output.splitlines(), all_methods) + @support.cpython_only def test_crashed(self): # Any code which causes a crash code = 'import faulthandler; faulthandler._sigsegv()' crash_test = self.create_test(name="crash", code=code) - ok_test = self.create_test(name="ok") - tests = [crash_test, ok_test] + tests = [crash_test] output = self.run_tests("-j2", *tests, exitcode=2) self.check_executed_tests(output, tests, failed=crash_test, randomize=True) @@ -991,6 +990,7 @@ def test_env_changed(self): fail_env_changed=True) def test_rerun_fail(self): + # FAILURE then FAILURE code = textwrap.dedent(""" import unittest @@ -1005,6 +1005,26 @@ def test_bug(self): self.check_executed_tests(output, [testname], failed=testname, rerun=testname) + def test_rerun_success(self): + # FAILURE then SUCCESS + code = textwrap.dedent(""" + import builtins + import unittest + + class Tests(unittest.TestCase): + failed = False + + def test_fail_once(self): + if not hasattr(builtins, '_test_failed'): + builtins._test_failed = True + self.fail("bug") + """) + testname = self.create_test(code=code) + + output = self.run_tests("-w", testname, exitcode=0) + self.check_executed_tests(output, [testname], + rerun=testname) + def test_no_tests_ran(self): code = textwrap.dedent(""" import unittest @@ -1069,6 +1089,38 @@ def test_other_bug(self): self.check_executed_tests(output, [testname, testname2], no_test_ran=[testname]) + @support.cpython_only + def test_findleaks(self): + code = textwrap.dedent(r""" + import _testcapi + import gc + import unittest + + @_testcapi.with_tp_del + class Garbage: + def __tp_del__(self): + pass + + class Tests(unittest.TestCase): + def test_garbage(self): + # create an uncollectable object + obj = Garbage() + obj.ref_cycle = obj + obj = None + """) + testname = self.create_test(code=code) + + output = self.run_tests("--fail-env-changed", testname, exitcode=3) + self.check_executed_tests(output, [testname], + env_changed=[testname], + fail_env_changed=True) + + # --findleaks is now basically an alias to --fail-env-changed + output = self.run_tests("--findleaks", testname, exitcode=3) + self.check_executed_tests(output, [testname], + env_changed=[testname], + fail_env_changed=True) + class TestUtils(unittest.TestCase): def test_format_duration(self): diff --git a/Misc/NEWS.d/next/Library/2019-01-21-13-56-55.bpo-35802.6633PE.rst b/Misc/NEWS.d/next/Library/2019-01-21-13-56-55.bpo-35802.6633PE.rst new file mode 100644 index 00000000000000..8b73d2bd5851fd --- /dev/null +++ b/Misc/NEWS.d/next/Library/2019-01-21-13-56-55.bpo-35802.6633PE.rst @@ -0,0 +1,2 @@ +Clean up code which checked presence of ``os.stat`` / ``os.lstat`` / +``os.chmod`` which are always present. Patch by Anthony Sottile. diff --git a/Misc/NEWS.d/next/Tests/2019-04-26-04-12-29.bpo-36725.B8-ghi.rst b/Misc/NEWS.d/next/Tests/2019-04-26-04-12-29.bpo-36725.B8-ghi.rst new file mode 100644 index 00000000000000..b632c46d2b674f --- /dev/null +++ b/Misc/NEWS.d/next/Tests/2019-04-26-04-12-29.bpo-36725.B8-ghi.rst @@ -0,0 +1,3 @@ +When using mulitprocessing mode (-jN), regrtest now better reports errors if +a worker process fails, and it exits immediately on a worker thread failure +or when interrupted. diff --git a/Misc/NEWS.d/next/Tests/2019-04-26-09-02-49.bpo-36719.ys2uqH.rst b/Misc/NEWS.d/next/Tests/2019-04-26-09-02-49.bpo-36719.ys2uqH.rst new file mode 100644 index 00000000000000..4b6ef76bc6d69a --- /dev/null +++ b/Misc/NEWS.d/next/Tests/2019-04-26-09-02-49.bpo-36719.ys2uqH.rst @@ -0,0 +1,4 @@ +regrtest now always detects uncollectable objects. Previously, the check was +only enabled by ``--findleaks``. The check now also works with +``-jN/--multiprocess N``. ``--findleaks`` becomes a deprecated alias to +``--fail-env-changed``. diff --git a/Misc/NEWS.d/next/Windows/2018-07-20-13-09-19.bpo-34060.v-z87j.rst b/Misc/NEWS.d/next/Windows/2018-07-20-13-09-19.bpo-34060.v-z87j.rst new file mode 100644 index 00000000000000..b77d805b7f2aff --- /dev/null +++ b/Misc/NEWS.d/next/Windows/2018-07-20-13-09-19.bpo-34060.v-z87j.rst @@ -0,0 +1,2 @@ +Report system load when running test suite on Windows. Patch by Ammar Askar. +Based on prior work by Jeremy Kloth.