From 54610bb448a9cf5be77d53b66169fca4c11be6cb Mon Sep 17 00:00:00 2001 From: Serhiy Storchaka Date: Fri, 21 Jan 2022 09:54:50 +0200 Subject: [PATCH] bpo-46426: Improve tests for the dir_fd argument (GH-30668) Ensure that directory file descriptors refer to directories different from the current directory, and that src_dir_fd and dst_dir_fd refer to different directories. Add context manager open_dir_fd() in test.support.os_helper. --- Lib/test/support/os_helper.py | 11 + Lib/test/test_os.py | 10 +- Lib/test/test_posix.py | 417 ++++++++++++++++------------------ 3 files changed, 208 insertions(+), 230 deletions(-) diff --git a/Lib/test/support/os_helper.py b/Lib/test/support/os_helper.py index ce01417ed07d88..50aa7a7176c0a4 100644 --- a/Lib/test/support/os_helper.py +++ b/Lib/test/support/os_helper.py @@ -455,6 +455,17 @@ def create_empty_file(filename): os.close(fd) +@contextlib.contextmanager +def open_dir_fd(path): + """Open a file descriptor to a directory.""" + assert os.path.isdir(path) + dir_fd = os.open(path, os.O_RDONLY) + try: + yield dir_fd + finally: + os.close(dir_fd) + + def fs_is_case_insensitive(directory): """Detects if the file system for the specified directory is case-insensitive.""" diff --git a/Lib/test/test_os.py b/Lib/test/test_os.py index 8da0aa3163fe2b..89e5e4190c640d 100644 --- a/Lib/test/test_os.py +++ b/Lib/test/test_os.py @@ -848,12 +848,9 @@ def set_time(filename, ns): def test_utime_dir_fd(self): def set_time(filename, ns): dirname, name = os.path.split(filename) - dirfd = os.open(dirname, os.O_RDONLY) - try: + with os_helper.open_dir_fd(dirname) as dirfd: # pass dir_fd to test utimensat(timespec) or futimesat(timeval) os.utime(name, dir_fd=dirfd, ns=ns) - finally: - os.close(dirfd) self._test_utime(set_time) def test_utime_directory(self): @@ -4341,8 +4338,7 @@ def test_fd(self): os.symlink('file.txt', os.path.join(self.path, 'link')) expected_names.append('link') - fd = os.open(self.path, os.O_RDONLY) - try: + with os_helper.open_dir_fd(self.path) as fd: with os.scandir(fd) as it: entries = list(it) names = [entry.name for entry in entries] @@ -4357,8 +4353,6 @@ def test_fd(self): self.assertEqual(entry.stat(), st) st = os.stat(entry.name, dir_fd=fd, follow_symlinks=False) self.assertEqual(entry.stat(follow_symlinks=False), st) - finally: - os.close(fd) def test_empty_path(self): self.assertRaises(FileNotFoundError, os.scandir, '') diff --git a/Lib/test/test_posix.py b/Lib/test/test_posix.py index 56b72f465c1c08..974edd766cc809 100644 --- a/Lib/test/test_posix.py +++ b/Lib/test/test_posix.py @@ -21,6 +21,7 @@ import unittest import warnings import textwrap +from contextlib import contextmanager _DUMMY_SYMLINK = os.path.join(tempfile.gettempdir(), os_helper.TESTFN + '-dummy-symlink') @@ -1081,187 +1082,6 @@ def test_getgroups(self): symdiff = idg_groups.symmetric_difference(posix.getgroups()) self.assertTrue(not symdiff or symdiff == {posix.getegid()}) - # tests for the posix *at functions follow - - @unittest.skipUnless(os.access in os.supports_dir_fd, "test needs dir_fd support for os.access()") - def test_access_dir_fd(self): - f = posix.open(posix.getcwd(), posix.O_RDONLY) - try: - self.assertTrue(posix.access(os_helper.TESTFN, os.R_OK, dir_fd=f)) - finally: - posix.close(f) - - @unittest.skipUnless(os.chmod in os.supports_dir_fd, "test needs dir_fd support in os.chmod()") - def test_chmod_dir_fd(self): - os.chmod(os_helper.TESTFN, stat.S_IRUSR) - - f = posix.open(posix.getcwd(), posix.O_RDONLY) - try: - posix.chmod(os_helper.TESTFN, stat.S_IRUSR | stat.S_IWUSR, dir_fd=f) - - s = posix.stat(os_helper.TESTFN) - self.assertEqual(s[0] & stat.S_IRWXU, stat.S_IRUSR | stat.S_IWUSR) - finally: - posix.close(f) - - @unittest.skipUnless(hasattr(os, 'chown') and (os.chown in os.supports_dir_fd), - "test needs dir_fd support in os.chown()") - def test_chown_dir_fd(self): - os_helper.unlink(os_helper.TESTFN) - os_helper.create_empty_file(os_helper.TESTFN) - - f = posix.open(posix.getcwd(), posix.O_RDONLY) - try: - posix.chown(os_helper.TESTFN, os.getuid(), os.getgid(), dir_fd=f) - finally: - posix.close(f) - - @unittest.skipUnless(os.stat in os.supports_dir_fd, "test needs dir_fd support in os.stat()") - def test_stat_dir_fd(self): - os_helper.unlink(os_helper.TESTFN) - with open(os_helper.TESTFN, 'w') as outfile: - outfile.write("testline\n") - - f = posix.open(posix.getcwd(), posix.O_RDONLY) - try: - s1 = posix.stat(os_helper.TESTFN) - s2 = posix.stat(os_helper.TESTFN, dir_fd=f) - self.assertEqual(s1, s2) - s2 = posix.stat(os_helper.TESTFN, dir_fd=None) - self.assertEqual(s1, s2) - self.assertRaisesRegex(TypeError, 'should be integer or None, not', - posix.stat, os_helper.TESTFN, dir_fd=posix.getcwd()) - self.assertRaisesRegex(TypeError, 'should be integer or None, not', - posix.stat, os_helper.TESTFN, dir_fd=float(f)) - self.assertRaises(OverflowError, - posix.stat, os_helper.TESTFN, dir_fd=10**20) - finally: - posix.close(f) - - @unittest.skipUnless(os.utime in os.supports_dir_fd, "test needs dir_fd support in os.utime()") - def test_utime_dir_fd(self): - f = posix.open(posix.getcwd(), posix.O_RDONLY) - try: - now = time.time() - posix.utime(os_helper.TESTFN, None, dir_fd=f) - posix.utime(os_helper.TESTFN, dir_fd=f) - self.assertRaises(TypeError, posix.utime, os_helper.TESTFN, - now, dir_fd=f) - self.assertRaises(TypeError, posix.utime, os_helper.TESTFN, - (None, None), dir_fd=f) - self.assertRaises(TypeError, posix.utime, os_helper.TESTFN, - (now, None), dir_fd=f) - self.assertRaises(TypeError, posix.utime, os_helper.TESTFN, - (None, now), dir_fd=f) - self.assertRaises(TypeError, posix.utime, os_helper.TESTFN, - (now, "x"), dir_fd=f) - posix.utime(os_helper.TESTFN, (int(now), int(now)), dir_fd=f) - posix.utime(os_helper.TESTFN, (now, now), dir_fd=f) - posix.utime(os_helper.TESTFN, - (int(now), int((now - int(now)) * 1e9)), dir_fd=f) - posix.utime(os_helper.TESTFN, dir_fd=f, - times=(int(now), int((now - int(now)) * 1e9))) - - # try dir_fd and follow_symlinks together - if os.utime in os.supports_follow_symlinks: - try: - posix.utime(os_helper.TESTFN, follow_symlinks=False, - dir_fd=f) - except ValueError: - # whoops! using both together not supported on this platform. - pass - - finally: - posix.close(f) - - @unittest.skipUnless(os.link in os.supports_dir_fd, "test needs dir_fd support in os.link()") - def test_link_dir_fd(self): - f = posix.open(posix.getcwd(), posix.O_RDONLY) - try: - posix.link(os_helper.TESTFN, os_helper.TESTFN + 'link', - src_dir_fd=f, dst_dir_fd=f) - except PermissionError as e: - self.skipTest('posix.link(): %s' % e) - else: - # should have same inodes - self.assertEqual(posix.stat(os_helper.TESTFN)[1], - posix.stat(os_helper.TESTFN + 'link')[1]) - finally: - posix.close(f) - os_helper.unlink(os_helper.TESTFN + 'link') - - @unittest.skipUnless(os.mkdir in os.supports_dir_fd, "test needs dir_fd support in os.mkdir()") - def test_mkdir_dir_fd(self): - f = posix.open(posix.getcwd(), posix.O_RDONLY) - try: - posix.mkdir(os_helper.TESTFN + 'dir', dir_fd=f) - posix.stat(os_helper.TESTFN + 'dir') # should not raise exception - finally: - posix.close(f) - os_helper.rmtree(os_helper.TESTFN + 'dir') - - @unittest.skipUnless(hasattr(os, 'mknod') - and (os.mknod in os.supports_dir_fd) - and hasattr(stat, 'S_IFIFO'), - "test requires both stat.S_IFIFO and dir_fd support for os.mknod()") - def test_mknod_dir_fd(self): - # Test using mknodat() to create a FIFO (the only use specified - # by POSIX). - os_helper.unlink(os_helper.TESTFN) - mode = stat.S_IFIFO | stat.S_IRUSR | stat.S_IWUSR - f = posix.open(posix.getcwd(), posix.O_RDONLY) - try: - posix.mknod(os_helper.TESTFN, mode, 0, dir_fd=f) - except OSError as e: - # Some old systems don't allow unprivileged users to use - # mknod(), or only support creating device nodes. - self.assertIn(e.errno, (errno.EPERM, errno.EINVAL, errno.EACCES)) - else: - self.assertTrue(stat.S_ISFIFO(posix.stat(os_helper.TESTFN).st_mode)) - finally: - posix.close(f) - - @unittest.skipUnless(os.open in os.supports_dir_fd, "test needs dir_fd support in os.open()") - def test_open_dir_fd(self): - os_helper.unlink(os_helper.TESTFN) - with open(os_helper.TESTFN, 'w') as outfile: - outfile.write("testline\n") - a = posix.open(posix.getcwd(), posix.O_RDONLY) - b = posix.open(os_helper.TESTFN, posix.O_RDONLY, dir_fd=a) - try: - res = posix.read(b, 9).decode(encoding="utf-8") - self.assertEqual("testline\n", res) - finally: - posix.close(a) - posix.close(b) - - @unittest.skipUnless(hasattr(os, 'readlink') and (os.readlink in os.supports_dir_fd), - "test needs dir_fd support in os.readlink()") - def test_readlink_dir_fd(self): - os.symlink(os_helper.TESTFN, os_helper.TESTFN + 'link') - f = posix.open(posix.getcwd(), posix.O_RDONLY) - try: - self.assertEqual(posix.readlink(os_helper.TESTFN + 'link'), - posix.readlink(os_helper.TESTFN + 'link', dir_fd=f)) - finally: - os_helper.unlink(os_helper.TESTFN + 'link') - posix.close(f) - - @unittest.skipUnless(os.rename in os.supports_dir_fd, "test needs dir_fd support in os.rename()") - def test_rename_dir_fd(self): - os_helper.unlink(os_helper.TESTFN) - os_helper.create_empty_file(os_helper.TESTFN + 'ren') - f = posix.open(posix.getcwd(), posix.O_RDONLY) - try: - posix.rename(os_helper.TESTFN + 'ren', os_helper.TESTFN, src_dir_fd=f, dst_dir_fd=f) - except: - posix.rename(os_helper.TESTFN + 'ren', os_helper.TESTFN) - raise - else: - posix.stat(os_helper.TESTFN) # should not raise exception - finally: - posix.close(f) - @unittest.skipUnless(hasattr(signal, 'SIGCHLD'), 'CLD_XXXX be placed in si_code for a SIGCHLD signal') @unittest.skipUnless(hasattr(os, 'waitid_result'), "test needs os.waitid_result") def test_cld_xxxx_constants(self): @@ -1272,47 +1092,6 @@ def test_cld_xxxx_constants(self): os.CLD_STOPPED os.CLD_CONTINUED - @unittest.skipUnless(os.symlink in os.supports_dir_fd, "test needs dir_fd support in os.symlink()") - def test_symlink_dir_fd(self): - f = posix.open(posix.getcwd(), posix.O_RDONLY) - try: - posix.symlink(os_helper.TESTFN, os_helper.TESTFN + 'link', - dir_fd=f) - self.assertEqual(posix.readlink(os_helper.TESTFN + 'link'), - os_helper.TESTFN) - finally: - posix.close(f) - os_helper.unlink(os_helper.TESTFN + 'link') - - @unittest.skipUnless(os.unlink in os.supports_dir_fd, "test needs dir_fd support in os.unlink()") - def test_unlink_dir_fd(self): - f = posix.open(posix.getcwd(), posix.O_RDONLY) - os_helper.create_empty_file(os_helper.TESTFN + 'del') - posix.stat(os_helper.TESTFN + 'del') # should not raise exception - try: - posix.unlink(os_helper.TESTFN + 'del', dir_fd=f) - except: - os_helper.unlink(os_helper.TESTFN + 'del') - raise - else: - self.assertRaises(OSError, posix.stat, os_helper.TESTFN + 'link') - finally: - posix.close(f) - - @unittest.skipUnless(os.mkfifo in os.supports_dir_fd, "test needs dir_fd support in os.mkfifo()") - def test_mkfifo_dir_fd(self): - os_helper.unlink(os_helper.TESTFN) - f = posix.open(posix.getcwd(), posix.O_RDONLY) - try: - try: - posix.mkfifo(os_helper.TESTFN, - stat.S_IRUSR | stat.S_IWUSR, dir_fd=f) - except PermissionError as e: - self.skipTest('posix.mkfifo(): %s' % e) - self.assertTrue(stat.S_ISFIFO(posix.stat(os_helper.TESTFN).st_mode)) - finally: - posix.close(f) - requires_sched_h = unittest.skipUnless(hasattr(posix, 'sched_yield'), "don't have scheduling support") requires_sched_affinity = unittest.skipUnless(hasattr(posix, 'sched_setaffinity'), @@ -1519,6 +1298,200 @@ def test_pidfd_open(self): self.assertEqual(cm.exception.errno, errno.EINVAL) os.close(os.pidfd_open(os.getpid(), 0)) + +# tests for the posix *at functions follow +class TestPosixDirFd(unittest.TestCase): + count = 0 + + @contextmanager + def prepare(self): + TestPosixDirFd.count += 1 + name = f'{os_helper.TESTFN}_{self.count}' + base_dir = f'{os_helper.TESTFN}_{self.count}base' + posix.mkdir(base_dir) + self.addCleanup(posix.rmdir, base_dir) + fullname = os.path.join(base_dir, name) + assert not os.path.exists(fullname) + with os_helper.open_dir_fd(base_dir) as dir_fd: + yield (dir_fd, name, fullname) + + @contextmanager + def prepare_file(self): + with self.prepare() as (dir_fd, name, fullname): + os_helper.create_empty_file(fullname) + self.addCleanup(posix.unlink, fullname) + yield (dir_fd, name, fullname) + + @unittest.skipUnless(os.access in os.supports_dir_fd, "test needs dir_fd support for os.access()") + def test_access_dir_fd(self): + with self.prepare_file() as (dir_fd, name, fullname): + self.assertTrue(posix.access(name, os.R_OK, dir_fd=dir_fd)) + + @unittest.skipUnless(os.chmod in os.supports_dir_fd, "test needs dir_fd support in os.chmod()") + def test_chmod_dir_fd(self): + with self.prepare_file() as (dir_fd, name, fullname): + posix.chmod(fullname, stat.S_IRUSR) + posix.chmod(name, stat.S_IRUSR | stat.S_IWUSR, dir_fd=dir_fd) + s = posix.stat(fullname) + self.assertEqual(s.st_mode & stat.S_IRWXU, + stat.S_IRUSR | stat.S_IWUSR) + + @unittest.skipUnless(hasattr(os, 'chown') and (os.chown in os.supports_dir_fd), + "test needs dir_fd support in os.chown()") + def test_chown_dir_fd(self): + with self.prepare_file() as (dir_fd, name, fullname): + posix.chown(name, os.getuid(), os.getgid(), dir_fd=dir_fd) + + @unittest.skipUnless(os.stat in os.supports_dir_fd, "test needs dir_fd support in os.stat()") + def test_stat_dir_fd(self): + with self.prepare() as (dir_fd, name, fullname): + with open(fullname, 'w') as outfile: + outfile.write("testline\n") + self.addCleanup(posix.unlink, fullname) + + s1 = posix.stat(fullname) + s2 = posix.stat(name, dir_fd=dir_fd) + self.assertEqual(s1, s2) + s2 = posix.stat(fullname, dir_fd=None) + self.assertEqual(s1, s2) + + self.assertRaisesRegex(TypeError, 'should be integer or None, not', + posix.stat, name, dir_fd=posix.getcwd()) + self.assertRaisesRegex(TypeError, 'should be integer or None, not', + posix.stat, name, dir_fd=float(dir_fd)) + self.assertRaises(OverflowError, + posix.stat, name, dir_fd=10**20) + + @unittest.skipUnless(os.utime in os.supports_dir_fd, "test needs dir_fd support in os.utime()") + def test_utime_dir_fd(self): + with self.prepare_file() as (dir_fd, name, fullname): + now = time.time() + posix.utime(name, None, dir_fd=dir_fd) + posix.utime(name, dir_fd=dir_fd) + self.assertRaises(TypeError, posix.utime, name, + now, dir_fd=dir_fd) + self.assertRaises(TypeError, posix.utime, name, + (None, None), dir_fd=dir_fd) + self.assertRaises(TypeError, posix.utime, name, + (now, None), dir_fd=dir_fd) + self.assertRaises(TypeError, posix.utime, name, + (None, now), dir_fd=dir_fd) + self.assertRaises(TypeError, posix.utime, name, + (now, "x"), dir_fd=dir_fd) + posix.utime(name, (int(now), int(now)), dir_fd=dir_fd) + posix.utime(name, (now, now), dir_fd=dir_fd) + posix.utime(name, + (int(now), int((now - int(now)) * 1e9)), dir_fd=dir_fd) + posix.utime(name, dir_fd=dir_fd, + times=(int(now), int((now - int(now)) * 1e9))) + + # try dir_fd and follow_symlinks together + if os.utime in os.supports_follow_symlinks: + try: + posix.utime(name, follow_symlinks=False, dir_fd=dir_fd) + except ValueError: + # whoops! using both together not supported on this platform. + pass + + @unittest.skipUnless(os.link in os.supports_dir_fd, "test needs dir_fd support in os.link()") + def test_link_dir_fd(self): + with self.prepare_file() as (dir_fd, name, fullname), \ + self.prepare() as (dir_fd2, linkname, fulllinkname): + try: + posix.link(name, linkname, src_dir_fd=dir_fd, dst_dir_fd=dir_fd2) + except PermissionError as e: + self.skipTest('posix.link(): %s' % e) + self.addCleanup(posix.unlink, fulllinkname) + # should have same inodes + self.assertEqual(posix.stat(fullname)[1], + posix.stat(fulllinkname)[1]) + + @unittest.skipUnless(os.mkdir in os.supports_dir_fd, "test needs dir_fd support in os.mkdir()") + def test_mkdir_dir_fd(self): + with self.prepare() as (dir_fd, name, fullname): + posix.mkdir(name, dir_fd=dir_fd) + self.addCleanup(posix.rmdir, fullname) + posix.stat(fullname) # should not raise exception + + @unittest.skipUnless(hasattr(os, 'mknod') + and (os.mknod in os.supports_dir_fd) + and hasattr(stat, 'S_IFIFO'), + "test requires both stat.S_IFIFO and dir_fd support for os.mknod()") + def test_mknod_dir_fd(self): + # Test using mknodat() to create a FIFO (the only use specified + # by POSIX). + with self.prepare() as (dir_fd, name, fullname): + mode = stat.S_IFIFO | stat.S_IRUSR | stat.S_IWUSR + try: + posix.mknod(name, mode, 0, dir_fd=dir_fd) + except OSError as e: + # Some old systems don't allow unprivileged users to use + # mknod(), or only support creating device nodes. + self.assertIn(e.errno, (errno.EPERM, errno.EINVAL, errno.EACCES)) + else: + self.addCleanup(posix.unlink, fullname) + self.assertTrue(stat.S_ISFIFO(posix.stat(fullname).st_mode)) + + @unittest.skipUnless(os.open in os.supports_dir_fd, "test needs dir_fd support in os.open()") + def test_open_dir_fd(self): + with self.prepare() as (dir_fd, name, fullname): + with open(fullname, 'wb') as outfile: + outfile.write(b"testline\n") + self.addCleanup(posix.unlink, fullname) + fd = posix.open(name, posix.O_RDONLY, dir_fd=dir_fd) + try: + res = posix.read(fd, 9) + self.assertEqual(b"testline\n", res) + finally: + posix.close(fd) + + @unittest.skipUnless(hasattr(os, 'readlink') and (os.readlink in os.supports_dir_fd), + "test needs dir_fd support in os.readlink()") + def test_readlink_dir_fd(self): + with self.prepare() as (dir_fd, name, fullname): + os.symlink('symlink', fullname) + self.addCleanup(posix.unlink, fullname) + self.assertEqual(posix.readlink(name, dir_fd=dir_fd), 'symlink') + + @unittest.skipUnless(os.rename in os.supports_dir_fd, "test needs dir_fd support in os.rename()") + def test_rename_dir_fd(self): + with self.prepare_file() as (dir_fd, name, fullname), \ + self.prepare() as (dir_fd2, name2, fullname2): + posix.rename(name, name2, + src_dir_fd=dir_fd, dst_dir_fd=dir_fd2) + posix.stat(fullname2) # should not raise exception + posix.rename(fullname2, fullname) + + @unittest.skipUnless(os.symlink in os.supports_dir_fd, "test needs dir_fd support in os.symlink()") + def test_symlink_dir_fd(self): + with self.prepare() as (dir_fd, name, fullname): + posix.symlink('symlink', name, dir_fd=dir_fd) + self.addCleanup(posix.unlink, fullname) + self.assertEqual(posix.readlink(fullname), 'symlink') + + @unittest.skipUnless(os.unlink in os.supports_dir_fd, "test needs dir_fd support in os.unlink()") + def test_unlink_dir_fd(self): + with self.prepare() as (dir_fd, name, fullname): + os_helper.create_empty_file(fullname) + posix.stat(fullname) # should not raise exception + try: + posix.unlink(name, dir_fd=dir_fd) + self.assertRaises(OSError, posix.stat, fullname) + except: + self.addCleanup(posix.unlink, fullname) + raise + + @unittest.skipUnless(os.mkfifo in os.supports_dir_fd, "test needs dir_fd support in os.mkfifo()") + def test_mkfifo_dir_fd(self): + with self.prepare() as (dir_fd, name, fullname): + try: + posix.mkfifo(name, stat.S_IRUSR | stat.S_IWUSR, dir_fd=dir_fd) + except PermissionError as e: + self.skipTest('posix.mkfifo(): %s' % e) + self.addCleanup(posix.unlink, fullname) + self.assertTrue(stat.S_ISFIFO(posix.stat(fullname).st_mode)) + + class PosixGroupsTester(unittest.TestCase): def setUp(self):