DM-30140: Allow trash to take iterable of dataset refs #521

Merged: 3 commits, May 13, 2021
6 changes: 3 additions & 3 deletions COPYRIGHT
@@ -1,4 +1,4 @@
-Copyright 2016-2019 The Board of Trustees of the Leland Stanford Junior University, through SLAC National Accelerator Laboratory
-Copyright 2018-2019 Association of Universities for Research in Astronomy
-Copyright 2015, 2018-2019 The Trustees of Princeton University
+Copyright 2016-2021 The Board of Trustees of the Leland Stanford Junior University, through SLAC National Accelerator Laboratory
+Copyright 2018-2021 Association of Universities for Research in Astronomy
+Copyright 2015, 2018-2021 The Trustees of Princeton University
 Copyright 2020 University of Washington
26 changes: 5 additions & 21 deletions python/lsst/daf/butler/_butler.py
@@ -1241,9 +1241,7 @@ def removeRuns(self, names: Iterable[str], unstore: bool = True) -> None:
             refs.extend(self.registry.queryDatasets(..., collections=name, findFirst=True))
         with self.registry.transaction():
             if unstore:
-                for ref in refs:
-                    if self.datastore.exists(ref):
-                        self.datastore.trash(ref)
+                self.datastore.trash(refs)
             else:
                 self.datastore.forget(refs)
             for name in names:
@@ -1283,7 +1281,6 @@ def pruneCollection(self, name: str, purge: bool = False, unstore: bool = False,
             Raised if the butler is read-only or arguments are mutually
             inconsistent.
         """
-
         # See pruneDatasets comments for more information about the logic here;
         # the cases are almost the same, but here we can rely on Registry to
         # take care of everything but Datastore deletion when we remove the
@@ -1313,10 +1310,10 @@ def remove(child: str, parent: str) -> None:
                 for parent in unlink:
                     remove(name, parent)
             if unstore:
-                for ref in self.registry.queryDatasets(..., collections=name, findFirst=True):
-                    if self.datastore.exists(ref):
-                        self.datastore.trash(ref)
+                refs = self.registry.queryDatasets(..., collections=name, findFirst=True)
+                self.datastore.trash(refs)
             self.registry.removeCollection(name)
+
         if unstore:
             # Point of no return for removing artifacts
             self.datastore.emptyTrash()
@@ -1396,20 +1393,7 @@ def pruneDatasets(self, refs: Iterable[DatasetRef], *,
         # Registry operations.
         with self.registry.transaction():
             if unstore:
-                for ref in refs:
-                    # There is a difference between a concrete composite
-                    # and virtual composite. In a virtual composite the
-                    # datastore is never given the top level DatasetRef. In
-                    # the concrete composite the datastore knows all the
-                    # refs and will clean up itself if asked to remove the
-                    # parent ref. We can not check configuration for this
-                    # since we can not trust that the configuration is the
-                    # same. We therefore have to ask if the ref exists or
-                    # not. This is consistent with the fact that we want
-                    # to ignore already-removed-from-datastore datasets
-                    # anyway.
-                    if self.datastore.exists(ref):
-                        self.datastore.trash(ref)
+                self.datastore.trash(refs)
             if purge:
                 self.registry.removeDatasets(refs)
             elif disassociate:
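
Taken together, these hunks replace the per-dataset loop, with its `exists()` guard, by a single bulk call. A minimal sketch of the calling convention before and after (illustrative only: `butler` is assumed to be a writeable `lsst.daf.butler.Butler` and `refs` an iterable of `DatasetRef`):

# Before: one existence check plus one trash call per dataset.
for ref in refs:
    if butler.datastore.exists(ref):
        butler.datastore.trash(ref)

# After: hand the whole iterable to the datastore in one call.
# Already-removed datasets are ignored rather than checked up front.
butler.datastore.trash(refs)

Skipping the per-ref `exists()` round trip is the point of the change: the datastore can move everything to its trash table in a single registry operation.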
12 changes: 7 additions & 5 deletions python/lsst/daf/butler/core/datastore.py
@@ -718,20 +718,22 @@ def forget(self, refs: Iterable[DatasetRef]) -> None:
         raise NotImplementedError("Must be implemented by subclass")

     @abstractmethod
-    def trash(self, datasetRef: DatasetRef, ignore_errors: bool = True) -> None:
+    def trash(self, ref: Union[DatasetRef, Iterable[DatasetRef]], ignore_errors: bool = True) -> None:
         """Indicate to the Datastore that a Dataset can be moved to the trash.

         Parameters
         ----------
-        datasetRef : `DatasetRef`
-            Reference to the required Dataset.
+        ref : `DatasetRef` or iterable thereof
+            Reference(s) to the required Dataset.
         ignore_errors : `bool`, optional
-            Determine whether errors should be ignored.
+            Determine whether errors should be ignored. When multiple
+            refs are being trashed there will be no per-ref check.

         Raises
         ------
         FileNotFoundError
-            When Dataset does not exist.
+            When Dataset does not exist and errors are not ignored. Only
+            checked if a single ref is supplied (and not in a list).

         Notes
         -----
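
The revised contract can be exercised as follows (a hedged sketch, assuming `datastore` is any concrete `Datastore` and `ref` a `DatasetRef` known to the caller; the error behaviour follows the docstring above):

# Single ref: existence may be checked, so a missing dataset can raise
# FileNotFoundError when ignore_errors is False.
datastore.trash(ref, ignore_errors=False)

# Iterable of refs: bulk mode, no per-ref existence check, so a missing
# dataset will not raise FileNotFoundError here.
datastore.trash([ref], ignore_errors=False)

Note that wrapping a single ref in a list is enough to opt into bulk semantics.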
15 changes: 10 additions & 5 deletions python/lsst/daf/butler/datastores/chainedDatastore.py
@@ -45,10 +45,10 @@

 from lsst.utils import doImport
 from lsst.daf.butler import ButlerURI, Datastore, DatastoreConfig, DatasetTypeNotSupportedError, \
-    DatastoreValidationError, Constraints, FileDataset
+    DatastoreValidationError, Constraints, FileDataset, DatasetRef

 if TYPE_CHECKING:
-    from lsst.daf.butler import Config, DatasetRef, DatasetType, LookupKey, StorageClass
+    from lsst.daf.butler import Config, DatasetType, LookupKey, StorageClass
     from lsst.daf.butler.registry.interfaces import DatastoreRegistryBridgeManager

 log = logging.getLogger(__name__)
@@ -629,8 +629,13 @@ def forget(self, refs: Iterable[DatasetRef]) -> None:
         for datastore in tuple(self.datastores):
             datastore.forget(refs)

-    def trash(self, ref: DatasetRef, ignore_errors: bool = True) -> None:
-        log.debug("Trashing %s", ref)
+    def trash(self, ref: Union[DatasetRef, Iterable[DatasetRef]], ignore_errors: bool = True) -> None:
+        if isinstance(ref, DatasetRef):
+            ref_label = str(ref)
+        else:
+            ref_label = "bulk datasets"
+
+        log.debug("Trashing %s", ref_label)

         counter = 0
         for datastore in self.datastores:
@@ -641,7 +646,7 @@ def trash(self, ref: DatasetRef, ignore_errors: bool = True) -> None:
                 pass

         if counter == 0:
-            err_msg = f"Could not mark for removal from any child datastore: {ref}"
+            err_msg = f"Could not mark for removal from any child datastore: {ref_label}"
             if ignore_errors:
                 log.warning(err_msg)
             else:
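
The `isinstance` guard above is the usual idiom for a parameter that accepts either one object or many. A self-contained sketch of the pattern (with `FakeRef` standing in for `DatasetRef`; all names here are illustrative):

from typing import Iterable, Union


class FakeRef:  # stand-in for lsst.daf.butler.DatasetRef
    def __str__(self) -> str:
        return "flat@run1"


def trash_label(ref: Union[FakeRef, Iterable[FakeRef]]) -> str:
    if isinstance(ref, FakeRef):
        # Single ref: safe to stringify for the log message.
        return str(ref)
    # Iterable: it may be a one-shot generator, so do not consume it
    # just to build a log message.
    return "bulk datasets"


print(trash_label(FakeRef()))               # flat@run1
print(trash_label(r for r in [FakeRef()]))  # bulk datasets

Computing a fixed label for the iterable case is deliberate: iterating over the refs just to log them could exhaust a generator before the child datastores ever see it.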
34 changes: 15 additions & 19 deletions python/lsst/daf/butler/datastores/fileDatastore.py
@@ -1572,25 +1572,21 @@ def put(self, inMemoryDataset: Any, ref: DatasetRef) -> None:
         self._register_datasets(artifacts)

     @transactional
-    def trash(self, ref: DatasetRef, ignore_errors: bool = True) -> None:
-        """Indicate to the datastore that a dataset can be removed.
-
-        Parameters
-        ----------
-        ref : `DatasetRef`
-            Reference to the required Dataset.
-        ignore_errors : `bool`
-            If `True` return without error even if something went wrong.
-            Problems could occur if another process is simultaneously trying
-            to delete.
-
-        Raises
-        ------
-        FileNotFoundError
-            Attempt to remove a dataset that does not exist.
-        """
+    def trash(self, ref: Union[DatasetRef, Iterable[DatasetRef]], ignore_errors: bool = True) -> None:
+        if not isinstance(ref, DatasetRef):
+            log.debug("Doing multi-dataset trash in datastore %s", self.name)
+            # Assumed to be an iterable of refs so bulk mode enabled.
+            try:
+                self.bridge.moveToTrash(ref)
+            except Exception as e:
+                if ignore_errors:
+                    log.warning("Unexpected issue moving multiple datasets to trash: %s", e)
+                else:
+                    raise
+            return
+
         # Get file metadata and internal metadata
-        log.debug("Trashing %s in datastore %s", ref, self.name)
+        log.debug("Trashing dataset %s in datastore %s", ref, self.name)

         fileLocations = self._get_dataset_locations_info(ref)

@@ -1614,7 +1610,7 @@ def trash(self, ref: DatasetRef, ignore_errors: bool = True) -> None:

         # Mark dataset as trashed
         try:
-            self._move_to_trash_in_registry(ref)
+            self.bridge.moveToTrash([ref])
         except Exception as e:
             if ignore_errors:
                 log.warning(f"Attempted to mark dataset ({ref}) to be trashed in datastore {self.name} "
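
Both branches now converge on the same bridge API: the bulk path forwards the iterable directly to `moveToTrash`, while the single-ref path (which previously went through `_move_to_trash_in_registry`) wraps the ref in a one-element list. A reduced, runnable sketch of that shape with stand-in types (`FakeBridge.moveToTrash` mimics `DatastoreRegistryBridge.moveToTrash`):

from typing import Iterable, List, Union


class Ref:  # stand-in for DatasetRef
    pass


class FakeBridge:  # stand-in for DatastoreRegistryBridge
    def __init__(self) -> None:
        self.trash_table: List[Ref] = []

    def moveToTrash(self, refs: Iterable[Ref]) -> None:
        # The real bridge inserts these rows into the registry trash table.
        self.trash_table.extend(refs)


def trash(bridge: FakeBridge, ref: Union[Ref, Iterable[Ref]]) -> None:
    if isinstance(ref, Ref):
        ref = [ref]  # adapt a single ref to the list-based bridge call
    bridge.moveToTrash(ref)


bridge = FakeBridge()
trash(bridge, Ref())
trash(bridge, [Ref(), Ref()])
assert len(bridge.trash_table) == 3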
18 changes: 0 additions & 18 deletions python/lsst/daf/butler/datastores/genericDatastore.py
@@ -132,24 +132,6 @@ def _register_datasets(self, refsAndInfos: Iterable[Tuple[DatasetRef, StoredData
         self.bridge.insert(registryRefs.values())
         self.addStoredItemInfo(expandedRefs, expandedItemInfos)

-    def _move_to_trash_in_registry(self, ref: DatasetRef) -> None:
-        """Tell registry that this dataset and associated components
-        are to be trashed.
-
-        Parameters
-        ----------
-        ref : `DatasetRef`
-            Dataset to mark for removal from registry.
-
-        Notes
-        -----
-        Dataset is not removed from internal stored item info table.
-        """
-        # Note that a ref can point to component dataset refs that
-        # have been deleted already from registry but are still in
-        # the python object. moveToTrash will deal with that.
-        self.bridge.moveToTrash([ref])
-
     def _post_process_get(self, inMemoryDataset: Any, readStorageClass: StorageClass,
                           assemblerParams: Optional[Mapping[str, Any]] = None,
                           isComponent: bool = False) -> Any:
22 changes: 15 additions & 7 deletions python/lsst/daf/butler/datastores/inMemoryDatastore.py
@@ -42,12 +42,12 @@
     Union,
 )

-from lsst.daf.butler import DatasetId, StoredDatastoreItemInfo, StorageClass, ButlerURI
+from lsst.daf.butler import DatasetId, DatasetRef, StoredDatastoreItemInfo, StorageClass, ButlerURI
 from lsst.daf.butler.registry.interfaces import DatastoreRegistryBridge
 from .genericDatastore import GenericBaseDatastore

 if TYPE_CHECKING:
-    from lsst.daf.butler import (Config, DatasetRef, DatasetType,
+    from lsst.daf.butler import (Config, DatasetType,
                                  LookupKey)
     from lsst.daf.butler.registry.interfaces import DatasetIdRef, DatastoreRegistryBridgeManager

@@ -483,27 +483,32 @@ def forget(self, refs: Iterable[DatasetRef]) -> None:
         for ref in refs:
             self.removeStoredItemInfo(ref)

-    def trash(self, ref: DatasetRef, ignore_errors: bool = False) -> None:
+    def trash(self, ref: Union[DatasetRef, Iterable[DatasetRef]], ignore_errors: bool = False) -> None:
         """Indicate to the Datastore that a dataset can be removed.

         Parameters
         ----------
-        ref : `DatasetRef`
-            Reference to the required Dataset.
+        ref : `DatasetRef` or iterable thereof
+            Reference to the required Dataset(s).
         ignore_errors : `bool`, optional
             Indicate that errors should be ignored.

         Raises
         ------
         FileNotFoundError
-            Attempt to remove a dataset that does not exist.
+            Attempt to remove a dataset that does not exist. Only relevant
+            if a single dataset ref is given.

         Notes
         -----
         Concurrency should not normally be an issue for the in memory datastore
         since all internal changes are isolated to solely this process and
         the registry only changes rows associated with this process.
         """
+        if not isinstance(ref, DatasetRef):
+            log.debug("Bulk trashing of datasets in datastore %s", self.name)
+            self.bridge.moveToTrash(ref)
+            return

         log.debug("Trash %s in datastore %s", ref, self.name)

@@ -512,7 +517,7 @@ def trash(self, ref: DatasetRef, ignore_errors: bool = False) -> None:
             self._get_dataset_info(ref)

             # Move datasets to trash table
-            self._move_to_trash_in_registry(ref)
+            self.bridge.moveToTrash([ref])
         except Exception as e:
             if ignore_errors:
                 log.warning("Error encountered moving dataset %s to trash in datastore %s: %s",
@@ -544,6 +549,9 @@ def emptyTrash(self, ignore_errors: bool = False) -> None:
         for ref, _ in trashed:
             try:
                 realID, _ = self._get_dataset_info(ref)
+            except FileNotFoundError:
+                # Dataset already removed so ignore it
+                continue
             except Exception as e:
                 if ignore_errors:
                     log.warning("Emptying trash in datastore %s but encountered an "
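
The new `except FileNotFoundError` clause makes `emptyTrash` tolerant of refs whose stored-item info has already disappeared, which can now happen when datasets are trashed in bulk. A runnable sketch of the skip pattern (`info_table` and `get_info` are illustrative stand-ins for the datastore's internal records):

info_table = {"ref-a": 42}  # "ref-b" was already removed


def get_info(ref: str) -> int:  # stand-in for self._get_dataset_info
    if ref not in info_table:
        raise FileNotFoundError(ref)
    return info_table[ref]


emptied = []
for ref in ("ref-a", "ref-b"):
    try:
        real_id = get_info(ref)
    except FileNotFoundError:
        # Dataset already removed so ignore it
        continue
    emptied.append((ref, real_id))

assert emptied == [("ref-a", 42)]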