-
Notifications
You must be signed in to change notification settings - Fork 848
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Mark partialize_agg as parallel safe
Postgres knows whether a given aggregate is parallel-safe, and creates parallel aggregation plans based on that. The `partialize_agg` is a wrapper we use to perform partial aggregation on data nodes. It is a pure function that produces serialized aggregation state as a result. Being pure, it doesn't influence parallel safety. This means we don't need to mark it parallel-unsafe to artificially disable the parallel plans for partial aggregation. They will be chosen as usual based on the parallel-safety of the underlying aggregate function.
- Loading branch information
Showing
5 changed files
with
159 additions
and
6 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,104 @@ | ||
-- This file and its contents are licensed under the Timescale License. | ||
-- Please see the included NOTICE for copyright information and | ||
-- LICENSE-TIMESCALE for a copy of the license. | ||
-- Test that for parallel-safe aggregate function a parallel plan is generated | ||
-- on data nodes, and for unsafe it is not. We use a manually created safe | ||
-- function and not a builtin one, to check that we can in fact create a | ||
-- function that is parallelized, to prevent a false negative (i.e. it's not | ||
-- parallelized, but for a different reason, not because it's unsafe). | ||
-- Create a relatively big table on one data node to test parallel plans and | ||
-- avoid flakiness. | ||
create table metrics_dist1(like metrics_dist); | ||
select table_name from create_distributed_hypertable('metrics_dist1', 'time', 'device_id', | ||
data_nodes => '{"data_node_1"}'); | ||
WARNING: only one data node was assigned to the hypertable | ||
table_name | ||
metrics_dist1 | ||
(1 row) | ||
|
||
insert into metrics_dist1 select * from metrics_dist order by metrics_dist limit 20000; | ||
\set safe 'create or replace aggregate ts_debug_shippable_safe_count(*) (sfunc = int8inc, combinefunc=int8pl, stype = bigint, initcond = 0, parallel = safe);' | ||
\set unsafe 'create or replace aggregate ts_debug_shippable_unsafe_count(*) (sfunc = int8inc, combinefunc=int8pl, stype = bigint, initcond = 0, parallel = unsafe);' | ||
:safe | ||
call distributed_exec(:'safe'); | ||
:unsafe | ||
call distributed_exec(:'unsafe'); | ||
call distributed_exec($$ set parallel_tuple_cost = 0; $$); | ||
call distributed_exec($$ set parallel_setup_cost = 0; $$); | ||
call distributed_exec($$ set max_parallel_workers_per_gather = 1; $$); | ||
set timescaledb.enable_remote_explain = 1; | ||
set enable_partitionwise_aggregate = 1; | ||
\set analyze 'explain (analyze, verbose, costs off, timing off, summary off)' | ||
:analyze | ||
select count(*) from metrics_dist1; | ||
QUERY PLAN | ||
Custom Scan (DataNodeScan) (actual rows=1 loops=1) | ||
Output: (count(*)) | ||
Relations: Aggregate on (public.metrics_dist1) | ||
Data node: data_node_1 | ||
Fetcher Type: Row by row | ||
Chunks: _dist_hyper_X_X_chunk, _dist_hyper_X_X_chunk | ||
Remote SQL: SELECT count(*) FROM public.metrics_dist1 WHERE _timescaledb_internal.chunks_in(public.metrics_dist1.*, ARRAY[..]) | ||
Remote EXPLAIN: | ||
Finalize Aggregate (actual rows=1 loops=1) | ||
Output: count(*) | ||
-> Gather (actual rows=2 loops=1) | ||
Output: (PARTIAL count(*)) | ||
Workers Planned: 1 | ||
Workers Launched: 1 | ||
-> Partial Aggregate (actual rows=1 loops=2) | ||
Output: PARTIAL count(*) | ||
Worker 0: actual rows=1 loops=1 | ||
-> Parallel Append (actual rows=10000 loops=2) | ||
Worker 0: actual rows=0 loops=1 | ||
-> Parallel Seq Scan on _timescaledb_internal._dist_hyper_X_X_chunk (actual rows=17990 loops=1) | ||
-> Parallel Seq Scan on _timescaledb_internal._dist_hyper_X_X_chunk (actual rows=2010 loops=1) | ||
|
||
(22 rows) | ||
|
||
:analyze | ||
select ts_debug_shippable_safe_count(*) from metrics_dist1; | ||
QUERY PLAN | ||
Custom Scan (DataNodeScan) (actual rows=1 loops=1) | ||
Output: (ts_debug_shippable_safe_count(*)) | ||
Relations: Aggregate on (public.metrics_dist1) | ||
Data node: data_node_1 | ||
Fetcher Type: Row by row | ||
Chunks: _dist_hyper_X_X_chunk, _dist_hyper_X_X_chunk | ||
Remote SQL: SELECT public.ts_debug_shippable_safe_count(*) FROM public.metrics_dist1 WHERE _timescaledb_internal.chunks_in(public.metrics_dist1.*, ARRAY[..]) | ||
Remote EXPLAIN: | ||
Finalize Aggregate (actual rows=1 loops=1) | ||
Output: public.ts_debug_shippable_safe_count(*) | ||
-> Gather (actual rows=2 loops=1) | ||
Output: (PARTIAL public.ts_debug_shippable_safe_count(*)) | ||
Workers Planned: 1 | ||
Workers Launched: 1 | ||
-> Partial Aggregate (actual rows=1 loops=2) | ||
Output: PARTIAL public.ts_debug_shippable_safe_count(*) | ||
Worker 0: actual rows=1 loops=1 | ||
-> Parallel Append (actual rows=10000 loops=2) | ||
Worker 0: actual rows=0 loops=1 | ||
-> Parallel Seq Scan on _timescaledb_internal._dist_hyper_X_X_chunk (actual rows=17990 loops=1) | ||
-> Parallel Seq Scan on _timescaledb_internal._dist_hyper_X_X_chunk (actual rows=2010 loops=1) | ||
|
||
(22 rows) | ||
|
||
:analyze | ||
select ts_debug_shippable_unsafe_count(*) from metrics_dist1; | ||
QUERY PLAN | ||
Custom Scan (DataNodeScan) (actual rows=1 loops=1) | ||
Output: (ts_debug_shippable_unsafe_count(*)) | ||
Relations: Aggregate on (public.metrics_dist1) | ||
Data node: data_node_1 | ||
Fetcher Type: Row by row | ||
Chunks: _dist_hyper_X_X_chunk, _dist_hyper_X_X_chunk | ||
Remote SQL: SELECT public.ts_debug_shippable_unsafe_count(*) FROM public.metrics_dist1 WHERE _timescaledb_internal.chunks_in(public.metrics_dist1.*, ARRAY[..]) | ||
Remote EXPLAIN: | ||
Aggregate (actual rows=1 loops=1) | ||
Output: public.ts_debug_shippable_unsafe_count(*) | ||
-> Append (actual rows=20000 loops=1) | ||
-> Seq Scan on _timescaledb_internal._dist_hyper_X_X_chunk (actual rows=17990 loops=1) | ||
-> Seq Scan on _timescaledb_internal._dist_hyper_X_X_chunk (actual rows=2010 loops=1) | ||
|
||
(14 rows) | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,42 @@ | ||
-- This file and its contents are licensed under the Timescale License. | ||
-- Please see the included NOTICE for copyright information and | ||
-- LICENSE-TIMESCALE for a copy of the license. | ||
|
||
-- Test that for parallel-safe aggregate function a parallel plan is generated | ||
-- on data nodes, and for unsafe it is not. We use a manually created safe | ||
-- function and not a builtin one, to check that we can in fact create a | ||
-- function that is parallelized, to prevent a false negative (i.e. it's not | ||
-- parallelized, but for a different reason, not because it's unsafe). | ||
|
||
-- Create a relatively big table on one data node to test parallel plans and | ||
-- avoid flakiness. | ||
create table metrics_dist1(like metrics_dist); | ||
select table_name from create_distributed_hypertable('metrics_dist1', 'time', 'device_id', | ||
data_nodes => '{"data_node_1"}'); | ||
insert into metrics_dist1 select * from metrics_dist order by metrics_dist limit 20000; | ||
|
||
\set safe 'create or replace aggregate ts_debug_shippable_safe_count(*) (sfunc = int8inc, combinefunc=int8pl, stype = bigint, initcond = 0, parallel = safe);' | ||
\set unsafe 'create or replace aggregate ts_debug_shippable_unsafe_count(*) (sfunc = int8inc, combinefunc=int8pl, stype = bigint, initcond = 0, parallel = unsafe);' | ||
|
||
:safe | ||
call distributed_exec(:'safe'); | ||
:unsafe | ||
call distributed_exec(:'unsafe'); | ||
|
||
call distributed_exec($$ set parallel_tuple_cost = 0; $$); | ||
call distributed_exec($$ set parallel_setup_cost = 0; $$); | ||
call distributed_exec($$ set max_parallel_workers_per_gather = 1; $$); | ||
|
||
set timescaledb.enable_remote_explain = 1; | ||
set enable_partitionwise_aggregate = 1; | ||
|
||
\set analyze 'explain (analyze, verbose, costs off, timing off, summary off)' | ||
|
||
:analyze | ||
select count(*) from metrics_dist1; | ||
|
||
:analyze | ||
select ts_debug_shippable_safe_count(*) from metrics_dist1; | ||
|
||
:analyze | ||
select ts_debug_shippable_unsafe_count(*) from metrics_dist1; |