Skip to content

Commit

Permalink
Merge pull request #484 from lsst/tickets/DM-29011
Browse files Browse the repository at this point in the history
DM-29011: Add support for ButlerURI.walk()
  • Loading branch information
timj committed Mar 4, 2021
2 parents e0e3205 + 611f52e commit ec59584
Show file tree
Hide file tree
Showing 4 changed files with 347 additions and 0 deletions.
101 changes: 101 additions & 0 deletions python/lsst/daf/butler/core/_butlerUri/_butlerUri.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@
from typing import (
TYPE_CHECKING,
Any,
Iterable,
Iterator,
List,
Optional,
Tuple,
Type,
Expand Down Expand Up @@ -478,7 +480,12 @@ def join(self, path: Union[str, ButlerURI]) -> ButlerURI:
POSIX separator if the supplied path has directory structure. It
may be this never becomes a problem but datastore templates assume
POSIX separator is being used.
Currently, if the join path is given as an absolute scheme-less
URI it will be returned as an absolute ``file:`` URI even if the
URI it is being joined to is non-file.
"""

# If we have a full URI in path we will use it directly
# but without forcing to absolute so that we can trap the
# expected option of relative path.
Expand All @@ -497,6 +504,12 @@ def join(self, path: Union[str, ButlerURI]) -> ButlerURI:
path = urllib.parse.quote(path)

newpath = self._pathModule.normpath(self._pathModule.join(new.path, path))

# normpath can strip trailing / so for consistency we add it back
# since geturl() does not add it
if path.endswith(self._pathModule.sep) and not newpath.endswith(self._pathModule.sep):
newpath += self._pathModule.sep

new._uri = new._uri._replace(path=newpath)
# Declare the new URI not be dirLike unless path ended in /
if not path.endswith(self._pathModule.sep):
Expand Down Expand Up @@ -645,6 +658,10 @@ def mkdir(self) -> None:
"""
raise NotImplementedError()

def isdir(self) -> bool:
    """Report whether this URI is considered to refer to a directory.

    Returns
    -------
    isdir : `bool`
        The value of the ``dirLike`` attribute of this URI. No check
        against any backing store is made here.
    """
    looks_like_directory = self.dirLike
    return looks_like_directory

def size(self) -> int:
"""For non-dir-like URI, return the size of the resource.
Expand Down Expand Up @@ -771,3 +788,87 @@ def transfer_from(self, src: ButlerURI, transfer: str,
expected to be problematic if a remote resource was involved.
"""
raise NotImplementedError(f"No transfer modes supported by URI scheme {self.scheme}")

def walk(
    self,
    file_filter: Optional[Union[str, re.Pattern]] = None,
) -> Iterator[Union[List, Tuple[ButlerURI, List[str], List[str]]]]:
    """Walk a dir-like URI, reporting directories and matching files.

    This implementation is a placeholder that always raises.

    Parameters
    ----------
    file_filter : `str` or `re.Pattern`, optional
        Regex used to select which file names are reported.

    Yields
    ------
    dirpath : `ButlerURI`
        Current directory being examined.
    dirnames : `list` of `str`
        Names of subdirectories within dirpath.
    filenames : `list` of `str`
        Names of all the files within dirpath.

    Raises
    ------
    NotImplementedError
        Always raised by this implementation, at call time.
    """
    # Deliberately not a generator: the error surfaces as soon as walk()
    # is called rather than on first iteration.
    raise NotImplementedError()

@classmethod
def findFileResources(cls, candidates: Iterable[Union[str, ButlerURI]],
                      file_filter: Optional[str] = None,
                      grouped: bool = False) -> Iterator[Union[ButlerURI, Iterator[ButlerURI]]]:
    """Get the files from a list of values.

    A candidate for which ``isdir()`` is true is walked and every file
    found beneath it that matches the regex is yielded. Any other
    candidate is treated as a file and yielded as given.

    Parameters
    ----------
    candidates : iterable [`str` or `ButlerURI`]
        The files to return and directories in which to look for files to
        return.
    file_filter : `str`, optional
        The regex to use when searching for files within directories.
        By default returns all the found files.
    grouped : `bool`, optional
        If `True` the results will be grouped by directory and each
        yielded value will be an iterator over URIs. If `False` each
        URI will be returned separately.

    Yields
    ------
    found_file: `ButlerURI`
        The passed-in URIs and URIs found in passed-in directories.
        If grouping is enabled, each of the yielded values will be an
        iterator yielding members of the group. Files given explicitly
        will be returned as a single group at the end.

    Notes
    -----
    When grouping is disabled, explicitly given files are yielded in the
    order they are encountered in ``candidates``. Directories that report
    no matching files produce no output.
    """
    fileRegex = None if file_filter is None else re.compile(file_filter)

    # Explicitly given files are only accumulated in grouped mode, where
    # they are emitted together once all candidates have been processed.
    singles = []

    # Find all the files of interest
    for location in candidates:
        uri = ButlerURI(location)
        if uri.isdir():
            for found in uri.walk(fileRegex):
                if not found:
                    # This means the uri does not exist and by
                    # convention we ignore it
                    continue
                root, _, files = found
                if not files:
                    continue
                if grouped:
                    # Bind root.join now. A generator expression would
                    # look ``root`` up lazily, so a caller that collects
                    # the groups before iterating them would see every
                    # group joined against a later directory.
                    yield map(root.join, files)
                else:
                    for name in files:
                        yield root.join(name)
        else:
            if grouped:
                singles.append(uri)
            else:
                yield uri

    # Finally, return any explicitly given files in one group
    if grouped and singles:
        yield iter(singles)
42 changes: 42 additions & 0 deletions python/lsst/daf/butler/core/_butlerUri/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,15 @@
import posixpath
import copy
import logging
import re

__all__ = ('ButlerFileURI',)

from typing import (
TYPE_CHECKING,
cast,
Iterator,
List,
Optional,
Tuple,
Union,
Expand Down Expand Up @@ -196,6 +199,12 @@ def mkdir(self) -> None:
elif not os.path.isdir(self.ospath):
raise FileExistsError(f"URI {self} exists but is not a directory!")

def isdir(self) -> bool:
    """Report whether this URI should be treated as a directory.

    Returns
    -------
    isdir : `bool`
        True if the URI is flagged as ``dirLike``; otherwise the result
        of asking the local file system whether ``ospath`` is an existing
        directory.
    """
    dir_like = self.dirLike
    if dir_like:
        # Flagged as a directory: no file system check is needed.
        return dir_like
    return os.path.isdir(self.ospath)

def transfer_from(self, src: ButlerURI, transfer: str,
overwrite: bool = False,
transaction: Optional[Union[DatastoreTransaction, NoTransaction]] = None) -> None:
Expand Down Expand Up @@ -323,6 +332,39 @@ def transfer_from(self, src: ButlerURI, transfer: str,
# Transactions do not work here
src.remove()

def walk(
    self,
    file_filter: Optional[Union[str, re.Pattern]] = None,
) -> Iterator[Union[List, Tuple[ButlerURI, List[str], List[str]]]]:
    """Walk a local directory tree, reporting directories and matching
    files.

    Parameters
    ----------
    file_filter : `str` or `re.Pattern`, optional
        Regex used to select which file names are reported. A `str` is
        compiled before use; names are kept when the pattern's ``search``
        finds a match. Directory names are never filtered.

    Yields
    ------
    dirpath : `ButlerURI`
        Current directory being examined.
    dirnames : `list` of `str`
        Names of subdirectories within dirpath.
    filenames : `list` of `str`
        Names of all the files within dirpath that pass the filter.

    Raises
    ------
    ValueError
        Raised on first iteration if this URI is not a directory.
    """
    if not self.isdir():
        raise ValueError("Can not walk a non-directory URI")

    pattern = re.compile(file_filter) if isinstance(file_filter, str) else file_filter

    # Each visited directory is reported as a URI of the same class as self.
    uri_class = type(self)

    for dirpath, subdirs, filenames in os.walk(self.ospath):
        if pattern is not None:
            # Keep only the file names the regex matches somewhere.
            filenames = list(filter(pattern.search, filenames))
        yield uri_class(dirpath, forceAbsolute=False, forceDirectory=True), subdirs, filenames

@staticmethod
def _fixupPathUri(parsed: urllib.parse.ParseResult, root: Optional[Union[str, ButlerURI]] = None,
forceAbsolute: bool = False,
Expand Down
79 changes: 79 additions & 0 deletions python/lsst/daf/butler/core/_butlerUri/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from __future__ import annotations

import logging
import re
import tempfile

__all__ = ('ButlerS3URI',)
Expand All @@ -31,6 +32,8 @@
Optional,
Any,
Callable,
Iterator,
List,
Tuple,
Union,
)
Expand Down Expand Up @@ -226,3 +229,79 @@ def transfer_from(self, src: ButlerURI, transfer: str = "copy",
if transfer == "move":
# Transactions do not work here
src.remove()

@backoff.on_exception(backoff.expo, all_retryable_errors, max_time=max_retry_time)
def _list_directory(self, file_filter: Optional[re.Pattern] = None) -> Tuple[List[str], List[str], bool]:
    """List the immediate children of this "directory" in the bucket.

    All result pages for this single prefix are read before returning, so
    the whole listing sits inside the retry decorator.

    Parameters
    ----------
    file_filter : `re.Pattern`, optional
        Compiled regex used to select which file names are returned.

    Returns
    -------
    dirnames : `list` of `str`
        Common prefixes found below this URI, with this URI's own prefix
        stripped.
    filenames : `list` of `str`
        Keys found directly below this URI that pass the filter, with
        this URI's own prefix stripped.
    files_there : `bool`
        True if any key was found before filtering.
    """
    s3_paginator = self.client.get_paginator('list_objects_v2')

    # All results are returned as full key names and we must
    # convert them back to the root form. The prefix is fixed
    # and delimited so that is a simple trim
    prefix_len = len(self.relativeToPathRoot)
    dirnames: List[str] = []
    filenames: List[str] = []
    files_there = False

    for page in s3_paginator.paginate(Bucket=self.netloc, Prefix=self.relativeToPathRoot, Delimiter="/"):
        # Directories are reported in the CommonPrefixes result
        # which reports the entire key and must be stripped.
        dirnames.extend(entry["Prefix"][prefix_len:] for entry in page.get("CommonPrefixes", ()))

        found_files = [entry["Key"][prefix_len:] for entry in page.get("Contents", ())]
        if found_files:
            # Record existence before filtering: a directory whose
            # files are all filtered out still exists.
            files_there = True
        if file_filter is not None:
            found_files = [f for f in found_files if file_filter.search(f)]

        filenames.extend(found_files)

    return dirnames, filenames, files_there


def walk(self, file_filter: Optional[Union[str, re.Pattern]] = None) -> Iterator[Union[List,
                                                                                       Tuple[ButlerURI,
                                                                                             List[str],
                                                                                             List[str]]]]:
    """For dir-like URI, walk the directory returning matching files and
    directories.

    Parameters
    ----------
    file_filter : `str` or `re.Pattern`, optional
        Regex to filter out files from the list before it is returned.

    Yields
    ------
    dirpath : `ButlerURI`
        Current directory being examined.
    dirnames : `list` of `str`
        Names of subdirectories within dirpath.
    filenames : `list` of `str`
        Names of all the files within dirpath.

    Raises
    ------
    ValueError
        Raised on first iteration if this URI is not a directory.

    Notes
    -----
    If nothing at all is found below this URI a single empty list is
    yielded instead of the usual 3-tuple.

    The retry decorator is applied to the per-directory listing helper
    and not to this method. This method is a generator, so decorating it
    would only wrap creation of the generator object and none of the S3
    requests made while iterating.
    """
    # We pretend that S3 uses directories and files and not simply keys
    if not self.isdir():
        raise ValueError(f"Can not walk a non-directory URI: {self}")

    if isinstance(file_filter, str):
        file_filter = re.compile(file_filter)

    # Limit each query to a single "directory" to match os.walk
    # We could download all keys at once with no delimiter and work
    # it out locally but this could potentially lead to large memory
    # usage for millions of keys. It will also make the initial call
    # to this method potentially very slow. If making this method look
    # like os.walk was not required, we could query all keys with
    # pagination and return them in groups of 1000, but that would
    # be a different interface since we can't guarantee we would get
    # them all grouped properly across the 1000 limit boundary.
    dirnames, filenames, files_there = self._list_directory(file_filter)

    # Directories do not exist so we can't test for them. If no files
    # or directories were found though, this means that it effectively
    # does not exist and we should match os.walk() behavior and return
    # [].
    if not dirnames and not files_there:
        yield []
    else:
        yield self, dirnames, filenames

    # Recurse after yielding so the caller sees this level first.
    for dirname in dirnames:
        new_uri = self.join(dirname)
        yield from new_uri.walk(file_filter)

0 comments on commit ec59584

Please sign in to comment.