Skip to content

Commit

Permalink
Enable real time aggregation for caggs with joins
Browse files Browse the repository at this point in the history
  • Loading branch information
RafiaSabih committed Feb 8, 2023
1 parent caf79e0 commit fc57aa3
Show file tree
Hide file tree
Showing 7 changed files with 633 additions and 203 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -11,6 +11,7 @@ accidentally triggering the load of a previous DB version.**
* #5245 Manage life-cycle of connections via memory contexts
* #5246 Make connection establishment interruptible
* #5253 Make data node command execution interruptible
* #5243 Enable real-time aggregation for continuous aggregates with joins

**Bugfixes**
* #4926 Fix corruption when inserting into compressed chunks
Expand Down
29 changes: 20 additions & 9 deletions tsl/src/continuous_aggs/create.c
Expand Up @@ -2635,14 +2635,6 @@ cagg_create(const CreateTableAsStmt *create_stmt, ViewStmt *stmt, Query *panquer
bool finalized = DatumGetBool(with_clause_options[ContinuousViewOptionFinalized].parsed);

finalqinfo.finalized = finalized;
if (list_length(panquery->jointree->fromlist) >= CONTINUOUS_AGG_MAX_JOIN_RELATIONS &&
!materialized_only)
{
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("real-time continuous aggregates are not supported with joins"),
errhint("set materialized_only to true")));
}

/*
* Assign the column_name aliases in CREATE VIEW to the query.
Expand Down Expand Up @@ -3387,7 +3379,26 @@ build_union_query(CAggTimebucketInfo *tbinfo, int matpartcolno, Query *q1, Query
tce->lt_opr,
varno,
matpartcolno);
varno = list_length(q2->rtable);
/*
* If there is a join in the CAgg definition, adjust varno so that
* the time column is taken from the hypertable only.
*/
if (list_length(q2->rtable) == CONTINUOUS_AGG_MAX_JOIN_RELATIONS)
{
RangeTblRef *rtref = linitial_node(RangeTblRef, q2->jointree->fromlist);
RangeTblEntry *rte = list_nth(q2->rtable, rtref->rtindex - 1);
RangeTblRef *rtref_other = lsecond_node(RangeTblRef, q2->jointree->fromlist);
RangeTblEntry *rte_other = list_nth(q2->rtable, rtref_other->rtindex - 1);

Oid normal_table_id = ts_is_hypertable(rte->relid) ? rte_other->relid : rte->relid;
if (normal_table_id == rte->relid)
varno = 2;
else
varno = 1;

}
else
varno = list_length(q2->rtable);
q2_quals = build_union_query_quals(materialize_htid,
tbinfo->htpartcoltype,
get_negator(tce->lt_opr),
Expand Down
175 changes: 141 additions & 34 deletions tsl/test/expected/cagg_joins-12.out
Expand Up @@ -52,7 +52,141 @@ INSERT INTO devices values (1, 'thermo_1', 'Moscow'), (2, 'thermo_2', 'Berlin'),
CREATE TABLE devices_dup AS SELECT * FROM devices;
CREATE VIEW devices_view AS SELECT * FROM devices;
--Create a cagg with join between a hypertable and a normal table
-- with equality condition on inner join type
-- with equality condition on inner join type and realtime aggregation enabled
CREATE MATERIALIZED VIEW conditions_summary_daily_realtime
WITH (timescaledb.continuous, timescaledb.materialized_only = FALSE) AS
SELECT time_bucket(INTERVAL '1 day', day) AS bucket,
AVG(temperature),
MAX(temperature),
MIN(temperature),
name
FROM conditions, devices
WHERE conditions.device_id = devices.device_id
GROUP BY name, bucket;
NOTICE: refreshing continuous aggregate "conditions_summary_daily_realtime"
HINT: Use WITH NO DATA if you do not want to refresh the continuous aggregate on creation.
\d+ conditions_summary_daily_realtime
View "public.conditions_summary_daily_realtime"
Column | Type | Collation | Nullable | Default | Storage | Description
--------+---------+-----------+----------+---------+----------+-------------
bucket | date | | | | plain |
avg | numeric | | | | main |
max | integer | | | | plain |
min | integer | | | | plain |
name | text | | | | extended |
View definition:
SELECT _materialized_hypertable_3.bucket,
_materialized_hypertable_3.avg,
_materialized_hypertable_3.max,
_materialized_hypertable_3.min,
_materialized_hypertable_3.name
FROM _timescaledb_internal._materialized_hypertable_3 _materialized_hypertable_3
WHERE _materialized_hypertable_3.bucket < COALESCE(_timescaledb_internal.to_date(_timescaledb_internal.cagg_watermark(3)), '-infinity'::date)
UNION ALL
SELECT time_bucket('@ 1 day'::interval, conditions.day) AS bucket,
avg(conditions.temperature) AS avg,
max(conditions.temperature) AS max,
min(conditions.temperature) AS min,
devices.name
FROM conditions,
devices
WHERE conditions.device_id = devices.device_id AND conditions.day >= COALESCE(_timescaledb_internal.to_date(_timescaledb_internal.cagg_watermark(3)), '-infinity'::date)
GROUP BY devices.name, (time_bucket('@ 1 day'::interval, conditions.day));

INSERT INTO conditions (day, city, temperature, device_id) VALUES
('2021-06-30', 'Moscow', 28, 3);
SELECT *
FROM conditions_summary_daily_realtime
ORDER BY bucket;
bucket | avg | max | min | name
------------+---------------------+-----+-----+----------
06-14-2021 | 26.0000000000000000 | 26 | 26 | thermo_1
06-15-2021 | 22.0000000000000000 | 22 | 22 | thermo_2
06-16-2021 | 24.0000000000000000 | 24 | 24 | thermo_3
06-17-2021 | 24.0000000000000000 | 24 | 24 | thermo_4
06-18-2021 | 27.0000000000000000 | 27 | 27 | thermo_4
06-19-2021 | 28.0000000000000000 | 28 | 28 | thermo_4
06-20-2021 | 30.0000000000000000 | 30 | 30 | thermo_1
06-21-2021 | 31.0000000000000000 | 31 | 31 | thermo_1
06-22-2021 | 34.0000000000000000 | 34 | 34 | thermo_1
06-23-2021 | 34.0000000000000000 | 34 | 34 | thermo_2
06-24-2021 | 34.0000000000000000 | 34 | 34 | thermo_2
06-25-2021 | 32.0000000000000000 | 32 | 32 | thermo_3
06-26-2021 | 32.0000000000000000 | 32 | 32 | thermo_3
06-27-2021 | 31.0000000000000000 | 31 | 31 | thermo_3
06-30-2021 | 28.0000000000000000 | 28 | 28 | thermo_3
(15 rows)

--Create a cagg with join between a hypertable and a normal table
-- with equality condition on inner join type and realtime aggregation enabled
-- with a different order in the from clause
CREATE MATERIALIZED VIEW conditions_summary_daily_realtime_reorder
WITH (timescaledb.continuous, timescaledb.materialized_only = FALSE) AS
SELECT time_bucket(INTERVAL '1 day', day) AS bucket,
AVG(temperature),
MAX(temperature),
MIN(temperature),
name
FROM devices, conditions
WHERE conditions.device_id = devices.device_id
GROUP BY name, bucket;
NOTICE: refreshing continuous aggregate "conditions_summary_daily_realtime_reorder"
HINT: Use WITH NO DATA if you do not want to refresh the continuous aggregate on creation.
\d+ conditions_summary_daily_realtime_reorder
View "public.conditions_summary_daily_realtime_reorder"
Column | Type | Collation | Nullable | Default | Storage | Description
--------+---------+-----------+----------+---------+----------+-------------
bucket | date | | | | plain |
avg | numeric | | | | main |
max | integer | | | | plain |
min | integer | | | | plain |
name | text | | | | extended |
View definition:
SELECT _materialized_hypertable_4.bucket,
_materialized_hypertable_4.avg,
_materialized_hypertable_4.max,
_materialized_hypertable_4.min,
_materialized_hypertable_4.name
FROM _timescaledb_internal._materialized_hypertable_4 _materialized_hypertable_4
WHERE _materialized_hypertable_4.bucket < COALESCE(_timescaledb_internal.to_date(_timescaledb_internal.cagg_watermark(4)), '-infinity'::date)
UNION ALL
SELECT time_bucket('@ 1 day'::interval, conditions.day) AS bucket,
avg(conditions.temperature) AS avg,
max(conditions.temperature) AS max,
min(conditions.temperature) AS min,
devices.name
FROM devices,
conditions
WHERE conditions.device_id = devices.device_id AND conditions.day >= COALESCE(_timescaledb_internal.to_date(_timescaledb_internal.cagg_watermark(4)), '-infinity'::date)
GROUP BY devices.name, (time_bucket('@ 1 day'::interval, conditions.day));

INSERT INTO conditions (day, city, temperature, device_id) VALUES
('2021-07-01', 'Moscow', 28, 3);
SELECT *
FROM conditions_summary_daily_realtime_reorder
ORDER BY bucket;
bucket | avg | max | min | name
------------+---------------------+-----+-----+----------
06-14-2021 | 26.0000000000000000 | 26 | 26 | thermo_1
06-15-2021 | 22.0000000000000000 | 22 | 22 | thermo_2
06-16-2021 | 24.0000000000000000 | 24 | 24 | thermo_3
06-17-2021 | 24.0000000000000000 | 24 | 24 | thermo_4
06-18-2021 | 27.0000000000000000 | 27 | 27 | thermo_4
06-19-2021 | 28.0000000000000000 | 28 | 28 | thermo_4
06-20-2021 | 30.0000000000000000 | 30 | 30 | thermo_1
06-21-2021 | 31.0000000000000000 | 31 | 31 | thermo_1
06-22-2021 | 34.0000000000000000 | 34 | 34 | thermo_1
06-23-2021 | 34.0000000000000000 | 34 | 34 | thermo_2
06-24-2021 | 34.0000000000000000 | 34 | 34 | thermo_2
06-25-2021 | 32.0000000000000000 | 32 | 32 | thermo_3
06-26-2021 | 32.0000000000000000 | 32 | 32 | thermo_3
06-27-2021 | 31.0000000000000000 | 31 | 31 | thermo_3
06-30-2021 | 28.0000000000000000 | 28 | 28 | thermo_3
07-01-2021 | 28.0000000000000000 | 28 | 28 | thermo_3
(16 rows)

--Create a cagg with join between a hypertable and a normal table
-- with equality condition on inner join type and realtime aggregation disabled
CREATE MATERIALIZED VIEW conditions_summary_daily
WITH (timescaledb.continuous, timescaledb.materialized_only = TRUE) AS
SELECT time_bucket(INTERVAL '1 day', day) AS bucket,
Expand Down Expand Up @@ -188,21 +322,8 @@ NOTICE: refreshing continuous aggregate "cagg_on_cagg"
HINT: Use WITH NO DATA if you do not want to refresh the continuous aggregate on creation.
DROP MATERIALIZED VIEW cagg_on_cagg CASCADE;
NOTICE: drop cascades to 2 other objects
DETAIL: drop cascades to table _timescaledb_internal._hyper_6_35_chunk
drop cascades to table _timescaledb_internal._hyper_6_36_chunk
--Error out when real time aggregation is enabled
CREATE MATERIALIZED VIEW conditions_summary_daily_realtime
WITH (timescaledb.continuous, timescaledb.materialized_only = FALSE) AS
SELECT time_bucket(INTERVAL '1 day', day) AS bucket,
AVG(temperature),
MAX(temperature),
MIN(temperature),
name
FROM conditions, devices
WHERE conditions.device_id = devices.device_id
GROUP BY name, bucket;
ERROR: real-time continuous aggregates are not supported with joins
HINT: set materialized_only to true
DETAIL: drop cascades to table _timescaledb_internal._hyper_8_41_chunk
drop cascades to table _timescaledb_internal._hyper_8_42_chunk
CREATE TABLE cities(name text, currency text);
INSERT INTO cities VALUES ('Berlin', 'EUR'), ('London', 'PND');
--Error out when from clause has sub selects
Expand Down Expand Up @@ -330,29 +451,15 @@ FROM conditions_summary_daily_cagg cagg, conditions
WHERE cagg.device_id = conditions.device_id
GROUP BY conditions.temperature, bucket, cagg.name;
ERROR: joins for hierarchical continuous aggregates are not supported
\set VERBOSITY terse
DROP TABLE conditions CASCADE;
NOTICE: drop cascades to 9 other objects
DETAIL: drop cascades to view _timescaledb_internal._partial_view_3
drop cascades to view _timescaledb_internal._direct_view_3
drop cascades to view _timescaledb_internal._partial_view_4
drop cascades to view _timescaledb_internal._direct_view_4
drop cascades to view conditions_summary_daily_2
drop cascades to view _timescaledb_internal._partial_view_5
drop cascades to view _timescaledb_internal._direct_view_5
drop cascades to view _timescaledb_internal._partial_view_7
drop cascades to view _timescaledb_internal._direct_view_7
NOTICE: drop cascades to 15 other objects
NOTICE: drop cascades to 2 other objects
NOTICE: drop cascades to 2 other objects
NOTICE: drop cascades to 2 other objects
DETAIL: drop cascades to table _timescaledb_internal._hyper_3_29_chunk
drop cascades to table _timescaledb_internal._hyper_3_30_chunk
NOTICE: drop cascades to 2 other objects
DETAIL: drop cascades to table _timescaledb_internal._hyper_4_31_chunk
drop cascades to table _timescaledb_internal._hyper_4_32_chunk
NOTICE: drop cascades to 2 other objects
DETAIL: drop cascades to table _timescaledb_internal._hyper_5_33_chunk
drop cascades to table _timescaledb_internal._hyper_5_34_chunk
NOTICE: drop cascades to 2 other objects
DETAIL: drop cascades to table _timescaledb_internal._hyper_7_37_chunk
drop cascades to table _timescaledb_internal._hyper_7_38_chunk
DROP TABLE devices CASCADE;
NOTICE: drop cascades to view devices_view
DROP TABLE conditions_dup CASCADE;
Expand Down

0 comments on commit fc57aa3

Please sign in to comment.