Skip to content

Commit

Permalink
Pass join related structs to the cagg rte
Browse files Browse the repository at this point in the history
In case of joins in the continuous aggregates, pass the required
structs to the new rte created. These values are required by the
planner to finally query the materialized view.

Fixes #5433
  • Loading branch information
RafiaSabih committed Mar 23, 2023
1 parent 72c0f5b commit a243b75
Show file tree
Hide file tree
Showing 18 changed files with 801 additions and 25 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@ accidentally triggering the load of a previous DB version.**
* #5442 Decompression may have lost DEFAULT values
* #5446 Add checks for malloc failure in libpq calls
* #5470 Ensure superuser perms during copy/move chunk
* #5433 Fix join rte in CAggs with joins

**Thanks**
* @nikolaps for reporting an issue with the COPY fetcher
* @S-imo-n for reporting the issue on Background Worker Scheduler crash
* @kovetskiy and @DZDomi for reporting peformance regression in Realtime Continuous Aggregates
* @mwahlthuetter for reporting the issue with joins in CAggs

## 2.10.1 (2023-03-07)

Expand Down
2 changes: 2 additions & 0 deletions scripts/test_updates_pg12.sh
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,5 @@ run_tests "$@" -v7 \
run_tests "$@" -v8 \
2.5.0-pg12 2.5.1-pg12 2.5.2-pg12 2.6.0-pg12 2.6.1-pg12 2.7.0-pg12 2.7.1-pg12 2.7.2-pg12 \
2.8.0-pg12 2.8.1-pg12 2.9.0-pg12 2.9.1-pg12 2.9.2-pg12 2.9.3-pg12 2.10.0-pg12 2.10.1-pg12
run_tests "$@" -v9 \
2.10.0-pg12 2.10.1-pg12
2 changes: 2 additions & 0 deletions scripts/test_updates_pg13.sh
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,6 @@ run_tests "$@" -v7 \
run_tests "$@" -v8 \
2.5.0-pg13 2.5.1-pg13 2.5.2-pg13 2.6.0-pg13 2.6.1-pg13 2.7.0-pg13 2.7.1-pg13 2.7.2-pg13 \
2.8.0-pg13 2.8.1-pg13 2.9.0-pg13 2.9.1-pg13 2.9.2-pg13 2.9.3-pg13 2.10.0-pg13 2.10.1-pg13
run_tests "$@" -v9 \
2.10.0-pg13 2.10.1-pg13

3 changes: 2 additions & 1 deletion scripts/test_updates_pg14.sh
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,5 @@ run_tests "$@" -v7 \
run_tests "$@" -v8 \
2.5.0-pg14 2.5.1-pg14 2.5.2-pg14 2.6.0-pg14 2.6.1-pg14 2.7.0-pg14 2.7.1-pg14 2.7.2-pg14 \
2.8.0-pg14 2.8.1-pg14 2.9.0-pg14 2.9.1-pg14 2.9.2-pg14 2.9.3-pg14 2.10.0-pg14 2.10.1-pg14

run_tests "$@" -v9 \
2.10.0-pg14 2.10.1-pg14
2 changes: 2 additions & 0 deletions scripts/test_updates_pg15.sh
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,5 @@ source ${SCRIPT_DIR}/test_functions.inc

run_tests "$@" -v8 \
2.9.0-pg15 2.9.1-pg15 2.9.2-pg15 2.9.3-pg15 2.10.0-pg15 2.10.1-pg15
run_tests "$@" -v9 \
2.10.0-pg15 2.10.1-pg15
24 changes: 24 additions & 0 deletions sql/updates/post-update.sql
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,30 @@ BEGIN
END
$$;

--Check for correct upgrade for the caggs with joins
DO $$
DECLARE
vname regclass;
ts_version TEXT;
BEGIN
SELECT extversion INTO ts_version FROM pg_extension WHERE extname = 'timescaledb';
IF ts_version >= '2.10.0' THEN
CREATE PROCEDURE _timescaledb_internal.post_update_cagg_try_repair(
cagg_view REGCLASS
) AS '@MODULE_PATHNAME@', 'ts_cagg_try_repair' LANGUAGE C;
FOR vname IN select format('%I.%I', cagg.user_view_schema, cagg.user_view_name)::regclass from _timescaledb_catalog.continuous_agg cagg
LOOP
SET log_error_verbosity TO VERBOSE;
CALL _timescaledb_internal.post_update_cagg_try_repair(vname);
END LOOP;
END IF;
IF ts_version >= '2.10.0' THEN
DROP PROCEDURE IF EXISTS _timescaledb_internal.post_update_cagg_try_repair;
END IF;
EXCEPTION WHEN OTHERS THEN RAISE;
END
$$;

-- can only be dropped after views have been rebuilt
DROP FUNCTION IF EXISTS _timescaledb_internal.cagg_watermark(oid);

Expand Down
9 changes: 9 additions & 0 deletions test/sql/updates/cleanup.continuous_aggs.v9.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
-- This file and its contents are licensed under the Apache License 2.0.
-- Please see the included NOTICE for copyright information and
-- LICENSE-APACHE for a copy of the license.

DROP MATERIALIZED VIEW cagg_joins_upgrade_test_with_realtime;
DROP MATERIALIZED VIEW cagg_joins_upgrade_test;

DROP TABLE ht_cagg_joins CASCADE;
DROP TABLE nt_cagg_joins CASCADE;
6 changes: 6 additions & 0 deletions test/sql/updates/cleanup.v9.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
-- This file and its contents are licensed under the Apache License 2.0.
-- Please see the included NOTICE for copyright information and
-- LICENSE-APACHE for a copy of the license.

\ir cleanup.v7.sql
\ir cleanup.continuous_aggs.v9.sql
7 changes: 7 additions & 0 deletions test/sql/updates/post.continuous_aggs.v9.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- This file and its contents are licensed under the Apache License 2.0.
-- Please see the included NOTICE for copyright information and
-- LICENSE-APACHE for a copy of the license.

SELECT * FROM cagg_joins_upgrade_test_with_realtime ORDER BY bucket;
SELECT * FROM cagg_joins_upgrade_test ORDER BY bucket;

5 changes: 5 additions & 0 deletions test/sql/updates/post.v9.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-- This file and its contents are licensed under the Apache License 2.0.
-- Please see the included NOTICE for copyright information and
-- LICENSE-APACHE for a copy of the license.

--\ir post.v8.sql
53 changes: 53 additions & 0 deletions test/sql/updates/setup.continuous_aggs.v9.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
-- This file and its contents are licensed under the Apache License 2.0.
-- Please see the included NOTICE for copyright information and
-- LICENSE-APACHE for a copy of the license.

CREATE TABLE ht_cagg_joins(
day DATE NOT NULL,
city text NOT NULL,
temperature INT NOT NULL,
device_id int NOT NULL);
SELECT create_hypertable(
'ht_cagg_joins', 'day',
chunk_time_interval => INTERVAL '1 day'
);
INSERT INTO ht_cagg_joins (day, city, temperature, device_id) VALUES
('2021-06-14', 'Moscow', 26,1),
('2021-06-15', 'Moscow', 22,2),
('2021-06-16', 'Moscow', 24,3),
('2021-06-17', 'Moscow', 24,4),
('2021-06-18', 'Moscow', 27,4),
('2021-06-19', 'Moscow', 28,4),
('2021-06-20', 'Moscow', 30,1),
('2021-06-21', 'Moscow', 31,1),
('2021-06-22', 'Moscow', 34,1),
('2021-06-23', 'Moscow', 34,2),
('2021-06-24', 'Moscow', 34,2),
('2021-06-25', 'Moscow', 32,3),
('2021-06-26', 'Moscow', 32,3),
('2021-06-27', 'Moscow', 31,3);

CREATE TABLE nt_cagg_joins ( device_id int not null, name text, location text);
INSERT INTO nt_cagg_joins values (1, 'thermo_1', 'Moscow'), (2, 'thermo_2', 'Berlin'),(3, 'thermo_3', 'London'),(4, 'thermo_4', 'Stockholm');

--Create a cagg with join between a hypertable and a normal table
-- with equality condition on inner join type and realtime aggregation enabled
CREATE MATERIALIZED VIEW cagg_joins_upgrade_test_with_realtime
WITH (timescaledb.continuous, timescaledb.materialized_only = FALSE) AS
SELECT time_bucket(INTERVAL '1 day', day) AS bucket,
AVG(temperature),
name
FROM ht_cagg_joins JOIN nt_cagg_joins
ON ht_cagg_joins.device_id = nt_cagg_joins.device_id
GROUP BY 1,3;

--Create a cagg with join between a hypertable and a normal table
-- with equality condition on inner join type and realtime aggregation disabled
CREATE MATERIALIZED VIEW cagg_joins_upgrade_test
WITH (timescaledb.continuous, timescaledb.materialized_only = TRUE) AS
SELECT time_bucket(INTERVAL '1 day', day) AS bucket,
AVG(temperature),
name
FROM nt_cagg_joins JOIN ht_cagg_joins
ON ht_cagg_joins.device_id = nt_cagg_joins.device_id
GROUP BY 1,3;
6 changes: 6 additions & 0 deletions test/sql/updates/setup.v9.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
-- This file and its contents are licensed under the Apache License 2.0.
-- Please see the included NOTICE for copyright information and
-- LICENSE-APACHE for a copy of the license.

\ir setup.v8.sql
\ir setup.continuous_aggs.v9.sql
60 changes: 46 additions & 14 deletions tsl/src/continuous_aggs/create.c
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
#include <parser/parse_oper.h>
#include <parser/parse_relation.h>
#include <parser/parse_type.h>
#include <parser/parsetree.h>
#include <rewrite/rewriteHandler.h>
#include <rewrite/rewriteManip.h>
#include <utils/acl.h>
Expand Down Expand Up @@ -2450,32 +2451,61 @@ finalizequery_get_select_query(FinalizeQueryInfo *inp, List *matcollist,
* which contains the information of the materialised hypertable
* that is created for this cagg.
*/
if (list_length(inp->final_userquery->jointree->fromlist) >= CONTINUOUS_AGG_MAX_JOIN_RELATIONS)
if (list_length(inp->final_userquery->jointree->fromlist) >=
CONTINUOUS_AGG_MAX_JOIN_RELATIONS ||
!IsA(linitial(inp->final_userquery->jointree->fromlist), RangeTblRef))
{
rte = makeNode(RangeTblEntry);
rte->alias = makeAlias(relname, NIL);
rte->inFromCl = true;
rte->inh = true;
rte->rellockmode = 1;
rte->eref = copyObject(rte->alias);
ListCell *l;
foreach (l, inp->final_userquery->jointree->fromlist)
{
Node *jtnode = (Node *) lfirst(l);
JoinExpr *join = NULL;
if (IsA(jtnode, JoinExpr))
{
join = castNode(JoinExpr, jtnode);
RangeTblEntry *jrte = rt_fetch(join->rtindex, inp->final_userquery->rtable);
rte->joinaliasvars = jrte->joinaliasvars;
rte->eref = copyObject(jrte->eref);
#if PG13_GE
rte->joinleftcols = jrte->joinleftcols;
rte->joinrightcols = jrte->joinrightcols;
#endif
#if PG14_GE
rte->join_using_alias = jrte->join_using_alias;
#endif
rte->selectedCols = jrte->selectedCols;
}
}
}
else
{
rte = llast_node(RangeTblEntry, inp->final_userquery->rtable);
rte->eref->colnames = NIL;
rte->selectedCols = NULL;
}
if (rte->eref->colnames == NIL)
{
/* Aliases for column names for the materialization table. */
foreach (lc, matcollist)
{
ColumnDef *cdef = (ColumnDef *) lfirst(lc);
rte->eref->colnames = lappend(rte->eref->colnames, makeString(cdef->colname));
rte->selectedCols = bms_add_member(rte->selectedCols,
list_length(rte->eref->colnames) -
FirstLowInvalidHeapAttributeNumber);
}
}
rte->relid = mattbladdress->objectId;
rte->rtekind = RTE_RELATION;
rte->relkind = RELKIND_RELATION;
rte->tablesample = NULL;
rte->eref->colnames = NIL;
rte->selectedCols = NULL;
/* Aliases for column names for the materialization table. */
foreach (lc, matcollist)
{
ColumnDef *cdef = (ColumnDef *) lfirst(lc);
rte->eref->colnames = lappend(rte->eref->colnames, makeString(cdef->colname));
rte->selectedCols =
bms_add_member(rte->selectedCols,
list_length(rte->eref->colnames) - FirstLowInvalidHeapAttributeNumber);
}

rte->requiredPerms |= ACL_SELECT;
rte->insertedCols = NULL;
rte->updatedCols = NULL;
Expand All @@ -2484,7 +2514,7 @@ finalizequery_get_select_query(FinalizeQueryInfo *inp, List *matcollist,
foreach (lc, inp->final_seltlist)
{
TargetEntry *tle = (TargetEntry *) lfirst(lc);
if (IsA(tle->expr, Var))
if (IsA(tle->expr, Var) && tle->resorigtbl == 0)
{
tle->resorigtbl = rte->relid;
tle->resorigcol = ((Var *) tle->expr)->varattno;
Expand All @@ -2493,7 +2523,9 @@ finalizequery_get_select_query(FinalizeQueryInfo *inp, List *matcollist,

CAGG_MAKEQUERY(final_selquery, inp->final_userquery);
final_selquery->hasAggs = !inp->finalized;
if (list_length(inp->final_userquery->jointree->fromlist) >= CONTINUOUS_AGG_MAX_JOIN_RELATIONS)
if (list_length(inp->final_userquery->jointree->fromlist) >=
CONTINUOUS_AGG_MAX_JOIN_RELATIONS ||
!IsA(linitial(inp->final_userquery->jointree->fromlist), RangeTblRef))
{
RangeTblRef *rtr;
final_selquery->rtable = list_make1(rte);
Expand Down
Loading

0 comments on commit a243b75

Please sign in to comment.