Allow butler.ingest to work with execution butler
Add a check for datasets that already exist in registry but have no
file in datastore.
timj committed Jul 30, 2021
1 parent 4707e78 commit 4225aba
Showing 1 changed file with 34 additions and 0 deletions.
python/lsst/daf/butler/_butler.py
@@ -1568,14 +1568,48 @@ def ingest(self, *datasets: FileDataset, transfer: Optional[str] = "auto", run:
    # This list intentionally shared across the inner loop, since it's
    # associated with `dataset`.
    resolvedRefs: List[DatasetRef] = []

    # Somewhere to store pre-existing refs if we have an
    # execution butler.
    existingRefs: List[DatasetRef] = []

    for ref in dataset.refs:
        if ref.dataId in groupedData[ref.datasetType]:
            raise ConflictingDefinitionError(f"Ingest conflict. Dataset {dataset.path} has same"
                                             " DataId as other ingest dataset"
                                             f" {groupedData[ref.datasetType][ref.dataId][0].path} "
                                             f" ({ref.dataId})")
        if self._allow_put_of_predefined_dataset:
            existing_ref = self.registry.findDataset(ref.datasetType,
                                                     dataId=ref.dataId,
                                                     collections=run)
            if existing_ref:
                if self.datastore.knows(existing_ref):
                    raise ConflictingDefinitionError(f"Dataset associated with path {dataset.path}"
                                                     f" already exists as {existing_ref}.")
                # Store this ref elsewhere since it already exists
                # and we do not want to remake it but we do want
                # to store it in the datastore.
                existingRefs.append(existing_ref)

                # Nothing else to do until we have finished
                # iterating.
                continue

        groupedData[ref.datasetType][ref.dataId] = (dataset, resolvedRefs)

    if existingRefs:

        if len(dataset.refs) != len(existingRefs):
            # Keeping track of partially pre-existing datasets is hard
            # and should generally never happen. For now don't allow
            # it.
            raise ConflictingDefinitionError(f"For dataset {dataset.path} some dataIds already exist"
                                             " in registry but others do not. This is not supported.")

        # Attach the resolved refs if we found them.
        dataset.refs = existingRefs

# Now we can bulk-insert into Registry for each DatasetType.
for datasetType, groupForType in progress.iter_item_chunks(groupedData.items(),
                                                           desc="Bulk-inserting datasets by type"):
