Skip to content

Commit

Permalink
Mark cagg_watermark parallel safe
Browse files Browse the repository at this point in the history
The `cagg_watermark` function performs only read-only operations, so it
is safe to mark it parallel safe and take advantage of Postgres
parallel query.

Since 2.7, when we introduced the new Continuous Aggregate format, we
no longer use partials. The aggregate functions used for partials,
`partialize_agg` and `finalize_agg`, are not parallel safe, but since
they are no longer needed it makes no sense not to take advantage of
Postgres parallel query for realtime Continuous Aggregates.
  • Loading branch information
fabriziomello committed Jan 31, 2023
1 parent e6173d1 commit c0f2ed1
Show file tree
Hide file tree
Showing 7 changed files with 283 additions and 6 deletions.
2 changes: 1 addition & 1 deletion sql/util_time.sql
Expand Up @@ -70,7 +70,7 @@ CREATE OR REPLACE FUNCTION _timescaledb_internal.time_to_internal(time_val ANYEL
RETURNS BIGINT AS '@MODULE_PATHNAME@', 'ts_time_to_internal' LANGUAGE C VOLATILE STRICT;

CREATE OR REPLACE FUNCTION _timescaledb_internal.cagg_watermark(hypertable_id INTEGER)
RETURNS INT8 AS '@MODULE_PATHNAME@', 'ts_continuous_agg_watermark' LANGUAGE C STABLE STRICT;
RETURNS INT8 AS '@MODULE_PATHNAME@', 'ts_continuous_agg_watermark' LANGUAGE C STABLE STRICT PARALLEL SAFE;

CREATE OR REPLACE FUNCTION _timescaledb_internal.subtract_integer_from_now( hypertable_relid REGCLASS, lag INT8 )
RETURNS INT8 AS '@MODULE_PATHNAME@', 'ts_subtract_integer_from_now' LANGUAGE C STABLE STRICT;
5 changes: 0 additions & 5 deletions src/ts_catalog/continuous_agg.c
Expand Up @@ -1609,11 +1609,6 @@ ts_continuous_agg_watermark(PG_FUNCTION_ARGS)
ContinuousAgg *cagg;
AclResult aclresult;

if (PG_ARGISNULL(0))
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("materialized hypertable cannot be NULL")));

if (watermark != NULL)
{
if (watermark_valid(watermark, hyper_id))
Expand Down
63 changes: 63 additions & 0 deletions tsl/test/expected/continuous_aggs-12.out
Expand Up @@ -2280,3 +2280,66 @@ ORDER BY timec;
Thu Nov 01 17:00:00 2018 PDT | NYC | 35 | 15
(4 rows)

-- Parallel planning test for realtime Continuous Aggregate
DROP TABLE conditions CASCADE;
NOTICE: drop cascades to 4 other objects
NOTICE: drop cascades to 2 other objects
NOTICE: drop cascades to 2 other objects
CREATE TABLE conditions (
timec TIMESTAMPTZ NOT NULL,
temperature DOUBLE PRECISION NULL
);
SELECT table_name FROM create_hypertable('conditions', 'timec');
table_name
------------
conditions
(1 row)

INSERT INTO conditions
SELECT t, 10 FROM generate_series('2023-01-01 00:00-03'::timestamptz, '2023-12-31 23:59-03'::timestamptz, '1 hour'::interval) AS t;
CREATE MATERIALIZED VIEW conditions_daily WITH (timescaledb.continuous)
AS
SELECT
time_bucket('1 day', timec),
SUM(temperature)
FROM conditions
GROUP BY 1
ORDER BY 2 DESC;
NOTICE: refreshing continuous aggregate "conditions_daily"
SET force_parallel_mode = 'on';
SET max_parallel_workers_per_gather = 4;
SET parallel_setup_cost = 0;
SET parallel_tuple_cost = 0;
-- Parallel planning
EXPLAIN (COSTS OFF, TIMING OFF) SELECT * FROM conditions_daily WHERE time_bucket >= '2023-07-01';
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Merge Append
Sort Key: _materialized_hypertable_64.sum DESC
-> Gather Merge
Workers Planned: 2
-> Sort
Sort Key: _materialized_hypertable_64.sum DESC
-> Parallel Custom Scan (ChunkAppend) on _materialized_hypertable_64
Chunks excluded during startup: 0
-> Parallel Index Scan using _hyper_64_185_chunk__materialized_hypertable_64_time_bucket_idx on _hyper_64_185_chunk
Index Cond: ((time_bucket < COALESCE(_timescaledb_internal.to_timestamp(_timescaledb_internal.cagg_watermark(64)), '-infinity'::timestamp with time zone)) AND (time_bucket >= 'Sat Jul 01 00:00:00 2023 PDT'::timestamp with time zone))
-> Parallel Index Scan using _hyper_64_189_chunk__materialized_hypertable_64_time_bucket_idx on _hyper_64_189_chunk
Index Cond: ((time_bucket < COALESCE(_timescaledb_internal.to_timestamp(_timescaledb_internal.cagg_watermark(64)), '-infinity'::timestamp with time zone)) AND (time_bucket >= 'Sat Jul 01 00:00:00 2023 PDT'::timestamp with time zone))
-> Parallel Index Scan using _hyper_64_190_chunk__materialized_hypertable_64_time_bucket_idx on _hyper_64_190_chunk
Index Cond: ((time_bucket < COALESCE(_timescaledb_internal.to_timestamp(_timescaledb_internal.cagg_watermark(64)), '-infinity'::timestamp with time zone)) AND (time_bucket >= 'Sat Jul 01 00:00:00 2023 PDT'::timestamp with time zone))
-> Sort
Sort Key: (sum(conditions.temperature)) DESC
-> Finalize HashAggregate
Group Key: (time_bucket('@ 1 day'::interval, conditions.timec))
-> Gather
Workers Planned: 4
-> Partial HashAggregate
Group Key: time_bucket('@ 1 day'::interval, conditions.timec)
-> Parallel Custom Scan (ChunkAppend) on conditions
Chunks excluded during startup: 26
-> Parallel Index Scan Backward using _hyper_63_184_chunk_conditions_timec_idx on _hyper_63_184_chunk
Index Cond: ((timec >= COALESCE(_timescaledb_internal.to_timestamp(_timescaledb_internal.cagg_watermark(64)), '-infinity'::timestamp with time zone)) AND (timec >= 'Sat Jul 01 00:00:00 2023 PDT'::timestamp with time zone))
Filter: (time_bucket('@ 1 day'::interval, timec) >= 'Sat Jul 01 00:00:00 2023 PDT'::timestamp with time zone)
(27 rows)

63 changes: 63 additions & 0 deletions tsl/test/expected/continuous_aggs-13.out
Expand Up @@ -2280,3 +2280,66 @@ ORDER BY timec;
Thu Nov 01 17:00:00 2018 PDT | NYC | 35 | 15
(4 rows)

-- Parallel planning test for realtime Continuous Aggregate
DROP TABLE conditions CASCADE;
NOTICE: drop cascades to 4 other objects
NOTICE: drop cascades to 2 other objects
NOTICE: drop cascades to 2 other objects
CREATE TABLE conditions (
timec TIMESTAMPTZ NOT NULL,
temperature DOUBLE PRECISION NULL
);
SELECT table_name FROM create_hypertable('conditions', 'timec');
table_name
------------
conditions
(1 row)

INSERT INTO conditions
SELECT t, 10 FROM generate_series('2023-01-01 00:00-03'::timestamptz, '2023-12-31 23:59-03'::timestamptz, '1 hour'::interval) AS t;
CREATE MATERIALIZED VIEW conditions_daily WITH (timescaledb.continuous)
AS
SELECT
time_bucket('1 day', timec),
SUM(temperature)
FROM conditions
GROUP BY 1
ORDER BY 2 DESC;
NOTICE: refreshing continuous aggregate "conditions_daily"
SET force_parallel_mode = 'on';
SET max_parallel_workers_per_gather = 4;
SET parallel_setup_cost = 0;
SET parallel_tuple_cost = 0;
-- Parallel planning
EXPLAIN (COSTS OFF, TIMING OFF) SELECT * FROM conditions_daily WHERE time_bucket >= '2023-07-01';
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Merge Append
Sort Key: _materialized_hypertable_64.sum DESC
-> Gather Merge
Workers Planned: 2
-> Sort
Sort Key: _materialized_hypertable_64.sum DESC
-> Parallel Custom Scan (ChunkAppend) on _materialized_hypertable_64
Chunks excluded during startup: 0
-> Parallel Index Scan using _hyper_64_185_chunk__materialized_hypertable_64_time_bucket_idx on _hyper_64_185_chunk
Index Cond: ((time_bucket < COALESCE(_timescaledb_internal.to_timestamp(_timescaledb_internal.cagg_watermark(64)), '-infinity'::timestamp with time zone)) AND (time_bucket >= 'Sat Jul 01 00:00:00 2023 PDT'::timestamp with time zone))
-> Parallel Index Scan using _hyper_64_189_chunk__materialized_hypertable_64_time_bucket_idx on _hyper_64_189_chunk
Index Cond: ((time_bucket < COALESCE(_timescaledb_internal.to_timestamp(_timescaledb_internal.cagg_watermark(64)), '-infinity'::timestamp with time zone)) AND (time_bucket >= 'Sat Jul 01 00:00:00 2023 PDT'::timestamp with time zone))
-> Parallel Index Scan using _hyper_64_190_chunk__materialized_hypertable_64_time_bucket_idx on _hyper_64_190_chunk
Index Cond: ((time_bucket < COALESCE(_timescaledb_internal.to_timestamp(_timescaledb_internal.cagg_watermark(64)), '-infinity'::timestamp with time zone)) AND (time_bucket >= 'Sat Jul 01 00:00:00 2023 PDT'::timestamp with time zone))
-> Sort
Sort Key: (sum(conditions.temperature)) DESC
-> Finalize HashAggregate
Group Key: (time_bucket('@ 1 day'::interval, conditions.timec))
-> Gather
Workers Planned: 4
-> Partial HashAggregate
Group Key: time_bucket('@ 1 day'::interval, conditions.timec)
-> Parallel Custom Scan (ChunkAppend) on conditions
Chunks excluded during startup: 26
-> Parallel Index Scan Backward using _hyper_63_184_chunk_conditions_timec_idx on _hyper_63_184_chunk
Index Cond: ((timec >= COALESCE(_timescaledb_internal.to_timestamp(_timescaledb_internal.cagg_watermark(64)), '-infinity'::timestamp with time zone)) AND (timec >= 'Sat Jul 01 00:00:00 2023 PDT'::timestamp with time zone))
Filter: (time_bucket('@ 1 day'::interval, timec) >= 'Sat Jul 01 00:00:00 2023 PDT'::timestamp with time zone)
(27 rows)

63 changes: 63 additions & 0 deletions tsl/test/expected/continuous_aggs-14.out
Expand Up @@ -2280,3 +2280,66 @@ ORDER BY timec;
Thu Nov 01 17:00:00 2018 PDT | NYC | 35 | 15
(4 rows)

-- Parallel planning test for realtime Continuous Aggregate
DROP TABLE conditions CASCADE;
NOTICE: drop cascades to 4 other objects
NOTICE: drop cascades to 2 other objects
NOTICE: drop cascades to 2 other objects
CREATE TABLE conditions (
timec TIMESTAMPTZ NOT NULL,
temperature DOUBLE PRECISION NULL
);
SELECT table_name FROM create_hypertable('conditions', 'timec');
table_name
------------
conditions
(1 row)

INSERT INTO conditions
SELECT t, 10 FROM generate_series('2023-01-01 00:00-03'::timestamptz, '2023-12-31 23:59-03'::timestamptz, '1 hour'::interval) AS t;
CREATE MATERIALIZED VIEW conditions_daily WITH (timescaledb.continuous)
AS
SELECT
time_bucket('1 day', timec),
SUM(temperature)
FROM conditions
GROUP BY 1
ORDER BY 2 DESC;
NOTICE: refreshing continuous aggregate "conditions_daily"
SET force_parallel_mode = 'on';
SET max_parallel_workers_per_gather = 4;
SET parallel_setup_cost = 0;
SET parallel_tuple_cost = 0;
-- Parallel planning
EXPLAIN (COSTS OFF, TIMING OFF) SELECT * FROM conditions_daily WHERE time_bucket >= '2023-07-01';
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Merge Append
Sort Key: _materialized_hypertable_64.sum DESC
-> Gather Merge
Workers Planned: 2
-> Sort
Sort Key: _materialized_hypertable_64.sum DESC
-> Parallel Custom Scan (ChunkAppend) on _materialized_hypertable_64
Chunks excluded during startup: 0
-> Parallel Index Scan using _hyper_64_185_chunk__materialized_hypertable_64_time_bucket_idx on _hyper_64_185_chunk
Index Cond: ((time_bucket < COALESCE(_timescaledb_internal.to_timestamp(_timescaledb_internal.cagg_watermark(64)), '-infinity'::timestamp with time zone)) AND (time_bucket >= 'Sat Jul 01 00:00:00 2023 PDT'::timestamp with time zone))
-> Parallel Index Scan using _hyper_64_189_chunk__materialized_hypertable_64_time_bucket_idx on _hyper_64_189_chunk
Index Cond: ((time_bucket < COALESCE(_timescaledb_internal.to_timestamp(_timescaledb_internal.cagg_watermark(64)), '-infinity'::timestamp with time zone)) AND (time_bucket >= 'Sat Jul 01 00:00:00 2023 PDT'::timestamp with time zone))
-> Parallel Index Scan using _hyper_64_190_chunk__materialized_hypertable_64_time_bucket_idx on _hyper_64_190_chunk
Index Cond: ((time_bucket < COALESCE(_timescaledb_internal.to_timestamp(_timescaledb_internal.cagg_watermark(64)), '-infinity'::timestamp with time zone)) AND (time_bucket >= 'Sat Jul 01 00:00:00 2023 PDT'::timestamp with time zone))
-> Sort
Sort Key: (sum(conditions.temperature)) DESC
-> Finalize HashAggregate
Group Key: (time_bucket('@ 1 day'::interval, conditions.timec))
-> Gather
Workers Planned: 4
-> Partial HashAggregate
Group Key: time_bucket('@ 1 day'::interval, conditions.timec)
-> Parallel Custom Scan (ChunkAppend) on conditions
Chunks excluded during startup: 26
-> Parallel Index Scan Backward using _hyper_63_184_chunk_conditions_timec_idx on _hyper_63_184_chunk
Index Cond: ((timec >= COALESCE(_timescaledb_internal.to_timestamp(_timescaledb_internal.cagg_watermark(64)), '-infinity'::timestamp with time zone)) AND (timec >= 'Sat Jul 01 00:00:00 2023 PDT'::timestamp with time zone))
Filter: (time_bucket('@ 1 day'::interval, timec) >= 'Sat Jul 01 00:00:00 2023 PDT'::timestamp with time zone)
(27 rows)

64 changes: 64 additions & 0 deletions tsl/test/expected/continuous_aggs-15.out
Expand Up @@ -2282,3 +2282,67 @@ ORDER BY timec;
Thu Nov 01 17:00:00 2018 PDT | NYC | 35 | 15
(4 rows)

-- Parallel planning test for realtime Continuous Aggregate
DROP TABLE conditions CASCADE;
NOTICE: drop cascades to 4 other objects
NOTICE: drop cascades to 2 other objects
NOTICE: drop cascades to 2 other objects
CREATE TABLE conditions (
timec TIMESTAMPTZ NOT NULL,
temperature DOUBLE PRECISION NULL
);
SELECT table_name FROM create_hypertable('conditions', 'timec');
table_name
------------
conditions
(1 row)

INSERT INTO conditions
SELECT t, 10 FROM generate_series('2023-01-01 00:00-03'::timestamptz, '2023-12-31 23:59-03'::timestamptz, '1 hour'::interval) AS t;
CREATE MATERIALIZED VIEW conditions_daily WITH (timescaledb.continuous)
AS
SELECT
time_bucket('1 day', timec),
SUM(temperature)
FROM conditions
GROUP BY 1
ORDER BY 2 DESC;
NOTICE: refreshing continuous aggregate "conditions_daily"
SET force_parallel_mode = 'on';
SET max_parallel_workers_per_gather = 4;
SET parallel_setup_cost = 0;
SET parallel_tuple_cost = 0;
-- Parallel planning
EXPLAIN (COSTS OFF, TIMING OFF) SELECT * FROM conditions_daily WHERE time_bucket >= '2023-07-01';
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Merge Append
Sort Key: _materialized_hypertable_64.sum DESC
-> Gather Merge
Workers Planned: 2
-> Sort
Sort Key: _materialized_hypertable_64.sum DESC
-> Parallel Custom Scan (ChunkAppend) on _materialized_hypertable_64
Chunks excluded during startup: 0
-> Parallel Index Scan using _hyper_64_185_chunk__materialized_hypertable_64_time_bucket_idx on _hyper_64_185_chunk
Index Cond: ((time_bucket < COALESCE(_timescaledb_internal.to_timestamp(_timescaledb_internal.cagg_watermark(64)), '-infinity'::timestamp with time zone)) AND (time_bucket >= 'Sat Jul 01 00:00:00 2023 PDT'::timestamp with time zone))
-> Parallel Index Scan using _hyper_64_189_chunk__materialized_hypertable_64_time_bucket_idx on _hyper_64_189_chunk
Index Cond: ((time_bucket < COALESCE(_timescaledb_internal.to_timestamp(_timescaledb_internal.cagg_watermark(64)), '-infinity'::timestamp with time zone)) AND (time_bucket >= 'Sat Jul 01 00:00:00 2023 PDT'::timestamp with time zone))
-> Parallel Index Scan using _hyper_64_190_chunk__materialized_hypertable_64_time_bucket_idx on _hyper_64_190_chunk
Index Cond: ((time_bucket < COALESCE(_timescaledb_internal.to_timestamp(_timescaledb_internal.cagg_watermark(64)), '-infinity'::timestamp with time zone)) AND (time_bucket >= 'Sat Jul 01 00:00:00 2023 PDT'::timestamp with time zone))
-> Sort
Sort Key: (sum(conditions.temperature)) DESC
-> Finalize HashAggregate
Group Key: (time_bucket('@ 1 day'::interval, conditions.timec))
-> Gather
Workers Planned: 4
-> Partial HashAggregate
Group Key: time_bucket('@ 1 day'::interval, conditions.timec)
-> Result
-> Parallel Custom Scan (ChunkAppend) on conditions
Chunks excluded during startup: 26
-> Parallel Index Scan Backward using _hyper_63_184_chunk_conditions_timec_idx on _hyper_63_184_chunk
Index Cond: ((timec >= COALESCE(_timescaledb_internal.to_timestamp(_timescaledb_internal.cagg_watermark(64)), '-infinity'::timestamp with time zone)) AND (timec >= 'Sat Jul 01 00:00:00 2023 PDT'::timestamp with time zone))
Filter: (time_bucket('@ 1 day'::interval, timec) >= 'Sat Jul 01 00:00:00 2023 PDT'::timestamp with time zone)
(28 rows)

29 changes: 29 additions & 0 deletions tsl/test/sql/continuous_aggs.sql.in
Expand Up @@ -1516,3 +1516,32 @@ SELECT *
FROM conditions_summary_new
NATURAL JOIN conditions_summary_old
ORDER BY timec;

-- Parallel planning test for realtime Continuous Aggregate
--
-- Rebuilds the "conditions" hypertable from scratch, defines a daily
-- continuous aggregate on top of it, and prints the plan of a query against
-- that aggregate under settings that favour parallel plans.
--
-- CASCADE also drops the objects that depend on the previous "conditions"
-- table.
DROP TABLE conditions CASCADE;

CREATE TABLE conditions (
timec TIMESTAMPTZ NOT NULL,
temperature DOUBLE PRECISION NULL
);

-- Convert the plain table into a hypertable using "timec" as the time column.
SELECT table_name FROM create_hypertable('conditions', 'timec');

-- Seed one row per hour from 2023-01-01 00:00 to 2023-12-31 23:59 (both given
-- with a -03 UTC offset), each with a constant temperature of 10.
INSERT INTO conditions
SELECT t, 10 FROM generate_series('2023-01-01 00:00-03'::timestamptz, '2023-12-31 23:59-03'::timestamptz, '1 hour'::interval) AS t;

-- Daily continuous aggregate: SUM(temperature) per 1-day bucket.
-- GROUP BY 1 / ORDER BY 2 are positional references to the select list
-- (1 = the time_bucket expression, 2 = the SUM). Neither output column is
-- aliased, so the EXPLAIN below filters on the default name "time_bucket".
CREATE MATERIALIZED VIEW conditions_daily WITH (timescaledb.continuous)
AS
SELECT
time_bucket('1 day', timec),
SUM(temperature)
FROM conditions
GROUP BY 1
ORDER BY 2 DESC;

-- Bias the planner toward parallel plans: force parallel mode, allow up to
-- 4 workers per Gather node, and make parallel setup and tuple transfer free.
-- NOTE(review): these settings are not RESET within this section, so any
-- statements appended after it would inherit them.
-- NOTE(review): force_parallel_mode was renamed debug_parallel_query in
-- PostgreSQL 16 -- confirm before adding expected output for newer versions.
SET force_parallel_mode = 'on';
SET max_parallel_workers_per_gather = 4;
SET parallel_setup_cost = 0;
SET parallel_tuple_cost = 0;
-- Parallel planning
-- EXPLAIN without ANALYZE: only the plan is printed, the query is not run.
-- COSTS OFF keeps cost estimates out of the output.
EXPLAIN (COSTS OFF, TIMING OFF) SELECT * FROM conditions_daily WHERE time_bucket >= '2023-07-01';

0 comments on commit c0f2ed1

Please sign in to comment.