diff --git a/test_functools32.py b/test_functools32.py index 7f28fab..2485630 100644 --- a/test_functools32.py +++ b/test_functools32.py @@ -2,7 +2,7 @@ import collections import sys import unittest -import test_support32 as support +import test_support27 as support from weakref import proxy import pickle from random import choice diff --git a/test_support27.py b/test_support27.py new file mode 100644 index 0000000..8b85f1c --- /dev/null +++ b/test_support27.py @@ -0,0 +1,1233 @@ +"""Supporting definitions for the Python regression tests.""" + +if __name__ != 'test_support27': + raise ImportError('test_support must be imported from the test package') + +import contextlib +import errno +import functools +import gc +import socket +import sys +import os +import platform +import shutil +import warnings +import unittest +import importlib +import UserDict +import re +import time +try: + import thread +except ImportError: + thread = None + +__all__ = ["Error", "TestFailed", "ResourceDenied", "import_module", + "verbose", "use_resources", "max_memuse", "record_original_stdout", + "get_original_stdout", "unload", "unlink", "rmtree", "forget", + "is_resource_enabled", "requires", "find_unused_port", "bind_port", + "fcmp", "have_unicode", "is_jython", "TESTFN", "HOST", "FUZZ", + "SAVEDCWD", "temp_cwd", "findfile", "sortdict", "check_syntax_error", + "open_urlresource", "check_warnings", "check_py3k_warnings", + "CleanImport", "EnvironmentVarGuard", "captured_output", + "captured_stdout", "TransientResource", "transient_internet", + "run_with_locale", "set_memlimit", "bigmemtest", "bigaddrspacetest", + "BasicTestRunner", "run_unittest", "run_doctest", "threading_setup", + "threading_cleanup", "reap_children", "cpython_only", + "check_impl_detail", "get_attribute", "py3k_bytes", + "import_fresh_module"] + + +class Error(Exception): + """Base class for regression test exceptions.""" + +class TestFailed(Error): + """Test failed.""" + +class ResourceDenied(unittest.SkipTest): + """Test skipped because it requested a disallowed resource. + + This is raised when a test calls requires() for a resource that + has not been enabled. It is used to distinguish between expected + and unexpected skips. + """ + +@contextlib.contextmanager +def _ignore_deprecated_imports(ignore=True): + """Context manager to suppress package and module deprecation + warnings when importing them. + + If ignore is False, this context manager has no effect.""" + if ignore: + with warnings.catch_warnings(): + warnings.filterwarnings("ignore", ".+ (module|package)", + DeprecationWarning) + yield + else: + yield + + +def import_module(name, deprecated=False): + """Import and return the module to be tested, raising SkipTest if + it is not available. + + If deprecated is True, any module or package deprecation messages + will be suppressed.""" + with _ignore_deprecated_imports(deprecated): + try: + return importlib.import_module(name) + except ImportError, msg: + raise unittest.SkipTest(str(msg)) + + +def _save_and_remove_module(name, orig_modules): + """Helper function to save and remove a module from sys.modules + + Raise ImportError if the module can't be imported.""" + # try to import the module and raise an error if it can't be imported + if name not in sys.modules: + __import__(name) + del sys.modules[name] + for modname in list(sys.modules): + if modname == name or modname.startswith(name + '.'): + orig_modules[modname] = sys.modules[modname] + del sys.modules[modname] + +def _save_and_block_module(name, orig_modules): + """Helper function to save and block a module in sys.modules + + Return True if the module was in sys.modules, False otherwise.""" + saved = True + try: + orig_modules[name] = sys.modules[name] + except KeyError: + saved = False + sys.modules[name] = None + return saved + + +def import_fresh_module(name, fresh=(), blocked=(), deprecated=False): + """Imports and returns a module, deliberately bypassing the sys.modules cache + and importing a fresh copy of the module. Once the import is complete, + the sys.modules cache is restored to its original state. + + Modules named in fresh are also imported anew if needed by the import. + If one of these modules can't be imported, None is returned. + + Importing of modules named in blocked is prevented while the fresh import + takes place. + + If deprecated is True, any module or package deprecation messages + will be suppressed.""" + # NOTE: test_heapq, test_json, and test_warnings include extra sanity + # checks to make sure that this utility function is working as expected + with _ignore_deprecated_imports(deprecated): + # Keep track of modules saved for later restoration as well + # as those which just need a blocking entry removed + orig_modules = {} + names_to_remove = [] + _save_and_remove_module(name, orig_modules) + try: + for fresh_name in fresh: + _save_and_remove_module(fresh_name, orig_modules) + for blocked_name in blocked: + if not _save_and_block_module(blocked_name, orig_modules): + names_to_remove.append(blocked_name) + fresh_module = importlib.import_module(name) + except ImportError: + fresh_module = None + finally: + for orig_name, module in orig_modules.items(): + sys.modules[orig_name] = module + for name_to_remove in names_to_remove: + del sys.modules[name_to_remove] + return fresh_module + + +def get_attribute(obj, name): + """Get an attribute, raising SkipTest if AttributeError is raised.""" + try: + attribute = getattr(obj, name) + except AttributeError: + raise unittest.SkipTest("module %s has no attribute %s" % ( + obj.__name__, name)) + else: + return attribute + + +verbose = 1 # Flag set to 0 by regrtest.py +use_resources = None # Flag set to [] by regrtest.py +max_memuse = 0 # Disable bigmem tests (they will still be run with + # small sizes, to make sure they work.) +real_max_memuse = 0 + +# _original_stdout is meant to hold stdout at the time regrtest began. +# This may be "the real" stdout, or IDLE's emulation of stdout, or whatever. +# The point is to have some flavor of stdout the user can actually see. +_original_stdout = None +def record_original_stdout(stdout): + global _original_stdout + _original_stdout = stdout + +def get_original_stdout(): + return _original_stdout or sys.stdout + +def unload(name): + try: + del sys.modules[name] + except KeyError: + pass + +def unlink(filename): + try: + os.unlink(filename) + except OSError: + pass + +def rmtree(path): + try: + shutil.rmtree(path) + except OSError, e: + # Unix returns ENOENT, Windows returns ESRCH. + if e.errno not in (errno.ENOENT, errno.ESRCH): + raise + +def forget(modname): + '''"Forget" a module was ever imported by removing it from sys.modules and + deleting any .pyc and .pyo files.''' + unload(modname) + for dirname in sys.path: + unlink(os.path.join(dirname, modname + os.extsep + 'pyc')) + # Deleting the .pyo file cannot be within the 'try' for the .pyc since + # the chance exists that there is no .pyc (and thus the 'try' statement + # is exited) but there is a .pyo file. + unlink(os.path.join(dirname, modname + os.extsep + 'pyo')) + +def is_resource_enabled(resource): + """Test whether a resource is enabled. Known resources are set by + regrtest.py.""" + return use_resources is not None and resource in use_resources + +def requires(resource, msg=None): + """Raise ResourceDenied if the specified resource is not available. + + If the caller's module is __main__ then automatically return True. The + possibility of False being returned occurs when regrtest.py is executing.""" + # see if the caller's module is __main__ - if so, treat as if + # the resource was set + if sys._getframe(1).f_globals.get("__name__") == "__main__": + return + if not is_resource_enabled(resource): + if msg is None: + msg = "Use of the `%s' resource not enabled" % resource + raise ResourceDenied(msg) + +HOST = 'localhost' + +def find_unused_port(family=socket.AF_INET, socktype=socket.SOCK_STREAM): + """Returns an unused port that should be suitable for binding. This is + achieved by creating a temporary socket with the same family and type as + the 'sock' parameter (default is AF_INET, SOCK_STREAM), and binding it to + the specified host address (defaults to 0.0.0.0) with the port set to 0, + eliciting an unused ephemeral port from the OS. The temporary socket is + then closed and deleted, and the ephemeral port is returned. + + Either this method or bind_port() should be used for any tests where a + server socket needs to be bound to a particular port for the duration of + the test. Which one to use depends on whether the calling code is creating + a python socket, or if an unused port needs to be provided in a constructor + or passed to an external program (i.e. the -accept argument to openssl's + s_server mode). Always prefer bind_port() over find_unused_port() where + possible. Hard coded ports should *NEVER* be used. As soon as a server + socket is bound to a hard coded port, the ability to run multiple instances + of the test simultaneously on the same host is compromised, which makes the + test a ticking time bomb in a buildbot environment. On Unix buildbots, this + may simply manifest as a failed test, which can be recovered from without + intervention in most cases, but on Windows, the entire python process can + completely and utterly wedge, requiring someone to log in to the buildbot + and manually kill the affected process. + + (This is easy to reproduce on Windows, unfortunately, and can be traced to + the SO_REUSEADDR socket option having different semantics on Windows versus + Unix/Linux. On Unix, you can't have two AF_INET SOCK_STREAM sockets bind, + listen and then accept connections on identical host/ports. An EADDRINUSE + socket.error will be raised at some point (depending on the platform and + the order bind and listen were called on each socket). + + However, on Windows, if SO_REUSEADDR is set on the sockets, no EADDRINUSE + will ever be raised when attempting to bind two identical host/ports. When + accept() is called on each socket, the second caller's process will steal + the port from the first caller, leaving them both in an awkwardly wedged + state where they'll no longer respond to any signals or graceful kills, and + must be forcibly killed via OpenProcess()/TerminateProcess(). + + The solution on Windows is to use the SO_EXCLUSIVEADDRUSE socket option + instead of SO_REUSEADDR, which effectively affords the same semantics as + SO_REUSEADDR on Unix. Given the propensity of Unix developers in the Open + Source world compared to Windows ones, this is a common mistake. A quick + look over OpenSSL's 0.9.8g source shows that they use SO_REUSEADDR when + openssl.exe is called with the 's_server' option, for example. See + http://bugs.python.org/issue2550 for more info. The following site also + has a very thorough description about the implications of both REUSEADDR + and EXCLUSIVEADDRUSE on Windows: + http://msdn2.microsoft.com/en-us/library/ms740621(VS.85).aspx) + + XXX: although this approach is a vast improvement on previous attempts to + elicit unused ports, it rests heavily on the assumption that the ephemeral + port returned to us by the OS won't immediately be dished back out to some + other process when we close and delete our temporary socket but before our + calling code has a chance to bind the returned port. We can deal with this + issue if/when we come across it.""" + tempsock = socket.socket(family, socktype) + port = bind_port(tempsock) + tempsock.close() + del tempsock + return port + +def bind_port(sock, host=HOST): + """Bind the socket to a free port and return the port number. Relies on + ephemeral ports in order to ensure we are using an unbound port. This is + important as many tests may be running simultaneously, especially in a + buildbot environment. This method raises an exception if the sock.family + is AF_INET and sock.type is SOCK_STREAM, *and* the socket has SO_REUSEADDR + or SO_REUSEPORT set on it. Tests should *never* set these socket options + for TCP/IP sockets. The only case for setting these options is testing + multicasting via multiple UDP sockets. + + Additionally, if the SO_EXCLUSIVEADDRUSE socket option is available (i.e. + on Windows), it will be set on the socket. This will prevent anyone else + from bind()'ing to our host/port for the duration of the test. + """ + if sock.family == socket.AF_INET and sock.type == socket.SOCK_STREAM: + if hasattr(socket, 'SO_REUSEADDR'): + if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) == 1: + raise TestFailed("tests should never set the SO_REUSEADDR " \ + "socket option on TCP/IP sockets!") + if hasattr(socket, 'SO_REUSEPORT'): + if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 1: + raise TestFailed("tests should never set the SO_REUSEPORT " \ + "socket option on TCP/IP sockets!") + if hasattr(socket, 'SO_EXCLUSIVEADDRUSE'): + sock.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1) + + sock.bind((host, 0)) + port = sock.getsockname()[1] + return port + +FUZZ = 1e-6 + +def fcmp(x, y): # fuzzy comparison function + if isinstance(x, float) or isinstance(y, float): + try: + fuzz = (abs(x) + abs(y)) * FUZZ + if abs(x-y) <= fuzz: + return 0 + except: + pass + elif type(x) == type(y) and isinstance(x, (tuple, list)): + for i in range(min(len(x), len(y))): + outcome = fcmp(x[i], y[i]) + if outcome != 0: + return outcome + return (len(x) > len(y)) - (len(x) < len(y)) + return (x > y) - (x < y) + +try: + unicode + have_unicode = True +except NameError: + have_unicode = False + +is_jython = sys.platform.startswith('java') + +# Filename used for testing +if os.name == 'java': + # Jython disallows @ in module names + TESTFN = '$test' +elif os.name == 'riscos': + TESTFN = 'testfile' +else: + TESTFN = '@test' + # Unicode name only used if TEST_FN_ENCODING exists for the platform. + if have_unicode: + # Assuming sys.getfilesystemencoding()!=sys.getdefaultencoding() + # TESTFN_UNICODE is a filename that can be encoded using the + # file system encoding, but *not* with the default (ascii) encoding + if isinstance('', unicode): + # python -U + # XXX perhaps unicode() should accept Unicode strings? + TESTFN_UNICODE = "@test-\xe0\xf2" + else: + # 2 latin characters. + TESTFN_UNICODE = unicode("@test-\xe0\xf2", "latin-1") + TESTFN_ENCODING = sys.getfilesystemencoding() + # TESTFN_UNENCODABLE is a filename that should *not* be + # able to be encoded by *either* the default or filesystem encoding. + # This test really only makes sense on Windows NT platforms + # which have special Unicode support in posixmodule. + if (not hasattr(sys, "getwindowsversion") or + sys.getwindowsversion()[3] < 2): # 0=win32s or 1=9x/ME + TESTFN_UNENCODABLE = None + else: + # Japanese characters (I think - from bug 846133) + TESTFN_UNENCODABLE = eval('u"@test-\u5171\u6709\u3055\u308c\u308b"') + try: + # XXX - Note - should be using TESTFN_ENCODING here - but for + # Windows, "mbcs" currently always operates as if in + # errors=ignore' mode - hence we get '?' characters rather than + # the exception. 'Latin1' operates as we expect - ie, fails. + # See [ 850997 ] mbcs encoding ignores errors + TESTFN_UNENCODABLE.encode("Latin1") + except UnicodeEncodeError: + pass + else: + print \ + 'WARNING: The filename %r CAN be encoded by the filesystem. ' \ + 'Unicode filename tests may not be effective' \ + % TESTFN_UNENCODABLE + + +# Disambiguate TESTFN for parallel testing, while letting it remain a valid +# module name. +TESTFN = "{}_{}_tmp".format(TESTFN, os.getpid()) + +# Save the initial cwd +SAVEDCWD = os.getcwd() + +@contextlib.contextmanager +def temp_cwd(name='tempcwd', quiet=False): + """ + Context manager that creates a temporary directory and set it as CWD. + + The new CWD is created in the current directory and it's named *name*. + If *quiet* is False (default) and it's not possible to create or change + the CWD, an error is raised. If it's True, only a warning is raised + and the original CWD is used. + """ + if isinstance(name, unicode): + try: + name = name.encode(sys.getfilesystemencoding() or 'ascii') + except UnicodeEncodeError: + if not quiet: + raise unittest.SkipTest('unable to encode the cwd name with ' + 'the filesystem encoding.') + saved_dir = os.getcwd() + is_temporary = False + try: + os.mkdir(name) + os.chdir(name) + is_temporary = True + except OSError: + if not quiet: + raise + warnings.warn('tests may fail, unable to change the CWD to ' + name, + RuntimeWarning, stacklevel=3) + try: + yield os.getcwd() + finally: + os.chdir(saved_dir) + if is_temporary: + rmtree(name) + + +def findfile(file, here=__file__, subdir=None): + """Try to find a file on sys.path and the working directory. If it is not + found the argument passed to the function is returned (this does not + necessarily signal failure; could still be the legitimate path).""" + if os.path.isabs(file): + return file + if subdir is not None: + file = os.path.join(subdir, file) + path = sys.path + path = [os.path.dirname(here)] + path + for dn in path: + fn = os.path.join(dn, file) + if os.path.exists(fn): return fn + return file + +def sortdict(dict): + "Like repr(dict), but in sorted order." + items = dict.items() + items.sort() + reprpairs = ["%r: %r" % pair for pair in items] + withcommas = ", ".join(reprpairs) + return "{%s}" % withcommas + +def make_bad_fd(): + """ + Create an invalid file descriptor by opening and closing a file and return + its fd. + """ + file = open(TESTFN, "wb") + try: + return file.fileno() + finally: + file.close() + unlink(TESTFN) + +def check_syntax_error(testcase, statement): + testcase.assertRaises(SyntaxError, compile, statement, + '', 'exec') + +def open_urlresource(url, check=None): + import urlparse, urllib2 + + filename = urlparse.urlparse(url)[2].split('/')[-1] # '/': it's URL! + + fn = os.path.join(os.path.dirname(__file__), "data", filename) + + def check_valid_file(fn): + f = open(fn) + if check is None: + return f + elif check(f): + f.seek(0) + return f + f.close() + + if os.path.exists(fn): + f = check_valid_file(fn) + if f is not None: + return f + unlink(fn) + + # Verify the requirement before downloading the file + requires('urlfetch') + + print >> get_original_stdout(), '\tfetching %s ...' % url + f = urllib2.urlopen(url, timeout=15) + try: + with open(fn, "wb") as out: + s = f.read() + while s: + out.write(s) + s = f.read() + finally: + f.close() + + f = check_valid_file(fn) + if f is not None: + return f + raise TestFailed('invalid resource "%s"' % fn) + + +class WarningsRecorder(object): + """Convenience wrapper for the warnings list returned on + entry to the warnings.catch_warnings() context manager. + """ + def __init__(self, warnings_list): + self._warnings = warnings_list + self._last = 0 + + def __getattr__(self, attr): + if len(self._warnings) > self._last: + return getattr(self._warnings[-1], attr) + elif attr in warnings.WarningMessage._WARNING_DETAILS: + return None + raise AttributeError("%r has no attribute %r" % (self, attr)) + + @property + def warnings(self): + return self._warnings[self._last:] + + def reset(self): + self._last = len(self._warnings) + + +def _filterwarnings(filters, quiet=False): + """Catch the warnings, then check if all the expected + warnings have been raised and re-raise unexpected warnings. + If 'quiet' is True, only re-raise the unexpected warnings. + """ + # Clear the warning registry of the calling module + # in order to re-raise the warnings. + frame = sys._getframe(2) + registry = frame.f_globals.get('__warningregistry__') + if registry: + registry.clear() + with warnings.catch_warnings(record=True) as w: + # Set filter "always" to record all warnings. Because + # test_warnings swap the module, we need to look up in + # the sys.modules dictionary. + sys.modules['warnings'].simplefilter("always") + yield WarningsRecorder(w) + # Filter the recorded warnings + reraise = [warning.message for warning in w] + missing = [] + for msg, cat in filters: + seen = False + for exc in reraise[:]: + message = str(exc) + # Filter out the matching messages + if (re.match(msg, message, re.I) and + issubclass(exc.__class__, cat)): + seen = True + reraise.remove(exc) + if not seen and not quiet: + # This filter caught nothing + missing.append((msg, cat.__name__)) + if reraise: + raise AssertionError("unhandled warning %r" % reraise[0]) + if missing: + raise AssertionError("filter (%r, %s) did not catch any warning" % + missing[0]) + + +@contextlib.contextmanager +def check_warnings(*filters, **kwargs): + """Context manager to silence warnings. + + Accept 2-tuples as positional arguments: + ("message regexp", WarningCategory) + + Optional argument: + - if 'quiet' is True, it does not fail if a filter catches nothing + (default True without argument, + default False if some filters are defined) + + Without argument, it defaults to: + check_warnings(("", Warning), quiet=True) + """ + quiet = kwargs.get('quiet') + if not filters: + filters = (("", Warning),) + # Preserve backward compatibility + if quiet is None: + quiet = True + return _filterwarnings(filters, quiet) + + +@contextlib.contextmanager +def check_py3k_warnings(*filters, **kwargs): + """Context manager to silence py3k warnings. + + Accept 2-tuples as positional arguments: + ("message regexp", WarningCategory) + + Optional argument: + - if 'quiet' is True, it does not fail if a filter catches nothing + (default False) + + Without argument, it defaults to: + check_py3k_warnings(("", DeprecationWarning), quiet=False) + """ + if sys.py3kwarning: + if not filters: + filters = (("", DeprecationWarning),) + else: + # It should not raise any py3k warning + filters = () + return _filterwarnings(filters, kwargs.get('quiet')) + + +class CleanImport(object): + """Context manager to force import to return a new module reference. + + This is useful for testing module-level behaviours, such as + the emission of a DeprecationWarning on import. + + Use like this: + + with CleanImport("foo"): + importlib.import_module("foo") # new reference + """ + + def __init__(self, *module_names): + self.original_modules = sys.modules.copy() + for module_name in module_names: + if module_name in sys.modules: + module = sys.modules[module_name] + # It is possible that module_name is just an alias for + # another module (e.g. stub for modules renamed in 3.x). + # In that case, we also need delete the real module to clear + # the import cache. + if module.__name__ != module_name: + del sys.modules[module.__name__] + del sys.modules[module_name] + + def __enter__(self): + return self + + def __exit__(self, *ignore_exc): + sys.modules.update(self.original_modules) + + +class EnvironmentVarGuard(UserDict.DictMixin): + + """Class to help protect the environment variable properly. Can be used as + a context manager.""" + + def __init__(self): + self._environ = os.environ + self._changed = {} + + def __getitem__(self, envvar): + return self._environ[envvar] + + def __setitem__(self, envvar, value): + # Remember the initial value on the first access + if envvar not in self._changed: + self._changed[envvar] = self._environ.get(envvar) + self._environ[envvar] = value + + def __delitem__(self, envvar): + # Remember the initial value on the first access + if envvar not in self._changed: + self._changed[envvar] = self._environ.get(envvar) + if envvar in self._environ: + del self._environ[envvar] + + def keys(self): + return self._environ.keys() + + def set(self, envvar, value): + self[envvar] = value + + def unset(self, envvar): + del self[envvar] + + def __enter__(self): + return self + + def __exit__(self, *ignore_exc): + for (k, v) in self._changed.items(): + if v is None: + if k in self._environ: + del self._environ[k] + else: + self._environ[k] = v + os.environ = self._environ + + +class DirsOnSysPath(object): + """Context manager to temporarily add directories to sys.path. + + This makes a copy of sys.path, appends any directories given + as positional arguments, then reverts sys.path to the copied + settings when the context ends. + + Note that *all* sys.path modifications in the body of the + context manager, including replacement of the object, + will be reverted at the end of the block. + """ + + def __init__(self, *paths): + self.original_value = sys.path[:] + self.original_object = sys.path + sys.path.extend(paths) + + def __enter__(self): + return self + + def __exit__(self, *ignore_exc): + sys.path = self.original_object + sys.path[:] = self.original_value + + +class TransientResource(object): + + """Raise ResourceDenied if an exception is raised while the context manager + is in effect that matches the specified exception and attributes.""" + + def __init__(self, exc, **kwargs): + self.exc = exc + self.attrs = kwargs + + def __enter__(self): + return self + + def __exit__(self, type_=None, value=None, traceback=None): + """If type_ is a subclass of self.exc and value has attributes matching + self.attrs, raise ResourceDenied. Otherwise let the exception + propagate (if any).""" + if type_ is not None and issubclass(self.exc, type_): + for attr, attr_value in self.attrs.iteritems(): + if not hasattr(value, attr): + break + if getattr(value, attr) != attr_value: + break + else: + raise ResourceDenied("an optional resource is not available") + + +@contextlib.contextmanager +def transient_internet(resource_name, timeout=30.0, errnos=()): + """Return a context manager that raises ResourceDenied when various issues + with the Internet connection manifest themselves as exceptions.""" + default_errnos = [ + ('ECONNREFUSED', 111), + ('ECONNRESET', 104), + ('EHOSTUNREACH', 113), + ('ENETUNREACH', 101), + ('ETIMEDOUT', 110), + ] + default_gai_errnos = [ + ('EAI_NONAME', -2), + ('EAI_NODATA', -5), + ] + + denied = ResourceDenied("Resource '%s' is not available" % resource_name) + captured_errnos = errnos + gai_errnos = [] + if not captured_errnos: + captured_errnos = [getattr(errno, name, num) + for (name, num) in default_errnos] + gai_errnos = [getattr(socket, name, num) + for (name, num) in default_gai_errnos] + + def filter_error(err): + n = getattr(err, 'errno', None) + if (isinstance(err, socket.timeout) or + (isinstance(err, socket.gaierror) and n in gai_errnos) or + n in captured_errnos): + if not verbose: + sys.stderr.write(denied.args[0] + "\n") + raise denied + + old_timeout = socket.getdefaulttimeout() + try: + if timeout is not None: + socket.setdefaulttimeout(timeout) + yield + except IOError as err: + # urllib can wrap original socket errors multiple times (!), we must + # unwrap to get at the original error. + while True: + a = err.args + if len(a) >= 1 and isinstance(a[0], IOError): + err = a[0] + # The error can also be wrapped as args[1]: + # except socket.error as msg: + # raise IOError('socket error', msg).with_traceback(sys.exc_info()[2]) + elif len(a) >= 2 and isinstance(a[1], IOError): + err = a[1] + else: + break + filter_error(err) + raise + # XXX should we catch generic exceptions and look for their + # __cause__ or __context__? + finally: + socket.setdefaulttimeout(old_timeout) + + +@contextlib.contextmanager +def captured_output(stream_name): + """Return a context manager used by captured_stdout and captured_stdin + that temporarily replaces the sys stream *stream_name* with a StringIO.""" + import StringIO + orig_stdout = getattr(sys, stream_name) + setattr(sys, stream_name, StringIO.StringIO()) + try: + yield getattr(sys, stream_name) + finally: + setattr(sys, stream_name, orig_stdout) + +def captured_stdout(): + """Capture the output of sys.stdout: + + with captured_stdout() as s: + print "hello" + self.assertEqual(s.getvalue(), "hello") + """ + return captured_output("stdout") + +def captured_stdin(): + return captured_output("stdin") + +def gc_collect(): + """Force as many objects as possible to be collected. + + In non-CPython implementations of Python, this is needed because timely + deallocation is not guaranteed by the garbage collector. (Even in CPython + this can be the case in case of reference cycles.) This means that __del__ + methods may be called later than expected and weakrefs may remain alive for + longer than expected. This function tries its best to force all garbage + objects to disappear. + """ + gc.collect() + if is_jython: + time.sleep(0.1) + gc.collect() + gc.collect() + + +#======================================================================= +# Decorator for running a function in a different locale, correctly resetting +# it afterwards. + +def run_with_locale(catstr, *locales): + def decorator(func): + def inner(*args, **kwds): + try: + import locale + category = getattr(locale, catstr) + orig_locale = locale.setlocale(category) + except AttributeError: + # if the test author gives us an invalid category string + raise + except: + # cannot retrieve original locale, so do nothing + locale = orig_locale = None + else: + for loc in locales: + try: + locale.setlocale(category, loc) + break + except: + pass + + # now run the function, resetting the locale on exceptions + try: + return func(*args, **kwds) + finally: + if locale and orig_locale: + locale.setlocale(category, orig_locale) + inner.func_name = func.func_name + inner.__doc__ = func.__doc__ + return inner + return decorator + +#======================================================================= +# Big-memory-test support. Separate from 'resources' because memory use should be configurable. + +# Some handy shorthands. Note that these are used for byte-limits as well +# as size-limits, in the various bigmem tests +_1M = 1024*1024 +_1G = 1024 * _1M +_2G = 2 * _1G +_4G = 4 * _1G + +MAX_Py_ssize_t = sys.maxsize + +def set_memlimit(limit): + global max_memuse + global real_max_memuse + sizes = { + 'k': 1024, + 'm': _1M, + 'g': _1G, + 't': 1024*_1G, + } + m = re.match(r'(\d+(\.\d+)?) (K|M|G|T)b?$', limit, + re.IGNORECASE | re.VERBOSE) + if m is None: + raise ValueError('Invalid memory limit %r' % (limit,)) + memlimit = int(float(m.group(1)) * sizes[m.group(3).lower()]) + real_max_memuse = memlimit + if memlimit > MAX_Py_ssize_t: + memlimit = MAX_Py_ssize_t + if memlimit < _2G - 1: + raise ValueError('Memory limit %r too low to be useful' % (limit,)) + max_memuse = memlimit + +def bigmemtest(minsize, memuse, overhead=5*_1M): + """Decorator for bigmem tests. + + 'minsize' is the minimum useful size for the test (in arbitrary, + test-interpreted units.) 'memuse' is the number of 'bytes per size' for + the test, or a good estimate of it. 'overhead' specifies fixed overhead, + independent of the testsize, and defaults to 5Mb. + + The decorator tries to guess a good value for 'size' and passes it to + the decorated test function. If minsize * memuse is more than the + allowed memory use (as defined by max_memuse), the test is skipped. + Otherwise, minsize is adjusted upward to use up to max_memuse. + """ + def decorator(f): + def wrapper(self): + if not max_memuse: + # If max_memuse is 0 (the default), + # we still want to run the tests with size set to a few kb, + # to make sure they work. We still want to avoid using + # too much memory, though, but we do that noisily. + maxsize = 5147 + self.assertFalse(maxsize * memuse + overhead > 20 * _1M) + else: + maxsize = int((max_memuse - overhead) / memuse) + if maxsize < minsize: + # Really ought to print 'test skipped' or something + if verbose: + sys.stderr.write("Skipping %s because of memory " + "constraint\n" % (f.__name__,)) + return + # Try to keep some breathing room in memory use + maxsize = max(maxsize - 50 * _1M, minsize) + return f(self, maxsize) + wrapper.minsize = minsize + wrapper.memuse = memuse + wrapper.overhead = overhead + return wrapper + return decorator + +def precisionbigmemtest(size, memuse, overhead=5*_1M): + def decorator(f): + def wrapper(self): + if not real_max_memuse: + maxsize = 5147 + else: + maxsize = size + + if real_max_memuse and real_max_memuse < maxsize * memuse: + if verbose: + sys.stderr.write("Skipping %s because of memory " + "constraint\n" % (f.__name__,)) + return + + return f(self, maxsize) + wrapper.size = size + wrapper.memuse = memuse + wrapper.overhead = overhead + return wrapper + return decorator + +def bigaddrspacetest(f): + """Decorator for tests that fill the address space.""" + def wrapper(self): + if max_memuse < MAX_Py_ssize_t: + if verbose: + sys.stderr.write("Skipping %s because of memory " + "constraint\n" % (f.__name__,)) + else: + return f(self) + return wrapper + +#======================================================================= +# unittest integration. + +class BasicTestRunner: + def run(self, test): + result = unittest.TestResult() + test(result) + return result + +def _id(obj): + return obj + +def requires_resource(resource): + if is_resource_enabled(resource): + return _id + else: + return unittest.skip("resource {0!r} is not enabled".format(resource)) + +def cpython_only(test): + """ + Decorator for tests only applicable on CPython. + """ + return impl_detail(cpython=True)(test) + +def impl_detail(msg=None, **guards): + if check_impl_detail(**guards): + return _id + if msg is None: + guardnames, default = _parse_guards(guards) + if default: + msg = "implementation detail not available on {0}" + else: + msg = "implementation detail specific to {0}" + guardnames = sorted(guardnames.keys()) + msg = msg.format(' or '.join(guardnames)) + return unittest.skip(msg) + +def _parse_guards(guards): + # Returns a tuple ({platform_name: run_me}, default_value) + if not guards: + return ({'cpython': True}, False) + is_true = guards.values()[0] + assert guards.values() == [is_true] * len(guards) # all True or all False + return (guards, not is_true) + +# Use the following check to guard CPython's implementation-specific tests -- +# or to run them only on the implementation(s) guarded by the arguments. +def check_impl_detail(**guards): + """This function returns True or False depending on the host platform. + Examples: + if check_impl_detail(): # only on CPython (default) + if check_impl_detail(jython=True): # only on Jython + if check_impl_detail(cpython=False): # everywhere except on CPython + """ + guards, default = _parse_guards(guards) + return guards.get(platform.python_implementation().lower(), default) + + + +def _run_suite(suite): + """Run tests from a unittest.TestSuite-derived class.""" + if verbose: + runner = unittest.TextTestRunner(sys.stdout, verbosity=2) + else: + runner = BasicTestRunner() + + result = runner.run(suite) + if not result.wasSuccessful(): + if len(result.errors) == 1 and not result.failures: + err = result.errors[0][1] + elif len(result.failures) == 1 and not result.errors: + err = result.failures[0][1] + else: + err = "multiple errors occurred" + if not verbose: + err += "; run in verbose mode for details" + raise TestFailed(err) + + +def run_unittest(*classes): + """Run tests from unittest.TestCase-derived classes.""" + valid_types = (unittest.TestSuite, unittest.TestCase) + suite = unittest.TestSuite() + for cls in classes: + if isinstance(cls, str): + if cls in sys.modules: + suite.addTest(unittest.findTestCases(sys.modules[cls])) + else: + raise ValueError("str arguments must be keys in sys.modules") + elif isinstance(cls, valid_types): + suite.addTest(cls) + else: + suite.addTest(unittest.makeSuite(cls)) + _run_suite(suite) + + +#======================================================================= +# doctest driver. + +def run_doctest(module, verbosity=None): + """Run doctest on the given module. Return (#failures, #tests). + + If optional argument verbosity is not specified (or is None), pass + test_support's belief about verbosity on to doctest. Else doctest's + usual behavior is used (it searches sys.argv for -v). + """ + + import doctest + + if verbosity is None: + verbosity = verbose + else: + verbosity = None + + # Direct doctest output (normally just errors) to real stdout; doctest + # output shouldn't be compared by regrtest. + save_stdout = sys.stdout + sys.stdout = get_original_stdout() + try: + f, t = doctest.testmod(module, verbose=verbosity) + if f: + raise TestFailed("%d of %d doctests failed" % (f, t)) + finally: + sys.stdout = save_stdout + if verbose: + print 'doctest (%s) ... %d tests with zero failures' % (module.__name__, t) + return f, t + +#======================================================================= +# Threading support to prevent reporting refleaks when running regrtest.py -R + +# NOTE: we use thread._count() rather than threading.enumerate() (or the +# moral equivalent thereof) because a threading.Thread object is still alive +# until its __bootstrap() method has returned, even after it has been +# unregistered from the threading module. +# thread._count(), on the other hand, only gets decremented *after* the +# __bootstrap() method has returned, which gives us reliable reference counts +# at the end of a test run. + +def threading_setup(): + if thread: + return thread._count(), + else: + return 1, + +def threading_cleanup(nb_threads): + if not thread: + return + + _MAX_COUNT = 10 + for count in range(_MAX_COUNT): + n = thread._count() + if n == nb_threads: + break + time.sleep(0.1) + # XXX print a warning in case of failure? + +def reap_threads(func): + """Use this function when threads are being used. This will + ensure that the threads are cleaned up even when the test fails. + If threading is unavailable this function does nothing. + """ + if not thread: + return func + + @functools.wraps(func) + def decorator(*args): + key = threading_setup() + try: + return func(*args) + finally: + threading_cleanup(*key) + return decorator + +def reap_children(): + """Use this function at the end of test_main() whenever sub-processes + are started. This will help ensure that no extra children (zombies) + stick around to hog resources and create problems when looking + for refleaks. + """ + + # Reap all our dead child processes so we don't leave zombies around. + # These hog resources and might be causing some of the buildbots to die. + if hasattr(os, 'waitpid'): + any_process = -1 + while True: + try: + # This will raise an exception on Windows. That's ok. + pid, status = os.waitpid(any_process, os.WNOHANG) + if pid == 0: + break + except: + break + +def py3k_bytes(b): + """Emulate the py3k bytes() constructor. + + NOTE: This is only a best effort function. + """ + try: + # memoryview? + return b.tobytes() + except AttributeError: + try: + # iterable of ints? + return b"".join(chr(x) for x in b) + except TypeError: + return bytes(b) + +def args_from_interpreter_flags(): + """Return a list of command-line arguments reproducing the current + settings in sys.flags.""" + flag_opt_map = { + 'bytes_warning': 'b', + 'dont_write_bytecode': 'B', + 'ignore_environment': 'E', + 'no_user_site': 's', + 'no_site': 'S', + 'optimize': 'O', + 'py3k_warning': '3', + 'verbose': 'v', + } + args = [] + for flag, opt in flag_opt_map.items(): + v = getattr(sys.flags, flag) + if v > 0: + args.append('-' + opt * v) + return args + +def strip_python_stderr(stderr): + """Strip the stderr of a Python process from potential debug output + emitted by the interpreter. + + This will typically be run on the result of the communicate() method + of a subprocess.Popen object. + """ + stderr = re.sub(br"\[\d+ refs\]\r?\n?$", b"", stderr).strip() + return stderr