Merge pull request #252 from lsst/tickets/DM-24352
DM-24352: Add new auto and link transfer modes
timj committed Apr 6, 2020
2 parents f979b32 + d63b6b6 commit 396cc6f
Showing 7 changed files with 182 additions and 68 deletions.
50 changes: 38 additions & 12 deletions python/lsst/daf/butler/core/datastore.py
@@ -359,6 +359,35 @@ def put(self, inMemoryDataset, datasetRef):
"""
raise NotImplementedError("Must be implemented by subclass")

def _overrideTransferMode(self, *datasets: FileDataset, transfer: Optional[str] = None) -> str:
"""Allow ingest transfer mode to be defaulted based on datasets.
Parameters
----------
datasets : `FileDataset`
Each positional argument is a struct containing information about
a file to be ingested, including its path (either absolute or
relative to the datastore root, if applicable), a complete
`DatasetRef` (with ``dataset_id not None``), and optionally a
formatter class or its fully-qualified string name. If a formatter
is not provided, this method should populate that attribute with
the formatter the datastore would use for `put`. Subclasses are
also permitted to modify the path attribute (typically to put it
in what the datastore considers its standard form).
transfer : `str`, optional
How (and whether) the dataset should be added to the datastore.
See `ingest` for details of transfer modes.
Returns
-------
newTransfer : `str`
Transfer mode to use. Will be identical to the supplied transfer
mode unless "auto" is used.
"""
if transfer != "auto":
return transfer
raise RuntimeError(f"{transfer} is not allowed without specialization.")

def _prepIngest(self, *datasets: FileDataset, transfer: Optional[str] = None) -> IngestPrepData:
"""Process datasets to identify which ones can be ingested into this
Datastore.
@@ -377,11 +406,7 @@ def _prepIngest(self, *datasets: FileDataset, transfer: Optional[str] = None) ->
in what the datastore considers its standard form).
transfer : `str`, optional
How (and whether) the dataset should be added to the datastore.
If `None` (default), the file must already be in a location
appropriate for the datastore (e.g. within its root directory),
and will not be modified. Other choices include "move", "copy",
"symlink", and "hardlink". Most datastores do not support all
transfer modes.
See `ingest` for details of transfer modes.
Returns
-------
@@ -432,11 +457,7 @@ def _finishIngest(self, prepData: IngestPrepData, *, transfer: Optional[str] = N
the direct result of a call to `_prepIngest` on this datastore.
transfer : `str`, optional
How (and whether) the dataset should be added to the datastore.
If `None` (default), the file must already be in a location
appropriate for the datastore (e.g. within its root directory),
and will not be modified. Other choices include "move", "copy",
"symlink", and "hardlink". Most datastores do not support all
transfer modes.
See `ingest` for details of transfer modes.
Raises
------
@@ -474,8 +495,11 @@ def ingest(self, *datasets: FileDataset, transfer: Optional[str] = None):
If `None` (default), the file must already be in a location
appropriate for the datastore (e.g. within its root directory),
and will not be modified. Other choices include "move", "copy",
"symlink", and "hardlink". Most datastores do not support all
transfer modes.
"link", "symlink", and "hardlink". "link" is a special transfer
mode that will first try to make a hardlink and if that fails
a symlink will be used instead. Most datastores do not support all
transfer modes. "auto" is a special option that will let the
data store choose the most natural option for itself.
Raises
------
@@ -500,6 +524,8 @@ def ingest(self, *datasets: FileDataset, transfer: Optional[str] = None):
Subclasses are encouraged to document their supported transfer modes
in their class documentation.
"""
# Allow a datastore to select a default transfer mode
transfer = self._overrideTransferMode(*datasets, transfer=transfer)
prepData = self._prepIngest(*datasets, transfer=transfer)
refs = {ref.id: ref for dataset in datasets for ref in dataset.refs}
if refs.keys() != prepData.refs.keys():
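The key contract here: `ingest` resolves "auto" exactly once, via `_overrideTransferMode`, before `_prepIngest` runs, so every later step only sees a concrete mode (or `None`). A minimal standalone sketch of that dispatch, using a hypothetical `resolve_transfer` helper and an assumed "copy" default rather than the real `Datastore` class:

# Hypothetical sketch of the _overrideTransferMode contract; the helper name
# resolve_transfer and the "copy" default are illustrative only.
from typing import Optional


def resolve_transfer(requested: Optional[str], datastore_default: str = "copy") -> Optional[str]:
    """Pass explicit modes through; map "auto" to a concrete mode."""
    if requested != "auto":
        return requested
    return datastore_default


assert resolve_transfer(None) is None        # file already in place, left alone
assert resolve_transfer("move") == "move"    # explicit modes are never changed
assert resolve_transfer("auto") == "copy"    # "auto" becomes a real mode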
21 changes: 20 additions & 1 deletion python/lsst/daf/butler/datastores/chainedDatastore.py
@@ -27,7 +27,7 @@
import logging
import warnings
import itertools
from typing import List, Sequence, Optional, Tuple
from typing import List, Sequence, Optional, Tuple, Any

from lsst.utils import doImport
from lsst.daf.butler import Datastore, DatastoreConfig, DatasetTypeNotSupportedError, \
@@ -331,6 +331,25 @@ def put(self, inMemoryDataset, ref):
if self._transaction is not None:
self._transaction.registerUndo('put', self.remove, ref)

def _overrideTransferMode(self, *datasets: Any, transfer: Optional[str] = None) -> str:
# Docstring inherited from base class.
if transfer != "auto":
return transfer
# Ask each datastore what it thinks "auto" means
transfers = {d._overrideTransferMode(*datasets, transfer=transfer) for d in self.datastores}

# Remove any untranslated "auto" values
transfers.discard(transfer)

if len(transfers) == 1:
return transfers.pop()
if not transfers:
# Everything reported "auto"
return transfer

raise RuntimeError("Chained datastore does not yet support different transfer modes"
f" from 'auto' in each child datastore (wanted {transfers})")

def _prepIngest(self, *datasets: FileDataset, transfer: Optional[str] = None) -> _IngestPrepData:
# Docstring inherited from Datastore._prepIngest.
if transfer is None or transfer == "move":
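The chained datastore reduces its children's answers with a set: unresolved "auto" answers are discarded, a single surviving answer wins, and disagreement is an error. A standalone sketch of that reduction (the `resolve_chained_auto` helper is hypothetical, not part of this commit):

# Hypothetical helper mirroring ChainedDatastore._overrideTransferMode.
from typing import Iterable, Optional


def resolve_chained_auto(child_answers: Iterable[Optional[str]]) -> Optional[str]:
    transfers = set(child_answers)
    transfers.discard("auto")   # children with no opinion of their own
    if len(transfers) == 1:
        return transfers.pop()  # every child that answered agrees
    if not transfers:
        return "auto"           # nobody answered; leave "auto" unresolved
    raise RuntimeError(f"Children disagree on what 'auto' means: {transfers}")


assert resolve_chained_auto(["copy", "auto"]) == "copy"
assert resolve_chained_auto(["auto", "auto"]) == "auto"
# resolve_chained_auto(["copy", "link"]) would raise RuntimeError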
11 changes: 3 additions & 8 deletions python/lsst/daf/butler/datastores/fileLikeDatastore.py
@@ -442,10 +442,8 @@ def _standardizeIngestPath(self, path: str, *, transfer: Optional[str] = None) -
Path of a file to be ingested.
transfer : `str`, optional
How (and whether) the dataset should be added to the datastore.
If `None` (default), the file must already be in a location
appropriate for the datastore (e.g. within its root directory),
and will not be moved. Other choices include "move", "copy",
"symlink", and "hardlink". This is provided only so
See `ingest` for details of transfer modes.
This implementation is provided only so
`NotImplementedError` can be raised if the mode is not supported;
actual transfers are deferred to `_extractIngestInfo`.
@@ -487,10 +485,7 @@ def _extractIngestInfo(self, path: str, ref: DatasetRef, *, formatter: Type[Form
`Formatter` subclass to use for this dataset.
transfer : `str`, optional
How (and whether) the dataset should be added to the datastore.
If `None` (default), the file must already be in a location
appropriate for the datastore (e.g. within its root directory),
and will not be modified. Other choices include "move", "copy",
"symlink", and "hardlink".
See `ingest` for details of transfer modes.
Returns
-------
4 changes: 4 additions & 0 deletions python/lsst/daf/butler/datastores/inMemoryDatastore.py
@@ -401,6 +401,10 @@ def validateConfiguration(self, entities, logFailures=False):
"""
return

def _overrideTransferMode(self, *datasets: Any, transfer: Optional[str] = None) -> str:
# Docstring is inherited from base class
return transfer

def validateKey(self, lookupKey, entity):
# Docstring is inherited from base class
return
60 changes: 53 additions & 7 deletions python/lsst/daf/butler/datastores/posixDatastore.py
@@ -222,20 +222,58 @@ def _removeFileExists(path):
info = self._extractIngestInfo(path, ref, formatter=formatter)
self._register_datasets([(ref, info)])

def _overrideTransferMode(self, *datasets: FileDataset, transfer: Optional[str] = None) -> str:
# Docstring inherited from base class
if transfer != "auto":
return transfer

# See if the paths are within the datastore or not
inside = [self._pathInStore(d.path) is not None for d in datasets]

if all(inside):
transfer = None
elif not any(inside):
transfer = "link"
else:
raise ValueError("Some datasets are inside the datastore and some are outside."
" Please use an explicit transfer mode and not 'auto'.")

return transfer

def _pathInStore(self, path: str) -> Optional[str]:
"""Return path relative to the datastore root.
Parameters
----------
path : `str`
Path to dataset. Can be an absolute path.
Returns
-------
inStore : `str` or `None`
Path relative to datastore root. Returns `None` if the file is
outside the root.
"""
if os.path.isabs(path):
absRoot = os.path.abspath(self.root)
if os.path.commonpath([absRoot, path]) != absRoot:
return None
return os.path.relpath(path, absRoot)
elif path.startswith(os.path.pardir):
return None
return path

def _standardizeIngestPath(self, path: str, *, transfer: Optional[str] = None) -> str:
# Docstring inherited from FileLikeDatastore._standardizeIngestPath.
fullPath = os.path.normpath(os.path.join(self.root, path))
if not os.path.exists(fullPath):
raise FileNotFoundError(f"File at '{fullPath}' does not exist; note that paths to ingest "
f"are assumed to be relative to self.root unless they are absolute.")
if transfer is None:
if os.path.isabs(path):
absRoot = os.path.abspath(self.root)
if os.path.commonpath([absRoot, path]) != absRoot:
raise RuntimeError(f"'{path}' is not inside repository root '{self.root}'.")
return os.path.relpath(path, absRoot)
elif path.startswith(os.path.pardir):
raise RuntimeError(f"'{path}' is outside repository root '{self.root}.'")
pathInStore = self._pathInStore(path)
if pathInStore is None:
raise RuntimeError(f"'{path}' is not inside repository root '{self.root}'.")
return pathInStore

def _extractIngestInfo(self, path: str, ref: DatasetRef, *, formatter: Type[Formatter],
@@ -259,6 +297,14 @@ def _extractIngestInfo(self, path: str, ref: DatasetRef, *, formatter: Type[Form
elif transfer == "copy":
with self._transaction.undoWith("copy", os.remove, newFullPath):
shutil.copy(fullPath, newFullPath)
elif transfer == "link":
with self._transaction.undoWith("link", os.unlink, newFullPath):
# Try hard link and if that fails use a symlink
try:
os.link(fullPath, newFullPath)
except OSError:
# Resolve any existing symlinks so the new link points at the real file
os.symlink(os.path.realpath(fullPath), newFullPath)
elif transfer == "hardlink":
with self._transaction.undoWith("hardlink", os.unlink, newFullPath):
os.link(fullPath, newFullPath)
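Two pieces of the POSIX behaviour are easiest to see in isolation: how "auto" is chosen from where the ingest paths live, and how the new "link" mode degrades from a hard link to a symlink. A standalone sketch with hypothetical helper names, assuming the per-path "inside the datastore" check has already been done (as `_pathInStore` does above):

# Hypothetical helpers illustrating PosixDatastore's "auto" choice and the
# hardlink-then-symlink fallback behind the new "link" transfer mode.
import os
from typing import Optional, Sequence


def auto_mode_for_paths(inside_store: Sequence[bool]) -> Optional[str]:
    """Pick what "auto" means from whether each ingest path is under the root."""
    if all(inside_store):
        return None    # everything is already in place: ingest without moving
    if not any(inside_store):
        return "link"  # everything is external: link it in
    raise ValueError("Some datasets are inside the datastore and some are outside."
                     " Please use an explicit transfer mode and not 'auto'.")


def link_with_fallback(src: str, dst: str) -> None:
    """Hard link src to dst, falling back to a symlink if the hard link fails."""
    try:
        os.link(src, dst)
    except OSError:
        # e.g. EXDEV when src and dst sit on different filesystems; resolve
        # src first so the symlink points at a real file, not another symlink.
        os.symlink(os.path.realpath(src), dst)


assert auto_mode_for_paths([True, True]) is None
assert auto_mode_for_paths([False, False]) == "link"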
8 changes: 7 additions & 1 deletion python/lsst/daf/butler/datastores/s3Datastore.py
@@ -29,7 +29,7 @@
import pathlib
import tempfile

from typing import Optional, Type
from typing import Optional, Type, Any

from lsst.daf.butler import (
ButlerURI,
@@ -250,6 +250,12 @@ def put(self, inMemoryDataset, ref):
info = self._extractIngestInfo(location.uri, ref, formatter=formatter)
self._register_datasets([(ref, info)])

def _overrideTransferMode(self, *datasets: Any, transfer: Optional[str] = None) -> str:
# Docstring inherited from base class
if transfer != "auto":
return transfer
return "copy"

def _standardizeIngestPath(self, path: str, *, transfer: Optional[str] = None) -> str:
# Docstring inherited from FileLikeDatastore._standardizeIngestPath.
if transfer not in (None, "move", "copy"):
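Taken together, the commit makes "auto" a per-datastore decision. A hypothetical summary in plain data (the real behaviour lives in each `_overrideTransferMode` shown above):

# Hypothetical summary table, not part of this commit.
AUTO_RESOLUTION = {
    "PosixDatastore": "None if every path is already under the root, else 'link'",
    "S3Datastore": "'copy' (links have no meaning for an object store)",
    "InMemoryDatastore": "returns the requested mode unchanged",
    "ChainedDatastore": "whatever its children agree on, otherwise an error",
}

for datastore, behaviour in AUTO_RESOLUTION.items():
    print(f"{datastore:>18}: {behaviour}")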
