Skip to content

Commit

Permalink
Add Hierarchical Continuous Aggregates validations
Browse files Browse the repository at this point in the history
Commit 3749953 introduced Hierarchical Continuous Aggregates (aka a
Continuous Aggregate on top of another Continuous Aggregate), but it
lacks some basic validations.

Validations added during the creation of a Hierarchical Continuous
Aggregate:

* Forbid creating a continuous aggregate with a fixed-width bucket on top of
  a continuous aggregate with a variable-width bucket.

* Forbid incompatible bucket widths:
  - the two bucket widths should not be equal;
  - the bucket width of the new continuous aggregate should be greater than
    that of the source continuous aggregate;
  - the bucket width of the new continuous aggregate should be a multiple of
    that of the source continuous aggregate.
  • Loading branch information
fabriziomello committed Nov 25, 2022
1 parent 3b94b99 commit e06f27b
Show file tree
Hide file tree
Showing 17 changed files with 1,498 additions and 64 deletions.
37 changes: 37 additions & 0 deletions src/ts_catalog/continuous_agg.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#include "time_bucket.h"
#include "time_utils.h"
#include "ts_catalog/catalog.h"
#include "errors.h"

#define BUCKET_FUNCTION_SERIALIZE_VERSION 1
#define CHECK_NAME_MATCH(name1, name2) (namestrcmp(name1, name2) == 0)
Expand Down Expand Up @@ -1898,3 +1899,39 @@ ts_cagg_permissions_check(Oid cagg_oid, Oid userid)

return ownerid;
}

/*
 * Return a copy of the query attached to the rule of a continuous
 * aggregate's view.
 *
 * For a finalized continuous aggregate the direct view is consulted, because
 * its user view no longer carries the "GROUP BY" clause; for any other
 * continuous aggregate the user view is consulted.
 *
 * The view relation is opened with AccessShareLock and closed with NoLock.
 * The view is expected to carry exactly one rule; an error is raised when
 * that rule is not a SELECT rule. The returned Query is produced with
 * copyObject() from the rule's first action.
 */
Query *
ts_continuous_agg_get_query(ContinuousAgg *cagg)
{
	const bool finalized = ContinuousAggIsFinalized(cagg);
	const char *view_schema;
	const char *view_name;
	Oid view_relid;
	Relation view_rel;
	RuleLock *view_rules;
	RewriteRule *select_rule;
	Query *view_query;

	/* Select which of the two views holds the definition we need. */
	if (finalized)
	{
		view_schema = NameStr(cagg->data.direct_view_schema);
		view_name = NameStr(cagg->data.direct_view_name);
	}
	else
	{
		view_schema = NameStr(cagg->data.user_view_schema);
		view_name = NameStr(cagg->data.user_view_name);
	}

	/* Resolve schema and view name to the relation OID of the view. */
	view_relid = get_relname_relid(view_name, get_namespace_oid(view_schema, false));

	view_rel = table_open(view_relid, AccessShareLock);
	view_rules = view_rel->rd_rules;
	Assert(view_rules && view_rules->numLocks == 1);

	select_rule = view_rules->rules[0];
	if (select_rule->event != CMD_SELECT)
		ereport(ERROR, (errcode(ERRCODE_TS_UNEXPECTED), errmsg("unexpected rule event for view")));

	/* Copy the query before closing the relation that owns the rule. */
	view_query = (Query *) copyObject(linitial(select_rule->actions));
	table_close(view_rel, NoLock);

	return view_query;
}
3 changes: 3 additions & 0 deletions src/ts_catalog/continuous_agg.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#define TIMESCALEDB_CONTINUOUS_AGG_H
#include <postgres.h>
#include <catalog/pg_type.h>
#include <nodes/parsenodes.h>

#include "ts_catalog/catalog.h"
#include "chunk.h"
Expand Down Expand Up @@ -214,4 +215,6 @@ ts_compute_circumscribed_bucketed_refresh_window_variable(int64 *start, int64 *e
extern TSDLLEXPORT int64 ts_compute_beginning_of_the_next_bucket_variable(
int64 timeval, const ContinuousAggsBucketFunction *bf);

extern TSDLLEXPORT Query *ts_continuous_agg_get_query(ContinuousAgg *cagg);

#endif /* TIMESCALEDB_CONTINUOUS_AGG_H */
155 changes: 135 additions & 20 deletions tsl/src/continuous_aggs/create.c
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ typedef struct CAggTimebucketInfo
/* This should also be the column used by time_bucket */
Oid htpartcoltype;
int64 htpartcol_interval_len; /* interval length setting for primary partitioning column */
int64 bucket_width; /* bucket_width of time_bucket, stores BUCKET_WIDHT_VARIABLE for
int64 bucket_width; /* bucket_width of time_bucket, stores BUCKET_WIDTH_VARIABLE for
variable-sized buckets */
Interval *interval; /* stores the interval, NULL if not specified */
const char *timezone; /* the name of the timezone, NULL if not specified */
Expand Down Expand Up @@ -770,7 +770,7 @@ caggtimebucket_validate(CAggTimebucketInfo *tbinfo, List *groupClause, List *tar

tbinfo->bucket_func = fe;

/*only column allowed : time_bucket('1day', <column> ) */
/* only column allowed : time_bucket('1day', <column> ) */
col_arg = lsecond(fe->args);

if (!(IsA(col_arg, Var)) || ((Var *) col_arg)->varattno != tbinfo->htpartcolno)
Expand Down Expand Up @@ -1092,16 +1092,20 @@ cagg_query_supported(const Query *query, StringInfo hint, StringInfo detail, con
}

static CAggTimebucketInfo
cagg_validate_query(const Query *query, const bool finalized)
cagg_validate_query(const Query *query, const bool finalized, const char *cagg_schema,
const char *cagg_name)
{
CAggTimebucketInfo ret;
CAggTimebucketInfo bucket_info, bucket_info_parent;
Cache *hcache;
Hypertable *ht = NULL;
RangeTblRef *rtref = NULL;
RangeTblEntry *rte;
List *fromList;
StringInfo hint = makeStringInfo();
StringInfo detail = makeStringInfo();
bool is_nested = false;
Query *prev_query = NULL;
ContinuousAgg *cagg_source = NULL;

if (!cagg_query_supported(query, hint, detail, finalized))
{
Expand Down Expand Up @@ -1149,24 +1153,26 @@ cagg_validate_query(const Query *query, const bool finalized)
ht = ts_hypertable_cache_get_cache_and_entry(rte->relid, CACHE_FLAG_NONE, &hcache);
else
{
const ContinuousAgg *cagg;
cagg_source = ts_continuous_agg_find_by_relid(rte->relid);

cagg = ts_continuous_agg_find_by_relid(rte->relid);

if (!ContinuousAggIsFinalized(cagg))
if (!ContinuousAggIsFinalized(cagg_source))
{
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("old format of continuous aggregate is not supported"),
errhint("Run \"CALL cagg_migrate('%s.%s');\" to migrate to the new "
"format.",
NameStr(cagg->data.user_view_schema),
NameStr(cagg->data.user_view_name))));
NameStr(cagg_source->data.user_view_schema),
NameStr(cagg_source->data.user_view_name))));
}

parent_mat_hypertable_id = cagg->data.mat_hypertable_id;
parent_mat_hypertable_id = cagg_source->data.mat_hypertable_id;
hcache = ts_hypertable_cache_pin();
ht = ts_hypertable_cache_get_entry_by_id(hcache, cagg->data.mat_hypertable_id);
ht = ts_hypertable_cache_get_entry_by_id(hcache, cagg_source->data.mat_hypertable_id);

/* get the querydef for the source cagg */
is_nested = true;
prev_query = ts_continuous_agg_get_query(cagg_source);
}

if (TS_HYPERTABLE_IS_INTERNAL_COMPRESSION_TABLE(ht))
Expand Down Expand Up @@ -1226,30 +1232,128 @@ cagg_validate_query(const Query *query, const bool finalized)
errhint("Set a custom time function on the hypertable.")));
}

caggtimebucketinfo_init(&ret,
caggtimebucketinfo_init(&bucket_info,
ht->fd.id,
ht->main_table_relid,
part_dimension->column_attno,
part_dimension->fd.column_type,
part_dimension->fd.interval_length,
parent_mat_hypertable_id);

if (is_nested)
{
caggtimebucketinfo_init(&bucket_info_parent,
ht->fd.id,
ht->main_table_relid,
part_dimension->column_attno,
part_dimension->fd.column_type,
part_dimension->fd.interval_length,
cagg_source->data.parent_mat_hypertable_id);
}

ts_cache_release(hcache);
}

/*check row security settings for the table */
/* check row security settings for the table */
if (ts_has_row_security(rte->relid))
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot create continuous aggregate on hypertable with row security")));

/* we need a GROUP By clause with time_bucket on the partitioning
/*
* We need a GROUP By clause with time_bucket on the partitioning
* column of the hypertable
*/
Assert(query->groupClause);
caggtimebucket_validate(&bucket_info, query->groupClause, query->targetList);

/* nested cagg validations */
if (is_nested)
{
int64 bucket_width, bucket_width_parent;
bool is_greater_than_parent, is_multiple_of_parent;

Assert(prev_query->groupClause);
caggtimebucket_validate(&bucket_info_parent,
prev_query->groupClause,
prev_query->targetList);

/* cannot create cagg with fixed bucket on top of variable bucket */
if ((bucket_info_parent.bucket_width == BUCKET_WIDTH_VARIABLE &&
bucket_info.bucket_width != BUCKET_WIDTH_VARIABLE))
{
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot create continuous aggregate with fixed-width bucket on top of "
"one using variable-width bucket"),
errdetail("Continuous aggregate with a fixed time bucket width (e.g. 61 days) "
"cannot be created on top of one using variable time bucket width "
"(e.g. 1 month).\n"
"The variance can lead to the fixed width one not being a multiple "
"of the variable width one.")));
}

/* if variable bucket size then get the month part for the arithmetic */
bucket_width = (bucket_info.bucket_width == BUCKET_WIDTH_VARIABLE) ?
bucket_info.interval->month :
bucket_info.bucket_width;
bucket_width_parent = (bucket_info_parent.bucket_width == BUCKET_WIDTH_VARIABLE) ?
bucket_info_parent.interval->month :
bucket_info_parent.bucket_width;

/* check if the current bucket is greater than the parent */
is_greater_than_parent = (bucket_width <= bucket_width_parent);
/* check if buckets are multiple */
is_multiple_of_parent = ((bucket_width % bucket_width_parent) != 0);

/* proceed with validation errors */
if (is_greater_than_parent || is_multiple_of_parent)
{
Datum width, width_parent;
Oid outfuncid = InvalidOid;
bool isvarlena;
Oid width_type = exprType(linitial(bucket_info.bucket_func->args));
char *width_out, *width_out_parent;
char *message = NULL;

getTypeOutputInfo(width_type, &outfuncid, &isvarlena);

width = (bucket_info.bucket_width == BUCKET_WIDTH_VARIABLE) ?
IntervalPGetDatum(bucket_info.interval) :
ts_internal_to_interval_value(bucket_width, width_type);

width_out = DatumGetCString(OidFunctionCall1(outfuncid, width));

width_parent = (bucket_info_parent.bucket_width == BUCKET_WIDTH_VARIABLE) ?
IntervalPGetDatum(bucket_info_parent.interval) :
ts_internal_to_interval_value(bucket_width_parent, width_type);

width_out_parent = DatumGetCString(OidFunctionCall1(outfuncid, width_parent));

/* new bucket should be greater than the parent */
if (is_greater_than_parent)
message = "greater than";

/* new bucket should be multiple of the parent */
if (is_multiple_of_parent)
message = "multiple of";

ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot create continuous aggregate with incompatible bucket width"),
errdetail("Time bucket width of \"%s.%s\" [%s] should be %s the time "
"bucket width of \"%s.%s\" [%s].",
cagg_schema,
cagg_name,
width_out,
message,
NameStr(cagg_source->data.user_view_schema),
NameStr(cagg_source->data.user_view_name),
width_out_parent)));
}
}

caggtimebucket_validate(&ret, query->groupClause, query->targetList);
return ret;
return bucket_info;
}

/* add ts_internal_cagg_final to bytea column.
Expand Down Expand Up @@ -2402,7 +2506,10 @@ tsl_process_continuous_agg_viewstmt(Node *node, const char *query_string, void *
errhint("Use ALTER MATERIALIZED VIEW to enable compression.")));
}

timebucket_exprinfo = cagg_validate_query((Query *) stmt->into->viewQuery, finalized);
timebucket_exprinfo = cagg_validate_query((Query *) stmt->into->viewQuery,
finalized,
get_namespace_name(nspid),
stmt->into->rel->relname);
cagg_create(stmt, &viewstmt, (Query *) stmt->query, &timebucket_exprinfo, with_clause_options);

if (!stmt->into->skipData)
Expand Down Expand Up @@ -2507,7 +2614,11 @@ cagg_rebuild_view_definition(ContinuousAgg *agg, Hypertable *mat_ht)
Relation direct_view_rel = relation_open(direct_view_oid, AccessShareLock);
Query *direct_query = copyObject(get_view_query(direct_view_rel));
remove_old_and_new_rte_from_query(direct_query);
CAggTimebucketInfo timebucket_exprinfo = cagg_validate_query(direct_query, finalized);
CAggTimebucketInfo timebucket_exprinfo =
cagg_validate_query(direct_query,
finalized,
NameStr(agg->data.user_view_schema),
NameStr(agg->data.user_view_name));

mattablecolumninfo_init(&mattblinfo, copyObject(direct_query->groupClause));
fqi.finalized = finalized;
Expand Down Expand Up @@ -2644,7 +2755,11 @@ cagg_flip_realtime_view_definition(ContinuousAgg *agg, Hypertable *mat_ht)
relation_close(direct_view_rel, NoLock);
remove_old_and_new_rte_from_query(direct_query);

CAggTimebucketInfo timebucket_exprinfo = cagg_validate_query(direct_query, agg->data.finalized);
CAggTimebucketInfo timebucket_exprinfo =
cagg_validate_query(direct_query,
agg->data.finalized,
NameStr(agg->data.user_view_schema),
NameStr(agg->data.user_view_name));

/* flip */
agg->data.materialized_only = !agg->data.materialized_only;
Expand Down
30 changes: 4 additions & 26 deletions tsl/src/continuous_aggs/options.c
Original file line number Diff line number Diff line change
Expand Up @@ -101,34 +101,11 @@ cagg_find_groupingcols(ContinuousAgg *agg, Hypertable *mat_ht)
{
List *retlist = NIL;
ListCell *lc;
Oid cagg_view_oid;

/*
* Get the direct_view definition for the finalized version because
* the user view doesn't have the "GROUP BY" clause anymore.
*/
if (ContinuousAggIsFinalized(agg))
cagg_view_oid =
get_relname_relid(NameStr(agg->data.direct_view_name),
get_namespace_oid(NameStr(agg->data.direct_view_schema), false));
else
cagg_view_oid =
get_relname_relid(NameStr(agg->data.user_view_name),
get_namespace_oid(NameStr(agg->data.user_view_schema), false));

Relation cagg_view_rel = table_open(cagg_view_oid, AccessShareLock);
RuleLock *cagg_view_rules = cagg_view_rel->rd_rules;
Assert(cagg_view_rules && cagg_view_rules->numLocks == 1);
RewriteRule *rule = cagg_view_rules->rules[0];
if (rule->event != CMD_SELECT)
ereport(ERROR, (errcode(ERRCODE_TS_UNEXPECTED), errmsg("unexpected rule event for view")));

Query *cagg_view_query = copyObject(linitial(rule->actions));
table_close(cagg_view_rel, NoLock); /* lock with be released at end of txn*/
Query *cagg_view_query = ts_continuous_agg_get_query(agg);
Oid mat_relid = mat_ht->main_table_relid;
Query *finalize_query;
/* the view rule has dummy old and new range table entries as the 1st and 2nd entries
*/

/* The view rule has dummy old and new range table entries as the 1st and 2nd entries */
Assert(list_length(cagg_view_query->rtable) >= 2);
if (cagg_view_query->setOperations)
{
Expand All @@ -146,6 +123,7 @@ cagg_find_groupingcols(ContinuousAgg *agg, Hypertable *mat_ht)
{
finalize_query = cagg_view_query;
}

foreach (lc, finalize_query->groupClause)
{
SortGroupClause *cagg_gc = (SortGroupClause *) lfirst(lc);
Expand Down

0 comments on commit e06f27b

Please sign in to comment.