Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dynamic spatial index tables. #1312

Merged
merged 7 commits into from Aug 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
40 changes: 39 additions & 1 deletion datacube/drivers/postgis/_connections.py
Expand Up @@ -17,7 +17,7 @@
import os
import re
from contextlib import contextmanager
from typing import Callable, Optional, Union
from typing import Any, Callable, Iterable, Mapping, Optional, Union, Type

from sqlalchemy import event, create_engine, text
from sqlalchemy.engine import Engine
Expand All @@ -26,9 +26,12 @@
import datacube
from datacube.index.exceptions import IndexSetupError
from datacube.utils import jsonify_document
from datacube.utils.geometry import CRS

from . import _api
from . import _core
from ._spatial import ensure_spindex, spindexes, spindex_for_crs
from ._schema import SpatialIndex

_LIB_ID = 'odc-' + str(datacube.__version__)

Expand Down Expand Up @@ -67,6 +70,7 @@ def __init__(self, engine):
# We don't recommend using this constructor directly as it may change.
# Use static methods PostGisDb.create() or PostGisDb.from_config()
self._engine = engine
self._spindexes: Optional[Mapping[CRS, Any]] = None

@classmethod
def from_config(cls, config, application_name=None, validate_connection=True):
Expand Down Expand Up @@ -206,6 +210,40 @@ def init(self, with_permissions=True):

return is_new

def _refresh_spindexes(self):
    # Re-read the spatial index records from the database, replacing the cached
    # CRS -> ORM-class mapping (e.g. to pick up indexes created in another session).
    self._spindexes = spindexes(self._engine)

@property
def spindexes(self) -> Mapping[CRS, Type[SpatialIndex]]:
    """CRS -> spatial-index ORM class mapping, lazily loaded from the database on first access."""
    cached = self._spindexes
    if cached is None:
        self._refresh_spindexes()
        cached = self._spindexes
    return cached

def create_spatial_index(self, crs: "datacube.utils.geometry.CRS") -> Optional[Type[SpatialIndex]]:
    """
    Create a spatial index across the database, for the named CRS.

    :param crs: The CRS to build a spatial index for. Must be EPSG-style
                (postgis SRIDs are EPSG codes).
    :return: The spatial index ORM class, or None if no index could be
             modelled for this CRS.
    """
    spidx = self.spindexes.get(crs)
    if spidx is None:
        spidx = spindex_for_crs(crs)
        if spidx is None:
            # spindex_for_crs already logged the reason; use the public string
            # form of the CRS rather than the private _str attribute.
            _LOG.warning("Could not dynamically model an index for CRS %s", str(crs))
            return None
        # Create the index table in the database and record its existence.
        ensure_spindex(self._engine, spidx)
        self.spindexes[crs] = spidx
    return spidx

def spatial_index(self, crs: "datacube.utils.geometry.CRS") -> "Optional[Type[SpatialIndex]]":
    """
    Return the spatial index ORM class for the given CRS, or None if no
    spatial index exists for it.

    Bug fix: the lookup previously used the CRS *class* (``.get(CRS)``)
    instead of the ``crs`` argument, so it always returned None.
    """
    return self.spindexes.get(crs)

def spatial_indexes(self, refresh=False) -> Iterable[Any]:
    """
    Return the CRSs for which spatial indexes exist in this database.

    :param refresh: If True, re-read the index records from the database first.
    """
    if refresh:
        self._refresh_spindexes()
    known = self.spindexes
    return [crs for crs in known.keys()]

@contextmanager
def connect(self):
"""
Expand Down
35 changes: 12 additions & 23 deletions datacube/drivers/postgis/_core.py
Expand Up @@ -12,8 +12,7 @@
SCHEMA_NAME, TYPES_INIT_SQL,
UPDATE_COLUMN_MIGRATE_SQL_TEMPLATE,
UPDATE_TIMESTAMP_SQL,
escape_pg_identifier,
pg_column_exists)
escape_pg_identifier)
from sqlalchemy import MetaData
from sqlalchemy.engine import Engine
from sqlalchemy.schema import CreateSchema
Expand Down Expand Up @@ -79,6 +78,8 @@ def ensure_db(engine, with_permissions=True):

quoted_db_name, quoted_user = _get_quoted_connection_info(c)

_ensure_extension(c, 'POSTGIS')

if with_permissions:
_LOG.info('Ensuring user roles.')
_ensure_role(c, 'odc_user')
Expand All @@ -101,10 +102,9 @@ def ensure_db(engine, with_permissions=True):
c.execute(CreateSchema(SCHEMA_NAME))
_LOG.info('Creating tables.')
c.execute(TYPES_INIT_SQL)
from . import _schema
base = _schema.orm_registry.generate_base()
_LOG.info("Dataset indexes: %s", repr(base.metadata.tables["odc.dataset"].indexes))
base.metadata.create_all(c)
from ._schema import orm_registry, ALL_STATIC_TABLES
_LOG.info("Dataset indexes: %s", repr(orm_registry.metadata.tables["odc.dataset"].indexes))
orm_registry.metadata.create_all(c, tables=ALL_STATIC_TABLES)
_LOG.info("Creating triggers.")
install_timestamp_trigger(c)
c.execute('commit')
Expand Down Expand Up @@ -180,23 +180,12 @@ def update_schema(engine: Engine):
See the `schema_is_latest()` function above: this should apply updates
that it requires.
"""
# This will typically check if something exists (like a newly added column), and
# run the SQL of the change inside a single transaction.

# Empty, as no schema changes have been made recently.
# -> If you need to write one, look at the Git history of this
# function for some examples.

# Post 1.8 DB Incremental Sync triggers
if not pg_column_exists(engine, schema_qualified('dataset'), 'updated'):
_LOG.info("Adding 'updated'/'added' fields and triggers to schema.")
c = engine.connect()
c.execute('begin')
install_timestamp_trigger(c)
c.execute('commit')
c.close()
else:
_LOG.info("No schema updates required.")
# TODO: implement migrations


def _ensure_extension(engine, extension_name="POSTGIS"):
sql = f'create extension if not exists {extension_name}'
engine.execute(sql)


def _ensure_role(engine, name, inherits_from=None, add_user=False, create_db=False):
Expand Down
36 changes: 36 additions & 0 deletions datacube/drivers/postgis/_schema.py
Expand Up @@ -7,6 +7,7 @@
"""

import logging
from typing import Type

from sqlalchemy.orm import aliased, registry, relationship, column_property
from sqlalchemy import ForeignKey, UniqueConstraint, PrimaryKeyConstraint, CheckConstraint, SmallInteger, Text
Expand Down Expand Up @@ -166,3 +167,38 @@ class DatasetSource:
classifier = Column(String, nullable=False, comment="""An identifier for this source dataset.
E.g. the dataset type ('ortho', 'nbar'...) if there's only one source of each type, or a datestamp
for a time-range summary.""")


class SpatialIndex:
    """
    Base class for dynamically created SpatialIndex ORM models (see _spatial.py).

    Concrete subclasses are minted at runtime, one per EPSG/SRID code.
    """


@orm_registry.mapped
class SpatialIndexRecord:
    """ORM model recording which per-SRID spatial index tables exist in this database."""
    # NOTE: "indicies" is a historical misspelling but is the on-disk table name -
    # do not change without a schema migration.
    __tablename__ = "spatial_indicies"
    __table_args__ = (
        _core.METADATA,
        {
            "schema": sql.SCHEMA_NAME,
            "comment": "Record of the existence of a Spatial Index Table for an SRID/CRS",
        }
    )
    # EPSG/SRID code of the CRS covered by the index table.
    srid = Column(SmallInteger, primary_key=True, autoincrement=False)
    table_name = Column(String,
                        unique=True, nullable=True,
                        comment="The name of the table implementing the index - DO NOT CHANGE")
    added = Column(DateTime(timezone=True), server_default=func.now(), nullable=False, comment="when added")
    added_by = Column(Text, server_default=func.current_user(), nullable=False, comment="added by whom")

    @classmethod
    def from_spindex(cls, spindex: Type[SpatialIndex]) -> "SpatialIndexRecord":
        """
        Build a record for a dynamically-minted spatial index ORM class.

        Index table names have the form "spatial_<srid>": strip the 8-character
        "spatial_" prefix and convert to int, so the SmallInteger ``srid``
        column receives a real integer rather than a string slice.
        """
        return cls(srid=int(spindex.__tablename__[8:]),
                   table_name=spindex.__tablename__)


# The statically-defined tables of the schema, as opposed to the dynamically
# created per-CRS spatial index tables.  Passed to metadata.create_all() at
# database initialisation so only these (not any registered dynamic tables)
# are created up front.
ALL_STATIC_TABLES = [
    MetadataType.__table__, Product.__table__, Dataset.__table__,
    DatasetLocation.__table__, DatasetSource.__table__, SpatialIndexRecord.__table__
]
145 changes: 145 additions & 0 deletions datacube/drivers/postgis/_spatial.py
@@ -0,0 +1,145 @@
# This file is part of the Open Data Cube, see https://opendatacube.org for more information
#
# Copyright (c) 2015-2020 ODC Contributors
# SPDX-License-Identifier: Apache-2.0
"""
Tracking spatial indexes
"""

import logging
from threading import Lock
from typing import Mapping, Optional, Type, Union

from sqlalchemy import ForeignKey, select
from sqlalchemy.dialects import postgresql as postgres
from geoalchemy2 import Geometry

from sqlalchemy.engine import Connectable
from sqlalchemy import Column
from sqlalchemy.orm import Session

from datacube.utils.geometry import CRS
from ._core import METADATA
from .sql import SCHEMA_NAME
from ._schema import orm_registry, Dataset, SpatialIndex, SpatialIndexRecord

_LOG = logging.getLogger(__name__)


# In theory we could just use the SQLAlchemy registry for this, but it is not indexed
# in a useful way.
class SpatialIndexORMRegistry:
    """Threadsafe global registry of SpatialIndex ORM classes, indexed by EPSG/SRID code."""
    # Class-level shared state: every instance reads/mutates the same dict,
    # guarded by the same lock.
    _registry: Mapping[int, Type[SpatialIndex]] = {}
    _lock = Lock()

    def __init__(self):
        self._registry = self.__class__._registry
        self._lock = self.__class__._lock

    def _to_epsg(self, epsg_or_crs: Union[CRS, int]) -> int:
        """Utility method to convert an epsg_or_crs to an EPSG code."""
        if isinstance(epsg_or_crs, CRS):
            return epsg_or_crs.epsg
        else:
            return epsg_or_crs

    def register(self, epsg_or_crs: Union[CRS, int]) -> bool:
        """
        Ensure that a SpatialIndex ORM class is registered for this EPSG/SRID.

        :return: True if a new class was minted, False if one already existed.
        """
        epsg = self._to_epsg(epsg_or_crs)
        added = False
        with self._lock:
            if epsg not in self._registry:
                self._registry[epsg] = self._mint_new_spindex(epsg)
                added = True
        return added

    def get(self, epsg_or_crs: Union[CRS, int]) -> Optional[Type[SpatialIndex]]:
        """Retrieve the registered SpatialIndex ORM class, or None if not registered."""
        epsg = self._to_epsg(epsg_or_crs)
        return self._registry.get(epsg)

    def _mint_new_spindex(self, epsg: int):
        """
        Dynamically create a new ORM class for an EPSG/SRID.

        Note: Called within registry lock.
        """
        table_name = f"spatial_{epsg}"
        attributes = {
            '__tablename__': table_name,
            '__table_args__': (
                METADATA,
                {
                    "schema": SCHEMA_NAME,
                    # Fixed: previous comment was copy-pasted from the Product table.
                    "comment": f"A spatial index for datasets in CRS EPSG:{epsg}."
                }
            ),
            "dataset_ref": Column(postgres.UUID(as_uuid=True), ForeignKey(Dataset.id),
                                  primary_key=True,
                                  nullable=False,
                                  comment="The dataset being indexed")
        }
        # Add geometry column, using the EPSG code as the postgis SRID.
        attributes["extent"] = Column(Geometry('MULTIPOLYGON', srid=epsg),
                                      nullable=False,
                                      comment="The extent of the dataset")
        return orm_registry.mapped(type(f'SpatialIdx{epsg}', (SpatialIndex,), attributes))


def spindex_for_epsg(epsg: int) -> Type[SpatialIndex]:
    """Return ORM class of a SpatialIndex for EPSG/SRID - dynamically creating if necessary"""
    registry = SpatialIndexORMRegistry()
    existing = registry.get(epsg)
    if existing is not None:
        return existing
    # Not yet minted: register (creates the ORM class) then fetch it.
    registry.register(epsg)
    return registry.get(epsg)


def spindex_for_crs(crs: CRS) -> Optional[Type[SpatialIndex]]:
    """
    Return ORM class of a SpatialIndex for CRS - dynamically creating if necessary.

    Returns None (with an error logged) for non-EPSG-style CRSs, so the return
    annotation is Optional (previously declared as always returning a class).
    """
    if not (str(crs).startswith('EPSG') and crs.epsg):
        # Postgis identifies CRSs by a numeric "SRID" which is equivalent to EPSG number.
        _LOG.error("Cannot create a postgis spatial index for a non-EPSG-style CRS.")
        return None

    return spindex_for_epsg(crs.epsg)


def spindex_for_record(rec: SpatialIndexRecord) -> Type[SpatialIndex]:
    """Convert a Record of a SpatialIndex created in a particular database to an ORM class."""
    # Bug fix: SpatialIndexRecord has no ``crs`` attribute (its fields are srid,
    # table_name, added, added_by) - look the index up by its SRID/EPSG code.
    return spindex_for_epsg(int(rec.srid))


def ensure_spindex(engine: Connectable, sp_idx: Type[SpatialIndex]) -> None:
    """Ensure a Spatial Index exists in a particular database."""
    with Session(engine) as session:
        # Index table names are "spatial_<srid>"; slice off the 8-char "spatial_"
        # prefix to recover the SRID.
        # NOTE(review): this compares the integer srid column to a *string* -
        # relies on database-side type coercion; confirm it matches as intended.
        results = session.execute(
            select(SpatialIndexRecord.srid).where(SpatialIndexRecord.srid == sp_idx.__tablename__[8:])
        )
        for result in results:
            # SpatialIndexRecord exists - actual index assumed to exist too.
            return
        # SpatialIndexRecord doesn't exist - create the index table...
        orm_registry.metadata.create_all(engine, [sp_idx.__table__])
        # ... and add a SpatialIndexRecord
        session.add(SpatialIndexRecord.from_spindex(sp_idx))
        # NOTE(review): flush() without commit() - whether the record persists
        # after the session closes depends on the engine/session transaction
        # configuration; confirm it is actually committed.
        session.flush()
        return


def spindexes(engine: Connectable) -> Mapping[CRS, Type[SpatialIndex]]:
    """
    Return a CRS-to-Spatial Index ORM class mapping for indexes that exist in a particular database.

    :param engine: Database connection to read index records from.
    """
    out = {}
    # (Removed unused local: a SpatialIndexORMRegistry was constructed here but
    # never used - spindex_for_epsg manages the shared registry itself.)
    with Session(engine) as session:
        results = session.execute(select(SpatialIndexRecord.srid))
        for result in results:
            epsg = int(result[0])
            spindex = spindex_for_epsg(epsg)
            crs = CRS(f'EPSG:{epsg}')
            out[crs] = spindex
    return out
21 changes: 21 additions & 0 deletions datacube/index/abstract.py
Expand Up @@ -17,6 +17,7 @@
from datacube.model import DatasetType as Product
from datacube.utils import cached_property, read_documents, InvalidDocException
from datacube.utils.changes import AllowPolicy, Change, Offset
from datacube.utils.geometry import CRS


class AbstractUserResource(ABC):
Expand Down Expand Up @@ -971,6 +972,26 @@ def init_db(self,
def close(self) -> None:
...

@abstractmethod
def create_spatial_index(self, crs: CRS) -> bool:
    """
    Create a spatial index using the nominated CRS.

    :param crs: The CRS to use in the spatial index.
    :return: True if the index was successfully created or already exists.
             None if spatial indexes are not supported.
    """

def spatial_indexes(self, refresh=False) -> Iterable[CRS]:
    """
    Return a list of CRSs for which spatiotemporal indexes exist in the database.

    :param refresh: If true, re-read from database record (e.g. to catch spatial
           indexes recently created in another datacube session).
    :return: An iterable of CRSs. Empty by default - drivers without spatial
             index support need not override this method.
    """
    return []

def __enter__(self):
    # Context-manager support: entering yields the index itself.
    return self

Expand Down
5 changes: 5 additions & 0 deletions datacube/index/memory/index.py
Expand Up @@ -11,6 +11,7 @@
from datacube.index.memory._users import UserResource
from datacube.index.abstract import AbstractIndex, AbstractIndexDriver
from datacube.model import MetadataType
from datacube.utils.geometry import CRS

_LOG = logging.getLogger(__name__)

Expand Down Expand Up @@ -60,6 +61,10 @@ def init_db(self, with_default_types=True, with_permissions=True):
def close(self):
pass

def create_spatial_index(self, crs: CRS) -> bool:
    """Spatial indexes are not supported by the in-memory driver: warn and return False."""
    _LOG.warning("memory index driver does not support spatio-temporal indexes")
    return False

def __repr__(self):
    # Fixed identifier string for the in-memory driver.
    return "Index<memory>"

Expand Down
5 changes: 5 additions & 0 deletions datacube/index/null/index.py
Expand Up @@ -11,6 +11,7 @@
from datacube.index.abstract import AbstractIndex, AbstractIndexDriver
from datacube.model import MetadataType
from datacube.model.fields import get_dataset_fields
from datacube.utils.geometry import CRS

_LOG = logging.getLogger(__name__)

Expand Down Expand Up @@ -62,6 +63,10 @@ def init_db(self, with_default_types=True, with_permissions=True):
def close(self):
pass

def create_spatial_index(self, crs: CRS) -> bool:
    """Spatial indexes are not supported by the null driver: warn and return False."""
    _LOG.warning("null driver does not support spatio-temporal indexes")
    return False

def __repr__(self):
    # Fixed identifier string for the null driver.
    return "Index<null>"

Expand Down