Skip to content

Commit

Permalink
Support "move" ingest into chained datastore
Browse files Browse the repository at this point in the history
Treat it as a copy and then clean up.
  • Loading branch information
timj committed Oct 8, 2022
1 parent 07bf668 commit b689edf
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 11 deletions.
41 changes: 33 additions & 8 deletions python/lsst/daf/butler/datastores/chainedDatastore.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ class _IngestPrepData(Datastore.IngestPrepData):
Pairs of `Datastore`, `IngestPrepData` for all child datastores.
"""

def __init__(self, children: List[Tuple[Datastore, Datastore.IngestPrepData]]):
super().__init__(itertools.chain.from_iterable(data.refs.values() for _, data in children))
def __init__(self, children: List[Tuple[Datastore, Datastore.IngestPrepData, set[ResourcePath]]]):
super().__init__(itertools.chain.from_iterable(data.refs.values() for _, data, _ in children))
self.children = children


Expand Down Expand Up @@ -446,8 +446,8 @@ def _overrideTransferMode(self, *datasets: Any, transfer: Optional[str] = None)

def _prepIngest(self, *datasets: FileDataset, transfer: Optional[str] = None) -> _IngestPrepData:
# Docstring inherited from Datastore._prepIngest.
if transfer is None or transfer == "move":
raise NotImplementedError("ChainedDatastore does not support transfer=None or transfer='move'.")
if transfer is None:
raise NotImplementedError("ChainedDatastore does not support transfer=None.")

def isDatasetAcceptable(dataset: FileDataset, *, name: str, constraints: Constraints) -> bool:
acceptable = [ref for ref in dataset.refs if constraints.isAcceptable(ref)]
Expand All @@ -471,7 +471,7 @@ def isDatasetAcceptable(dataset: FileDataset, *, name: str, constraints: Constra

# Iterate over nested datastores and call _prepIngest on each.
# Save the results to a list:
children: List[Tuple[Datastore, Datastore.IngestPrepData]] = []
children: List[Tuple[Datastore, Datastore.IngestPrepData, set[ResourcePath]]] = []
# ...and remember whether all of the failures are due to
# NotImplementedError being raised.
allFailuresAreNotImplementedError = True
Expand All @@ -495,7 +495,16 @@ def isDatasetAcceptable(dataset: FileDataset, *, name: str, constraints: Constra
)
continue
allFailuresAreNotImplementedError = False
children.append((datastore, prepDataForChild))
if okForChild:
# Do not store for later if a datastore has rejected
# everything.
# Include the source paths if this is a "move". It's clearer
# to find the paths now rather than try to infer how
# each datastore has stored them in the internal prep class.
paths = (
{ResourcePath(dataset.path) for dataset in okForChild} if transfer == "move" else set()
)
children.append((datastore, prepDataForChild, paths))
if allFailuresAreNotImplementedError:
raise NotImplementedError(f"No child datastore supports transfer mode {transfer}.")
return _IngestPrepData(children=children)
Expand All @@ -508,10 +517,26 @@ def _finishIngest(
record_validation_info: bool = True,
) -> None:
# Docstring inherited from Datastore._finishIngest.
for datastore, prepDataForChild in prepData.children:
# For "move" we must use "copy" and then delete the input
# data at the end. This has no rollback option if the ingest
# subsequently fails. If there is only one active datastore
# accepting any files, we can leave it as "move".
actual_transfer: str | None
if transfer == "move" and len(prepData.children) > 1:
actual_transfer = "copy"
else:
actual_transfer = transfer
to_be_deleted: set[ResourcePath] = set()
for datastore, prepDataForChild, paths in prepData.children:
datastore._finishIngest(
prepDataForChild, transfer=transfer, record_validation_info=record_validation_info
prepDataForChild, transfer=actual_transfer, record_validation_info=record_validation_info
)
to_be_deleted.update(paths)
if actual_transfer != transfer:
# These datasets were copied but now need to be deleted.
# This cannot be rolled back.
for uri in to_be_deleted:
uri.remove()

def getManyURIs(
self,
Expand Down
12 changes: 9 additions & 3 deletions tests/test_butler.py
Original file line number Diff line number Diff line change
Expand Up @@ -819,10 +819,16 @@ def testIngest(self):
# Create a DatasetRef for ingest
refs.append(DatasetRef(datasetType, dataId, id=None))

datasets = []
datasets.append(FileDataset(path=metricFile, refs=refs, formatter=MultiDetectorFormatter))
# Test "move" transfer to ensure that the files themselves
# have disappeared following ingest.
with ResourcePath.temporary_uri(suffix=".yaml") as tempFile:
tempFile.transfer_from(ResourcePath(metricFile), transfer="copy")

datasets = []
datasets.append(FileDataset(path=tempFile, refs=refs, formatter=MultiDetectorFormatter))

butler.ingest(*datasets, transfer="copy", record_validation_info=False)
butler.ingest(*datasets, transfer="move", record_validation_info=False)
self.assertFalse(tempFile.exists())

# Check that the datastore recorded no file size.
# Not all datastores can support this.
Expand Down

0 comments on commit b689edf

Please sign in to comment.