DM-17491: Implement Butler deletion APIs #130

Merged
merged 7 commits into from
Feb 9, 2019
11 changes: 11 additions & 0 deletions config/schema.yaml
@@ -107,6 +107,7 @@ schema:
-
src: quantum_id
tgt: Quantum.execution_id
onDelete: SET NULL
-
src: abstract_filter
tgt: AbstractFilter.abstract_filter
@@ -192,9 +193,11 @@ schema:
-
src: parent_dataset_id
tgt: Dataset.dataset_id
onDelete: CASCADE
-
src: component_dataset_id
tgt: Dataset.dataset_id
onDelete: CASCADE

DatasetType:
doc: >
@@ -301,6 +304,7 @@ schema:
-
src: dataset_id
tgt: Dataset.dataset_id
onDelete: CASCADE
unique:
- [dataset_ref_hash, collection]

@@ -377,12 +381,15 @@ schema:
-
src: environment_id
tgt: Dataset.dataset_id
onDelete: SET NULL
-
src: pipeline_id
tgt: Dataset.dataset_id
onDelete: SET NULL
-
src: execution_id
tgt: Execution.execution_id
onDelete: CASCADE

Quantum:
doc: >
@@ -413,9 +420,11 @@ schema:
-
src: run_id
tgt: Run.execution_id
onDelete: CASCADE
-
src: execution_id
tgt: Execution.execution_id
onDelete: CASCADE

DatasetConsumers:
doc: >
@@ -444,9 +453,11 @@ schema:
-
src: quantum_id
tgt: Quantum.execution_id
onDelete: CASCADE
-
src: dataset_id
tgt: Dataset.dataset_id
onDelete: CASCADE

DatasetStorage:
doc: >
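Taken together, the new onDelete settings mean that deleting a Dataset row now cascades to its composition, collection-membership, and consumer rows, while provenance columns that merely point at a dataset (Run.environment_id and Run.pipeline_id) are set to NULL instead of blocking the delete. Below is a minimal SQLAlchemy sketch of the same pattern; the column set is simplified and the DatasetCollection table name is assumed here for the collection-tagging table, so this is an illustration of the onDelete semantics rather than the actual schema.

import sqlalchemy as sa

metadata = sa.MetaData()

dataset = sa.Table(
    "Dataset", metadata,
    sa.Column("dataset_id", sa.Integer, primary_key=True),
)

# Membership rows disappear together with the dataset (ON DELETE CASCADE) ...
dataset_collection = sa.Table(
    "DatasetCollection", metadata,
    sa.Column("dataset_id", sa.Integer, nullable=False),
    sa.Column("collection", sa.String(128), nullable=False),
    sa.ForeignKeyConstraint(["dataset_id"], ["Dataset.dataset_id"], ondelete="CASCADE"),
)

# ... while provenance pointers are merely nulled out (ON DELETE SET NULL).
run = sa.Table(
    "Run", metadata,
    sa.Column("execution_id", sa.Integer, primary_key=True),
    sa.Column("environment_id", sa.Integer, nullable=True),
    sa.ForeignKeyConstraint(["environment_id"], ["Dataset.dataset_id"], ondelete="SET NULL"),
)

# Printing the DDL shows the generated ON DELETE clauses.
print(sa.schema.CreateTable(dataset_collection))
print(sa.schema.CreateTable(run))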
68 changes: 64 additions & 4 deletions python/lsst/daf/butler/butler.py
@@ -26,6 +26,7 @@
import os
import contextlib
import logging
import itertools

from lsst.utils import doImport
from .core.utils import transactional
@@ -367,7 +368,8 @@ def getDirect(self, ref, parameters=None):
parameters=unusedParams)
else:
# single entity in datastore
raise ValueError("Unable to locate ref {} in datastore {}".format(ref.id, self.datastore.name))
raise FileNotFoundError("Unable to locate ref {} in datastore {}".format(ref.id,
self.datastore.name))

def get(self, datasetRefOrType, dataId=None, parameters=None, **kwds):
"""Retrieve a stored dataset.
@@ -380,7 +382,7 @@ def get(self, datasetRefOrType, dataId=None, parameters=None, **kwds):
dataId : `dict` or `DataId`
A `dict` of `Dimension` link name, value pairs that label the
`DatasetRef` within a Collection. When `None`, a `DatasetRef`
should be provided as the second argument.
should be provided as the first argument.
parameters : `dict`
Additional StorageClass-defined options to control reading,
typically used to efficiently read only a subset of the dataset.
@@ -420,7 +422,7 @@ def getUri(self, datasetRefOrType, dataId=None, predict=False, **kwds):
dataId : `dict` or `DataId`
A `dict` of `Dimension` link name, value pairs that label the
`DatasetRef` within a Collection. When `None`, a `DatasetRef`
should be provided as the second argument.
should be provided as the first argument.
predict : `bool`
If `True`, allow URIs to be returned of datasets that have not
been written.
@@ -468,7 +470,7 @@ def datasetExists(self, datasetRefOrType, dataId=None, **kwds):
dataId : `dict` or `DataId`
A `dict` of `Dimension` link name, value pairs that label the
`DatasetRef` within a Collection. When `None`, a `DatasetRef`
should be provided as the second argument.
should be provided as the first argument.
kwds
Additional keyword arguments used to augment or construct a
`DataId`. See `DataId` parameters.
@@ -485,3 +487,61 @@ def datasetExists(self, datasetRefOrType, dataId=None, **kwds):
"{} with {} not found in collection {}".format(datasetType, dataId, self.collection)
)
return self.datastore.exists(ref)

def remove(self, datasetRefOrType, dataId=None, *, delete=True, remember=True, **kwds):
"""Remove a dataset from the collection and possibly the repository.

The identified dataset is always at least removed from the Butler's
collection. By default it is also deleted from the Datastore (e.g.
files are actually deleted), but the dataset is "remembered" by
retaining its row in the dataset and provenance tables in the registry.

If the dataset is a composite, all components will also be removed.

Parameters
----------
datasetRefOrType : `DatasetRef`, `DatasetType`, or `str`
When `DatasetRef` the `dataId` should be `None`.
Otherwise the `DatasetType` or name thereof.
dataId : `dict` or `DataId`
A `dict` of `Dimension` link name, value pairs that label the
`DatasetRef` within a Collection. When `None`, a `DatasetRef`
should be provided as the first argument.
delete : `bool`
If `True` (default) actually delete the dataset from the
Datastore (i.e. actually remove files).
remember : `bool`
If `True` (default), retain dataset and provenance records in
the `Registry` for this dataset.
kwds
Additional keyword arguments used to augment or construct a
`DataId`. See `DataId` parameters.

Raises
------
ValueError
Raised if ``delete`` and ``remember`` are both `False`; a dataset
cannot remain in a `Datastore` if all of its `Registry` entries are
removed.
OrphanedRecordError
Raised if ``remember`` is `False` but the dataset is still present
in a `Datastore` not recognized by this `Butler` client.
"""
datasetType, dataId = self._standardizeArgs(datasetRefOrType, dataId, **kwds)
ref = self.registry.find(self.collection, datasetType, dataId, **kwds)
if delete:
for r in itertools.chain([ref], ref.components.values()):
# If dataset is a composite, we don't know whether it's the
# parent or the components that actually need to be removed,
# so try them all and swallow errors.
try:
self.datastore.remove(r)
except FileNotFoundError:
pass
elif not remember:
raise ValueError("Cannot retain dataset in Datastore without keeping Registry dataset record.")
if remember:
self.registry.disassociate(self.collection, [ref])
else:
# This also implicitly disassociates.
self.registry.removeDataset(ref)
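
A short usage sketch of the new Butler.remove semantics follows; the repository config, collection, dataset type, and dataId keys are illustrative and not taken from this PR.

from lsst.daf.butler import Butler

butler = Butler("butler.yaml", collection="demo")  # illustrative repo/collection

# Default: delete the files from the Datastore and drop the dataset from this
# collection, but "remember" it by keeping its Registry dataset/provenance rows.
butler.remove("calexp", visit=42)

# Also forget it: the Registry rows are removed too (implicitly disassociating it).
butler.remove("calexp", visit=43, remember=False)

# Keep the files, only drop the dataset from this collection in the Registry.
butler.remove("calexp", visit=44, delete=False)

# delete=False together with remember=False is rejected with ValueError, since
# the files would be left behind with no Registry record pointing at them.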
50 changes: 37 additions & 13 deletions python/lsst/daf/butler/core/registry.py
@@ -33,7 +33,7 @@
from .dataIdPacker import DataIdPackerFactory

__all__ = ("RegistryConfig", "Registry", "disableWhenLimited",
"AmbiguousDatasetError", "ConflictingDefinitionError")
"AmbiguousDatasetError", "ConflictingDefinitionError", "OrphanedRecordError")


class AmbiguousDatasetError(Exception):
@@ -48,6 +48,12 @@ class ConflictingDefinitionError(Exception):
"""


class OrphanedRecordError(Exception):
"""Exception raised when trying to remove or modify a database record
that is still being used in some other table.
"""


def disableWhenLimited(func):
"""Decorator that indicates that a method should raise NotImplementedError
on Registries whose ``limited`` attribute is `True`.
@@ -415,6 +421,32 @@ def getDataset(self, id, datasetType=None, dataId=None):
"""
raise NotImplementedError("Must be implemented by subclass")

@abstractmethod
@transactional
def removeDataset(self, ref):
"""Remove a dataset from the Registry.

The dataset and all components will be removed unconditionally from
all collections, and any associated `Quantum` records will also be
removed. `Datastore` records will *not* be deleted; the caller is
responsible for ensuring that the dataset has already been removed
from all Datastores.

Parameters
----------
ref : `DatasetRef`
Reference to the dataset to be removed. Must include a valid
``id`` attribute, and should be considered invalidated upon return.

Raises
------
AmbiguousDatasetError
Raised if ``ref.id`` is `None`.
OrphanedRecordError
Raised if the dataset is still present in any `Datastore`.
"""
raise NotImplementedError("Must be implemented by subclass")

@abstractmethod
@transactional
def attachComponent(self, name, parent, component):
@@ -454,7 +486,8 @@ def associate(self, collection, refs):
Indicates the collection the Datasets should be associated with.
refs : iterable of `DatasetRef`
An iterable of `DatasetRef` instances that already exist in this
`Registry`.
`Registry`. All component datasets will be associated with the
collection as well.

Raises
------
@@ -466,7 +499,7 @@

@abstractmethod
@transactional
def disassociate(self, collection, refs, remove=True):
def disassociate(self, collection, refs):
r"""Remove existing Datasets from a collection.

``collection`` and ``ref`` combinations that are not currently
@@ -478,16 +511,7 @@ def disassociate(self, collection, refs, remove=True):
The collection the Datasets should no longer be associated with.
refs : `list` of `DatasetRef`
A `list` of `DatasetRef` instances that already exist in this
`Registry`.
remove : `bool`
If `True`, remove Datasets from the `Registry` if they are not
associated with any collection (including via any composites).

Returns
-------
removed : `list` of `DatasetRef`
If `remove` is `True`, the `list` of `DatasetRef`\ s that were
removed.
`Registry`. All component datasets will also be removed.

Raises
------
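In Registry terms, the division of labour is: disassociate edits only collection membership (and now always drops components too), while the new removeDataset deletes the dataset's Registry rows outright and refuses to run while any Datastore still records the dataset. A hedged sketch, with the registry handle, collection, and data ID purely illustrative:

from lsst.daf.butler import Butler
from lsst.daf.butler.core.registry import OrphanedRecordError

registry = Butler("butler.yaml").registry   # illustrative way to obtain a Registry

ref = registry.find("demo", "calexp", {"visit": 42})

# Remove the dataset (and its components) from one collection only;
# its Registry rows survive and it may remain in other collections.
registry.disassociate("demo", [ref])

# Delete the dataset's Registry rows entirely.  This raises
# OrphanedRecordError while a Datastore record still references the dataset,
# so it must be removed from all Datastores first.
try:
    registry.removeDataset(ref)
except OrphanedRecordError:
    print("still present in a Datastore; remove it there first")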
13 changes: 10 additions & 3 deletions python/lsst/daf/butler/core/schema.py
@@ -262,15 +262,17 @@ def addForeignKeyConstraint(self, table, constraintDescription):
Should always contain:
- src, list of source column names
- tgt, list of target column names
May also contain:
- onDelete, one of "SET NULL" or "CASCADE".
"""
if isinstance(table, str):
table = self.metadata.tables[table]
src, tgt, tgtTable = self.normalizeForeignKeyConstraint(constraintDescription)
src, tgt, tgtTable, onDelete = self.normalizeForeignKeyConstraint(constraintDescription)
if not self.isIncluded(table.name) or not self.isIncluded(tgtTable):
return
if self.isView(table.name) or self.isView(tgtTable):
return
table.append_constraint(ForeignKeyConstraint(src, tgt))
table.append_constraint(ForeignKeyConstraint(src, tgt, ondelete=onDelete))

def makeColumn(self, columnDescription):
"""Make a Column entry for addition to a Table.
@@ -334,6 +336,8 @@ def normalizeForeignKeyConstraint(self, constraintDescription):
Should always contain:
- src, list of source column names or single source column name
- tgt, list of (table-qualified) target column names or single target column name
May also contain:
- onDelete, one of "SET NULL" or "CASCADE".

Returns
-------
@@ -343,9 +347,12 @@
Sequence of table-qualified field names in the remote table.
tgtTable : `str`
Name of the target table.
onDelete : `str`, optional
One of "SET NULL", "CASCADE", or `None`.
"""
src = tuple(iterable(constraintDescription["src"]))
tgt = tuple(iterable(constraintDescription["tgt"]))
tgtTable, _ = tgt[0].split(".")
assert all(t.split(".")[0] == tgtTable for t in tgt[1:])
return src, tgt, tgtTable
onDelete = constraintDescription.get("onDelete", None)
return src, tgt, tgtTable, onDelete
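
A small sketch of how the extended helper is consumed; ``builder`` stands in for an already-constructed instance of the class that owns these methods (its construction is elided and the variable name is hypothetical).

# Constraint description as it would arrive from config/schema.yaml.
constraintDescription = {
    "src": "dataset_id",
    "tgt": "Dataset.dataset_id",
    "onDelete": "CASCADE",
}

# The normalizer now returns a fourth element ...
src, tgt, tgtTable, onDelete = builder.normalizeForeignKeyConstraint(constraintDescription)
assert src == ("dataset_id",)
assert tgt == ("Dataset.dataset_id",)
assert tgtTable == "Dataset"
assert onDelete == "CASCADE"

# ... which addForeignKeyConstraint forwards to SQLAlchemy as
# ForeignKeyConstraint(src, tgt, ondelete=onDelete); it is simply None
# when the YAML entry omits onDelete.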