Improve Realtime Continuous Aggregate performance #5261

Merged

2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -8,6 +8,7 @@ accidentally triggering the load of a previous DB version.**

**Features**
* #5212 Allow pushdown of reference table joins
* #5221 Improve Realtime Continuous Aggregate performance
* #5312 Add timeout support to the ping_data_node()
* #5361 Add parallel support for partialize_agg()
* #5252 Improve unique constraint support on compressed hypertables
@@ -26,6 +27,7 @@ accidentally triggering the load of a previous DB version.**
**Thanks**
* @nikolaps for reporting an issue with the COPY fetcher
* @S-imo-n for reporting the issue on Background Worker Scheduler crash
* @kovetskiy and @DZDomi for reporting a performance regression in Realtime Continuous Aggregates

## 2.10.1 (2023-03-07)

12 changes: 12 additions & 0 deletions sql/pre_install/tables.sql
@@ -404,6 +404,18 @@ CREATE TABLE _timescaledb_catalog.continuous_aggs_invalidation_threshold (

SELECT pg_catalog.pg_extension_config_dump('_timescaledb_catalog.continuous_aggs_invalidation_threshold', '');

CREATE TABLE _timescaledb_catalog.continuous_aggs_watermark (
  mat_hypertable_id integer NOT NULL,
  watermark bigint NOT NULL,
  -- table constraints
  CONSTRAINT continuous_aggs_watermark_pkey PRIMARY KEY (mat_hypertable_id),
  CONSTRAINT continuous_aggs_watermark_mat_hypertable_id_fkey FOREIGN KEY (mat_hypertable_id) REFERENCES _timescaledb_catalog.continuous_agg (mat_hypertable_id) ON DELETE CASCADE
);

SELECT pg_catalog.pg_extension_config_dump('_timescaledb_catalog.continuous_aggs_watermark', '');



-- this does not have an FK on the materialization table since INSERTs to this
-- table are performance critical
CREATE TABLE _timescaledb_catalog.continuous_aggs_hypertable_invalidation_log (
15 changes: 15 additions & 0 deletions sql/updates/latest-dev.sql
@@ -2,3 +2,18 @@ DROP FUNCTION _timescaledb_internal.ping_data_node(NAME);

CREATE FUNCTION _timescaledb_internal.ping_data_node(node_name NAME, timeout INTERVAL = NULL) RETURNS BOOLEAN
AS '@MODULE_PATHNAME@', 'ts_data_node_ping' LANGUAGE C VOLATILE;

CREATE TABLE _timescaledb_catalog.continuous_aggs_watermark (
  mat_hypertable_id integer NOT NULL,
  watermark bigint NOT NULL,
  -- table constraints
  CONSTRAINT continuous_aggs_watermark_pkey PRIMARY KEY (mat_hypertable_id),
  CONSTRAINT continuous_aggs_watermark_mat_hypertable_id_fkey FOREIGN KEY (mat_hypertable_id) REFERENCES _timescaledb_catalog.continuous_agg (mat_hypertable_id) ON DELETE CASCADE
);

GRANT SELECT ON _timescaledb_catalog.continuous_aggs_watermark TO PUBLIC;

SELECT pg_catalog.pg_extension_config_dump('_timescaledb_catalog.continuous_aggs_watermark', '');

CREATE FUNCTION _timescaledb_internal.cagg_watermark_materialized(hypertable_id INTEGER)
RETURNS INT8 AS '@MODULE_PATHNAME@', 'ts_continuous_agg_watermark_materialized' LANGUAGE C STABLE STRICT PARALLEL SAFE;
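
As a side note (not part of the patch), the cached watermark created above can be cross-checked against the value recomputed from the materialized data, using only objects defined in this file. A minimal sketch, assuming at least one continuous aggregate already exists:

-- Illustrative query only: compare the cached watermark with the value
-- recomputed by the new helper function.
SELECT w.mat_hypertable_id,
       w.watermark AS cached_watermark,
       _timescaledb_internal.cagg_watermark_materialized(w.mat_hypertable_id) AS recomputed_watermark
FROM _timescaledb_catalog.continuous_aggs_watermark w
ORDER BY w.mat_hypertable_id;
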
20 changes: 19 additions & 1 deletion sql/updates/post-update.sql
@@ -124,7 +124,6 @@ BEGIN
END IF;
END $$;


-- Create dimension partition information for existing space-partitioned hypertables
CREATE FUNCTION _timescaledb_internal.update_dimension_partition(hypertable REGCLASS) RETURNS VOID AS '@MODULE_PATHNAME@', 'ts_dimension_partition_update' LANGUAGE C VOLATILE;
SELECT _timescaledb_internal.update_dimension_partition(format('%I.%I', h.schema_name, h.table_name))
@@ -148,3 +147,22 @@ BEGIN
RAISE WARNING 'Continuous Aggregate: % with old format will not be supported with PG15. You should upgrade to the new format', cagg_name;
END LOOP;
END $$;

-- Create watermark record when required
DO
$$
DECLARE
  ts_version TEXT;
BEGIN
  SELECT extversion INTO ts_version FROM pg_extension WHERE extname = 'timescaledb';
  IF ts_version >= '2.11.0' THEN
    INSERT INTO _timescaledb_catalog.continuous_aggs_watermark (mat_hypertable_id, watermark)
    SELECT a.mat_hypertable_id, _timescaledb_internal.cagg_watermark_materialized(a.mat_hypertable_id)
    FROM _timescaledb_catalog.continuous_agg a
    LEFT JOIN _timescaledb_catalog.continuous_aggs_watermark b ON b.mat_hypertable_id = a.mat_hypertable_id
    WHERE b.mat_hypertable_id IS NULL
    ORDER BY 1;
  END IF;
END;
$$;

6 changes: 6 additions & 0 deletions sql/updates/reverse-dev.sql
@@ -4,3 +4,9 @@ CREATE OR REPLACE FUNCTION _timescaledb_internal.ping_data_node(node_name NAME)
AS '@MODULE_PATHNAME@', 'ts_data_node_ping' LANGUAGE C VOLATILE;

DROP FUNCTION IF EXISTS _timescaledb_internal.get_approx_row_count(REGCLASS);

ALTER EXTENSION timescaledb DROP TABLE _timescaledb_catalog.continuous_aggs_watermark;

DROP TABLE IF EXISTS _timescaledb_catalog.continuous_aggs_watermark;

DROP FUNCTION IF EXISTS _timescaledb_internal.cagg_watermark_materialized(hypertable_id INTEGER);
3 changes: 3 additions & 0 deletions sql/util_time.sql
@@ -72,5 +72,8 @@ RETURNS BIGINT AS '@MODULE_PATHNAME@', 'ts_time_to_internal' LANGUAGE C VOLATILE
CREATE OR REPLACE FUNCTION _timescaledb_internal.cagg_watermark(hypertable_id INTEGER)
RETURNS INT8 AS '@MODULE_PATHNAME@', 'ts_continuous_agg_watermark' LANGUAGE C STABLE STRICT PARALLEL SAFE;

CREATE OR REPLACE FUNCTION _timescaledb_internal.cagg_watermark_materialized(hypertable_id INTEGER)
RETURNS INT8 AS '@MODULE_PATHNAME@', 'ts_continuous_agg_watermark_materialized' LANGUAGE C STABLE STRICT PARALLEL SAFE;

CREATE OR REPLACE FUNCTION _timescaledb_internal.subtract_integer_from_now( hypertable_relid REGCLASS, lag INT8 )
RETURNS INT8 AS '@MODULE_PATHNAME@', 'ts_subtract_integer_from_now' LANGUAGE C STABLE STRICT;
15 changes: 14 additions & 1 deletion src/chunk.c
@@ -65,6 +65,7 @@
#include "ts_catalog/chunk_data_node.h"
#include "ts_catalog/compression_chunk_size.h"
#include "ts_catalog/continuous_agg.h"
#include "ts_catalog/continuous_aggs_watermark.h"
#include "ts_catalog/hypertable_data_node.h"
#include "utils.h"

@@ -3893,7 +3894,7 @@ ts_chunk_do_drop_chunks(Hypertable *ht, int64 older_than, int64 newer_than, int3
	Chunk *chunks;
	const char *schema_name, *table_name;
	const int32 hypertable_id = ht->fd.id;
	bool has_continuous_aggs;
	bool has_continuous_aggs, is_materialization_hypertable;
	const MemoryContext oldcontext = CurrentMemoryContext;
	ScanTupLock tuplock = {
		.waitpolicy = LockWaitBlock,
@@ -3913,13 +3914,17 @@ ts_chunk_do_drop_chunks(Hypertable *ht, int64 older_than, int64 newer_than, int3
	 * well. Do not unlock - let the transaction semantics take care of it. */
	lock_referenced_tables(ht->main_table_relid);

	is_materialization_hypertable = false;

	switch (ts_continuous_agg_hypertable_status(hypertable_id))
	{
		case HypertableIsMaterialization:
			has_continuous_aggs = false;
			is_materialization_hypertable = true;
			break;
		case HypertableIsMaterializationAndRaw:
			has_continuous_aggs = true;
			is_materialization_hypertable = true;
			break;
		case HypertableIsRawTable:
			has_continuous_aggs = true;
@@ -4029,6 +4034,14 @@ ts_chunk_do_drop_chunks(Hypertable *ht, int64 older_than, int64 newer_than, int3
}
}

	/* When dropping chunks for a given CAgg, force an update of the watermark */
	if (is_materialization_hypertable)
	{
		bool isnull;
		int64 watermark = ts_hypertable_get_open_dim_max_value(ht, 0, &isnull);
		ts_cagg_watermark_update(ht, watermark, isnull, true);
	}

	if (affected_data_nodes)
		*affected_data_nodes = data_nodes;

9 changes: 6 additions & 3 deletions src/hypertable.c
@@ -2902,20 +2902,23 @@ ts_hypertable_func_call_on_data_nodes(const Hypertable *ht, FunctionCallInfo fci
/*
 * Get the max value of an open dimension, converted to the internal time
 * representation. When the hypertable has no data, *isnull is set and the
 * minimum value of the dimension's time type is returned.
 */
Datum
int64
ts_hypertable_get_open_dim_max_value(const Hypertable *ht, int dimension_index, bool *isnull)
{
	StringInfo command;
	const Dimension *dim;
	int res;
	bool max_isnull;
	Datum maxdat;
	Oid timetype;

	dim = hyperspace_get_open_dimension(ht->space, dimension_index);

	if (NULL == dim)
		elog(ERROR, "invalid open dimension index %d", dimension_index);

	timetype = ts_dimension_get_partition_type(dim);

	/*
	 * Query for the last bucket in the materialized hypertable.
	 * Since this might be run as part of a parallel operation
@@ -2941,7 +2944,7 @@ ts_hypertable_get_open_dim_max_value(const Hypertable *ht, int dimension_index,
(errmsg("could not find the maximum time value for hypertable \"%s\"",
get_rel_name(ht->main_table_relid)))));

	Ensure(SPI_gettypeid(SPI_tuptable->tupdesc, 1) == ts_dimension_get_partition_type(dim),
	Ensure(SPI_gettypeid(SPI_tuptable->tupdesc, 1) == timetype,
		   "partition types for result (%d) and dimension (%d) do not match",
		   SPI_gettypeid(SPI_tuptable->tupdesc, 1),
		   ts_dimension_get_partition_type(dim));
@@ -2953,7 +2956,7 @@ ts_hypertable_get_open_dim_max_value(const Hypertable *ht, int dimension_index,
	if ((res = SPI_finish()) != SPI_OK_FINISH)
		elog(ERROR, "SPI_finish failed: %s", SPI_result_code_string(res));

	return maxdat;
	return max_isnull ? ts_time_get_min(timetype) : ts_time_value_to_internal(maxdat, timetype);
}

bool
2 changes: 1 addition & 1 deletion src/hypertable.h
@@ -165,7 +165,7 @@ extern TSDLLEXPORT List *ts_hypertable_get_available_data_node_server_oids(const
extern TSDLLEXPORT HypertableType ts_hypertable_get_type(const Hypertable *ht);
extern TSDLLEXPORT void ts_hypertable_func_call_on_data_nodes(const Hypertable *ht,
FunctionCallInfo fcinfo);
extern TSDLLEXPORT Datum ts_hypertable_get_open_dim_max_value(const Hypertable *ht,
extern TSDLLEXPORT int64 ts_hypertable_get_open_dim_max_value(const Hypertable *ht,
int dimension_index, bool *isnull);

extern TSDLLEXPORT bool ts_hypertable_has_compression_table(const Hypertable *ht);
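
For context, a hedged caller sketch (not taken from the diff) of the revised interface: the function now returns the maximum open-dimension value already converted to the internal time representation, falling back to the time type's minimum value when the hypertable is empty, with *isnull still reporting that case. This mirrors the usage added in src/chunk.c; 'ht' is assumed to be a valid Hypertable pointer obtained elsewhere.

/* Illustrative only, not part of the patch. */
bool isnull;
int64 max_time = ts_hypertable_get_open_dim_max_value(ht, 0, &isnull);

if (isnull)
	/* Empty hypertable: the returned value is the dimension type's minimum. */
	elog(DEBUG1, "no data found, using minimum time value " INT64_FORMAT, max_time);
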
1 change: 1 addition & 0 deletions src/ts_catalog/CMakeLists.txt
@@ -3,6 +3,7 @@ set(SOURCES
  ${CMAKE_CURRENT_SOURCE_DIR}/chunk_data_node.c
  ${CMAKE_CURRENT_SOURCE_DIR}/compression_chunk_size.c
  ${CMAKE_CURRENT_SOURCE_DIR}/continuous_agg.c
  ${CMAKE_CURRENT_SOURCE_DIR}/continuous_aggs_watermark.c
  ${CMAKE_CURRENT_SOURCE_DIR}/dimension_partition.c
  ${CMAKE_CURRENT_SOURCE_DIR}/hypertable_compression.c
  ${CMAKE_CURRENT_SOURCE_DIR}/hypertable_data_node.c
10 changes: 10 additions & 0 deletions src/ts_catalog/catalog.c
@@ -120,6 +120,10 @@ static const TableInfoDef catalog_table_names[_MAX_CATALOG_TABLES + 1] = {
		.schema_name = INTERNAL_SCHEMA_NAME,
		.table_name = JOB_ERRORS_TABLE_NAME,
	},
	[CONTINUOUS_AGGS_WATERMARK] = {
		.schema_name = CATALOG_SCHEMA_NAME,
		.table_name = CONTINUOUS_AGGS_WATERMARK_TABLE_NAME,
	},
	[_MAX_CATALOG_TABLES] = {
		.schema_name = "invalid schema",
		.table_name = "invalid table",
@@ -252,6 +256,12 @@ static const TableIndexDef catalog_table_index_definitions[_MAX_CATALOG_TABLES]
			[CONTINUOUS_AGGS_MATERIALIZATION_INVALIDATION_LOG_IDX] = "continuous_aggs_materialization_invalidation_log_idx",
		},
	},
	[CONTINUOUS_AGGS_WATERMARK] = {
		.length = _MAX_CONTINUOUS_AGGS_WATERMARK_INDEX,
		.names = (char *[]) {
			[CONTINUOUS_AGGS_WATERMARK_PKEY] = "continuous_aggs_watermark_pkey",
		},
	},
	[HYPERTABLE_COMPRESSION] = {
		.length = _MAX_HYPERTABLE_COMPRESSION_INDEX,
		.names = (char *[]) {
34 changes: 34 additions & 0 deletions src/ts_catalog/catalog.h
@@ -57,6 +57,7 @@ typedef enum CatalogTable
	CHUNK_COPY_OPERATION,
	CONTINUOUS_AGGS_BUCKET_FUNCTION,
	JOB_ERRORS,
	CONTINUOUS_AGGS_WATERMARK,
	/* Don't forget updating catalog.c when adding new tables! */
	_MAX_CATALOG_TABLES,
} CatalogTable;
@@ -1172,6 +1173,39 @@ typedef enum Anum_continuous_aggs_materialization_invalidation_log_idx
#define Natts_continuous_aggs_materialization_invalidation_log_idx \
	(_Anum_continuous_aggs_materialization_invalidation_log_idx_max - 1)

/****** CONTINUOUS_AGGS_WATERMARK_TABLE definitions */
#define CONTINUOUS_AGGS_WATERMARK_TABLE_NAME "continuous_aggs_watermark"
typedef enum Anum_continuous_aggs_watermark
{
	Anum_continuous_aggs_watermark_mat_hypertable_id = 1,
	Anum_continuous_aggs_watermark_watermark,
	_Anum_continuous_aggs_watermark_max,
} Anum_continuous_aggs_watermark;

#define Natts_continuous_aggs_watermark (_Anum_continuous_aggs_watermark_max - 1)

typedef struct FormData_continuous_aggs_watermark
{
	int32 mat_hypertable_id;
	int64 watermark;
} FormData_continuous_aggs_watermark;

typedef FormData_continuous_aggs_watermark *Form_continuous_aggs_watermark;

enum
{
	CONTINUOUS_AGGS_WATERMARK_PKEY = 0,
	_MAX_CONTINUOUS_AGGS_WATERMARK_INDEX,
};

typedef enum Anum_continuous_aggs_watermark_pkey
{
	Anum_continuous_aggs_watermark_pkey_mat_hypertable_id = 1,
	_Anum_continuous_aggs_watermark_pkey_max,
} Anum_continuous_aggs_watermark_pkey;

#define Natts_continuous_aggs_watermark_pkey (_Anum_continuous_aggs_watermark_pkey_max - 1)

#define HYPERTABLE_COMPRESSION_TABLE_NAME "hypertable_compression"
typedef enum Anum_hypertable_compression
{
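
As an aside (not part of the diff), the Anum_*/Natts_* constants defined above are the usual way to build tuples for this catalog table. A hypothetical helper, for illustration only, using standard PostgreSQL APIs (heap_form_tuple, AttrNumberGetAttrOffset); the tuple descriptor is assumed to match _timescaledb_catalog.continuous_aggs_watermark:

#include "postgres.h"
#include "access/htup_details.h"

/* Hypothetical example, not part of the patch. */
static HeapTuple
continuous_aggs_watermark_form_tuple(TupleDesc tupdesc,
									 const FormData_continuous_aggs_watermark *fd)
{
	Datum values[Natts_continuous_aggs_watermark];
	bool nulls[Natts_continuous_aggs_watermark] = { false, false };

	/* Anum_* values are 1-based attribute numbers; convert to 0-based offsets. */
	values[AttrNumberGetAttrOffset(Anum_continuous_aggs_watermark_mat_hypertable_id)] =
		Int32GetDatum(fd->mat_hypertable_id);
	values[AttrNumberGetAttrOffset(Anum_continuous_aggs_watermark_watermark)] =
		Int64GetDatum(fd->watermark);

	return heap_form_tuple(tupdesc, values, nulls);
}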