Skip to content

Commit

Permalink
Accept all compression options on caggs
Browse files Browse the repository at this point in the history
Properly handle the 'compress_segmentby' and 'compress_orderby'
compression options on continuous aggregates.

ALTER MATERIALIZED VIEW test_table_cagg SET (
  timescaledb.compress = true,
  timescaledb.compress_segmentby = 'device_id'
);

Fixes timescale#5161
  • Loading branch information
kgyrtkirk committed Feb 13, 2023
1 parent 9ec11d8 commit afa1b82
Show file tree
Hide file tree
Showing 11 changed files with 202 additions and 30 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -12,6 +12,7 @@ accidentally triggering the load of a previous DB version.**
* #5246 Make connection establishment interruptible
* #5253 Make data node command execution interruptible
* #5243 Enable real-time aggregation for continuous aggregates with joins
* #5262 Extend enabling compression on a continuous aggregate with 'compress_segmentby' and 'compress_orderby' parameters

**Bugfixes**
* #4926 Fix corruption when inserting into compressed chunks
Expand Down
1 change: 1 addition & 0 deletions src/compression_with_clause.h
Expand Up @@ -19,6 +19,7 @@ typedef enum CompressHypertableOption
CompressSegmentBy,
CompressOrderBy,
CompressChunkTimeInterval,
CompressOptionMax
} CompressHypertableOption;

typedef struct
Expand Down
60 changes: 59 additions & 1 deletion src/ts_catalog/continuous_agg.c
Expand Up @@ -39,6 +39,7 @@
#include "time_utils.h"
#include "ts_catalog/catalog.h"
#include "errors.h"
#include "compression_with_clause.h"

#define BUCKET_FUNCTION_SERIALIZE_VERSION 1
#define CHECK_NAME_MATCH(name1, name2) (namestrcmp(name1, name2) == 0)
Expand All @@ -59,7 +60,7 @@ static const WithClauseDefinition continuous_aggregate_with_clause_def[] = {
.type_id = BOOLOID,
.default_val = BoolGetDatum(false),
},
[ContinuousViewOptionCompress] = {
[ContinuousViewOptionCompress] = {
.arg_name = "compress",
.type_id = BOOLOID,
},
Expand All @@ -68,6 +69,18 @@ static const WithClauseDefinition continuous_aggregate_with_clause_def[] = {
.type_id = BOOLOID,
.default_val = BoolGetDatum(true),
},
[ContinuousViewOptionCompressSegmentBy] = {
.arg_name = "compress_segmentby",
.type_id = TEXTOID,
},
[ContinuousViewOptionCompressOrderBy] = {
.arg_name = "compress_orderby",
.type_id = TEXTOID,
},
[ContinuousViewOptionCompressChunkTimeInterval] = {
.arg_name = "compress_chunk_time_interval",
.type_id = INTERVALOID,
},
};

WithClauseResult *
Expand All @@ -77,6 +90,51 @@ ts_continuous_agg_with_clause_parse(const List *defelems)
continuous_aggregate_with_clause_def,
TS_ARRAY_LEN(continuous_aggregate_with_clause_def));
}

List *
ts_continuous_agg_get_compression_defelems(const WithClauseResult *with_clauses)
{
List *ret = NIL;

for (int i = 0; i < CompressOptionMax; i++)
{
int option_index = 0;
switch (i)
{
case CompressEnabled:
option_index = ContinuousViewOptionCompress;
break;
case CompressSegmentBy:
option_index = ContinuousViewOptionCompressSegmentBy;
break;
case CompressOrderBy:
option_index = ContinuousViewOptionCompressOrderBy;
break;
case CompressChunkTimeInterval:
option_index = ContinuousViewOptionCompressChunkTimeInterval;
break;
default:
elog(ERROR, "Unhandled compression option");
break;
}

const WithClauseResult *input = &with_clauses[option_index];
WithClauseDefinition def = continuous_aggregate_with_clause_def[option_index];

if (!input->is_default)
{
Node *value = (Node *) makeString(ts_with_clause_result_deparse_value(input));
DefElem *elem = makeDefElemExtended("timescaledb",
(char *) def.arg_name,
value,
DEFELEM_UNSPEC,
-1);
ret = lappend(ret, elem);
}
}
return ret;
}

static void
init_scan_by_mat_hypertable_id(ScanIterator *iterator, const int32 mat_hypertable_id)
{
Expand Down
7 changes: 7 additions & 0 deletions src/ts_catalog/continuous_agg.h
Expand Up @@ -48,6 +48,10 @@ typedef enum ContinuousAggViewOption
ContinuousViewOptionMaterializedOnly,
ContinuousViewOptionCompress,
ContinuousViewOptionFinalized,
ContinuousViewOptionCompressSegmentBy,
ContinuousViewOptionCompressOrderBy,
ContinuousViewOptionCompressChunkTimeInterval,
ContinuousViewOptionMax
} ContinuousAggViewOption;

typedef enum ContinuousAggViewType
Expand All @@ -60,6 +64,9 @@ typedef enum ContinuousAggViewType

extern TSDLLEXPORT WithClauseResult *ts_continuous_agg_with_clause_parse(const List *defelems);

extern TSDLLEXPORT List *
ts_continuous_agg_get_compression_defelems(const WithClauseResult *with_clauses);

#define BUCKET_WIDTH_VARIABLE (-1)

/*
Expand Down
18 changes: 18 additions & 0 deletions src/with_clause_parser.c
Expand Up @@ -15,6 +15,7 @@
#include <utils/lsyscache.h>
#include <utils/syscache.h>

#include "debug_assert.h"
#include "with_clause_parser.h"

#define TIMESCALEDB_NAMESPACE "timescaledb"
Expand Down Expand Up @@ -76,6 +77,7 @@ ts_with_clauses_parse(const List *def_elems, const WithClauseDefinition *args, S

for (i = 0; i < nargs; i++)
{
results[i].definition = &args[i];
results[i].parsed = args[i].default_val;
results[i].is_default = true;
}
Expand Down Expand Up @@ -113,6 +115,22 @@ ts_with_clauses_parse(const List *def_elems, const WithClauseDefinition *args, S
return results;
}

extern TSDLLEXPORT char *
ts_with_clause_result_deparse_value(const WithClauseResult *result)
{
Oid oid = result->definition->type_id;
Ensure(OidIsValid(oid), "argument \"%d\" has invalid OID", oid);

Oid in_fn;
bool typIsVarlena pg_attribute_unused();

getTypeOutputInfo(oid, &in_fn, &typIsVarlena);
Ensure(OidIsValid(in_fn), "no output function for type with OID %d", oid);

char *val = OidOutputFunctionCall(in_fn, result->parsed);
return val;
}

static Datum
parse_arg(WithClauseDefinition arg, DefElem *def)
{
Expand Down
3 changes: 3 additions & 0 deletions src/with_clause_parser.h
Expand Up @@ -22,6 +22,7 @@ typedef struct WithClauseDefinition

typedef struct WithClauseResult
{
const WithClauseDefinition *definition;
bool is_default;
Datum parsed;
} WithClauseResult;
Expand All @@ -31,4 +32,6 @@ extern TSDLLEXPORT void ts_with_clause_filter(const List *def_elems, List **with

extern TSDLLEXPORT WithClauseResult *
ts_with_clauses_parse(const List *def_elems, const WithClauseDefinition *args, Size nargs);

extern TSDLLEXPORT char *ts_with_clause_result_deparse_value(const WithClauseResult *result);
#endif /* TIMESCALEDB_WITH_CLAUSE_PARSER_H */
45 changes: 27 additions & 18 deletions tsl/src/continuous_aggs/options.c
Expand Up @@ -194,28 +194,37 @@ cagg_get_compression_params(ContinuousAgg *agg, Hypertable *mat_ht)
return defelems;
}

/* enable/disable compression on continuous aggregate */
/* forwards compression related changes via an alter statement to the underlying HT */
static void
cagg_alter_compression(ContinuousAgg *agg, Hypertable *mat_ht, bool compress_enable)
cagg_alter_compression(ContinuousAgg *agg, Hypertable *mat_ht, List *compress_defelems)
{
List *defelems = NIL;
Assert(mat_ht != NULL);
if (compress_enable)
defelems = cagg_get_compression_params(agg, mat_ht);
WithClauseResult *with_clause_options =
ts_compress_hypertable_set_clause_parse(compress_defelems);

DefElem *enable = makeDefElemExtended("timescaledb",
"compress",
compress_enable ? (Node *) makeString("true") :
(Node *) makeString("false"),
DEFELEM_UNSPEC,
-1);
defelems = lappend(defelems, enable);
if (with_clause_options[CompressEnabled].parsed)
{
List *default_compress_defelems = cagg_get_compression_params(agg, mat_ht);
WithClauseResult *default_with_clause_options =
ts_compress_hypertable_set_clause_parse(default_compress_defelems);
/* Merge defaults if there's any. */
for (int i = 0; i < CompressOptionMax; i++)
{
if (with_clause_options[i].is_default && !default_with_clause_options[i].is_default)
{
with_clause_options[i] = default_with_clause_options[i];
elog(NOTICE,
"defaulting %s to %s",
with_clause_options[i].definition->arg_name,
ts_with_clause_result_deparse_value(&with_clause_options[i]));
}
}
}

WithClauseResult *with_clause_options = ts_compress_hypertable_set_clause_parse(defelems);
AlterTableCmd alter_cmd = {
.type = T_AlterTableCmd,
.subtype = AT_SetRelOptions,
.def = (Node *) defelems,
.def = (Node *) compress_defelems,
};

tsl_process_compress_table(&alter_cmd, mat_ht, with_clause_options);
Expand Down Expand Up @@ -249,16 +258,16 @@ continuous_agg_update_options(ContinuousAgg *agg, WithClauseResult *with_clause_
update_materialized_only(agg, materialized_only);
ts_cache_release(hcache);
}
if (!with_clause_options[ContinuousViewOptionCompress].is_default)
List *compression_options = ts_continuous_agg_get_compression_defelems(with_clause_options);

if (list_length(compression_options) > 0)
{
bool compress_enable =
DatumGetBool(with_clause_options[ContinuousViewOptionCompress].parsed);
Cache *hcache = ts_hypertable_cache_pin();
Hypertable *mat_ht =
ts_hypertable_cache_get_entry_by_id(hcache, agg->data.mat_hypertable_id);
Assert(mat_ht != NULL);

cagg_alter_compression(agg, mat_ht, compress_enable);
cagg_alter_compression(agg, mat_ht, compression_options);
ts_cache_release(hcache);
}
if (!with_clause_options[ContinuousViewOptionCreateGroupIndex].is_default)
Expand Down
5 changes: 2 additions & 3 deletions tsl/test/expected/cagg_errors.out
Expand Up @@ -540,10 +540,9 @@ NOTICE: continuous aggregate "i2980_cagg2" is already up-to-date
--now enable compression with invalid parameters
ALTER MATERIALIZED VIEW i2980_cagg2 SET ( timescaledb.compress,
timescaledb.compress_segmentby = 'bucket');
ERROR: unrecognized parameter "timescaledb.compress_segmentby"
ERROR: cannot use column "bucket" for both ordering and segmenting
ALTER MATERIALIZED VIEW i2980_cagg2 SET ( timescaledb.compress,
timescaledb.compress_orderby = 'bucket');
ERROR: unrecognized parameter "timescaledb.compress_orderby"
--enable compression and test re-enabling compression
ALTER MATERIALIZED VIEW i2980_cagg2 SET ( timescaledb.compress);
insert into i2980 select now();
Expand All @@ -559,7 +558,7 @@ ERROR: cannot change configuration on already compressed chunks
ALTER MATERIALIZED VIEW i2980_cagg2 SET ( timescaledb.compress = 'true');
ERROR: cannot change configuration on already compressed chunks
ALTER MATERIALIZED VIEW i2980_cagg2 SET ( timescaledb.compress, timescaledb.compress_segmentby = 'bucket');
ERROR: unrecognized parameter "timescaledb.compress_segmentby"
ERROR: cannot change configuration on already compressed chunks
--Errors with compression policy on caggs--
select add_continuous_aggregate_policy('i2980_cagg2', interval '10 day', interval '2 day' ,'4h') AS job_id ;
job_id
Expand Down
5 changes: 2 additions & 3 deletions tsl/test/expected/cagg_errors_deprecated.out
Expand Up @@ -624,10 +624,9 @@ NOTICE: continuous aggregate "i2980_cagg2" is already up-to-date
--now enable compression with invalid parameters
ALTER MATERIALIZED VIEW i2980_cagg2 SET ( timescaledb.compress,
timescaledb.compress_segmentby = 'bucket');
ERROR: unrecognized parameter "timescaledb.compress_segmentby"
ERROR: cannot use column "bucket" for both ordering and segmenting
ALTER MATERIALIZED VIEW i2980_cagg2 SET ( timescaledb.compress,
timescaledb.compress_orderby = 'bucket');
ERROR: unrecognized parameter "timescaledb.compress_orderby"
--enable compression and test re-enabling compression
ALTER MATERIALIZED VIEW i2980_cagg2 SET ( timescaledb.compress);
insert into i2980 select now();
Expand All @@ -643,7 +642,7 @@ ERROR: cannot change configuration on already compressed chunks
ALTER MATERIALIZED VIEW i2980_cagg2 SET ( timescaledb.compress = 'true');
ERROR: cannot change configuration on already compressed chunks
ALTER MATERIALIZED VIEW i2980_cagg2 SET ( timescaledb.compress, timescaledb.compress_segmentby = 'bucket');
ERROR: unrecognized parameter "timescaledb.compress_segmentby"
ERROR: cannot change configuration on already compressed chunks
--Errors with compression policy on caggs--
select add_continuous_aggregate_policy('i2980_cagg2', interval '10 day', interval '2 day' ,'4h') AS job_id ;
job_id
Expand Down
43 changes: 41 additions & 2 deletions tsl/test/expected/compression_ddl.out
Expand Up @@ -1272,8 +1272,7 @@ Indexes:
Triggers:
ts_insert_blocker BEFORE INSERT ON _timescaledb_internal._compressed_hypertable_23 FOR EACH ROW EXECUTE FUNCTION _timescaledb_internal.insert_blocker()

DROP TABLE metric CASCADE;
-- Creating hypertable
-- #5290 Compression can't be enabled on caggs
CREATE TABLE "tEst2" (
"Id" uuid NOT NULL,
"Time" timestamp with time zone NOT NULL,
Expand All @@ -1298,3 +1297,43 @@ FROM public."tEst2"
GROUP BY "Idd", "bUcket";
NOTICE: continuous aggregate "tEst2_mv" is already up-to-date
ALTER MATERIALIZED VIEW "tEst2_mv" SET (timescaledb.compress = true);
NOTICE: defaulting compress_segmentby to "Idd"
NOTICE: defaulting compress_orderby to "bUcket"
-- #5161 segmentby param
CREATE MATERIALIZED VIEW test1_cont_view2
WITH (timescaledb.continuous,
timescaledb.materialized_only=true
)
AS SELECT time_bucket('1 hour', "Time") as t, SUM(intcol) as sum,txtcol as "iDeA"
FROM test1
GROUP BY 1,txtcol WITH NO DATA;
\set ON_ERROR_STOP 0
ALTER MATERIALIZED VIEW test1_cont_view2 SET (
timescaledb.compress = true,
timescaledb.compress_segmentby = 'invalid_column'
);
NOTICE: defaulting compress_orderby to t
ERROR: column "invalid_column" does not exist
\set ON_ERROR_STOP 1
ALTER MATERIALIZED VIEW test1_cont_view2 SET (
timescaledb.compress = true
);
NOTICE: defaulting compress_segmentby to "iDeA"
NOTICE: defaulting compress_orderby to t
ALTER MATERIALIZED VIEW test1_cont_view2 SET (
timescaledb.compress = true,
timescaledb.compress_segmentby = '"iDeA"'
);
NOTICE: defaulting compress_orderby to t
\set ON_ERROR_STOP 0
ALTER MATERIALIZED VIEW test1_cont_view2 SET (
timescaledb.compress = true,
timescaledb.compress_orderby = '"iDeA"'
);
NOTICE: defaulting compress_segmentby to "iDeA"
ERROR: cannot use column "iDeA" for both ordering and segmenting
\set ON_ERROR_STOP 1
ALTER MATERIALIZED VIEW test1_cont_view2 SET (
timescaledb.compress = false
);
DROP TABLE metric CASCADE;

0 comments on commit afa1b82

Please sign in to comment.