Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

REF/NEW: new filesystem util, graft #571

Draft
wants to merge 1 commit into
base: dev
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
28 changes: 9 additions & 19 deletions qiime2/core/archive/provenance.py
Expand Up @@ -11,17 +11,16 @@
import pkg_resources
import uuid
import copy
import shutil
import sys
from datetime import datetime, timezone

import distutils
import yaml
import tzlocal
import dateutil.relativedelta as relativedelta

import qiime2
import qiime2.core.util as util
import qiime2.util as util
import qiime2.core.util as cutil
from qiime2.core.cite import Citations


Expand Down Expand Up @@ -162,17 +161,16 @@ def add_ancestor(self, artifact):
# (and so are its ancestors)
if not destination.exists():
# Handle root node of ancestor
shutil.copytree(
str(other_path), str(destination),
ignore=shutil.ignore_patterns(self.ANCESTOR_DIR + '*'))
util.graft(other_path, destination,
ignore=lambda d: self.ANCESTOR_DIR in d.name)

# Handle ancestral nodes of ancestor
grandcestor_path = other_path / self.ANCESTOR_DIR
if grandcestor_path.exists():
for grandcestor in grandcestor_path.iterdir():
destination = self.ancestor_dir / grandcestor.name
if not destination.exists():
shutil.copytree(str(grandcestor), str(destination))
util.graft(grandcestor, destination)

return str(artifact.uuid)

Expand Down Expand Up @@ -260,7 +258,7 @@ def make_execution_section(self):
runtime['start'] = start = _ts_to_date(self.start)
runtime['end'] = end = _ts_to_date(self.end)
runtime['duration'] = \
util.duration_time(relativedelta.relativedelta(end, start))
cutil.duration_time(relativedelta.relativedelta(end, start))

return execution

Expand Down Expand Up @@ -311,19 +309,11 @@ def write_citations_bib(self):
def finalize(self, final_path, node_members):
self.end = time.time()

for member in node_members:
shutil.copy(str(member), str(self.path))

self.write_action_yaml()
self.write_citations_bib()

# Certain networked filesystems will experience a race
# condition on `rename`, so fall back to copying.
try:
self.path.rename(final_path)
except FileExistsError:
distutils.dir_util.copy_tree(str(self.path), str(final_path))
distutils.dir_util.remove_tree(str(self.path))
util.graft_iterable(node_members, self.path)
util.graft(self.path, final_path, merge=True, remove_src=True)

def fork(self):
forked = copy.copy(self)
Expand All @@ -334,7 +324,7 @@ def fork(self):
# create a copy of the backing dir so factory (the hard stuff is
# mostly done by this point)
forked._build_paths()
distutils.dir_util.copy_tree(str(self.path), str(forked.path))
util.graft(self.path, forked.path, merge=True)

return forked

Expand Down
23 changes: 4 additions & 19 deletions qiime2/core/path.py
Expand Up @@ -9,9 +9,9 @@
import os
import pathlib
import shutil
import distutils
import tempfile
import weakref
import qiime2.util as util


_ConcretePath = type(pathlib.Path())
Expand All @@ -27,30 +27,15 @@ def __new__(cls, *args, **kwargs):
self._user_owned = True
return self

def _copy_dir_or_file(self, other):
if self.is_dir():
return distutils.dir_util.copy_tree(str(self), str(other))
else:
return shutil.copy(str(self), str(other))

def _destruct(self):
if self.is_dir():
distutils.dir_util.remove_tree(str(self))
shutil.rmtree(self)
else:
self.unlink()

def _move_or_copy(self, other):
if self._user_owned:
return self._copy_dir_or_file(other)
else:
# Certain networked filesystems will experience a race
# condition on `rename`, so fall back to copying.
try:
return _ConcretePath.rename(self, other)
except FileExistsError:
copied = self._copy_dir_or_file(other)
self._destruct()
return copied
util.graft(self, other,
remove_src=not self._user_owned, merge=self.is_dir())


class InPath(OwnedPath):
Expand Down
1 change: 1 addition & 0 deletions qiime2/plugin/testing.py
Expand Up @@ -237,6 +237,7 @@ def transform_format(self, source_format, target, filename=None,
source_path = self.temp_dir.name
for filename in filenames:
filepath = self.get_data_path(filename)
# make a copy, who knows what will happen to these poor files
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lol

shutil.copy(filepath, source_path)
input = source_format(source_path, mode='r')

Expand Down
23 changes: 7 additions & 16 deletions qiime2/sdk/result.py
Expand Up @@ -6,20 +6,18 @@
# The full license is in the file LICENSE, distributed with this software.
# ----------------------------------------------------------------------------

import os
import shutil
import collections
import distutils.dir_util
import pathlib

import qiime2.util as util
import qiime2.metadata
import qiime2.plugin
import qiime2.sdk
import qiime2.core.type
import qiime2.core.transform as transform
import qiime2.core.archive as archive
import qiime2.plugin.model as model
import qiime2.core.util as util
import qiime2.core.util as cutil
import qiime2.core.exceptions as exceptions

# Note: Result, Artifact, and Visualization classes are in this file to avoid
Expand Down Expand Up @@ -133,8 +131,7 @@ def __ne__(self, other):
return not (self == other)

def export_data(self, output_dir):
distutils.dir_util.copy_tree(
str(self._archiver.data_dir), str(output_dir))
util.graft(self._archiver.data_dir, output_dir, merge=True)
# Return None for now, although future implementations that include
# format tranformations may return the invoked transformers
return None
Expand All @@ -151,11 +148,7 @@ def save(self, filepath):

def _alias(self, provenance_capture):
def clone_original(into):
# directory is empty, this function is meant to fix that, so we
# can rmdir so that copytree is happy
into.rmdir()
shutil.copytree(str(self._archiver.data_dir), str(into),
copy_function=os.link) # Use hardlinks
util.graft(self._archiver.data_dir, into, merge=True)

cls = type(self)
alias = cls.__new__(cls)
Expand Down Expand Up @@ -228,9 +221,9 @@ def import_data(cls, type, view, view_type=None):
if is_format:
path = pathlib.Path(view)
if path.is_file():
md5sums = {path.name: util.md5sum(path)}
md5sums = {path.name: cutil.md5sum(path)}
elif path.is_dir():
md5sums = util.md5sum_directory(path)
md5sums = cutil.md5sum_directory(path)
else:
raise qiime2.plugin.ValidationError(
"Path '%s' does not exist." % path)
Expand Down Expand Up @@ -330,10 +323,8 @@ def _is_valid_type(cls, type_):

@classmethod
def _from_data_dir(cls, data_dir, provenance_capture):
# shutil.copytree doesn't allow the destination directory to exist.
def data_initializer(destination):
return distutils.dir_util.copy_tree(
str(data_dir), str(destination))
util.graft(data_dir, destination, merge=True)

viz = cls.__new__(cls)
viz._archiver = archive.Archiver.from_data(
Expand Down
8 changes: 0 additions & 8 deletions qiime2/tests/test_util.py
Expand Up @@ -61,14 +61,6 @@ def test_dst_dir(self):
with self.assertRaisesRegex(IsADirectoryError, self.dir):
util.duplicate(self.src, self.dir)

@mock.patch('qiime2.util.os.link', side_effect=EACCES)
def test_perm_error_EACCES(self, mocked_link):
with self.assertRaisesRegex(
PermissionError, "insufficient r/w permissions"):
util.duplicate(self.src, self.dst1)

assert mocked_link.called

@mock.patch('qiime2.util.os.link', side_effect=EPERM)
def test_perm_error_EPERM(self, mocked_link):
util.duplicate(self.src, self.dst1)
Expand Down
172 changes: 157 additions & 15 deletions qiime2/util.py
Expand Up @@ -74,23 +74,165 @@ def duplicate(src, dst):
Unlike copyfile, this will not overwrite the destination if it exists.

"""
if os.path.isdir(src):
# os.link will give a permission error
graft_file(src, dst)


def graft(src, dst, *, merge=False, overwrite=False, graft_within=False,
remove_src=False, mkdirs=False, ignore=None):
"""Graft inodes onto another part of the filesystem (copying as needed).

Your one-stop shop for migrating files from point A to point B (unless you
really mean to duplicate the data on disk, in which case, don't use this).

Cheat sheet:
TODO

Parameters
----------
src : path (file or directory)
The source to move, may be deleted by `remove_src`
dst : path (file or directory)
The destination recieving new files, may exist if `merge` or
`graft_within` are True, otherwise, should not yet exist.
merge : bool
`src` must be a directory. If so, then the contents
of `src` will be merged into `dst` (if it exists).
overwrite : bool
Whether files will be overwritten in the engraftment. This will not
cause files to be replaced with directories or vice versa. Directories
will never be overwritten (use `merge` or remove them before calling
`graft`).
graft_within : bool
Places `src` *within* the `dst` directory. If `dst` does not exist,
it will be assumed to be a directory.
remove_src : bool
Whether to remove the source file/directory after finishing (replicates
`os.rename` behavior)
mkdirs : bool
Whether to create parent directories of `dst` as needed.
ignore : filter
....

Returns
-------
A list of (action, src, dst).
"""
system_actions = []
if dst == src:
raise ValueError("destination cannot be the same as the source")

if os.path.commonpath([src, dst]) == os.path.normpath(src):
raise ValueError("destination cannot be nested within source")

if not os.path.exists(src):
raise OSError(errno.ENOENT, 'No such file or directory', src)

if mkdirs:
system_actions.append(
lambda: os.makedirs(os.path.dirname(dst), exist_ok=True))
elif os.path.dirname(dst) and not os.path.exists(os.path.dirname(dst)):
raise OSError(errno.ENOENT, 'No such directory', os.path.dirname(dst))

if graft_within:
if os.path.isfile(dst):
raise OSError(errno.ENOTDIR, "Not a directory", dst)
elif not os.path.exists(dst):
system_actions.append(lambda: os.mkdir(dst))

dst = os.path.join(dst, os.path.basename(src))

# dst may have changed
if os.path.isdir(dst) and os.path.isdir(src) and not merge:
raise ValueError("Cannot combine directory into an existing directory"
" without explicit `merge`. %s -> %s" % (src, dst))
if merge and os.path.isfile(src):
raise ValueError("Cannot use `merge` with a single file, use"
" `overwrite` instead.")

if os.path.isfile(dst) and os.path.isdir(src):
raise OSError(errno.EISDIR, "Is a directory", src)
if os.path.isdir(dst):
# os.link will give a FileExists error
raise OSError(errno.EISDIR, "Is a directory", dst)

if os.path.exists(dst):
# shutil.copyfile will overwrite the existing file
raise OSError(errno.EEXIST, "File exists", src, "File exists", dst)
# most validation finished
for action in system_actions:
action()

res = []
if os.path.isdir(src):
if remove_src and not merge:
res.append(_checked_call(os.rename, src, dst))
if not res:
if not os.path.isdir(dst):
os.mkdir(dst)
for dir_entry in os.scandir(src):
if ignore is not None and ignore(dir_entry):
continue
merge = merge and dir_entry.is_dir()
res.extend(graft(dir_entry.path, dst, graft_within=True,
overwrite=overwrite, merge=merge,
ignore=ignore))

if remove_src and os.path.exists(src):
shutil.rmtree(src)

else:
res.append(graft_file(src, dst, overwrite=overwrite,
remove_src=remove_src))
return res


def _checked_call(func, src, dst, reraise=False):
try:
os.link(src, dst)
except OSError as e:
if e.errno == errno.EXDEV: # Invalid cross-device link
shutil.copyfile(src, dst)
elif e.errno == errno.EPERM: # Permissions/ownership error
shutil.copyfile(src, dst)
else:
func(src, dst)
except OSError:
if reraise:
raise
return False
return (func, src, dst)


def graft_file(src, dst, overwrite=False, remove_src=False):
# Validate source
if not os.path.exists(src):
raise OSError(errno.ENOENT, 'No such file', src)
elif os.path.isdir(src):
raise OSError(errno.EISDIR, "Is a directory", src)

# Validate destination
if os.path.isfile(dst):
if overwrite:
os.remove(dst)
else:
raise OSError(errno.EEXIST, "File exists", dst)
elif os.path.isdir(dst):
raise OSError(errno.EISDIR, "Is a directory", dst)

# Strategy: try, try, try, and try again
succeeded = False
# If we can destroy the source, then a rename is the fastest
if remove_src:
succeeded = _checked_call(os.rename, src, dst)
# Try a hard-link
if not succeeded:
succeeded = _checked_call(os.link, src, dst)
# Try a file copy
if not succeeded:
succeeded = _checked_call(shutil.copyfile, src, dst,
reraise=True)
# The engraftment is finished, cleanup source if needed
if remove_src and os.path.exists(src):
os.remove(src)

return succeeded


def graft_iterable(iterable, dst, overwrite=False, remove_src=False,
mkdirs=False, ignore=None):
for src in iterable:
merge = True
if os.path.isfile(src):
merge = False
graft(src, dst, merge=merge, graft_within=True,
overwrite=overwrite,
remove_src=remove_src,
mkdirs=mkdirs,
ignore=ignore)