Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Enhancement]: Extend enabling compression on a continuous aggregate with 'segmentby' and 'orderby' parameters #5262

Merged
merged 1 commit into from Feb 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -12,6 +12,7 @@ accidentally triggering the load of a previous DB version.**
* #5246 Make connection establishment interruptible
* #5253 Make data node command execution interruptible
* #5243 Enable real-time aggregation for continuous aggregates with joins
* #5262 Extend enabling compression on a continuous aggregate with 'compress_segmentby' and 'compress_orderby' parameters

**Bugfixes**
* #4926 Fix corruption when inserting into compressed chunks
Expand Down
1 change: 1 addition & 0 deletions src/compression_with_clause.h
Expand Up @@ -19,6 +19,7 @@ typedef enum CompressHypertableOption
CompressSegmentBy,
CompressOrderBy,
CompressChunkTimeInterval,
CompressOptionMax
} CompressHypertableOption;

typedef struct
Expand Down
60 changes: 59 additions & 1 deletion src/ts_catalog/continuous_agg.c
Expand Up @@ -39,6 +39,7 @@
#include "time_utils.h"
#include "ts_catalog/catalog.h"
#include "errors.h"
#include "compression_with_clause.h"

#define BUCKET_FUNCTION_SERIALIZE_VERSION 1
#define CHECK_NAME_MATCH(name1, name2) (namestrcmp(name1, name2) == 0)
Expand All @@ -59,7 +60,7 @@ static const WithClauseDefinition continuous_aggregate_with_clause_def[] = {
.type_id = BOOLOID,
.default_val = BoolGetDatum(false),
},
[ContinuousViewOptionCompress] = {
[ContinuousViewOptionCompress] = {
.arg_name = "compress",
.type_id = BOOLOID,
},
Expand All @@ -68,6 +69,18 @@ static const WithClauseDefinition continuous_aggregate_with_clause_def[] = {
.type_id = BOOLOID,
.default_val = BoolGetDatum(true),
},
[ContinuousViewOptionCompressSegmentBy] = {
.arg_name = "compress_segmentby",
.type_id = TEXTOID,
},
[ContinuousViewOptionCompressOrderBy] = {
.arg_name = "compress_orderby",
.type_id = TEXTOID,
},
[ContinuousViewOptionCompressChunkTimeInterval] = {
.arg_name = "compress_chunk_time_interval",
.type_id = INTERVALOID,
},
};

WithClauseResult *
Expand All @@ -77,6 +90,51 @@ ts_continuous_agg_with_clause_parse(const List *defelems)
continuous_aggregate_with_clause_def,
TS_ARRAY_LEN(continuous_aggregate_with_clause_def));
}

/*
 * Collect the compression-related WITH clause options that were explicitly
 * set on a continuous aggregate and repackage each one as a
 * "timescaledb.<option>" DefElem, suitable for forwarding to the
 * materialization hypertable. Options left at their defaults are skipped.
 */
List *
ts_continuous_agg_get_compression_defelems(const WithClauseResult *with_clauses)
{
	List *defelems = NIL;

	for (int opt = 0; opt < CompressOptionMax; opt++)
	{
		int view_option = 0;

		/* Map each hypertable compression option onto the corresponding
		 * continuous aggregate view option index. */
		switch (opt)
		{
			case CompressEnabled:
				view_option = ContinuousViewOptionCompress;
				break;
			case CompressSegmentBy:
				view_option = ContinuousViewOptionCompressSegmentBy;
				break;
			case CompressOrderBy:
				view_option = ContinuousViewOptionCompressOrderBy;
				break;
			case CompressChunkTimeInterval:
				view_option = ContinuousViewOptionCompressChunkTimeInterval;
				break;
			default:
				elog(ERROR, "Unhandled compression option");
				break;
		}

		const WithClauseResult *clause = &with_clauses[view_option];

		/* Only forward options the user set explicitly. */
		if (clause->is_default)
			continue;

		const WithClauseDefinition *def = &continuous_aggregate_with_clause_def[view_option];
		Node *value = (Node *) makeString(ts_with_clause_result_deparse_value(clause));

		defelems = lappend(defelems,
						   makeDefElemExtended("timescaledb",
											   (char *) def->arg_name,
											   value,
											   DEFELEM_UNSPEC,
											   -1));
	}
	return defelems;
}

static void
init_scan_by_mat_hypertable_id(ScanIterator *iterator, const int32 mat_hypertable_id)
{
Expand Down
7 changes: 7 additions & 0 deletions src/ts_catalog/continuous_agg.h
Expand Up @@ -48,6 +48,10 @@ typedef enum ContinuousAggViewOption
ContinuousViewOptionMaterializedOnly,
ContinuousViewOptionCompress,
ContinuousViewOptionFinalized,
ContinuousViewOptionCompressSegmentBy,
ContinuousViewOptionCompressOrderBy,
ContinuousViewOptionCompressChunkTimeInterval,
ContinuousViewOptionMax
} ContinuousAggViewOption;

typedef enum ContinuousAggViewType
Expand All @@ -60,6 +64,9 @@ typedef enum ContinuousAggViewType

extern TSDLLEXPORT WithClauseResult *ts_continuous_agg_with_clause_parse(const List *defelems);

extern TSDLLEXPORT List *
ts_continuous_agg_get_compression_defelems(const WithClauseResult *with_clauses);

#define BUCKET_WIDTH_VARIABLE (-1)

/*
Expand Down
18 changes: 18 additions & 0 deletions src/with_clause_parser.c
Expand Up @@ -15,6 +15,7 @@
#include <utils/lsyscache.h>
#include <utils/syscache.h>

#include "debug_assert.h"
#include "with_clause_parser.h"

#define TIMESCALEDB_NAMESPACE "timescaledb"
Expand Down Expand Up @@ -76,6 +77,7 @@ ts_with_clauses_parse(const List *def_elems, const WithClauseDefinition *args, S

for (i = 0; i < nargs; i++)
{
results[i].definition = &args[i];
results[i].parsed = args[i].default_val;
results[i].is_default = true;
}
Expand Down Expand Up @@ -113,6 +115,22 @@ ts_with_clauses_parse(const List *def_elems, const WithClauseDefinition *args, S
return results;
}

/*
 * Deparse a parsed WITH clause value back into its textual form by calling
 * the output function of the option's declared type.
 *
 * Returns a palloc'd string representation of result->parsed. Errors out if
 * the option definition carries an invalid type OID or the type has no
 * output function.
 */
extern TSDLLEXPORT char *
ts_with_clause_result_deparse_value(const WithClauseResult *result)
{
	Oid oid = result->definition->type_id;
	/* Oid is an unsigned type, so format it with %u rather than %d. */
	Ensure(OidIsValid(oid), "argument \"%u\" has invalid OID", oid);

	/* getTypeOutputInfo yields the type's *output* function: we are
	 * deparsing a Datum back to text, not parsing text. */
	Oid out_fn;
	bool typIsVarlena pg_attribute_unused();

	getTypeOutputInfo(oid, &out_fn, &typIsVarlena);
	Ensure(OidIsValid(out_fn), "no output function for type with OID %u", oid);

	char *val = OidOutputFunctionCall(out_fn, result->parsed);
	return val;
}

static Datum
parse_arg(WithClauseDefinition arg, DefElem *def)
{
Expand Down
3 changes: 3 additions & 0 deletions src/with_clause_parser.h
Expand Up @@ -22,6 +22,7 @@ typedef struct WithClauseDefinition

typedef struct WithClauseResult
{
	/* Points back to the WithClauseDefinition this result was parsed from;
	 * gives access to the option's name and type OID (e.g. for deparsing). */
	const WithClauseDefinition *definition;
	/* True when the option was not given explicitly, in which case "parsed"
	 * holds the definition's default value. */
	bool is_default;
	/* The option's value as a Datum of definition->type_id. */
	Datum parsed;
} WithClauseResult;
Expand All @@ -31,4 +32,6 @@ extern TSDLLEXPORT void ts_with_clause_filter(const List *def_elems, List **with

extern TSDLLEXPORT WithClauseResult *
ts_with_clauses_parse(const List *def_elems, const WithClauseDefinition *args, Size nargs);

extern TSDLLEXPORT char *ts_with_clause_result_deparse_value(const WithClauseResult *result);
#endif /* TIMESCALEDB_WITH_CLAUSE_PARSER_H */
45 changes: 27 additions & 18 deletions tsl/src/continuous_aggs/options.c
Expand Up @@ -194,28 +194,37 @@ cagg_get_compression_params(ContinuousAgg *agg, Hypertable *mat_ht)
return defelems;
}

/* enable/disable compression on continuous aggregate */
/* forwards compression related changes via an alter statement to the underlying HT */
static void
cagg_alter_compression(ContinuousAgg *agg, Hypertable *mat_ht, bool compress_enable)
cagg_alter_compression(ContinuousAgg *agg, Hypertable *mat_ht, List *compress_defelems)
{
List *defelems = NIL;
Assert(mat_ht != NULL);
if (compress_enable)
defelems = cagg_get_compression_params(agg, mat_ht);
WithClauseResult *with_clause_options =
ts_compress_hypertable_set_clause_parse(compress_defelems);

DefElem *enable = makeDefElemExtended("timescaledb",
"compress",
compress_enable ? (Node *) makeString("true") :
(Node *) makeString("false"),
DEFELEM_UNSPEC,
-1);
defelems = lappend(defelems, enable);
if (with_clause_options[CompressEnabled].parsed)
{
List *default_compress_defelems = cagg_get_compression_params(agg, mat_ht);
WithClauseResult *default_with_clause_options =
ts_compress_hypertable_set_clause_parse(default_compress_defelems);
/* Merge defaults if there's any. */
for (int i = 0; i < CompressOptionMax; i++)
{
if (with_clause_options[i].is_default && !default_with_clause_options[i].is_default)
{
with_clause_options[i] = default_with_clause_options[i];
elog(NOTICE,
"defaulting %s to %s",
with_clause_options[i].definition->arg_name,
ts_with_clause_result_deparse_value(&with_clause_options[i]));
}
}
}

WithClauseResult *with_clause_options = ts_compress_hypertable_set_clause_parse(defelems);
AlterTableCmd alter_cmd = {
.type = T_AlterTableCmd,
.subtype = AT_SetRelOptions,
.def = (Node *) defelems,
.def = (Node *) compress_defelems,
};

tsl_process_compress_table(&alter_cmd, mat_ht, with_clause_options);
Expand Down Expand Up @@ -249,16 +258,16 @@ continuous_agg_update_options(ContinuousAgg *agg, WithClauseResult *with_clause_
update_materialized_only(agg, materialized_only);
ts_cache_release(hcache);
}
if (!with_clause_options[ContinuousViewOptionCompress].is_default)
List *compression_options = ts_continuous_agg_get_compression_defelems(with_clause_options);

if (list_length(compression_options) > 0)
{
bool compress_enable =
DatumGetBool(with_clause_options[ContinuousViewOptionCompress].parsed);
Cache *hcache = ts_hypertable_cache_pin();
Hypertable *mat_ht =
ts_hypertable_cache_get_entry_by_id(hcache, agg->data.mat_hypertable_id);
Assert(mat_ht != NULL);

cagg_alter_compression(agg, mat_ht, compress_enable);
cagg_alter_compression(agg, mat_ht, compression_options);
ts_cache_release(hcache);
}
if (!with_clause_options[ContinuousViewOptionCreateGroupIndex].is_default)
Expand Down
4 changes: 4 additions & 0 deletions tsl/test/expected/cagg_ddl.out
Expand Up @@ -1646,6 +1646,7 @@ SELECT count(*) from test_setting_cagg ORDER BY 1;
INSERT INTO test_setting VALUES( '2020-11-01', 20);
--try out 2 settings here --
ALTER MATERIALIZED VIEW test_setting_cagg SET (timescaledb.materialized_only = 'true', timescaledb.compress='true');
psql:include/cagg_ddl_common.sql:1141: NOTICE: defaulting compress_orderby to time_bucket
SELECT view_name, compression_enabled, materialized_only
FROM timescaledb_information.continuous_aggregates
where view_name = 'test_setting_cagg';
Expand All @@ -1663,6 +1664,7 @@ SELECT count(*) from test_setting_cagg ORDER BY 1;

--now set it back to false --
ALTER MATERIALIZED VIEW test_setting_cagg SET (timescaledb.materialized_only = 'false', timescaledb.compress='true');
psql:include/cagg_ddl_common.sql:1149: NOTICE: defaulting compress_orderby to time_bucket
SELECT view_name, compression_enabled, materialized_only
FROM timescaledb_information.continuous_aggregates
where view_name = 'test_setting_cagg';
Expand Down Expand Up @@ -1729,6 +1731,7 @@ SELECT count(*) from test_setting_cagg ORDER BY 1;
INSERT INTO test_setting VALUES( '2020-11-01', 20);
--try out 2 settings here --
ALTER MATERIALIZED VIEW test_setting_cagg SET (timescaledb.materialized_only = 'false', timescaledb.compress='true');
psql:include/cagg_ddl_common.sql:1185: NOTICE: defaulting compress_orderby to time_bucket
SELECT view_name, compression_enabled, materialized_only
FROM timescaledb_information.continuous_aggregates
where view_name = 'test_setting_cagg';
Expand All @@ -1746,6 +1749,7 @@ SELECT count(*) from test_setting_cagg ORDER BY 1;

--now set it back to false --
ALTER MATERIALIZED VIEW test_setting_cagg SET (timescaledb.materialized_only = 'true', timescaledb.compress='true');
psql:include/cagg_ddl_common.sql:1193: NOTICE: defaulting compress_orderby to time_bucket
SELECT view_name, compression_enabled, materialized_only
FROM timescaledb_information.continuous_aggregates
where view_name = 'test_setting_cagg';
Expand Down
4 changes: 4 additions & 0 deletions tsl/test/expected/cagg_ddl_dist_ht.out
Expand Up @@ -1689,6 +1689,7 @@ SELECT count(*) from test_setting_cagg ORDER BY 1;
INSERT INTO test_setting VALUES( '2020-11-01', 20);
--try out 2 settings here --
ALTER MATERIALIZED VIEW test_setting_cagg SET (timescaledb.materialized_only = 'true', timescaledb.compress='true');
psql:include/cagg_ddl_common.sql:1141: NOTICE: defaulting compress_orderby to time_bucket
SELECT view_name, compression_enabled, materialized_only
FROM timescaledb_information.continuous_aggregates
where view_name = 'test_setting_cagg';
Expand All @@ -1706,6 +1707,7 @@ SELECT count(*) from test_setting_cagg ORDER BY 1;

--now set it back to false --
ALTER MATERIALIZED VIEW test_setting_cagg SET (timescaledb.materialized_only = 'false', timescaledb.compress='true');
psql:include/cagg_ddl_common.sql:1149: NOTICE: defaulting compress_orderby to time_bucket
SELECT view_name, compression_enabled, materialized_only
FROM timescaledb_information.continuous_aggregates
where view_name = 'test_setting_cagg';
Expand Down Expand Up @@ -1772,6 +1774,7 @@ SELECT count(*) from test_setting_cagg ORDER BY 1;
INSERT INTO test_setting VALUES( '2020-11-01', 20);
--try out 2 settings here --
ALTER MATERIALIZED VIEW test_setting_cagg SET (timescaledb.materialized_only = 'false', timescaledb.compress='true');
psql:include/cagg_ddl_common.sql:1185: NOTICE: defaulting compress_orderby to time_bucket
SELECT view_name, compression_enabled, materialized_only
FROM timescaledb_information.continuous_aggregates
where view_name = 'test_setting_cagg';
Expand All @@ -1789,6 +1792,7 @@ SELECT count(*) from test_setting_cagg ORDER BY 1;

--now set it back to false --
ALTER MATERIALIZED VIEW test_setting_cagg SET (timescaledb.materialized_only = 'true', timescaledb.compress='true');
psql:include/cagg_ddl_common.sql:1193: NOTICE: defaulting compress_orderby to time_bucket
SELECT view_name, compression_enabled, materialized_only
FROM timescaledb_information.continuous_aggregates
where view_name = 'test_setting_cagg';
Expand Down
10 changes: 7 additions & 3 deletions tsl/test/expected/cagg_errors.out
Expand Up @@ -540,12 +540,13 @@ NOTICE: continuous aggregate "i2980_cagg2" is already up-to-date
--now enable compression with invalid parameters
ALTER MATERIALIZED VIEW i2980_cagg2 SET ( timescaledb.compress,
timescaledb.compress_segmentby = 'bucket');
ERROR: unrecognized parameter "timescaledb.compress_segmentby"
NOTICE: defaulting compress_orderby to bucket
ERROR: cannot use column "bucket" for both ordering and segmenting
ALTER MATERIALIZED VIEW i2980_cagg2 SET ( timescaledb.compress,
timescaledb.compress_orderby = 'bucket');
ERROR: unrecognized parameter "timescaledb.compress_orderby"
--enable compression and test re-enabling compression
ALTER MATERIALIZED VIEW i2980_cagg2 SET ( timescaledb.compress);
NOTICE: defaulting compress_orderby to bucket
insert into i2980 select now();
call refresh_continuous_aggregate('i2980_cagg2', NULL, NULL);
SELECT compress_chunk(ch) FROM show_chunks('i2980_cagg2') ch;
Expand All @@ -557,9 +558,11 @@ SELECT compress_chunk(ch) FROM show_chunks('i2980_cagg2') ch;
ALTER MATERIALIZED VIEW i2980_cagg2 SET ( timescaledb.compress = 'false');
ERROR: cannot change configuration on already compressed chunks
ALTER MATERIALIZED VIEW i2980_cagg2 SET ( timescaledb.compress = 'true');
NOTICE: defaulting compress_orderby to bucket
ERROR: cannot change configuration on already compressed chunks
ALTER MATERIALIZED VIEW i2980_cagg2 SET ( timescaledb.compress, timescaledb.compress_segmentby = 'bucket');
ERROR: unrecognized parameter "timescaledb.compress_segmentby"
NOTICE: defaulting compress_orderby to bucket
ERROR: cannot change configuration on already compressed chunks
--Errors with compression policy on caggs--
select add_continuous_aggregate_policy('i2980_cagg2', interval '10 day', interval '2 day' ,'4h') AS job_id ;
job_id
Expand All @@ -570,6 +573,7 @@ select add_continuous_aggregate_policy('i2980_cagg2', interval '10 day', interva
SELECT add_compression_policy('i2980_cagg', '8 day'::interval);
ERROR: compression not enabled on continuous aggregate "i2980_cagg"
ALTER MATERIALIZED VIEW i2980_cagg SET ( timescaledb.compress );
NOTICE: defaulting compress_orderby to time_bucket
SELECT add_compression_policy('i2980_cagg', '8 day'::interval);
ERROR: compress_after value for compression policy should be greater than the start of the refresh window of continuous aggregate policy for i2980_cagg
SELECT add_continuous_aggregate_policy('i2980_cagg2', '10 day'::interval, '6 day'::interval);
Expand Down
10 changes: 7 additions & 3 deletions tsl/test/expected/cagg_errors_deprecated.out
Expand Up @@ -624,12 +624,13 @@ NOTICE: continuous aggregate "i2980_cagg2" is already up-to-date
--now enable compression with invalid parameters
ALTER MATERIALIZED VIEW i2980_cagg2 SET ( timescaledb.compress,
timescaledb.compress_segmentby = 'bucket');
ERROR: unrecognized parameter "timescaledb.compress_segmentby"
NOTICE: defaulting compress_orderby to bucket
ERROR: cannot use column "bucket" for both ordering and segmenting
ALTER MATERIALIZED VIEW i2980_cagg2 SET ( timescaledb.compress,
timescaledb.compress_orderby = 'bucket');
ERROR: unrecognized parameter "timescaledb.compress_orderby"
--enable compression and test re-enabling compression
ALTER MATERIALIZED VIEW i2980_cagg2 SET ( timescaledb.compress);
NOTICE: defaulting compress_orderby to bucket
insert into i2980 select now();
call refresh_continuous_aggregate('i2980_cagg2', NULL, NULL);
SELECT compress_chunk(ch) FROM show_chunks('i2980_cagg2') ch;
Expand All @@ -641,9 +642,11 @@ SELECT compress_chunk(ch) FROM show_chunks('i2980_cagg2') ch;
ALTER MATERIALIZED VIEW i2980_cagg2 SET ( timescaledb.compress = 'false');
ERROR: cannot change configuration on already compressed chunks
ALTER MATERIALIZED VIEW i2980_cagg2 SET ( timescaledb.compress = 'true');
NOTICE: defaulting compress_orderby to bucket
ERROR: cannot change configuration on already compressed chunks
ALTER MATERIALIZED VIEW i2980_cagg2 SET ( timescaledb.compress, timescaledb.compress_segmentby = 'bucket');
ERROR: unrecognized parameter "timescaledb.compress_segmentby"
NOTICE: defaulting compress_orderby to bucket
ERROR: cannot change configuration on already compressed chunks
--Errors with compression policy on caggs--
select add_continuous_aggregate_policy('i2980_cagg2', interval '10 day', interval '2 day' ,'4h') AS job_id ;
job_id
Expand All @@ -654,6 +657,7 @@ select add_continuous_aggregate_policy('i2980_cagg2', interval '10 day', interva
SELECT add_compression_policy('i2980_cagg', '8 day'::interval);
ERROR: compression not enabled on continuous aggregate "i2980_cagg"
ALTER MATERIALIZED VIEW i2980_cagg SET ( timescaledb.compress );
NOTICE: defaulting compress_orderby to time_bucket
SELECT add_compression_policy('i2980_cagg', '8 day'::interval);
ERROR: compress_after value for compression policy should be greater than the start of the refresh window of continuous aggregate policy for i2980_cagg
SELECT add_continuous_aggregate_policy('i2980_cagg2', '10 day'::interval, '6 day'::interval);
Expand Down