DM-36172: Add explicit transaction around moveToTrash calls #733

Merged (4 commits, Sep 13, 2022)
2 changes: 2 additions & 0 deletions doc/changes/DM-36172.misc.rst
@@ -0,0 +1,2 @@
Add support for in-memory datastore to roll back a call to ``datastore.trash()``.
As a result, the ``bridge.moveToTrash()`` method now takes an additional ``transaction`` parameter (which can be `None`).
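
As a rough illustration of what this enables (a sketch, not code from this change; `datastore`, `refs`, and the transaction context manager are assumed to exist as used elsewhere in this PR):

```python
# Hedged sketch: with this change, an in-memory datastore's trash() call can be
# undone if the enclosing transaction fails, because the bridge now receives
# the active transaction and registers an undo action against it.
with datastore.transaction():            # assumed Datastore context manager
    datastore.trash(refs)                # forwards transaction=self._transaction
    raise RuntimeError("later failure")  # rollback un-trashes the datasets
```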
51 changes: 27 additions & 24 deletions python/lsst/daf/butler/_butler.py
@@ -1636,13 +1636,14 @@ def removeRuns(self, names: Iterable[str], unstore: bool = True) -> None:
if collectionType is not CollectionType.RUN:
raise TypeError(f"The collection type of '{name}' is {collectionType.name}, not RUN.")
refs.extend(self.registry.queryDatasets(..., collections=name, findFirst=True))
with self.registry.transaction():
if unstore:
self.datastore.trash(refs)
else:
self.datastore.forget(refs)
for name in names:
self.registry.removeCollection(name)
with self.datastore.transaction():
with self.registry.transaction():
if unstore:
self.datastore.trash(refs)
else:
self.datastore.forget(refs)
for name in names:
self.registry.removeCollection(name)
if unstore:
# Point of no return for removing artifacts
self.datastore.emptyTrash()
@@ -1703,14 +1704,15 @@ def remove(child: str, parent: str) -> None:
raise RuntimeError(f"{name} is not a child of {parent}") from e
self.registry.setCollectionChain(parent, chain)

with self.registry.transaction():
if unlink:
for parent in unlink:
remove(name, parent)
if unstore:
refs = self.registry.queryDatasets(..., collections=name, findFirst=True)
self.datastore.trash(refs)
self.registry.removeCollection(name)
with self.datastore.transaction():
with self.registry.transaction():
if unlink:
for parent in unlink:
remove(name, parent)
if unstore:
refs = self.registry.queryDatasets(..., collections=name, findFirst=True)
self.datastore.trash(refs)
self.registry.removeCollection(name)

if unstore:
# Point of no return for removing artifacts
@@ -1765,15 +1767,16 @@ def pruneDatasets(
# mutating the Registry (it can _look_ at Datastore-specific things,
# but shouldn't change them), and hence all operations here are
# Registry operations.
with self.registry.transaction():
if unstore:
self.datastore.trash(refs)
if purge:
self.registry.removeDatasets(refs)
elif disassociate:
assert tags, "Guaranteed by earlier logic in this function."
for tag in tags:
self.registry.disassociate(tag, refs)
with self.datastore.transaction():
with self.registry.transaction():
if unstore:
self.datastore.trash(refs)
if purge:
self.registry.removeDatasets(refs)
elif disassociate:
assert tags, "Guaranteed by earlier logic in this function."
for tag in tags:
self.registry.disassociate(tag, refs)
# We've exited the Registry transaction, and apparently committed.
# (if there was an exception, everything rolled back, and it's as if
# nothing happened - and we never get here).
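The recurring pattern in these three hunks is the same: the datastore transaction is now opened outside the registry transaction, so any `trash()` calls can be rolled back together with the registry changes. A simplified sketch of the intent (not the actual Butler code; error handling and the forget/disassociate branches are omitted):

```python
# Simplified sketch of the nesting used above.
with self.datastore.transaction():      # undo actions for trash() live here
    with self.registry.transaction():   # database work; rolls back on error
        self.datastore.trash(refs)      # reversible until both blocks exit
        self.registry.removeCollection(name)
# Only once both transactions have committed is it safe to delete artifacts:
if unstore:
    self.datastore.emptyTrash()         # point of no return
```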
4 changes: 2 additions & 2 deletions python/lsst/daf/butler/datastores/fileDatastore.py
@@ -2246,7 +2246,7 @@ def trash(self, ref: Union[DatasetRef, Iterable[DatasetRef]], ignore_errors: boo
log.debug("Doing multi-dataset trash in datastore %s", self.name)
# Assumed to be an iterable of refs so bulk mode enabled.
try:
self.bridge.moveToTrash(ref)
self.bridge.moveToTrash(ref, transaction=self._transaction)
except Exception as e:
if ignore_errors:
log.warning("Unexpected issue moving multiple datasets to trash: %s", e)
@@ -2280,7 +2280,7 @@ def trash(self, ref: Union[DatasetRef, Iterable[DatasetRef]], ignore_errors: boo

# Mark dataset as trashed
try:
self.bridge.moveToTrash([ref])
self.bridge.moveToTrash([ref], transaction=self._transaction)
except Exception as e:
if ignore_errors:
log.warning(
6 changes: 4 additions & 2 deletions python/lsst/daf/butler/datastores/inMemoryDatastore.py
@@ -39,6 +39,7 @@
StorageClass,
StoredDatastoreItemInfo,
)
from lsst.daf.butler.core.utils import transactional
from lsst.daf.butler.registry.interfaces import DatastoreRegistryBridge
from lsst.resources import ResourcePath

@@ -512,6 +513,7 @@ def forget(self, refs: Iterable[DatasetRef]) -> None:
for ref in refs:
self.removeStoredItemInfo(ref)

@transactional
def trash(self, ref: Union[DatasetRef, Iterable[DatasetRef]], ignore_errors: bool = False) -> None:
"""Indicate to the Datastore that a dataset can be removed.

@@ -536,7 +538,7 @@ def trash(self, ref: Union[DatasetRef, Iterable[DatasetRef]], ignore_errors: boo
"""
if not isinstance(ref, DatasetRef):
log.debug("Bulk trashing of datasets in datastore %s", self.name)
self.bridge.moveToTrash(ref)
self.bridge.moveToTrash(ref, transaction=self._transaction)
return

log.debug("Trash %s in datastore %s", ref, self.name)
@@ -546,7 +548,7 @@ def trash(self, ref: Union[DatasetRef, Iterable[DatasetRef]], ignore_errors: boo
self._get_dataset_info(ref)

# Move datasets to trash table
self.bridge.moveToTrash([ref])
self.bridge.moveToTrash([ref], transaction=self._transaction)
except Exception as e:
if ignore_errors:
log.warning(
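The new `@transactional` decorator is what guarantees `self._transaction` is populated by the time `moveToTrash()` is reached. A rough stand-in for what such a decorator typically does (the real one lives in `lsst.daf.butler.core.utils`; this is an illustrative sketch, not its actual source):

```python
import functools


def transactional(func):
    """Toy stand-in: run the wrapped datastore method inside a transaction."""

    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        # Opening self.transaction() ensures self._transaction is set, so the
        # method can hand a live transaction to bridge.moveToTrash().
        with self.transaction():
            return func(self, *args, **kwargs)

    return wrapper
```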
14 changes: 12 additions & 2 deletions python/lsst/daf/butler/registry/bridge/ephemeral.py
@@ -30,6 +30,7 @@

if TYPE_CHECKING:
from ...core import StoredDatastoreItemInfo
from ...core.datastore import DatastoreTransaction


class EphemeralDatastoreRegistryBridge(DatastoreRegistryBridge):
@@ -62,9 +63,18 @@ def insert(self, refs: Iterable[DatasetIdRef]) -> None:
def forget(self, refs: Iterable[DatasetIdRef]) -> None:
self._datasetIds.difference_update(ref.id for ref in refs)

def moveToTrash(self, refs: Iterable[DatasetIdRef]) -> None:
def _rollbackMoveToTrash(self, refs: Iterable[DatasetIdRef]) -> None:
"""Rollback a moveToTrash call."""
for ref in refs:
self._trashedIds.remove(ref.getCheckedId())

def moveToTrash(self, refs: Iterable[DatasetIdRef], transaction: Optional[DatastoreTransaction]) -> None:
# Docstring inherited from DatastoreRegistryBridge
self._trashedIds.update(ref.getCheckedId() for ref in refs)
if transaction is None:
raise RuntimeError("Must be called with a defined transaction.")
ref_list = list(refs)
with transaction.undoWith(f"Trash {len(ref_list)} datasets", self._rollbackMoveToTrash, ref_list):
self._trashedIds.update(ref.getCheckedId() for ref in ref_list)

def check(self, refs: Iterable[DatasetIdRef]) -> Iterable[DatasetIdRef]:
# Docstring inherited from DatastoreRegistryBridge
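The rollback relies on `DatastoreTransaction.undoWith()` registering an undo callback that only fires if the transaction is rolled back. A toy model of that contract (illustrative only; not the daf_butler implementation):

```python
from contextlib import contextmanager


class ToyTransaction:
    """Minimal stand-in for the transaction behaviour assumed above."""

    def __init__(self):
        self._undo = []

    @contextmanager
    def undoWith(self, name, undoFunc, *args):
        yield                                      # run the protected block first
        self._undo.append((name, undoFunc, args))  # then remember how to undo it

    def rollback(self):
        for name, undoFunc, args in reversed(self._undo):
            undoFunc(*args)                        # e.g. _rollbackMoveToTrash(refs)
        self._undo.clear()
```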
3 changes: 2 additions & 1 deletion python/lsst/daf/butler/registry/bridge/monolithic.py
@@ -43,6 +43,7 @@

if TYPE_CHECKING:
from ...core import DimensionUniverse
from ...core.datastore import DatastoreTransaction
from ..interfaces import (
Database,
DatasetRecordStorageManager,
@@ -157,7 +158,7 @@ def forget(self, refs: Iterable[DatasetIdRef]) -> None:
rows = self._refsToRows(self.check(refs))
self._db.delete(self._tables.dataset_location, ["datastore_name", "dataset_id"], *rows)

def moveToTrash(self, refs: Iterable[DatasetIdRef]) -> None:
def moveToTrash(self, refs: Iterable[DatasetIdRef], transaction: Optional[DatastoreTransaction]) -> None:
# Docstring inherited from DatastoreRegistryBridge
# TODO: avoid self.check() call via queries like
# INSERT INTO dataset_location_trash
6 changes: 5 additions & 1 deletion python/lsst/daf/butler/registry/interfaces/_bridge.py
@@ -32,6 +32,7 @@

if TYPE_CHECKING:
from ...core import DatasetType, DimensionUniverse, StoredDatastoreItemInfo
from ...core.datastore import DatastoreTransaction
from ._database import Database, StaticTablesContext
from ._datasets import DatasetRecordStorageManager
from ._opaque import OpaqueTableStorage, OpaqueTableStorageManager
@@ -150,13 +151,16 @@ def forget(self, refs: Iterable[DatasetIdRef]) -> None:
raise NotImplementedError()

@abstractmethod
def moveToTrash(self, refs: Iterable[DatasetIdRef]) -> None:
def moveToTrash(self, refs: Iterable[DatasetIdRef], transaction: Optional[DatastoreTransaction]) -> None:
"""Move dataset location information to trash.

Parameters
----------
refs : `Iterable` of `DatasetIdRef`
References to the datasets.
transaction : `DatastoreTransaction` or `None`
Transaction object. Can be `None` in some bridges or if no rollback
is required.

Raises
------
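One way an implementation might honour the optional `transaction` argument documented here (a hedged sketch under the same assumptions as the toy transaction above; the real bridges differ in detail):

```python
from typing import Iterable, Optional


class ToyBridge:
    """Illustrative bridge: track trashed IDs, optionally reversibly."""

    def __init__(self):
        self._trashed = set()

    def _undoTrash(self, ids):
        self._trashed.difference_update(ids)

    def moveToTrash(self, refs: Iterable, transaction: Optional[object]) -> None:
        ids = {getattr(ref, "id", ref) for ref in refs}
        if transaction is None:
            self._trashed.update(ids)      # no rollback support requested
        else:
            # Assumed undoWith signature: (name, undo callable, *args).
            with transaction.undoWith("Trash datasets", self._undoTrash, ids):
                self._trashed.update(ids)
```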
12 changes: 6 additions & 6 deletions tests/test_butler.py
@@ -799,10 +799,10 @@ def testPruneCollections(self):
# Try to delete RUN collections, which should fail with complete
# rollback because they're still referenced by the CHAINED
# collection.
with self.assertRaises(Exception):
butler.pruneCollection(run1, pruge=True, unstore=True)
with self.assertRaises(Exception):
butler.pruneCollection(run2, pruge=True, unstore=True)
with self.assertRaises(sqlalchemy.exc.IntegrityError):
butler.pruneCollection(run1, purge=True, unstore=True)
with self.assertRaises(sqlalchemy.exc.IntegrityError):
butler.pruneCollection(run2, purge=True, unstore=True)
self.assertCountEqual(set(butler.registry.queryDatasets(..., collections=...)), [ref1, ref2, ref3])
existence = butler.datastore.mexists([ref1, ref2, ref3])
self.assertTrue(existence[ref1])
@@ -1388,7 +1388,7 @@ def testPruneDatasets(self):
# Check that in normal mode, deleting the record will lead to
# trash not touching the file.
uri1 = butler.datastore.getURI(ref1)
butler.datastore.bridge.moveToTrash([ref1]) # Update the dataset_location table
butler.datastore.bridge.moveToTrash([ref1], transaction=None) # Update the dataset_location table
butler.datastore._table.delete(["dataset_id"], {"dataset_id": ref1.id})
butler.datastore.trash(ref1)
butler.datastore.emptyTrash()
@@ -1404,7 +1404,7 @@
self.assertTrue(uri3.exists())

# Remove the datastore record.
butler.datastore.bridge.moveToTrash([ref2]) # Update the dataset_location table
butler.datastore.bridge.moveToTrash([ref2], transaction=None) # Update the dataset_location table
butler.datastore._table.delete(["dataset_id"], {"dataset_id": ref2.id})
self.assertTrue(uri2.exists())
butler.datastore.trash([ref2, ref3])
1 change: 0 additions & 1 deletion ups/daf_butler.table
@@ -5,4 +5,3 @@ setupRequired(resources)

envPrepend(PATH, ${PRODUCT_DIR}/bin)
envPrepend(PYTHONPATH, ${PRODUCT_DIR}/python)
envPrepend(MYPYPATH, ${PRODUCT_DIR}/python)