New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support WHERE and HAVING clauses for real time aggregation #1868
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -1500,7 +1500,7 @@ finalizequery_get_select_query(FinalizeQueryInfo *inp, List *matcollist, | |||||||||||||||||
} | ||||||||||||||||||
rte->insertedCols = NULL; | ||||||||||||||||||
rte->updatedCols = NULL; | ||||||||||||||||||
result = makeWholeRowVar(rte, list_length(inp->final_userquery->rtable), 0, true); | ||||||||||||||||||
result = makeWholeRowVar(rte, 1, 0, true); | ||||||||||||||||||
result->location = 0; | ||||||||||||||||||
markVarForSelectPriv(NULL, result, rte); | ||||||||||||||||||
/* 2. Fixup targetlist with the correct rel information */ | ||||||||||||||||||
|
@@ -1596,6 +1596,25 @@ fixup_userview_query_tlist(Query *userquery, List *tlist_aliases) | |||||||||||||||||
* Notes: ViewStmt->query is the raw parse tree | ||||||||||||||||||
* panquery is the output of running parse_anlayze( ViewStmt->query) | ||||||||||||||||||
* so don't recreate invalidation trigger. | ||||||||||||||||||
|
||||||||||||||||||
* Since 1.7, we support real time aggregation. | ||||||||||||||||||
* If real time aggregation is off i.e. materialized only, the mcagg vew is as described in Step 2. | ||||||||||||||||||
* If it is turned on | ||||||||||||||||||
* we build a union query that selects from the internal mat view and the raw hypertable | ||||||||||||||||||
* (see build_union_query for details) | ||||||||||||||||||
* CREATE VIEW mcagg | ||||||||||||||||||
* as | ||||||||||||||||||
* SELECT * from | ||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||||||||
* ( SELECT a, finalize(col1) + finalize(col2) from ts_internal_mcagg_view | ||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||||||||
* ---> query from Step 2 with additional where clause | ||||||||||||||||||
* WHERE timecol < materialization threshold | ||||||||||||||||||
* group by <internal-columns> , a , timebucket(a); | ||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||||||||
* UNION ALL | ||||||||||||||||||
* SELECT a, min(b)+max(d) from foo ---> original view stmt | ||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||||||||
* ----> with additional where clause | ||||||||||||||||||
* WHERE timecol >= materialization threshold | ||||||||||||||||||
* GROUP BY a, time_bucket(a) | ||||||||||||||||||
* ) | ||||||||||||||||||
*/ | ||||||||||||||||||
static void | ||||||||||||||||||
cagg_create(ViewStmt *stmt, Query *panquery, CAggTimebucketInfo *origquery_ht, | ||||||||||||||||||
|
@@ -1779,36 +1798,34 @@ cagg_update_view_definition(ContinuousAgg *agg, Hypertable *mat_ht, | |||||||||||||||||
Query *user_query = get_view_query(user_view_rel); | ||||||||||||||||||
relation_close(user_view_rel, AccessShareLock); | ||||||||||||||||||
|
||||||||||||||||||
Oid direct_view_oid = relation_oid(agg->data.direct_view_schema, agg->data.direct_view_name); | ||||||||||||||||||
|
||||||||||||||||||
Relation direct_view_rel = relation_open(direct_view_oid, AccessShareLock); | ||||||||||||||||||
Query *direct_query = copyObject(get_view_query(direct_view_rel)); | ||||||||||||||||||
|
||||||||||||||||||
CAggTimebucketInfo timebucket_exprinfo = cagg_validate_query(direct_query); | ||||||||||||||||||
|
||||||||||||||||||
relation_close(direct_view_rel, AccessShareLock); | ||||||||||||||||||
FinalizeQueryInfo fqi; | ||||||||||||||||||
MatTableColumnInfo mattblinfo; | ||||||||||||||||||
ObjectAddress mataddress = { | ||||||||||||||||||
.classId = RelationRelationId, | ||||||||||||||||||
.objectId = mat_ht->main_table_relid, | ||||||||||||||||||
}; | ||||||||||||||||||
|
||||||||||||||||||
Oid direct_view_oid = relation_oid(agg->data.direct_view_schema, agg->data.direct_view_name); | ||||||||||||||||||
Relation direct_view_rel = relation_open(direct_view_oid, AccessShareLock); | ||||||||||||||||||
Query *direct_query = copyObject(get_view_query(direct_view_rel)); | ||||||||||||||||||
List *rtable = direct_query->rtable; | ||||||||||||||||||
/* When a view is created (StoreViewQuery), 2 dummy rtable entries corresponding to "old" and | ||||||||||||||||||
* "new" are prepended to the rtable list. We remove these and adjust the varnos to recreate | ||||||||||||||||||
Comment on lines
+1812
to
+1813
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It is not clear what "old" and "new" are here. Could you elaborate? |
||||||||||||||||||
* the original user supplied direct query | ||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||||||||
*/ | ||||||||||||||||||
Assert(list_length(rtable) == 3); | ||||||||||||||||||
rtable = list_delete_first(rtable); | ||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can we have some asserts here that rtable is actually in the shape we assume There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. added. |
||||||||||||||||||
direct_query->rtable = list_delete_first(rtable); | ||||||||||||||||||
OffsetVarNodes((Node *) direct_query, -2, 0); | ||||||||||||||||||
relation_close(direct_view_rel, AccessShareLock); | ||||||||||||||||||
Assert(list_length(direct_query->rtable) == 1); | ||||||||||||||||||
CAggTimebucketInfo timebucket_exprinfo = cagg_validate_query(direct_query); | ||||||||||||||||||
|
||||||||||||||||||
mattablecolumninfo_init(&mattblinfo, NIL, NIL, copyObject(direct_query->groupClause)); | ||||||||||||||||||
finalizequery_init(&fqi, direct_query, &mattblinfo); | ||||||||||||||||||
|
||||||||||||||||||
Query *view_query = finalizequery_get_select_query(&fqi, mattblinfo.matcollist, &mataddress); | ||||||||||||||||||
|
||||||||||||||||||
/* adjust varnos in targetlist */ | ||||||||||||||||||
Assert(list_length(view_query->rtable) > 1); | ||||||||||||||||||
foreach (lc1, view_query->targetList) | ||||||||||||||||||
{ | ||||||||||||||||||
ChangeVarNodes((Node *) lfirst_node(TargetEntry, lc1), | ||||||||||||||||||
1, | ||||||||||||||||||
list_length(view_query->rtable), | ||||||||||||||||||
0); | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
if (with_clause_options[ContinuousViewOptionMaterializedOnly].parsed == BoolGetDatum(false)) | ||||||||||||||||||
view_query = build_union_query(&timebucket_exprinfo, &mattblinfo, view_query, direct_query); | ||||||||||||||||||
|
||||||||||||||||||
|
@@ -2038,6 +2055,14 @@ make_subquery_rte(Query *subquery, const char *aliasname) | |||||||||||||||||
* | ||||||||||||||||||
* q1 is the query on the materialization hypertable with the finalize call | ||||||||||||||||||
* q2 is the query on the raw hypertable which was supplied in the inital CREATE VIEW statement | ||||||||||||||||||
* returns a query as | ||||||||||||||||||
* SELECT * from ( SELECT * from q1 where <coale_qual> | ||||||||||||||||||
* UNION ALL | ||||||||||||||||||
* SELECT * from q2 where existing_qual and <coale_qual> | ||||||||||||||||||
* where coale_qual is: time < ----> (or >= ) | ||||||||||||||||||
Comment on lines
+2059
to
+2062
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||||||||
* COALESCE(_timescaledb_internal.to_timestamp(_timescaledb_internal.cagg_watermark( <htid>)), | ||||||||||||||||||
* '-infinity'::timestamp with time zone) | ||||||||||||||||||
* see build_union_quals for COALESCE clause | ||||||||||||||||||
*/ | ||||||||||||||||||
static Query * | ||||||||||||||||||
build_union_query(CAggTimebucketInfo *tbinfo, MatTableColumnInfo *mattblinfo, Query *q1, Query *q2) | ||||||||||||||||||
|
@@ -2049,6 +2074,7 @@ build_union_query(CAggTimebucketInfo *tbinfo, MatTableColumnInfo *mattblinfo, Qu | |||||||||||||||||
List *tlist = NIL; | ||||||||||||||||||
int varno; | ||||||||||||||||||
AttrNumber attno; | ||||||||||||||||||
Node *q2_quals = NULL; | ||||||||||||||||||
|
||||||||||||||||||
Assert(list_length(q1->targetList) == list_length(q2->targetList)); | ||||||||||||||||||
|
||||||||||||||||||
|
@@ -2064,11 +2090,12 @@ build_union_query(CAggTimebucketInfo *tbinfo, MatTableColumnInfo *mattblinfo, Qu | |||||||||||||||||
attno = | ||||||||||||||||||
get_attnum(tbinfo->htoid, get_attname_compat(tbinfo->htoid, tbinfo->htpartcolno, false)); | ||||||||||||||||||
varno = list_length(q2->rtable); | ||||||||||||||||||
q2->jointree->quals = build_union_query_quals(tbinfo->htid, | ||||||||||||||||||
tbinfo->htpartcoltype, | ||||||||||||||||||
get_negator(tce->lt_opr), | ||||||||||||||||||
varno, | ||||||||||||||||||
attno); | ||||||||||||||||||
q2_quals = build_union_query_quals(tbinfo->htid, | ||||||||||||||||||
tbinfo->htpartcoltype, | ||||||||||||||||||
get_negator(tce->lt_opr), | ||||||||||||||||||
varno, | ||||||||||||||||||
attno); | ||||||||||||||||||
q2->jointree->quals = make_and_qual(q2->jointree->quals, q2_quals); | ||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. quals are implicitly ANDed, can you elaborate why you introduce nesting here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @svenklemm Made the and explicit. Is there a reason not to do it this way? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You introduce nesting lets say we have 3 quals q1, q2, q3 the usual representation would be in a flat list AND(q1, q2, q3) you are storing them AS AND(q1, AND(q2, q3)) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. jointree->quals are sometimes not Lists. (e.g. col IN (1,2) ). Also regression tests fail in canonicalize_qual if we use lappend. |
||||||||||||||||||
|
||||||||||||||||||
Query *query = makeNode(Query); | ||||||||||||||||||
SetOperationStmt *setop = makeNode(SetOperationStmt); | ||||||||||||||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -436,3 +436,196 @@ SELECT * FROM boundary_view ORDER BY time_bucket; | |
40 | 400 | ||
(4 rows) | ||
|
||
---- TEST union view with WHERE, GROUP BY and HAVING clause ---- | ||
create table ht_intdata (a integer, b integer, c integer); | ||
select table_name FROM create_hypertable('ht_intdata', 'a', chunk_time_interval=> 10); | ||
NOTICE: adding not-null constraint to column "a" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. May be put |
||
table_name | ||
------------ | ||
ht_intdata | ||
(1 row) | ||
|
||
INSERT into ht_intdata values( 3 , 16 , 20); | ||
INSERT into ht_intdata values( 1 , 10 , 20); | ||
INSERT into ht_intdata values( 1 , 11 , 20); | ||
INSERT into ht_intdata values( 1 , 12 , 20); | ||
INSERT into ht_intdata values( 1 , 13 , 20); | ||
INSERT into ht_intdata values( 1 , 14 , 20); | ||
INSERT into ht_intdata values( 2 , 14 , 20); | ||
INSERT into ht_intdata values( 2 , 15 , 20); | ||
INSERT into ht_intdata values( 2 , 16 , 20); | ||
INSERT into ht_intdata values( 20 , 16 , 20); | ||
INSERT into ht_intdata values( 20 , 26 , 20); | ||
INSERT into ht_intdata values( 20 , 16 , 20); | ||
INSERT into ht_intdata values( 21 , 15 , 30); | ||
INSERT into ht_intdata values( 21 , 15 , 30); | ||
INSERT into ht_intdata values( 21 , 15 , 30); | ||
CREATE OR REPLACE FUNCTION integer_now_ht_intdata() returns int LANGUAGE SQL STABLE as $$ SELECT coalesce(max(a), 0) FROM ht_intdata $$; | ||
SELECT set_integer_now_func('ht_intdata', 'integer_now_ht_intdata'); | ||
set_integer_now_func | ||
---------------------- | ||
|
||
(1 row) | ||
|
||
CREATE OR REPLACE VIEW mat_m1( a, countb, sumbc, spreadcb, avgc ) | ||
WITH (timescaledb.continuous, timescaledb.refresh_lag = 10) | ||
AS | ||
SELECT time_bucket(1, a), count(*), sum(b+c), max(c)-min(b), avg(c)::int | ||
FROM ht_intdata | ||
WHERE b < 16 | ||
GROUP BY time_bucket(1, a) | ||
HAVING sum(c) > 50; | ||
REFRESH MATERIALIZED VIEW mat_m1; | ||
SELECT view_name, completed_threshold from timescaledb_information.continuous_aggregate_stats | ||
WHERE view_name::text like 'mat_m1'; | ||
view_name | completed_threshold | ||
-----------+--------------------- | ||
mat_m1 | 11 | ||
(1 row) | ||
|
||
--results from real time cont.agg and direct query should match | ||
SELECT time_bucket(1, a), count(*), sum(b+c), max(c)-min(b), avg(c)::int | ||
FROM ht_intdata | ||
WHERE b < 16 | ||
GROUP BY time_bucket(1, a) | ||
HAVING sum(c) > 50 | ||
ORDER BY 1; | ||
time_bucket | count | sum | ?column? | avg | ||
-------------+-------+-----+----------+----- | ||
1 | 5 | 160 | 10 | 20 | ||
21 | 3 | 135 | 15 | 30 | ||
(2 rows) | ||
|
||
SELECT * FROM mat_m1 ORDER BY 1; | ||
a | countb | sumbc | spreadcb | avgc | ||
----+--------+-------+----------+------ | ||
1 | 5 | 160 | 10 | 20 | ||
21 | 3 | 135 | 15 | 30 | ||
(2 rows) | ||
|
||
--verify that materialized only doesn't have rows with a> 20 | ||
ALTER VIEW mat_m1 SET(timescaledb.materialized_only = true); | ||
SELECT * FROM mat_m1 ORDER BY 1; | ||
a | countb | sumbc | spreadcb | avgc | ||
---+--------+-------+----------+------ | ||
1 | 5 | 160 | 10 | 20 | ||
(1 row) | ||
|
||
--again revert the view to include real time aggregates | ||
ALTER VIEW mat_m1 SET(timescaledb.materialized_only = false); | ||
INSERT into ht_intdata values( 31 , 15 , 30); | ||
INSERT into ht_intdata values( 31 , 14 , 70); | ||
--cagg was not refreshed, should include all rows | ||
SELECT * FROM mat_m1 ORDER BY 1; | ||
a | countb | sumbc | spreadcb | avgc | ||
----+--------+-------+----------+------ | ||
1 | 5 | 160 | 10 | 20 | ||
21 | 3 | 135 | 15 | 30 | ||
31 | 2 | 129 | 56 | 50 | ||
(3 rows) | ||
|
||
REFRESH MATERIALIZED VIEW mat_m1; | ||
SELECT view_name, completed_threshold from timescaledb_information.continuous_aggregate_stats | ||
WHERE view_name::text like 'mat_m1'; | ||
view_name | completed_threshold | ||
-----------+--------------------- | ||
mat_m1 | 21 | ||
(1 row) | ||
|
||
SELECT * FROM mat_m1 ORDER BY 1; | ||
a | countb | sumbc | spreadcb | avgc | ||
----+--------+-------+----------+------ | ||
1 | 5 | 160 | 10 | 20 | ||
21 | 3 | 135 | 15 | 30 | ||
31 | 2 | 129 | 56 | 50 | ||
(3 rows) | ||
|
||
--the selects against mat_m1 before and after refresh should match this query | ||
SELECT time_bucket(1, a), count(*), sum(b+c), max(c)-min(b), avg(c)::int | ||
FROM ht_intdata | ||
WHERE b < 16 | ||
GROUP BY time_bucket(1, a) | ||
HAVING sum(c) > 50 | ||
ORDER BY 1; | ||
time_bucket | count | sum | ?column? | avg | ||
-------------+-------+-----+----------+----- | ||
1 | 5 | 160 | 10 | 20 | ||
21 | 3 | 135 | 15 | 30 | ||
31 | 2 | 129 | 56 | 50 | ||
(3 rows) | ||
|
||
DROP VIEW mat_m1 cascade; | ||
NOTICE: drop cascades to table _timescaledb_internal._hyper_8_12_chunk | ||
--- TEST union view with multiple WHERE and HAVING clauses | ||
CREATE VIEW mat_m1 | ||
WITH (timescaledb.continuous, timescaledb.refresh_lag = 10, | ||
timescaledb.max_interval_per_job = 100) | ||
AS | ||
SELECT time_bucket(5, a), sum(b+c) | ||
FROM ht_intdata | ||
WHERE b < 16 and c > 20 | ||
GROUP BY time_bucket(5, a) | ||
HAVING sum(c) > 50 and avg(b)::int > 12 ; | ||
INSERT into ht_intdata values( 42 , 15 , 80); | ||
INSERT into ht_intdata values( 42 , 15 , 18); | ||
INSERT into ht_intdata values( 41 , 18 , 21); | ||
REFRESH MATERIALIZED VIEW mat_m1; | ||
SELECT view_name, completed_threshold from timescaledb_information.continuous_aggregate_stats | ||
WHERE view_name::text like 'mat_m1'; | ||
view_name | completed_threshold | ||
-----------+--------------------- | ||
mat_m1 | 30 | ||
(1 row) | ||
|
||
--view and direct query should return same results | ||
SELECT * from mat_m1 ORDER BY 1; | ||
time_bucket | sum | ||
-------------+----- | ||
20 | 135 | ||
30 | 129 | ||
40 | 95 | ||
(3 rows) | ||
|
||
SELECT time_bucket(5, a), sum(b+c) | ||
FROM ht_intdata | ||
WHERE b < 16 and c > 20 | ||
GROUP BY time_bucket(5, a) | ||
HAVING sum(c) > 50 and avg(b)::int > 12 | ||
ORDER by 1; | ||
time_bucket | sum | ||
-------------+----- | ||
20 | 135 | ||
30 | 129 | ||
40 | 95 | ||
(3 rows) | ||
|
||
-- plan output | ||
:PREFIX SELECT * FROM mat_m1 ORDER BY 1; | ||
QUERY PLAN | ||
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ||
Merge Append (actual rows=3 loops=1) | ||
Sort Key: _hyper_9_15_chunk.time_bucket | ||
-> GroupAggregate (actual rows=1 loops=1) | ||
Group Key: _hyper_9_15_chunk.time_bucket | ||
Filter: ((_timescaledb_internal.finalize_agg('sum(integer)'::text, NULL::name, NULL::name, '{{pg_catalog,int4}}'::name[], _hyper_9_15_chunk.agg_0_3, NULL::bigint) > 50) AND ((_timescaledb_internal.finalize_agg('avg(integer)'::text, NULL::name, NULL::name, '{{pg_catalog,int4}}'::name[], _hyper_9_15_chunk.agg_0_4, NULL::numeric))::integer > 12)) | ||
-> Merge Append (actual rows=1 loops=1) | ||
Sort Key: _hyper_9_15_chunk.time_bucket | ||
-> Index Scan Backward using _hyper_9_15_chunk__materialized_hypertable_9_time_bucket_idx on _hyper_9_15_chunk (actual rows=1 loops=1) | ||
Index Cond: (time_bucket < COALESCE((_timescaledb_internal.cagg_watermark('7'::oid))::integer, '-2147483648'::integer)) | ||
-> GroupAggregate (actual rows=2 loops=1) | ||
Group Key: (time_bucket(5, ht_intdata.a)) | ||
Filter: ((sum(ht_intdata.c) > 50) AND ((avg(ht_intdata.b))::integer > 12)) | ||
-> Custom Scan (ConstraintAwareAppend) (actual rows=3 loops=1) | ||
Hypertable: ht_intdata | ||
Chunks left after exclusion: 2 | ||
-> Merge Append (actual rows=3 loops=1) | ||
Sort Key: (time_bucket(5, _hyper_7_13_chunk.a)) | ||
-> Index Scan Backward using _hyper_7_13_chunk_ht_intdata_a_idx on _hyper_7_13_chunk (actual rows=2 loops=1) | ||
Index Cond: (a >= COALESCE((_timescaledb_internal.cagg_watermark('7'::oid))::integer, '-2147483648'::integer)) | ||
Filter: ((b < 16) AND (c > 20)) | ||
-> Index Scan Backward using _hyper_7_14_chunk_ht_intdata_a_idx on _hyper_7_14_chunk (actual rows=1 loops=1) | ||
Index Cond: (a >= COALESCE((_timescaledb_internal.cagg_watermark('7'::oid))::integer, '-2147483648'::integer)) | ||
Filter: ((b < 16) AND (c > 20)) | ||
Rows Removed by Filter: 2 | ||
(24 rows) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.