Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DM-26378: Refactor S3/WebDAV datastores #355

Merged
merged 14 commits into from
Aug 20, 2020
68 changes: 56 additions & 12 deletions python/lsst/daf/butler/core/_butlerUri.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,10 +159,10 @@ class ButlerURI:
uri : `str` or `urllib.parse.ParseResult`
URI in string form. Can be scheme-less if referring to a local
filesystem path.
root : `str`, optional
root : `str` or `ButlerURI`, optional
When fixing up a relative path in a ``file`` scheme or if scheme-less,
use this as the root. Must be absolute. If `None` the current
working directory will be used.
working directory will be used. Can be a file URI.
forceAbsolute : `bool`, optional
If `True`, scheme-less relative URI will be converted to an absolute
path using a ``file`` scheme. If `False` scheme-less URI will remain
Expand Down Expand Up @@ -207,7 +207,7 @@ class ButlerURI:
_uri: urllib.parse.ParseResult

def __new__(cls, uri: Union[str, urllib.parse.ParseResult, ButlerURI],
root: Optional[str] = None, forceAbsolute: bool = True,
root: Optional[Union[str, ButlerURI]] = None, forceAbsolute: bool = True,
forceDirectory: bool = False) -> ButlerURI:
parsed: urllib.parse.ParseResult
dirLike: bool
Expand Down Expand Up @@ -595,6 +595,17 @@ def mkdir(self) -> None:
"""
raise NotImplementedError()

def size(self) -> int:
    """Return the size in bytes of the resource this URI refers to.

    Returns
    -------
    sz : `int`
        Number of bytes in the resource associated with this URI.
        A dir-like URI reports 0.

    Raises
    ------
    NotImplementedError
        Always raised by this implementation.
    """
    raise NotImplementedError()

def __str__(self) -> str:
return self.geturl()

Expand All @@ -618,7 +629,7 @@ def __getnewargs__(self) -> Tuple:
return (str(self),)

@staticmethod
def _fixupPathUri(parsed: urllib.parse.ParseResult, root: Optional[str] = None,
def _fixupPathUri(parsed: urllib.parse.ParseResult, root: Optional[Union[str, ButlerURI]] = None,
forceAbsolute: bool = False,
forceDirectory: bool = False) -> Tuple[urllib.parse.ParseResult, bool]:
"""Correct any issues with the supplied URI.
Expand All @@ -627,7 +638,7 @@ def _fixupPathUri(parsed: urllib.parse.ParseResult, root: Optional[str] = None,
----------
parsed : `~urllib.parse.ParseResult`
The result from parsing a URI using `urllib.parse`.
root : `str`, ignored
root : `str` or `ButlerURI`, ignored
Not used by this implementation since all URIs are
absolute except for those representing the local file system.
forceAbsolute : `bool`, ignored.
Expand Down Expand Up @@ -730,6 +741,14 @@ def exists(self) -> bool:
# to a file that no longer exists this will return False
return os.path.exists(self.ospath)

def size(self) -> int:
    """Return the size in bytes of the local file behind this URI.

    Returns
    -------
    sz : `int`
        ``st_size`` of ``self.ospath``, or 0 if that path is a directory.

    Raises
    ------
    FileNotFoundError
        Raised by `os.stat` if ``self.ospath`` does not exist.
    """
    # Imported locally so the method does not rely on a module-level
    # import that may not be present in the file.
    import stat

    # A single stat call answers both "is it a directory?" and "how big
    # is it?", so the path cannot change type between the two questions.
    info = os.stat(self.ospath)
    return 0 if stat.S_ISDIR(info.st_mode) else info.st_size

def remove(self) -> None:
"""Remove the resource."""
os.remove(self.ospath)
Expand Down Expand Up @@ -968,7 +987,7 @@ def transfer_from(self, src: ButlerURI, transfer: str,
os.remove(local_src)

@staticmethod
def _fixupPathUri(parsed: urllib.parse.ParseResult, root: Optional[str] = None,
def _fixupPathUri(parsed: urllib.parse.ParseResult, root: Optional[Union[str, ButlerURI]] = None,
forceAbsolute: bool = False,
forceDirectory: bool = False) -> Tuple[urllib.parse.ParseResult, bool]:
"""Fix up relative paths in URI instances.
Expand All @@ -977,10 +996,10 @@ def _fixupPathUri(parsed: urllib.parse.ParseResult, root: Optional[str] = None,
----------
parsed : `~urllib.parse.ParseResult`
The result from parsing a URI using `urllib.parse`.
root : `str`, optional
root : `str` or `ButlerURI`, optional
Path to use as root when converting relative to absolute.
If `None`, it will be the current working directory. This
is a local file system path, not a URI. It is only used if
is a local file system path, or a file URI. It is only used if
a file-scheme is used incorrectly with a relative path.
forceAbsolute : `bool`, ignored
Has no effect for this subclass. ``file`` URIs are always
Expand Down Expand Up @@ -1037,6 +1056,10 @@ def _fixupPathUri(parsed: urllib.parse.ParseResult, root: Optional[str] = None,

if root is None:
root = os.path.abspath(os.path.curdir)
elif isinstance(root, ButlerURI):
if root.scheme and root.scheme != "file":
raise RuntimeError(f"The override root must be a file URI not {root.scheme}")
root = os.path.abspath(root.ospath)

replacements["path"] = posixpath.normpath(posixpath.join(os2posix(root), parsed.path))

Expand Down Expand Up @@ -1071,14 +1094,22 @@ def exists(self) -> bool:
exists, _ = s3CheckFileExists(self, client=self.client)
return exists

def size(self) -> int:
    """Return the size of the S3 object this URI refers to.

    Returns
    -------
    sz : `int`
        0 if this URI is dir-like, otherwise the second element of the
        tuple returned by ``s3CheckFileExists`` (presumably the object
        size in bytes; confirm against ``s3utils``).
    """
    # s3utils itself imports ButlerURI, so the import is deferred to
    # call time.
    from .s3utils import s3CheckFileExists

    if not self.dirLike:
        _, nbytes = s3CheckFileExists(self, client=self.client)
        return nbytes
    return 0

def remove(self) -> None:
"""Remove the resource."""

# https://github.com/boto/boto3/issues/507 - there is no
# way of knowing if the file was actually deleted except
# for checking all the keys again, response is HTTP 204 OK
# response all the time
self.client.delete(Bucket=self.netloc, Key=self.relativeToPathRoot)
self.client.delete_object(Bucket=self.netloc, Key=self.relativeToPathRoot)

def read(self, size: int = -1) -> bytes:
args = {}
Expand Down Expand Up @@ -1224,6 +1255,15 @@ def exists(self) -> bool:

return True if r.status_code == 200 else False

def size(self) -> int:
    """Return the size the server reports for this resource.

    Returns
    -------
    sz : `int`
        0 if this URI is dir-like, otherwise the integer value of the
        ``Content-Length`` header of a HEAD response.

    Raises
    ------
    FileNotFoundError
        Raised if the HEAD request returns any status other than 200.
    """
    if self.dirLike:
        return 0
    response = self.session.head(self.geturl())
    if response.status_code != 200:
        raise FileNotFoundError(f"Resource {self} does not exist")
    return int(response.headers['Content-Length'])

def mkdir(self) -> None:
"""For a dir-like URI, create the directory resource if it does not
already exist.
Expand Down Expand Up @@ -1407,7 +1447,7 @@ def _force_to_file(self) -> ButlerFileURI:
return ButlerURI(uri, forceDirectory=self.dirLike) # type: ignore

@staticmethod
def _fixupPathUri(parsed: urllib.parse.ParseResult, root: Optional[str] = None,
def _fixupPathUri(parsed: urllib.parse.ParseResult, root: Optional[Union[str, ButlerURI]] = None,
forceAbsolute: bool = False,
forceDirectory: bool = False) -> Tuple[urllib.parse.ParseResult, bool]:
"""Fix up relative paths for local file system.
Expand All @@ -1416,10 +1456,10 @@ def _fixupPathUri(parsed: urllib.parse.ParseResult, root: Optional[str] = None,
----------
parsed : `~urllib.parse.ParseResult`
The result from parsing a URI using `urllib.parse`.
root : `str`, optional
root : `str` or `ButlerURI`, optional
Path to use as root when converting relative to absolute.
If `None`, it will be the current working directory. This
is a local file system path, not a URI.
is a local file system path, or a file URI.
forceAbsolute : `bool`, optional
If `True`, scheme-less relative URI will be converted to an
absolute path using a ``file`` scheme. If `False` scheme-less URI
Expand Down Expand Up @@ -1454,6 +1494,10 @@ def _fixupPathUri(parsed: urllib.parse.ParseResult, root: Optional[str] = None,

if root is None:
root = os.path.abspath(os.path.curdir)
elif isinstance(root, ButlerURI):
if root.scheme and root.scheme != "file":
raise RuntimeError(f"The override root must be a file URI not {root.scheme}")
root = os.path.abspath(root.ospath)

# this is a local OS file path which can support tilde expansion.
# we quoted it in the constructor so unquote here
Expand Down
2 changes: 1 addition & 1 deletion python/lsst/daf/butler/core/location.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ class LocationFactory:
be treated as a posixpath but then converted to an absolute path.
"""

def __init__(self, datastoreRoot: str):
def __init__(self, datastoreRoot: Union[ButlerURI, str]):
self._datastoreRootUri = ButlerURI(datastoreRoot, forceAbsolute=True,
forceDirectory=True)

Expand Down
15 changes: 11 additions & 4 deletions python/lsst/daf/butler/datastores/fileLikeDatastore.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,8 @@ class FileLikeDatastore(GenericBaseDatastore):
or relative to a search path. Can be None if no defaults specified.
"""

root: str
"""Root directory or URI of this `Datastore`."""
root: ButlerURI
"""Root directory URI of this `Datastore`."""

locationFactory: LocationFactory
"""Factory for creating locations relative to the datastore root."""
Expand Down Expand Up @@ -239,7 +239,8 @@ def __init__(self, config: Union[DatastoreConfig, str],

# Support repository relocation in config
# Existence of self.root is checked in subclass
self.root = replaceRoot(self.config["root"], butlerRoot)
self.root = ButlerURI(replaceRoot(self.config["root"], butlerRoot),
forceDirectory=True, forceAbsolute=True)

self.locationFactory = LocationFactory(self.root)
self.formatterFactory = FormatterFactory()
Expand Down Expand Up @@ -275,7 +276,7 @@ def __init__(self, config: Union[DatastoreConfig, str],
self.useChecksum = self.config.get("checksum", True)

def __str__(self) -> str:
return self.root
return str(self.root)

@property
def bridge(self) -> DatastoreRegistryBridge:
Expand Down Expand Up @@ -549,6 +550,12 @@ def _prepare_for_put(self, inMemoryDataset: Any, ref: DatasetRef) -> Tuple[Locat

return location, formatter

def _overrideTransferMode(self, *datasets: Any, transfer: Optional[str] = None) -> Optional[str]:
if transfer != "auto":
timj marked this conversation as resolved.
Show resolved Hide resolved
return transfer
# Copy is always a good default
return "copy"

@abstractmethod
def _standardizeIngestPath(self, path: str, *, transfer: Optional[str] = None) -> str:
"""Standardize the path of a to-be-ingested file.
Expand Down
21 changes: 9 additions & 12 deletions python/lsst/daf/butler/datastores/posixDatastore.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,15 +84,13 @@ def __init__(self, config: Union[DatastoreConfig, str],
super().__init__(config, bridgeManager, butlerRoot)

# Check that root is a valid URI for this datastore
root = ButlerURI(self.root, forceDirectory=True)
if root.scheme and root.scheme != "file":
if self.root.scheme and self.root.scheme != "file":
raise ValueError(f"Root location must only be a file URI not {self.root}")

self.root = root.ospath
if not os.path.isdir(self.root):
if not self.root.exists():
if "create" not in self.config or not self.config["create"]:
raise ValueError(f"No valid root at: {self.root}")
safeMakeDir(self.root)
raise ValueError(f"No valid root and not allowed to create one at: {self.root}")
self.root.mkdir()

def _artifact_exists(self, location: Location) -> bool:
"""Check that an artifact exists in this datastore at the specified
Expand Down Expand Up @@ -163,7 +161,7 @@ def _write_in_memory_to_artifact(self, inMemoryDataset: Any, ref: DatasetRef) ->
safeMakeDir(storageDir)

# Write the file
predictedFullPath = os.path.join(self.root, formatter.predictPath())
predictedFullPath = os.path.join(self.root.ospath, formatter.predictPath())

if os.path.exists(predictedFullPath):
# Assume that by this point if registry thinks the file should
Expand Down Expand Up @@ -200,7 +198,7 @@ def _removeFileExists(path: str) -> None:
if formatter_exception:
raise formatter_exception

assert predictedFullPath == os.path.join(self.root, path)
assert predictedFullPath == os.path.join(self.root.ospath, path)

return self._extractIngestInfo(path, ref, formatter=formatter)

Expand Down Expand Up @@ -238,12 +236,11 @@ def _pathInStore(self, path: str) -> Optional[str]:
outside the root.
"""
pathUri = ButlerURI(path, forceAbsolute=False)
rootUri = ButlerURI(self.root, forceDirectory=True, forceAbsolute=True)
return pathUri.relative_to(rootUri)
return pathUri.relative_to(self.root)

def _standardizeIngestPath(self, path: str, *, transfer: Optional[str] = None) -> str:
# Docstring inherited from FileLikeDatastore._standardizeIngestPath.
fullPath = os.path.normpath(os.path.join(self.root, path))
fullPath = os.path.normpath(os.path.join(self.root.ospath, path))
if not os.path.exists(fullPath):
raise FileNotFoundError(f"File at '{fullPath}' does not exist; note that paths to ingest "
f"are assumed to be relative to self.root unless they are absolute.")
Expand All @@ -266,7 +263,7 @@ def _extractIngestInfo(self, path: Union[str, ButlerURI], ref: DatasetRef, *,
srcUri = ButlerURI(path, root=self.root, forceAbsolute=True)
if transfer is None:
# File should exist already
rootUri = ButlerURI(self.root, forceDirectory=True)
rootUri = self.root
pathInStore = srcUri.relative_to(rootUri)
if pathInStore is None:
raise RuntimeError(f"Unexpectedly learned that {srcUri} is not within datastore {rootUri}")
Expand Down
10 changes: 4 additions & 6 deletions python/lsst/daf/butler/datastores/s3Datastore.py
Original file line number Diff line number Diff line change
Expand Up @@ -305,10 +305,9 @@ def _standardizeIngestPath(self, path: str, *, transfer: Optional[str] = None) -
raise FileNotFoundError(f"Resource at {srcUri} does not exist")

if transfer is None:
rootUri = ButlerURI(self.root)
if not srcUri.relative_to(rootUri):
if not srcUri.relative_to(self.root):
raise RuntimeError(f"Transfer is none but source file ({srcUri}) is not "
f"within datastore ({rootUri})")
f"within datastore ({self.root})")
return path

def _extractIngestInfo(self, path: Union[str, ButlerURI], ref: DatasetRef, *,
Expand All @@ -321,10 +320,9 @@ def _extractIngestInfo(self, path: Union[str, ButlerURI], ref: DatasetRef, *,
# to work out the path relative to the root of the datastore.
# Because unlike for file to file ingest we can get absolute
# URIs here
rootUri = ButlerURI(self.root, forceDirectory=True)
pathInStore = srcUri.relative_to(rootUri)
pathInStore = srcUri.relative_to(self.root)
if pathInStore is None:
raise RuntimeError(f"Unexpectedly learned that {srcUri} is not within datastore {rootUri}")
raise RuntimeError(f"Unexpectedly learned that {srcUri} is not within datastore {self.root}")
tgtLocation = self.locationFactory.fromPath(pathInStore)
else:
# Work out the name we want this ingested file to have
Expand Down
22 changes: 8 additions & 14 deletions python/lsst/daf/butler/datastores/webdavDatastore.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,24 +79,20 @@ class WebdavDatastore(FileLikeDatastore):
ValueError
If root location does not exist and ``create`` is `False` in the
configuration.

Notes
-----
"""

defaultConfigFile = "datastores/webdavDatastore.yaml"
"""Path to configuration defaults. Relative to $DAF_BUTLER_DIR/config or
absolute path. Can be None if no defaults specified.
"""Path to configuration defaults. Accessed within the ``config`` resource
or relative to a search path. Can be None if no defaults specified.
"""

def __init__(self, config: Union[DatastoreConfig, str],
bridgeManager: DatastoreRegistryBridgeManager, butlerRoot: str = None):
super().__init__(config, bridgeManager, butlerRoot)
uri = ButlerURI(self.root, forceDirectory=True)
self.session = getHttpSession()
if not uri.exists():
if not self.root.exists():
try:
uri.mkdir()
self.root.mkdir()
except ValueError:
raise ValueError(f"Can not create directory {self.root}, check permissions.")

Expand Down Expand Up @@ -219,14 +215,13 @@ def _standardizeIngestPath(self, path: str, *, transfer: Optional[str] = None) -
raise NotImplementedError(f"Scheme type {srcUri.scheme} not supported.")

if transfer is None:
rootUri = ButlerURI(self.root)
if srcUri.scheme == "file":
raise RuntimeError(f"'{srcUri}' is not inside repository root '{rootUri}'. "
raise RuntimeError(f"'{srcUri}' is not inside repository root '{self.root}'. "
"Ingesting local data to WebdavDatastore without upload "
"to Webdav is not allowed.")
elif srcUri.scheme.startswith("http"):
if not srcUri.path.startswith(rootUri.path):
raise RuntimeError(f"'{srcUri}' is not inside repository root '{rootUri}'.")
if not srcUri.path.startswith(self.root.path):
raise RuntimeError(f"'{srcUri}' is not inside repository root '{self.root}'.")
return path

def _extractIngestInfo(self, path: Union[str, ButlerURI], ref: DatasetRef, *,
Expand All @@ -235,9 +230,8 @@ def _extractIngestInfo(self, path: Union[str, ButlerURI], ref: DatasetRef, *,
srcUri = ButlerURI(path)

if transfer is None:
rootUri = ButlerURI(self.root)
p = pathlib.PurePosixPath(srcUri.relativeToPathRoot)
pathInStore = str(p.relative_to(rootUri.relativeToPathRoot))
pathInStore = str(p.relative_to(self.root.relativeToPathRoot))
tgtLocation = self.locationFactory.fromPath(pathInStore)
else:
assert transfer == "move" or transfer == "copy", "Should be guaranteed by _standardizeIngestPath"
Expand Down
2 changes: 1 addition & 1 deletion tests/test_butler.py
Original file line number Diff line number Diff line change
Expand Up @@ -965,7 +965,7 @@ def testExportTransferCopy(self):
datasets = list(exportButler.registry.queryDatasets(..., collections=...))
self.assertGreater(len(datasets), 0)
uris = [exportButler.getURI(d) for d in datasets]
datastoreRoot = ButlerURI(exportButler.datastore.root, forceDirectory=True)
datastoreRoot = exportButler.datastore.root

pathsInStore = [uri.relative_to(datastoreRoot) for uri in uris]

Expand Down
4 changes: 2 additions & 2 deletions tests/test_datastore.py
Original file line number Diff line number Diff line change
Expand Up @@ -488,8 +488,8 @@ def succeed(obj, path, ref):
"""Ingest a file already in the datastore root."""
# first move it into the root, and adjust the path
# accordingly
path = shutil.copy(path, datastore.root)
path = os.path.relpath(path, start=datastore.root)
path = shutil.copy(path, datastore.root.ospath)
path = os.path.relpath(path, start=datastore.root.ospath)
datastore.ingest(FileDataset(path=path, refs=ref), transfer=mode)
self.assertEqual(obj, datastore.get(ref))

Expand Down