Skip to content

Commit

Permalink
Add Hierarchical Continuous Aggregates validations
Browse files Browse the repository at this point in the history
Commit 3749953 introduce Hierarchical Continuous Aggregates (aka
Continuous Aggregate on top of another Continuous Aggregate) but it
lacks of some basic validations.

Validations added during the creation of a Hierarchical Continuous
Aggregate:

* Forbid create a continuous aggregate with fixed-width bucket on top of
  a continuous aggregate with variable-width bucket.

* Forbid incompatible bucket widths:
  - should not be equal;
  - bucket width of the new continuous aggregate should be greater than
    the source continuous aggregate;
  - bucket width of the new continuous aggregate should be multiple of
    the source continuous aggregate.
  • Loading branch information
fabriziomello committed Nov 24, 2022
1 parent 826dcd2 commit 55d4077
Show file tree
Hide file tree
Showing 18 changed files with 1,521 additions and 124 deletions.
36 changes: 36 additions & 0 deletions src/ts_catalog/continuous_agg.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#include "time_bucket.h"
#include "time_utils.h"
#include "ts_catalog/catalog.h"
#include "debug_assert.h"

#define BUCKET_FUNCTION_SERIALIZE_VERSION 1
#define CHECK_NAME_MATCH(name1, name2) (namestrcmp(name1, name2) == 0)
Expand Down Expand Up @@ -1898,3 +1899,38 @@ ts_cagg_permissions_check(Oid cagg_oid, Oid userid)

return ownerid;
}

Query *
ts_continuous_agg_get_query(ContinuousAgg *cagg)
{
Oid cagg_view_oid;
Relation cagg_view_rel;
RuleLock *cagg_view_rules;
RewriteRule *rule;
Query *cagg_view_query;

/*
* Get the direct_view definition for the finalized version because
* the user view doesn't have the "GROUP BY" clause anymore.
*/
if (ContinuousAggIsFinalized(cagg))
cagg_view_oid =
get_relname_relid(NameStr(cagg->data.direct_view_name),
get_namespace_oid(NameStr(cagg->data.direct_view_schema), false));
else
cagg_view_oid =
get_relname_relid(NameStr(cagg->data.user_view_name),
get_namespace_oid(NameStr(cagg->data.user_view_schema), false));

cagg_view_rel = table_open(cagg_view_oid, AccessShareLock);
cagg_view_rules = cagg_view_rel->rd_rules;
Assert(cagg_view_rules && cagg_view_rules->numLocks == 1);

rule = cagg_view_rules->rules[0];
Ensure(rule->event == CMD_SELECT, "unexpected rule event %d for view", rule->event);

cagg_view_query = (Query *) copyObject(linitial(rule->actions));
table_close(cagg_view_rel, NoLock);

return cagg_view_query;
}
3 changes: 3 additions & 0 deletions src/ts_catalog/continuous_agg.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#define TIMESCALEDB_CONTINUOUS_AGG_H
#include <postgres.h>
#include <catalog/pg_type.h>
#include <nodes/parsenodes.h>

#include "ts_catalog/catalog.h"
#include "chunk.h"
Expand Down Expand Up @@ -214,4 +215,6 @@ ts_compute_circumscribed_bucketed_refresh_window_variable(int64 *start, int64 *e
extern TSDLLEXPORT int64 ts_compute_beginning_of_the_next_bucket_variable(
int64 timeval, const ContinuousAggsBucketFunction *bf);

extern TSDLLEXPORT Query *ts_continuous_agg_get_query(ContinuousAgg *cagg);

#endif /* TIMESCALEDB_CONTINUOUS_AGG_H */
126 changes: 108 additions & 18 deletions tsl/src/continuous_aggs/create.c
Original file line number Diff line number Diff line change
Expand Up @@ -1092,16 +1092,20 @@ cagg_query_supported(const Query *query, StringInfo hint, StringInfo detail, con
}

static CAggTimebucketInfo
cagg_validate_query(const Query *query, const bool finalized)
cagg_validate_query(const Query *query, const bool finalized, const char *cagg_schema,
const char *cagg_name)
{
CAggTimebucketInfo ret;
CAggTimebucketInfo bucket_info, bucket_info_source;
Cache *hcache;
Hypertable *ht = NULL;
RangeTblRef *rtref = NULL;
RangeTblEntry *rte;
List *fromList;
StringInfo hint = makeStringInfo();
StringInfo detail = makeStringInfo();
bool is_nested = false;
Query *prev_query;
ContinuousAgg *cagg_source;

if (!cagg_query_supported(query, hint, detail, finalized))
{
Expand Down Expand Up @@ -1149,24 +1153,26 @@ cagg_validate_query(const Query *query, const bool finalized)
ht = ts_hypertable_cache_get_cache_and_entry(rte->relid, CACHE_FLAG_NONE, &hcache);
else
{
const ContinuousAgg *cagg;
cagg_source = ts_continuous_agg_find_by_relid(rte->relid);

cagg = ts_continuous_agg_find_by_relid(rte->relid);

if (!ContinuousAggIsFinalized(cagg))
if (!ContinuousAggIsFinalized(cagg_source))
{
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("old format of continuous aggregate is not supported"),
errhint("Run \"CALL cagg_migrate('%s.%s');\" to migrate to the new "
"format.",
NameStr(cagg->data.user_view_schema),
NameStr(cagg->data.user_view_name))));
NameStr(cagg_source->data.user_view_schema),
NameStr(cagg_source->data.user_view_name))));
}

parent_mat_hypertable_id = cagg->data.mat_hypertable_id;
parent_mat_hypertable_id = cagg_source->data.mat_hypertable_id;
hcache = ts_hypertable_cache_pin();
ht = ts_hypertable_cache_get_entry_by_id(hcache, cagg->data.mat_hypertable_id);
ht = ts_hypertable_cache_get_entry_by_id(hcache, cagg_source->data.mat_hypertable_id);

/* get the querydef for the source cagg */
is_nested = true;
prev_query = ts_continuous_agg_get_query(cagg_source);
}

if (TS_HYPERTABLE_IS_INTERNAL_COMPRESSION_TABLE(ht))
Expand Down Expand Up @@ -1226,30 +1232,103 @@ cagg_validate_query(const Query *query, const bool finalized)
errhint("Set a custom time function on the hypertable.")));
}

caggtimebucketinfo_init(&ret,
caggtimebucketinfo_init(&bucket_info,
ht->fd.id,
ht->main_table_relid,
part_dimension->column_attno,
part_dimension->fd.column_type,
part_dimension->fd.interval_length,
parent_mat_hypertable_id);

if (is_nested)
{
caggtimebucketinfo_init(&bucket_info_source,
ht->fd.id,
ht->main_table_relid,
part_dimension->column_attno,
part_dimension->fd.column_type,
part_dimension->fd.interval_length,
cagg_source->data.parent_mat_hypertable_id);
}

ts_cache_release(hcache);
}

/*check row security settings for the table */
/* check row security settings for the table */
if (ts_has_row_security(rte->relid))
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot create continuous aggregate on hypertable with row security")));

/* we need a GROUP By clause with time_bucket on the partitioning
/*
* We need a GROUP By clause with time_bucket on the partitioning
* column of the hypertable
*/
Assert(query->groupClause);
caggtimebucket_validate(&bucket_info, query->groupClause, query->targetList);

/* nested cagg validations */
if (is_nested)
{
int64 bucket_width, bucket_width_source;

Assert(prev_query->groupClause);
caggtimebucket_validate(&bucket_info_source,
prev_query->groupClause,
prev_query->targetList);

/* cannot create cagg with fixed bucket on top of variable bucket */
if ((bucket_info_source.bucket_width == BUCKET_WIDTH_VARIABLE &&
bucket_info.bucket_width != BUCKET_WIDTH_VARIABLE))
{
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot create continuous aggregate with fixed-width bucket on top of one using variable-width bucket"),
errdetail("Continuous aggregate with a fixed time bucket width (e.g. 61 days) "
"cannot be created on top of one using variable time bucket width "
"(e.g. 1 month).\n"
"The variance can lead to the fixed width one not being a multiple "
"of the variable width one.")));
}

/* if variable bucket size then get the month part for the arithmetic */
bucket_width = (bucket_info.bucket_width == BUCKET_WIDTH_VARIABLE) ?
bucket_info.interval->month :
bucket_info.bucket_width;
bucket_width_source = (bucket_info_source.bucket_width == BUCKET_WIDTH_VARIABLE) ?
bucket_info_source.interval->month :
bucket_info_source.bucket_width;

/* new bucket should be greater than the previous */
if (bucket_width <= bucket_width_source)
{
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot create continuous aggregate with incompatible bucket width"),
errdetail("Time bucket width of \"%s.%s\" should be greater than the time "
"bucket width of \"%s.%s\".",
cagg_schema,
cagg_name,
NameStr(cagg_source->data.user_view_schema),
NameStr(cagg_source->data.user_view_name))));
}

/* check if buckets are multiple */
if ((bucket_width % bucket_width_source) != 0)
{
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot create continuous aggregate with incompatible bucket width"),
errdetail("Time bucket width of \"%s.%s\" should be multiple of the time "
"bucket width of \"%s.%s\".",
cagg_schema,
cagg_name,
NameStr(cagg_source->data.user_view_schema),
NameStr(cagg_source->data.user_view_name))));
}
}

caggtimebucket_validate(&ret, query->groupClause, query->targetList);
return ret;
return bucket_info;
}

/* add ts_internal_cagg_final to bytea column.
Expand Down Expand Up @@ -2402,7 +2481,10 @@ tsl_process_continuous_agg_viewstmt(Node *node, const char *query_string, void *
errhint("Use ALTER MATERIALIZED VIEW to enable compression.")));
}

timebucket_exprinfo = cagg_validate_query((Query *) stmt->into->viewQuery, finalized);
timebucket_exprinfo = cagg_validate_query((Query *) stmt->into->viewQuery,
finalized,
get_namespace_name(nspid),
stmt->into->rel->relname);
cagg_create(stmt, &viewstmt, (Query *) stmt->query, &timebucket_exprinfo, with_clause_options);

if (!stmt->into->skipData)
Expand Down Expand Up @@ -2507,7 +2589,11 @@ cagg_rebuild_view_definition(ContinuousAgg *agg, Hypertable *mat_ht)
Relation direct_view_rel = relation_open(direct_view_oid, AccessShareLock);
Query *direct_query = copyObject(get_view_query(direct_view_rel));
remove_old_and_new_rte_from_query(direct_query);
CAggTimebucketInfo timebucket_exprinfo = cagg_validate_query(direct_query, finalized);
CAggTimebucketInfo timebucket_exprinfo =
cagg_validate_query(direct_query,
finalized,
NameStr(agg->data.user_view_schema),
NameStr(agg->data.user_view_name));

mattablecolumninfo_init(&mattblinfo, copyObject(direct_query->groupClause));
fqi.finalized = finalized;
Expand Down Expand Up @@ -2644,7 +2730,11 @@ cagg_flip_realtime_view_definition(ContinuousAgg *agg, Hypertable *mat_ht)
relation_close(direct_view_rel, NoLock);
remove_old_and_new_rte_from_query(direct_query);

CAggTimebucketInfo timebucket_exprinfo = cagg_validate_query(direct_query, agg->data.finalized);
CAggTimebucketInfo timebucket_exprinfo =
cagg_validate_query(direct_query,
agg->data.finalized,
NameStr(agg->data.user_view_schema),
NameStr(agg->data.user_view_name));

/* flip */
agg->data.materialized_only = !agg->data.materialized_only;
Expand Down
30 changes: 4 additions & 26 deletions tsl/src/continuous_aggs/options.c
Original file line number Diff line number Diff line change
Expand Up @@ -101,34 +101,11 @@ cagg_find_groupingcols(ContinuousAgg *agg, Hypertable *mat_ht)
{
List *retlist = NIL;
ListCell *lc;
Oid cagg_view_oid;

/*
* Get the direct_view definition for the finalized version because
* the user view doesn't have the "GROUP BY" clause anymore.
*/
if (ContinuousAggIsFinalized(agg))
cagg_view_oid =
get_relname_relid(NameStr(agg->data.direct_view_name),
get_namespace_oid(NameStr(agg->data.direct_view_schema), false));
else
cagg_view_oid =
get_relname_relid(NameStr(agg->data.user_view_name),
get_namespace_oid(NameStr(agg->data.user_view_schema), false));

Relation cagg_view_rel = table_open(cagg_view_oid, AccessShareLock);
RuleLock *cagg_view_rules = cagg_view_rel->rd_rules;
Assert(cagg_view_rules && cagg_view_rules->numLocks == 1);
RewriteRule *rule = cagg_view_rules->rules[0];
if (rule->event != CMD_SELECT)
ereport(ERROR, (errcode(ERRCODE_TS_UNEXPECTED), errmsg("unexpected rule event for view")));

Query *cagg_view_query = copyObject(linitial(rule->actions));
table_close(cagg_view_rel, NoLock); /* lock with be released at end of txn*/
Query *cagg_view_query = ts_continuous_agg_get_query(agg);
Oid mat_relid = mat_ht->main_table_relid;
Query *finalize_query;
/* the view rule has dummy old and new range table entries as the 1st and 2nd entries
*/

/* The view rule has dummy old and new range table entries as the 1st and 2nd entries */
Assert(list_length(cagg_view_query->rtable) >= 2);
if (cagg_view_query->setOperations)
{
Expand All @@ -146,6 +123,7 @@ cagg_find_groupingcols(ContinuousAgg *agg, Hypertable *mat_ht)
{
finalize_query = cagg_view_query;
}

foreach (lc, finalize_query->groupClause)
{
SortGroupClause *cagg_gc = (SortGroupClause *) lfirst(lc);
Expand Down

0 comments on commit 55d4077

Please sign in to comment.