Merge pull request #23 from lsst/tickets/DM-13869
DM-13869: Implement basic Registry.find and related functionality
Pim Schellart committed Mar 22, 2018
2 parents 6028092 + 3857fbe commit 795f8ff
Showing 18 changed files with 200 additions and 24 deletions.
3 changes: 2 additions & 1 deletion python/lsst/daf/__init__.py
@@ -1,2 +1,3 @@
-import pkgutil, lsstimport
+import pkgutil
+import lsstimport
__path__ = pkgutil.extend_path(__path__, __name__)
1 change: 1 addition & 0 deletions python/lsst/daf/butler/butler.py
@@ -33,6 +33,7 @@
class ButlerConfig(Config):
"""Contains the configuration for a `Butler`
"""

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.validate()
1 change: 1 addition & 0 deletions python/lsst/daf/butler/core/composites.py
@@ -38,6 +38,7 @@ class DatasetComponent:
Component extracted from the composite object.
"""

def __init__(self, name, storageClass, component):
self.name = name
self.storageClass = storageClass
2 changes: 2 additions & 0 deletions python/lsst/daf/butler/core/datasets.py
@@ -107,6 +107,8 @@ class DatasetRef(object):
__slots__ = ("_id", "_datasetType", "_dataId", "_producer",
"_predictedConsumers", "_actualConsumers", "_components",
"_assembler")
__eq__ = slotValuesAreEqual
__hash__ = slotValuesToHash

def __init__(self, datasetType, dataId, id=None):
assert isinstance(datasetType, DatasetType)
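The two assignments above give `DatasetRef` value semantics over its slots. A minimal sketch of what such helpers can look like; the real `slotValuesAreEqual` and `slotValuesToHash` are defined elsewhere in daf_butler, so the bodies below are illustrative assumptions:

```python
# Sketch only: slot-based equality and hashing in the spirit of the
# helpers assigned above; not the committed implementations.
def slotValuesAreEqual(self, other):
    """Two instances are equal iff all their slot values are equal."""
    return (type(self) is type(other) and
            all(getattr(self, s) == getattr(other, s) for s in self.__slots__))

def slotValuesToHash(self):
    """Hash an instance from the tuple of its slot values."""
    return hash(tuple(getattr(self, s) for s in self.__slots__))
```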
1 change: 1 addition & 0 deletions python/lsst/daf/butler/core/fileTemplates.py
@@ -39,6 +39,7 @@ class FileTemplates:
config : `FileTemplatesConfig` or `str`
Load configuration.
"""

def __init__(self, config, default=None):
self.config = FileTemplatesConfig(config)
self.templates = {}
16 changes: 9 additions & 7 deletions python/lsst/daf/butler/core/registry.py
@@ -312,19 +312,21 @@ def expand(self, ref):
raise NotImplementedError("Must be implemented by subclass")

@abstractmethod
-    def find(self, collection, ref):
-        """Look up the location of the `Dataset` associated with the given
-        `DatasetRef`.
+    def find(self, collection, datasetType, dataId):
+        """Look up a dataset.
-        This can be used to obtain the URI that permits the `Dataset` to be
-        read from a `Datastore`.
+        This can be used to obtain a `DatasetRef` that permits the dataset to
+        be read from a `Datastore`.
         Parameters
         ----------
         collection : `str`
             Identifies the Collection to search.
-        ref : `DatasetRef`
-            Identifies the `Dataset`.
+        datasetType : `DatasetType`
+            The `DatasetType`.
+        dataId : `dict`
+            A `dict` of `DataUnit` name, value pairs that label the `DatasetRef`
+            within a Collection.
Returns
-------
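A usage sketch of the new interface; the registry instance, collection name, `DatasetType` instance, and dataId keys below are illustrative assumptions, not part of this commit:

```python
# Hypothetical lookup; assumes datasetType.dataUnits uses the link
# names ("camera", "visit") exercised by the tests in this commit.
ref = registry.find(collection="ingest/run1",
                    datasetType=datasetType,
                    dataId={"camera": "DummyCam", "visit": 42})
if ref is None:
    print("no matching dataset in this collection")
```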
1 change: 1 addition & 0 deletions python/lsst/daf/butler/core/safeFileIo.py
@@ -189,6 +189,7 @@ class SafeLockedFileForWrite:
Contains __enter__ and __exit__ functions so this can be used by a context manager.
"""

def __init__(self, name):
self.log = Log.getLogger("daf.butler")
self.name = name
24 changes: 23 additions & 1 deletion python/lsst/daf/butler/core/schema.py
@@ -39,12 +39,25 @@ def tables(self):
table = {}
if 'tables' in self:
table.update(self['tables'])
# TODO move this to some other place once DataUnit relations are settled
if 'dataUnits' in self:
for dataUnitDescription in self['dataUnits'].values():
if 'tables' in dataUnitDescription:
table.update(dataUnitDescription['tables'])
return table

@property
def dataUnitLinks(self):
"""All DataUnit links.
TODO move this to some other place once DataUnit relations are settled
"""
dataUnits = self['dataUnits']
links = []
for dataUnitName in sorted(dataUnits.keys()):
links.extend(dataUnits[dataUnitName]['link'])
return links
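
Together with `tables` above, this property pulls per-DataUnit schema fragments out of the configuration. A sketch of the assumed config shape and the resulting link order (the unit names and link fields are made up for illustration):

```python
# Illustrative stand-in for the 'dataUnits' section of the schema config.
dataUnits = {
    'Visit':  {'link': [{'name': 'visit',  'type': 'int'}]},
    'Camera': {'link': [{'name': 'camera', 'type': 'string'}]},
}
links = []
for dataUnitName in sorted(dataUnits.keys()):  # sorted => deterministic order
    links.extend(dataUnits[dataUnitName]['link'])
# links == [{'name': 'camera', 'type': 'string'},
#           {'name': 'visit', 'type': 'int'}]
```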


class Schema:
"""The SQL schema for a Butler Registry.
@@ -58,6 +71,8 @@ class Schema:
----------
metadata : `sqlalchemy.MetaData`
The sqlalchemy schema description
dataUnits : `dict`
Columns that represent dataunit links.
"""
VALID_COLUMN_TYPES = {'string': String, 'int': Integer, 'float': Float,
'bool': Boolean, 'blob': LargeBinary, 'datetime': DateTime}
@@ -67,6 +82,13 @@ def __init__(self, config):
self.metadata = MetaData()
for tableName, tableDescription in self.config.tables.items():
self.addTable(tableName, tableDescription)
# Add DataUnit links
self.dataUnits = {}
datasetTable = self.metadata.tables['Dataset']
for dataUnitLinkDescription in self.config.dataUnitLinks:
linkColumn = self.makeColumn(dataUnitLinkDescription)
self.dataUnits[dataUnitLinkDescription['name']] = linkColumn
datasetTable.append_column(linkColumn)
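
Each link description thus becomes a real column appended to the already-built Dataset table. A quick sketch of the observable effect, assuming a `config` with the two illustrative links from the example above:

```python
# Assuming link columns named "camera" and "visit" in the config:
schema = Schema(config)
sorted(schema.dataUnits)                         # ['camera', 'visit']
'camera' in schema.metadata.tables['Dataset'].c  # True
```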

def addTable(self, tableName, tableDescription):
"""Add a table to the schema metadata.
@@ -123,7 +145,7 @@ def makeColumn(self, columnDescription):
args = (columnName, self.VALID_COLUMN_TYPES[description.pop("type")])
# foreign_key is special
if "foreign_key" in description:
-            args += ForeignKey(description.pop("foreign_key"))
+            args += (ForeignKey(description.pop("foreign_key")), )
# additional optional arguments can be passed through directly
kwargs = {}
for opt in ("nullable", "primary_key", "doc"):
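The small change above is a real bug fix: `args` is a tuple, and `+=` on a tuple needs another tuple, not a bare `ForeignKey`. An illustration (the foreign-key target is hypothetical):

```python
from sqlalchemy import ForeignKey, String

args = ("camera", String)
# args += ForeignKey("Camera.name")    # TypeError: can only concatenate tuple
args += (ForeignKey("Camera.name"), )  # a one-element tuple concatenates cleanly
```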
92 changes: 81 additions & 11 deletions python/lsst/daf/butler/registries/sqlRegistry.py
@@ -152,13 +152,25 @@ def addDataset(self, datasetType, dataId, run, producer=None):
If a `Dataset` with the given `DatasetRef` already exists in the
given Collection.
"""
# TODO this is obviously not the most efficient way to check
# for existence.
# TODO also note that this check is not safe
# in the presence of concurrent calls to addDataset.
# Then again, it is undoubtedly not the only place where
# this problem occurs. Needs some serious thought.
if self.find(run.collection, datasetType, dataId) is not None:
raise ValueError("A dataset with id: {} already exists in collection {}".format(
dataId, run.collection))
datasetTable = self._schema.metadata.tables['Dataset']
datasetRef = None
with self._engine.begin() as connection:
result = connection.execute(datasetTable.insert().values(dataset_type_name=datasetType.name,
run_id=run.execution,
-                                                    quantum_id=None))  # TODO add producer
+                                                    quantum_id=None,  # TODO add producer
+                                                    **dataId))
datasetRef = DatasetRef(datasetType, dataId, result.inserted_primary_key[0])
# A dataset is always associated with its Run collection
self.associate(run.collection, [datasetRef])
return datasetRef
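
Because the DataUnit link columns now live on the Dataset table itself, `**dataId` expands each dataId entry into a column value in the INSERT. Roughly, for the dataId used in this commit's tests (the dataset type name and run id are made up):

```python
dataId = {"camera": "DummyCam", "visit": 42}
# The connection.execute(...) above then issues approximately:
#   INSERT INTO "Dataset" (dataset_type_name, run_id, quantum_id, camera, visit)
#   VALUES ('calexp', 1, NULL, 'DummyCam', 42)
```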

def setAssembler(self, ref, assembler):
@@ -209,7 +221,10 @@ def associate(self, collection, refs):
A `list` of `DatasetRef` instances that already exist in this
`SqlRegistry`.
"""
-        raise NotImplementedError("Must be implemented by subclass")
+        datasetCollectionTable = self._schema.metadata.tables['DatasetCollection']
+        with self._engine.begin() as connection:
+            connection.execute(datasetCollectionTable.insert(),
+                               [{'dataset_id': ref.id, 'collection': collection} for ref in refs])
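
Passing an `insert()` plus a list of parameter dictionaries to `Connection.execute()` is SQLAlchemy's executemany form: one DatasetCollection row per ref, all in one transaction. A hypothetical call:

```python
# refs as previously returned by addDataset (hypothetical):
registry.associate("shared/calibs", [ref1, ref2])
# -> rows (ref1.id, 'shared/calibs') and (ref2.id, 'shared/calibs')
#    in the DatasetCollection table
```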

def disassociate(self, collection, refs, remove=True):
"""Remove existing `Dataset`\ s from a Collection.
@@ -234,7 +249,15 @@ def disassociate(self, collection, refs, remove=True):
If `remove` is `True`, the `list` of `DatasetRef`\ s that were
removed.
"""
-        raise NotImplementedError("Must be implemented by subclass")
+        if remove:
+            raise NotImplementedError("Cleanup of datasets not yet implemented")
+        datasetCollectionTable = self._schema.metadata.tables['DatasetCollection']
+        with self._engine.begin() as connection:
+            for ref in refs:
+                connection.execute(datasetCollectionTable.delete().where(
+                    and_(datasetCollectionTable.c.dataset_id == ref.id,
+                         datasetCollectionTable.c.collection == collection)))
+        return []
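
Until dataset cleanup is implemented, callers have to pass `remove=False`; the method then only detaches the refs from the collection and returns an empty list. For example:

```python
removed = registry.disassociate("shared/calibs", [ref1], remove=False)
assert removed == []  # rows deleted from DatasetCollection; datasets remain
```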

def makeRun(self, collection):
"""Create a new `Run` in the `SqlRegistry` and return it.
@@ -477,27 +500,74 @@ def expand(self, ref):
"""
raise NotImplementedError("Must be implemented by subclass")

-    def find(self, collection, ref):
-        """Look up the location of the `Dataset` associated with the given
-        `DatasetRef`.
+    def _validateDataId(self, datasetType, dataId):
+        """Check if a dataId is valid for a particular `DatasetType`.
+        TODO move this function to some other place once DataUnit relations
+        are implemented.
+        datasetType : `DatasetType`
+            The `DatasetType`.
+        dataId : `dict`
+            A `dict` of `DataUnit` name, value pairs that label the `DatasetRef`
+            within a Collection.
+        Returns
+        -------
+        valid : `bool`
+            `True` if the dataId is valid, `False` otherwise.
+        """
+        for name in datasetType.dataUnits:
+            if name not in dataId:
+                return False
+        return True
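
The check only verifies that every required link name is present; extra keys are not rejected. An illustration, assuming a `DatasetType` whose `dataUnits` are the link names from the tests:

```python
# datasetType.dataUnits is assumed to be ("camera", "visit")
registry._validateDataId(datasetType, {"camera": "DummyCam"})               # False
registry._validateDataId(datasetType, {"camera": "DummyCam", "visit": 42})  # True
registry._validateDataId(datasetType, {"camera": "DummyCam", "visit": 42,
                                       "extra": 1})                         # also True
```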

-        This can be used to obtain the URI that permits the `Dataset` to be
-        read from a `Datastore`.
+    def find(self, collection, datasetType, dataId):
+        """Look up a dataset.
+        This can be used to obtain a `DatasetRef` that permits the dataset to
+        be read from a `Datastore`.
Parameters
----------
collection : `str`
Identifies the Collection to search.
-        ref : `DatasetRef`
-            Identifies the `Dataset`.
+        datasetType : `DatasetType`
+            The `DatasetType`.
+        dataId : `dict`
+            A `dict` of `DataUnit` name, value pairs that label the `DatasetRef`
+            within a Collection.
Returns
-------
ref : `DatasetRef`
A ref to the `Dataset`, or `None` if no matching `Dataset`
was found.
+        Raises
+        ------
+        `ValueError`
+            If dataId is invalid.
+        """
-        raise NotImplementedError("Must be implemented by subclass")
+        if not self._validateDataId(datasetType, dataId):
+            raise ValueError("Invalid dataId: {}".format(dataId))
+        datasetTable = self._schema.metadata.tables['Dataset']
+        datasetCollectionTable = self._schema.metadata.tables['DatasetCollection']
+        dataIdExpression = and_((self._schema.dataUnits[name] == dataId[name]
+                                 for name in datasetType.dataUnits))
+        with self._engine.begin() as connection:
+            result = connection.execute(select([datasetTable.c.dataset_id]).select_from(
+                datasetTable.join(datasetCollectionTable)).where(and_(
+                    datasetTable.c.dataset_type_name == datasetType.name,
+                    datasetCollectionTable.c.collection == collection,
+                    dataIdExpression))).fetchone()
+        # TODO update unit values and add Run, Quantum and assembler?
+        if result is not None:
+            return DatasetRef(datasetType=datasetType,
+                              dataId=dataId,
+                              id=result['dataset_id'])
+        else:
+            return None
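
Note that SQLAlchemy's `and_()` accepts a single generator argument, which is how `dataIdExpression` is built. For the test dataId, the query assembled above renders to roughly the following SQL (a sketch: the join condition comes from the DatasetCollection-to-Dataset foreign key, and the literal values are illustrative):

```python
# find("ingest/run1", calexpType, {"camera": "DummyCam", "visit": 42})
# emits approximately:
#   SELECT "Dataset".dataset_id
#   FROM "Dataset"
#     JOIN "DatasetCollection"
#       ON "Dataset".dataset_id = "DatasetCollection".dataset_id
#   WHERE "Dataset".dataset_type_name = 'calexp'
#     AND "DatasetCollection".collection = 'ingest/run1'
#     AND "Dataset".camera = 'DummyCam' AND "Dataset".visit = 42
```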

def subset(self, collection, expr, datasetTypes):
"""Create a new `Collection` by subsetting an existing one.
5 changes: 5 additions & 0 deletions tests/test_butler.py
@@ -36,6 +36,7 @@
class ButlerTestCase(lsst.utils.tests.TestCase, FitsCatalogDatasetsHelper):
"""Test for Butler.
"""

def setUp(self):
self.testDir = os.path.dirname(__file__)
self.configFile = os.path.join(self.testDir, "config/basic/butler.yaml")
@@ -60,8 +61,12 @@ def testBasicPutGet(self):
dataId = {"camera": "DummyCam", "visit": 42}
ref = butler.put(catalog, datasetTypeName, dataId)
self.assertIsInstance(ref, DatasetRef)
# Test getDirect
catalogOut = butler.getDirect(ref)
self.assertCatalogEqual(catalog, catalogOut)
# Test get
catalogOut = butler.get(datasetType, dataId)
self.assertCatalogEqual(catalog, catalogOut)


class MemoryTester(lsst.utils.tests.MemoryTestCase):
2 changes: 2 additions & 0 deletions tests/test_datasets.py
@@ -32,6 +32,7 @@
class DatasetTypeTestCase(lsst.utils.tests.TestCase):
"""Test for DatasetType.
"""

def testConstructor(self):
"""Test construction preserves values.
@@ -83,6 +84,7 @@ def testHashability(self):
class DatasetRefTestCase(lsst.utils.tests.TestCase):
"""Test for DatasetRef.
"""

def testConstructor(self):
"""Test construction preserves values.
"""
1 change: 1 addition & 0 deletions tests/test_formatter.py
@@ -32,6 +32,7 @@
class FormatterFactoryTestCase(lsst.utils.tests.TestCase):
"""Tests of the formatter factory infrastructure.
"""

def setUp(self):
self.factory = formatter.FormatterFactory()

1 change: 1 addition & 0 deletions tests/test_run.py
@@ -32,6 +32,7 @@
class RunTestCase(lsst.utils.tests.TestCase):
"""Test for Run.
"""

def testConstructor(self):
"""Test of constructor.
"""
1 change: 1 addition & 0 deletions tests/test_schema.py
@@ -40,6 +40,7 @@ class SchemaTestCase(lsst.utils.tests.TestCase):
This unittest does not verify the validity of the schema description.
It only checks that the generated tables match it.
"""

def setUp(self):
self.testDir = os.path.dirname(__file__)
self.schemaFile = os.path.join(self.testDir, "../config/registry/default_schema.yaml")
