Skip to content

Commit

Permalink
bpo-40275: Use new test.support helper submodules in tests (GH-21764)
Browse files Browse the repository at this point in the history
  • Loading branch information
shihai1991 committed Aug 7, 2020
1 parent 46e19b6 commit 598a951
Show file tree
Hide file tree
Showing 15 changed files with 97 additions and 84 deletions.
3 changes: 2 additions & 1 deletion Lib/ctypes/test/test_loading.py
Expand Up @@ -5,6 +5,7 @@
import sys
import unittest
import test.support
from test.support import import_helper
from ctypes.util import find_library

libc_name = None
Expand Down Expand Up @@ -117,7 +118,7 @@ def test_1703286_B(self):
@unittest.skipUnless(os.name == "nt",
'test specific to Windows')
def test_load_dll_with_flags(self):
_sqlite3 = test.support.import_module("_sqlite3")
_sqlite3 = import_helper.import_module("_sqlite3")
src = _sqlite3.__file__
if src.lower().endswith("_d.pyd"):
ext = "_d.dll"
Expand Down
3 changes: 2 additions & 1 deletion Lib/sqlite3/test/hooks.py
Expand Up @@ -24,7 +24,8 @@
import unittest
import sqlite3 as sqlite

from test.support import TESTFN, unlink
from test.support.os_helper import TESTFN, unlink


class CollationTests(unittest.TestCase):
def CheckCreateCollationNotString(self):
Expand Down
32 changes: 17 additions & 15 deletions Lib/test/_test_multiprocessing.py
Expand Up @@ -28,8 +28,10 @@
from test import support
from test.support import hashlib_helper
from test.support import import_helper
from test.support import os_helper
from test.support import socket_helper
from test.support import threading_helper
from test.support import warnings_helper


# Skip tests if _multiprocessing wasn't built.
Expand Down Expand Up @@ -615,7 +617,7 @@ def test_lose_target_ref(self):

@classmethod
def _test_child_fd_inflation(self, evt, q):
q.put(test.support.fd_count())
q.put(os_helper.fd_count())
evt.wait()

def test_child_fd_inflation(self):
Expand Down Expand Up @@ -819,8 +821,8 @@ def test_stderr_flush(self):
if self.TYPE == "threads":
self.skipTest('test not appropriate for {}'.format(self.TYPE))

testfn = test.support.TESTFN
self.addCleanup(test.support.unlink, testfn)
testfn = os_helper.TESTFN
self.addCleanup(os_helper.unlink, testfn)
proc = self.Process(target=self._test_stderr_flush, args=(testfn,))
proc.start()
proc.join()
Expand Down Expand Up @@ -849,8 +851,8 @@ def test_sys_exit(self):
if self.TYPE == 'threads':
self.skipTest('test not appropriate for {}'.format(self.TYPE))

testfn = test.support.TESTFN
self.addCleanup(test.support.unlink, testfn)
testfn = os_helper.TESTFN
self.addCleanup(os_helper.unlink, testfn)

for reason in (
[1, 2, 3],
Expand Down Expand Up @@ -1114,7 +1116,7 @@ def test_task_done(self):
close_queue(queue)

def test_no_import_lock_contention(self):
with test.support.temp_cwd():
with os_helper.temp_cwd():
module_name = 'imported_by_an_imported_module'
with open(module_name + '.py', 'w') as f:
f.write("""if 1:
Expand All @@ -1127,7 +1129,7 @@ def test_no_import_lock_contention(self):
del q
""")

with test.support.DirsOnSysPath(os.getcwd()):
with import_helper.DirsOnSysPath(os.getcwd()):
try:
__import__(module_name)
except pyqueue.Empty:
Expand Down Expand Up @@ -2688,8 +2690,8 @@ def test_resource_warning(self):
# force state to RUN to emit ResourceWarning in __del__()
pool._state = multiprocessing.pool.RUN

with support.check_warnings(('unclosed running multiprocessing pool',
ResourceWarning)):
with warnings_helper.check_warnings(
('unclosed running multiprocessing pool', ResourceWarning)):
pool = None
support.gc_collect()

Expand Down Expand Up @@ -3199,14 +3201,14 @@ def test_fd_transfer(self):
p = self.Process(target=self._writefd, args=(child_conn, b"foo"))
p.daemon = True
p.start()
self.addCleanup(test.support.unlink, test.support.TESTFN)
with open(test.support.TESTFN, "wb") as f:
self.addCleanup(os_helper.unlink, os_helper.TESTFN)
with open(os_helper.TESTFN, "wb") as f:
fd = f.fileno()
if msvcrt:
fd = msvcrt.get_osfhandle(fd)
reduction.send_handle(conn, fd, p.pid)
p.join()
with open(test.support.TESTFN, "rb") as f:
with open(os_helper.TESTFN, "rb") as f:
self.assertEqual(f.read(), b"foo")

@unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Expand All @@ -3225,8 +3227,8 @@ def test_large_fd_transfer(self):
p = self.Process(target=self._writefd, args=(child_conn, b"bar", True))
p.daemon = True
p.start()
self.addCleanup(test.support.unlink, test.support.TESTFN)
with open(test.support.TESTFN, "wb") as f:
self.addCleanup(os_helper.unlink, os_helper.TESTFN)
with open(os_helper.TESTFN, "wb") as f:
fd = f.fileno()
for newfd in range(256, MAXFD):
if not self._is_fd_assigned(newfd):
Expand All @@ -3239,7 +3241,7 @@ def test_large_fd_transfer(self):
finally:
os.close(newfd)
p.join()
with open(test.support.TESTFN, "rb") as f:
with open(os_helper.TESTFN, "rb") as f:
self.assertEqual(f.read(), b"bar")

@classmethod
Expand Down
5 changes: 3 additions & 2 deletions Lib/test/test___all__.py
@@ -1,5 +1,6 @@
import unittest
from test import support
from test.support import warnings_helper
import os
import sys

Expand All @@ -15,7 +16,7 @@ class AllTest(unittest.TestCase):

def check_all(self, modname):
names = {}
with support.check_warnings(
with warnings_helper.check_warnings(
(".* (module|package)", DeprecationWarning),
(".* (module|package)", PendingDeprecationWarning),
("", ResourceWarning),
Expand All @@ -31,7 +32,7 @@ def check_all(self, modname):
raise NoAll(modname)
names = {}
with self.subTest(module=modname):
with support.check_warnings(
with warnings_helper.check_warnings(
("", DeprecationWarning),
("", ResourceWarning),
quiet=True):
Expand Down
8 changes: 4 additions & 4 deletions Lib/test/test_asyncio/test_proactor_events.py
Expand Up @@ -12,7 +12,7 @@
from asyncio.proactor_events import _ProactorWritePipeTransport
from asyncio.proactor_events import _ProactorDuplexPipeTransport
from asyncio.proactor_events import _ProactorDatagramTransport
from test import support
from test.support import os_helper
from test.support import socket_helper
from test.test_asyncio import utils as test_utils

Expand Down Expand Up @@ -935,20 +935,20 @@ async def wait_closed(self):

@classmethod
def setUpClass(cls):
with open(support.TESTFN, 'wb') as fp:
with open(os_helper.TESTFN, 'wb') as fp:
fp.write(cls.DATA)
super().setUpClass()

@classmethod
def tearDownClass(cls):
support.unlink(support.TESTFN)
os_helper.unlink(os_helper.TESTFN)
super().tearDownClass()

def setUp(self):
self.loop = asyncio.ProactorEventLoop()
self.set_event_loop(self.loop)
self.addCleanup(self.loop.close)
self.file = open(support.TESTFN, 'rb')
self.file = open(os_helper.TESTFN, 'rb')
self.addCleanup(self.file.close)
super().setUp()

Expand Down
12 changes: 6 additions & 6 deletions Lib/test/test_ntpath.py
Expand Up @@ -6,7 +6,7 @@
from test.support import os_helper
from test.support import TestFailed
from test.support.os_helper import FakePath
from test import support, test_genericpath
from test import test_genericpath
from tempfile import TemporaryFile


Expand Down Expand Up @@ -287,7 +287,7 @@ def test_realpath_broken_symlinks(self):
os.mkdir(ABSTFN)
self.addCleanup(os_helper.rmtree, ABSTFN)

with support.change_cwd(ABSTFN):
with os_helper.change_cwd(ABSTFN):
os.mkdir("subdir")
os.chdir("subdir")
os.symlink(".", "recursive")
Expand Down Expand Up @@ -443,11 +443,11 @@ def test_realpath_cwd(self):

self.assertPathEqual(test_file_long, ntpath.realpath(test_file_short))

with support.change_cwd(test_dir_long):
with os_helper.change_cwd(test_dir_long):
self.assertPathEqual(test_file_long, ntpath.realpath("file.txt"))
with support.change_cwd(test_dir_long.lower()):
with os_helper.change_cwd(test_dir_long.lower()):
self.assertPathEqual(test_file_long, ntpath.realpath("file.txt"))
with support.change_cwd(test_dir_short):
with os_helper.change_cwd(test_dir_short):
self.assertPathEqual(test_file_long, ntpath.realpath("file.txt"))

def test_expandvars(self):
Expand Down Expand Up @@ -673,7 +673,7 @@ def test_ismount(self):
# locations below cannot then refer to mount points
#
drive, path = ntpath.splitdrive(sys.executable)
with support.change_cwd(ntpath.dirname(sys.executable)):
with os_helper.change_cwd(ntpath.dirname(sys.executable)):
self.assertFalse(ntpath.ismount(drive.lower()))
self.assertFalse(ntpath.ismount(drive.upper()))

Expand Down
3 changes: 2 additions & 1 deletion Lib/test/test_ossaudiodev.py
@@ -1,9 +1,10 @@
from test import support
from test.support import import_helper
support.requires('audio')

from test.support import findfile

ossaudiodev = support.import_module('ossaudiodev')
ossaudiodev = import_helper.import_module('ossaudiodev')

import errno
import sys
Expand Down
2 changes: 1 addition & 1 deletion Lib/test/test_platform.py
Expand Up @@ -196,7 +196,7 @@ def test_uname_win32_ARCHITEW6432(self):
# using it, per
# http://blogs.msdn.com/david.wang/archive/2006/03/26/HOWTO-Detect-Process-Bitness.aspx
try:
with support.EnvironmentVarGuard() as environ:
with os_helper.EnvironmentVarGuard() as environ:
if 'PROCESSOR_ARCHITEW6432' in environ:
del environ['PROCESSOR_ARCHITEW6432']
environ['PROCESSOR_ARCHITECTURE'] = 'foo'
Expand Down

0 comments on commit 598a951

Please sign in to comment.