Skip to content

Commit

Permalink
GEOMESA-3307 Partitioned PostGIS CLI - Support update schema (#3006)
Browse files Browse the repository at this point in the history
  • Loading branch information
elahrvivaz committed Nov 6, 2023
1 parent cb3cc9e commit 789a78e
Show file tree
Hide file tree
Showing 15 changed files with 618 additions and 99 deletions.
21 changes: 20 additions & 1 deletion docs/user/postgis/commandline.rst
Expand Up @@ -4,7 +4,6 @@ Partitioned PostGIS Command-Line Tools
The partitioned PostGIS data store is bundled with the ``geomesa-gt`` command-line tools. See :ref:`gt_tools` for
additional details.


Commands
--------

Expand All @@ -21,3 +20,23 @@ Argument Description
======================== =========================================================
``-f, --feature-name *`` The name of the schema
======================== =========================================================

.. _postgis_cli_update_schema:

``update-schema``
-----------------

Alter an existing ``SimpleFeatureType``. For PostGIS, this command can only be used to modify configuration
values. See :ref:`postgis_index_config` for available configuration values.

This command will also re-write the partition procedures as necessary to apply the configuration changes.

======================== ==============================================================
Argument Description
======================== ==============================================================
``-f, --feature-name *`` The name of the schema to operate on
``--add-user-data`` Add or update an entry in the feature type user data
======================== ==============================================================

The ``--add-user-data`` parameter can be used to add or update any user data key. See :ref:`postgis_index_config` for
some examples of configurable values. Entries can be specified as ``<key>:<value>``.
35 changes: 33 additions & 2 deletions docs/user/postgis/index_config.rst
@@ -1,22 +1,34 @@
.. _postgis_index_config:

Partitioned PostGIS Index Configuration
=======================================

GeoMesa exposes a variety of configuration options that can be used to customize and optimize a given installation.
See :ref:`set_sft_options` for details on setting configuration parameters. Note that most of the general options
for GeoMesa stores are not supported by the partitioned PostGIS store, except as specified below.

.. note::

Most configurations can be updated after a schema has been created. See below for details
specific to each configuration.

Configuring the Default Date Attribute
--------------------------------------

The default date attribute is the attribute that will be used for sorting data into partitions. See
:ref:`set_date_attribute` for details on how to specify it.

The default date cannot be changed after the schema has been created.

Configuring Indices
-------------------

Attributes in the feature type may be marked for indexing, which will create a B-tree index on the associated
table column. See :ref:`attribute_indices` for details on how to specify indices.

After the schema has been created, additional indices can be added through ``CREATE INDEX`` statements on the
parent partition tables. See :ref:`pg_partition_table_design` for a description of the partition tables.

Configuring Partition Size
--------------------------

Expand All @@ -34,6 +46,11 @@ Partition size is configured with the key ``pg.partitions.interval.hours``.
SimpleFeatureType sft = ....;
sft.getUserData().put("pg.partitions.interval.hours", "12");
After the schema has been created, changes to the partition size can be made through the
:ref:`postgis_cli_update_schema` command. Changes will not be applied to any existing partitions. If the partition
size is **increased**, any recent partitions that would overlap with the new partition size will need to be
manually dropped and the data re-inserted in the write-ahead table in order to prevent partition range conflict errors.

Configuring Index Resolution
----------------------------

Expand All @@ -51,6 +68,8 @@ The number of pages is configured with the key ``pg.partitions.pages-per-range``
SimpleFeatureType sft = ....;
sft.getUserData().put("pg.partitions.pages-per-range", "64");
The index resolution cannot be changed after the schema has been created.

Configuring Data Age-Off
------------------------

Expand All @@ -68,6 +87,10 @@ Age-off is configured with the key ``pg.partitions.max``.
SimpleFeatureType sft = ....;
sft.getUserData().put("pg.partitions.max", "14");
After the schema has been created, changes to the age-off can be made through the
:ref:`postgis_cli_update_schema` command, or by directly updating the ``geomesa_userdata`` table in Postgres.
Changes will take effect within the next 10 minutes.

.. _postgis_filter_world:

Configuring Filter Optimizations
Expand All @@ -86,6 +109,10 @@ which will ignore whole world filters.
// enable filtering on "whole world" queries
sft.getUserData().put("pg.partitions.filter.world", "true");
After the schema has been created, changes to the filter optimization can be made through the
:ref:`postgis_cli_update_schema` command, or by directly updating the ``geomesa_userdata`` table in Postgres.
Clients must be restarted in order to pick up the change.

Configuring Tablespaces
-----------------------

Expand All @@ -104,8 +131,9 @@ and ``pg.partitions.tablespace.main``. See :ref:`pg_partition_table_design` for
SimpleFeatureType sft = ....;
sft.getUserData().put("pg.partitions.tablespace.wa", "fasttablespace");
Once the schema has been created, the tablespaces are stored in the ``partition_tablespaces`` table. This table
can be modified manually to change the location used for new partitions.
After the schema has been created, changes to the configured tablespaces can be made through the
:ref:`postgis_cli_update_schema` command, or by directly updating the ``partition_tablespaces`` table in Postgres.
Changes will not be applied to any existing partitions.

Configuring the Maintenance Schedule
------------------------------------
Expand All @@ -125,3 +153,6 @@ for each query, moving data out of it faster may improve performance.
SimpleFeatureType sft = ....;
sft.getUserData().put("pg.partitions.cron.minute", "0");
After the schema has been created, changes to the schedule can be made through the
:ref:`postgis_cli_update_schema` command.
Expand Up @@ -165,19 +165,43 @@ class PartitionedPostgisDialect(store: JDBCDataStore) extends PostGISDialect(sto
metadata: DatabaseMetaData,
schemaName: String,
cx: Connection): Unit = {

import PartitionedPostgisDialect.Config._

// normally views get set to read-only, override that here since we use triggers to delegate writes
sft.getUserData.remove(JDBCDataStore.JDBC_READ_ONLY)

// populate user data
val sql = s"select key, value from ${escape(schemaName)}.${UserDataTable.Name.quoted} where type_name = ?"
WithClose(cx.prepareStatement(sql)) { statement =>
val userDataSql = s"select key, value from ${escape(schemaName)}.${UserDataTable.Name.quoted} where type_name = ?"
WithClose(cx.prepareStatement(userDataSql)) { statement =>
statement.setString(1, sft.getTypeName)
WithClose(statement.executeQuery()) { rs =>
while (rs.next()) {
sft.getUserData.put(rs.getString(1), rs.getString(2))
}
}
}

// populate tablespaces
val tablespaceSql =
s"select table_space, table_type from " +
s"${escape(schemaName)}.${PartitionTablespacesTable.Name.quoted} where type_name = ?"
WithClose(cx.prepareStatement(tablespaceSql)) { statement =>
statement.setString(1, sft.getTypeName)
WithClose(statement.executeQuery()) { rs =>
while (rs.next()) {
val ts = rs.getString(1)
if (ts != null && ts.nonEmpty) {
rs.getString(2) match {
case WriteAheadTableSuffix.raw => sft.getUserData.put(WriteAheadTableSpace, ts)
case PartitionedWriteAheadTableSuffix.raw => sft.getUserData.put(WriteAheadPartitionsTableSpace, ts)
case PartitionedTableSuffix.raw => sft.getUserData.put(MainTableSpace, ts)
case s => logger.warn(s"Ignoring unexpected tablespace table: $s")
}
}
}
}
}
}

override def preDropTable(schemaName: String, sft: SimpleFeatureType, cx: Connection): Unit = {
Expand Down Expand Up @@ -359,14 +383,23 @@ object PartitionedPostgisDialect {

object Config extends Conversions {

val IntervalHours = "pg.partitions.interval.hours"
val PagesPerRange = "pg.partitions.pages-per-range"
val MaxPartitions = "pg.partitions.max"
val WriteAheadTableSpace = "pg.partitions.tablespace.wa"
// size of each partition - can be updated after schema is created, but requires
// running PartitionedPostgisDialect.upgrade in order to be applied
val IntervalHours = "pg.partitions.interval.hours"
// pages_per_range on the BRIN index - can't be updated after schema is created
val PagesPerRange = "pg.partitions.pages-per-range"
// max partitions to keep, i.e. age-off - can be updated freely after schema is created
val MaxPartitions = "pg.partitions.max"
// minute of each 10 minute block to execute the partition jobs - can be updated after schema is created,
// but requires running PartitionedPostgisDialect.upgrade in order to be applied
val CronMinute = "pg.partitions.cron.minute"
// remove 'whole world' filters - can be updated freely after schema is created
val FilterWholeWorld = "pg.partitions.filter.world"

// tablespace configurations - can be updated freely after the schema is created
val WriteAheadTableSpace = "pg.partitions.tablespace.wa"
val WriteAheadPartitionsTableSpace = "pg.partitions.tablespace.wa-partitions"
val MainTableSpace = "pg.partitions.tablespace.main"
val CronMinute = "pg.partitions.cron.minute"
val FilterWholeWorld = "pg.partitions.filter.world"
val MainTableSpace = "pg.partitions.tablespace.main"

implicit class ConfigConversions(val sft: SimpleFeatureType) extends AnyVal {
def getIntervalHours: Int = Option(sft.getUserData.get(IntervalHours)).map(int).getOrElse(6)
Expand Down
Expand Up @@ -620,9 +620,8 @@ package object dialect {
val create =
s"""DO $$$$
|BEGIN
| IF NOT EXISTS (SELECT FROM cron.job WHERE jobname = $jName) THEN
| PERFORM cron.schedule($jName, ${schedule(info).quoted}, ${invocation(info).quoted});
| END IF;
|${unscheduleSql(jName)}
| PERFORM cron.schedule($jName, ${schedule(info).quoted}, ${invocation(info).quoted});
|END$$$$;""".stripMargin
Seq(create)
}
Expand All @@ -632,12 +631,15 @@ package object dialect {
val drop =
s"""DO $$$$
|BEGIN
| IF EXISTS (SELECT FROM cron.job WHERE jobname = $jName) THEN
| PERFORM cron.unschedule($jName);
| END IF;
|${unscheduleSql(jName)}
|END$$$$;""".stripMargin
Seq(drop) ++ super.dropStatements(info)
}

protected def unscheduleSql(quotedName: String): String =
s""" IF EXISTS (SELECT FROM cron.job WHERE jobname = $quotedName) THEN
| PERFORM cron.unschedule($quotedName);
| END IF;""".stripMargin
}

/**
Expand Down
Expand Up @@ -21,17 +21,27 @@ class PartitionTablespacesTable extends Sql {
val Name: TableName = TableName("partition_tablespaces")

override def create(info: TypeInfo)(implicit ex: ExecutionContext): Unit = {
val table = s"${info.schema.quoted}.${Name.quoted}"
val table = TableIdentifier(info.schema.raw, Name.raw)
val cName = TableName(Name.raw + "_pkey")
val create =
s"""CREATE TABLE IF NOT EXISTS $table (
s"""CREATE TABLE IF NOT EXISTS ${table.quoted} (
| type_name text not null,
| table_type text not null,
| table_space text
|);""".stripMargin
ex.execute(create)
val constraint =
s"""DO $$$$
|BEGIN
| IF NOT EXISTS (SELECT FROM pg_constraint WHERE conname = ${cName.asLiteral} AND conrelid = ${table.asRegclass}) THEN
| ALTER TABLE ${table.quoted} ADD CONSTRAINT ${cName.quoted} PRIMARY KEY (type_name, table_type);
| END IF;
|END$$$$;""".stripMargin

Seq(create, constraint).foreach(ex.execute)

val insertSql =
s"INSERT INTO $table (type_name, table_type, table_space) VALUES (?, ?, ?) ON CONFLICT DO NOTHING;"
s"INSERT INTO ${table.quoted} (type_name, table_type, table_space) VALUES (?, ?, ?) " +
"ON CONFLICT (type_name, table_type) DO UPDATE SET table_space = EXCLUDED.table_space;"

def insert(suffix: String, table: TableConfig): Unit =
ex.executeUpdate(insertSql, Seq(info.typeName, suffix, table.tablespace.map(_.raw).orNull))
Expand Down
Expand Up @@ -54,9 +54,6 @@ class UserDataTable extends Sql {
insert(Config.IntervalHours, Some(Integer.toString(info.partitions.hoursPerPartition)))
insert(Config.PagesPerRange, Some(Integer.toString(info.partitions.pagesPerRange)))
insert(Config.MaxPartitions, info.partitions.maxPartitions.map(Integer.toString))
insert(Config.WriteAheadTableSpace, info.tables.writeAhead.tablespace.map(_.raw))
insert(Config.WriteAheadPartitionsTableSpace, info.tables.writeAheadPartitions.tablespace.map(_.raw))
insert(Config.MainTableSpace, info.tables.mainPartitions.tablespace.map(_.raw))
insert(Config.CronMinute, info.partitions.cronMinute.map(Integer.toString))
insert(Config.FilterWholeWorld, info.userData.get(Config.FilterWholeWorld))
}
Expand Down
4 changes: 4 additions & 0 deletions geomesa-gt/geomesa-gt-tools/pom.xml
Expand Up @@ -92,6 +92,10 @@
<groupId>org.specs2</groupId>
<artifactId>specs2-junit_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
</dependency>
</dependencies>

<build>
Expand Down

0 comments on commit 789a78e

Please sign in to comment.