Skip to content

Commit

Permalink
Reimplement associate to use DatasetRef hash field.
Browse files Browse the repository at this point in the history
  • Loading branch information
TallJimbo committed Jan 25, 2019
1 parent 40a530b commit e85c780
Showing 1 changed file with 30 additions and 71 deletions.
101 changes: 30 additions & 71 deletions python/lsst/daf/butler/registries/sqlRegistry.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

from sqlalchemy import create_engine, text
from sqlalchemy.pool import NullPool
from sqlalchemy.sql import select, and_, exists
from sqlalchemy.sql import select, and_, exists, bindparam
from sqlalchemy.exc import IntegrityError

from ..core.utils import transactional
Expand Down Expand Up @@ -523,81 +523,40 @@ def associate(self, collection, refs):
If a Dataset with the given `DatasetRef` already exists in the
given collection.
"""
# A collection cannot contain more than one Dataset with the same
# DatasetRef. Our SQL schema does not enforce this constraint yet so
# checks have to be done in the code:
# - read existing collection and try to match its contents with
# new DatasetRefs using dimensions
# - if there is a match and dataset_id is different then constraint
# check fails
# TODO: This implementation has a race which can violate the
# constraint if multiple clients update registry concurrently. Proper
# constraint checks have to be implmented in schema.

def _matchRef(row, ref):
"""Compare Dataset table row with a DatasetRef.
Parameters
----------
row : `sqlalchemy.RowProxy`
Single row from Dataset table.
ref : `DatasetRef`
Returns
-------
match : `bool`
True if Dataset row is identical to ``ref`` (their IDs match),
False otherwise.
Raises
------
ValueError
If DatasetRef dimension values match row data but their IDs differ.
"""
if row.dataset_id == ref.id:
return True

if row.dataset_type_name != ref.datasetType.name:
return False

# Most SqlRegistry subclass implementations should replace this
# implementation with special "UPSERT" or "MERGE" syntax. This
# implementation is only concurrency-safe for databases that implement
# transactions with database- or table-wide locks (e.g. SQLite).

datasetCollectionTable = self._schema.tables["DatasetCollection"]
query = datasetCollectionTable.select(datasetCollectionTable.c.dataset_id).where(
and_(datasetCollectionTable.c.collection == collection,
datasetCollectionTable.c.dataset_ref_hash == bindparam("hash"))
)

for ref in refs:

# TODO: factor this operation out, fix use of private member
if not isinstance(ref.datasetType.dimensions, DimensionGraph):
ref.datasetType._dimensions = self.dimensions.extract(ref.datasetType.dimensions)

dataId = ref.dataId
if all(row[col] == dataId[col] for col in ref.datasetType.dimensions.links()):
raise ValueError("A dataset of type {} with id: {} already exists in collection {}".format(
ref.datasetType, dataId, collection))
return False

if len(refs) == 1:
# small optimization for a single ref
ref = refs[0]
dataset_id = self._findDatasetId(collection, ref.datasetType, ref.dataId)
if dataset_id == ref.id:
# already there
return
elif dataset_id is not None:
raise ValueError("A dataset of type {} with id: {} already exists in collection {}".format(
ref.datasetType, ref.dataId, collection))
else:
# full scan of a collection to compare DatasetRef dimensions
datasetTable = self._schema.tables["Dataset"]
datasetCollectionTable = self._schema.tables["DatasetCollection"]
query = datasetTable.select()
query = query.where(and_(datasetTable.c.dataset_id == datasetCollectionTable.c.dataset_id,
datasetCollectionTable.c.collection == collection))
result = self._connection.execute(query)
for row in result:
# skip DatasetRefs that are already there
refs = [ref for ref in refs if not _matchRef(row, ref)]

# if any ref is not there yet add it
if refs:
datasetCollectionTable = self._schema.tables["DatasetCollection"]
self._connection.execute(datasetCollectionTable.insert(),
[{"dataset_id": ref.id, "dataset_ref_hash": ref.hash,
"collection": collection} for ref in refs])
row = self._connection.execute(query, hash=ref.hash).fetchone()
if row is None:
# No Dataset with this DatasetType and Data ID in collection;
# insert it now.
self._connection.execute(datasetCollectionTable.insert(),
[{"dataset_id": ref.id, "dataset_ref_hash": ref.hash,
"collection": collection} for ref in refs])
elif row.dataset_id != ref.id:
# A different Dataset with this DatasetType and Data ID already
# exists in this collection.
raise ValueError(
"A dataset of type {} with id: {} already exists in collection {}".format(
ref.datasetType, ref.dataId, collection
)
)
# If the same Dataset is already in this collection, do nothing.

@transactional
def disassociate(self, collection, refs, remove=True):
Expand Down

0 comments on commit e85c780

Please sign in to comment.