You signed in with another tab or window. Reload to refresh your session. You signed out in another tab or window. Reload to refresh your session. You switched accounts on another tab or window. Reload to refresh your session. Dismiss alert
{{ message }}
This repository has been archived by the owner on Jul 16, 2023. It is now read-only.
# TODO: raise ValueErrors on closed for all methods?
# -*- coding: utf-8 -*-"""Virtually every Python programmer has used Python for wranglingdisk contents, and ``fileutils`` collects solutions to some of themost commonly-found gaps in the standard library."""from __future__ importprint_functionimportosimportreimportsysimportstatimporterrnoimportfnmatchfromshutilimportcopy2, copystat, Error__all__= ['mkdir_p', 'atomic_save', 'AtomicSaver', 'FilePerms',
'iter_find_files', 'copytree']
FULL_PERMS=511# 0777 that both Python 2 and 3 can digestRW_PERMS=438_SINGLE_FULL_PERM=7# or 07 in Python 2try:
basestringexceptNameError:
unicode=str# Python 3 compatbasestring= (str, bytes)
def mkdir_p(path):
    """Creates a directory and any parent directories that may need to
    be created along the way, without raising errors for any existing
    directories. This function mimics the behavior of the ``mkdir -p``
    command available in Linux/BSD environments, but also works on
    Windows.

    Args:
        path (str): Path of the directory to create.

    Raises:
        OSError: If creation fails for any reason other than *path*
            already existing as a directory.
    """
    try:
        os.makedirs(path)
    except OSError as exc:
        # an existing *directory* is fine; an existing file is not
        if exc.errno == errno.EEXIST and os.path.isdir(path):
            return
        raise
    return


class FilePerms(object):
    """The :class:`FilePerms` type is used to represent standard POSIX
    filesystem permissions:

      * Read
      * Write
      * Execute

    Across three classes of user:

      * Owning (u)ser
      * Owner's (g)roup
      * Any (o)ther user

    This class assists with computing new permissions, as well as
    working with numeric octal ``777``-style and ``rwx``-style
    permissions. Currently it only considers the bottom 9 permission
    bits; it does not support sticky bits or more advanced permission
    systems.

    Args:
        user (str): A string in the 'rwx' format, omitting characters
            for which owning user's permissions are not provided.
        group (str): A string in the 'rwx' format, omitting characters
            for which owning group permissions are not provided.
        other (str): A string in the 'rwx' format, omitting characters
            for which owning other/world permissions are not provided.

    There are many ways to use :class:`FilePerms`:

    >>> FilePerms(user='rwx', group='xrw', other='wxr')  # note character order
    FilePerms(user='rwx', group='rwx', other='rwx')
    >>> int(FilePerms('r', 'r', ''))
    288
    >>> oct(288)[-3:]  # XXX Py3k
    '440'

    See also the :meth:`FilePerms.from_int` and
    :meth:`FilePerms.from_path` classmethods for useful alternative
    ways to construct :class:`FilePerms` objects.
    """
    # TODO: consider more than the lower 9 bits
    class _FilePermProperty(object):
        """Descriptor that validates and normalizes one ``rwx`` string
        and keeps the owner's ``_integer`` in sync with it.
        """
        _perm_chars = 'rwx'
        _perm_set = frozenset('rwx')
        _perm_val = {'r': 4, 'w': 2, 'x': 1}  # for sorting

        def __init__(self, attribute, offset):
            # attribute: name of the backing instance attribute
            # offset: which octal digit this class of user occupies
            #         (2 = user, 1 = group, 0 = other)
            self.attribute = attribute
            self.offset = offset

        def __get__(self, fp_obj, type_=None):
            if fp_obj is None:
                return self
            return getattr(fp_obj, self.attribute)

        def __set__(self, fp_obj, value):
            cur = getattr(fp_obj, self.attribute)
            if cur == value:
                return
            try:
                invalid_chars = set(str(value)) - self._perm_set
            except TypeError:
                # wrap in a tuple so tuple values format correctly
                raise TypeError('expected string, not %r' % (value,))
            if invalid_chars:
                raise ValueError('got invalid chars %r in permission'
                                 ' specification %r, expected empty string'
                                 ' or one or more of %r'
                                 % (invalid_chars, value, self._perm_chars))

            # normalize to canonical order: r, then w, then x
            new_value = ''.join(sorted(set(value),
                                       key=self._perm_val.__getitem__,
                                       reverse=True))
            setattr(fp_obj, self.attribute, new_value)
            self._update_integer(fp_obj, new_value)

        def _update_integer(self, fp_obj, value):
            shift = self.offset * 3
            mode = 0
            for symbol in value:
                mode |= self._perm_val[symbol] << shift
            # Replace (rather than OR into) this class of user's three
            # bits, so that revoking a permission is reflected in int().
            fp_obj._integer = (fp_obj._integer & ~(7 << shift)) | mode

    def __init__(self, user='', group='', other=''):
        self._user, self._group, self._other = '', '', ''
        self._integer = 0

        self.user = user
        self.group = group
        self.other = other

    @classmethod
    def from_int(cls, i):
        """Create a :class:`FilePerms` object from an integer.

        >>> FilePerms.from_int(0o644)  # note the leading zero-oh for octal
        FilePerms(user='rw', group='r', other='r')
        >>> FilePerms.from_int(0o044)
        FilePerms(user='', group='r', other='r')
        """
        i &= FULL_PERMS
        key = ('', 'x', 'w', 'xw', 'r', 'rx', 'rw', 'rwx')
        # Always extract all three octal digits (user, group, other) so
        # an empty leading digit cannot shift the remaining ones over.
        parts = [key[(i >> shift) & _SINGLE_FULL_PERM]
                 for shift in (6, 3, 0)]
        return cls(*parts)

    @classmethod
    def from_path(cls, path):
        """Make a new :class:`FilePerms` object based on the permissions
        assigned to the file or directory at *path*.

        Args:
            path (str): Filesystem path of the target file.

        Here's an example that holds true on most systems:

        >>> import tempfile
        >>> 'r' in FilePerms.from_path(tempfile.gettempdir()).user
        True
        """
        stat_res = os.stat(path)
        return cls.from_int(stat.S_IMODE(stat_res.st_mode))

    def __int__(self):
        return self._integer

    # Sphinx tip: attribute docstrings come after the attribute
    user = _FilePermProperty('_user', 2)
    "Stores the ``rwx``-formatted *user* permission."
    group = _FilePermProperty('_group', 1)
    "Stores the ``rwx``-formatted *group* permission."
    other = _FilePermProperty('_other', 0)
    "Stores the ``rwx``-formatted *other* permission."

    def __repr__(self):
        cn = self.__class__.__name__
        return ('%s(user=%r, group=%r, other=%r)'
                % (cn, self.user, self.group, self.other))
####


# Flags for creating part files: O_EXCL makes os.open() fail if the
# path already exists, so an existing part file is never silently reused.
_TEXT_OPENFLAGS = os.O_RDWR | os.O_CREAT | os.O_EXCL
if hasattr(os, 'O_NOINHERIT'):
    _TEXT_OPENFLAGS |= os.O_NOINHERIT
if hasattr(os, 'O_NOFOLLOW'):
    _TEXT_OPENFLAGS |= os.O_NOFOLLOW
_BIN_OPENFLAGS = _TEXT_OPENFLAGS
if hasattr(os, 'O_BINARY'):
    _BIN_OPENFLAGS |= os.O_BINARY


try:
    import fcntl
except ImportError:
    def set_cloexec(fd):
        "Dummy set_cloexec for platforms without fcntl support"
        pass
else:
    def set_cloexec(fd):
        """Does a best-effort :func:`fcntl.fcntl` call to set a fd to be
        automatically closed by any future child processes.

        Implementation from the :mod:`tempfile` module.
        """
        try:
            flags = fcntl.fcntl(fd, fcntl.F_GETFD, 0)
        except IOError:
            # best-effort: leave the descriptor as it is
            pass
        else:
            # flags read successfully, modify
            flags |= fcntl.FD_CLOEXEC
            fcntl.fcntl(fd, fcntl.F_SETFD, flags)
        return


def atomic_save(dest_path, **kwargs):
    """A convenient interface to the :class:`AtomicSaver` type. See the
    :class:`AtomicSaver` documentation for details.
    """
    return AtomicSaver(dest_path, **kwargs)
def path_to_unicode(path):
    """Return *path* as a text (``unicode``) string.

    Text input is returned unchanged. Anything else is decoded using
    the filesystem encoding, falling back to the interpreter's default
    encoding when the filesystem encoding is not reported.
    """
    if isinstance(path, unicode):
        return path
    encoding = sys.getfilesystemencoding() or sys.getdefaultencoding()
    return path.decode(encoding)
if os.name == 'nt':
    import ctypes
    from ctypes import c_wchar_p
    from ctypes.wintypes import DWORD, LPVOID

    _ReplaceFile = ctypes.windll.kernel32.ReplaceFile
    _ReplaceFile.argtypes = [c_wchar_p, c_wchar_p, c_wchar_p,
                             DWORD, LPVOID, LPVOID]

    def replace(src, dst):
        # argument names match stdlib docs, docstring below
        try:
            # ReplaceFile fails if the dest file does not exist, so
            # first try to rename it into position
            os.rename(src, dst)
            return
        except WindowsError as we:
            if we.errno == errno.EEXIST:
                pass  # continue with the ReplaceFile logic below
            else:
                raise

        src = path_to_unicode(src)
        dst = path_to_unicode(dst)
        res = _ReplaceFile(c_wchar_p(dst), c_wchar_p(src),
                           None, 0, None, None)
        if not res:
            raise OSError('failed to replace %r with %r' % (dst, src))
        return

    def atomic_rename(src, dst, overwrite=False):
        "Rename *src* to *dst*, replacing *dst* if *overwrite* is True"
        if overwrite:
            replace(src, dst)
        else:
            os.rename(src, dst)
        return
else:
    # wrapper func for cross compat + docs
    def replace(src, dst):
        # os.replace does the same thing on unix
        return os.rename(src, dst)

    def atomic_rename(src, dst, overwrite=False):
        "Rename *src* to *dst*, replacing *dst* if *overwrite* is True"
        if overwrite:
            os.rename(src, dst)
        else:
            # os.link() fails if *dst* exists, which gives the
            # no-overwrite guarantee that os.rename() lacks here
            os.link(src, dst)
            os.unlink(src)
        return


_atomic_rename = atomic_rename  # backwards compat

replace.__doc__ = """Similar to :func:`os.replace` in Python 3.3+,
this function will atomically create or replace the file at path
*dst* with the file at path *src*.

On Windows, this function uses the ReplaceFile API for maximum
possible atomicity on a range of filesystems.
"""


class AtomicSaver(object):
    """``AtomicSaver`` is a configurable `context manager`_ that provides
    a writable :class:`file` which will be moved into place as long as
    no exceptions are raised within the context manager's block. These
    "part files" are created in the same directory as the destination
    path to ensure atomic move operations (i.e., no cross-filesystem
    moves occur).

    Args:
        dest_path (str): The path where the completed file will be
            written.
        overwrite (bool): Whether to overwrite the destination file if
            it exists at completion time. Defaults to ``True``.
        file_perms (int): Integer representation of file permissions
            for the newly-created file. Defaults are, when the
            destination path already exists, to copy the permissions
            from the previous file, or if the file did not exist, to
            respect the user's configured `umask`_, usually resulting
            in octal 0644 or 0664.
        part_file (str): Name of the temporary *part_file*. Defaults
            to *dest_path* + ``.part``. Note that this argument is
            just the filename, and not the full path of the part
            file. To guarantee atomic saves, part files are always
            created in the same directory as the destination path.
        overwrite_part (bool): Whether to overwrite the *part_file*,
            should it exist at setup time. Defaults to ``False``,
            which results in an :exc:`OSError` being raised on
            pre-existing part files. Be careful of setting this to
            ``True`` in situations when multiple threads or processes
            could be writing to the same part file.
        rm_part_on_exc (bool): Remove *part_file* on exception cases.
            Defaults to ``True``, but ``False`` can be useful for
            recovery in some cases. Note that resumption is not
            automatic and by default an :exc:`OSError` is raised if
            the *part_file* exists.
        text_mode (bool): Open the part file in text mode (``'w+'``)
            instead of binary mode (``'w+b'``). Defaults to ``False``.
        buffering (int): Buffering argument passed to
            :func:`os.fdopen` for the part file. Defaults to ``-1``.

    Practically, the AtomicSaver serves a few purposes:

      * Avoiding overwriting an existing, valid file with a partially
        written one.
      * Providing a reasonable guarantee that a part file only has one
        writer at a time.
      * Optional recovery of partial data in failure cases.

    .. _context manager: https://docs.python.org/2/reference/compound_stmts.html#with
    .. _umask: https://en.wikipedia.org/wiki/Umask

    """
    _default_file_perms = RW_PERMS

    # TODO: option to abort if target file modify date has changed since start?
    def __init__(self, dest_path, **kwargs):
        self.dest_path = dest_path
        self.overwrite = kwargs.pop('overwrite', True)
        self.file_perms = kwargs.pop('file_perms', None)
        self.overwrite_part = kwargs.pop('overwrite_part', False)
        self.part_filename = kwargs.pop('part_file', None)
        self.rm_part_on_exc = kwargs.pop('rm_part_on_exc', True)
        self.text_mode = kwargs.pop('text_mode', False)  # for windows
        self.buffering = kwargs.pop('buffering', -1)
        if kwargs:
            raise TypeError('unexpected kwargs: %r' % (kwargs.keys(),))

        self.dest_path = os.path.abspath(self.dest_path)
        self.dest_dir = os.path.dirname(self.dest_path)
        if not self.part_filename:
            # Build from the absolute path so the part file stays next
            # to the destination even if the working directory changes
            # between construction and setup().
            self.part_path = self.dest_path + '.part'
        else:
            self.part_path = os.path.join(self.dest_dir, self.part_filename)
        self.mode = 'w+' if self.text_mode else 'w+b'
        self.open_flags = _TEXT_OPENFLAGS if self.text_mode else _BIN_OPENFLAGS

        self.part_file = None

    def _open_part_file(self):
        """Create and open the part file, storing it on ``part_file``."""
        do_chmod = True
        file_perms = self.file_perms
        if file_perms is None:
            try:
                # try to copy from file being replaced
                stat_res = os.stat(self.dest_path)
                file_perms = stat.S_IMODE(stat_res.st_mode)
            except (OSError, IOError):
                # default if no destination file exists
                file_perms = self._default_file_perms
                do_chmod = False  # respect the umask

        fd = os.open(self.part_path, self.open_flags, file_perms)
        set_cloexec(fd)
        try:
            self.part_file = os.fdopen(fd, self.mode, self.buffering)
        except Exception:
            # do not leak the raw descriptor if wrapping it fails
            os.close(fd)
            raise

        # if default perms are overridden by the user or previous dest_path
        # chmod away the effects of the umask
        if do_chmod:
            try:
                os.chmod(self.part_path, file_perms)
            except (OSError, IOError):
                self.part_file.close()
                raise
        return

    def setup(self):
        """Called on context manager entry (the :keyword:`with` statement),
        the ``setup()`` method creates the temporary file in the same
        directory as the destination file.

        ``setup()`` tests for a writable directory with rename permissions
        early, as the part file may not be written to immediately (not
        using :func:`os.access` because of the potential issues of
        effective vs. real privileges).

        If the caller is not using the :class:`AtomicSaver` as a
        context manager, this method should be called explicitly
        before writing.
        """
        if os.path.lexists(self.dest_path):
            if not self.overwrite:
                raise OSError(errno.EEXIST,
                              'Overwrite disabled and file already exists',
                              self.dest_path)
        if self.overwrite_part and os.path.lexists(self.part_path):
            os.unlink(self.part_path)
        self._open_part_file()
        return

    def __enter__(self):
        self.setup()
        return self.part_file

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.part_file.close()
        if exc_type:
            if self.rm_part_on_exc:
                try:
                    os.unlink(self.part_path)
                except Exception:
                    pass  # avoid masking original error
            return
        try:
            atomic_rename(self.part_path, self.dest_path,
                          overwrite=self.overwrite)
        except OSError:
            if self.rm_part_on_exc:
                try:
                    os.unlink(self.part_path)
                except Exception:
                    pass  # avoid masking original error
            raise  # could not save destination file
        return


def iter_find_files(directory, patterns, ignored=None, include_dirs=False):
    """Returns a generator that yields file paths under a *directory*,
    matching *patterns* using `glob`_ syntax (e.g., ``*.txt``). Also
    supports *ignored* patterns.

    Args:
        directory (str): Path that serves as the root of the
            search. Yielded paths will include this as a prefix.
        patterns (str or list): A single pattern or list of
            glob-formatted patterns to find under *directory*.
        ignored (str or list): A single pattern or list of
            glob-formatted patterns to ignore.
        include_dirs (bool): Whether to include directories that match
            patterns, as well. Defaults to ``False``.

    For example, finding Python files in the current directory:

    >>> _CUR_DIR = os.path.dirname(os.path.abspath(__file__))
    >>> filenames = sorted(iter_find_files(_CUR_DIR, '*.py'))
    >>> os.path.basename(filenames[-1])
    'urlutils.py'

    Or, Python files while ignoring emacs lockfiles:

    >>> filenames = iter_find_files(_CUR_DIR, '*.py', ignored='.#*')

    .. _glob: https://en.wikipedia.org/wiki/Glob_%28programming%29

    """
    if isinstance(patterns, basestring):
        patterns = [patterns]
    pats_re = re.compile('|'.join([fnmatch.translate(p) for p in patterns]))

    if not ignored:
        ignored = []
    elif isinstance(ignored, basestring):
        ignored = [ignored]
    # With no ignore patterns this compiles the empty regex, which
    # matches everything; hence the ``ignored and`` guards below.
    ign_re = re.compile('|'.join([fnmatch.translate(p) for p in ignored]))
    for root, dirs, files in os.walk(directory):
        if include_dirs:
            for basename in dirs:
                if pats_re.match(basename):
                    if ignored and ign_re.match(basename):
                        continue
                    filename = os.path.join(root, basename)
                    yield filename

        for basename in files:
            if pats_re.match(basename):
                if ignored and ign_re.match(basename):
                    continue
                filename = os.path.join(root, basename)
                yield filename
    return


try:
    _WINDOWS_ERROR = WindowsError
except NameError:
    # WindowsError is only defined on Windows builds of Python
    _WINDOWS_ERROR = None


def copy_tree(src, dst, symlinks=False, ignore=None):
    """The ``copy_tree`` function is an exact copy of the built-in
    :func:`shutil.copytree`, with one key difference: it will not
    raise an exception if part of the tree already exists. It achieves
    this by using :func:`mkdir_p`.

    Args:
        src (str): Path of the source directory to copy.
        dst (str): Destination path. Existing directories accepted.
        symlinks (bool): If ``True``, copy symlinks rather than their
            contents.
        ignore (callable): A callable that takes a path and directory
            listing, returning the files within the listing to be ignored.

    Raises:
        shutil.Error: After the whole tree has been attempted, if any
            entries failed to copy. Its argument is a list of
            ``(src, dst, reason)`` tuples.

    For more details, check out :func:`shutil.copytree` and
    :func:`shutil.copy2`.

    """
    names = os.listdir(src)
    if ignore is not None:
        ignored_names = ignore(src, names)
    else:
        ignored_names = set()

    mkdir_p(dst)
    errors = []
    for name in names:
        if name in ignored_names:
            continue
        srcname = os.path.join(src, name)
        dstname = os.path.join(dst, name)
        try:
            if symlinks and os.path.islink(srcname):
                linkto = os.readlink(srcname)
                os.symlink(linkto, dstname)
            elif os.path.isdir(srcname):
                copy_tree(srcname, dstname, symlinks, ignore)
            else:
                # Will raise a SpecialFileError for unsupported file types
                copy2(srcname, dstname)
        # catch the Error from the recursive copy_tree so that we can
        # continue with other files
        except Error as e:
            errors.extend(e.args[0])
        except EnvironmentError as why:
            errors.append((srcname, dstname, str(why)))
    try:
        copystat(src, dst)
    except OSError as why:
        if _WINDOWS_ERROR is not None and isinstance(why, _WINDOWS_ERROR):
            # Copying file access times may fail on Windows
            pass
        else:
            errors.append((src, dst, str(why)))
    if errors:
        raise Error(errors)
copytree = copy_tree  # alias for drop-in replacement of shutil


try:
    file
except NameError:
    # Python 3 has no ``file`` builtin to subclass
    file = object


# like open(os.devnull) but with even fewer side effects
class DummyFile(file):
    """A file-like object that discards writes and reads as empty.

    Methods that check state raise :exc:`ValueError` once the object
    has been closed.
    """
    # TODO: raise ValueErrors on closed for all methods?
    # TODO: enforce read/write
    def __init__(self, path, mode='r', buffering=None):
        self.name = path
        self.mode = mode
        self.closed = False
        self.errors = None
        self.isatty = False
        self.encoding = None
        self.newlines = None
        self.softspace = 0

    def close(self):
        self.closed = True

    def fileno(self):
        return -1

    def flush(self):
        if self.closed:
            raise ValueError('I/O operation on a closed file')
        return

    def next(self):
        raise StopIteration()

    def read(self, size=0):
        if self.closed:
            raise ValueError('I/O operation on a closed file')
        return ''

    def readline(self, size=0):
        if self.closed:
            raise ValueError('I/O operation on a closed file')
        return ''

    def readlines(self, size=0):
        if self.closed:
            raise ValueError('I/O operation on a closed file')
        return []

    def seek(self, offset=0, whence=0):
        # *offset* and *whence* are accepted for compatibility with the
        # standard file API and ignored; calling with no arguments
        # still works as before.
        if self.closed:
            raise ValueError('I/O operation on a closed file')
        return

    def tell(self):
        if self.closed:
            raise ValueError('I/O operation on a closed file')
        return 0

    def truncate(self, size=None):
        # *size* is accepted for compatibility with the standard file
        # API and ignored.
        if self.closed:
            raise ValueError('I/O operation on a closed file')
        return

    def write(self, string):
        if self.closed:
            raise ValueError('I/O operation on a closed file')
        return

    def writelines(self, list_of_strings):
        if self.closed:
            raise ValueError('I/O operation on a closed file')
        return

    def __next__(self):
        raise StopIteration()

    def __enter__(self):
        if self.closed:
            raise ValueError('I/O operation on a closed file')
        # return self so ``with DummyFile(...) as f`` binds a usable object
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        return


if __name__ == '__main__':
    # atomic_save() opens its part file in binary mode by default, so
    # the demo has to write bytes.
    with atomic_save('/tmp/final.txt') as f:
        f.write(b'rofl')
        f.write(b'\n')
fdc476cb257515a71f2842c77d0a2380719279f0
The text was updated successfully, but these errors were encountered:
raise ValueErrors on closed for all methods?
Aspidites/Aspidites/woma/fileutils.py
Line 579 in b06a654
fdc476cb257515a71f2842c77d0a2380719279f0
The text was updated successfully, but these errors were encountered: