Merge pull request #130 from lsst/tickets/DM-17491
DM-17491: Implement Butler deletion APIs
TallJimbo committed Feb 9, 2019
2 parents 1dcecdb + c1f19e4 commit a653640
Showing 7 changed files with 269 additions and 73 deletions.
11 changes: 11 additions & 0 deletions config/schema.yaml
@@ -107,6 +107,7 @@ schema:
-
src: quantum_id
tgt: Quantum.execution_id
onDelete: SET NULL
-
src: abstract_filter
tgt: AbstractFilter.abstract_filter
@@ -192,9 +193,11 @@ schema:
-
src: parent_dataset_id
tgt: Dataset.dataset_id
onDelete: CASCADE
-
src: component_dataset_id
tgt: Dataset.dataset_id
onDelete: CASCADE

DatasetType:
doc: >
@@ -301,6 +304,7 @@ schema:
-
src: dataset_id
tgt: Dataset.dataset_id
onDelete: CASCADE
unique:
- [dataset_ref_hash, collection]

@@ -377,12 +381,15 @@ schema:
-
src: environment_id
tgt: Dataset.dataset_id
onDelete: SET NULL
-
src: pipeline_id
tgt: Dataset.dataset_id
onDelete: SET NULL
-
src: execution_id
tgt: Execution.execution_id
onDelete: CASCADE

Quantum:
doc: >
@@ -413,9 +420,11 @@
-
src: run_id
tgt: Run.execution_id
onDelete: CASCADE
-
src: execution_id
tgt: Execution.execution_id
onDelete: CASCADE

DatasetConsumers:
doc: >
@@ -444,9 +453,11 @@
-
src: quantum_id
tgt: Quantum.execution_id
onDelete: CASCADE
-
src: dataset_id
tgt: Dataset.dataset_id
onDelete: CASCADE

DatasetStorage:
doc: >
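
The new onDelete keys move cleanup of dependent rows into the database itself: deleting a Dataset row now cascades into rows that merely reference it, while provenance pointers on Run are nulled out instead. As a rough illustration (not part of this commit, with tables trimmed to the relevant columns), the two modes behave like this in plain SQLite:

import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("PRAGMA foreign_keys = ON")  # SQLite enforces FKs only on request
conn.execute("CREATE TABLE Dataset (dataset_id INTEGER PRIMARY KEY)")
conn.execute("""CREATE TABLE DatasetComposition (
    parent_dataset_id INTEGER REFERENCES Dataset (dataset_id) ON DELETE CASCADE,
    component_dataset_id INTEGER REFERENCES Dataset (dataset_id) ON DELETE CASCADE)""")
conn.execute("""CREATE TABLE Run (
    execution_id INTEGER PRIMARY KEY,
    environment_id INTEGER REFERENCES Dataset (dataset_id) ON DELETE SET NULL)""")
conn.execute("INSERT INTO Dataset VALUES (1)")
conn.execute("INSERT INTO DatasetComposition VALUES (1, 1)")
conn.execute("INSERT INTO Run VALUES (10, 1)")
conn.execute("DELETE FROM Dataset WHERE dataset_id = 1")
print(conn.execute("SELECT COUNT(*) FROM DatasetComposition").fetchone())  # (0,): CASCADE removed the row
print(conn.execute("SELECT environment_id FROM Run").fetchone())  # (None,): SET NULL kept the row
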
68 changes: 64 additions & 4 deletions python/lsst/daf/butler/butler.py
@@ -26,6 +26,7 @@
import os
import contextlib
import logging
import itertools

from lsst.utils import doImport
from .core.utils import transactional
@@ -367,7 +368,8 @@ def getDirect(self, ref, parameters=None):
parameters=unusedParams)
else:
# single entity in datastore
- raise ValueError("Unable to locate ref {} in datastore {}".format(ref.id, self.datastore.name))
+ raise FileNotFoundError("Unable to locate ref {} in datastore {}".format(ref.id,
+                         self.datastore.name))

def get(self, datasetRefOrType, dataId=None, parameters=None, **kwds):
"""Retrieve a stored dataset.
@@ -380,7 +382,7 @@ def get(self, datasetRefOrType, dataId=None, parameters=None, **kwds):
dataId : `dict` or `DataId`
A `dict` of `Dimension` link name, value pairs that label the
`DatasetRef` within a Collection. When `None`, a `DatasetRef`
- should be provided as the second argument.
+ should be provided as the first argument.
parameters : `dict`
Additional StorageClass-defined options to control reading,
typically used to efficiently read only a subset of the dataset.
@@ -420,7 +422,7 @@ def getUri(self, datasetRefOrType, dataId=None, predict=False, **kwds):
dataId : `dict` or `DataId`
A `dict` of `Dimension` link name, value pairs that label the
`DatasetRef` within a Collection. When `None`, a `DatasetRef`
- should be provided as the second argument.
+ should be provided as the first argument.
predict : `bool`
If `True`, allow URIs to be returned of datasets that have not
been written.
@@ -468,7 +470,7 @@ def datasetExists(self, datasetRefOrType, dataId=None, **kwds):
dataId : `dict` or `DataId`
A `dict` of `Dimension` link name, value pairs that label the
`DatasetRef` within a Collection. When `None`, a `DatasetRef`
- should be provided as the second argument.
+ should be provided as the first argument.
kwds
Additional keyword arguments used to augment or construct a
`DataId`. See `DataId` parameters.
@@ -485,3 +487,61 @@ def datasetExists(self, datasetRefOrType, dataId=None, **kwds):
"{} with {} not found in collection {}".format(datasetType, dataId, self.collection)
)
return self.datastore.exists(ref)

def remove(self, datasetRefOrType, dataId=None, *, delete=True, remember=True, **kwds):
"""Remove a dataset from the collection and possibly the repository.
The identified dataset is always at least removed from the Butler's
collection. By default it is also deleted from the Datastore (e.g.
files are actually deleted), but the dataset is "remembered" by
retaining its row in the dataset and provenance tables in the registry.
If the dataset is a composite, all components will also be removed.

Parameters
----------
datasetRefOrType : `DatasetRef`, `DatasetType`, or `str`
When `DatasetRef` the `dataId` should be `None`.
Otherwise the `DatasetType` or name thereof.
dataId : `dict` or `DataId`
A `dict` of `Dimension` link name, value pairs that label the
`DatasetRef` within a Collection. When `None`, a `DatasetRef`
should be provided as the first argument.
delete : `bool`
If `True` (default) actually delete the dataset from the
Datastore (i.e. actually remove files).
remember : `bool`
If `True` (default), retain dataset and provenance records in
the `Registry` for this dataset.
kwds
Additional keyword arguments used to augment or construct a
`DataId`. See `DataId` parameters.

Raises
------
ValueError
Raised if ``delete`` and ``remember`` are both `False`; a dataset
cannot remain in a `Datastore` if all of its `Registry` entries are
removed.
OrphanedRecordError
Raised if ``remember`` is `False` but the dataset is still present
in a `Datastore` not recognized by this `Butler` client.
"""
datasetType, dataId = self._standardizeArgs(datasetRefOrType, dataId, **kwds)
ref = self.registry.find(self.collection, datasetType, dataId, **kwds)
if delete:
for r in itertools.chain([ref], ref.components.values()):
# If dataset is a composite, we don't know whether it's the
# parent or the components that actually need to be removed,
# so try them all and swallow errors.
try:
self.datastore.remove(r)
except FileNotFoundError:
pass
elif not remember:
raise ValueError("Cannot retain dataset in Datastore without keeping Registry dataset record.")
if remember:
self.registry.disassociate(self.collection, [ref])
else:
# This also implicitly disassociates.
self.registry.removeDataset(ref)
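
A hedged usage sketch of the new API (the repository path, collection name, dataset type, and data ID below are all hypothetical, and Butler construction details may differ):

from lsst.daf.butler import Butler

butler = Butler("/path/to/repo", collection="ingest/run1")
dataId = {"instrument": "DummyCam", "visit": 42}

# Default (delete=True, remember=True): delete the files from the
# Datastore but keep Registry records, so provenance still shows that
# the dataset existed.
butler.remove("raw", dataId)

# Alternative: forget the dataset entirely, dropping the files and all
# Registry records in one call.
# butler.remove("raw", dataId, remember=False)

# delete=False combined with remember=False raises ValueError, since
# Registry records cannot be dropped while a Datastore still holds files.
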
50 changes: 37 additions & 13 deletions python/lsst/daf/butler/core/registry.py
@@ -33,7 +33,7 @@
from .dataIdPacker import DataIdPackerFactory

__all__ = ("RegistryConfig", "Registry", "disableWhenLimited",
"AmbiguousDatasetError", "ConflictingDefinitionError")
"AmbiguousDatasetError", "ConflictingDefinitionError", "OrphanedRecordError")


class AmbiguousDatasetError(Exception):
@@ -48,6 +48,12 @@ class ConflictingDefinitionError(Exception):
"""


class OrphanedRecordError(Exception):
"""Exception raised when trying to remove or modify a database record
that is still being used in some other table.
"""


def disableWhenLimited(func):
"""Decorator that indicates that a method should raise NotImplementedError
on Registries whose ``limited`` attribute is `True`.
@@ -415,6 +421,32 @@ def getDataset(self, id, datasetType=None, dataId=None):
"""
raise NotImplementedError("Must be implemented by subclass")

@abstractmethod
@transactional
def removeDataset(self, ref):
"""Remove a dataset from the Registry.
The dataset and all components will be removed unconditionally from
all collections, and any associated `Quantum` records will also be
removed. `Datastore` records will *not* be deleted; the caller is
responsible for ensuring that the dataset has already been removed
from all Datastores.

Parameters
----------
ref : `DatasetRef`
Reference to the dataset to be removed. Must include a valid
``id`` attribute, and should be considered invalidated upon return.

Raises
------
AmbiguousDatasetError
Raised if ``ref.id`` is `None`.
OrphanedRecordError
Raised if the dataset is still present in any `Datastore`.
"""
raise NotImplementedError("Must be implemented by subclass")

@abstractmethod
@transactional
def attachComponent(self, name, parent, component):
@@ -454,7 +486,8 @@ def associate(self, collection, refs):
Indicates the collection the Datasets should be associated with.
refs : iterable of `DatasetRef`
An iterable of `DatasetRef` instances that already exist in this
- `Registry`.
+ `Registry`. All component datasets will be associated with the
+ collection as well.

Raises
------
@@ -466,7 +499,7 @@ def associate(self, collection, refs):

@abstractmethod
@transactional
- def disassociate(self, collection, refs, remove=True):
+ def disassociate(self, collection, refs):
r"""Remove existing Datasets from a collection.
``collection`` and ``ref`` combinations that are not currently
@@ -478,16 +511,7 @@ def disassociate(self, collection, refs, remove=True):
The collection the Datasets should no longer be associated with.
refs : `list` of `DatasetRef`
A `list` of `DatasetRef` instances that already exist in this
- `Registry`.
- remove : `bool`
- If `True`, remove Datasets from the `Registry` if they are not
- associated with any collection (including via any composites).
- Returns
- -------
- removed : `list` of `DatasetRef`
- If `remove` is `True`, the `list` of `DatasetRef`\ s that were
- removed.
+ `Registry`. All component datasets will also be removed.

Raises
------
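
At the Registry level the split is between disassociate (detach from one collection, keep all records) and removeDataset (drop the records entirely, refusing while any Datastore still holds the data). A sketch under the assumption of an already-configured butler; the helper name forgetDataset is hypothetical:

from lsst.daf.butler.core.registry import OrphanedRecordError

def forgetDataset(butler, ref):
    # Try to drop the dataset's Registry records outright; removeDataset
    # raises OrphanedRecordError while a Datastore record still exists.
    try:
        butler.registry.removeDataset(ref)
    except OrphanedRecordError:
        # Fall back to detaching it from the current collection only.
        butler.registry.disassociate(butler.collection, [ref])
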
13 changes: 10 additions & 3 deletions python/lsst/daf/butler/core/schema.py
@@ -262,15 +262,17 @@ def addForeignKeyConstraint(self, table, constraintDescription):
Should always contain:
- src, list of source column names
- tgt, list of target column names
May also contain:
- onDelete, one of "SET NULL" or "CASCADE".
"""
if isinstance(table, str):
table = self.metadata.tables[table]
- src, tgt, tgtTable = self.normalizeForeignKeyConstraint(constraintDescription)
+ src, tgt, tgtTable, onDelete = self.normalizeForeignKeyConstraint(constraintDescription)
if not self.isIncluded(table.name) or not self.isIncluded(tgtTable):
return
if self.isView(table.name) or self.isView(tgtTable):
return
- table.append_constraint(ForeignKeyConstraint(src, tgt))
+ table.append_constraint(ForeignKeyConstraint(src, tgt, ondelete=onDelete))

def makeColumn(self, columnDescription):
"""Make a Column entry for addition to a Table.
@@ -334,6 +336,8 @@ def normalizeForeignKeyConstraint(self, constraintDescription):
Should always contain:
- src, list of source column names or single source column name
- tgt, list of (table-qualified) target column names or single target column name
May also contain:
- onDelete, one of "SET NULL" or "CASCADE".
Returns
-------
@@ -343,9 +347,12 @@
Sequence of table-qualified field names in the remote table.
tgtTable : `str`
Name of the target table.
onDelete : `str`, optional
One of "SET NULL", "CASCADE", or `None`.
"""
src = tuple(iterable(constraintDescription["src"]))
tgt = tuple(iterable(constraintDescription["tgt"]))
tgtTable, _ = tgt[0].split(".")
assert all(t.split(".")[0] == tgtTable for t in tgt[1:])
- return src, tgt, tgtTable
+ onDelete = constraintDescription.get("onDelete", None)
+ return src, tgt, tgtTable, onDelete
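
For reference, a sketch of a constraint description exercising the new optional key; "builder" stands in for whatever object exposes the methods changed above:

constraintDescription = {
    "src": "dataset_id",
    "tgt": "Dataset.dataset_id",
    "onDelete": "CASCADE",  # optional; omit it for a plain foreign key
}
src, tgt, tgtTable, onDelete = builder.normalizeForeignKeyConstraint(constraintDescription)
# src == ("dataset_id",), tgt == ("Dataset.dataset_id",),
# tgtTable == "Dataset", onDelete == "CASCADE"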
