Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add more tests for errors on data nodes #4434

Merged
merged 1 commit into from Jun 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
29 changes: 28 additions & 1 deletion test/src/test_utils.c
Expand Up @@ -97,7 +97,7 @@ throw_after_n_rows(int max_rows, int severity)
if (max_rows <= rows_seen)
{
ereport(severity,
(errmsg("debug point: requested to error out every %d rows, %d rows seen",
(errmsg("debug point: requested to error out after %d rows, %d rows seen",
max_rows,
rows_seen)));
}
Expand All @@ -117,6 +117,33 @@ ts_debug_shippable_fatal_after_n_rows(PG_FUNCTION_ARGS)
PG_RETURN_INT32(throw_after_n_rows(PG_GETARG_INT32(0), FATAL));
}

/*
 * Broken send/receive functions for int4 that throw after an (arbitrarily
 * chosen prime) number of rows.
 * Use ERROR, not FATAL, because PG versions < 14 are unable to report a FATAL
 * error to the access node before closing the connection, so the test results
 * would be different.
 */
#define ARBITRARY_PRIME_NUMBER 7103

TS_FUNCTION_INFO_V1(ts_debug_broken_int4recv);

/*
 * Receive function for int4 that deliberately raises an ERROR once
 * ARBITRARY_PRIME_NUMBER rows have been seen, and otherwise delegates
 * to the stock int4recv.
 */
Datum
ts_debug_broken_int4recv(PG_FUNCTION_ARGS)
{
	Datum received;

	/* Trip the debug error path first; return value is irrelevant here. */
	(void) throw_after_n_rows(ARBITRARY_PRIME_NUMBER, ERROR);

	/* No error raised yet: behave exactly like the builtin function. */
	received = int4recv(fcinfo);
	return received;
}

TS_FUNCTION_INFO_V1(ts_debug_broken_int4send);

/*
 * Send function for int4 that deliberately raises an ERROR once
 * ARBITRARY_PRIME_NUMBER rows have been seen, and otherwise delegates
 * to the stock int4send.
 */
Datum
ts_debug_broken_int4send(PG_FUNCTION_ARGS)
{
	Datum sent;

	/* Trip the debug error path first; return value is irrelevant here. */
	(void) throw_after_n_rows(ARBITRARY_PRIME_NUMBER, ERROR);

	/* No error raised yet: behave exactly like the builtin function. */
	sent = int4send(fcinfo);
	return sent;
}

TS_FUNCTION_INFO_V1(ts_bgw_wait);
Datum
ts_bgw_wait(PG_FUNCTION_ARGS)
Expand Down
42 changes: 32 additions & 10 deletions tsl/src/remote/async.c
Expand Up @@ -723,27 +723,49 @@ wait_to_consume_data(AsyncRequestSet *set, TimestampTz end_time)

CHECK_FOR_INTERRUPTS();

if (event.events & ~(WL_SOCKET_READABLE | WL_LATCH_SET))
{
/*
* Sanity check on the wait result: we haven't requested anything
* other than my latch or the socket becoming readable.
*/
result = async_response_error_create(
psprintf("Unexpected event 0x%X while waiting for async request result",
event.events));
break;
}

if (event.events & WL_LATCH_SET)
{
ResetLatch(MyLatch);
else if (event.events & WL_SOCKET_READABLE)
}

if (event.events & WL_SOCKET_READABLE)
{
wait_req = event.user_data;
Assert(wait_req != NULL);
PGconn *pg_conn = remote_connection_get_pg_conn(wait_req->conn);

if (0 == PQconsumeInput(remote_connection_get_pg_conn(wait_req->conn)))
if (0 == PQconsumeInput(pg_conn))
{
/* remove connection from set */
/* An error has occurred, remove connection from set. */
set->requests = list_delete_ptr(set->requests, wait_req);
result = &async_response_communication_error_create(wait_req)->base;
break;
}
result = NULL;
break;
}
else
{
result = async_response_error_create("unexpected event");
break;

/*
* From postgres docs on PQConsumeInput():
* Note that the result does not say whether any input data was
* actually collected. After calling PQconsumeInput, the
* application can check PQisBusy and/or PQnotifies to see if their
* state has changed.
*/
if (PQisBusy(pg_conn) == 0)
{
result = NULL;
break;
}
}
}

Expand Down
12 changes: 12 additions & 0 deletions tsl/src/remote/connection.c
Expand Up @@ -1222,6 +1222,18 @@ set_ssl_options(const char *user_name, const char **keywords, const char **value
if (!ssl_enabled || strcmp(ssl_enabled, "on") != 0)
return;

#ifndef NDEBUG
ssl_enabled = GetConfigOption("timescaledb.debug_enable_ssl", true, false);
if (ssl_enabled && strcmp(ssl_enabled, "on") != 0)
{
keywords[option_pos] = "sslmode";
values[option_pos] = "disable";
option_pos++;
*option_start = option_pos;
return;
}
#endif

/* If SSL is enabled on AN then we assume it is also should be used for DN
* connections as well, otherwise we need to introduce some other way to
* control it */
Expand Down
131 changes: 76 additions & 55 deletions tsl/test/shared/expected/dist_remote_error.out
@@ -1,29 +1,22 @@
-- This file and its contents are licensed under the Timescale License.
-- Please see the included NOTICE for copyright information and
-- LICENSE-TIMESCALE for a copy of the license.
\c :TEST_DBNAME :ROLE_SUPERUSER;
-- Import setup file to data nodes.
\unset ECHO
-- Disable SSL to get stable error output across versions. SSL adds some output
-- that changed in PG 14.
\c -reuse-previous=on sslmode=disable
set timescaledb.debug_enable_ssl to off;
set client_min_messages to error;
-- A relatively big table on one data node
CREATE TABLE metrics_dist_remote_error(LIKE metrics_dist);
SELECT table_name FROM create_distributed_hypertable('metrics_dist_remote_error', 'time', 'device_id',
create table metrics_dist_remote_error(like metrics_dist);
select table_name from create_distributed_hypertable('metrics_dist_remote_error', 'time', 'device_id',
data_nodes => '{"data_node_1"}');
WARNING: only one data node was assigned to the hypertable
table_name
metrics_dist_remote_error
(1 row)

INSERT INTO metrics_dist_remote_error SELECT * FROM metrics_dist ORDER BY random() LIMIT 20000;
-- Create a function that raises an error every nth row.
-- It's stable, takes a second argument and returns current number of rows,
-- so that it is shipped to data nodes and not optimized out.
-- It's written in one line because I don't know how to make \set accept
-- several lines.
\set error_function 'CREATE OR REPLACE FUNCTION ts_debug_shippable_error_after_n_rows(integer, anyelement) RETURNS integer AS ':MODULE_PATHNAME', ''ts_debug_shippable_error_after_n_rows'' LANGUAGE C STABLE STRICT; '
-- Same as above, but fatal.
\set fatal_function 'CREATE OR REPLACE FUNCTION ts_debug_shippable_fatal_after_n_rows(integer, anyelement) RETURNS integer AS ':MODULE_PATHNAME', ''ts_debug_shippable_error_after_n_rows'' LANGUAGE C STABLE STRICT; '
:error_function
call distributed_exec(:'error_function');
:fatal_function
call distributed_exec(:'fatal_function');
insert into metrics_dist_remote_error select * from metrics_dist order by metrics_dist limit 20000;
-- The error messages vary wildly between the Postgres versions, dependent on
-- the particular behavior of libpq in this or that case. The purpose of this
-- test is not to solidify this accidental behavior, but to merely exercise the
Expand All @@ -33,72 +26,100 @@ set client_min_messages to ERROR;
\set ON_ERROR_STOP off
set timescaledb.remote_data_fetcher = 'rowbyrow';
explain (analyze, verbose, costs off, timing off, summary off)
select 1 from metrics_dist_remote_error where ts_debug_shippable_error_after_n_rows(0, device_id)::int != 0;
ERROR: [data_node_1]: debug point: requested to error out after 0 rows, 1 rows seen
explain (analyze, verbose, costs off, timing off, summary off)
select 1 from metrics_dist_remote_error where ts_debug_shippable_error_after_n_rows(1, device_id)::int != 0;
ERROR: [data_node_1]: debug point: requested to error out every 1 rows, 1 rows seen
ERROR: [data_node_1]: debug point: requested to error out after 1 rows, 1 rows seen
explain (analyze, verbose, costs off, timing off, summary off)
select 1 from metrics_dist_remote_error where ts_debug_shippable_error_after_n_rows(2, device_id)::int != 0;
ERROR: [data_node_1]: debug point: requested to error out after 2 rows, 2 rows seen
explain (analyze, verbose, costs off, timing off, summary off)
select 1 from metrics_dist_remote_error where ts_debug_shippable_error_after_n_rows(701, device_id)::int != 0;
ERROR: [data_node_1]: debug point: requested to error out after 701 rows, 701 rows seen
explain (analyze, verbose, costs off, timing off, summary off)
select 1 from metrics_dist_remote_error where ts_debug_shippable_error_after_n_rows(10000, device_id)::int != 0;
ERROR: [data_node_1]: debug point: requested to error out every 10000 rows, 10000 rows seen
ERROR: [data_node_1]: debug point: requested to error out after 10000 rows, 10000 rows seen
explain (analyze, verbose, costs off, timing off, summary off)
select 1 from metrics_dist_remote_error where ts_debug_shippable_error_after_n_rows(16384, device_id)::int != 0;
ERROR: [data_node_1]: debug point: requested to error out after 16384 rows, 16384 rows seen
explain (analyze, verbose, costs off, timing off, summary off)
select 1 from metrics_dist_remote_error where ts_debug_shippable_error_after_n_rows(10000000, device_id)::int != 0;
QUERY PLAN
Custom Scan (DataNodeScan) on public.metrics_dist_remote_error (actual rows=20000 loops=1)
Output: 1
Data node: data_node_1
Fetcher Type: Row by row
Chunks: _dist_hyper_X_X_chunk, _dist_hyper_X_X_chunk, _dist_hyper_X_X_chunk
Chunks: _dist_hyper_X_X_chunk, _dist_hyper_X_X_chunk
Remote SQL: SELECT NULL FROM public.metrics_dist_remote_error WHERE _timescaledb_internal.chunks_in(public.metrics_dist_remote_error.*, ARRAY[..]) AND ((public.ts_debug_shippable_error_after_n_rows(10000000, device_id) <> 0))
(6 rows)

-- We don't test fatal errors here, because PG versions before 14 are unable to
-- report them properly to the access node, so we get different errors in these
-- versions.
-- Now test the same with the cursor fetcher.
set timescaledb.remote_data_fetcher = 'cursor';
explain (analyze, verbose, costs off, timing off, summary off)
select 1 from metrics_dist_remote_error where ts_debug_shippable_fatal_after_n_rows(1, device_id)::int != 0;
ERROR: [data_node_1]: debug point: requested to error out every 1 rows, 1 rows seen
select 1 from metrics_dist_remote_error where ts_debug_shippable_error_after_n_rows(0, device_id)::int != 0;
ERROR: [data_node_1]: debug point: requested to error out after 0 rows, 1 rows seen
explain (analyze, verbose, costs off, timing off, summary off)
select 1 from metrics_dist_remote_error where ts_debug_shippable_fatal_after_n_rows(10000, device_id)::int != 0;
ERROR: [data_node_1]: debug point: requested to error out every 10000 rows, 10000 rows seen
select 1 from metrics_dist_remote_error where ts_debug_shippable_error_after_n_rows(1, device_id)::int != 0;
ERROR: [data_node_1]: debug point: requested to error out after 1 rows, 1 rows seen
explain (analyze, verbose, costs off, timing off, summary off)
select 1 from metrics_dist_remote_error where ts_debug_shippable_fatal_after_n_rows(10000000, device_id)::int != 0;
QUERY PLAN
Custom Scan (DataNodeScan) on public.metrics_dist_remote_error (actual rows=20000 loops=1)
Output: 1
Data node: data_node_1
Fetcher Type: Row by row
Chunks: _dist_hyper_X_X_chunk, _dist_hyper_X_X_chunk, _dist_hyper_X_X_chunk
Remote SQL: SELECT NULL FROM public.metrics_dist_remote_error WHERE _timescaledb_internal.chunks_in(public.metrics_dist_remote_error.*, ARRAY[..]) AND ((public.ts_debug_shippable_fatal_after_n_rows(10000000, device_id) <> 0))
(6 rows)

set timescaledb.remote_data_fetcher = 'cursor';
select 1 from metrics_dist_remote_error where ts_debug_shippable_error_after_n_rows(2, device_id)::int != 0;
ERROR: [data_node_1]: debug point: requested to error out after 2 rows, 2 rows seen
explain (analyze, verbose, costs off, timing off, summary off)
select 1 from metrics_dist_remote_error where ts_debug_shippable_error_after_n_rows(1, device_id)::int != 0;
ERROR: [data_node_1]: debug point: requested to error out every 1 rows, 1 rows seen
select 1 from metrics_dist_remote_error where ts_debug_shippable_error_after_n_rows(701, device_id)::int != 0;
ERROR: [data_node_1]: debug point: requested to error out after 701 rows, 701 rows seen
explain (analyze, verbose, costs off, timing off, summary off)
select 1 from metrics_dist_remote_error where ts_debug_shippable_error_after_n_rows(10000, device_id)::int != 0;
ERROR: [data_node_1]: debug point: requested to error out every 10000 rows, 10000 rows seen
ERROR: [data_node_1]: debug point: requested to error out after 10000 rows, 10000 rows seen
explain (analyze, verbose, costs off, timing off, summary off)
select 1 from metrics_dist_remote_error where ts_debug_shippable_error_after_n_rows(10000000, device_id)::int != 0;
QUERY PLAN
Custom Scan (DataNodeScan) on public.metrics_dist_remote_error (actual rows=20000 loops=1)
Output: 1
Data node: data_node_1
Fetcher Type: Cursor
Chunks: _dist_hyper_X_X_chunk, _dist_hyper_X_X_chunk, _dist_hyper_X_X_chunk
Chunks: _dist_hyper_X_X_chunk, _dist_hyper_X_X_chunk
Remote SQL: SELECT NULL FROM public.metrics_dist_remote_error WHERE _timescaledb_internal.chunks_in(public.metrics_dist_remote_error.*, ARRAY[..]) AND ((public.ts_debug_shippable_error_after_n_rows(10000000, device_id) <> 0))
(6 rows)

-- Table with broken send for a data type.
create table metrics_dist_bs(like metrics_dist);
alter table metrics_dist_bs alter column v0 type bs;
select table_name from create_distributed_hypertable('metrics_dist_bs',
'time', 'device_id');
table_name
metrics_dist_bs
(1 row)

set timescaledb.enable_connection_binary_data to off;
insert into metrics_dist_bs
select * from metrics_dist_remote_error;
set timescaledb.enable_connection_binary_data to on;
explain (analyze, verbose, costs off, timing off, summary off)
select 1 from metrics_dist_remote_error where ts_debug_shippable_fatal_after_n_rows(1, device_id)::int != 0;
ERROR: [data_node_1]: debug point: requested to error out every 1 rows, 1 rows seen
explain (analyze, verbose, costs off, timing off, summary off)
select 1 from metrics_dist_remote_error where ts_debug_shippable_fatal_after_n_rows(10000, device_id)::int != 0;
ERROR: [data_node_1]: debug point: requested to error out every 10000 rows, 10000 rows seen
explain (analyze, verbose, costs off, timing off, summary off)
select 1 from metrics_dist_remote_error where ts_debug_shippable_fatal_after_n_rows(10000000, device_id)::int != 0;
QUERY PLAN
Custom Scan (DataNodeScan) on public.metrics_dist_remote_error (actual rows=20000 loops=1)
Output: 1
Data node: data_node_1
Fetcher Type: Cursor
Chunks: _dist_hyper_X_X_chunk, _dist_hyper_X_X_chunk, _dist_hyper_X_X_chunk
Remote SQL: SELECT NULL FROM public.metrics_dist_remote_error WHERE _timescaledb_internal.chunks_in(public.metrics_dist_remote_error.*, ARRAY[..]) AND ((public.ts_debug_shippable_fatal_after_n_rows(10000000, device_id) <> 0))
(6 rows)
select * from metrics_dist_bs;
ERROR: [data_node_2]: debug point: requested to error out after 7103 rows, 7103 rows seen
drop table metrics_dist_bs;
-- Table with broken receive for a data type.
create table metrics_dist_br(like metrics_dist);
alter table metrics_dist_br alter column v0 type br;
select table_name from create_distributed_hypertable('metrics_dist_br',
'time', 'device_id');
table_name
metrics_dist_br
(1 row)

DROP TABLE metrics_dist_remote_error;
-- Test that the insert fails on data nodes.
insert into metrics_dist_br select * from metrics_dist_remote_error;
ERROR: [data_node_2]: debug point: requested to error out after 7103 rows, 7103 rows seen
-- Also test that the COPY fails on data nodes. Note that we use the text format
-- for the insert, so that the access node doesn't call `recv` and fail by itself.
-- It's going to use binary format for transferring COPY data to data nodes
-- regardless of the input format.
\copy (select * from metrics_dist_remote_error) to 'dist_remote_error.text' with (format text);
\copy metrics_dist_br from 'dist_remote_error.text' with (format text);
ERROR: [data_node_2]: debug point: requested to error out after 7103 rows, 7103 rows seen
drop table metrics_dist_br;
drop table metrics_dist_remote_error;
1 change: 1 addition & 0 deletions tsl/test/shared/sql/.gitignore
@@ -1,6 +1,7 @@
/continuous_aggs_compression-*.sql
/constify_now-*.sql
/dist_remote_error-*.sql
/dist_remote_error.text
/gapfill-*.sql
/generated_columns-*.sql
/ordered_append-*.sql
Expand Down