Skip to content

Commit

Permalink
Add support for specifying schema versions (DM-25355)
Browse files Browse the repository at this point in the history
Schema versions are now specified in the registry config file and they
are stored to attributes table on schema initialization. When opening
existing database these versions are compared with configured ones,
exception is raised if they are not compatible. There is also a
groundwork for detecting schema changes by checking schema digests, it
is not used by anything yet.

Couple of unit tests needed updates to avoid repeated initialization of
existing schema.
  • Loading branch information
andy-slac committed Jun 23, 2020
1 parent 029026b commit 31b3b40
Show file tree
Hide file tree
Showing 7 changed files with 385 additions and 9 deletions.
10 changes: 10 additions & 0 deletions config/registry.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,13 @@ registry:
collections: lsst.daf.butler.registry.collections.synthIntKey.SynthIntKeyCollectionManager
datasets: lsst.daf.butler.registry.datasets.byDimensions.ByDimensionsDatasetRecordStorageManager
datastores: lsst.daf.butler.registry.bridge.monolithic.MonolithicDatastoreRegistryBridgeManager
schema_versions:
core:
version: 0.0.1
digest: ""
dimensions:
version: 0.0.1
digest: ""
datastores:
version: 0.0.1
digest: ""
16 changes: 15 additions & 1 deletion python/lsst/daf/butler/registry/_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
from ._exceptions import ConflictingDefinitionError, InconsistentDataIdError, OrphanedRecordError
from .wildcards import CategorizedWildcard, CollectionQuery, CollectionSearch, Ellipsis
from .interfaces import ChainedCollectionRecord, RunRecord
from .versions import ButlerVersionsManager

if TYPE_CHECKING:
from ..butlerConfig import ButlerConfig
Expand Down Expand Up @@ -138,9 +139,11 @@ def fromConfig(cls, config: Union[ButlerConfig, RegistryConfig, Config, str], cr
collections = doImport(config["managers", "collections"])
datasets = doImport(config["managers", "datasets"])
datastoreBridges = doImport(config["managers", "datastores"])
versions = ButlerVersionsManager.fromConfig(config["schema_versions"])

return cls(database, universe, dimensions=dimensions, attributes=attributes, opaque=opaque,
collections=collections, datasets=datasets, datastoreBridges=datastoreBridges,
create=create)
versions=versions, writeable=writeable, create=create)

def __init__(self, database: Database, universe: DimensionUniverse, *,
attributes: Type[ButlerAttributeManager],
Expand All @@ -149,6 +152,8 @@ def __init__(self, database: Database, universe: DimensionUniverse, *,
collections: Type[CollectionManager],
datasets: Type[DatasetRecordStorageManager],
datastoreBridges: Type[DatastoreRegistryBridgeManager],
versions: ButlerVersionsManager,
writeable: bool = True,
create: bool = False):
self._db = database
self.storageClasses = StorageClassFactory()
Expand All @@ -164,6 +169,15 @@ def __init__(self, database: Database, universe: DimensionUniverse, *,
opaque=self._opaque,
datasets=datasets,
universe=self.dimensions)
context.addInitializer(lambda db: versions.storeVersions(self._attributes))

# This call does not do anything right now as we do not have a way to
# split tables between sub-schemas yet.
versions.checkVersionDigests()
if not create:
# verify that configured versions are compatible with schema
versions.checkStoredVersions(self._attributes, writeable)

self._collections.refresh()
self._datasets.refresh(universe=self._dimensions.universe)

Expand Down
21 changes: 21 additions & 0 deletions python/lsst/daf/butler/registry/interfaces/_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from contextlib import contextmanager
from typing import (
Any,
Callable,
Dict,
Iterable,
Iterator,
Expand Down Expand Up @@ -100,6 +101,7 @@ def __init__(self, db: Database):
self._foreignKeys: List[Tuple[sqlalchemy.schema.Table, sqlalchemy.schema.ForeignKeyConstraint]] = []
self._inspector = sqlalchemy.engine.reflection.Inspector(self._db._connection)
self._tableNames = frozenset(self._inspector.get_table_names(schema=self._db.namespace))
self._initializers: List[Callable[[Database], None]] = []

def addTable(self, name: str, spec: ddl.TableSpec) -> sqlalchemy.schema.Table:
"""Add a new table to the schema, returning its sqlalchemy
Expand Down Expand Up @@ -141,6 +143,20 @@ def addTableTuple(self, specs: Tuple[ddl.TableSpec, ...]) -> Tuple[sqlalchemy.sc
return specs._make(self.addTable(name, spec) # type: ignore
for name, spec in zip(specs._fields, specs)) # type: ignore

def addInitializer(self, initializer: Callable[[Database], None]) -> None:
"""Add a method that does one-time initialization of a database.
Initialization can mean anything that changes state of a database
and needs to be done exactly once after database schema was created.
An example for that could be population of schema attributes.
Parameters
----------
initializer : callable
Method of a single argument which is a `Database` instance.
"""
self._initializers.append(initializer)


class Database(ABC):
"""An abstract interface that represents a particular database engine's
Expand Down Expand Up @@ -387,7 +403,12 @@ def declareStaticTables(self, *, create: bool) -> Iterator[StaticTablesContext]:
with warnings.catch_warnings():
warnings.simplefilter("ignore", category=sqlalchemy.exc.SADeprecationWarning)
self._metadata.create_all(self._connection)
# call all initializer methods sequentially
for init in context._initializers:
init(self)
except BaseException:
# TODO: this is potentially dangerous if we run it on
# pre-existing schema.
self._metadata.drop_all(self._connection)
self._metadata = None
raise
Expand Down
16 changes: 10 additions & 6 deletions python/lsst/daf/butler/registry/tests/_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -1020,14 +1020,17 @@ def testAbstractFilterQuery(self):
def testAttributeManager(self):
"""Test basic functionality of attribute manager.
"""
# number of attributes with schema versions in a fresh database
VERSION_COUNT = 3

registry = self.makeRegistry()
attributes = registry._attributes

# check what get() returns for non-existing key
self.assertIsNone(attributes.get("attr"))
self.assertEqual(attributes.get("attr", ""), "")
self.assertEqual(attributes.get("attr", "Value"), "Value")
self.assertEqual(len(list(attributes.items())), 0)
self.assertEqual(len(list(attributes.items())), VERSION_COUNT)

# cannot store empty key or value
with self.assertRaises(ValueError):
Expand All @@ -1037,20 +1040,20 @@ def testAttributeManager(self):

# set value of non-existing key
attributes.set("attr", "value")
self.assertEqual(len(list(attributes.items())), 1)
self.assertEqual(len(list(attributes.items())), VERSION_COUNT + 1)
self.assertEqual(attributes.get("attr"), "value")

# update value of existing key
with self.assertRaises(ButlerAttributeExistsError):
attributes.set("attr", "value2")

attributes.set("attr", "value2", force=True)
self.assertEqual(len(list(attributes.items())), 1)
self.assertEqual(len(list(attributes.items())), VERSION_COUNT + 1)
self.assertEqual(attributes.get("attr"), "value2")

# delete existing key
self.assertTrue(attributes.delete("attr"))
self.assertEqual(len(list(attributes.items())), 0)
self.assertEqual(len(list(attributes.items())), VERSION_COUNT)

# delete non-existing key
self.assertFalse(attributes.delete("non-attr"))
Expand All @@ -1063,5 +1066,6 @@ def testAttributeManager(self):
]
for key, value in data:
attributes.set(key, value)
items = list(attributes.items())
self.assertCountEqual(items, data)
items = dict(attributes.items())
for key, value in data:
self.assertEqual(items[key], value)

0 comments on commit 31b3b40

Please sign in to comment.