Skip to content

Commit

Permalink
gh-91133: tempfile.TemporaryDirectory: fix symlink bug in cleanup (GH…
Browse files Browse the repository at this point in the history
…-99930)

Co-authored-by: Serhiy Storchaka <storchaka@gmail.com>
  • Loading branch information
kwi-dk and serhiy-storchaka committed Dec 7, 2023
1 parent 21221c3 commit 81c16cd
Show file tree
Hide file tree
Showing 3 changed files with 125 additions and 15 deletions.
27 changes: 18 additions & 9 deletions Lib/tempfile.py
Expand Up @@ -270,6 +270,22 @@ def _mkstemp_inner(dir, pre, suf, flags, output_type):
raise FileExistsError(_errno.EEXIST,
"No usable temporary file name found")

def _dont_follow_symlinks(func, path, *args):
# Pass follow_symlinks=False, unless not supported on this platform.
if func in _os.supports_follow_symlinks:
func(path, *args, follow_symlinks=False)
elif _os.name == 'nt' or not _os.path.islink(path):
func(path, *args)

def _resetperms(path):
try:
chflags = _os.chflags
except AttributeError:
pass
else:
_dont_follow_symlinks(chflags, path, 0)
_dont_follow_symlinks(_os.chmod, path, 0o700)


# User visible interfaces.

Expand Down Expand Up @@ -876,17 +892,10 @@ def __init__(self, suffix=None, prefix=None, dir=None,
def _rmtree(cls, name, ignore_errors=False):
def onexc(func, path, exc):
if isinstance(exc, PermissionError):
def resetperms(path):
try:
_os.chflags(path, 0)
except AttributeError:
pass
_os.chmod(path, 0o700)

try:
if path != name:
resetperms(_os.path.dirname(path))
resetperms(path)
_resetperms(_os.path.dirname(path))
_resetperms(path)

try:
_os.unlink(path)
Expand Down
111 changes: 105 additions & 6 deletions Lib/test/test_tempfile.py
Expand Up @@ -1673,6 +1673,103 @@ def test_cleanup_with_symlink_to_a_directory(self):
"were deleted")
d2.cleanup()

@os_helper.skip_unless_symlink
def test_cleanup_with_symlink_modes(self):
# cleanup() should not follow symlinks when fixing mode bits (#91133)
with self.do_create(recurse=0) as d2:
file1 = os.path.join(d2, 'file1')
open(file1, 'wb').close()
dir1 = os.path.join(d2, 'dir1')
os.mkdir(dir1)
for mode in range(8):
mode <<= 6
with self.subTest(mode=format(mode, '03o')):
def test(target, target_is_directory):
d1 = self.do_create(recurse=0)
symlink = os.path.join(d1.name, 'symlink')
os.symlink(target, symlink,
target_is_directory=target_is_directory)
try:
os.chmod(symlink, mode, follow_symlinks=False)
except NotImplementedError:
pass
try:
os.chmod(symlink, mode)
except FileNotFoundError:
pass
os.chmod(d1.name, mode)
d1.cleanup()
self.assertFalse(os.path.exists(d1.name))

with self.subTest('nonexisting file'):
test('nonexisting', target_is_directory=False)
with self.subTest('nonexisting dir'):
test('nonexisting', target_is_directory=True)

with self.subTest('existing file'):
os.chmod(file1, mode)
old_mode = os.stat(file1).st_mode
test(file1, target_is_directory=False)
new_mode = os.stat(file1).st_mode
self.assertEqual(new_mode, old_mode,
'%03o != %03o' % (new_mode, old_mode))

with self.subTest('existing dir'):
os.chmod(dir1, mode)
old_mode = os.stat(dir1).st_mode
test(dir1, target_is_directory=True)
new_mode = os.stat(dir1).st_mode
self.assertEqual(new_mode, old_mode,
'%03o != %03o' % (new_mode, old_mode))

@unittest.skipUnless(hasattr(os, 'chflags'), 'requires os.chflags')
@os_helper.skip_unless_symlink
def test_cleanup_with_symlink_flags(self):
# cleanup() should not follow symlinks when fixing flags (#91133)
flags = stat.UF_IMMUTABLE | stat.UF_NOUNLINK
self.check_flags(flags)

with self.do_create(recurse=0) as d2:
file1 = os.path.join(d2, 'file1')
open(file1, 'wb').close()
dir1 = os.path.join(d2, 'dir1')
os.mkdir(dir1)
def test(target, target_is_directory):
d1 = self.do_create(recurse=0)
symlink = os.path.join(d1.name, 'symlink')
os.symlink(target, symlink,
target_is_directory=target_is_directory)
try:
os.chflags(symlink, flags, follow_symlinks=False)
except NotImplementedError:
pass
try:
os.chflags(symlink, flags)
except FileNotFoundError:
pass
os.chflags(d1.name, flags)
d1.cleanup()
self.assertFalse(os.path.exists(d1.name))

with self.subTest('nonexisting file'):
test('nonexisting', target_is_directory=False)
with self.subTest('nonexisting dir'):
test('nonexisting', target_is_directory=True)

with self.subTest('existing file'):
os.chflags(file1, flags)
old_flags = os.stat(file1).st_flags
test(file1, target_is_directory=False)
new_flags = os.stat(file1).st_flags
self.assertEqual(new_flags, old_flags)

with self.subTest('existing dir'):
os.chflags(dir1, flags)
old_flags = os.stat(dir1).st_flags
test(dir1, target_is_directory=True)
new_flags = os.stat(dir1).st_flags
self.assertEqual(new_flags, old_flags)

@support.cpython_only
def test_del_on_collection(self):
# A TemporaryDirectory is deleted when garbage collected
Expand Down Expand Up @@ -1845,10 +1942,7 @@ def test_modes(self):
d.cleanup()
self.assertFalse(os.path.exists(d.name))

@unittest.skipUnless(hasattr(os, 'chflags'), 'requires os.chflags')
def test_flags(self):
flags = stat.UF_IMMUTABLE | stat.UF_NOUNLINK

def check_flags(self, flags):
# skip the test if these flags are not supported (ex: FreeBSD 13)
filename = os_helper.TESTFN
try:
Expand All @@ -1857,13 +1951,18 @@ def test_flags(self):
os.chflags(filename, flags)
except OSError as exc:
# "OSError: [Errno 45] Operation not supported"
self.skipTest(f"chflags() doesn't support "
f"UF_IMMUTABLE|UF_NOUNLINK: {exc}")
self.skipTest(f"chflags() doesn't support flags "
f"{flags:#b}: {exc}")
else:
os.chflags(filename, 0)
finally:
os_helper.unlink(filename)

@unittest.skipUnless(hasattr(os, 'chflags'), 'requires os.chflags')
def test_flags(self):
flags = stat.UF_IMMUTABLE | stat.UF_NOUNLINK
self.check_flags(flags)

d = self.do_create(recurse=3, dirs=2, files=2)
with d:
# Change files and directories flags recursively.
Expand Down
@@ -0,0 +1,2 @@
Fix a bug in :class:`tempfile.TemporaryDirectory` cleanup, which now no longer
dereferences symlinks when working around file system permission errors.

0 comments on commit 81c16cd

Please sign in to comment.