DM-16160: Restore pre-insert uniqueness check in addDataset(). #98

Merged · 2 commits · Oct 17, 2018
148 changes: 133 additions & 15 deletions python/lsst/daf/butler/registries/sqlRegistry.py
@@ -174,11 +174,11 @@ def makeDatabaseDict(self, table, types, key, value):
config["table"] = table
return SqlRegistryDatabaseDict(config, types=types, key=key, value=value, registry=self)

def find(self, collection, datasetType, dataId):
"""Lookup a dataset.
def _findDatasetId(self, collection, datasetType, dataId):
"""Lookup a dataset ID.

This can be used to obtain a `DatasetRef` that permits the dataset to
be read from a `Datastore`.
This can be used to obtain a ``dataset_id`` that permits the dataset
to be read from a `Datastore`.

Parameters
----------
@@ -192,9 +192,8 @@ def find(self, collection, datasetType, dataId):

Returns
-------
ref : `DatasetRef`
A ref to the Dataset, or `None` if no matching Dataset
was found.
dataset_id : `int` or `None`
``dataset_id`` value, or `None` if no matching Dataset was found.

Raises
------
@@ -214,7 +213,41 @@
dataIdExpression))).fetchone()
# TODO update unit values and add Run, Quantum and assembler?
if result is not None:
return self.getDataset(result["dataset_id"])
return result.dataset_id
else:
return None

def find(self, collection, datasetType, dataId):
"""Lookup a dataset.

This can be used to obtain a `DatasetRef` that permits the dataset to
be read from a `Datastore`.

Parameters
----------
collection : `str`
Identifies the collection to search.
datasetType : `DatasetType`
The `DatasetType`.
dataId : `dict`
A `dict` of `DataUnit` link name, value pairs that label the
`DatasetRef` within a collection.

Returns
-------
ref : `DatasetRef`
A ref to the Dataset, or `None` if no matching Dataset
was found.

Raises
------
ValueError
If dataId is invalid.
"""
dataset_id = self._findDatasetId(collection, datasetType, dataId)
# TODO update unit values and add Run, Quantum and assembler?
if dataset_id is not None:
return self.getDataset(dataset_id)
else:
return None
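For orientation, a minimal usage sketch of the refactored lookup (the registry, datastore, and data-ID values below are hypothetical; only `find` and the internal `_findDatasetId` come from this diff). `find` now delegates the SQL lookup to `_findDatasetId` and only materializes a `DatasetRef` when a matching ``dataset_id`` is found:

    # Hypothetical example; dataId keys depend on the DatasetType's DataUnits.
    dataId = {"camera": "DummyCam", "visit": 0}
    ref = registry.find("ingest", datasetType, dataId)
    if ref is not None:
        data = datastore.get(ref)  # a DatasetRef permits reads from a Datastore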

@@ -381,13 +414,19 @@ def addDataset(self, datasetType, dataId, run, producer=None, recursive=False):
Exception
If ``dataId`` contains unknown or invalid `DataUnit` entries.
"""
# TODO this is obviously not the most efficient way to check
# for existence.
# A collection cannot have more than one dataset with the same DataId
# and the same DatasetType; this constraint is checked in the
# `associate` method, which raises.
# NOTE: Client code (e.g. `lsst.obs.base.ingest`) makes some assumptions
# about the behavior of this code, in particular that it must not modify
# database contents if an exception is raised. This is why we have to
# make an additional uniqueness check before we add a row to the
# Dataset table.
# TODO also note that this check is not safe
# in the presence of concurrent calls to addDataset.
# Then again, it is undoubtedly not the only place where
# this problem occurs. Needs some serious thought.
if self.find(run.collection, datasetType, dataId) is not None:
if self._findDatasetId(run.collection, datasetType, dataId) is not None:
raise ValueError("A dataset of type {} with id: {} already exists in collection {}".format(
datasetType, dataId, run.collection))
datasetTable = self._schema.tables["Dataset"]
@@ -400,7 +439,7 @@ def addDataset(self, datasetType, dataId, run, producer=None, recursive=False):
datasetRef = DatasetRef(datasetType=datasetType, dataId=dataId, id=result.inserted_primary_key[0],
run=run)
# A dataset is always associated with its Run collection
self.associate(run.collection, [datasetRef, ])
self.associate(run.collection, [datasetRef, ], transactional=False)

if recursive:
for component in datasetType.storageClass.components:
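The restored pre-insert check gives `addDataset` the fail-fast contract described in the NOTE above. A hedged sketch of how a client such as `lsst.obs.base.ingest` can rely on it (the registry, run, and dataId values are hypothetical):

    try:
        ref = registry.addDataset(datasetType, dataId=dataId, run=run)
    except ValueError:
        # The dataset already exists in the run's collection and, per the
        # NOTE above, the database contents were left unmodified.
        ref = registry.find(run.collection, datasetType, dataId)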
@@ -482,17 +521,96 @@ def associate(self, collection, refs):
"""Add existing Datasets to a collection, possibly creating the
collection in the process.

If a DatasetRef with the exact same ``dataset_id`` is already in the
collection, nothing is changed. If a DatasetRef with the same
DatasetType and unit values but a different ``dataset_id`` exists in
the collection, an exception is raised.

Parameters
----------
collection : `str`
Indicates the collection the Datasets should be associated with.
refs : `list` of `DatasetRef`
A `list` of `DatasetRef` instances that already exist in this
`SqlRegistry`.

Raises
------
ValueError
If a Dataset with the given `DatasetRef` already exists in the
given collection.
"""
datasetCollectionTable = self._schema.tables["DatasetCollection"]
self._connection.execute(datasetCollectionTable.insert(),
[{"dataset_id": ref.id, "collection": collection} for ref in refs])
# A collection cannot contain more than one Dataset with the same
# DatasetType and unit values. Our SQL schema does not enforce this
# constraint yet, so the checks have to be done in the code:
# - read the existing collection and try to match its contents with
# the new DatasetRefs using units
# - if there is a match and the dataset_id differs, the constraint
# check fails
# TODO: This implementation has a race which can violate the
# constraint if multiple clients update the registry concurrently.
# Proper constraint checks have to be implemented in the schema.

def _matchRef(row, ref):
"""Compare Dataset table row with a DatasetRef.

Parameters
----------
row : `sqlalchemy.RowProxy`
Single row from Dataset table.
ref : `DatasetRef`
Dataset reference to compare the row with.

Returns
-------
match : `bool`
True if Dataset row is identical to ``ref`` (their IDs match),
False otherwise.

Raises
------
ValueError
If DatasetRef unit values match row data but their IDs differ.
"""
if row.dataset_id == ref.id:
return True

if row.dataset_type_name != ref.datasetType.name:
return False

columns = self._dataUnits.getPrimaryKeyNames(ref.datasetType.dataUnits)
dataId = ref.dataId
if all(row[col] == dataId[col] for col in columns):
raise ValueError("A dataset of type {} with id: {} already exists in collection {}".format(
ref.datasetType, dataId, collection))
return False

if len(refs) == 1:
# small optimization for a single ref
ref = refs[0]
dataset_id = self._findDatasetId(collection, ref.datasetType, ref.dataId)
if dataset_id == ref.id:
# already there
return
elif dataset_id is not None:
raise ValueError("A dataset of type {} with id: {} already exists in collection {}".format(
ref.datasetType, ref.dataId, collection))
else:
# full scan of a collection to compare DatasetRef units
datasetTable = self._schema.tables["Dataset"]
datasetCollectionTable = self._schema.tables["DatasetCollection"]
query = datasetTable.select()
query = query.where(and_(datasetTable.c.dataset_id == datasetCollectionTable.c.dataset_id,
datasetCollectionTable.c.collection == collection))
result = self._connection.execute(query)
for row in result:
# skip DatasetRefs that are already there
refs = [ref for ref in refs if not _matchRef(row, ref)]

# if any ref is not there yet add it
if refs:
datasetCollectionTable = self._schema.tables["DatasetCollection"]
self._connection.execute(datasetCollectionTable.insert(),
[{"dataset_id": ref.id, "collection": collection} for ref in refs])

@transactional
def disassociate(self, collection, refs, remove=True):
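The `transactional=False` argument passed to `associate` in `addDataset` above, together with the `@transactional` decorator seen here, suggests a pattern in which each public mutating method opens its own transaction unless the caller already holds one. A minimal sketch of such a decorator, assuming `self._connection` is a SQLAlchemy connection (an assumption for illustration, not the package's actual implementation):

    import functools

    def transactional(func):
        # Wrap a registry method in its own database transaction. Passing
        # transactional=False runs the method inside the caller's existing
        # transaction instead, so a failure (e.g. in associate) rolls back
        # the whole outer operation such as addDataset.
        @functools.wraps(func)
        def inner(self, *args, transactional=True, **kwargs):
            if not transactional:
                return func(self, *args, **kwargs)
            with self._connection.begin():
                return func(self, *args, **kwargs)
        return inner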
68 changes: 66 additions & 2 deletions tests/test_sqlRegistry.py
@@ -50,6 +50,15 @@ class RegistryTests(metaclass=ABCMeta):
def makeRegistry(self):
raise NotImplementedError()

def assertRowCount(self, registry, table, count, where=None):
"""Check the number of rows in table
"""
query = 'select count(*) as "cnt" from "{}"'.format(table)
if where:
query = '{} where {}'.format(query, where)
rows = list(registry.query(query))
self.assertEqual(rows[0]["cnt"], count)
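The optional ``where`` argument restricts the count to matching rows; a hedged usage sketch (the column and expected count here are hypothetical):

    # Hypothetical usage: count only the rows matching a WHERE clause.
    self.assertRowCount(registry, "Dataset", 2, where="visit = 0")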

def testDatasetType(self):
registry = self.makeRegistry()
# Check valid insert
@@ -314,6 +323,61 @@ def testCollections(self):
outputRef = registry.find(newCollection, datasetType, dataId2)
self.assertEqual(outputRef, inputRef2)

def testAssociate(self):
registry = self.makeRegistry()
storageClass = StorageClass("testAssociate")
registry.storageClasses.registerStorageClass(storageClass)
dataUnits = ("Camera", "Visit")
datasetType1 = DatasetType(name="dummytype", dataUnits=dataUnits, storageClass=storageClass)
registry.registerDatasetType(datasetType1)
datasetType2 = DatasetType(name="smartytype", dataUnits=dataUnits, storageClass=storageClass)
registry.registerDatasetType(datasetType2)
if not registry.limited:
registry.addDataUnitEntry("Camera", {"camera": "DummyCam"})
registry.addDataUnitEntry("PhysicalFilter", {"camera": "DummyCam", "physical_filter": "d-r"})
registry.addDataUnitEntry("Visit", {"camera": "DummyCam", "visit": 0, "physical_filter": "d-r"})
registry.addDataUnitEntry("Visit", {"camera": "DummyCam", "visit": 1, "physical_filter": "d-r"})
run1 = registry.makeRun(collection="ingest1")
run2 = registry.makeRun(collection="ingest2")
run3 = registry.makeRun(collection="ingest3")
# TODO: Dataset.physical_filter should be populated as well here
# from the Visit DataUnit values.
dataId1 = {"camera": "DummyCam", "visit": 0}
dataId2 = {"camera": "DummyCam", "visit": 1}
ref1_run1 = registry.addDataset(datasetType1, dataId=dataId1, run=run1)
ref2_run1 = registry.addDataset(datasetType1, dataId=dataId2, run=run1)
ref1_run2 = registry.addDataset(datasetType2, dataId=dataId1, run=run2)
ref2_run2 = registry.addDataset(datasetType2, dataId=dataId2, run=run2)
ref1_run3 = registry.addDataset(datasetType2, dataId=dataId1, run=run3)
ref2_run3 = registry.addDataset(datasetType2, dataId=dataId2, run=run3)
# should have exactly 6 rows in Dataset (and in DatasetCollection)
self.assertRowCount(registry, "Dataset", 6)
self.assertRowCount(registry, "DatasetCollection", 6)
# adding the same DatasetRef to the same run is an error
with self.assertRaises(ValueError):
registry.addDataset(datasetType1, dataId=dataId2, run=run1)
# the above exception must roll back and not add anything to Dataset
self.assertRowCount(registry, "Dataset", 6)
self.assertRowCount(registry, "DatasetCollection", 6)
# associate refs from run1 with some other collection
newCollection = "something"
registry.associate(newCollection, [ref1_run1, ref2_run1])
self.assertRowCount(registry, "DatasetCollection", 8)
# associating the exact same DatasetRef is OK (it does nothing),
# two cases to test - single-ref and many-refs
registry.associate(newCollection, [ref1_run1])
registry.associate(newCollection, [ref1_run1, ref2_run1])
self.assertRowCount(registry, "DatasetCollection", 8)
# associate refs from run2 with the same other collection; this should
# be OK because they have a different dataset type
registry.associate(newCollection, [ref1_run2, ref2_run2])
self.assertRowCount(registry, "DatasetCollection", 10)
# associating a DatasetRef with the same units but a different ID is not OK
with self.assertRaises(ValueError):
registry.associate(newCollection, [ref1_run3])
with self.assertRaises(ValueError):
registry.associate(newCollection, [ref1_run3, ref2_run3])

def testDatasetUnit(self):
registry = self.makeRegistry()
dataUnitName = "Camera"
@@ -472,7 +536,7 @@ def testGetRegion(self):
self.assertNotEqual(rows[0]["cnt"], 0)


class SqlRegistryTestCase(lsst.utils.tests.TestCase):
class SqlRegistryTestCase(lsst.utils.tests.TestCase, RegistryTests):
"""Test for SqlRegistry.
"""

@@ -488,7 +552,7 @@ def testInitFromConfig(self):
self.assertFalse(registry.limited)


class LimitedSqlRegistryTestCase(lsst.utils.tests.TestCase):
class LimitedSqlRegistryTestCase(lsst.utils.tests.TestCase, RegistryTests):
"""Test for SqlRegistry with limited=True.
"""
