From 763edc6cd41488b7d357f895439b5da7af7be6ab Mon Sep 17 00:00:00 2001 From: Zoltan Haindrich Date: Fri, 10 Feb 2023 13:22:09 +0000 Subject: [PATCH] Accept all compression options on caggs Enable to properly handle 'compress_segmentby' and 'compress_orderby' compression options on continuous aggregates. ALTER MATERIALIZED VIEW test_table_cagg SET ( timescaledb.compress = true, timescaledb.compress_segmentby = 'device_id' ); Fixes #5161 --- CHANGELOG.md | 1 + src/compression_with_clause.h | 1 + src/ts_catalog/continuous_agg.c | 60 +++++++++++++++++++- src/ts_catalog/continuous_agg.h | 7 +++ src/with_clause_parser.c | 18 ++++++ src/with_clause_parser.h | 3 + tsl/src/continuous_aggs/options.c | 45 +++++++++------ tsl/test/expected/cagg_errors.out | 5 +- tsl/test/expected/cagg_errors_deprecated.out | 5 +- tsl/test/expected/compression_ddl.out | 43 +++++++++++++- tsl/test/sql/compression_ddl.sql | 44 +++++++++++++- 11 files changed, 202 insertions(+), 30 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index af75afc76cc..4eef877cb7e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ accidentally triggering the load of a previous DB version.** * #5245 Mange life-cycle of connections via memory contexts * #5246 Make connection establishment interruptible * #5253 Make data node command execution interruptible +* #5262 Extend enabling compression on a continuous aggregate with 'compress_segmentby' and 'compress_orderby' parameters **Bugfixes** * #4926 Fix corruption when inserting into compressed chunks diff --git a/src/compression_with_clause.h b/src/compression_with_clause.h index 1863914512d..ddbf5d92ec4 100644 --- a/src/compression_with_clause.h +++ b/src/compression_with_clause.h @@ -19,6 +19,7 @@ typedef enum CompressHypertableOption CompressSegmentBy, CompressOrderBy, CompressChunkTimeInterval, + CompressOptionMax } CompressHypertableOption; typedef struct diff --git a/src/ts_catalog/continuous_agg.c b/src/ts_catalog/continuous_agg.c index 
452454e40bf..00796f23daa 100644 --- a/src/ts_catalog/continuous_agg.c +++ b/src/ts_catalog/continuous_agg.c @@ -39,6 +39,7 @@ #include "time_utils.h" #include "ts_catalog/catalog.h" #include "errors.h" +#include "compression_with_clause.h" #define BUCKET_FUNCTION_SERIALIZE_VERSION 1 #define CHECK_NAME_MATCH(name1, name2) (namestrcmp(name1, name2) == 0) @@ -59,7 +60,7 @@ static const WithClauseDefinition continuous_aggregate_with_clause_def[] = { .type_id = BOOLOID, .default_val = BoolGetDatum(false), }, - [ContinuousViewOptionCompress] = { + [ContinuousViewOptionCompress] = { .arg_name = "compress", .type_id = BOOLOID, }, @@ -68,6 +69,18 @@ static const WithClauseDefinition continuous_aggregate_with_clause_def[] = { .type_id = BOOLOID, .default_val = BoolGetDatum(true), }, + [ContinuousViewOptionCompressSegmentBy] = { + .arg_name = "compress_segmentby", + .type_id = TEXTOID, + }, + [ContinuousViewOptionCompressOrderBy] = { + .arg_name = "compress_orderby", + .type_id = TEXTOID, + }, + [ContinuousViewOptionCompressChunkTimeInterval] = { + .arg_name = "compress_chunk_time_interval", + .type_id = INTERVALOID, + }, }; WithClauseResult * @@ -77,6 +90,51 @@ ts_continuous_agg_with_clause_parse(const List *defelems) continuous_aggregate_with_clause_def, TS_ARRAY_LEN(continuous_aggregate_with_clause_def)); } + +List * +ts_continuous_agg_get_compression_defelems(const WithClauseResult *with_clauses) +{ + List *ret = NIL; + + for (int i = 0; i < CompressOptionMax; i++) + { + int option_index = 0; + switch (i) + { + case CompressEnabled: + option_index = ContinuousViewOptionCompress; + break; + case CompressSegmentBy: + option_index = ContinuousViewOptionCompressSegmentBy; + break; + case CompressOrderBy: + option_index = ContinuousViewOptionCompressOrderBy; + break; + case CompressChunkTimeInterval: + option_index = ContinuousViewOptionCompressChunkTimeInterval; + break; + default: + elog(ERROR, "Unhandled compression option"); + break; + } + + const WithClauseResult *input = 
&with_clauses[option_index]; + WithClauseDefinition def = continuous_aggregate_with_clause_def[option_index]; + + if (!input->is_default) + { + Node *value = (Node *) makeString(ts_with_clause_result_deparse_value(input)); + DefElem *elem = makeDefElemExtended("timescaledb", + (char *) def.arg_name, + value, + DEFELEM_UNSPEC, + -1); + ret = lappend(ret, elem); + } + } + return ret; +} + static void init_scan_by_mat_hypertable_id(ScanIterator *iterator, const int32 mat_hypertable_id) { diff --git a/src/ts_catalog/continuous_agg.h b/src/ts_catalog/continuous_agg.h index 9a0be3bb85e..9012fd6f32a 100644 --- a/src/ts_catalog/continuous_agg.h +++ b/src/ts_catalog/continuous_agg.h @@ -48,6 +48,10 @@ typedef enum ContinuousAggViewOption ContinuousViewOptionMaterializedOnly, ContinuousViewOptionCompress, ContinuousViewOptionFinalized, + ContinuousViewOptionCompressSegmentBy, + ContinuousViewOptionCompressOrderBy, + ContinuousViewOptionCompressChunkTimeInterval, + ContinuousViewOptionMax } ContinuousAggViewOption; typedef enum ContinuousAggViewType @@ -60,6 +64,9 @@ typedef enum ContinuousAggViewType extern TSDLLEXPORT WithClauseResult *ts_continuous_agg_with_clause_parse(const List *defelems); +extern TSDLLEXPORT List * +ts_continuous_agg_get_compression_defelems(const WithClauseResult *with_clauses); + #define BUCKET_WIDTH_VARIABLE (-1) /* diff --git a/src/with_clause_parser.c b/src/with_clause_parser.c index 12512756061..75e6f9b433e 100644 --- a/src/with_clause_parser.c +++ b/src/with_clause_parser.c @@ -15,6 +15,7 @@ #include #include +#include "debug_assert.h" #include "with_clause_parser.h" #define TIMESCALEDB_NAMESPACE "timescaledb" @@ -76,6 +77,7 @@ ts_with_clauses_parse(const List *def_elems, const WithClauseDefinition *args, S for (i = 0; i < nargs; i++) { + results[i].definition = &args[i]; results[i].parsed = args[i].default_val; results[i].is_default = true; } @@ -113,6 +115,22 @@ ts_with_clauses_parse(const List *def_elems, const WithClauseDefinition *args, S 
return results; } +extern TSDLLEXPORT char * +ts_with_clause_result_deparse_value(const WithClauseResult *result) +{ + Oid oid = result->definition->type_id; + Ensure(OidIsValid(oid), "argument \"%d\" has invalid OID", oid); + + Oid in_fn; + bool typIsVarlena pg_attribute_unused(); + + getTypeOutputInfo(oid, &in_fn, &typIsVarlena); + Ensure(OidIsValid(in_fn), "no output function for type with OID %d", oid); + + char *val = OidOutputFunctionCall(in_fn, result->parsed); + return val; +} + static Datum parse_arg(WithClauseDefinition arg, DefElem *def) { diff --git a/src/with_clause_parser.h b/src/with_clause_parser.h index f44233519d1..89931f14345 100644 --- a/src/with_clause_parser.h +++ b/src/with_clause_parser.h @@ -22,6 +22,7 @@ typedef struct WithClauseDefinition typedef struct WithClauseResult { + const WithClauseDefinition *definition; bool is_default; Datum parsed; } WithClauseResult; @@ -31,4 +32,6 @@ extern TSDLLEXPORT void ts_with_clause_filter(const List *def_elems, List **with extern TSDLLEXPORT WithClauseResult * ts_with_clauses_parse(const List *def_elems, const WithClauseDefinition *args, Size nargs); + +extern TSDLLEXPORT char *ts_with_clause_result_deparse_value(const WithClauseResult *result); #endif /* TIMESCALEDB_WITH_CLAUSE_PARSER_H */ diff --git a/tsl/src/continuous_aggs/options.c b/tsl/src/continuous_aggs/options.c index 4b84eff7eb9..098ec8bdefe 100644 --- a/tsl/src/continuous_aggs/options.c +++ b/tsl/src/continuous_aggs/options.c @@ -194,28 +194,37 @@ cagg_get_compression_params(ContinuousAgg *agg, Hypertable *mat_ht) return defelems; } -/* enable/disable compression on continuous aggregate */ +/* forwards compression related changes via an alter statement to the underlying HT */ static void -cagg_alter_compression(ContinuousAgg *agg, Hypertable *mat_ht, bool compress_enable) +cagg_alter_compression(ContinuousAgg *agg, Hypertable *mat_ht, List *compress_defelems) { - List *defelems = NIL; Assert(mat_ht != NULL); - if (compress_enable) - 
defelems = cagg_get_compression_params(agg, mat_ht); + WithClauseResult *with_clause_options = + ts_compress_hypertable_set_clause_parse(compress_defelems); - DefElem *enable = makeDefElemExtended("timescaledb", - "compress", - compress_enable ? (Node *) makeString("true") : - (Node *) makeString("false"), - DEFELEM_UNSPEC, - -1); - defelems = lappend(defelems, enable); + if (with_clause_options[CompressEnabled].parsed) + { + List *default_compress_defelems = cagg_get_compression_params(agg, mat_ht); + WithClauseResult *default_with_clause_options = + ts_compress_hypertable_set_clause_parse(default_compress_defelems); + /* Merge defaults if there's any. */ + for (int i = 0; i < CompressOptionMax; i++) + { + if (with_clause_options[i].is_default && !default_with_clause_options[i].is_default) + { + with_clause_options[i] = default_with_clause_options[i]; + elog(NOTICE, + "defaulting %s to %s", + with_clause_options[i].definition->arg_name, + ts_with_clause_result_deparse_value(&with_clause_options[i])); + } + } + } - WithClauseResult *with_clause_options = ts_compress_hypertable_set_clause_parse(defelems); AlterTableCmd alter_cmd = { .type = T_AlterTableCmd, .subtype = AT_SetRelOptions, - .def = (Node *) defelems, + .def = (Node *) compress_defelems, }; tsl_process_compress_table(&alter_cmd, mat_ht, with_clause_options); @@ -249,16 +258,16 @@ continuous_agg_update_options(ContinuousAgg *agg, WithClauseResult *with_clause_ update_materialized_only(agg, materialized_only); ts_cache_release(hcache); } - if (!with_clause_options[ContinuousViewOptionCompress].is_default) + List *compression_options = ts_continuous_agg_get_compression_defelems(with_clause_options); + + if (list_length(compression_options) > 0) { - bool compress_enable = - DatumGetBool(with_clause_options[ContinuousViewOptionCompress].parsed); Cache *hcache = ts_hypertable_cache_pin(); Hypertable *mat_ht = ts_hypertable_cache_get_entry_by_id(hcache, agg->data.mat_hypertable_id); Assert(mat_ht != NULL); - 
cagg_alter_compression(agg, mat_ht, compress_enable); + cagg_alter_compression(agg, mat_ht, compression_options); ts_cache_release(hcache); } if (!with_clause_options[ContinuousViewOptionCreateGroupIndex].is_default) diff --git a/tsl/test/expected/cagg_errors.out b/tsl/test/expected/cagg_errors.out index 47866f09882..57eb269b6a6 100644 --- a/tsl/test/expected/cagg_errors.out +++ b/tsl/test/expected/cagg_errors.out @@ -540,10 +540,9 @@ NOTICE: continuous aggregate "i2980_cagg2" is already up-to-date --now enable compression with invalid parameters ALTER MATERIALIZED VIEW i2980_cagg2 SET ( timescaledb.compress, timescaledb.compress_segmentby = 'bucket'); -ERROR: unrecognized parameter "timescaledb.compress_segmentby" +ERROR: cannot use column "bucket" for both ordering and segmenting ALTER MATERIALIZED VIEW i2980_cagg2 SET ( timescaledb.compress, timescaledb.compress_orderby = 'bucket'); -ERROR: unrecognized parameter "timescaledb.compress_orderby" --enable compression and test re-enabling compression ALTER MATERIALIZED VIEW i2980_cagg2 SET ( timescaledb.compress); insert into i2980 select now(); @@ -559,7 +558,7 @@ ERROR: cannot change configuration on already compressed chunks ALTER MATERIALIZED VIEW i2980_cagg2 SET ( timescaledb.compress = 'true'); ERROR: cannot change configuration on already compressed chunks ALTER MATERIALIZED VIEW i2980_cagg2 SET ( timescaledb.compress, timescaledb.compress_segmentby = 'bucket'); -ERROR: unrecognized parameter "timescaledb.compress_segmentby" +ERROR: cannot change configuration on already compressed chunks --Errors with compression policy on caggs-- select add_continuous_aggregate_policy('i2980_cagg2', interval '10 day', interval '2 day' ,'4h') AS job_id ; job_id diff --git a/tsl/test/expected/cagg_errors_deprecated.out b/tsl/test/expected/cagg_errors_deprecated.out index 73effa540f8..75f75888dc5 100644 --- a/tsl/test/expected/cagg_errors_deprecated.out +++ b/tsl/test/expected/cagg_errors_deprecated.out @@ -624,10 +624,9 @@ 
NOTICE: continuous aggregate "i2980_cagg2" is already up-to-date --now enable compression with invalid parameters ALTER MATERIALIZED VIEW i2980_cagg2 SET ( timescaledb.compress, timescaledb.compress_segmentby = 'bucket'); -ERROR: unrecognized parameter "timescaledb.compress_segmentby" +ERROR: cannot use column "bucket" for both ordering and segmenting ALTER MATERIALIZED VIEW i2980_cagg2 SET ( timescaledb.compress, timescaledb.compress_orderby = 'bucket'); -ERROR: unrecognized parameter "timescaledb.compress_orderby" --enable compression and test re-enabling compression ALTER MATERIALIZED VIEW i2980_cagg2 SET ( timescaledb.compress); insert into i2980 select now(); @@ -643,7 +642,7 @@ ERROR: cannot change configuration on already compressed chunks ALTER MATERIALIZED VIEW i2980_cagg2 SET ( timescaledb.compress = 'true'); ERROR: cannot change configuration on already compressed chunks ALTER MATERIALIZED VIEW i2980_cagg2 SET ( timescaledb.compress, timescaledb.compress_segmentby = 'bucket'); -ERROR: unrecognized parameter "timescaledb.compress_segmentby" +ERROR: cannot change configuration on already compressed chunks --Errors with compression policy on caggs-- select add_continuous_aggregate_policy('i2980_cagg2', interval '10 day', interval '2 day' ,'4h') AS job_id ; job_id diff --git a/tsl/test/expected/compression_ddl.out b/tsl/test/expected/compression_ddl.out index ff3957e4e8e..f7a9ffcd05b 100644 --- a/tsl/test/expected/compression_ddl.out +++ b/tsl/test/expected/compression_ddl.out @@ -1272,8 +1272,7 @@ Indexes: Triggers: ts_insert_blocker BEFORE INSERT ON _timescaledb_internal._compressed_hypertable_23 FOR EACH ROW EXECUTE FUNCTION _timescaledb_internal.insert_blocker() -DROP TABLE metric CASCADE; --- Creating hypertable +-- #5290 Compression can't be enabled on caggs CREATE TABLE "tEst2" ( "Id" uuid NOT NULL, "Time" timestamp with time zone NOT NULL, @@ -1298,3 +1297,43 @@ FROM public."tEst2" GROUP BY "Idd", "bUcket"; NOTICE: continuous aggregate "tEst2_mv" is 
already up-to-date ALTER MATERIALIZED VIEW "tEst2_mv" SET (timescaledb.compress = true); +NOTICE: defaulting compress_segmentby to "Idd" +NOTICE: defaulting compress_orderby to "bUcket" +-- #5161 segmentby param +CREATE MATERIALIZED VIEW test1_cont_view2 +WITH (timescaledb.continuous, + timescaledb.materialized_only=true + ) +AS SELECT time_bucket('1 hour', "Time") as t, SUM(intcol) as sum,txtcol as "iDeA" + FROM test1 + GROUP BY 1,txtcol WITH NO DATA; +\set ON_ERROR_STOP 0 +ALTER MATERIALIZED VIEW test1_cont_view2 SET ( + timescaledb.compress = true, + timescaledb.compress_segmentby = 'invalid_column' +); +NOTICE: defaulting compress_orderby to t +ERROR: column "invalid_column" does not exist +\set ON_ERROR_STOP 1 +ALTER MATERIALIZED VIEW test1_cont_view2 SET ( + timescaledb.compress = true +); +NOTICE: defaulting compress_segmentby to "iDeA" +NOTICE: defaulting compress_orderby to t +ALTER MATERIALIZED VIEW test1_cont_view2 SET ( + timescaledb.compress = true, + timescaledb.compress_segmentby = '"iDeA"' +); +NOTICE: defaulting compress_orderby to t +\set ON_ERROR_STOP 0 +ALTER MATERIALIZED VIEW test1_cont_view2 SET ( + timescaledb.compress = true, + timescaledb.compress_orderby = '"iDeA"' +); +NOTICE: defaulting compress_segmentby to "iDeA" +ERROR: cannot use column "iDeA" for both ordering and segmenting +\set ON_ERROR_STOP 1 +ALTER MATERIALIZED VIEW test1_cont_view2 SET ( + timescaledb.compress = false +); +DROP TABLE metric CASCADE; diff --git a/tsl/test/sql/compression_ddl.sql b/tsl/test/sql/compression_ddl.sql index 04c4710b12f..ecd22caa879 100644 --- a/tsl/test/sql/compression_ddl.sql +++ b/tsl/test/sql/compression_ddl.sql @@ -547,9 +547,7 @@ WHERE uc_hypertable.table_name like 'metric' \gset -- get definition of compressed hypertable and notice the index \d :COMP_SCHEMA_NAME.:COMP_TABLE_NAME -DROP TABLE metric CASCADE; - --- Creating hypertable +-- #5290 Compression can't be enabled on caggs CREATE TABLE "tEst2" ( "Id" uuid NOT NULL, "Time" timestamp with 
time zone NOT NULL, @@ -572,3 +570,43 @@ FROM public."tEst2" GROUP BY "Idd", "bUcket"; ALTER MATERIALIZED VIEW "tEst2_mv" SET (timescaledb.compress = true); + + +-- #5161 segmentby param +CREATE MATERIALIZED VIEW test1_cont_view2 +WITH (timescaledb.continuous, + timescaledb.materialized_only=true + ) +AS SELECT time_bucket('1 hour', "Time") as t, SUM(intcol) as sum,txtcol as "iDeA" + FROM test1 + GROUP BY 1,txtcol WITH NO DATA; + + +\set ON_ERROR_STOP 0 +ALTER MATERIALIZED VIEW test1_cont_view2 SET ( + timescaledb.compress = true, + timescaledb.compress_segmentby = 'invalid_column' +); +\set ON_ERROR_STOP 1 + +ALTER MATERIALIZED VIEW test1_cont_view2 SET ( + timescaledb.compress = true +); + +ALTER MATERIALIZED VIEW test1_cont_view2 SET ( + timescaledb.compress = true, + timescaledb.compress_segmentby = '"iDeA"' +); + +\set ON_ERROR_STOP 0 +ALTER MATERIALIZED VIEW test1_cont_view2 SET ( + timescaledb.compress = true, + timescaledb.compress_orderby = '"iDeA"' +); +\set ON_ERROR_STOP 1 + +ALTER MATERIALIZED VIEW test1_cont_view2 SET ( + timescaledb.compress = false +); + +DROP TABLE metric CASCADE;