Commit

Alembic-managed schema migrations for postgis driver. (cleaned up git history) (#1520)

* Add Alembic as dependency.

* Get Alembic integrated with config API and postgis schema.

* Get Alembic checking database version from datacube CLI.

* More alembic integration and test fixes.

* Get Alembic CLI working again.

* Initial version.

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Lintage.

* Doctest fixes.

* Add alembic to conda env.

* update config in docker test bootstrap.

* update config in docker test bootstrap.

* UTC TZ is showing up differently locally and in docker test harness?

* Bit more test coverage.

* Lintage

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
SpacemanPaul and pre-commit-ci[bot] committed Dec 19, 2023
1 parent 5be0e6e commit 9e31d17
Showing 19 changed files with 354 additions and 32 deletions.
26 changes: 26 additions & 0 deletions alembic.ini
@@ -0,0 +1,26 @@
# This file is part of the Open Data Cube, see https://opendatacube.org for more information
#
# Copyright (c) 2015-2023 ODC Contributors
# SPDX-License-Identifier: Apache-2.0
#
# Alembic config for postgis index driver
# May be expanded in future to handle alembic environments for other index drivers.

[alembic]
# path to migration scripts
script_location = %(here)s/datacube/drivers/postgis/alembic

# sys.path path, will be prepended to sys.path if present.
# defaults to the current working directory.
prepend_sys_path = .

# version_path_separator = :
# version_path_separator = space
version_path_separator = os # Use os.pathsep. Default configuration used for new projects.

[post_write_hooks]
# post_write_hooks defines scripts or Python functions that are run
# on newly generated revision scripts. See the documentation for further
# detail and examples

# logging configuration is not used as it breaks our CLI test framework.
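A usage sketch (not part of this commit): the same ini can be driven programmatically, mirroring how _core.py below loads it via ALEMBIC_INI_LOCATION. command.history() and command.current() are standard Alembic commands; the relative path and working directory are assumptions.

# Sketch: inspect migration state using the packaged alembic.ini.
# Assumes it runs from the source checkout root, where alembic.ini lives.
from alembic import command, config

alembic_cfg = config.Config("alembic.ini")
command.history(alembic_cfg)   # list the revisions in the migration script directory
command.current(alembic_cfg)   # print the revision the target database is stamped with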
1 change: 1 addition & 0 deletions conda-environment.yml
@@ -33,6 +33,7 @@ dependencies:
- rasterio >=1.3.2
- ruamel.yaml
- sqlalchemy >=2.0
- alembic
- GeoAlchemy2
- xarray >=0.9
- toolz
8 changes: 5 additions & 3 deletions datacube/drivers/postgis/_connections.py
@@ -172,7 +172,7 @@ def init(self, with_permissions=True):
:return: If it was newly created.
"""
is_new = _core.ensure_db(self._engine, with_permissions=with_permissions)
if not is_new:
if not is_new and not _core.schema_is_latest(self._engine):
_core.update_schema(self._engine)

return is_new
@@ -249,15 +249,17 @@ def _connect(self):
finally:
connection.close()

def give_me_a_connection(self):
def _give_me_a_connection(self):
# A Raw connection outside of the pool, caller is responsible for closing.
# (Used by transaction API)
return self._engine.connect()

@classmethod
def get_dataset_fields(cls, metadata_type_definition):
return _api.get_dataset_fields(metadata_type_definition)

def __repr__(self):
return "PostgresDb<engine={!r}>".format(self._engine)
return "PostgisDb<engine={!r}>".format(self._engine)


def handle_dynamic_token_authentication(engine: Engine,
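Taken together with ensure_db() below, the init() change means a fresh database is created and stamped at head, while an existing database is upgraded only when schema_is_latest() reports it is behind. A hedged sketch of the resulting flow, reusing the PostGisDb.create() call signature that appears in env.py further down (the environment name "default" is an assumption):

# Sketch only: exercise the new init() path against a configured ODC environment.
from datacube.cfg import ODCConfig
from datacube.drivers.postgis._connections import PostGisDb

env = ODCConfig.get_environment(env="default")  # hypothetical environment name
db = PostGisDb.create(env, application_name="schema-check", validate=False)
is_new = db.init()  # new DB: create schema + stamp head; existing DB: upgrade if behind
print("created fresh schema" if is_new else "existing schema verified/updated")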
57 changes: 48 additions & 9 deletions datacube/drivers/postgis/_core.py
@@ -7,11 +7,16 @@
"""

import logging
import os

from sqlalchemy import MetaData, inspect, text
from sqlalchemy.engine import Engine
from sqlalchemy.schema import CreateSchema
from sqlalchemy.sql.ddl import DropSchema
from alembic import command, config
from alembic.migration import MigrationContext
from alembic.script import ScriptDirectory
from alembic.runtime.environment import EnvironmentContext

from datacube.drivers.postgis.sql import (INSTALL_TRIGGER_SQL_TEMPLATE,
SCHEMA_NAME, TYPES_INIT_SQL,
@@ -32,6 +37,18 @@
# tix: test-index, created by hand for testing, particularly in dev.
}

SRC_CODE_ROOT = os.path.dirname( # Source code root
os.path.dirname( # datacube
os.path.dirname( # drivers
os.path.dirname( # postgis
__file__ # This file
)
)
)
)

ALEMBIC_INI_LOCATION = os.path.join(SRC_CODE_ROOT, "alembic.ini")

METADATA = MetaData(naming_convention=SQL_NAMING_CONVENTIONS, schema=SCHEMA_NAME)

_LOG = logging.getLogger(__name__)
@@ -113,6 +130,10 @@ def ensure_db(engine, with_permissions=True):
if with_permissions:
c.execute(text(f'set role {quoted_user}'))
c.commit()
# Stamp with latest Alembic revision
alembic_cfg = config.Config(ALEMBIC_INI_LOCATION)
alembic_cfg.attributes["connection"] = c
command.stamp(alembic_cfg, "head")

if with_permissions:
_LOG.info('Adding role grants.')
@@ -156,16 +177,30 @@ def schema_is_latest(engine: Engine) -> bool:
See the ``update_schema()`` function below for actually applying the updates.
"""
# In lieu of a versioned schema, we typically check by seeing if one of the objects
# from the change exists.
#
# Eg.
# return pg_column_exists(engine, schema_qualified('dataset_location'), 'archived')
#
# ie. Does the 'archived' column exist? If so, we know the related schema was applied.

# No schema changes recently. Everything is perfect.
return True

cfg = config.Config(ALEMBIC_INI_LOCATION)
scriptdir = ScriptDirectory.from_config(cfg)
# NB: this assumes a single, unbranched migration history
# Get Head revision from Alembic environment
with EnvironmentContext(cfg, scriptdir) as env_ctx:
latest_rev = env_ctx.get_head_revision()
# Get current revision from database
with engine.connect() as conn:
context = MigrationContext.configure(
connection=conn,
environment_context=env_ctx,
opts={"version_table_schema": "odc"}
)
current_rev = context.get_current_revision()

# Do they match?
if latest_rev == current_rev:
return True
import warnings
warnings.warn(f"Current Alembic schema revision is {current_rev} expected {latest_rev}")
return False


def update_schema(engine: Engine):
@@ -177,7 +212,11 @@ def update_schema(engine: Engine):
See the `schema_is_latest()` function above: this should apply updates
that it requires.
"""
# TODO: implement migrations
cfg = config.Config(ALEMBIC_INI_LOCATION)
with engine.begin() as conn:
cfg.attributes["connection"] = conn
print("Running upgrade")
command.upgrade(cfg, "head")


def _ensure_extension(conn, extension_name="POSTGIS"):
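With schema_is_latest() and update_schema() now delegating to Alembic, future schema changes become revision scripts. A sketch of the authoring workflow, assuming a development checkout with a reachable database; command.revision() is the programmatic equivalent of `alembic revision --autogenerate`, and the message text is hypothetical.

from alembic import command, config
from datacube.drivers.postgis._core import ALEMBIC_INI_LOCATION

cfg = config.Config(ALEMBIC_INI_LOCATION)
# Autogenerate a new revision script by diffing the database against
# target_metadata (MetadataObj) as configured in env.py.
command.revision(cfg, message="add example column", autogenerate=True)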
3 changes: 3 additions & 0 deletions datacube/drivers/postgis/_schema.py
@@ -335,3 +335,6 @@ class DatasetSearchDateTime:
DatasetSearchString.__table__, DatasetSearchNumeric.__table__,
DatasetSearchDateTime.__table__,
]


MetadataObj = MetadataType.__table__.metadata
12 changes: 12 additions & 0 deletions datacube/drivers/postgis/_spatial.py
@@ -90,6 +90,18 @@ def _mint_new_spindex(self, epsg: int):
return orm_registry.mapped(type(f'SpatialIdx{epsg}', (SpatialIndex,), attributes))


def is_spindex_table_name(name: str):
bits = name.split("_")
if len(bits) == 2:
if bits[0] == "spatial":
try:
srid = int(bits[1])
return srid > 0
except ValueError:
pass
return False


def spindex_for_epsg(epsg: int) -> Type[SpatialIndex]:
"""Return ORM class of a SpatialIndex for EPSG/SRID - dynamically creating if necessary"""
sir = SpatialIndexORMRegistry()
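is_spindex_table_name() exists so the Alembic environment can skip the dynamically created per-SRID spatial index tables (see include_name() in env.py below). Its expected behaviour, as a small illustrative sketch:

from datacube.drivers.postgis._spatial import is_spindex_table_name

assert is_spindex_table_name("spatial_4326")         # "spatial_" + positive integer SRID
assert not is_spindex_table_name("spatial_ref_sys")  # three parts, not a spindex table
assert not is_spindex_table_name("spatial_0")        # SRID must be positive
assert not is_spindex_table_name("dataset")          # no underscore-separated SRID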
1 change: 1 addition & 0 deletions datacube/drivers/postgis/alembic/README
@@ -0,0 +1 @@
This is the Alembic environment for the ODC postgis index driver.
158 changes: 158 additions & 0 deletions datacube/drivers/postgis/alembic/env.py
@@ -0,0 +1,158 @@
# This file is part of the Open Data Cube, see https://opendatacube.org for more information
#
# Copyright (c) 2015-2023 ODC Contributors
# SPDX-License-Identifier: Apache-2.0
from alembic import context

from datacube.cfg import ODCConfig
from datacube.drivers.postgis._connections import PostGisDb
from datacube.drivers.postgis._schema import MetadataObj
from datacube.drivers.postgis._spatial import is_spindex_table_name
from datacube.drivers.postgis.sql import SCHEMA_NAME

# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
try:
config = context.config
except AttributeError:
# Occurs when being scanned for doctests.
config = None

# Interpret the config file for Python logging.
# This line sets up loggers basically.
#
# Doesn't play nice with rest of ODC's management of logging.
#
# if config.config_file_name is not None:
# fileConfig(config.config_file_name)

# add your model's MetaData object here
# for 'autogenerate' support
# from myapp import mymodel
# target_metadata = mymodel.Base.metadata
target_metadata = MetadataObj

# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.


def run_migrations_offline() -> None:
"""Run migrations in 'offline' mode.
This configures the context with just a URL
and not an Engine, though an Engine is acceptable
here as well. By skipping the Engine creation
we don't even need a DBAPI to be available.
Calls to context.execute() here emit the given string to the
script output.
"""
context.configure(
dialect_name="postgresql",
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
version_table_schema="odc",
)

with context.begin_transaction():
context.run_migrations()


def include_name(name, type_, parent_names):
if type_ == "table":
# Ignore postgis system table
if name == "spatial_ref_sys" and parent_names["schema_name"] is None:
return False

# Ignore dynamically generated spatial index tables
if is_spindex_table_name(name):
return False

# Include other tables
return True
elif type_ == "schema":
if name is None or name == SCHEMA_NAME:
# Monitor default and odc schema
return True
else:
# Ignore any other schemas
return False
elif type_ == "column":
if name == "updated" and parent_names["schema_name"] == SCHEMA_NAME:
# Ignore updated columns with triggers - handled manually
return False
# Include other columns
return True
else:
# Include any constraints, indexes, etc, that made it this far.
return True


def get_odc_env():
# In active Alembic Config?
cfg = config.attributes.get('cfg')
env = config.attributes.get('env')
raw_config = config.attributes.get('raw_config')
if not (cfg or env or raw_config):
# No? How about from alembic CLI -X args?
x_args = context.get_x_argument(as_dictionary=True)
cfg = x_args.get('cfg')
if cfg:
cfg = cfg.split(':')
env = x_args.get('env')
raw_config = x_args.get('raw_config')
return ODCConfig.get_environment(env=env, config=cfg, raw_config=raw_config)


def run_migrations_online() -> None:
"""Run migrations in 'online' mode.
In this scenario we need to create an Engine
and associate a connection with the context.
"""
# An active postgis Connection:
connection = config.attributes.get("connection")
if connection:
run_migration_with_connection(connection)
return

# ODC index:
index = config.attributes.get("index")
if index:
connectable = index._db._engine
else:
db = PostGisDb.create(
get_odc_env(),
application_name="migration",
validate=False
)
connectable = db._engine

with connectable.connect() as connection:
run_migration_with_connection(connection)


def run_migration_with_connection(connection):
context.configure(
connection=connection,
target_metadata=target_metadata,
version_table_schema=SCHEMA_NAME,
include_schemas=True,
include_name=include_name,
)
with context.begin_transaction():
context.run_migrations()


if config is None:
# See comment above
pass
elif context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()
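Because get_odc_env() checks config.attributes before falling back to -x arguments, migrations can be pointed at a specific ODC environment from code or from the Alembic CLI. A sketch of both routes (not part of this commit); the environment name "dev" is an assumption:

from alembic import command, config
from datacube.drivers.postgis._core import ALEMBIC_INI_LOCATION

cfg = config.Config(ALEMBIC_INI_LOCATION)
cfg.attributes["env"] = "dev"  # read by get_odc_env() in env.py
command.upgrade(cfg, "head")

# CLI equivalent, via context.get_x_argument() in env.py:
#   alembic -c alembic.ini -x env=dev upgrade head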
30 changes: 30 additions & 0 deletions datacube/drivers/postgis/alembic/script.py.mako
@@ -0,0 +1,30 @@
# This file is part of the Open Data Cube, see https://opendatacube.org for more information
#
# Copyright (c) 2015-2023 ODC Contributors
# SPDX-License-Identifier: Apache-2.0
"""${message}

Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}

"""
from typing import Sequence, Union

from alembic import op
import sqlalchemy as sa
${imports if imports else ""}

# revision identifiers, used by Alembic.
revision: str = ${repr(up_revision)}
down_revision: Union[str, None] = ${repr(down_revision)}
branch_labels: Union[str, Sequence[str], None] = ${repr(branch_labels)}
depends_on: Union[str, Sequence[str], None] = ${repr(depends_on)}


def upgrade() -> None:
${upgrades if upgrades else "pass"}


def downgrade() -> None:
${downgrades if downgrades else "pass"}
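For reference, a revision script rendered from this template would look roughly like the following; the revision identifier, date, and message are hypothetical:

# This file is part of the Open Data Cube, see https://opendatacube.org for more information
#
# Copyright (c) 2015-2023 ODC Contributors
# SPDX-License-Identifier: Apache-2.0
"""add example column

Revision ID: 1a2b3c4d5e6f
Revises:
Create Date: 2023-12-19 00:00:00.000000

"""
from typing import Sequence, Union

from alembic import op
import sqlalchemy as sa

# revision identifiers, used by Alembic.
revision: str = '1a2b3c4d5e6f'
down_revision: Union[str, None] = None
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
    pass


def downgrade() -> None:
    pass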
