Skip to content

Commit

Permalink
Merge pull request #289 from lsst/tickets/DM-25000
Browse files Browse the repository at this point in the history
DM-25000: Make registry query methods work with composition again.
  • Loading branch information
TallJimbo committed May 22, 2020
2 parents 2cb96f3 + 11c8da9 commit f58ce13
Show file tree
Hide file tree
Showing 19 changed files with 287 additions and 575 deletions.
91 changes: 16 additions & 75 deletions python/lsst/daf/butler/_butler.py
Original file line number Diff line number Diff line change
Expand Up @@ -649,34 +649,16 @@ def put(self, obj: Any, datasetRefOrType: Union[DatasetRef, DatasetType, str],
raise TypeError(f"Cannot associate into collection '{tag}' of non-TAGGED type "
f"{collectionType.name}.")

# Disable all disassembly at the registry level for now
isVirtualComposite = False

# Add Registry Dataset entry. If not a virtual composite, add
# and attach components at the same time.
# Add Registry Dataset entry.
dataId = self.registry.expandDataId(dataId, graph=datasetType.dimensions, **kwds)
ref, = self.registry.insertDatasets(datasetType, run=run, dataIds=[dataId],
producer=producer,
# Never write components into
# registry
recursive=False)

# Check to see if this datasetType requires disassembly
if isVirtualComposite:
components = datasetType.storageClass.assembler().disassemble(obj)
componentRefs = {}
for component, info in components.items():
compTypeName = datasetType.componentTypeName(component)
compRef = self.put(info.component, compTypeName, dataId, producer=producer, run=run,
collection=False) # We don't need to recursively associate.
componentRefs[component] = compRef
ref = self.registry.attachComponents(ref, componentRefs)
else:
# This is an entity without a disassembler.
self.datastore.put(obj, ref)
producer=producer)

# Add Datastore entry.
self.datastore.put(obj, ref)

for tag in tags:
self.registry.associate(tag, [ref]) # this is already recursive by default
self.registry.associate(tag, [ref])

return ref

Expand All @@ -700,37 +682,7 @@ def getDirect(self, ref: DatasetRef, *, parameters: Optional[Dict[str, Any]] = N
obj : `object`
The dataset.
"""
# if the ref exists in the store we return it directly
if self.datastore.exists(ref):
return self.datastore.get(ref, parameters=parameters)
elif ref.isComposite() and ref.components:
# The presence of components indicates that this dataset
# was disassembled at the registry level.
# Check that we haven't got any unknown parameters
ref.datasetType.storageClass.validateParameters(parameters)
# Reconstruct the composite
usedParams = set()
components = {}
for compName, compRef in ref.components.items():
# make a dictionary of parameters containing only the subset
# supported by the StorageClass of the components
compParams = compRef.datasetType.storageClass.filterParameters(parameters)
usedParams.update(set(compParams))
components[compName] = self.datastore.get(compRef, parameters=compParams)

# Any unused parameters will have to be passed to the assembler
if parameters:
unusedParams = {k: v for k, v in parameters.items() if k not in usedParams}
else:
unusedParams = {}

# Assemble the components
inMemoryDataset = ref.datasetType.storageClass.assembler().assemble(components)
return ref.datasetType.storageClass.assembler().handleParameters(inMemoryDataset,
parameters=unusedParams)
else:
# single entity in datastore
raise FileNotFoundError(f"Unable to locate dataset '{ref}' in datastore {self.datastore.name}")
return self.datastore.get(ref, parameters=parameters)

def getDeferred(self, datasetRefOrType: Union[DatasetRef, DatasetType, str],
dataId: Optional[DataId] = None, *,
Expand Down Expand Up @@ -1034,8 +986,7 @@ def pruneDatasets(self, refs: Iterable[DatasetRef], *,
unstore: bool = False,
tags: Optional[Iterable[str]] = None,
purge: bool = False,
run: Optional[str] = None,
recursive: bool = True):
run: Optional[str] = None):
"""Remove one or more datasets from a collection and/or storage.
Parameters
Expand Down Expand Up @@ -1069,13 +1020,6 @@ def pruneDatasets(self, refs: Iterable[DatasetRef], *,
run : `str`, optional
`~CollectionType.RUN` collection to purge from, overriding
``self.run``. Ignored unless ``purge`` is `True`.
recursive : `bool`, optional
If `True` (default) also prune component datasets of any given
composite datasets. This will only prune components that are
actually attached to the given `DatasetRef` objects, which may
not reflect what is in the database (especially if they were
obtained from `Registry.queryDatasets`, which does not include
components in its results).
Raises
------
Expand Down Expand Up @@ -1110,15 +1054,15 @@ def pruneDatasets(self, refs: Iterable[DatasetRef], *,
if collectionType is not CollectionType.TAGGED:
raise TypeError(f"Cannot disassociate from collection '{tag}' "
f"of non-TAGGED type {collectionType.name}.")
# Transform possibly-single-pass iterable into something we can iterate
# over multiple times.
refs = list(refs)
# Pruning a component of a DatasetRef makes no sense since registry
# doesn't always know about components and datastore might not store
# doesn't know about components and datastore might not store
# components in a separate file
for ref in refs:
if ref.datasetType.component():
raise ValueError(f"Can not prune a component of a dataset (ref={ref})")

if recursive:
refs = list(DatasetRef.flatten(refs))
# We don't need an unreliable Datastore transaction for this, because
# we've been extra careful to ensure that Datastore.trash only involves
# mutating the Registry (it can _look_ at Datastore-specific things,
Expand All @@ -1141,12 +1085,10 @@ def pruneDatasets(self, refs: Iterable[DatasetRef], *,
if self.datastore.exists(ref):
self.datastore.trash(ref)
if purge:
self.registry.removeDatasets(refs, recursive=False) # refs is already recursiveley expanded
self.registry.removeDatasets(refs)
elif disassociate:
for tag in tags:
# recursive=False here because refs is already recursive
# if we want it to be.
self.registry.disassociate(tag, refs, recursive=False)
self.registry.disassociate(tag, refs)
# We've exited the Registry transaction, and apparently committed.
# (if there was an exception, everything rolled back, and it's as if
# nothing happened - and we never get here).
Expand Down Expand Up @@ -1258,8 +1200,7 @@ def ingest(self, *datasets: FileDataset, transfer: Optional[str] = None, run: Op
for datasetType, groupForType in groupedData.items():
refs = self.registry.insertDatasets(datasetType,
dataIds=groupForType.keys(),
run=run,
recursive=True)
run=run)
# Append those resolved DatasetRefs to the new lists we set up for
# them.
for ref, (_, resolvedRefs) in zip(refs, groupForType.values()):
Expand Down Expand Up @@ -1418,7 +1359,7 @@ def validateConfiguration(self, logFailures: bool = False,
ignore : iterable of `str`, optional
Names of DatasetTypes to skip over. This can be used to skip
known problems. If a named `DatasetType` corresponds to a
composite, all component of that `DatasetType` will also be
composite, all components of that `DatasetType` will also be
ignored.
Raises
Expand Down

0 comments on commit f58ce13

Please sign in to comment.