Mark partialize_agg as parallel safe
Postgres knows whether a given aggregate is parallel-safe, and creates
parallel aggregation plans based on that. `partialize_agg` is a wrapper
function we use to perform partial aggregation on data nodes. It is a
pure function that returns the serialized aggregation state. Being pure,
it doesn't influence parallel safety, so there is no need to mark it
parallel-unsafe and thereby artificially disable parallel plans for
partial aggregation. Such plans will be chosen as usual, based on the
parallel safety of the underlying aggregate function.
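
For context, a rough sketch of how the two wrappers are meant to be combined.
The query shapes are normally generated by the planner; the table, column, and
state names below are purely illustrative:

-- On a data node: produce the serialized partial state of an aggregate.
SELECT _timescaledb_internal.partialize_agg(avg(temperature))
FROM conditions;

-- On the access node: combine the serialized partial states and finalize,
-- passing the aggregate signature, collation, input types, the state bytea,
-- and a dummy value that fixes the return type.
SELECT _timescaledb_internal.finalize_agg(
    'pg_catalog.avg(double precision)', NULL, NULL,
    '{{pg_catalog,float8}}', partial_state, NULL::double precision)
FROM partials;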
akuzm committed May 31, 2022
1 parent 1d0670e commit 5c0110c
Showing 5 changed files with 159 additions and 6 deletions.
14 changes: 10 additions & 4 deletions sql/partialize_finalize.sql
@@ -2,24 +2,30 @@
-- Please see the included NOTICE for copyright information and
-- LICENSE-APACHE for a copy of the license.

+-- These wrapper functions are used to push down aggregation to data nodes.
+-- They can be marked parallel-safe, and the parallel plan will be chosen
+-- depending on whether the underlying aggregate function itself is
+-- parallel-safe.

CREATE OR REPLACE FUNCTION _timescaledb_internal.partialize_agg(arg ANYELEMENT)
-RETURNS BYTEA AS '@MODULE_PATHNAME@', 'ts_partialize_agg' LANGUAGE C VOLATILE;
+RETURNS BYTEA AS '@MODULE_PATHNAME@', 'ts_partialize_agg' LANGUAGE C STABLE PARALLEL SAFE;

CREATE OR REPLACE FUNCTION _timescaledb_internal.finalize_agg_sfunc(
tstate internal, aggfn TEXT, inner_agg_collation_schema NAME, inner_agg_collation_name NAME, inner_agg_input_types NAME[][], inner_agg_serialized_state BYTEA, return_type_dummy_val ANYELEMENT)
RETURNS internal
AS '@MODULE_PATHNAME@', 'ts_finalize_agg_sfunc'
-LANGUAGE C IMMUTABLE ;
+LANGUAGE C IMMUTABLE PARALLEL SAFE;

CREATE OR REPLACE FUNCTION _timescaledb_internal.finalize_agg_ffunc(
tstate internal, aggfn TEXT, inner_agg_collation_schema NAME, inner_agg_collation_name NAME, inner_agg_input_types NAME[][], inner_agg_serialized_state BYTEA, return_type_dummy_val ANYELEMENT)
RETURNS anyelement
AS '@MODULE_PATHNAME@', 'ts_finalize_agg_ffunc'
-LANGUAGE C IMMUTABLE ;
+LANGUAGE C IMMUTABLE PARALLEL SAFE;

CREATE OR REPLACE AGGREGATE _timescaledb_internal.finalize_agg(agg_name TEXT, inner_agg_collation_schema NAME, inner_agg_collation_name NAME, inner_agg_input_types NAME[][], inner_agg_serialized_state BYTEA, return_type_dummy_val anyelement) (
SFUNC = _timescaledb_internal.finalize_agg_sfunc,
STYPE = internal,
FINALFUNC = _timescaledb_internal.finalize_agg_ffunc,
-FINALFUNC_EXTRA
+FINALFUNC_EXTRA,
+PARALLEL = SAFE
);
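
Once the updated SQL is installed, the new markings can be spot-checked in the
catalog; this is a sketch of a sanity check, not part of the commit:

-- provolatile: 'i' = IMMUTABLE, 's' = STABLE, 'v' = VOLATILE.
-- proparallel: 's' = SAFE, 'r' = RESTRICTED, 'u' = UNSAFE.
SELECT proname, provolatile, proparallel
FROM pg_proc
WHERE pronamespace = '_timescaledb_internal'::regnamespace
  AND proname IN ('partialize_agg', 'finalize_agg_sfunc',
                  'finalize_agg_ffunc', 'finalize_agg');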
1 change: 1 addition & 0 deletions test/runner_shared.sh
@@ -81,6 +81,7 @@ ${PSQL} -U ${TEST_PGUSER} \
sed -e '/<exclude_from_test>/,/<\/exclude_from_test>/d' \
-e 's!_[0-9]\{1,\}_[0-9]\{1,\}_chunk!_X_X_chunk!g' \
-e 's!^ \{1,\}QUERY PLAN \{1,\}$!QUERY PLAN!' \
-e 's!: actual rows!: actual rows!' \
-e '/^-\{1,\}$/d' \
-e 's!\(_timescaledb_internal.chunks_in([^,]\{1,\}, ARRAY\[\)[^]]\{1,\}\]!\1..]!' \
-e 's! Memory: [0-9]\{1,\}kB!!' \
104 changes: 104 additions & 0 deletions tsl/test/shared/expected/dist_parallel_agg.out
@@ -0,0 +1,104 @@
-- This file and its contents are licensed under the Timescale License.
-- Please see the included NOTICE for copyright information and
-- LICENSE-TIMESCALE for a copy of the license.
-- Test that for a parallel-safe aggregate function a parallel plan is generated
-- on data nodes, and for an unsafe one it is not. We use a manually created safe
-- function rather than a builtin one, to check that we can in fact create a
-- function that is parallelized, to prevent a false negative (i.e. it's not
-- parallelized, but for a different reason, not because it's unsafe).
-- Create a relatively big table on one data node to test parallel plans and
-- avoid flakiness.
create table metrics_dist1(like metrics_dist);
select table_name from create_distributed_hypertable('metrics_dist1', 'time', 'device_id',
data_nodes => '{"data_node_1"}');
WARNING: only one data node was assigned to the hypertable
table_name
metrics_dist1
(1 row)

insert into metrics_dist1 select * from metrics_dist order by metrics_dist limit 20000;
\set safe 'create or replace aggregate ts_debug_shippable_safe_count(*) (sfunc = int8inc, combinefunc=int8pl, stype = bigint, initcond = 0, parallel = safe);'
\set unsafe 'create or replace aggregate ts_debug_shippable_unsafe_count(*) (sfunc = int8inc, combinefunc=int8pl, stype = bigint, initcond = 0, parallel = unsafe);'
:safe
call distributed_exec(:'safe');
:unsafe
call distributed_exec(:'unsafe');
call distributed_exec($$ set parallel_tuple_cost = 0; $$);
call distributed_exec($$ set parallel_setup_cost = 0; $$);
call distributed_exec($$ set max_parallel_workers_per_gather = 1; $$);
set timescaledb.enable_remote_explain = 1;
set enable_partitionwise_aggregate = 1;
\set analyze 'explain (analyze, verbose, costs off, timing off, summary off)'
:analyze
select count(*) from metrics_dist1;
QUERY PLAN
Custom Scan (DataNodeScan) (actual rows=1 loops=1)
Output: (count(*))
Relations: Aggregate on (public.metrics_dist1)
Data node: data_node_1
Fetcher Type: Row by row
Chunks: _dist_hyper_X_X_chunk, _dist_hyper_X_X_chunk
Remote SQL: SELECT count(*) FROM public.metrics_dist1 WHERE _timescaledb_internal.chunks_in(public.metrics_dist1.*, ARRAY[..])
Remote EXPLAIN:
Finalize Aggregate (actual rows=1 loops=1)
Output: count(*)
-> Gather (actual rows=2 loops=1)
Output: (PARTIAL count(*))
Workers Planned: 1
Workers Launched: 1
-> Partial Aggregate (actual rows=1 loops=2)
Output: PARTIAL count(*)
Worker 0: actual rows=1 loops=1
-> Parallel Append (actual rows=10000 loops=2)
Worker 0: actual rows=0 loops=1
-> Parallel Seq Scan on _timescaledb_internal._dist_hyper_X_X_chunk (actual rows=17990 loops=1)
-> Parallel Seq Scan on _timescaledb_internal._dist_hyper_X_X_chunk (actual rows=2010 loops=1)

(22 rows)

:analyze
select ts_debug_shippable_safe_count(*) from metrics_dist1;
QUERY PLAN
Custom Scan (DataNodeScan) (actual rows=1 loops=1)
Output: (ts_debug_shippable_safe_count(*))
Relations: Aggregate on (public.metrics_dist1)
Data node: data_node_1
Fetcher Type: Row by row
Chunks: _dist_hyper_X_X_chunk, _dist_hyper_X_X_chunk
Remote SQL: SELECT public.ts_debug_shippable_safe_count(*) FROM public.metrics_dist1 WHERE _timescaledb_internal.chunks_in(public.metrics_dist1.*, ARRAY[..])
Remote EXPLAIN:
Finalize Aggregate (actual rows=1 loops=1)
Output: public.ts_debug_shippable_safe_count(*)
-> Gather (actual rows=2 loops=1)
Output: (PARTIAL public.ts_debug_shippable_safe_count(*))
Workers Planned: 1
Workers Launched: 1
-> Partial Aggregate (actual rows=1 loops=2)
Output: PARTIAL public.ts_debug_shippable_safe_count(*)
Worker 0: actual rows=1 loops=1
-> Parallel Append (actual rows=10000 loops=2)
Worker 0: actual rows=0 loops=1
-> Parallel Seq Scan on _timescaledb_internal._dist_hyper_X_X_chunk (actual rows=17990 loops=1)
-> Parallel Seq Scan on _timescaledb_internal._dist_hyper_X_X_chunk (actual rows=2010 loops=1)

(22 rows)

:analyze
select ts_debug_shippable_unsafe_count(*) from metrics_dist1;
QUERY PLAN
Custom Scan (DataNodeScan) (actual rows=1 loops=1)
Output: (ts_debug_shippable_unsafe_count(*))
Relations: Aggregate on (public.metrics_dist1)
Data node: data_node_1
Fetcher Type: Row by row
Chunks: _dist_hyper_X_X_chunk, _dist_hyper_X_X_chunk
Remote SQL: SELECT public.ts_debug_shippable_unsafe_count(*) FROM public.metrics_dist1 WHERE _timescaledb_internal.chunks_in(public.metrics_dist1.*, ARRAY[..])
Remote EXPLAIN:
Aggregate (actual rows=1 loops=1)
Output: public.ts_debug_shippable_unsafe_count(*)
-> Append (actual rows=20000 loops=1)
-> Seq Scan on _timescaledb_internal._dist_hyper_X_X_chunk (actual rows=17990 loops=1)
-> Seq Scan on _timescaledb_internal._dist_hyper_X_X_chunk (actual rows=2010 loops=1)

(14 rows)

4 changes: 2 additions & 2 deletions tsl/test/shared/sql/CMakeLists.txt
@@ -22,8 +22,8 @@ if((${PG_VERSION_MAJOR} GREATER_EQUAL "14"))
endif()

if(CMAKE_BUILD_TYPE MATCHES Debug)
-list(APPEND TEST_FILES_SHARED dist_remote_error.sql timestamp_limits.sql
-     with_clause_parser.sql)
+list(APPEND TEST_FILES_SHARED dist_parallel_agg.sql dist_remote_error.sql
+     timestamp_limits.sql with_clause_parser.sql)
list(APPEND TEST_TEMPLATES_SHARED constify_now.sql.in)
endif(CMAKE_BUILD_TYPE MATCHES Debug)

42 changes: 42 additions & 0 deletions tsl/test/shared/sql/dist_parallel_agg.sql
@@ -0,0 +1,42 @@
-- This file and its contents are licensed under the Timescale License.
-- Please see the included NOTICE for copyright information and
-- LICENSE-TIMESCALE for a copy of the license.

-- Test that for a parallel-safe aggregate function a parallel plan is generated
-- on data nodes, and for an unsafe one it is not. We use a manually created safe
-- function rather than a builtin one, to check that we can in fact create a
-- function that is parallelized, to prevent a false negative (i.e. it's not
-- parallelized, but for a different reason, not because it's unsafe).

-- Create a relatively big table on one data node to test parallel plans and
-- avoid flakiness.
create table metrics_dist1(like metrics_dist);
select table_name from create_distributed_hypertable('metrics_dist1', 'time', 'device_id',
data_nodes => '{"data_node_1"}');
insert into metrics_dist1 select * from metrics_dist order by metrics_dist limit 20000;

\set safe 'create or replace aggregate ts_debug_shippable_safe_count(*) (sfunc = int8inc, combinefunc=int8pl, stype = bigint, initcond = 0, parallel = safe);'
\set unsafe 'create or replace aggregate ts_debug_shippable_unsafe_count(*) (sfunc = int8inc, combinefunc=int8pl, stype = bigint, initcond = 0, parallel = unsafe);'

:safe
call distributed_exec(:'safe');
:unsafe
call distributed_exec(:'unsafe');

call distributed_exec($$ set parallel_tuple_cost = 0; $$);
call distributed_exec($$ set parallel_setup_cost = 0; $$);
call distributed_exec($$ set max_parallel_workers_per_gather = 1; $$);

set timescaledb.enable_remote_explain = 1;
set enable_partitionwise_aggregate = 1;

\set analyze 'explain (analyze, verbose, costs off, timing off, summary off)'

:analyze
select count(*) from metrics_dist1;

:analyze
select ts_debug_shippable_safe_count(*) from metrics_dist1;

:analyze
select ts_debug_shippable_unsafe_count(*) from metrics_dist1;
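
The test above needs a distributed setup with data nodes. As a quick local
approximation of the same check (a sketch, assuming an ordinary table t with
enough rows for parallel workers to be worthwhile, and the two debug aggregates
already created), one can compare the plans: the parallel-safe aggregate should
get a Gather with a Partial Aggregate underneath, the unsafe one should not.

-- Local sketch; table t is assumed to exist.
set parallel_setup_cost = 0;
set parallel_tuple_cost = 0;
explain (costs off) select ts_debug_shippable_safe_count(*) from t;
explain (costs off) select ts_debug_shippable_unsafe_count(*) from t;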
