Fix havingqual processing for caggs #3430

Merged: 1 commit, Aug 17, 2021
9 changes: 9 additions & 0 deletions CHANGELOG.md
@@ -4,6 +4,15 @@
`psql` with the `-X` flag to prevent any `.psqlrc` commands from
accidentally triggering the load of a previous DB version.**

## Unreleased


**Bugfixes**
* #3430 Fix havingqual processing for continuous aggregates

**Thanks**
* @brianbenns for reporting a segfault with continuous aggregates

## 2.4.0 (2021-07-29)

This release adds new experimental features since the 2.3.1 release.
168 changes: 89 additions & 79 deletions tsl/src/continuous_aggs/create.c
@@ -11,7 +11,7 @@
* The entry point for the code is
* tsl_process_continuous_agg_viewstmt
* The bulk of the code that creates the underlying tables/views etc. is in
* cagg_create
* cagg_create.
*
*/
#include <postgres.h>
@@ -1280,6 +1280,25 @@ mattablecolumninfo_addinternal(MatTableColumnInfo *matcolinfo, RangeTblEntry *us
matcolinfo->partial_grouplist = lappend(matcolinfo->partial_grouplist, grpcl);
}

static Aggref *
add_partialize_column(Aggref *agg_to_partialize, AggPartCxt *cxt)
{
Aggref *newagg;
Var *var;
/* step 1: create partialize( aggref) column
* for materialization table */
var = mattablecolumninfo_addentry(cxt->mattblinfo,
(Node *) agg_to_partialize,
cxt->original_query_resno);
cxt->addcol = true;
/* step 2: create finalize_agg expr using var
* for the column added to the materialization table
*/
/* This is a var for the column we created */
newagg = get_finalize_aggref(agg_to_partialize, var);
return newagg;
}
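
Each call to this helper adds one partialize(...) column to the materialization table definition and returns the matching finalize expression for the rewritten query. A rough sketch in the pseudo-SQL notation used by the cagg_create comment later in this file (col1 and col2 are placeholders, not the generated column names):

-- Materialization hypertable row shape after adding the partial columns:
--   (a, col1, col2, ...) with col1 = partialize(min(b)), col2 = partialize(max(d))
-- The rewritten user-facing query reads the partials back through finalize():
SELECT a, finalize(col1) + finalize(col2)
FROM _materialization_hypertable
GROUP BY a;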

static Node *
add_aggregate_partialize_mutator(Node *node, AggPartCxt *cxt)
{
@@ -1294,96 +1313,95 @@ add_aggregate_partialize_mutator(Node *node, AggPartCxt *cxt)
*/
if (IsA(node, Aggref))
{
Aggref *newagg;
Var *var;

if (cxt->ignore_aggoid == ((Aggref *) node)->aggfnoid)
return node; /*don't process this further */

/* step 1: create partialize( aggref) column
* for materialization table */
var = mattablecolumninfo_addentry(cxt->mattblinfo, node, cxt->original_query_resno);
cxt->addcol = true;
/* step 2: create finalize_agg expr using var
* for the column added to the materialization table
*/
/* This is a var for the column we created */
newagg = get_finalize_aggref((Aggref *) node, var);
Aggref *newagg = add_partialize_column((Aggref *) node, cxt);
return (Node *) newagg;
}
return expression_tree_mutator(node, add_aggregate_partialize_mutator, cxt);
}

/* This code modifies modquery */
/* having clause needs transformation
* original query is
* select a, count(b), min(c)
* from ..
* group by a
* having a> 10 or count(b) > 20 or min(d) = 4
* we get a mat table
* a, partial(countb), partial(minc) after processing
* the target list. We need to add entries from the having clause
* so the modified mat table is
* a, partial(count), partial(minc), partial(mind)
* and the new select from the mat table is
* i.e. we have
* select col1, finalize(col2), finalize(col3)
* from ..
* group by col1
* having col1 > 10 or finalize(col2) > 20 or finalize(col4) = 4
* Note: col# = corresponding column from the mat table
*/
typedef struct Cagg_havingcxt
{
TargetEntry *old;
TargetEntry *new;
bool found;
List *origq_tlist;
List *finalizeq_tlist;
AggPartCxt agg_cxt;
} cagg_havingcxt;

/* if we find a target entry expr that matches the node , then replace it with the
* expression from new target entry.
/* This function modifies the passed in havingQual by mapping exprs to
* columns in materialization table or finalized aggregate form.
* Note that the HAVING clause can contain only exprs from the GROUP BY clause or aggregates,
* and GROUP BY expressions cannot themselves contain aggregates.
* (By the time we process havingQuals, all the group by exprs have been
* processed and have associated columns in the materialization hypertable).
* For example, if the original query has
* GROUP BY colA + colB, colC
* HAVING colA + colB + sum(colD) > 10 OR count(colE) = 10
*
* The transformed havingqual would be
* HAVING matCol3 + finalize_agg(sum(matCol4)) > 10
* OR finalize_agg(count(matCol5)) = 10
*
*
* Note: GROUP BY exprs always appear in the query's targetlist.
* Some of the aggregates from the havingQual might also already appear in the targetlist.
* We replace all existing entries with their corresponding entry from the modified targetlist.
* If an aggregate (in the havingqual) does not exist in the TL, we create a
* materialization table column for it and use the finalize(column) form in the
* transformed havingQual.
*/
static Node *
replace_having_qual_mutator(Node *node, cagg_havingcxt *cxt)
create_replace_having_qual_mutator(Node *node, cagg_havingcxt *cxt)
{
if (node == NULL)
return NULL;
if (equal(node, cxt->old->expr))
{
cxt->found = true;
return (Node *) cxt->new->expr;
}
return expression_tree_mutator(node, replace_having_qual_mutator, cxt);
}

/* modify the havingqual and replace exprs that already occur in targetlist
* with entries from new target list
* RETURNS: havingQual
*/
static Node *
replace_targetentry_in_havingqual(Query *origquery, List *newtlist)
{
Node *having = copyObject(origquery->havingQual);
List *origtlist = origquery->targetList;
List *modtlist = newtlist;
ListCell *lc, *lc2;
cagg_havingcxt hcxt;

/* if we have any exprs that are in the targetlist, then we already have columns
* for them in the mat table. So replace with the correct expr
/* See if we already have a column in materialization hypertable for this
* expr. We do this by checking the existing targetlist
* entries for the query.
*/
ListCell *lc, *lc2;
List *origtlist = cxt->origq_tlist;
List *modtlist = cxt->finalizeq_tlist;
forboth (lc, origtlist, lc2, modtlist)
{
TargetEntry *te = (TargetEntry *) lfirst(lc);
TargetEntry *modte = (TargetEntry *) lfirst(lc2);
hcxt.old = te;
hcxt.new = modte;
hcxt.found = false;
having =
(Node *) expression_tree_mutator((Node *) having, replace_having_qual_mutator, &hcxt);
if (equal(node, te->expr))
{
return (Node *) modte->expr;
}
}
return having;
/* didn't find a match in targetlist. If it is an aggregate, create a partialize column for
* it in materialization hypertable and return corresponding finalize
* expr.
*/
if (IsA(node, Aggref))
{
AggPartCxt *agg_cxt = &(cxt->agg_cxt);
agg_cxt->addcol = false;
Aggref *newagg = add_partialize_column((Aggref *) node, agg_cxt);
Assert(agg_cxt->addcol == true);
return (Node *) newagg;
}
return expression_tree_mutator(node, create_replace_having_qual_mutator, cxt);
}

static Node *
finalizequery_create_havingqual(FinalizeQueryInfo *inp, MatTableColumnInfo *mattblinfo)
{
Query *orig_query = inp->final_userquery;
if (orig_query->havingQual == NULL)
return NULL;
Node *havingQual = copyObject(orig_query->havingQual);
Assert(inp->final_seltlist != NULL);
cagg_havingcxt hcxt = { .origq_tlist = orig_query->targetList,
.finalizeq_tlist = inp->final_seltlist,
.agg_cxt.mattblinfo = mattblinfo,
.agg_cxt.original_query_resno = 0,
.agg_cxt.ignore_aggoid = get_finalizefnoid(),
.agg_cxt.addcol = false };
return create_replace_having_qual_mutator(havingQual, &hcxt);
}
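
Concretely, for the example in the comment above, the HAVING rewrite produced here looks roughly like the following. This is a sketch only: matColN and finalize_agg are the placeholder names from that comment, not the exact generated SQL, and matCol3 is assumed to be the column created for the GROUP BY expression colA + colB.

-- HAVING clause of the original user query (over the raw hypertable):
--   GROUP BY colA + colB, colC
--   HAVING colA + colB + sum(colD) > 10 OR count(colE) = 10
-- Transformed HAVING clause (over the materialization hypertable), where
-- matCol4 and matCol5 hold partial state for sum(colD) and count(colE):
HAVING matCol3 + finalize_agg(sum(matCol4)) > 10
    OR finalize_agg(count(matCol5)) = 10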

/*
@@ -1402,7 +1420,6 @@ finalizequery_init(FinalizeQueryInfo *inp, Query *orig_query, MatTableColumnInfo
{
AggPartCxt cxt;
ListCell *lc;
Node *newhavingQual;
int resno = 1;

inp->final_userquery = copyObject(orig_query);
@@ -1467,14 +1484,7 @@
}
/* all grouping clause elements are in targetlist already.
so let's check the having clause */
newhavingQual = replace_targetentry_in_havingqual(inp->final_userquery, inp->final_seltlist);
/* we might still have aggs in havingqual which don't appear in the targetlist , but don't
* overwrite finalize_agg exprs that we have in the havingQual*/
cxt.addcol = false;
cxt.ignore_aggoid = get_finalizefnoid();
cxt.original_query_resno = 0;
inp->final_havingqual =
expression_tree_mutator((Node *) newhavingQual, add_aggregate_partialize_mutator, &cxt);
inp->final_havingqual = finalizequery_create_havingqual(inp, mattblinfo);
}

/* Create select query with the finalize aggregates
@@ -1583,7 +1593,7 @@ fixup_userview_query_tlist(Query *userquery, List *tlist_aliases)
*
* Step 1. create a materialization table which stores the partials for the
* aggregates and the grouping columns + internal columns.
* So we have a table like ts_internal_mcagg_tab
* So we have a table like _materialization_hypertable
* with columns:
*( a, col1, col2, col3, internal-columns)
* where col1 = partialize(min(b)), col2= partialize(max(d)),
@@ -1593,7 +1603,7 @@ fixup_userview_query_tlist(Query *userquery, List *tlist_aliases)
* CREATE VIEW mcagg
* as
* select a, finalize(col1) + finalize(col2)
* from ts_internal_mcagg
* from _materialization_hypertable
* group by a, col3
*
* Step 3: Create a view to populate the materialization table
89 changes: 89 additions & 0 deletions tsl/test/expected/continuous_aggs.out
@@ -1415,3 +1415,92 @@
NOTICE: drop cascades to table _timescaledb_internal._hyper_35_75_chunk
NOTICE: drop cascades to table _timescaledb_internal._hyper_36_76_chunk
NOTICE: drop cascades to table _timescaledb_internal._hyper_37_77_chunk
----
--- github issue 2655 ---
create table raw_data(time timestamptz, search_query text, cnt integer, cnt2 integer);
select create_hypertable('raw_data','time');
NOTICE: adding not-null constraint to column "time"
create_hypertable
------------------------
(38,public,raw_data,t)
(1 row)

insert into raw_data select '2000-01-01','Q1';
--having has exprs that appear in select
CREATE MATERIALIZED VIEW search_query_count_1m WITH (timescaledb.continuous)
AS
SELECT search_query,count(search_query) as count,
time_bucket(INTERVAL '1 minute', time) AS bucket
FROM raw_data
WHERE search_query is not null AND LENGTH(TRIM(both from search_query))>0
GROUP BY search_query, bucket HAVING count(search_query) > 3 OR sum(cnt) > 1;
NOTICE: refreshing continuous aggregate "search_query_count_1m"
--having has aggregates + grp by columns that appear in select
CREATE MATERIALIZED VIEW search_query_count_2 WITH (timescaledb.continuous)
AS
SELECT search_query,count(search_query) as count, sum(cnt),
time_bucket(INTERVAL '1 minute', time) AS bucket
FROM raw_data
WHERE search_query is not null AND LENGTH(TRIM(both from search_query))>0
GROUP BY search_query, bucket
HAVING count(search_query) > 3 OR sum(cnt) > 1 OR
( sum(cnt) + count(cnt)) > 1
AND search_query = 'Q1';
NOTICE: refreshing continuous aggregate "search_query_count_2"
CREATE MATERIALIZED VIEW search_query_count_3 WITH (timescaledb.continuous)
AS
SELECT search_query,count(search_query) as count, sum(cnt),
time_bucket(INTERVAL '1 minute', time) AS bucket
FROM raw_data
WHERE search_query is not null AND LENGTH(TRIM(both from search_query))>0
GROUP BY cnt +cnt2 , bucket, search_query
HAVING cnt + cnt2 + sum(cnt) > 2 or count(cnt2) > 10;
NOTICE: refreshing continuous aggregate "search_query_count_3"
insert into raw_data select '2000-01-01 00:00+0','Q1', 1, 100;
insert into raw_data select '2000-01-01 00:00+0','Q1', 2, 200;
insert into raw_data select '2000-01-01 00:00+0','Q1', 3, 300;
insert into raw_data select '2000-01-02 00:00+0','Q2', 10, 10;
insert into raw_data select '2000-01-02 00:00+0','Q2', 20, 20;
CALL refresh_continuous_aggregate('search_query_count_1m', NULL, NULL);
SELECT * FROM search_query_count_1m ORDER BY 1, 2;
search_query | count | bucket
--------------+-------+------------------------------
Q1 | 3 | Fri Dec 31 16:00:00 1999 PST
Q2 | 2 | Sat Jan 01 16:00:00 2000 PST
(2 rows)

--only 1 of these should appear in the result
insert into raw_data select '2000-01-02 00:00+0','Q3', 0, 0;
insert into raw_data select '2000-01-03 00:00+0','Q4', 20, 20;
CALL refresh_continuous_aggregate('search_query_count_1m', NULL, NULL);
SELECT * FROM search_query_count_1m ORDER BY 1, 2;
search_query | count | bucket
--------------+-------+------------------------------
Q1 | 3 | Fri Dec 31 16:00:00 1999 PST
Q2 | 2 | Sat Jan 01 16:00:00 2000 PST
Q4 | 1 | Sun Jan 02 16:00:00 2000 PST
(3 rows)

--refresh search_query_count_2---
CALL refresh_continuous_aggregate('search_query_count_2', NULL, NULL);
SELECT * FROM search_query_count_2 ORDER BY 1, 2;
search_query | count | sum | bucket
--------------+-------+-----+------------------------------
Q1 | 3 | 6 | Fri Dec 31 16:00:00 1999 PST
Q2 | 2 | 30 | Sat Jan 01 16:00:00 2000 PST
Q4 | 1 | 20 | Sun Jan 02 16:00:00 2000 PST
(3 rows)

--refresh search_query_count_3---
CALL refresh_continuous_aggregate('search_query_count_3', NULL, NULL);
SELECT * FROM search_query_count_3 ORDER BY 1, 2, 3;
search_query | count | sum | bucket
--------------+-------+-----+------------------------------
Q1 | 1 | 1 | Fri Dec 31 16:00:00 1999 PST
Q1 | 1 | 2 | Fri Dec 31 16:00:00 1999 PST
Q1 | 1 | 3 | Fri Dec 31 16:00:00 1999 PST
Q2 | 1 | 10 | Sat Jan 01 16:00:00 2000 PST
Q2 | 1 | 20 | Sat Jan 01 16:00:00 2000 PST
Q4 | 1 | 20 | Sun Jan 02 16:00:00 2000 PST
(6 rows)
