Skip to content

Commit

Permalink
Pass join related structs to the cagg rte
Browse files Browse the repository at this point in the history
In case of joins in the continuous aggregates, pass the required
structs to the new rte created. These values are required by the
planner to finally query the materialized view.

Fixes #5433
  • Loading branch information
RafiaSabih committed Mar 14, 2023
1 parent f8022eb commit 07cd13d
Show file tree
Hide file tree
Showing 7 changed files with 683 additions and 24 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@ accidentally triggering the load of a previous DB version.**
* #5410 Fix file trailer handling in the COPY fetcher
* #5233 Out of on_proc_exit slots on guc license change
* #5427 Handle user-defined FDW options properly
* #5433 Fix joins targetlist in CAggs with joins

**Thanks**
* @nikolaps for reporting an issue with the COPY fetcher
* @S-imo-n for reporting the issue on Background Worker Scheduler crash
* @mwahlthuetter for reporting the issue with joins in CAggs

## 2.10.1 (2023-03-07)

Expand Down
60 changes: 46 additions & 14 deletions tsl/src/continuous_aggs/create.c
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
#include <parser/parse_oper.h>
#include <parser/parse_relation.h>
#include <parser/parse_type.h>
#include <parser/parsetree.h>
#include <rewrite/rewriteHandler.h>
#include <rewrite/rewriteManip.h>
#include <utils/acl.h>
Expand Down Expand Up @@ -2449,32 +2450,61 @@ finalizequery_get_select_query(FinalizeQueryInfo *inp, List *matcollist,
* which contains the information of the materialised hypertable
* that is created for this cagg.
*/
if (list_length(inp->final_userquery->jointree->fromlist) >= CONTINUOUS_AGG_MAX_JOIN_RELATIONS)
if (list_length(inp->final_userquery->jointree->fromlist) >=
CONTINUOUS_AGG_MAX_JOIN_RELATIONS ||
!IsA(linitial(inp->final_userquery->jointree->fromlist), RangeTblRef))
{
rte = makeNode(RangeTblEntry);
rte->alias = makeAlias(relname, NIL);
rte->inFromCl = true;
rte->inh = true;
rte->rellockmode = 1;
rte->eref = copyObject(rte->alias);
ListCell *l;
foreach (l, inp->final_userquery->jointree->fromlist)
{
Node *jtnode = (Node *) lfirst(l);
JoinExpr *join = NULL;
if (IsA(jtnode, JoinExpr))
{
join = castNode(JoinExpr, jtnode);
RangeTblEntry *jrte = rt_fetch(join->rtindex, inp->final_userquery->rtable);
rte->joinaliasvars = jrte->joinaliasvars;
rte->eref = copyObject(jrte->eref);
#if PG13_GE
rte->joinleftcols = jrte->joinleftcols;
rte->joinrightcols = jrte->joinrightcols;
#endif
#if PG14_GE
rte->join_using_alias = jrte->join_using_alias;
#endif
rte->selectedCols = jrte->selectedCols;
}
}
}
else
{
rte = llast_node(RangeTblEntry, inp->final_userquery->rtable);
rte->eref->colnames = NIL;
rte->selectedCols = NULL;
}
if (rte->eref->colnames == NIL)
{
/* Aliases for column names for the materialization table. */
foreach (lc, matcollist)
{
ColumnDef *cdef = (ColumnDef *) lfirst(lc);
rte->eref->colnames = lappend(rte->eref->colnames, makeString(cdef->colname));
rte->selectedCols = bms_add_member(rte->selectedCols,
list_length(rte->eref->colnames) -
FirstLowInvalidHeapAttributeNumber);
}
}
rte->relid = mattbladdress->objectId;
rte->rtekind = RTE_RELATION;
rte->relkind = RELKIND_RELATION;
rte->tablesample = NULL;
rte->eref->colnames = NIL;
rte->selectedCols = NULL;
/* Aliases for column names for the materialization table. */
foreach (lc, matcollist)
{
ColumnDef *cdef = (ColumnDef *) lfirst(lc);
rte->eref->colnames = lappend(rte->eref->colnames, makeString(cdef->colname));
rte->selectedCols =
bms_add_member(rte->selectedCols,
list_length(rte->eref->colnames) - FirstLowInvalidHeapAttributeNumber);
}

rte->requiredPerms |= ACL_SELECT;
rte->insertedCols = NULL;
rte->updatedCols = NULL;
Expand All @@ -2483,7 +2513,7 @@ finalizequery_get_select_query(FinalizeQueryInfo *inp, List *matcollist,
foreach (lc, inp->final_seltlist)
{
TargetEntry *tle = (TargetEntry *) lfirst(lc);
if (IsA(tle->expr, Var))
if (IsA(tle->expr, Var) && tle->resorigtbl == 0)
{
tle->resorigtbl = rte->relid;
tle->resorigcol = ((Var *) tle->expr)->varattno;
Expand All @@ -2492,7 +2522,9 @@ finalizequery_get_select_query(FinalizeQueryInfo *inp, List *matcollist,

CAGG_MAKEQUERY(final_selquery, inp->final_userquery);
final_selquery->hasAggs = !inp->finalized;
if (list_length(inp->final_userquery->jointree->fromlist) >= CONTINUOUS_AGG_MAX_JOIN_RELATIONS)
if (list_length(inp->final_userquery->jointree->fromlist) >=
CONTINUOUS_AGG_MAX_JOIN_RELATIONS ||
!IsA(linitial(inp->final_userquery->jointree->fromlist), RangeTblRef))
{
RangeTblRef *rtr;
final_selquery->rtable = list_make1(rte);
Expand Down
118 changes: 116 additions & 2 deletions tsl/test/expected/cagg_joins-12.out
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ SELECT time_bucket(INTERVAL '1 day', day) AS bucket,
name
FROM conditions, devices
WHERE conditions.device_id = devices.device_id
GROUP BY name, bucket;
GROUP BY name, 1;
NOTICE: refreshing continuous aggregate "conditions_summary_daily_realtime"
HINT: Use WITH NO DATA if you do not want to refresh the continuous aggregate on creation.
\d+ conditions_summary_daily_realtime
Expand Down Expand Up @@ -93,6 +93,25 @@ UNION ALL
WHERE conditions.device_id = devices.device_id AND conditions.day >= COALESCE(_timescaledb_internal.to_date(_timescaledb_internal.cagg_watermark(3)), '-infinity'::date)
GROUP BY devices.name, (time_bucket('@ 1 day'::interval, conditions.day));

SELECT * FROM conditions_summary_daily_realtime ORDER BY bucket;
bucket | avg | max | min | name
------------+---------------------+-----+-----+----------
06-14-2021 | 26.0000000000000000 | 26 | 26 | thermo_1
06-15-2021 | 22.0000000000000000 | 22 | 22 | thermo_2
06-16-2021 | 24.0000000000000000 | 24 | 24 | thermo_3
06-17-2021 | 24.0000000000000000 | 24 | 24 | thermo_4
06-18-2021 | 27.0000000000000000 | 27 | 27 | thermo_4
06-19-2021 | 28.0000000000000000 | 28 | 28 | thermo_4
06-20-2021 | 30.0000000000000000 | 30 | 30 | thermo_1
06-21-2021 | 31.0000000000000000 | 31 | 31 | thermo_1
06-22-2021 | 34.0000000000000000 | 34 | 34 | thermo_1
06-23-2021 | 34.0000000000000000 | 34 | 34 | thermo_2
06-24-2021 | 34.0000000000000000 | 34 | 34 | thermo_2
06-25-2021 | 32.0000000000000000 | 32 | 32 | thermo_3
06-26-2021 | 32.0000000000000000 | 32 | 32 | thermo_3
06-27-2021 | 31.0000000000000000 | 31 | 31 | thermo_3
(14 rows)

INSERT INTO conditions (day, city, temperature, device_id) VALUES
('2021-06-30', 'Moscow', 28, 3);
SELECT *
Expand Down Expand Up @@ -160,6 +179,26 @@ UNION ALL
WHERE conditions.device_id = devices.device_id AND conditions.day >= COALESCE(_timescaledb_internal.to_date(_timescaledb_internal.cagg_watermark(4)), '-infinity'::date)
GROUP BY devices.name, (time_bucket('@ 1 day'::interval, conditions.day));

SELECT * FROM conditions_summary_daily_realtime_reorder ORDER BY bucket;
bucket | avg | max | min | name
------------+---------------------+-----+-----+----------
06-14-2021 | 26.0000000000000000 | 26 | 26 | thermo_1
06-15-2021 | 22.0000000000000000 | 22 | 22 | thermo_2
06-16-2021 | 24.0000000000000000 | 24 | 24 | thermo_3
06-17-2021 | 24.0000000000000000 | 24 | 24 | thermo_4
06-18-2021 | 27.0000000000000000 | 27 | 27 | thermo_4
06-19-2021 | 28.0000000000000000 | 28 | 28 | thermo_4
06-20-2021 | 30.0000000000000000 | 30 | 30 | thermo_1
06-21-2021 | 31.0000000000000000 | 31 | 31 | thermo_1
06-22-2021 | 34.0000000000000000 | 34 | 34 | thermo_1
06-23-2021 | 34.0000000000000000 | 34 | 34 | thermo_2
06-24-2021 | 34.0000000000000000 | 34 | 34 | thermo_2
06-25-2021 | 32.0000000000000000 | 32 | 32 | thermo_3
06-26-2021 | 32.0000000000000000 | 32 | 32 | thermo_3
06-27-2021 | 31.0000000000000000 | 31 | 31 | thermo_3
06-30-2021 | 28.0000000000000000 | 28 | 28 | thermo_3
(15 rows)

INSERT INTO conditions (day, city, temperature, device_id) VALUES
('2021-07-01', 'Moscow', 28, 3);
SELECT *
Expand Down Expand Up @@ -196,9 +235,30 @@ SELECT time_bucket(INTERVAL '1 day', day) AS bucket,
name
FROM conditions, devices
WHERE conditions.device_id = devices.device_id
GROUP BY name, bucket;
GROUP BY name, 1;
NOTICE: refreshing continuous aggregate "conditions_summary_daily"
HINT: Use WITH NO DATA if you do not want to refresh the continuous aggregate on creation.
SELECT * FROM conditions_summary_daily ORDER BY bucket;
bucket | avg | max | min | name
------------+---------------------+-----+-----+----------
06-14-2021 | 26.0000000000000000 | 26 | 26 | thermo_1
06-15-2021 | 22.0000000000000000 | 22 | 22 | thermo_2
06-16-2021 | 24.0000000000000000 | 24 | 24 | thermo_3
06-17-2021 | 24.0000000000000000 | 24 | 24 | thermo_4
06-18-2021 | 27.0000000000000000 | 27 | 27 | thermo_4
06-19-2021 | 28.0000000000000000 | 28 | 28 | thermo_4
06-20-2021 | 30.0000000000000000 | 30 | 30 | thermo_1
06-21-2021 | 31.0000000000000000 | 31 | 31 | thermo_1
06-22-2021 | 34.0000000000000000 | 34 | 34 | thermo_1
06-23-2021 | 34.0000000000000000 | 34 | 34 | thermo_2
06-24-2021 | 34.0000000000000000 | 34 | 34 | thermo_2
06-25-2021 | 32.0000000000000000 | 32 | 32 | thermo_3
06-26-2021 | 32.0000000000000000 | 32 | 32 | thermo_3
06-27-2021 | 31.0000000000000000 | 31 | 31 | thermo_3
06-30-2021 | 28.0000000000000000 | 28 | 28 | thermo_3
07-01-2021 | 28.0000000000000000 | 28 | 28 | thermo_3
(16 rows)

CREATE MATERIALIZED VIEW conditions_summary_daily_reorder
WITH (timescaledb.continuous, timescaledb.materialized_only = TRUE) AS
SELECT time_bucket(INTERVAL '1 day', day) AS bucket,
Expand All @@ -211,6 +271,27 @@ WHERE conditions.device_id = devices.device_id
GROUP BY name, bucket;
NOTICE: refreshing continuous aggregate "conditions_summary_daily_reorder"
HINT: Use WITH NO DATA if you do not want to refresh the continuous aggregate on creation.
SELECT * FROM conditions_summary_daily_reorder ORDER BY bucket;
bucket | avg | max | min | name
------------+---------------------+-----+-----+----------
06-14-2021 | 26.0000000000000000 | 26 | 26 | thermo_1
06-15-2021 | 22.0000000000000000 | 22 | 22 | thermo_2
06-16-2021 | 24.0000000000000000 | 24 | 24 | thermo_3
06-17-2021 | 24.0000000000000000 | 24 | 24 | thermo_4
06-18-2021 | 27.0000000000000000 | 27 | 27 | thermo_4
06-19-2021 | 28.0000000000000000 | 28 | 28 | thermo_4
06-20-2021 | 30.0000000000000000 | 30 | 30 | thermo_1
06-21-2021 | 31.0000000000000000 | 31 | 31 | thermo_1
06-22-2021 | 34.0000000000000000 | 34 | 34 | thermo_1
06-23-2021 | 34.0000000000000000 | 34 | 34 | thermo_2
06-24-2021 | 34.0000000000000000 | 34 | 34 | thermo_2
06-25-2021 | 32.0000000000000000 | 32 | 32 | thermo_3
06-26-2021 | 32.0000000000000000 | 32 | 32 | thermo_3
06-27-2021 | 31.0000000000000000 | 31 | 31 | thermo_3
06-30-2021 | 28.0000000000000000 | 28 | 28 | thermo_3
07-01-2021 | 28.0000000000000000 | 28 | 28 | thermo_3
(16 rows)

CREATE MATERIALIZED VIEW conditions_summary_daily_2
WITH (timescaledb.continuous) AS
SELECT time_bucket(INTERVAL '1 day', day) AS bucket,
Expand All @@ -222,6 +303,27 @@ FROM conditions JOIN devices ON conditions.device_id = devices.device_id
GROUP BY name, bucket;
NOTICE: refreshing continuous aggregate "conditions_summary_daily_2"
HINT: Use WITH NO DATA if you do not want to refresh the continuous aggregate on creation.
SELECT * FROM conditions_summary_daily_2 ORDER BY bucket;
bucket | avg | max | min | name
------------+---------------------+-----+-----+----------
06-14-2021 | 26.0000000000000000 | 26 | 26 | thermo_1
06-15-2021 | 22.0000000000000000 | 22 | 22 | thermo_2
06-16-2021 | 24.0000000000000000 | 24 | 24 | thermo_3
06-17-2021 | 24.0000000000000000 | 24 | 24 | thermo_4
06-18-2021 | 27.0000000000000000 | 27 | 27 | thermo_4
06-19-2021 | 28.0000000000000000 | 28 | 28 | thermo_4
06-20-2021 | 30.0000000000000000 | 30 | 30 | thermo_1
06-21-2021 | 31.0000000000000000 | 31 | 31 | thermo_1
06-22-2021 | 34.0000000000000000 | 34 | 34 | thermo_1
06-23-2021 | 34.0000000000000000 | 34 | 34 | thermo_2
06-24-2021 | 34.0000000000000000 | 34 | 34 | thermo_2
06-25-2021 | 32.0000000000000000 | 32 | 32 | thermo_3
06-26-2021 | 32.0000000000000000 | 32 | 32 | thermo_3
06-27-2021 | 31.0000000000000000 | 31 | 31 | thermo_3
06-30-2021 | 28.0000000000000000 | 28 | 28 | thermo_3
07-01-2021 | 28.0000000000000000 | 28 | 28 | thermo_3
(16 rows)

CREATE MATERIALIZED VIEW conditions_summary_daily_2_reorder
WITH (timescaledb.continuous) AS
SELECT time_bucket(INTERVAL '1 day', day) AS bucket,
Expand All @@ -232,6 +334,10 @@ SELECT time_bucket(INTERVAL '1 day', day) AS bucket,
FROM devices JOIN conditions ON conditions.device_id = devices.device_id
GROUP BY name, bucket;
ERROR: time bucket function must reference a hypertable dimension column
SELECT * FROM conditions_summary_daily_2_reorder ORDER BY bucket;
ERROR: relation "conditions_summary_daily_2_reorder" does not exist
LINE 1: SELECT * FROM conditions_summary_daily_2_reorder ORDER BY bu...
^
CREATE MATERIALIZED VIEW conditions_summary_daily_3
WITH (timescaledb.continuous) AS
SELECT time_bucket(INTERVAL '1 day', day) AS bucket,
Expand All @@ -243,6 +349,10 @@ FROM devices JOIN conditions USING (device_id)
GROUP BY name, bucket;
ERROR: invalid continuous aggregate view
DETAIL: joins with using clause in continuous aggregate definition work for Postgres versions 13 and above
SELECT * FROM conditions_summary_daily_3 ORDER BY bucket;
ERROR: relation "conditions_summary_daily_3" does not exist
LINE 1: SELECT * FROM conditions_summary_daily_3 ORDER BY bucket;
^
CREATE MATERIALIZED VIEW conditions_summary_daily_3_reorder
WITH (timescaledb.continuous) AS
SELECT time_bucket(INTERVAL '1 day', day) AS bucket,
Expand All @@ -254,6 +364,10 @@ FROM conditions JOIN devices USING (device_id)
GROUP BY name, bucket;
ERROR: invalid continuous aggregate view
DETAIL: joins with using clause in continuous aggregate definition work for Postgres versions 13 and above
SELECT * FROM conditions_summary_daily_3_reorder ORDER BY bucket;
ERROR: relation "conditions_summary_daily_3_reorder" does not exist
LINE 1: SELECT * FROM conditions_summary_daily_3_reorder ORDER BY bu...
^
--Error out for old format cagg definition
CREATE MATERIALIZED VIEW conditions_summary_daily_cagg
WITH (timescaledb.continuous, timescaledb.materialized_only = TRUE, timescaledb.finalized = FALSE) AS
Expand Down

0 comments on commit 07cd13d

Please sign in to comment.