
Commit

Add pgSphere support to obscore manager (DM-36489)
The obscore manager gains a generic plugin mechanism for supporting spatial
columns and indices. A specific implementation based on the pgSphere
extension adds two columns to the obscore table, holding a pgSphere polygon
and a position (the center of the bounding circle), together with
corresponding indices.
andy-slac committed Oct 12, 2022
1 parent 75cb68d commit f3daa2d
Showing 16 changed files with 750 additions and 134 deletions.
38 changes: 35 additions & 3 deletions python/lsst/daf/butler/core/ddl.py
@@ -38,6 +38,7 @@
"TableSpec",
"FieldSpec",
"ForeignKeySpec",
"IndexSpec",
"Base64Bytes",
"Base64Region",
"AstropyTimeNsecTai",
@@ -494,6 +495,37 @@ def fromConfig(cls, config: Config) -> ForeignKeySpec:
)


@dataclass(frozen=True)
class IndexSpec:
"""Specification of an index on table columns.
Parameters
----------
*columns : `str`
Names of the columns to index.
**kwargs: `Any`
Additional keyword arguments to pass directly to
`sqlalchemy.schema.Index` constructor. This could be used to provide
backend-specific options, e.g. to create a ``GIST`` index in PostgreSQL
one can pass ``postgresql_using="gist"``.
"""

def __init__(self, *columns: str, **kwargs: Any):
object.__setattr__(self, "columns", tuple(columns))
object.__setattr__(self, "kwargs", kwargs)

def __hash__(self) -> int:
return hash(self.columns)

columns: Tuple[str, ...]
"""Column names to include in the index (`Tuple` [ `str` ])."""

kwargs: dict[str, Any]
"""Additional keyword arguments passed directly to
`sqlalchemy.schema.Index` constructor (`dict` [ `str`, `Any` ]).
"""


@dataclass
class TableSpec:
"""A data class used to define a table or table-like query interface.
@@ -504,7 +536,7 @@ class TableSpec:
Specifications for the columns in this table.
unique : `Iterable` [ `tuple` [ `str` ] ], optional
Non-primary-key unique constraints for the table.
indexes: `Iterable` [ `tuple` [ `str` ] ], optional
indexes: `Iterable` [ `IndexSpec` ], optional
Indexes for the table.
foreignKeys : `Iterable` [ `ForeignKeySpec` ], optional
Foreign key constraints for the table.
@@ -527,7 +559,7 @@ def __init__(
fields: Iterable[FieldSpec],
*,
unique: Iterable[Tuple[str, ...]] = (),
indexes: Iterable[Tuple[str, ...]] = (),
indexes: Iterable[IndexSpec] = (),
foreignKeys: Iterable[ForeignKeySpec] = (),
exclusion: Iterable[Tuple[Union[str, Type[TimespanDatabaseRepresentation]], ...]] = (),
recycleIds: bool = True,
@@ -547,7 +579,7 @@
unique: Set[Tuple[str, ...]]
"""Non-primary-key unique constraints for the table."""

indexes: Set[Tuple[str, ...]]
indexes: Set[IndexSpec]
"""Indexes for the table."""

foreignKeys: List[ForeignKeySpec]
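The new `IndexSpec` above carries both the indexed column names and any backend-specific keyword arguments through to SQLAlchemy. A minimal sketch of how a table specification might request a plain index and a PostgreSQL ``GIST`` index; the table and column names here ("path", "pgs_region") are illustrative and not part of this commit:

```python
import sqlalchemy

from lsst.daf.butler.core import ddl

# Hypothetical table spec: "path" gets an ordinary index, while an
# illustrative pgSphere column asks for a GIST index via a keyword that
# IndexSpec forwards verbatim to sqlalchemy.schema.Index.
spec = ddl.TableSpec(
    fields=[
        ddl.FieldSpec(name="id", dtype=sqlalchemy.BigInteger, primaryKey=True),
        ddl.FieldSpec(name="path", dtype=sqlalchemy.String, length=256),
    ],
    indexes=[
        ddl.IndexSpec("path"),
        ddl.IndexSpec("pgs_region", postgresql_using="gist"),
    ],
)
```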
2 changes: 1 addition & 1 deletion python/lsst/daf/butler/datastores/fileDatastore.py
@@ -237,7 +237,7 @@ def makeTableSpec(cls, datasetIdColumnType: type) -> ddl.TableSpec:
ddl.FieldSpec(name="file_size", dtype=BigInteger, nullable=True),
],
unique=frozenset(),
indexes=[tuple(["path"])],
indexes=[ddl.IndexSpec("path")],
)

def __init__(
Changes in an additional file (file header not shown):
@@ -450,5 +450,5 @@ def makeCalibTableSpec(
# in our DatasetRecordStorage.certify() implementation, and just create
# a regular index here in the hope that helps with lookups.
index.extend(fieldSpec.name for fieldSpec in tsFieldSpecs)
tableSpec.indexes.add(tuple(index)) # type: ignore
tableSpec.indexes.add(ddl.IndexSpec(*index)) # type: ignore
return tableSpec
6 changes: 3 additions & 3 deletions python/lsst/daf/butler/registry/dimensions/table.py
@@ -562,12 +562,12 @@ def _makeOverlapTableSpec(cls, element: DatabaseDimensionElement) -> ddl.TableSp
# This index has the same fields as the PK, in a different
# order, to facilitate queries that know skypix_index and want
# to find the other element.
(
ddl.IndexSpec(
"skypix_system",
"skypix_level",
"skypix_index",
)
+ tuple(element.graph.required.names),
*element.graph.required.names,
),
},
foreignKeys=[
# Foreign key to summary table. This makes sure we don't
2 changes: 1 addition & 1 deletion python/lsst/daf/butler/registry/interfaces/_collections.py
@@ -621,7 +621,7 @@ def resolve_wildcard(
their children, but not both.
Returns
------
-------
records : `list` [ `CollectionRecord` ]
Matching collection records.
"""
13 changes: 7 additions & 6 deletions python/lsst/daf/butler/registry/interfaces/_database.py
@@ -975,14 +975,15 @@ def _convertTableSpec(
allIndexes.update(spec.unique)
args.extend(
sqlalchemy.schema.Index(
self.shrinkDatabaseEntityName("_".join([name, "idx"] + list(columns))),
*columns,
unique=(columns in spec.unique),
self.shrinkDatabaseEntityName("_".join([name, "idx"] + list(index.columns))),
*index.columns,
unique=(index.columns in spec.unique),
**index.kwargs,
)
for columns in spec.indexes
if columns not in allIndexes
for index in spec.indexes
if index.columns not in allIndexes
)
allIndexes.update(spec.indexes)
allIndexes.update(index.columns for index in spec.indexes)
args.extend(
sqlalchemy.schema.Index(
self.shrinkDatabaseEntityName("_".join((name, "fkidx") + fk.source)),
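In `_convertTableSpec` the per-index handling now reads the column tuple and keyword arguments off each `IndexSpec`. A condensed sketch of that translation, with the name shrinking and uniqueness bookkeeping omitted (the helper name is illustrative):

```python
import sqlalchemy

def index_from_spec(table_name: str, index) -> sqlalchemy.schema.Index:
    """Sketch of the per-IndexSpec translation in _convertTableSpec: build
    the index name from the column tuple, then forward any backend-specific
    kwargs (e.g. postgresql_using="gist") unchanged to SQLAlchemy."""
    name = "_".join([table_name, "idx", *index.columns])
    return sqlalchemy.schema.Index(name, *index.columns, **index.kwargs)
```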
1 change: 1 addition & 0 deletions python/lsst/daf/butler/registry/obscore/__init__.py
@@ -23,3 +23,4 @@
from ._manager import *
from ._records import *
from ._schema import *
from ._spatial import *
21 changes: 18 additions & 3 deletions python/lsst/daf/butler/registry/obscore/_config.py
@@ -28,6 +28,7 @@
"ExtraColumnType",
"ObsCoreConfig",
"ObsCoreManagerConfig",
"SpatialPluginConfig",
]

import enum
@@ -60,6 +61,9 @@ class ExtraColumnConfig(BaseModel):
length: Optional[int] = None
"""Optional length qualifier for a column, only used for strings."""

doc: Optional[str] = None
"""Documentation string for this column."""


class DatasetTypeConfig(BaseModel):
"""Configuration describing dataset type-related options."""
@@ -100,6 +104,16 @@ class DatasetTypeConfig(BaseModel):
values, or ExtraColumnConfig mappings."""


class SpatialPluginConfig(BaseModel):
"""Configuration class for a spatial plugin."""

cls: str
"""Name of the class implementing plugin methods."""

config: Dict[str, Any] = {}
"""Configuration object passed to plugin ``initialize()`` method."""


class ObsCoreConfig(BaseModel):
"""Configuration which controls conversion of Registry datasets into
obscore records.
@@ -143,9 +157,10 @@ class ObsCoreConfig(BaseModel):
spectral_ranges: Dict[str, Tuple[float, float]] = {}
"""Maps band name or filter name to a min/max of spectral range."""

spatial_backend: Optional[str] = None
"""The name of a spatial backend which manages additional spatial
columns and indices (e.g. "pgsphere"). By default there is no spatial
spatial_plugins: Dict[str, SpatialPluginConfig] = {}
"""Optional configuration for plugins managing spatial columns and
indices. The key is an arbitrary name and the value is an object describing
the plugin class and its configuration options. By default there is no spatial
indexing support, but a standard ``s_region`` column is always included.
"""

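Taken together, ``spatial_plugins`` maps an arbitrary plugin name to a `SpatialPluginConfig`. A sketch of what such a configuration fragment could look like; the plugin class path is hypothetical, since the pgSphere implementation module is not among the hunks shown here:

```python
# Hypothetical configuration fragment for ObsCoreConfig.spatial_plugins.
# Only the "spatial_plugins" / "cls" / "config" keys come from the models
# above; the plugin class path is illustrative.
config_fragment = {
    "spatial_plugins": {
        "pgsphere": {
            "cls": "lsst.daf.butler.registry.obscore.pgsphere.PgSphereObsCorePlugin",
            "config": {},
        }
    }
}
```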
66 changes: 44 additions & 22 deletions python/lsst/daf/butler/registry/obscore/_manager.py
@@ -27,7 +27,7 @@
import re
import uuid
from collections import defaultdict
from collections.abc import Mapping
from collections.abc import Collection, Mapping
from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Type, cast

import sqlalchemy
@@ -44,8 +44,9 @@

from ..interfaces import ObsCoreTableManager, VersionTuple
from ._config import ConfigCollectionType, ObsCoreManagerConfig
from ._records import ExposureRegionFactory, RecordFactory
from ._records import ExposureRegionFactory, Record, RecordFactory
from ._schema import ObsCoreSchema
from ._spatial import SpatialObsCorePlugin

if TYPE_CHECKING:
from ..interfaces import (
@@ -134,14 +135,18 @@ def __init__(
universe: DimensionUniverse,
config: ObsCoreManagerConfig,
dimensions: DimensionRecordStorageManager,
spatial_plugins: Collection[SpatialObsCorePlugin],
):
self.db = db
self.table = table
self.schema = schema
self.universe = universe
self.config = config
self.spatial_plugins = spatial_plugins
exposure_region_factory = _ExposureRegionFactory(dimensions)
self.record_factory = RecordFactory(config, schema, universe, exposure_region_factory)
self.record_factory = RecordFactory(
config, schema, universe, spatial_plugins, exposure_region_factory
)
self.tagged_collection: Optional[str] = None
self.run_patterns: list[re.Pattern] = []
if config.collection_type is ConfigCollectionType.TAGGED:
@@ -174,15 +179,29 @@ def initialize(
config_data = Config(config)
obscore_config = ObsCoreManagerConfig.parse_obj(config_data)

schema = ObsCoreSchema(config=obscore_config, datasets=datasets)
# Instantiate all spatial plugins.
spatial_plugins = SpatialObsCorePlugin.load_plugins(obscore_config.spatial_plugins, db)

schema = ObsCoreSchema(config=obscore_config, spatial_plugins=spatial_plugins, datasets=datasets)

# Generate table specification for main obscore table.
table_spec = schema.table_spec
for plugin in spatial_plugins:
plugin.extend_table_spec(table_spec)
table = context.addTable(obscore_config.table_name, schema.table_spec)

# Create additional tables if needed.
for plugin in spatial_plugins:
plugin.make_extra_tables(schema, context)

return ObsCoreLiveTableManager(
db=db,
table=table,
schema=schema,
universe=universe,
config=obscore_config,
dimensions=dimensions,
spatial_plugins=spatial_plugins,
)

def config_json(self) -> str:
@@ -241,15 +260,7 @@ def add_datasets(self, refs: Iterable[DatasetRef]) -> None:
# Take all refs, no collection check.
obscore_refs = refs

# Convert them all to records.
records: List[dict] = []
for ref in obscore_refs:
if (record := self.record_factory(ref)) is not None:
records.append(record)

if records:
# Ignore potential conflicts with existing datasets.
self.db.ensure(self.table, *records, primary_key_only=True)
self._populate(obscore_refs)

def associate(self, refs: Iterable[DatasetRef], collection: CollectionRecord) -> None:
# Docstring inherited from base class.
@@ -259,15 +270,7 @@ def associate(self, refs: Iterable[DatasetRef], collection: CollectionRecord) ->
return

if collection.name == self.tagged_collection:

records: List[dict] = []
for ref in refs:
if (record := self.record_factory(ref)) is not None:
records.append(record)

if records:
# Ignore potential conflicts with existing datasets.
self.db.ensure(self.table, *records, primary_key_only=True)
self._populate(refs)

def disassociate(self, refs: Iterable[DatasetRef], collection: CollectionRecord) -> None:
# Docstring inherited from base class.
@@ -288,6 +291,25 @@ def disassociate(self, refs: Iterable[DatasetRef], collection: CollectionRecord)
where = self.table.columns[fk_field.name].in_(ids)
self.db.deleteWhere(self.table, where)

def _populate(self, refs: Iterable[DatasetRef]) -> None:
"""Populate obscore table with the data from given datasets."""
records: List[Record] = []
extra_plugin_records: Dict[sqlalchemy.schema.Table, List[Record]] = defaultdict(list)
for ref in refs:
record, extra_records = self.record_factory(ref)
if record is not None:
records.append(record)
if extra_records is not None:
for table, table_records in extra_records.items():
extra_plugin_records[table].extend(table_records)

if records:
# Ignore potential conflicts with existing datasets.
self.db.ensure(self.table, *records, primary_key_only=True)
if extra_plugin_records:
for table, table_records in extra_plugin_records.items():
self.db.ensure(table, *table_records, primary_key_only=True)

def _check_dataset_run(self, run: str) -> bool:
"""Check that specified run collection matches know patterns."""

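The `_spatial` module itself is not included in the hunks above, but the calls made from `initialize()` and the record factory imply roughly the interface below. This is an inferred sketch, not the actual `SpatialObsCorePlugin` definition; method signatures and types are assumptions.

```python
from abc import ABC, abstractmethod
from collections.abc import Collection, Mapping
from typing import Any


class SpatialObsCorePluginSketch(ABC):
    """Inferred shape of a spatial obscore plugin, based only on how it is
    called from ObsCoreLiveTableManager above (details are assumptions)."""

    @classmethod
    @abstractmethod
    def load_plugins(
        cls, configs: Mapping[str, Any], db: Any
    ) -> Collection["SpatialObsCorePluginSketch"]:
        """Instantiate every configured plugin (used by initialize())."""

    @abstractmethod
    def extend_table_spec(self, table_spec: Any) -> None:
        """Add plugin-specific columns and indices to the obscore table spec."""

    @abstractmethod
    def make_extra_tables(self, schema: Any, context: Any) -> None:
        """Create any additional tables the plugin needs (e.g. for pgSphere)."""
```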
