Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We鈥檒l occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-73991: Support preserving metadata in pathlib.Path.copy() #120806

Merged
merged 15 commits into from
Jul 6, 2024
12 changes: 6 additions & 6 deletions Doc/library/pathlib.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1539,7 +1539,7 @@ Creating files and directories
Copying, renaming and deleting
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

.. method:: Path.copy(target, *, follow_symlinks=True)
.. method:: Path.copy(target, *, follow_symlinks=True, preserve_metadata=False)

Copy the contents of this file to the *target* file. If *target* specifies
a file that already exists, it will be replaced.
Expand All @@ -1548,11 +1548,11 @@ Copying, renaming and deleting
will be created as a symbolic link. If *follow_symlinks* is true and this
file is a symbolic link, *target* will be a copy of the symlink target.

.. note::
This method uses operating system functionality to copy file content
efficiently. The OS might also copy some metadata, such as file
permissions. After the copy is complete, users may wish to call
:meth:`Path.chmod` to set the permissions of the target file.
If *preserve_metadata* is false (the default), only the file data is
guaranteed to be copied. Set *preserve_metadata* to true to ensure that the
file mode (permissions), flags, last access and modification times, and
extended attributes are copied where supported. This argument has no effect
on Windows, where metadata is always preserved when copying.

.. versionadded:: 3.14

Expand Down
31 changes: 30 additions & 1 deletion Lib/pathlib/_abc.py
Original file line number Diff line number Diff line change
Expand Up @@ -781,7 +781,32 @@ def mkdir(self, mode=0o777, parents=False, exist_ok=False):
"""
raise UnsupportedOperation(self._unsupported_msg('mkdir()'))

def copy(self, target, follow_symlinks=True):
# Metadata keys supported by this path type.
_readable_metadata = _writable_metadata = frozenset()

def _read_metadata(self, keys=None, *, follow_symlinks=True):
"""
Returns path metadata as a dict with string keys.
"""
raise UnsupportedOperation(self._unsupported_msg('_read_metadata()'))

def _write_metadata(self, metadata, *, follow_symlinks=True):
"""
Sets path metadata from the given dict with string keys.
"""
raise UnsupportedOperation(self._unsupported_msg('_write_metadata()'))

def _copy_metadata(self, target, *, follow_symlinks=True):
"""
Copies metadata (permissions, timestamps, etc) from this path to target.
"""
# Metadata types supported by both source and target.
keys = self._readable_metadata & target._writable_metadata
if keys:
metadata = self._read_metadata(keys, follow_symlinks=follow_symlinks)
target._write_metadata(metadata, follow_symlinks=follow_symlinks)

def copy(self, target, *, follow_symlinks=True, preserve_metadata=False):
"""
Copy the contents of this file to the given target. If this file is a
symlink and follow_symlinks is false, a symlink will be created at the
Expand All @@ -793,6 +818,8 @@ def copy(self, target, follow_symlinks=True):
raise OSError(f"{self!r} and {target!r} are the same file")
if not follow_symlinks and self.is_symlink():
target.symlink_to(self.readlink())
if preserve_metadata:
self._copy_metadata(target, follow_symlinks=False)
return
with self.open('rb') as source_f:
try:
Expand All @@ -805,6 +832,8 @@ def copy(self, target, follow_symlinks=True):
f'Directory does not exist: {target}') from e
else:
raise
if preserve_metadata:
self._copy_metadata(target)

def copytree(self, target, *, follow_symlinks=True, dirs_exist_ok=False,
ignore=None, on_error=None):
Expand Down
12 changes: 9 additions & 3 deletions Lib/pathlib/_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
except ImportError:
grp = None

from ._os import UnsupportedOperation, copyfile
from ._os import (UnsupportedOperation, copyfile, file_metadata_keys,
read_file_metadata, write_file_metadata)
from ._abc import PurePathBase, PathBase


Expand Down Expand Up @@ -781,8 +782,12 @@ def mkdir(self, mode=0o777, parents=False, exist_ok=False):
if not exist_ok or not self.is_dir():
raise

_readable_metadata = _writable_metadata = file_metadata_keys
_read_metadata = read_file_metadata
_write_metadata = write_file_metadata

if copyfile:
def copy(self, target, follow_symlinks=True):
def copy(self, target, *, follow_symlinks=True, preserve_metadata=False):
"""
Copy the contents of this file to the given target. If this file is a
symlink and follow_symlinks is false, a symlink will be created at the
Expand All @@ -799,7 +804,8 @@ def copy(self, target, follow_symlinks=True):
return
except UnsupportedOperation:
pass # Fall through to generic code.
PathBase.copy(self, target, follow_symlinks=follow_symlinks)
PathBase.copy(self, target, follow_symlinks=follow_symlinks,
preserve_metadata=preserve_metadata)

def chmod(self, mode, *, follow_symlinks=True):
"""
Expand Down
99 changes: 98 additions & 1 deletion Lib/pathlib/_os.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
Low-level OS functionality wrappers used by pathlib.
"""

from errno import EBADF, EOPNOTSUPP, ETXTBSY, EXDEV
from errno import *
import os
import stat
import sys
Expand Down Expand Up @@ -178,3 +178,100 @@ def copyfileobj(source_f, target_f):
write_target = target_f.write
while buf := read_source(1024 * 1024):
write_target(buf)


# Kinds of metadata supported by the operating system.
file_metadata_keys = {'mode', 'times_ns'}
if hasattr(os.stat_result, 'st_flags'):
file_metadata_keys.add('flags')
if hasattr(os, 'listxattr'):
file_metadata_keys.add('xattrs')
file_metadata_keys = frozenset(file_metadata_keys)


def read_file_metadata(path, keys=None, *, follow_symlinks=True):
"""
Returns local path metadata as a dict with string keys.
"""
if keys is None:
keys = file_metadata_keys
assert keys.issubset(file_metadata_keys)
result = {}
for key in keys:
if key == 'xattrs':
try:
result['xattrs'] = [
(attr, os.getxattr(path, attr, follow_symlinks=follow_symlinks))
for attr in os.listxattr(path, follow_symlinks=follow_symlinks)]
except OSError as err:
if err.errno not in (EPERM, ENOTSUP, ENODATA, EINVAL, EACCES):
raise
continue
st = os.stat(path, follow_symlinks=follow_symlinks)
if key == 'mode':
result['mode'] = stat.S_IMODE(st.st_mode)
elif key == 'times_ns':
result['times_ns'] = st.st_atime_ns, st.st_mtime_ns
elif key == 'flags':
result['flags'] = st.st_flags
return result


def write_file_metadata(path, metadata, *, follow_symlinks=True):
"""
Sets local path metadata from the given dict with string keys.
"""
assert frozenset(metadata.keys()).issubset(file_metadata_keys)

def _nop(*args, ns=None, follow_symlinks=None):
pass

if follow_symlinks:
# use the real function if it exists
def lookup(name):
return getattr(os, name, _nop)
else:
# use the real function only if it exists
# *and* it supports follow_symlinks
def lookup(name):
fn = getattr(os, name, _nop)
if fn in os.supports_follow_symlinks:
return fn
return _nop

times_ns = metadata.get('times_ns')
if times_ns is not None:
lookup("utime")(path, ns=times_ns, follow_symlinks=follow_symlinks)
# We must copy extended attributes before the file is (potentially)
# chmod()'ed read-only, otherwise setxattr() will error with -EACCES.
xattrs = metadata.get('xattrs')
if xattrs is not None:
for attr, value in xattrs:
try:
os.setxattr(path, attr, value, follow_symlinks=follow_symlinks)
except OSError as e:
if e.errno not in (EPERM, ENOTSUP, ENODATA, EINVAL, EACCES):
raise
mode = metadata.get('mode')
if mode is not None:
try:
lookup("chmod")(path, mode, follow_symlinks=follow_symlinks)
except NotImplementedError:
# if we got a NotImplementedError, it's because
# * follow_symlinks=False,
# * lchown() is unavailable, and
# * either
# * fchownat() is unavailable or
# * fchownat() doesn't implement AT_SYMLINK_NOFOLLOW.
# (it returned ENOSUP.)
# therefore we're out of options--we simply cannot chown the
# symlink. give up, suppress the error.
# (which is what shutil always did in this circumstance.)
pass
flags = metadata.get('flags')
if flags is not None:
try:
lookup("chflags")(path, flags, follow_symlinks=follow_symlinks)
except OSError as why:
if why.errno not in (EOPNOTSUPP, ENOTSUP):
raise
44 changes: 44 additions & 0 deletions Lib/test/test_pathlib/test_pathlib.py
Original file line number Diff line number Diff line change
Expand Up @@ -653,6 +653,50 @@ def test_open_unbuffered(self):
self.assertIsInstance(f, io.RawIOBase)
self.assertEqual(f.read().strip(), b"this is file A")

def test_copy_file_preserve_metadata(self):
base = self.cls(self.base)
source = base / 'fileA'
if hasattr(os, 'setxattr'):
os.setxattr(source, b'user.foo', b'42')
if hasattr(os, 'chmod'):
os.chmod(source, stat.S_IRWXU | stat.S_IRWXO)
if hasattr(os, 'chflags') and hasattr(stat, 'UF_NODUMP'):
os.chflags(source, stat.UF_NODUMP)
source_st = source.stat()
target = base / 'copyA'
source.copy(target, preserve_metadata=True)
self.assertTrue(target.exists())
self.assertEqual(source.read_text(), target.read_text())
target_st = target.stat()
self.assertLessEqual(source_st.st_atime, target_st.st_atime)
self.assertLessEqual(source_st.st_mtime, target_st.st_mtime)
if hasattr(os, 'getxattr'):
self.assertEqual(os.getxattr(target, b'user.foo'), b'42')
self.assertEqual(source_st.st_mode, target_st.st_mode)
if hasattr(source_st, 'st_flags'):
self.assertEqual(source_st.st_flags, target_st.st_flags)

@needs_symlinks
def test_copy_link_preserve_metadata(self):
base = self.cls(self.base)
source = base / 'linkA'
if hasattr(os, 'lchmod'):
os.lchmod(source, stat.S_IRWXU | stat.S_IRWXO)
if hasattr(os, 'lchflags') and hasattr(stat, 'UF_NODUMP'):
os.lchflags(source, stat.UF_NODUMP)
source_st = source.lstat()
target = base / 'copyA'
source.copy(target, follow_symlinks=False, preserve_metadata=True)
self.assertTrue(target.exists())
self.assertTrue(target.is_symlink())
self.assertEqual(source.readlink(), target.readlink())
target_st = target.lstat()
self.assertLessEqual(source_st.st_atime, target_st.st_atime)
self.assertLessEqual(source_st.st_mtime, target_st.st_mtime)
self.assertEqual(source_st.st_mode, target_st.st_mode)
if hasattr(source_st, 'st_flags'):
self.assertEqual(source_st.st_flags, target_st.st_flags)

@unittest.skipIf(sys.platform == "win32" or sys.platform == "wasi", "directories are always readable on Windows and WASI")
@unittest.skipIf(root_in_posix, "test fails with root privilege")
def test_copytree_no_read_permission(self):
Expand Down
Loading