Skip to content

Commit

Permalink
Merge pull request #335 from lsst/tickets/DM-25985
Browse files Browse the repository at this point in the history
DM-25985: Add saving and checking of per-manager versions
  • Loading branch information
andy-slac committed Aug 3, 2020
2 parents 9fffe91 + 0bee58b commit 49b7167
Show file tree
Hide file tree
Showing 23 changed files with 658 additions and 210 deletions.
61 changes: 50 additions & 11 deletions python/lsst/daf/butler/registry/_registry.py
Expand Up @@ -27,6 +27,7 @@

from collections import defaultdict
import contextlib
import logging
import sys
from typing import (
Any,
Expand Down Expand Up @@ -71,7 +72,7 @@
from ._exceptions import ConflictingDefinitionError, InconsistentDataIdError, OrphanedRecordError
from .wildcards import CategorizedWildcard, CollectionQuery, CollectionSearch, Ellipsis
from .interfaces import ChainedCollectionRecord, RunRecord
from .versions import ButlerVersionsManager
from .versions import ButlerVersionsManager, DigestMismatchError

if TYPE_CHECKING:
from ..butlerConfig import ButlerConfig
Expand All @@ -86,13 +87,35 @@
)


_LOG = logging.getLogger(__name__)


class Registry:
"""Registry interface.
Parameters
----------
config : `ButlerConfig`, `RegistryConfig`, `Config` or `str`
Registry configuration
database : `Database`
Database instance to store Registry.
universe : `DimensionUniverse`
Full set of dimensions for Registry.
attributes : `type`
Manager class implementing `ButlerAttributeManager`.
opaque : `type`
Manager class implementing `OpaqueTableStorageManager`.
dimensions : `type`
Manager class implementing `DimensionRecordStorageManager`.
collections : `type`
Manager class implementing `CollectionManager`.
datasets : `type`
Manager class implementing `DatasetRecordStorageManager`.
datastoreBridges : `type`
Manager class implementing `DatastoreRegistryBridgeManager`.
writeable : `bool`, optional
If True then Registry will support write operations.
create : `bool`, optional
If True then database schema will be initialized, it must be empty
before instantiating Registry.
"""

defaultConfigFile: Optional[str] = None
Expand Down Expand Up @@ -140,11 +163,10 @@ def fromConfig(cls, config: Union[ButlerConfig, RegistryConfig, Config, str], cr
collections = doImport(config["managers", "collections"])
datasets = doImport(config["managers", "datasets"])
datastoreBridges = doImport(config["managers", "datastores"])
versions = ButlerVersionsManager.fromConfig(config.get("schema_versions"))

return cls(database, universe, dimensions=dimensions, attributes=attributes, opaque=opaque,
collections=collections, datasets=datasets, datastoreBridges=datastoreBridges,
versions=versions, writeable=writeable, create=create)
writeable=writeable, create=create)

def __init__(self, database: Database, universe: DimensionUniverse, *,
attributes: Type[ButlerAttributeManager],
Expand All @@ -153,7 +175,6 @@ def __init__(self, database: Database, universe: DimensionUniverse, *,
collections: Type[CollectionManager],
datasets: Type[DatasetRecordStorageManager],
datastoreBridges: Type[DatastoreRegistryBridgeManager],
versions: ButlerVersionsManager,
writeable: bool = True,
create: bool = False):
self._db = database
Expand All @@ -170,14 +191,32 @@ def __init__(self, database: Database, universe: DimensionUniverse, *,
opaque=self._opaque,
datasets=datasets,
universe=self.dimensions)
context.addInitializer(lambda db: versions.storeVersions(self._attributes))
versions = ButlerVersionsManager(
self._attributes,
dict(
attributes=self._attributes,
opaque=self._opaque,
dimensions=self._dimensions,
collections=self._collections,
datasets=self._datasets,
datastores=self._datastoreBridges,
)
)
# store managers and their versions in attributes table
context.addInitializer(lambda db: versions.storeManagersConfig())
context.addInitializer(lambda db: versions.storeManagersVersions())

# This call does not do anything right now as we do not have a way to
# split tables between sub-schemas yet.
versions.checkVersionDigests()
if not create:
# verify that configured versions are compatible with schema
versions.checkStoredVersions(self._attributes, writeable)
versions.checkManagersConfig()
versions.checkManagersVersions(writeable)
try:
versions.checkManagersDigests()
except DigestMismatchError as exc:
# potentially digest mismatch is a serious error but during
# development it could be benign, treat this as warning for
# now.
_LOG.warning(f"Registry schema digest mismatch: {exc}")

self._collections.refresh()
self._datasets.refresh(universe=self._dimensions.universe)
Expand Down
24 changes: 23 additions & 1 deletion python/lsst/daf/butler/registry/attributes.py
Expand Up @@ -39,10 +39,17 @@
Database,
ButlerAttributeExistsError,
ButlerAttributeManager,
StaticTablesContext
StaticTablesContext,
VersionTuple
)


# This manager is supposed to have super-stable schema that never changes
# but there may be cases when we need data migration on this table so we
# keep version for it as well.
_VERSION = VersionTuple(0, 1, 0)


class DefaultButlerAttributeManager(ButlerAttributeManager):
"""An implementation of `ButlerAttributeManager` that stores attributes
in a database table.
Expand Down Expand Up @@ -114,3 +121,18 @@ def items(self) -> Iterable[Tuple[str, str]]:
])
for row in self._db.query(sql):
yield row[0], row[1]

def empty(self) -> bool:
    # Docstring inherited from ButlerAttributeManager.
    # Count the rows in the attributes table; the manager is considered
    # empty exactly when that count is zero.
    count_query = sqlalchemy.sql.select([sqlalchemy.sql.func.count()]).select_from(self._table)
    (num_rows,) = self._db.query(count_query).fetchone()
    return num_rows == 0

@classmethod
def currentVersion(cls) -> Optional[VersionTuple]:
    # Docstring inherited from VersionedExtension.
    # The schema version lives in the module-level ``_VERSION`` constant;
    # bump it there if this table ever needs a data migration.
    version = _VERSION
    return version

def schemaDigest(self) -> Optional[str]:
    # Docstring inherited from VersionedExtension.
    # The digest covers the single static table owned by this manager.
    tables = [self._table]
    return self._defaultSchemaDigest(tables)
15 changes: 14 additions & 1 deletion python/lsst/daf/butler/registry/bridge/monolithic.py
Expand Up @@ -25,7 +25,7 @@
from collections import namedtuple
from contextlib import contextmanager
import copy
from typing import cast, Dict, Iterable, Iterator, List, Type, TYPE_CHECKING
from typing import cast, Dict, Iterable, Iterator, List, Optional, Type, TYPE_CHECKING

import sqlalchemy

Expand All @@ -35,6 +35,7 @@
DatastoreRegistryBridge,
DatastoreRegistryBridgeManager,
FakeDatasetRef,
VersionTuple,
)
from lsst.daf.butler.registry.bridge.ephemeral import EphemeralDatastoreRegistryBridge

Expand All @@ -56,6 +57,9 @@
]
)

# This has to be updated on every schema change
_VERSION = VersionTuple(0, 1, 0)


def _makeTableSpecs(datasets: Type[DatasetRecordStorageManager]) -> _TablesTuple:
"""Construct specifications for tables used by the monolithic datastore
Expand Down Expand Up @@ -254,3 +258,12 @@ def findDatastores(self, ref: DatasetRef) -> Iterable[str]:
for name, bridge in self._ephemeral.items():
if ref in bridge:
yield name

@classmethod
def currentVersion(cls) -> Optional[VersionTuple]:
    # Docstring inherited from VersionedExtension.
    # Report the module-level schema version for this bridge manager;
    # ``_VERSION`` must be updated on every schema change.
    version = _VERSION
    return version

def schemaDigest(self) -> Optional[str]:
    # Docstring inherited from VersionedExtension.
    # Digest every table in the manager's static table tuple.
    bridge_tables = self._tables
    return self._defaultSchemaDigest(bridge_tables)
15 changes: 14 additions & 1 deletion python/lsst/daf/butler/registry/collections/nameKey.py
Expand Up @@ -37,6 +37,7 @@
makeCollectionChainTableSpec,
)
from ...core import ddl
from ..interfaces import VersionTuple

if TYPE_CHECKING:
from ..interfaces import CollectionRecord, Database, StaticTablesContext
Expand All @@ -52,14 +53,17 @@
collection_chain=makeCollectionChainTableSpec("name", sqlalchemy.String),
)

# This has to be updated on every schema change
_VERSION = VersionTuple(0, 1, 0)


class NameKeyCollectionManager(DefaultCollectionManager):
"""A `CollectionManager` implementation that uses collection names for
primary/foreign keys and aggressively loads all collection/run records in
the database into memory.
Most of the logic, including caching policy, is implemented in the base
class, this class only adds customisations specific to this particular
class, this class only adds customizations specific to this particular
table schema.
"""

Expand Down Expand Up @@ -105,3 +109,12 @@ def addRunForeignKey(cls, tableSpec: ddl.TableSpec, *, prefix: str = "run",
def _getByName(self, name: str) -> Optional[CollectionRecord]:
# Docstring inherited from DefaultCollectionManager.
return self._records.get(name)

@classmethod
def currentVersion(cls) -> Optional[VersionTuple]:
    # Docstring inherited from VersionedExtension.
    # Schema version is kept in the module-level ``_VERSION`` constant,
    # which has to be updated on every schema change.
    version = _VERSION
    return version

def schemaDigest(self) -> Optional[str]:
    # Docstring inherited from VersionedExtension.
    # All static collection tables contribute to the digest.
    collection_tables = self._tables
    return self._defaultSchemaDigest(collection_tables)
16 changes: 14 additions & 2 deletions python/lsst/daf/butler/registry/collections/synthIntKey.py
Expand Up @@ -39,7 +39,7 @@
makeCollectionChainTableSpec,
)
from ...core import ddl
from ..interfaces import CollectionRecord
from ..interfaces import CollectionRecord, VersionTuple

if TYPE_CHECKING:
from ..interfaces import Database, StaticTablesContext
Expand All @@ -58,13 +58,16 @@
collection_chain=makeCollectionChainTableSpec("collection_id", sqlalchemy.BigInteger),
)

# This has to be updated on every schema change
_VERSION = VersionTuple(0, 1, 0)


class SynthIntKeyCollectionManager(DefaultCollectionManager):
"""A `CollectionManager` implementation that uses synthetic primary key
(auto-incremented integer) for collections table.
Most of the logic, including caching policy, is implemented in the base
class, this class only adds customisations specific to this particular
class, this class only adds customizations specific to this particular
table schema.
Parameters
Expand Down Expand Up @@ -143,3 +146,12 @@ def _removeCachedRecord(self, record: CollectionRecord) -> None:
def _getByName(self, name: str) -> Optional[CollectionRecord]:
# Docstring inherited from DefaultCollectionManager.
return self._nameCache.get(name)

@classmethod
def currentVersion(cls) -> Optional[VersionTuple]:
    # Docstring inherited from VersionedExtension.
    # The module-level ``_VERSION`` constant tracks this manager's schema
    # version and must be bumped on every schema change.
    version = _VERSION
    return version

def schemaDigest(self) -> Optional[str]:
    # Docstring inherited from VersionedExtension.
    # The digest is computed over the manager's static tables.
    static_tables = self._tables
    return self._defaultSchemaDigest(static_tables)
Expand Up @@ -22,7 +22,11 @@
DimensionUniverse,
)
from lsst.daf.butler.registry import ConflictingDefinitionError
from lsst.daf.butler.registry.interfaces import DatasetRecordStorage, DatasetRecordStorageManager
from lsst.daf.butler.registry.interfaces import (
DatasetRecordStorage,
DatasetRecordStorageManager,
VersionTuple
)

from .tables import makeStaticTableSpecs, addDatasetForeignKey, makeDynamicTableName, makeDynamicTableSpec
from ._storage import ByDimensionsDatasetRecordStorage
Expand All @@ -36,6 +40,10 @@
from .tables import StaticDatasetTablesTuple


# This has to be updated on every schema change
_VERSION = VersionTuple(0, 1, 0)


class ByDimensionsDatasetRecordStorageManager(DatasetRecordStorageManager):
"""A manager class for datasets that uses one dataset-collection table for
each group of dataset types that share the same dimensions.
Expand Down Expand Up @@ -182,3 +190,12 @@ def getDatasetRef(self, id: int, *, universe: DimensionUniverse) -> Optional[Dat
id=id,
run=self._collections[row[self._collections.getRunForeignKeyName()]].name
)

@classmethod
def currentVersion(cls) -> Optional[VersionTuple]:
    # Docstring inherited from VersionedExtension.
    # Schema version for the datasets manager, stored in the module-level
    # ``_VERSION`` constant; update it on every schema change.
    version = _VERSION
    return version

def schemaDigest(self) -> Optional[str]:
    # Docstring inherited from VersionedExtension.
    # Only the static dataset tables are digested; dynamic per-dimension
    # tables created later are not included here.
    static_tables = self._static
    return self._defaultSchemaDigest(static_tables)
4 changes: 4 additions & 0 deletions python/lsst/daf/butler/registry/dimensions/caching.py
Expand Up @@ -114,3 +114,7 @@ def fetch(self, dataIds: DataCoordinateIterable) -> Iterable[DimensionRecord]:
missing -= self._cache.keys()
for dataId in missing:
self._cache[dataId] = None

def digestTables(self) -> Iterable[sqlalchemy.schema.Table]:
    # Docstring inherited from DimensionRecordStorage.digestTables.
    # The caching wrapper owns no tables of its own; delegate entirely to
    # the wrapped storage.
    nested_storage = self._nested
    return nested_storage.digestTables()
4 changes: 4 additions & 0 deletions python/lsst/daf/butler/registry/dimensions/query.py
Expand Up @@ -152,3 +152,7 @@ def fetch(self, dataIds: DataCoordinateIterable) -> Iterable[DimensionRecord]:
# Given the restrictions imposed at construction, we know there's
# nothing to actually fetch: everything we need is in the data ID.
yield RecordClass.fromDict(dataId.byName())

def digestTables(self) -> Iterable[sqlalchemy.schema.Table]:
    # Docstring inherited from DimensionRecordStorage.digestTables.
    # Query-backed storage is not backed by any database table, so there
    # is nothing to contribute to a schema digest.
    return []
4 changes: 4 additions & 0 deletions python/lsst/daf/butler/registry/dimensions/skypix.py
Expand Up @@ -102,3 +102,7 @@ def fetch(self, dataIds: DataCoordinateIterable) -> Iterable[DimensionRecord]:
for dataId in dataIds:
index = dataId[self._dimension.name]
yield RecordClass(index, self._dimension.pixelization.pixel(index))

def digestTables(self) -> Iterable[sqlalchemy.schema.Table]:
    # Docstring inherited from DimensionRecordStorage.digestTables.
    # Skypix records are computed from the pixelization, not stored in a
    # database table, so no tables participate in the digest.
    return []
28 changes: 26 additions & 2 deletions python/lsst/daf/butler/registry/dimensions/static.py
Expand Up @@ -20,11 +20,23 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from __future__ import annotations

from typing import Optional
from typing import List, Optional

import sqlalchemy

from ...core import NamedKeyDict
from ...core.dimensions import DimensionElement, DimensionUniverse
from ..interfaces import Database, StaticTablesContext, DimensionRecordStorageManager, DimensionRecordStorage
from ..interfaces import (
Database,
StaticTablesContext,
DimensionRecordStorageManager,
DimensionRecordStorage,
VersionTuple
)


# This has to be updated on every schema change
_VERSION = VersionTuple(0, 1, 0)


class StaticDimensionRecordStorageManager(DimensionRecordStorageManager):
Expand Down Expand Up @@ -80,3 +92,15 @@ def clearCaches(self) -> None:
# Docstring inherited from DimensionRecordStorageManager.
for storage in self._records.values():
storage.clearCaches()

@classmethod
def currentVersion(cls) -> Optional[VersionTuple]:
    # Docstring inherited from VersionedExtension.
    # Schema version of the static dimensions manager; kept in the
    # module-level ``_VERSION`` constant, bumped on every schema change.
    version = _VERSION
    return version

def schemaDigest(self) -> Optional[str]:
    # Docstring inherited from VersionedExtension.
    # Gather the tables from every per-element record storage (in the
    # order they are registered) and compute one digest over all of them.
    tables: List[sqlalchemy.schema.Table] = [
        table
        for storage in self._records.values()
        for table in storage.digestTables()
    ]
    return self._defaultSchemaDigest(tables)
4 changes: 4 additions & 0 deletions python/lsst/daf/butler/registry/dimensions/table.py
Expand Up @@ -144,3 +144,7 @@ def sync(self, record: DimensionRecord) -> bool:
compared={k: getattr(record, k) for k in record.__slots__[n:]},
)
return inserted

def digestTables(self) -> Iterable[sqlalchemy.schema.Table]:
    # Docstring inherited from DimensionRecordStorage.digestTables.
    # The single table backing this storage is the only contribution to
    # the schema digest.
    tables = [self._table]
    return tables
1 change: 1 addition & 0 deletions python/lsst/daf/butler/registry/interfaces/__init__.py
Expand Up @@ -26,3 +26,4 @@
from ._datasets import *
from ._bridge import *
from ._attributes import *
from ._versioning import *

0 comments on commit 49b7167

Please sign in to comment.