Skip to content

Commit

Permalink
temp addition
Browse files Browse the repository at this point in the history
  • Loading branch information
gayyappan committed Aug 11, 2021
1 parent dadb75c commit eb9a60d
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 80 deletions.
157 changes: 78 additions & 79 deletions tsl/src/continuous_aggs/create.c
Original file line number Diff line number Diff line change
Expand Up @@ -1280,6 +1280,25 @@ mattablecolumninfo_addinternal(MatTableColumnInfo *matcolinfo, RangeTblEntry *us
matcolinfo->partial_grouplist = lappend(matcolinfo->partial_grouplist, grpcl);
}

/*
 * Add a column for an aggregate to the materialization table and build the
 * matching finalize expression.
 *
 * agg_to_partialize: the aggregate that gets a partialize(aggref) column.
 * cxt: partialize context. cxt->mattblinfo receives the new column entry,
 *      cxt->original_query_resno is passed along with it, and cxt->addcol
 *      is set to true to record that a column was added.
 *
 * Returns the Aggref built by get_finalize_aggref() over the new column.
 */
static Aggref *
add_partialize_column(Aggref *agg_to_partialize, AggPartCxt *cxt)
{
	/* Create the partialize(aggref) column for the materialization table;
	 * the returned Var refers to the column we just created. */
	Var *mat_col = mattablecolumninfo_addentry(cxt->mattblinfo,
											   (Node *) agg_to_partialize,
											   cxt->original_query_resno);

	cxt->addcol = true;

	/* Create the finalize_agg expr using the Var for the column added to
	 * the materialization table. */
	return get_finalize_aggref(agg_to_partialize, mat_col);
}

static Node *
add_aggregate_partialize_mutator(Node *node, AggPartCxt *cxt)
{
Expand All @@ -1294,61 +1313,54 @@ add_aggregate_partialize_mutator(Node *node, AggPartCxt *cxt)
*/
if (IsA(node, Aggref))
{
Aggref *newagg;
Var *var;

if (cxt->ignore_aggoid == ((Aggref *) node)->aggfnoid)
return node; /*don't process this further */

/* step 1: create partialize( aggref) column
* for materialization table */
var = mattablecolumninfo_addentry(cxt->mattblinfo, node, cxt->original_query_resno);
cxt->addcol = true;
/* step 2: create finalize_agg expr using var
* for the column added to the materialization table
*/
/* This is a var for the column we created */
newagg = get_finalize_aggref((Aggref *) node, var);
Aggref *newagg = add_partialize_column((Aggref *) node, cxt);
return (Node *) newagg;
}
return expression_tree_mutator(node, add_aggregate_partialize_mutator, cxt);
}

/* This code modifies modquery */
/* having clause needs transformation
* original query is
* select a, count(b), min(c)
* from ..
* group by a
* having a> 10 or count(b) > 20 or min(d) = 4
* we get a mat table
* a, partial(countb), partial(minc) after processing
* the target list. We need to add entries from the having clause
* so the modified mat table is
* a, partial(count), partial(minc), partial(mind)
* and the new select from the mat table is:
* select col1, finalize(col2), finalize(col3)
* from ..
* group by col1
* having col1 > 10 or finalize(col2) > 20 or finalize(col4) = 4
* Note: col# = corresponding column from the mat table
*/
typedef struct Cagg_havingcxt
{
/* Target list of the original query; each havingQual node is compared
 * against the expr of these entries with equal(). */
List *orig_tlist;
/* Modified target list, walked in step with orig_tlist; on a match the
 * expr of the paired entry replaces the havingQual node. */
List *mod_tlist;
/* Set to true by the mutator when a havingQual node matched a target
 * list entry. */
bool found;
/* Context handed to add_partialize_column() for aggregates in the
 * havingQual that have no matching target list entry. */
AggPartCxt agg_cxt;
} cagg_havingcxt;

/* if we find a target entry expr that matches the node , then replace it with the
* expression from new target entry.
/* This function modifies the passed in havingQual by
 * mapping exprs to their columns in the materialization table
 * or to their finalized aggregate form.
* Note that HAVING clause can contain only exprs from group-by or aggregates
* and GROUP BY clauses cannot be aggregates.
* (By the time we process havingQuals, all the group by exprs have been processed
* and have associated columns in the materialization hypertable).
* Example, if the original query has
* GROUP BY colA + colB, colC
* HAVING colA + colB + sum(colD) > 10 OR count(colE) = 10
*
* The transformed havingqual would be
* HAVING matCol3 + finalize_agg( sum(matCol4)) > 10
* OR finalize_agg( count(matCol5)) = 10
*
*
* Note: GROUP BY exprs always appear in the query's targetlist.
* Some of the aggregates from the havingQual might also already appear in the targetlist.
* We replace all existing entries with their corresponding entry from the modified targetlist.
* If an aggregate (in the havingqual) does not exist in the TL, we create a
* materialization table column for it and use the finalize(column) form in the
* transformed havingQual.
*/
static Node *
replace_having_qual_mutator(Node *node, cagg_havingcxt *cxt)
create_replace_having_qual_mutator(Node *node, cagg_havingcxt *cxt)
{
if (node == NULL)
return NULL;
/* See if we already have a column in materialization hypertable for this
* expr. We do this by checking the existing targetlist
* entries for the query.
*/
ListCell *lc, *lc2;
List *origtlist = cxt->orig_tlist;
List *modtlist = cxt->mod_tlist;
Expand All @@ -1358,47 +1370,42 @@ replace_having_qual_mutator(Node *node, cagg_havingcxt *cxt)
TargetEntry *modte = (TargetEntry *) lfirst(lc2);
if (equal(node, te->expr))
{
cxt->found = true;
// cxt->found = true;
return (Node *) modte->expr;
}
}
return expression_tree_mutator(node, replace_having_qual_mutator, cxt);
/* didn't find a match in targetlist. If it is an aggregate, create a partialize column for
* it in materialization hypertable and return corresponding finalize
* expr.
*/
if (IsA(node, Aggref))
{
AggPartCxt *agg_cxt = &(cxt->agg_cxt);
agg_cxt->addcol = false;
Aggref *newagg = add_partialize_column((Aggref *) node, agg_cxt);
Assert(agg_cxt->addcol == true);
return (Node *) newagg;
}
return expression_tree_mutator(node, create_replace_having_qual_mutator, cxt);
}

/* modify the havingqual and replace exprs (in havingqual) that already occur
* in targetlist with entries from new target list
* Arguments:
* origquery : Query whose havingqual will be modified.
* newtlist : fixed up targetlist for origquery (the origquery's tlist is mapped
* to columns from materialized hypertable). There is a 1-1 mapping
* between origquery->targetList and newtlist
* RETURNS: havingQual
* Example:
* SELECT x, count(x) , time_bucket(...) ---->origquery's tlist
* FROM ...
* HAVING count(x) > 10 and sum(y) > 11 --->havingQual
*
* modtlist would have entries corresponding to the finalize query's tlist:
* x , finalize(count(x)), timebucket,
* which look more like: col1, finalize(col1), col2 where the col# correspond to the
* columns from the materialization hypertable.
* (see comments for cagg_havingcxt )
* count(x) : already appears in the targetlist of the query and should be
* replaced by the corresponding entry from modtlist. We have to compare count(x)
* with all the entries in the tlist so that we do not match subexprs when the
* complete expr (e.g do not match x instead of count(x) ) is in the tlist.
* (issue 2655)
*/
static Node *
replace_targetentry_in_havingqual(Query *origquery, List *newtlist)
finalizequery_create_havingqual(FinalizeQueryInfo *inp, MatTableColumnInfo *mattblinfo)
{
Node *having = copyObject(origquery->havingQual);
cagg_havingcxt hcxt;
hcxt.orig_tlist = origquery->targetList;
hcxt.mod_tlist = newtlist;
hcxt.found = false;
having = (Node *) expression_tree_mutator((Node *) having, replace_having_qual_mutator, &hcxt);
return having;
Query *orig_query = inp->final_userquery;
if (orig_query->havingQual == NULL)
{
return NULL;
}
Node *havingQual = copyObject(orig_query->havingQual);
Assert(inp->final_seltlist != NULL);
cagg_havingcxt hcxt = { .orig_tlist = orig_query->targetList,
.mod_tlist = inp->final_seltlist };
hcxt.agg_cxt.mattblinfo = mattblinfo;
hcxt.agg_cxt.original_query_resno = 0;
hcxt.agg_cxt.ignore_aggoid = get_finalizefnoid();
hcxt.agg_cxt.addcol = false;
return create_replace_having_qual_mutator(havingQual, &hcxt);
}

/*
Expand All @@ -1417,7 +1424,6 @@ finalizequery_init(FinalizeQueryInfo *inp, Query *orig_query, MatTableColumnInfo
{
AggPartCxt cxt;
ListCell *lc;
Node *newhavingQual;
int resno = 1;

inp->final_userquery = copyObject(orig_query);
Expand Down Expand Up @@ -1482,14 +1488,7 @@ finalizequery_init(FinalizeQueryInfo *inp, Query *orig_query, MatTableColumnInfo
}
/* all grouping clause elements are in targetlist already.
so let's check the having clause */
newhavingQual = replace_targetentry_in_havingqual(inp->final_userquery, inp->final_seltlist);
/* we might still have aggs in havingqual which don't appear in the targetlist , but don't
* overwrite finalize_agg exprs that we have in the havingQual*/
cxt.addcol = false;
cxt.ignore_aggoid = get_finalizefnoid();
cxt.original_query_resno = 0;
inp->final_havingqual =
expression_tree_mutator((Node *) newhavingQual, add_aggregate_partialize_mutator, &cxt);
inp->final_havingqual = finalizequery_create_havingqual(inp, mattblinfo);
}

/* Create select query with the finalize aggregates
Expand Down
23 changes: 23 additions & 0 deletions tsl/test/expected/continuous_aggs.out
Original file line number Diff line number Diff line change
Expand Up @@ -1426,6 +1426,7 @@ NOTICE: adding not-null constraint to column "time"
(1 row)

insert into raw_data select '2000-01-01','Q1';
--having has exprs that appear in select
CREATE MATERIALIZED VIEW search_query_count_1m WITH (timescaledb.continuous)
AS
SELECT search_query,count(search_query) as count,
Expand All @@ -1434,6 +1435,18 @@ AS
WHERE search_query is not null AND LENGTH(TRIM(both from search_query))>0
GROUP BY search_query, bucket HAVING count(search_query) > 3 OR sum(cnt) > 1;
NOTICE: refreshing continuous aggregate "search_query_count_1m"
--having has aggregates + grp by columns that appear in select
CREATE MATERIALIZED VIEW search_query_count_2 WITH (timescaledb.continuous)
AS
SELECT search_query,count(search_query) as count, sum(cnt),
time_bucket(INTERVAL '1 minute', time) AS bucket
FROM raw_data
WHERE search_query is not null AND LENGTH(TRIM(both from search_query))>0
GROUP BY search_query, bucket
HAVING count(search_query) > 3 OR sum(cnt) > 1 OR
( sum(cnt) + count(cnt)) > 1
AND search_query = 'Q1';
NOTICE: refreshing continuous aggregate "search_query_count_2"
insert into raw_data select '2000-01-01 00:00+0','Q1', 1;
insert into raw_data select '2000-01-01 00:00+0','Q1', 2;
insert into raw_data select '2000-01-01 00:00+0','Q1', 3;
Expand All @@ -1459,3 +1472,13 @@ SELECT * FROM search_query_count_1m ORDER BY 1, 2;
Q4 | 1 | Sun Jan 02 16:00:00 2000 PST
(3 rows)

--refresh search_query_count_2---
CALL refresh_continuous_aggregate('search_query_count_2', NULL, NULL);
SELECT * FROM search_query_count_2 ORDER BY 1, 2;
search_query | count | sum | bucket
--------------+-------+-----+------------------------------
Q1 | 3 | 6 | Fri Dec 31 16:00:00 1999 PST
Q2 | 2 | 30 | Sat Jan 01 16:00:00 2000 PST
Q4 | 1 | 20 | Sun Jan 02 16:00:00 2000 PST
(3 rows)

17 changes: 16 additions & 1 deletion tsl/test/sql/continuous_aggs.sql
Original file line number Diff line number Diff line change
Expand Up @@ -1034,6 +1034,7 @@ create table raw_data(time timestamptz, search_query text, cnt integer);
select create_hypertable('raw_data','time');
insert into raw_data select '2000-01-01','Q1';

--having has exprs that appear in select
CREATE MATERIALIZED VIEW search_query_count_1m WITH (timescaledb.continuous)
AS
SELECT search_query,count(search_query) as count,
Expand All @@ -1042,6 +1043,18 @@ AS
WHERE search_query is not null AND LENGTH(TRIM(both from search_query))>0
GROUP BY search_query, bucket HAVING count(search_query) > 3 OR sum(cnt) > 1;

--having has aggregates + grp by columns that appear in select
CREATE MATERIALIZED VIEW search_query_count_2 WITH (timescaledb.continuous)
AS
SELECT search_query,count(search_query) as count, sum(cnt),
time_bucket(INTERVAL '1 minute', time) AS bucket
FROM raw_data
WHERE search_query is not null AND LENGTH(TRIM(both from search_query))>0
GROUP BY search_query, bucket
HAVING count(search_query) > 3 OR sum(cnt) > 1 OR
( sum(cnt) + count(cnt)) > 1
AND search_query = 'Q1';

insert into raw_data select '2000-01-01 00:00+0','Q1', 1;
insert into raw_data select '2000-01-01 00:00+0','Q1', 2;
insert into raw_data select '2000-01-01 00:00+0','Q1', 3;
Expand All @@ -1058,4 +1071,6 @@ insert into raw_data select '2000-01-03 00:00+0','Q4', 20;
CALL refresh_continuous_aggregate('search_query_count_1m', NULL, NULL);
SELECT * FROM search_query_count_1m ORDER BY 1, 2;


--refresh search_query_count_2---
CALL refresh_continuous_aggregate('search_query_count_2', NULL, NULL);
SELECT * FROM search_query_count_2 ORDER BY 1, 2;

0 comments on commit eb9a60d

Please sign in to comment.