Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mark partialize_agg as parallel safe #4307

Merged
merged 1 commit into from
May 31, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
14 changes: 10 additions & 4 deletions sql/partialize_finalize.sql
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,30 @@
-- Please see the included NOTICE for copyright information and
-- LICENSE-APACHE for a copy of the license.

-- These wrapper functions are used to push down aggregation to data nodes.
-- They can be marked parallel-safe, and the parallel plan will be chosen
-- depending on whether the underlying aggregate function itself is
-- parallel-safe.

CREATE OR REPLACE FUNCTION _timescaledb_internal.partialize_agg(arg ANYELEMENT)
RETURNS BYTEA AS '@MODULE_PATHNAME@', 'ts_partialize_agg' LANGUAGE C VOLATILE;
RETURNS BYTEA AS '@MODULE_PATHNAME@', 'ts_partialize_agg' LANGUAGE C STABLE PARALLEL SAFE;

CREATE OR REPLACE FUNCTION _timescaledb_internal.finalize_agg_sfunc(
tstate internal, aggfn TEXT, inner_agg_collation_schema NAME, inner_agg_collation_name NAME, inner_agg_input_types NAME[][], inner_agg_serialized_state BYTEA, return_type_dummy_val ANYELEMENT)
RETURNS internal
AS '@MODULE_PATHNAME@', 'ts_finalize_agg_sfunc'
LANGUAGE C IMMUTABLE ;
LANGUAGE C IMMUTABLE PARALLEL SAFE;

CREATE OR REPLACE FUNCTION _timescaledb_internal.finalize_agg_ffunc(
tstate internal, aggfn TEXT, inner_agg_collation_schema NAME, inner_agg_collation_name NAME, inner_agg_input_types NAME[][], inner_agg_serialized_state BYTEA, return_type_dummy_val ANYELEMENT)
RETURNS anyelement
AS '@MODULE_PATHNAME@', 'ts_finalize_agg_ffunc'
LANGUAGE C IMMUTABLE ;
LANGUAGE C IMMUTABLE PARALLEL SAFE;

CREATE OR REPLACE AGGREGATE _timescaledb_internal.finalize_agg(agg_name TEXT, inner_agg_collation_schema NAME, inner_agg_collation_name NAME, inner_agg_input_types NAME[][], inner_agg_serialized_state BYTEA, return_type_dummy_val anyelement) (
SFUNC = _timescaledb_internal.finalize_agg_sfunc,
STYPE = internal,
FINALFUNC = _timescaledb_internal.finalize_agg_ffunc,
FINALFUNC_EXTRA
FINALFUNC_EXTRA,
PARALLEL = SAFE
);
1 change: 1 addition & 0 deletions test/runner_shared.sh
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ ${PSQL} -U ${TEST_PGUSER} \
sed -e '/<exclude_from_test>/,/<\/exclude_from_test>/d' \
-e 's!_[0-9]\{1,\}_[0-9]\{1,\}_chunk!_X_X_chunk!g' \
-e 's!^ \{1,\}QUERY PLAN \{1,\}$!QUERY PLAN!' \
-e 's!: actual rows!: actual rows!' \
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm not a fan of this. Can you get a stable test output without this change as the number of rows is useful for other tests so I would rather not have this removed globally.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm keeping the number, just replacing double space for single. The formatting was changed between postgres versions, so I'm normalizing it to avoid having separate references.

-e '/^-\{1,\}$/d' \
-e 's!\(_timescaledb_internal.chunks_in([^,]\{1,\}, ARRAY\[\)[^]]\{1,\}\]!\1..]!' \
-e 's! Memory: [0-9]\{1,\}kB!!' \
Expand Down
104 changes: 104 additions & 0 deletions tsl/test/shared/expected/dist_parallel_agg.out
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
-- This file and its contents are licensed under the Timescale License.
-- Please see the included NOTICE for copyright information and
-- LICENSE-TIMESCALE for a copy of the license.
-- Test that for parallel-safe aggregate function a parallel plan is generated
-- on data nodes, and for unsafe it is not. We use a manually created safe
-- function and not a builtin one, to check that we can in fact create a
-- function that is parallelized, to prevent a false negative (i.e. it's not
-- parallelized, but for a different reason, not because it's unsafe).
-- Create a relatively big table on one data node to test parallel plans and
-- avoid flakiness.
create table metrics_dist1(like metrics_dist);
select table_name from create_distributed_hypertable('metrics_dist1', 'time', 'device_id',
data_nodes => '{"data_node_1"}');
WARNING: only one data node was assigned to the hypertable
table_name
metrics_dist1
(1 row)

insert into metrics_dist1 select * from metrics_dist order by metrics_dist limit 20000;
\set safe 'create or replace aggregate ts_debug_shippable_safe_count(*) (sfunc = int8inc, combinefunc=int8pl, stype = bigint, initcond = 0, parallel = safe);'
\set unsafe 'create or replace aggregate ts_debug_shippable_unsafe_count(*) (sfunc = int8inc, combinefunc=int8pl, stype = bigint, initcond = 0, parallel = unsafe);'
:safe
call distributed_exec(:'safe');
:unsafe
call distributed_exec(:'unsafe');
call distributed_exec($$ set parallel_tuple_cost = 0; $$);
call distributed_exec($$ set parallel_setup_cost = 0; $$);
call distributed_exec($$ set max_parallel_workers_per_gather = 1; $$);
set timescaledb.enable_remote_explain = 1;
set enable_partitionwise_aggregate = 1;
\set analyze 'explain (analyze, verbose, costs off, timing off, summary off)'
:analyze
select count(*) from metrics_dist1;
QUERY PLAN
Custom Scan (DataNodeScan) (actual rows=1 loops=1)
Output: (count(*))
Relations: Aggregate on (public.metrics_dist1)
Data node: data_node_1
Fetcher Type: Row by row
Chunks: _dist_hyper_X_X_chunk, _dist_hyper_X_X_chunk
Remote SQL: SELECT count(*) FROM public.metrics_dist1 WHERE _timescaledb_internal.chunks_in(public.metrics_dist1.*, ARRAY[..])
Remote EXPLAIN:
Finalize Aggregate (actual rows=1 loops=1)
Output: count(*)
-> Gather (actual rows=2 loops=1)
Output: (PARTIAL count(*))
Workers Planned: 1
Workers Launched: 1
-> Partial Aggregate (actual rows=1 loops=2)
Output: PARTIAL count(*)
Worker 0: actual rows=1 loops=1
-> Parallel Append (actual rows=10000 loops=2)
Worker 0: actual rows=0 loops=1
-> Parallel Seq Scan on _timescaledb_internal._dist_hyper_X_X_chunk (actual rows=17990 loops=1)
-> Parallel Seq Scan on _timescaledb_internal._dist_hyper_X_X_chunk (actual rows=2010 loops=1)

(22 rows)

:analyze
select ts_debug_shippable_safe_count(*) from metrics_dist1;
QUERY PLAN
Custom Scan (DataNodeScan) (actual rows=1 loops=1)
Output: (ts_debug_shippable_safe_count(*))
Relations: Aggregate on (public.metrics_dist1)
Data node: data_node_1
Fetcher Type: Row by row
Chunks: _dist_hyper_X_X_chunk, _dist_hyper_X_X_chunk
Remote SQL: SELECT public.ts_debug_shippable_safe_count(*) FROM public.metrics_dist1 WHERE _timescaledb_internal.chunks_in(public.metrics_dist1.*, ARRAY[..])
Remote EXPLAIN:
Finalize Aggregate (actual rows=1 loops=1)
Output: public.ts_debug_shippable_safe_count(*)
-> Gather (actual rows=2 loops=1)
Output: (PARTIAL public.ts_debug_shippable_safe_count(*))
Workers Planned: 1
Workers Launched: 1
-> Partial Aggregate (actual rows=1 loops=2)
Output: PARTIAL public.ts_debug_shippable_safe_count(*)
Worker 0: actual rows=1 loops=1
-> Parallel Append (actual rows=10000 loops=2)
Worker 0: actual rows=0 loops=1
-> Parallel Seq Scan on _timescaledb_internal._dist_hyper_X_X_chunk (actual rows=17990 loops=1)
-> Parallel Seq Scan on _timescaledb_internal._dist_hyper_X_X_chunk (actual rows=2010 loops=1)

(22 rows)

:analyze
select ts_debug_shippable_unsafe_count(*) from metrics_dist1;
QUERY PLAN
Custom Scan (DataNodeScan) (actual rows=1 loops=1)
Output: (ts_debug_shippable_unsafe_count(*))
Relations: Aggregate on (public.metrics_dist1)
Data node: data_node_1
Fetcher Type: Row by row
Chunks: _dist_hyper_X_X_chunk, _dist_hyper_X_X_chunk
Remote SQL: SELECT public.ts_debug_shippable_unsafe_count(*) FROM public.metrics_dist1 WHERE _timescaledb_internal.chunks_in(public.metrics_dist1.*, ARRAY[..])
Remote EXPLAIN:
Aggregate (actual rows=1 loops=1)
Output: public.ts_debug_shippable_unsafe_count(*)
-> Append (actual rows=20000 loops=1)
-> Seq Scan on _timescaledb_internal._dist_hyper_X_X_chunk (actual rows=17990 loops=1)
-> Seq Scan on _timescaledb_internal._dist_hyper_X_X_chunk (actual rows=2010 loops=1)

(14 rows)

4 changes: 2 additions & 2 deletions tsl/test/shared/sql/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ if((${PG_VERSION_MAJOR} GREATER_EQUAL "14"))
endif()

if(CMAKE_BUILD_TYPE MATCHES Debug)
list(APPEND TEST_FILES_SHARED dist_remote_error.sql timestamp_limits.sql
with_clause_parser.sql)
list(APPEND TEST_FILES_SHARED dist_parallel_agg.sql dist_remote_error.sql
timestamp_limits.sql with_clause_parser.sql)
list(APPEND TEST_TEMPLATES_SHARED constify_now.sql.in)
endif(CMAKE_BUILD_TYPE MATCHES Debug)

Expand Down
42 changes: 42 additions & 0 deletions tsl/test/shared/sql/dist_parallel_agg.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
-- This file and its contents are licensed under the Timescale License.
-- Please see the included NOTICE for copyright information and
-- LICENSE-TIMESCALE for a copy of the license.

-- Test that for parallel-safe aggregate function a parallel plan is generated
-- on data nodes, and for unsafe it is not. We use a manually created safe
-- function and not a builtin one, to check that we can in fact create a
-- function that is parallelized, to prevent a false negative (i.e. it's not
-- parallelized, but for a different reason, not because it's unsafe).

-- Create a relatively big table on one data node to test parallel plans and
-- avoid flakiness.
create table metrics_dist1(like metrics_dist);
select table_name from create_distributed_hypertable('metrics_dist1', 'time', 'device_id',
data_nodes => '{"data_node_1"}');
insert into metrics_dist1 select * from metrics_dist order by metrics_dist limit 20000;

\set safe 'create or replace aggregate ts_debug_shippable_safe_count(*) (sfunc = int8inc, combinefunc=int8pl, stype = bigint, initcond = 0, parallel = safe);'
\set unsafe 'create or replace aggregate ts_debug_shippable_unsafe_count(*) (sfunc = int8inc, combinefunc=int8pl, stype = bigint, initcond = 0, parallel = unsafe);'

:safe
call distributed_exec(:'safe');
:unsafe
call distributed_exec(:'unsafe');

call distributed_exec($$ set parallel_tuple_cost = 0; $$);
call distributed_exec($$ set parallel_setup_cost = 0; $$);
call distributed_exec($$ set max_parallel_workers_per_gather = 1; $$);

set timescaledb.enable_remote_explain = 1;
set enable_partitionwise_aggregate = 1;

\set analyze 'explain (analyze, verbose, costs off, timing off, summary off)'

:analyze
select count(*) from metrics_dist1;

:analyze
select ts_debug_shippable_safe_count(*) from metrics_dist1;

:analyze
select ts_debug_shippable_unsafe_count(*) from metrics_dist1;