Merge pull request #551 from lsst/tickets/DM-31251
DM-31251: Allow butler.ingest to work with execution butler
timj committed Aug 2, 2021
2 parents cf79cce + 2f6b627 commit 023bc29
Showing 3 changed files with 67 additions and 13 deletions.
2 changes: 2 additions & 0 deletions doc/changes/DM-31251.misc.rst
@@ -0,0 +1,2 @@
+Add ``split`` transfer mode that can be used when some files are inside the datastore and some files are outside the datastore.
+This is equivalent to using `None` and ``direct`` mode dynamically.
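
The changelog's equivalence can be read as a per-file dispatch: files already under the datastore root are ingested as if transfer=None, everything else as if "direct". A minimal stand-alone sketch with plain pathlib paths standing in for the datastore's ButlerURI objects (the function name and paths are illustrative only, not part of the commit):

    from pathlib import Path
    from typing import Optional

    def split_mode_for(src: Path, datastore_root: Path) -> Optional[str]:
        # Inside the datastore root: behave as transfer=None, recording
        # the path inside the store. Outside: behave as "direct",
        # recording the absolute URI without copying anything.
        try:
            src.resolve().relative_to(datastore_root.resolve())
        except ValueError:
            return "direct"
        return None

    root = Path("/repo/datastore")
    print(split_mode_for(root / "a" / "b.fits", root))    # None
    print(split_mode_for(Path("/archive/c.fits"), root))  # 'direct'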
43 changes: 37 additions & 6 deletions python/lsst/daf/butler/_butler.py
@@ -1516,8 +1516,8 @@ def ingest(self, *datasets: FileDataset, transfer: Optional[str] = "auto", run:
             standardized form.
         transfer : `str`, optional
             If not `None`, must be one of 'auto', 'move', 'copy', 'direct',
-            'hardlink', 'relsymlink' or 'symlink', indicating how to transfer
-            the file.
+            'split', 'hardlink', 'relsymlink' or 'symlink', indicating how to
+            transfer the file.
         run : `str`, optional
             The name of the run ingested datasets should be added to,
             overriding ``self.run``.
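
Given the signature in the hunk header above, a call using the new mode could look like this fragment (the repo path and run name are hypothetical, and `datasets` stands for FileDataset objects prepared elsewhere):

    from lsst.daf.butler import Butler

    butler = Butler("/path/to/repo", writeable=True)  # hypothetical repo
    # `datasets`: an iterable of FileDataset objects built beforehand.
    butler.ingest(*datasets, transfer="split", run="my_run")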
@@ -1568,16 +1568,49 @@ def ingest(self, *datasets: FileDataset, transfer: Optional[str] = "auto", run:
             # This list intentionally shared across the inner loop, since it's
             # associated with `dataset`.
             resolvedRefs: List[DatasetRef] = []
 
+            # Somewhere to store pre-existing refs if we have an
+            # execution butler.
+            existingRefs: List[DatasetRef] = []
+
             for ref in dataset.refs:
                 if ref.dataId in groupedData[ref.datasetType]:
                     raise ConflictingDefinitionError(f"Ingest conflict. Dataset {dataset.path} has same"
                                                      " DataId as other ingest dataset"
                                                      f" {groupedData[ref.datasetType][ref.dataId][0].path} "
                                                      f" ({ref.dataId})")
+                if self._allow_put_of_predefined_dataset:
+                    existing_ref = self.registry.findDataset(ref.datasetType,
+                                                             dataId=ref.dataId,
+                                                             collections=run)
+                    if existing_ref:
+                        if self.datastore.knows(existing_ref):
+                            raise ConflictingDefinitionError(f"Dataset associated with path {dataset.path}"
+                                                             f" already exists as {existing_ref}.")
+                        # Store this ref elsewhere since it already exists
+                        # and we do not want to remake it but we do want
+                        # to store it in the datastore.
+                        existingRefs.append(existing_ref)
+
+                        # Nothing else to do until we have finished
+                        # iterating.
+                        continue
+
                 groupedData[ref.datasetType][ref.dataId] = (dataset, resolvedRefs)
 
+            if existingRefs:
+
+                if len(dataset.refs) != len(existingRefs):
+                    # Keeping track of partially pre-existing datasets is hard
+                    # and should generally never happen. For now don't allow
+                    # it.
+                    raise ConflictingDefinitionError(f"For dataset {dataset.path} some dataIds already exist"
+                                                     " in registry but others do not. This is not supported.")
+
+                # Attach the resolved refs if we found them.
+                dataset.refs = existingRefs
+
         # Now we can bulk-insert into Registry for each DatasetType.
+        allResolvedRefs: List[DatasetRef] = []
         for datasetType, groupForType in progress.iter_item_chunks(groupedData.items(),
                                                                    desc="Bulk-inserting datasets by type"):
             refs = self.registry.insertDatasets(
@@ -1593,13 +1626,11 @@ def ingest(self, *datasets: FileDataset, transfer: Optional[str] = "auto", run:
                 resolvedRefs.append(ref)
 
         # Go back to the original FileDatasets to replace their refs with the
-        # new resolved ones, and also build a big list of all refs.
-        allResolvedRefs = []
+        # new resolved ones.
         for groupForType in progress.iter_chunks(groupedData.values(),
                                                  desc="Reassociating resolved dataset refs with files"):
             for dataset, resolvedRefs in groupForType.values():
                 dataset.refs = resolvedRefs
-                allResolvedRefs.extend(resolvedRefs)
 
         # Bulk-insert everything into Datastore.
         self.datastore.ingest(*datasets, transfer=transfer)
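
The new branch lets ingest reuse registry entries that an execution butler predefines. A hedged sketch of that lookup as a free-standing helper, using the same registry.findDataset and datastore.knows calls as the diff above (the helper itself is not part of the commit):

    from lsst.daf.butler import Butler, DatasetRef
    from lsst.daf.butler.registry import ConflictingDefinitionError

    def find_reusable_ref(butler: Butler, ref: DatasetRef, run: str):
        """Return a predefined ref whose file artifact is still missing,
        or None when the registry has no entry; raise on a true conflict."""
        existing = butler.registry.findDataset(ref.datasetType,
                                               dataId=ref.dataId,
                                               collections=run)
        if existing is None:
            return None
        if butler.datastore.knows(existing):
            # Registry *and* datastore already know it: a second ingest
            # of the same dataset is a conflict.
            raise ConflictingDefinitionError(f"{existing} already has a file artifact")
        # Registry-only entry: ingest may attach the file to this ref.
        return existing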
35 changes: 28 additions & 7 deletions python/lsst/daf/butler/datastores/fileDatastore.py
@@ -723,8 +723,16 @@ def _overrideTransferMode(self, *datasets: FileDataset, transfer: Optional[str]
                 # Allow ButlerURI to use its own knowledge
                 transfer = "auto"
             else:
-                raise ValueError("Some datasets are inside the datastore and some are outside."
-                                 " Please use an explicit transfer mode and not 'auto'.")
+                # This can happen when importing from a datastore that
+                # has had some datasets ingested using "direct" mode.
+                # Also allow ButlerURI to sort it out but warn about it.
+                log.warning("Some datasets are inside the datastore and some are outside. Using 'split' "
+                            "transfer mode. This assumes that the files outside the datastore are "
+                            "still accessible to the new butler since they will not be copied into "
+                            "the target datastore.")
+                transfer = "split"
 
         return transfer
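
The fallback only triggers for a mixture of locations. A sketch of the decision with plain paths standing in for ButlerURI; the return values for the all-inside and all-outside cases are assumptions, not taken from the diff:

    import logging
    from pathlib import Path
    from typing import List

    log = logging.getLogger(__name__)

    def resolve_auto_transfer(paths: List[Path], root: Path) -> str:
        def inside(p: Path) -> bool:
            try:
                p.resolve().relative_to(root.resolve())
                return True
            except ValueError:
                return False

        flags = [inside(p) for p in paths]
        if all(flags):
            return "auto"    # assumed: ButlerURI can sort these out
        if not any(flags):
            return "direct"  # assumed behaviour when nothing is inside
        log.warning("Some datasets are inside the datastore and some are "
                    "outside. Using 'split' transfer mode.")
        return "split"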

@@ -783,7 +791,7 @@ def _standardizeIngestPath(self, path: Union[str, ButlerURI], *,
         FileNotFoundError
             Raised if one of the given files does not exist.
         """
-        if transfer not in (None, "direct") + self.root.transferModes:
+        if transfer not in (None, "direct", "split") + self.root.transferModes:
             raise NotImplementedError(f"Transfer mode {transfer} not supported.")
 
         # A relative URI indicates relative to datastore root
@@ -852,7 +860,7 @@ def _extractIngestInfo(self, path: Union[str, ButlerURI], ref: DatasetRef, *,
         have_sized = False
 
         tgtLocation: Optional[Location]
-        if transfer is None:
+        if transfer is None or transfer == "split":
             # A relative path is assumed to be relative to the datastore
             # in this context
             if not srcUri.isabs():
@@ -861,10 +869,18 @@ def _extractIngestInfo(self, path: Union[str, ButlerURI], ref: DatasetRef, *,
                 # Work out the path in the datastore from an absolute URI
                 # This is required to be within the datastore.
                 pathInStore = srcUri.relative_to(self.root)
-                if pathInStore is None:
+                if pathInStore is None and transfer is None:
                     raise RuntimeError(f"Unexpectedly learned that {srcUri} is "
                                        f"not within datastore {self.root}")
-                tgtLocation = self.locationFactory.fromPath(pathInStore)
+                if pathInStore:
+                    tgtLocation = self.locationFactory.fromPath(pathInStore)
+                elif transfer == "split":
+                    # Outside the datastore but treat that as a direct ingest
+                    # instead.
+                    tgtLocation = None
+                else:
+                    raise RuntimeError(f"Unexpected transfer mode encountered: {transfer} for"
+                                       f" URI {srcUri}")
         elif transfer == "direct":
             # Want to store the full URI to the resource directly in
             # datastore. This is useful for referring to permanent archive
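
The branch hinges on relative_to returning None for a URI that is not under the datastore root; under "split" that None now selects direct-style ingest (tgtLocation = None) instead of raising. Assuming the ButlerURI API of this era (since moved to lsst.resources), the behaviour is roughly:

    from lsst.daf.butler import ButlerURI

    root = ButlerURI("/repo/datastore/", forceDirectory=True)
    print(ButlerURI("/repo/datastore/a/b.fits").relative_to(root))  # 'a/b.fits'
    print(ButlerURI("/archive/c.fits").relative_to(root))           # None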
@@ -1292,6 +1308,11 @@ def getURIs(self, ref: DatasetRef,
raise RuntimeError(f"Unexpectedly got no component name for a component at {location}")
uri = location.uri
if guessing and not uri.exists():
# If we are trusting then it is entirely possible for
# some components to be missing. In that case we skip
# to the next component.
if self.trustGetRequest:
continue
raise FileNotFoundError(f"Expected URI ({uri}) does not exist")
components[storedFileInfo.component] = uri
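
Stripped of datastore internals, the guard implements "missing components are tolerated when trusting". A self-contained rendering of the pattern (names are stand-ins for the real attributes):

    def collect_component_uris(guessed: dict, trust: bool) -> dict:
        components = {}
        for name, uri in guessed.items():
            if not uri.exists():
                if trust:
                    # Trusted guesses may legitimately be absent; skip to
                    # the next component instead of failing.
                    continue
                raise FileNotFoundError(f"Expected URI ({uri}) does not exist")
            components[name] = uri
        return components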

@@ -1810,7 +1831,7 @@ def transfer_from(self, source_datastore: Datastore, refs: Iterable[DatasetRef],
         # require that the URI inside the source datastore should be stored
         # directly in the target datastore, which seems unlikely to be useful
         # since at any moment the source datastore could delete the file.
-        if transfer == "direct":
+        if transfer in ("direct", "split"):
             raise ValueError("Can not transfer from a source datastore using direct mode since"
                              " those files are controlled by the other datastore.")

Expand Down

0 comments on commit 023bc29

Please sign in to comment.