DM-31251: Allow butler.ingest to work with execution butler #551

Merged: 5 commits, Aug 2, 2021
2 changes: 2 additions & 0 deletions doc/changes/DM-31251.misc.rst
@@ -0,0 +1,2 @@
Add ``split`` transfer mode that can be used when some files are inside the datastore and some files are outside the datastore.
This is equivalent to using `None` and ``direct`` mode dynamically.
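
For context, a hedged usage sketch of the new mode (the repository path, run name, and the dataset-building helper are placeholders, not part of this change):

from lsst.daf.butler import Butler

# Placeholders: "repo" is a butler config path, "my_run" an output run.
butler = Butler("repo", writeable=True, run="my_run")

# FileDataset objects are assumed to be built elsewhere, with some paths
# under the datastore root and others pointing at external locations.
datasets = build_file_datasets()  # hypothetical helper

# "split" ingests in-store paths as if transfer=None and external paths as
# if transfer="direct": external files are referenced, never copied.
butler.ingest(*datasets, transfer="split")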
43 changes: 37 additions & 6 deletions python/lsst/daf/butler/_butler.py
@@ -1516,8 +1516,8 @@ def ingest(self, *datasets: FileDataset, transfer: Optional[str] = "auto", run:
standardized form.
transfer : `str`, optional
If not `None`, must be one of 'auto', 'move', 'copy', 'direct',
'hardlink', 'relsymlink' or 'symlink', indicating how to transfer
the file.
'split', 'hardlink', 'relsymlink' or 'symlink', indicating how to
transfer the file.
run : `str`, optional
The name of the run ingested datasets should be added to,
overriding ``self.run``.
@@ -1568,16 +1568,49 @@ def ingest(self, *datasets: FileDataset, transfer: Optional[str] = "auto", run:
# This list intentionally shared across the inner loop, since it's
# associated with `dataset`.
resolvedRefs: List[DatasetRef] = []

# Somewhere to store pre-existing refs if we have an
# execution butler.
existingRefs: List[DatasetRef] = []

for ref in dataset.refs:
if ref.dataId in groupedData[ref.datasetType]:
raise ConflictingDefinitionError(f"Ingest conflict. Dataset {dataset.path} has same"
" DataId as other ingest dataset"
f" {groupedData[ref.datasetType][ref.dataId][0].path} "
f" ({ref.dataId})")
if self._allow_put_of_predefined_dataset:
existing_ref = self.registry.findDataset(ref.datasetType,
dataId=ref.dataId,
collections=run)
if existing_ref:
if self.datastore.knows(existing_ref):
raise ConflictingDefinitionError(f"Dataset associated with path {dataset.path}"
f" already exists as {existing_ref}.")
# Store this ref elsewhere since it already exists
# and we do not want to remake it but we do want
# to store it in the datastore.
existingRefs.append(existing_ref)

# Nothing else to do until we have finished
# iterating.
continue

groupedData[ref.datasetType][ref.dataId] = (dataset, resolvedRefs)

if existingRefs:

if len(dataset.refs) != len(existingRefs):
# Keeping track of partially pre-existing datasets is hard
# and should generally never happen. For now don't allow
# it.
raise ConflictingDefinitionError(f"For dataset {dataset.path} some dataIds already exist"
" in registry but others do not. This is not supported.")

# Attach the resolved refs if we found them.
dataset.refs = existingRefs

# Now we can bulk-insert into Registry for each DatasetType.
allResolvedRefs: List[DatasetRef] = []
for datasetType, groupForType in progress.iter_item_chunks(groupedData.items(),
desc="Bulk-inserting datasets by type"):
refs = self.registry.insertDatasets(
@@ -1593,13 +1626,11 @@ def ingest(self, *datasets: FileDataset, transfer: Optional[str] = "auto", run:
resolvedRefs.append(ref)

# Go back to the original FileDatasets to replace their refs with the
# new resolved ones, and also build a big list of all refs.
allResolvedRefs = []
# new resolved ones.
for groupForType in progress.iter_chunks(groupedData.values(),
desc="Reassociating resolved dataset refs with files"):
for dataset, resolvedRefs in groupForType.values():
dataset.refs = resolvedRefs
allResolvedRefs.extend(resolvedRefs)

# Bulk-insert everything into Datastore.
self.datastore.ingest(*datasets, transfer=transfer)
35 changes: 28 additions & 7 deletions python/lsst/daf/butler/datastores/fileDatastore.py
@@ -723,8 +723,16 @@ def _overrideTransferMode(self, *datasets: FileDataset, transfer: Optional[str]
# Allow ButlerURI to use its own knowledge
transfer = "auto"
else:
raise ValueError("Some datasets are inside the datastore and some are outside."
" Please use an explicit transfer mode and not 'auto'.")
# This can happen when importing from a datastore that
# has had some datasets ingested using "direct" mode.
# Also allow ButlerURI to sort it out but warn about it.
log.warning("Some datasets are inside the datastore and some are outside. Using 'split' "
"transfer mode. This assumes that the files outside the datastore are "
"still accessible to the new butler since they will not be copied into "
"the target datastore.")
transfer = "split"

return transfer
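
An editorial aside: a minimal sketch of the fallback added above, assuming the datastore has already classified which source URIs fall inside its root. The function and parameter names are hypothetical, and the all-external branch is an assumption about behaviour not shown in this hunk.

import logging

log = logging.getLogger(__name__)

def resolve_auto_transfer(n_inside: int, n_outside: int) -> str:
    """Hypothetical condensation of the "auto" override logic."""
    if n_outside == 0:
        # Everything already sits in the datastore; ButlerURI can decide.
        return "auto"
    if n_inside == 0:
        # Assumption: fully external files are ingested by reference.
        return "direct"
    # Mixed case: previously a ValueError, now a warning plus "split".
    log.warning("Some datasets are inside the datastore and some are outside. "
                "Using 'split' transfer mode; external files must stay "
                "accessible since they will not be copied.")
    return "split"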

@@ -783,7 +791,7 @@ def _standardizeIngestPath(self, path: Union[str, ButlerURI], *,
FileNotFoundError
Raised if one of the given files does not exist.
"""
if transfer not in (None, "direct") + self.root.transferModes:
if transfer not in (None, "direct", "split") + self.root.transferModes:
raise NotImplementedError(f"Transfer mode {transfer} not supported.")

# A relative URI indicates relative to datastore root
@@ -852,7 +860,7 @@ def _extractIngestInfo(self, path: Union[str, ButlerURI], ref: DatasetRef, *,
have_sized = False

tgtLocation: Optional[Location]
if transfer is None:
if transfer is None or transfer == "split":
# A relative path is assumed to be relative to the datastore
# in this context
if not srcUri.isabs():
@@ -861,10 +869,18 @@ def _extractIngestInfo(self, path: Union[str, ButlerURI], ref: DatasetRef, *,
# Work out the path in the datastore from an absolute URI
# This is required to be within the datastore.
pathInStore = srcUri.relative_to(self.root)
if pathInStore is None:
if pathInStore is None and transfer is None:
raise RuntimeError(f"Unexpectedly learned that {srcUri} is "
f"not within datastore {self.root}")
tgtLocation = self.locationFactory.fromPath(pathInStore)
if pathInStore:
tgtLocation = self.locationFactory.fromPath(pathInStore)
elif transfer == "split":
# Outside the datastore but treat that as a direct ingest
# instead.
tgtLocation = None
else:
raise RuntimeError(f"Unexpected transfer mode encountered: {transfer} for"
f" URI {srcUri}")
Contributor:

It seems like this whole if block combines two different bits of logic. It might be the right thing, but I am having trouble following it without knowing what some of the call results will be.

You have changed the check to pathInStore is None and transfer is None. Does this imply you are using something like True, False, and None as conditions in normal operation? If pathInStore is None but transfer is not None, is it OK for the if pathInStore to evaluate to false based on None being falsy?

I think I understand what you are doing and I think it is fine, but I am sufficiently uncertain that I want to highlight it for a second look by you.

Member Author:

@TallJimbo used transfer=None to mean "the files are already in the right place", so no transfer is needed. For split mode I need to handle the case where some files are in and some are out. For the files inside, the target location is relative to the datastore root. For the files outside there is also no transfer, but that is "direct" mode, handled by setting the target location to None (so the code below knows to use the source URI directly). It is combining two bits of logic, but the None mode covers most of it, and it did not seem reasonable to add a separate split code path that would duplicate the None handling -- the only change is to set the target location to None in split mode.
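
A condensed sketch of the branching described in this thread; resolve_in_store_target is a hypothetical stand-in for this part of _extractIngestInfo, returning the in-datastore path, or None when the source URI should be used directly.

from typing import Optional

def resolve_in_store_target(pathInStore: Optional[str],
                            transfer: Optional[str]) -> Optional[str]:
    if pathInStore is None and transfer is None:
        # transfer=None promises the file already sits inside the datastore.
        raise RuntimeError("File is not within the datastore root.")
    if pathInStore:
        # Inside the datastore (None or "split"): ingest in place.
        return pathInStore
    if transfer == "split":
        # Outside the datastore: behave like "direct" by signalling a
        # None target location, so the source URI is stored as-is.
        return None
    raise RuntimeError(f"Unexpected transfer mode: {transfer}")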

elif transfer == "direct":
# Want to store the full URI to the resource directly in
# datastore. This is useful for referring to permanent archive
@@ -1292,6 +1308,11 @@ def getURIs(self, ref: DatasetRef,
raise RuntimeError(f"Unexpectedly got no component name for a component at {location}")
uri = location.uri
if guessing and not uri.exists():
# If we are trusting then it is entirely possible for
# some components to be missing. In that case we skip
# to the next component.
if self.trustGetRequest:
continue
raise FileNotFoundError(f"Expected URI ({uri}) does not exist")
components[storedFileInfo.component] = uri

@@ -1810,7 +1831,7 @@ def transfer_from(self, source_datastore: Datastore, refs: Iterable[DatasetRef],
# require that the URI inside the source datastore should be stored
# directly in the target datastore, which seems unlikely to be useful
# since at any moment the source datastore could delete the file.
if transfer == "direct":
if transfer in ("direct", "split"):
raise ValueError("Can not transfer from a source datastore using direct mode since"
" those files are controlled by the other datastore.")
