Skip to content

Commit

Permalink
Fix segfault on CAggs with multiple JOINs
Browse files Browse the repository at this point in the history
Creating a Continuous Aggregate with multiple joins, or switching an
existing one to realtime mode, was leading to a segfault.

Fixed it by dealing properly with the `varno` when creating the `Quals`
for the union view in realtime mode.

Also get rid of some leftover code from when we relaxed the CAggs join
restrictions in #7111.
  • Loading branch information
fabriziomello committed Jul 23, 2024
1 parent d18361e commit 1e29cd1
Show file tree
Hide file tree
Showing 8 changed files with 128 additions and 195 deletions.
56 changes: 16 additions & 40 deletions tsl/src/continuous_aggs/common.c
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ caggtimebucketinfo_init(CAggTimebucketInfo *src, int32 hypertable_id, Oid hypert
src->htid = hypertable_id;
src->parent_mat_hypertable_id = parent_mat_hypertable_id;
src->htoid = hypertable_oid;
src->htoidparent = InvalidOid;
src->htpartcolno = hypertable_partition_colno;
src->htpartcoltype = hypertable_partition_coltype;
src->htpartcol_interval_len = hypertable_partition_col_interval;
Expand Down Expand Up @@ -1082,6 +1083,9 @@ cagg_validate_query(const Query *query, const bool finalized, const char *cagg_s
}
}

if (is_hierarchical)
bucket_info.htoidparent = cagg_parent->relid;

return bucket_info;
}

Expand Down Expand Up @@ -1332,55 +1336,27 @@ build_union_query(CAggTimebucketInfo *tbinfo, int matpartcolno, Query *q1, Query
/*
* If there is join in CAgg definition then adjust varno
* to get time column from the hypertable in the join.
*
* In case of joins it is enough to check if the first node is not RangeTblRef,
* because the jointree has RangeTblRef as leaves and JoinExpr above them.
* So if JoinExpr is present, it is the first node.
* Other cases of join i.e. without explicit JOIN clause is confirmed
* by reading the length of rtable.
*/
if (list_length(q2->rtable) == CONTINUOUS_AGG_MAX_JOIN_RELATIONS ||
!IsA(linitial(q2->jointree->fromlist), RangeTblRef))
{
Oid normal_table_id = InvalidOid;
RangeTblEntry *rte = NULL;
RangeTblEntry *rte_other = NULL;
varno = list_length(q2->rtable);

if (list_length(q2->rtable) == CONTINUOUS_AGG_MAX_JOIN_RELATIONS)
{
RangeTblRef *rtref = linitial_node(RangeTblRef, q2->jointree->fromlist);
rte = list_nth(q2->rtable, rtref->rtindex - 1);
RangeTblRef *rtref_other = lsecond_node(RangeTblRef, q2->jointree->fromlist);
rte_other = list_nth(q2->rtable, rtref_other->rtindex - 1);
}
else if (!IsA(linitial(q2->jointree->fromlist), RangeTblRef))
if (list_length(q2->rtable) > 1 || !IsA(linitial(q2->jointree->fromlist), RangeTblRef))
{
int nvarno = 1;
foreach (lc2, q2->rtable)
{
ListCell *l;
foreach (l, q2->jointree->fromlist)
RangeTblEntry *rte = lfirst_node(RangeTblEntry, lc2);
if (rte->rtekind == RTE_RELATION)
{
Node *jtnode = (Node *) lfirst(l);
JoinExpr *join = NULL;
if (IsA(jtnode, JoinExpr))
/* look for hypertable or parent hypertable in RangeTableEntry list */
if (rte->relid == tbinfo->htoid || rte->relid == tbinfo->htoidparent)
{
join = castNode(JoinExpr, jtnode);
rte = list_nth(q2->rtable, ((RangeTblRef *) join->larg)->rtindex - 1);
rte_other = list_nth(q2->rtable, ((RangeTblRef *) join->rarg)->rtindex - 1);
varno = nvarno;
break;
}
}
nvarno++;
}
if (rte->relkind == RELKIND_VIEW)
normal_table_id = rte_other->relid;
else if (rte_other->relkind == RELKIND_VIEW)
normal_table_id = rte->relid;
else
normal_table_id = ts_is_hypertable(rte->relid) ? rte_other->relid : rte->relid;
if (normal_table_id == rte->relid)
varno = 2;
else
varno = 1;
}
else
varno = list_length(q2->rtable);

q2_quals = build_union_query_quals(materialize_htid,
tbinfo->htpartcoltype,
Expand Down
2 changes: 1 addition & 1 deletion tsl/src/continuous_aggs/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
#include "ts_catalog/catalog.h"
#include "ts_catalog/continuous_agg.h"

#define CONTINUOUS_AGG_MAX_JOIN_RELATIONS 2
#define DEFAULT_MATPARTCOLUMN_NAME "time_partition_col"
#define CAGG_INVALIDATION_THRESHOLD_NAME "invalidation threshold watermark"

Expand Down Expand Up @@ -66,6 +65,7 @@ typedef struct CAggTimebucketInfo
int32 htid; /* hypertable id */
int32 parent_mat_hypertable_id; /* parent materialization hypertable id */
Oid htoid; /* hypertable oid */
Oid htoidparent; /* parent hypertable oid in case of hierarchical */
AttrNumber htpartcolno; /* primary partitioning column of raw hypertable */
/* This should also be the column used by time_bucket */
Oid htpartcoltype; /* The collation type */
Expand Down
160 changes: 30 additions & 130 deletions tsl/src/continuous_aggs/finalize.c
Original file line number Diff line number Diff line change
Expand Up @@ -131,102 +131,17 @@ finalizequery_get_select_query(FinalizeQueryInfo *inp, List *matcollist,
ObjectAddress *mattbladdress, char *relname)
{
Query *final_selquery = NULL;
ListCell *lc;
FromExpr *fromexpr;
RangeTblEntry *rte;
#if PG16_GE
RTEPermissionInfo *perminfo;
#endif

CAGG_MAKEQUERY(final_selquery, inp->final_userquery);
final_selquery->hasAggs = !inp->finalized;

/*
* For initial cagg creation rtable will have only 1 entry,
* for alter table rtable will have multiple entries with our
* RangeTblEntry as last member.
* For cagg with joins, we need to create a new RTE and jointree
* which contains the information of the materialised hypertable
* that is created for this cagg.
*/
if (list_length(inp->final_userquery->jointree->fromlist) >=
CONTINUOUS_AGG_MAX_JOIN_RELATIONS ||
!IsA(linitial(inp->final_userquery->jointree->fromlist), RangeTblRef))
{
rte = makeNode(RangeTblEntry);
rte->alias = makeAlias(relname, NIL);
rte->inFromCl = true;
rte->inh = true;
rte->rellockmode = 1;
rte->eref = copyObject(rte->alias);
rte->relid = mattbladdress->objectId;
#if PG16_GE
perminfo = addRTEPermissionInfo(&final_selquery->rteperminfos, rte);
perminfo->selectedCols = NULL;
#endif
ListCell *l;
foreach (l, inp->final_userquery->jointree->fromlist)
{
/*
* In case of joins, update the rte with all the join related struct.
*/
Node *jtnode = (Node *) lfirst(l);
JoinExpr *join = NULL;
if (IsA(jtnode, JoinExpr))
{
join = castNode(JoinExpr, jtnode);
RangeTblEntry *jrte = rt_fetch(join->rtindex, inp->final_userquery->rtable);
rte->joinaliasvars = jrte->joinaliasvars;
rte->jointype = jrte->jointype;
rte->joinleftcols = jrte->joinleftcols;
rte->joinrightcols = jrte->joinrightcols;
rte->joinmergedcols = jrte->joinmergedcols;
rte->join_using_alias = jrte->join_using_alias;
#if PG16_LT
rte->selectedCols = jrte->selectedCols;
#else
if (jrte->perminfoindex > 0)
{
RTEPermissionInfo *jperminfo =
getRTEPermissionInfo(inp->final_userquery->rteperminfos, jrte);
perminfo->selectedCols = jperminfo->selectedCols;
}
#endif
}
}
}
else
{
rte = llast_node(RangeTblEntry, inp->final_userquery->rtable);
rte->eref->colnames = NIL;
#if PG16_LT
rte->selectedCols = NULL;
#else
perminfo = getRTEPermissionInfo(inp->final_userquery->rteperminfos, rte);
perminfo->selectedCols = NULL;
#endif
}
if (rte->eref->colnames == NIL)
{
/*
* We only need to do this for the case when there is no Join node in the query.
* In the case of join, rte->eref is already populated by jrte->eref and hence the
* relevant info, so need not to do this.
*/
/* New RangeTblEntry for the materialization hypertable */
RangeTblEntry *rte = makeNode(RangeTblEntry);
rte->inFromCl = true;
rte->inh = true;
rte->rellockmode = 1;
rte->eref = makeAlias(relname, NIL);

/* Aliases for column names for the materialization table. */
foreach (lc, matcollist)
{
ColumnDef *cdef = lfirst_node(ColumnDef, lc);
rte->eref->colnames = lappend(rte->eref->colnames, makeString(cdef->colname));
int attno = list_length(rte->eref->colnames) - FirstLowInvalidHeapAttributeNumber;
#if PG16_LT
rte->selectedCols = bms_add_member(rte->selectedCols, attno);
#else
perminfo->selectedCols = bms_add_member(perminfo->selectedCols, attno);
#endif
}
}
rte->relid = mattbladdress->objectId;
rte->rtekind = RTE_RELATION;
rte->relkind = RELKIND_RELATION;
Expand All @@ -236,16 +151,33 @@ finalizequery_get_select_query(FinalizeQueryInfo *inp, List *matcollist,
rte->insertedCols = NULL;
rte->updatedCols = NULL;
#else
RTEPermissionInfo *perminfo = addRTEPermissionInfo(&final_selquery->rteperminfos, rte);
perminfo->selectedCols = NULL;
perminfo->relid = mattbladdress->objectId;
perminfo->requiredPerms |= ACL_SELECT;
perminfo->insertedCols = NULL;
perminfo->updatedCols = NULL;
#endif

/* 2. Fixup targetlist with the correct rel information. */
/* Aliases for column names for the materialization hypertable. */
ListCell *lc;
int attno = 0;
foreach (lc, matcollist)
{
ColumnDef *cdef = lfirst_node(ColumnDef, lc);
rte->eref->colnames = lappend(rte->eref->colnames, makeString(cdef->colname));
attno = list_length(rte->eref->colnames) - FirstLowInvalidHeapAttributeNumber;
#if PG16_LT
rte->selectedCols = bms_add_member(rte->selectedCols, attno);
#else
perminfo->selectedCols = bms_add_member(perminfo->selectedCols, attno);
#endif
}

/* Fixup targetlist with the correct rel information. */
foreach (lc, inp->final_seltlist)
{
TargetEntry *tle = (TargetEntry *) lfirst(lc);
TargetEntry *tle = lfirst_node(TargetEntry, lc);
/*
* In case when this is a cagg with joins, the Var from the normal table
* already has resorigtbl populated and we need to use that to resolve
Expand All @@ -255,50 +187,18 @@ finalizequery_get_select_query(FinalizeQueryInfo *inp, List *matcollist,
if (IsA(tle->expr, Var) && !OidIsValid(tle->resorigtbl))
{
tle->resorigtbl = rte->relid;
tle->resorigcol = ((Var *) tle->expr)->varattno;
tle->resorigcol = castNode(Var, tle->expr)->varattno;
}
}

if (list_length(inp->final_userquery->jointree->fromlist) >=
CONTINUOUS_AGG_MAX_JOIN_RELATIONS ||
!IsA(linitial(inp->final_userquery->jointree->fromlist), RangeTblRef))
{
RangeTblRef *rtr;
final_selquery->rtable = list_make1(rte);
#if PG16_GE
/* perminfo has been set already in the previous if/else */
Assert(list_length(final_selquery->rteperminfos) == 1);
#endif
rtr = makeNode(RangeTblRef);
rtr->rtindex = 1;
fromexpr = makeFromExpr(list_make1(rtr), NULL);
}
else
{
final_selquery->rtable = inp->final_userquery->rtable;
#if PG16_GE
final_selquery->rteperminfos = inp->final_userquery->rteperminfos;
#endif
fromexpr = inp->final_userquery->jointree;
fromexpr->quals = NULL;
}

/*
* Fixup from list. No quals on original table should be
* present here - they should be on the query that populates
* the mattable (partial_selquery). For the Cagg with join,
* we can not copy the fromlist from inp->final_userquery as
* it has two tables in this case.
*/
Assert(list_length(inp->final_userquery->jointree->fromlist) <=
CONTINUOUS_AGG_MAX_JOIN_RELATIONS);
RangeTblRef *rtr = makeNode(RangeTblRef);
rtr->rtindex = 1;

final_selquery->jointree = fromexpr;
final_selquery->rtable = list_make1(rte);
final_selquery->jointree = makeFromExpr(list_make1(rtr), NULL);
final_selquery->targetList = inp->final_seltlist;
final_selquery->sortClause = inp->final_userquery->sortClause;

/* Already finalized query no need to copy group by or having clause. */

return final_selquery;
}

Expand Down
Loading

0 comments on commit 1e29cd1

Please sign in to comment.