INSERT .. SELECT on distributed hypertable fails on PG15
An INSERT .. SELECT query that reads from distributed hypertables
generates a plan with a DataNodeCopy node, which is not supported. The
issue is in tsl_create_distributed_insert_path(), where we decide
whether to generate a DataNodeCopy or a DataNodeDispatch node based on
the kind of query. On PG15 the timescaledb planner generates
DataNodeCopy for INSERT .. SELECT queries because rte->subquery is
NULL: a commit in PG15 sets rte->subquery to NULL as part of an
upstream fix.

This patch instead checks whether the SELECT subquery reads any
distributed hypertables by inspecting root->parse->jointree, which
represents the subquery.

Fixes #4983
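
For illustration, a minimal sketch of the failing shape; the table and
column names below are hypothetical, not taken from the commit:

-- Hypothetical repro sketch: on PG15, any INSERT .. SELECT whose SELECT
-- part reads a distributed hypertable hit this bug.
CREATE TABLE metrics (ts timestamptz NOT NULL, device_id int, value float8);
SELECT create_distributed_hypertable('metrics', 'ts', 'device_id');

INSERT INTO metrics
SELECT ts + interval '1 day', device_id, value
FROM metrics; -- before this patch: planned with an unsupported DataNodeCopy node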
sb230132 authored and SachinSetiya committed Nov 28, 2022
1 parent 8a2a9b0 commit bdfabb4
Showing 11 changed files with 921 additions and 185 deletions.
4 changes: 2 additions & 2 deletions .github/gh_matrix_builder.py
@@ -153,9 +153,9 @@ def macos_config(overrides):
"snapshot": "snapshot",
"tsdb_build_args": "-DASSERTIONS=ON -DREQUIRE_ALL_TESTS=ON -DEXPERIMENTAL=ON",
# below tests are tracked as part of #4838
"installcheck_args": "SKIPS='003_connections_privs 001_simple_multinode 004_multinode_rdwr_1pc dist_hypertable-15 bgw_custom cagg_dump dist_move_chunk' "
"installcheck_args": "SKIPS='003_connections_privs 001_simple_multinode 004_multinode_rdwr_1pc bgw_custom cagg_dump dist_move_chunk' "
# below tests are tracked as part of #4835
"IGNORES='telemetry_stats dist_query dist_partial_agg plan_hashagg partialize_finalize dist_fetcher_type dist_remote_error jit-15 "
"IGNORES='telemetry_stats dist_query dist_partial_agg plan_hashagg partialize_finalize dist_fetcher_type "
# below tests are tracked as part of #4837
"remote_txn'",
}
12 changes: 7 additions & 5 deletions test/expected/query-15.out
@@ -293,14 +293,16 @@ BEGIN;

 :PREFIX SELECT time_bucket('1 minute', time, INTERVAL '30 seconds') t, avg(series_0), min(series_1), trunc(avg(series_2)::numeric,5)
 FROM hyper_1 GROUP BY t ORDER BY t DESC limit 2;
-                                                             QUERY PLAN
-------------------------------------------------------------------------------------------------------------------------------------
+                                                QUERY PLAN
+---------------------------------------------------------------------------------------------------------
  Limit
    ->  GroupAggregate
-         Group Key: ((time_bucket('@ 1 min'::interval, (_hyper_1_1_chunk."time" - '@ 30 secs'::interval)) + '@ 30 secs'::interval))
+         Group Key: (time_bucket('@ 1 min'::interval, hyper_1."time", '@ 30 secs'::interval))
          ->  Result
-               ->  Index Scan using _hyper_1_1_chunk_time_plain on _hyper_1_1_chunk
-(5 rows)
+               ->  Custom Scan (ChunkAppend) on hyper_1
+                     Order: time_bucket('@ 1 min'::interval, hyper_1."time", '@ 30 secs'::interval) DESC
+                     ->  Index Scan using _hyper_1_1_chunk_time_plain on _hyper_1_1_chunk
+(7 rows)
 
 :PREFIX SELECT time_bucket('1 minute', time - INTERVAL '30 seconds') t, avg(series_0), min(series_1), trunc(avg(series_2)::numeric,5)
 FROM hyper_1 GROUP BY t ORDER BY t DESC limit 2;
34 changes: 30 additions & 4 deletions tsl/src/planner.c
@@ -315,11 +315,37 @@ tsl_create_distributed_insert_path(PlannerInfo *root, ModifyTablePath *mtpath, I
     if (rte->rtekind == RTE_SUBQUERY)
     {
         distributed = false;
-        if (distributed_rtes_walker((Node *) rte->subquery, &distributed) &&
-            distributed)
+        Node *jtnode = (Node *) root->parse->jointree;
+        if (IsA(jtnode, FromExpr))
         {
-            copy_possible = false;
-            break;
+            FromExpr *f = (FromExpr *) jtnode;
+            ListCell *l;
+            foreach (l, f->fromlist)
+            {
+                Node *n = (Node *) lfirst(l);
+                if (IsA(n, RangeTblRef))
+                {
+                    RangeTblEntry *r =
+                        planner_rt_fetch(((RangeTblRef *) n)->rtindex, root);
+                    switch (r->rtekind)
+                    {
+                        case RTE_RELATION:
+                            distributed_rtes_walker((Node *) r, &distributed);
+                            break;
+                        case RTE_SUBQUERY:
+                            distributed_rtes_walker((Node *) r->subquery,
+                                                    &distributed);
+                            break;
+                        default:
+                            break;
+                    }
+                    if (distributed)
+                    {
+                        copy_possible = false;
+                        break;
+                    }
+                }
+            }
         }
     }
 }
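
As a hedged illustration of the intended behavior, reusing the
hypothetical table from the sketch above (exact plan output varies by
setup): after this change, an INSERT .. SELECT reading a distributed
hypertable should be planned with DataNodeDispatch rather than
DataNodeCopy.

-- Hypothetical check of the chosen insert path:
EXPLAIN (costs off)
INSERT INTO metrics
SELECT ts + interval '1 day', device_id, value FROM metrics;
-- Expect a DataNodeDispatch node (not DataNodeCopy) in the plan.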
342 changes: 179 additions & 163 deletions tsl/test/expected/dist_hypertable-15.out

Large diffs are not rendered by default.

18 changes: 10 additions & 8 deletions tsl/test/expected/jit-15.out
@@ -206,15 +206,17 @@ SELECT * FROM jit_device_summary WHERE metric_spread = 1800 ORDER BY bucket DESC
          Output: (time_bucket('@ 1 hour'::interval, jit_test_contagg.observation_time)), jit_test_contagg.device_id, avg(jit_test_contagg.metric), (max(jit_test_contagg.metric) - min(jit_test_contagg.metric))
          Group Key: time_bucket('@ 1 hour'::interval, jit_test_contagg.observation_time), jit_test_contagg.device_id
          Filter: ((max(jit_test_contagg.metric) - min(jit_test_contagg.metric)) = '1800'::double precision)
-         ->  Custom Scan (ChunkAppend) on public.jit_test_contagg
+         ->  Result
                Output: time_bucket('@ 1 hour'::interval, jit_test_contagg.observation_time), jit_test_contagg.device_id, jit_test_contagg.metric
-               Startup Exclusion: true
-               Runtime Exclusion: false
-               Chunks excluded during startup: 4
-               ->  Index Scan using _hyper_3_5_chunk_jit_test_contagg_observation_time_idx on _timescaledb_internal._hyper_3_5_chunk
-                     Output: _hyper_3_5_chunk.observation_time, _hyper_3_5_chunk.device_id, _hyper_3_5_chunk.metric
-                     Index Cond: (_hyper_3_5_chunk.observation_time >= COALESCE(_timescaledb_internal.to_timestamp(_timescaledb_internal.cagg_watermark(4)), '-infinity'::timestamp with time zone))
-(27 rows)
+               ->  Custom Scan (ChunkAppend) on public.jit_test_contagg
+                     Output: jit_test_contagg.observation_time, jit_test_contagg.device_id, jit_test_contagg.metric
+                     Startup Exclusion: true
+                     Runtime Exclusion: false
+                     Chunks excluded during startup: 4
+                     ->  Index Scan using _hyper_3_5_chunk_jit_test_contagg_observation_time_idx on _timescaledb_internal._hyper_3_5_chunk
+                           Output: _hyper_3_5_chunk.observation_time, _hyper_3_5_chunk.device_id, _hyper_3_5_chunk.metric
+                           Index Cond: (_hyper_3_5_chunk.observation_time >= COALESCE(_timescaledb_internal.to_timestamp(_timescaledb_internal.cagg_watermark(4)), '-infinity'::timestamp with time zone))
+(29 rows)
 
 -- generate the results into two different files
 \set ECHO errors
229 changes: 229 additions & 0 deletions tsl/test/shared/expected/dist_remote_error-13.out
@@ -0,0 +1,229 @@
-- This file and its contents are licensed under the Timescale License.
-- Please see the included NOTICE for copyright information and
-- LICENSE-TIMESCALE for a copy of the license.
-- Import setup file to data nodes.
\unset ECHO
-- Disable SSL to get stable error output across versions. SSL adds some output
-- that changed in PG 14.
set timescaledb.debug_enable_ssl to off;
set client_min_messages to error;
SET timescaledb.hide_data_node_name_in_errors = 'on';
-- A relatively big table on one data node
create table metrics_dist_remote_error(like metrics_dist);
select table_name from create_distributed_hypertable('metrics_dist_remote_error', 'time', 'device_id',
data_nodes => '{"data_node_1"}');
        table_name
---------------------------
 metrics_dist_remote_error
(1 row)

insert into metrics_dist_remote_error select * from metrics_dist order by metrics_dist limit 20000;
-- The error messages vary wildly between the Postgres versions, dependent on
-- the particular behavior of libpq in this or that case. The purpose of this
-- test is not to solidify this accidental behavior, but to merely exercise the
-- error handling code to make sure it doesn't have fatal errors. Unfortunately,
-- there is no way to suppress error output from a psql script.
set client_min_messages to ERROR;
\set ON_ERROR_STOP off
set timescaledb.remote_data_fetcher = 'copy';
explain (analyze, verbose, costs off, timing off, summary off)
select 1 from metrics_dist_remote_error where ts_debug_shippable_error_after_n_rows(0, device_id)::int != 0;
ERROR: [<hidden node name>]: debug point: requested to error out after 0 rows, 1 rows seen
explain (analyze, verbose, costs off, timing off, summary off)
select 1 from metrics_dist_remote_error where ts_debug_shippable_error_after_n_rows(1, device_id)::int != 0;
ERROR: [<hidden node name>]: debug point: requested to error out after 1 rows, 1 rows seen
explain (analyze, verbose, costs off, timing off, summary off)
select 1 from metrics_dist_remote_error where ts_debug_shippable_error_after_n_rows(2, device_id)::int != 0;
ERROR: [<hidden node name>]: debug point: requested to error out after 2 rows, 2 rows seen
explain (analyze, verbose, costs off, timing off, summary off)
select 1 from metrics_dist_remote_error where ts_debug_shippable_error_after_n_rows(701, device_id)::int != 0;
ERROR: [<hidden node name>]: debug point: requested to error out after 701 rows, 701 rows seen
explain (analyze, verbose, costs off, timing off, summary off)
select 1 from metrics_dist_remote_error where ts_debug_shippable_error_after_n_rows(10000, device_id)::int != 0;
ERROR: [<hidden node name>]: debug point: requested to error out after 10000 rows, 10000 rows seen
explain (analyze, verbose, costs off, timing off, summary off)
select 1 from metrics_dist_remote_error where ts_debug_shippable_error_after_n_rows(16384, device_id)::int != 0;
ERROR: [<hidden node name>]: debug point: requested to error out after 16384 rows, 16384 rows seen
explain (analyze, verbose, costs off, timing off, summary off)
select 1 from metrics_dist_remote_error where ts_debug_shippable_error_after_n_rows(10000000, device_id)::int != 0;
                                          QUERY PLAN
----------------------------------------------------------------------------------------------
Custom Scan (DataNodeScan) on public.metrics_dist_remote_error (actual rows=20000 loops=1)
Output: 1
Data node: data_node_1
Fetcher Type: COPY
Chunks: _dist_hyper_X_X_chunk, _dist_hyper_X_X_chunk
Remote SQL: SELECT NULL FROM public.metrics_dist_remote_error WHERE _timescaledb_internal.chunks_in(public.metrics_dist_remote_error.*, ARRAY[..]) AND ((public.ts_debug_shippable_error_after_n_rows(10000000, device_id) <> 0))
(6 rows)

-- We don't test fatal errors here, because PG versions before 14 are unable to
-- report them properly to the access node, so we get different errors in these
-- versions.
-- Now test the same with the cursor fetcher.
set timescaledb.remote_data_fetcher = 'cursor';
explain (analyze, verbose, costs off, timing off, summary off)
select 1 from metrics_dist_remote_error where ts_debug_shippable_error_after_n_rows(0, device_id)::int != 0;
ERROR: [<hidden node name>]: debug point: requested to error out after 0 rows, 1 rows seen
explain (analyze, verbose, costs off, timing off, summary off)
select 1 from metrics_dist_remote_error where ts_debug_shippable_error_after_n_rows(1, device_id)::int != 0;
ERROR: [<hidden node name>]: debug point: requested to error out after 1 rows, 1 rows seen
explain (analyze, verbose, costs off, timing off, summary off)
select 1 from metrics_dist_remote_error where ts_debug_shippable_error_after_n_rows(2, device_id)::int != 0;
ERROR: [<hidden node name>]: debug point: requested to error out after 2 rows, 2 rows seen
explain (analyze, verbose, costs off, timing off, summary off)
select 1 from metrics_dist_remote_error where ts_debug_shippable_error_after_n_rows(701, device_id)::int != 0;
ERROR: [<hidden node name>]: debug point: requested to error out after 701 rows, 701 rows seen
explain (analyze, verbose, costs off, timing off, summary off)
select 1 from metrics_dist_remote_error where ts_debug_shippable_error_after_n_rows(10000, device_id)::int != 0;
ERROR: [<hidden node name>]: debug point: requested to error out after 10000 rows, 10000 rows seen
explain (analyze, verbose, costs off, timing off, summary off)
select 1 from metrics_dist_remote_error where ts_debug_shippable_error_after_n_rows(10000000, device_id)::int != 0;
                                          QUERY PLAN
----------------------------------------------------------------------------------------------
Custom Scan (DataNodeScan) on public.metrics_dist_remote_error (actual rows=20000 loops=1)
Output: 1
Data node: data_node_1
Fetcher Type: Cursor
Chunks: _dist_hyper_X_X_chunk, _dist_hyper_X_X_chunk
Remote SQL: SELECT NULL FROM public.metrics_dist_remote_error WHERE _timescaledb_internal.chunks_in(public.metrics_dist_remote_error.*, ARRAY[..]) AND ((public.ts_debug_shippable_error_after_n_rows(10000000, device_id) <> 0))
(6 rows)

-- Table with broken send for a data type.
create table metrics_dist_bs(like metrics_dist);
alter table metrics_dist_bs alter column v0 type bs;
select table_name from create_distributed_hypertable('metrics_dist_bs',
'time', 'device_id');
   table_name
-----------------
 metrics_dist_bs
(1 row)

set timescaledb.enable_connection_binary_data to off;
insert into metrics_dist_bs
select * from metrics_dist_remote_error;
set timescaledb.enable_connection_binary_data to on;
explain (analyze, verbose, costs off, timing off, summary off)
select * from metrics_dist_bs;
ERROR: [<hidden node name>]: debug point: requested to error out after 7103 rows, 7103 rows seen
drop table metrics_dist_bs;
-- Table with broken receive for a data type.
create table metrics_dist_br(like metrics_dist);
alter table metrics_dist_br alter column v0 type br;
select table_name from create_distributed_hypertable('metrics_dist_br',
'time', 'device_id');
   table_name
-----------------
 metrics_dist_br
(1 row)

select hypertable_name, replication_factor from timescaledb_information.hypertables
where hypertable_name = 'metrics_dist_br';
hypertable_name | replication_factor
-----------------+--------------------
metrics_dist_br | 1
(1 row)

-- Test that INSERT and COPY fail on data nodes.
-- Note that we use the text format for the COPY input, so that the access node
-- doesn't call `recv` and fail by itself. It's going to use binary format for
-- transfer to data nodes regardless of the input format.
set timescaledb.dist_copy_transfer_format = 'binary';
-- First, create the reference.
\copy (select * from metrics_dist_remote_error) to 'dist_remote_error.text' with (format text);
-- We have to test various interleavings of COPY and INSERT to check that
-- one can recover from connection failure states introduced by another.
\copy metrics_dist_br from 'dist_remote_error.text' with (format text);
ERROR: [<hidden node name>]: debug point: requested to error out after 7103 rows, 7103 rows seen
\copy metrics_dist_br from 'dist_remote_error.text' with (format text);
ERROR: [<hidden node name>]: debug point: requested to error out after 7103 rows, 7103 rows seen
insert into metrics_dist_br select * from metrics_dist_remote_error;
ERROR: [<hidden node name>]: debug point: requested to error out after 7103 rows, 7103 rows seen
insert into metrics_dist_br select * from metrics_dist_remote_error;
ERROR: [<hidden node name>]: debug point: requested to error out after 7103 rows, 7103 rows seen
\copy metrics_dist_br from 'dist_remote_error.text' with (format text);
ERROR: [<hidden node name>]: debug point: requested to error out after 7103 rows, 7103 rows seen
-- Fail at different points
set timescaledb.debug_broken_sendrecv_throw_after = 1;
\copy metrics_dist_br from 'dist_remote_error.text' with (format text);
ERROR: [<hidden node name>]: debug point: requested to error out after 1 rows, 1 rows seen
set timescaledb.debug_broken_sendrecv_throw_after = 2;
\copy metrics_dist_br from 'dist_remote_error.text' with (format text);
ERROR: [<hidden node name>]: debug point: requested to error out after 2 rows, 2 rows seen
set timescaledb.debug_broken_sendrecv_throw_after = 1023;
\copy metrics_dist_br from 'dist_remote_error.text' with (format text);
ERROR: [<hidden node name>]: debug point: requested to error out after 1023 rows, 1023 rows seen
set timescaledb.debug_broken_sendrecv_throw_after = 1024;
\copy metrics_dist_br from 'dist_remote_error.text' with (format text);
ERROR: [<hidden node name>]: debug point: requested to error out after 1024 rows, 1024 rows seen
set timescaledb.debug_broken_sendrecv_throw_after = 1025;
\copy metrics_dist_br from 'dist_remote_error.text' with (format text);
ERROR: [<hidden node name>]: debug point: requested to error out after 1025 rows, 1025 rows seen
reset timescaledb.debug_broken_sendrecv_throw_after;
-- Same with different replication factor
truncate metrics_dist_br;
select set_replication_factor('metrics_dist_br', 2);
 set_replication_factor
------------------------
 
(1 row)

select hypertable_name, replication_factor from timescaledb_information.hypertables
where hypertable_name = 'metrics_dist_br';
hypertable_name | replication_factor
-----------------+--------------------
metrics_dist_br | 2
(1 row)

\copy metrics_dist_br from 'dist_remote_error.text' with (format text);
ERROR: [<hidden node name>]: debug point: requested to error out after 7103 rows, 7103 rows seen
\copy metrics_dist_br from 'dist_remote_error.text' with (format text);
ERROR: [<hidden node name>]: debug point: requested to error out after 7103 rows, 7103 rows seen
insert into metrics_dist_br select * from metrics_dist_remote_error;
ERROR: [<hidden node name>]: debug point: requested to error out after 7103 rows, 7103 rows seen
insert into metrics_dist_br select * from metrics_dist_remote_error;
ERROR: [<hidden node name>]: debug point: requested to error out after 7103 rows, 7103 rows seen
set timescaledb.debug_broken_sendrecv_throw_after = 1;
\copy metrics_dist_br from 'dist_remote_error.text' with (format text);
ERROR: [<hidden node name>]: debug point: requested to error out after 1 rows, 1 rows seen
set timescaledb.debug_broken_sendrecv_throw_after = 2;
\copy metrics_dist_br from 'dist_remote_error.text' with (format text);
ERROR: [<hidden node name>]: debug point: requested to error out after 2 rows, 2 rows seen
set timescaledb.debug_broken_sendrecv_throw_after = 1023;
\copy metrics_dist_br from 'dist_remote_error.text' with (format text);
ERROR: [<hidden node name>]: debug point: requested to error out after 1023 rows, 1023 rows seen
set timescaledb.debug_broken_sendrecv_throw_after = 1024;
\copy metrics_dist_br from 'dist_remote_error.text' with (format text);
ERROR: [<hidden node name>]: debug point: requested to error out after 1024 rows, 1024 rows seen
set timescaledb.debug_broken_sendrecv_throw_after = 1025;
\copy metrics_dist_br from 'dist_remote_error.text' with (format text);
ERROR: [<hidden node name>]: debug point: requested to error out after 1025 rows, 1025 rows seen
-- Should succeed with text format for data transfer.
set timescaledb.dist_copy_transfer_format = 'text';
\copy metrics_dist_br from 'dist_remote_error.text' with (format text);
-- Final check.
set timescaledb.enable_connection_binary_data = false;
select count(*) from metrics_dist_br;
 count
-------
 20000
(1 row)

set timescaledb.enable_connection_binary_data = true;
reset timescaledb.debug_broken_sendrecv_throw_after;
drop table metrics_dist_br;
-- Table with sleepy receive for a data type, to improve coverage of the waiting
-- code on the access node.
create table metrics_dist_bl(like metrics_dist);
alter table metrics_dist_bl alter column v0 type bl;
select table_name from create_distributed_hypertable('metrics_dist_bl',
'time', 'device_id');
   table_name
-----------------
 metrics_dist_bl
(1 row)

-- We're using sleepy recv function, so need the binary transfer format for it
-- to be called on the data nodes.
set timescaledb.dist_copy_transfer_format = 'binary';
-- Test INSERT and COPY with slow data node.
\copy metrics_dist_bl from 'dist_remote_error.text' with (format text);
insert into metrics_dist_bl select * from metrics_dist_remote_error;
select count(*) from metrics_dist_bl;
 count
-------
 40000
(1 row)

drop table metrics_dist_bl;
drop table metrics_dist_remote_error;
