Skip to content

Commit

Permalink
Mark cagg_watermark parallel safe
Browse files Browse the repository at this point in the history
The `cagg_watermark` function performs only read-only operations, so it is
safe to mark it parallel safe and take advantage of Postgres
parallel query.

Since 2.7, when we introduced the new Continuous Aggregate format, we
no longer use partials. Because the aggregate functions
`partialize_agg` and `finalize_agg` are not parallel safe, it makes no
sense not to take advantage of Postgres parallel query for realtime
Continuous Aggregates.
  • Loading branch information
fabriziomello committed Jan 30, 2023
1 parent 5d12a38 commit fc8c12f
Show file tree
Hide file tree
Showing 7 changed files with 120 additions and 4 deletions.
2 changes: 1 addition & 1 deletion sql/util_time.sql
Expand Up @@ -70,7 +70,7 @@ CREATE OR REPLACE FUNCTION _timescaledb_internal.time_to_internal(time_val ANYEL
RETURNS BIGINT AS '@MODULE_PATHNAME@', 'ts_time_to_internal' LANGUAGE C VOLATILE STRICT;

CREATE OR REPLACE FUNCTION _timescaledb_internal.cagg_watermark(hypertable_id INTEGER)
RETURNS INT8 AS '@MODULE_PATHNAME@', 'ts_continuous_agg_watermark' LANGUAGE C STABLE STRICT;
RETURNS INT8 AS '@MODULE_PATHNAME@', 'ts_continuous_agg_watermark' LANGUAGE C STABLE PARALLEL SAFE;

CREATE OR REPLACE FUNCTION _timescaledb_internal.subtract_integer_from_now( hypertable_relid REGCLASS, lag INT8 )
RETURNS INT8 AS '@MODULE_PATHNAME@', 'ts_subtract_integer_from_now' LANGUAGE C STABLE STRICT;
4 changes: 1 addition & 3 deletions src/ts_catalog/continuous_agg.c
Expand Up @@ -1610,9 +1610,7 @@ ts_continuous_agg_watermark(PG_FUNCTION_ARGS)
AclResult aclresult;

if (PG_ARGISNULL(0))
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("materialized hypertable cannot be NULL")));
PG_RETURN_NULL();

if (watermark != NULL)
{
Expand Down
27 changes: 27 additions & 0 deletions tsl/test/expected/continuous_aggs-12.out
Expand Up @@ -1964,6 +1964,33 @@ SELECT * FROM mat_m1 ORDER BY 1, 2;
100 | 100
(5 rows)

-- Parallel planning for realtime continuous aggregate
SET force_parallel_mode = 'on';
SET parallel_setup_cost = 0;
SET max_parallel_workers_per_gather = 1;
EXPLAIN (COSTS OFF, TIMING OFF) SELECT * FROM mat_m1;
QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Append
-> Subquery Scan on "*SELECT* 1"
-> Custom Scan (ChunkAppend) on _materialized_hypertable_58
Chunks excluded during startup: 0
-> Index Scan using _hyper_58_121_chunk__materialized_hypertable_58_time_partition_ on _hyper_58_121_chunk
Index Cond: (time_partition_col < COALESCE(_timescaledb_internal.to_timestamp(_timescaledb_internal.cagg_watermark(58)), '-infinity'::timestamp with time zone))
-> Index Scan using _hyper_58_122_chunk__materialized_hypertable_58_time_partition_ on _hyper_58_122_chunk
Index Cond: (time_partition_col < COALESCE(_timescaledb_internal.to_timestamp(_timescaledb_internal.cagg_watermark(58)), '-infinity'::timestamp with time zone))
-> Subquery Scan on "*SELECT* 2"
-> HashAggregate
Group Key: time_bucket('@ 7 days'::interval, conditions.timec), conditions.location
-> Custom Scan (ChunkAppend) on conditions
Chunks excluded during startup: 1
-> Index Scan using _hyper_52_111_chunk_conditions_timec_idx on _hyper_52_111_chunk
Index Cond: (timec >= COALESCE(_timescaledb_internal.to_timestamp(_timescaledb_internal.cagg_watermark(58)), '-infinity'::timestamp with time zone))
(15 rows)

RESET force_parallel_mode;
RESET parallel_setup_cost;
RESET max_parallel_workers_per_gather;
-- ORDER BY in the view definition
DROP MATERIALIZED VIEW mat_m1;
NOTICE: drop cascades to 2 other objects
Expand Down
27 changes: 27 additions & 0 deletions tsl/test/expected/continuous_aggs-13.out
Expand Up @@ -1964,6 +1964,33 @@ SELECT * FROM mat_m1 ORDER BY 1, 2;
100 | 100
(5 rows)

-- Parallel planning for realtime continuous aggregate
SET force_parallel_mode = 'on';
SET parallel_setup_cost = 0;
SET max_parallel_workers_per_gather = 1;
EXPLAIN (COSTS OFF, TIMING OFF) SELECT * FROM mat_m1;
QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Append
-> Subquery Scan on "*SELECT* 1"
-> Custom Scan (ChunkAppend) on _materialized_hypertable_58
Chunks excluded during startup: 0
-> Index Scan using _hyper_58_121_chunk__materialized_hypertable_58_time_partition_ on _hyper_58_121_chunk
Index Cond: (time_partition_col < COALESCE(_timescaledb_internal.to_timestamp(_timescaledb_internal.cagg_watermark(58)), '-infinity'::timestamp with time zone))
-> Index Scan using _hyper_58_122_chunk__materialized_hypertable_58_time_partition_ on _hyper_58_122_chunk
Index Cond: (time_partition_col < COALESCE(_timescaledb_internal.to_timestamp(_timescaledb_internal.cagg_watermark(58)), '-infinity'::timestamp with time zone))
-> Subquery Scan on "*SELECT* 2"
-> HashAggregate
Group Key: time_bucket('@ 7 days'::interval, conditions.timec), conditions.location
-> Custom Scan (ChunkAppend) on conditions
Chunks excluded during startup: 1
-> Index Scan using _hyper_52_111_chunk_conditions_timec_idx on _hyper_52_111_chunk
Index Cond: (timec >= COALESCE(_timescaledb_internal.to_timestamp(_timescaledb_internal.cagg_watermark(58)), '-infinity'::timestamp with time zone))
(15 rows)

RESET force_parallel_mode;
RESET parallel_setup_cost;
RESET max_parallel_workers_per_gather;
-- ORDER BY in the view definition
DROP MATERIALIZED VIEW mat_m1;
NOTICE: drop cascades to 2 other objects
Expand Down
27 changes: 27 additions & 0 deletions tsl/test/expected/continuous_aggs-14.out
Expand Up @@ -1964,6 +1964,33 @@ SELECT * FROM mat_m1 ORDER BY 1, 2;
100 | 100
(5 rows)

-- Parallel planning for realtime continuous aggregate
SET force_parallel_mode = 'on';
SET parallel_setup_cost = 0;
SET max_parallel_workers_per_gather = 1;
EXPLAIN (COSTS OFF, TIMING OFF) SELECT * FROM mat_m1;
QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Append
-> Subquery Scan on "*SELECT* 1"
-> Custom Scan (ChunkAppend) on _materialized_hypertable_58
Chunks excluded during startup: 0
-> Index Scan using _hyper_58_121_chunk__materialized_hypertable_58_time_partition_ on _hyper_58_121_chunk
Index Cond: (time_partition_col < COALESCE(_timescaledb_internal.to_timestamp(_timescaledb_internal.cagg_watermark(58)), '-infinity'::timestamp with time zone))
-> Index Scan using _hyper_58_122_chunk__materialized_hypertable_58_time_partition_ on _hyper_58_122_chunk
Index Cond: (time_partition_col < COALESCE(_timescaledb_internal.to_timestamp(_timescaledb_internal.cagg_watermark(58)), '-infinity'::timestamp with time zone))
-> Subquery Scan on "*SELECT* 2"
-> HashAggregate
Group Key: time_bucket('@ 7 days'::interval, conditions.timec), conditions.location
-> Custom Scan (ChunkAppend) on conditions
Chunks excluded during startup: 1
-> Index Scan using _hyper_52_111_chunk_conditions_timec_idx on _hyper_52_111_chunk
Index Cond: (timec >= COALESCE(_timescaledb_internal.to_timestamp(_timescaledb_internal.cagg_watermark(58)), '-infinity'::timestamp with time zone))
(15 rows)

RESET force_parallel_mode;
RESET parallel_setup_cost;
RESET max_parallel_workers_per_gather;
-- ORDER BY in the view definition
DROP MATERIALIZED VIEW mat_m1;
NOTICE: drop cascades to 2 other objects
Expand Down
28 changes: 28 additions & 0 deletions tsl/test/expected/continuous_aggs-15.out
Expand Up @@ -1964,6 +1964,34 @@ SELECT * FROM mat_m1 ORDER BY 1, 2;
100 | 100
(5 rows)

-- Parallel planning for realtime continuous aggregate
SET force_parallel_mode = 'on';
SET parallel_setup_cost = 0;
SET max_parallel_workers_per_gather = 1;
EXPLAIN (COSTS OFF, TIMING OFF) SELECT * FROM mat_m1;
QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Append
-> Subquery Scan on "*SELECT* 1"
-> Custom Scan (ChunkAppend) on _materialized_hypertable_58
Chunks excluded during startup: 0
-> Index Scan using _hyper_58_121_chunk__materialized_hypertable_58_time_partition_ on _hyper_58_121_chunk
Index Cond: (time_partition_col < COALESCE(_timescaledb_internal.to_timestamp(_timescaledb_internal.cagg_watermark(58)), '-infinity'::timestamp with time zone))
-> Index Scan using _hyper_58_122_chunk__materialized_hypertable_58_time_partition_ on _hyper_58_122_chunk
Index Cond: (time_partition_col < COALESCE(_timescaledb_internal.to_timestamp(_timescaledb_internal.cagg_watermark(58)), '-infinity'::timestamp with time zone))
-> Subquery Scan on "*SELECT* 2"
-> HashAggregate
Group Key: time_bucket('@ 7 days'::interval, conditions.timec), conditions.location
-> Result
-> Custom Scan (ChunkAppend) on conditions
Chunks excluded during startup: 1
-> Index Scan using _hyper_52_111_chunk_conditions_timec_idx on _hyper_52_111_chunk
Index Cond: (timec >= COALESCE(_timescaledb_internal.to_timestamp(_timescaledb_internal.cagg_watermark(58)), '-infinity'::timestamp with time zone))
(16 rows)

RESET force_parallel_mode;
RESET parallel_setup_cost;
RESET max_parallel_workers_per_gather;
-- ORDER BY in the view definition
DROP MATERIALIZED VIEW mat_m1;
NOTICE: drop cascades to 2 other objects
Expand Down
9 changes: 9 additions & 0 deletions tsl/test/sql/continuous_aggs.sql.in
Expand Up @@ -1393,6 +1393,15 @@ GROUP BY time_bucket('1week', timec), location;

SELECT * FROM mat_m1 ORDER BY 1, 2;

-- Parallel planning for realtime continuous aggregate
SET force_parallel_mode = 'on';
SET parallel_setup_cost = 0;
SET max_parallel_workers_per_gather = 1;
EXPLAIN (COSTS OFF, TIMING OFF) SELECT * FROM mat_m1;
RESET force_parallel_mode;
RESET parallel_setup_cost;
RESET max_parallel_workers_per_gather;

-- ORDER BY in the view definition
DROP MATERIALIZED VIEW mat_m1;
CREATE MATERIALIZED VIEW mat_m1 WITH (timescaledb.continuous)
Expand Down

0 comments on commit fc8c12f

Please sign in to comment.