Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Spatial index api #1481

Merged
merged 15 commits into from Aug 21, 2023
2 changes: 1 addition & 1 deletion datacube/drivers/postgis/_api.py
Expand Up @@ -219,7 +219,7 @@ def extract_dataset_fields(ds_metadata, fields):
return result


class PostgisDbAPI(object):
class PostgisDbAPI:
def __init__(self, parentdb, connection):
self._db = parentdb
self._connection = connection
Expand Down
22 changes: 18 additions & 4 deletions datacube/drivers/postgis/_connections.py
Expand Up @@ -30,7 +30,7 @@

from . import _api
from . import _core
from ._spatial import ensure_spindex, spindexes, spindex_for_crs
from ._spatial import ensure_spindex, spindexes, spindex_for_crs, drop_spindex, normalise_crs
from ._schema import SpatialIndex

_LIB_ID = 'odc-' + str(datacube.__version__)
Expand Down Expand Up @@ -223,7 +223,7 @@ def create_spatial_index(self, crs: CRS) -> Optional[Type[SpatialIndex]]:
"""
Create a spatial index across the database, for the named CRS.

:param crs_str:
:param crs:
:return:
"""
spidx = self.spindexes.get(crs)
Expand All @@ -233,11 +233,25 @@ def create_spatial_index(self, crs: CRS) -> Optional[Type[SpatialIndex]]:
_LOG.warning("Could not dynamically model an index for CRS %s", crs._str)
return None
ensure_spindex(self._engine, spidx)
self.spindexes[crs] = spidx
self._refresh_spindexes()
return spidx

def drop_spatial_index(self, crs: CRS) -> bool:
"""
Create a spatial index across the database, for the named CRS.

:param crs:
:return:
"""
spidx = self.spindexes.get(crs)
if spidx is None:
return False
result = drop_spindex(self._engine, spidx)
self._refresh_spindexes()
return result

def spatial_index(self, crs: CRS) -> Optional[Type[SpatialIndex]]:
Ariana-B marked this conversation as resolved.
Show resolved Hide resolved
return self.spindexes.get(crs)
return self.spindexes.get(normalise_crs(crs))

def spatial_indexes(self, refresh=False) -> Iterable[CRS]:
SpacemanPaul marked this conversation as resolved.
Show resolved Hide resolved
if refresh:
Expand Down
35 changes: 33 additions & 2 deletions datacube/drivers/postgis/_spatial.py
Expand Up @@ -10,7 +10,7 @@
from threading import Lock
from typing import Mapping, Optional, Type, Union

from sqlalchemy import ForeignKey, select
from sqlalchemy import ForeignKey, select, delete
from sqlalchemy.dialects import postgresql as postgres
from geoalchemy2 import Geometry

Expand All @@ -20,6 +20,8 @@

from odc.geo import CRS, Geometry as Geom
from odc.geo.geom import multipolygon, polygon
from sqlalchemy.sql.ddl import DropTable

from ._core import METADATA
from .sql import SCHEMA_NAME
from ._schema import orm_registry, Dataset, SpatialIndex, SpatialIndexRecord
Expand Down Expand Up @@ -108,6 +110,10 @@ def spindex_for_crs(crs: CRS) -> Type[SpatialIndex]:
return spindex_for_epsg(crs.epsg)


def normalise_crs(crs_in: CRS) -> CRS:
SpacemanPaul marked this conversation as resolved.
Show resolved Hide resolved
return CRS(f'epsg:{crs_in.epsg}')


def spindex_for_record(rec: SpatialIndexRecord) -> Type[SpatialIndex]:
SpacemanPaul marked this conversation as resolved.
Show resolved Hide resolved
"""Convert a Record of a SpatialIndex created in a particular database to an ORM class"""
return spindex_for_crs(rec.crs)
Expand All @@ -131,6 +137,31 @@ def ensure_spindex(engine: Connectable, sp_idx: Type[SpatialIndex]) -> None:
return


def drop_spindex(engine: Connectable, sp_idx: Type[SpatialIndex]):
with Session(engine) as session:
results = session.execute(
select(SpatialIndexRecord).where(SpatialIndexRecord.srid == sp_idx.__tablename__[8:])
)
spidx_record = None
for result in results:
spidx_record = result[0]
break
record_del_result = False
if spidx_record:
result = session.execute(
delete(SpatialIndexRecord).where(SpatialIndexRecord.srid == spidx_record.srid)
)
record_del_result = (result.rowcount == 1)

result = session.execute(
DropTable(sp_idx.__table__, if_exists=True)
)
drop_table_result = (result.rowcount == 1)
print(f"spindex record deleted: {record_del_result} table dropped: {drop_table_result}")

return True


def spindexes(engine: Connectable) -> Mapping[CRS, Type[SpatialIndex]]:
"""
Return a CRS-to-Spatial Index ORM class mapping for indexes that exist in a particular database.
Expand All @@ -141,7 +172,7 @@ def spindexes(engine: Connectable) -> Mapping[CRS, Type[SpatialIndex]]:
for result in results:
epsg = int(result[0])
spindex = spindex_for_epsg(epsg)
crs = CRS(f'EPSG:{epsg}')
crs = CRS(f'epsg:{epsg}')
SpacemanPaul marked this conversation as resolved.
Show resolved Hide resolved
out[crs] = spindex
return out

Expand Down
132 changes: 89 additions & 43 deletions datacube/index/abstract.py
Expand Up @@ -1862,6 +1862,8 @@ class AbstractIndex(ABC):
supports_external_home = False
# Supports ACID transactions
supports_transactions = False
# Supports per-CRS spatial indexes
supports_spatial_indexes = False

@property
@abstractmethod
Expand Down Expand Up @@ -1921,6 +1923,88 @@ def init_db(self,
:return: true if the database was created, false if already exists
"""

# Spatial Index API

def create_spatial_index(self, crs: CRS) -> bool:
"""
Create a spatial index for a CRS.

Note that a newly created spatial index is empty. If there are already datatsets in the index whose
extents can be safely projected into the CRS, then it is necessary to also call update_spatial_index
otherwise they will not be found by queries against that CRS.

Only implemented by index drivers with supports_spatial_indexes set to True.

:param crs: The coordinate reference system to create a spatial index for.
:return: True if the spatial index was successfully created (or already exists)
"""
if not self.supports_spatial_indexes:
raise NotImplementedError("This index driver does not support the Spatial Index API")
else:
raise NotImplementedError()

def spatial_indexes(self, refresh=False) -> Iterable[CRS]:
SpacemanPaul marked this conversation as resolved.
Show resolved Hide resolved
"""
Return the CRSs for which spatial indexes have been created.

:param refresh: If true, query the backend for the list of current spatial indexes. If false (the default)
a cached list of spatial index CRSs may be returned.
:return: An iterable of CRSs for which spatial indexes exist in the index
"""
if not self.supports_spatial_indexes:
raise NotImplementedError("This index driver does not support the Spatial Index API")
else:
raise NotImplementedError()

def update_spatial_index(self,
crses: Sequence[CRS] = [],
product_names: Sequence[str] = [],
dataset_ids: Sequence[DSID] = []
) -> int:
"""
Populate a newly created spatial index (or indexes).

Spatial indexes are automatically populated with new datasets as they are indexed, but if there were
datasets already in the index when a new spatial index is created, or if geometries have been added or
modified outside of the ODC in a populated index (e.g. with SQL) then the spatial indexies must be
updated manually with this method.

This is a very slow operation. The product_names and dataset_ids lists can be used to break the
operation up into chunks or allow faster updating when the spatial index is only relevant to a
small portion of the entire index.

:param crses: A list of CRSes whose spatial indexes are to be updated.
Default is to update all spatial indexes
:param product_names: A list of product names to update the spatial indexes.
Default is to update for all products
:param dataset_ids: A list of ids of specific datasets to update in the spatial index.
Default is to update for all datasets (or all datasts in the products
in the product_names list)
:return: The number of dataset extents processed - i.e. the number of datasets updated multiplied by the
number of spatial indexes updated.
"""
if not self.supports_spatial_indexes:
raise NotImplementedError("This index driver does not support the Spatial Index API")
else:
raise NotImplementedError()

def drop_spatial_index(self, crs: CRS) -> bool:
"""
Remove a spatial index from the database.

Note that creating spatial indexes on an existing index is a slow and expensive operation. Do not
delete spatial indexes unless you are absolutely certain it is no longer required by any users of
this ODC index.

:param crs: The CRS whose spatial index is to be deleted.
:return: True if the spatial index was successfully dropped.
False if spatial index could not be dropped.
"""
if not self.supports_spatial_indexes:
raise NotImplementedError("This index driver does not support the Spatial Index API")
else:
raise NotImplementedError()

def clone(self,
origin_index: "AbstractIndex",
batch_size: int = 1000,
Expand Down Expand Up @@ -1957,6 +2041,11 @@ def clone(self,
"""
results = {}
if not lineage_only:
if self.supports_spatial_indexes and origin_index.supports_spatial_indexes:
for crs in origin_index.spatial_indexes(refresh=True):
report_to_user(f"Creating spatial index for CRS {crs}")
self.create_spatial_index(crs)
self.update_spatial_index(crs)
# Clone Metadata Types
report_to_user("Cloning Metadata Types:")
results["metadata_types"] = self.metadata_types.bulk_add(origin_index.metadata_types.get_all_docs(),
Expand Down Expand Up @@ -2020,55 +2109,12 @@ def transaction(self) -> AbstractTransaction:
:return: a Transaction context manager for this index.
"""

@abstractmethod
def create_spatial_index(self, crs: CRS) -> bool:
"""
Create a spatial index using the nominated CRS.

:param crs: The CRS to use in the spatial index.
:return: True is the index was successfully created or already exists.
None if spatial indexes are not supported.
"""

def thread_transaction(self) -> Optional["AbstractTransaction"]:
"""
:return: The existing Transaction object cached in thread-local storage for this index, if there is one.
"""
return thread_local_cache(f"txn-{self.index_id}", None)

def spatial_indexes(self, refresh=False) -> Iterable[CRS]:
"""
Return a list of CRSs for which spatiotemporal indexes exist in the database.

:param refresh: If true, re-read from database record (e.g. to catch spatial
indexes recently created in another datacube session.
:return:
"""
_LOG.warning("Spatial index API is unstable and may change between releases.")
return []

def update_spatial_index(self,
crses: Sequence[CRS] = [],
product_names: Sequence[str] = [],
dataset_ids: Sequence[DSID] = []
) -> int:
"""
Update a spatial index
:param crs: CRSs for Spatial Indexes to update. Default=all indexes
:param product_names: Product names to update
:param dsids: Dataset IDs to update

If neither product_names nor dataset ids are supplied, update for all datasets.

If both are supplied, both the named products and identified datasets are updated.

If spatial indexes are not supported by the index driver, always return zero.

:return: Number of spatial index entries updated or verified as unindexed.
"""
_LOG.warning("Spatial index API is unstable and may change between releases.")
return 0

def __enter__(self):
return self

Expand Down
5 changes: 5 additions & 0 deletions datacube/index/postgis/index.py
Expand Up @@ -58,6 +58,8 @@ class Index(AbstractIndex):
supports_external_home = True
# Postgis driver supports ACID database transactions
supports_transactions = True
# Postgis supports per-CRS spatial indexes
supports_spatial_indexes = True

def __init__(self, db: PostGisDb) -> None:
# POSTGIS driver is not stable with respect to database schema or internal APIs.
Expand Down Expand Up @@ -155,6 +157,9 @@ def update_spatial_index(self,
def __repr__(self):
return "Index<db={!r}>".format(self._db)

def drop_spatial_index(self, crs: CRS) -> bool:
return self._db.drop_spatial_index(crs)

@contextmanager
def _active_connection(self, transaction: bool = False) -> PostgisDbAPI:
"""
Expand Down
4 changes: 0 additions & 4 deletions datacube/index/postgres/index.py
Expand Up @@ -15,7 +15,6 @@
from datacube.index.abstract import AbstractIndex, AbstractIndexDriver, AbstractTransaction, \
default_metadata_type_docs
from datacube.model import MetadataType
from odc.geo import CRS

_LOG = logging.getLogger(__name__)

Expand Down Expand Up @@ -117,9 +116,6 @@ def index_id(self) -> str:
def transaction(self) -> AbstractTransaction:
return PostgresTransaction(self._db, self.index_id)

def create_spatial_index(self, crs: CRS) -> None:
_LOG.warning("postgres driver does not support spatio-temporal indexes")

def __repr__(self):
return "Index<db={!r}>".format(self._db)

Expand Down
1 change: 1 addition & 0 deletions datacube/scripts/cli_app.py
Expand Up @@ -13,6 +13,7 @@
import datacube.scripts.dataset # noqa: F401
import datacube.scripts.product # noqa: F401
import datacube.scripts.metadata # noqa: F401
import datacube.scripts.spindex # noqa: F401
import datacube.scripts.system # noqa: F401
import datacube.scripts.user # noqa: F401

Expand Down