From 96574a749757b20e995dd6fcd9be5140f34092c1 Mon Sep 17 00:00:00 2001 From: Jan Nidzwetzki Date: Tue, 7 Mar 2023 13:32:02 +0100 Subject: [PATCH] Fix file trailer handling in the COPY fetcher The copy fetcher fetches tuples in batches. When the last element in the batch is the file trailer, the trailer was not handled correctly. The existing logic did not perform a PQgetCopyData in that case. Therefore the state of the fetcher was not set to EOF and the copy operation was not correctly finished at this point. Fixes: #5323 --- CHANGELOG.md | 4 + tsl/src/remote/copy_fetcher.c | 90 +++++++++++++++-------- tsl/test/expected/data_fetcher.out | 28 +++++++ tsl/test/sql/data_fetcher.sql | 12 +++ tsl/test/sql/include/data_fetcher_run.sql | 7 ++ 5 files changed, 111 insertions(+), 30 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index dc12d2f7361..df597e75d55 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,10 @@ accidentally triggering the load of a previous DB version.** **Bugfixes** * #5396 Fix SEGMENTBY columns predicates to be pushed down +* #5410 Fix file trailer handling in the COPY fetcher + +**Thanks** +* @nikolaps for reporting an issue with the COPY fetcher ## 2.10.1 (2023-03-07) diff --git a/tsl/src/remote/copy_fetcher.c b/tsl/src/remote/copy_fetcher.c index 3c1ceab2863..3816a6c469b 100644 --- a/tsl/src/remote/copy_fetcher.c +++ b/tsl/src/remote/copy_fetcher.c @@ -333,6 +333,50 @@ copy_fetcher_read_fetch_response(CopyFetcher *fetcher) PQclear(res); } +/* + * Read the next data from the connection and store the data in copy_data. + * If no data can be read return false, or throw an error, otherwise + * return true. + */ +static bool +copy_fetcher_read_data(CopyFetcher *fetcher, PGconn *conn, char *volatile *dataptr, + StringInfoData *copy_data) +{ + copy_data->len = PQgetCopyData(conn, + ©_data->data, + /* async = */ false); + + /* Set dataptr to ensure data is freed with PQfreemem() in + * PG_CATCH() clause in case error is thrown. */ + *dataptr = copy_data->data; + + if (copy_data->len == -1) + { + /* Note: it is possible to get EOF without having received the + * file trailer in case there's e.g., a remote error. */ + fetcher->state.eof = true; + + /* Should read final result with PQgetResult() until it + * returns NULL. This happens later in end_copy. */ + return false; + } + else if (copy_data->len == -2) + { + /* + * Error. The docs say: consult PQerrorMessage() for the reason. + * remote_connection_elog() will do this for us. + */ + remote_connection_elog(fetcher->state.conn, ERROR); + + /* remote_connection_elog should raise an ERROR */ + pg_unreachable(); + } + + copy_data->maxlen = copy_data->len; + + return true; +} + /* * Process response for ongoing async request */ @@ -378,34 +422,12 @@ copy_fetcher_complete(CopyFetcher *fetcher) MemoryContextSwitchTo(fetcher->state.req_mctx); StringInfoData copy_data = { 0 }; + bool tuple_read = copy_fetcher_read_data(fetcher, conn, &dataptr, ©_data); - copy_data.len = PQgetCopyData(conn, - ©_data.data, - /* async = */ false); - - /* Set dataptr to ensure data is freed with PQfreemem() in - * PG_CATCH() clause in case error is thrown. */ - dataptr = copy_data.data; - - if (copy_data.len == -1) - { - /* Note: it is possible to get EOF without having received the - * file trailer in case there's e.g., a remote error. */ - fetcher->state.eof = true; - /* Should read final result with PQgetResult() until it - * returns NULL. This happens below. */ + /* Were we able to fetch new data? */ + if (!tuple_read) break; - } - else if (copy_data.len == -2) - { - /* - * Error. The docs say: consult PQerrorMessage() for the reason. - * remote_connection_elog() will do this for us. - */ - remote_connection_elog(fetcher->state.conn, ERROR); - } - copy_data.maxlen = copy_data.len; Assert(copy_data.cursor == 0); if (fetcher->state.batch_count == 0 && row == 0) @@ -432,7 +454,16 @@ copy_fetcher_complete(CopyFetcher *fetcher) /* Next PQgetCopyData() should return -1, indicating EOF and * that the remote side ended the copy. The final result * (PGRES_COMMAND_OK) should then be read with - * PQgetResult(). */ + * PQgetResult(). + * + * Perform a PQgetCopyData directly in this branch because + * if row = state.fetch_size - 1 (i.e., file_trailer is the last + * tuple of the batch), the for loop will not executed + * and PQgetCopyData will never be called. + */ + tuple_read = copy_fetcher_read_data(fetcher, conn, &dataptr, ©_data); + Assert(tuple_read == false); + break; } else { @@ -510,15 +541,14 @@ copy_fetcher_complete(CopyFetcher *fetcher) dataptr = NULL; } - /* Don't count the file trailer as a row if this was the last batch */ - fetcher->state.num_tuples = fetcher->file_trailer_received ? row - 1 : row; + fetcher->state.num_tuples = row; fetcher->state.next_tuple_idx = 0; /* Must be EOF if we didn't get as many tuples as we asked for. */ +#ifdef USE_ASSERT_CHECKING if (fetcher->state.num_tuples < fetcher->state.fetch_size) - { Assert(fetcher->state.eof); - } +#endif fetcher->state.batch_count++; diff --git a/tsl/test/expected/data_fetcher.out b/tsl/test/expected/data_fetcher.out index 64c9886aeda..faa57c77d95 100644 --- a/tsl/test/expected/data_fetcher.out +++ b/tsl/test/expected/data_fetcher.out @@ -41,6 +41,26 @@ SELECT setseed(1); INSERT INTO disttable SELECT t, (abs(timestamp_hash(t::timestamp)) % 10) + 1, random() * 10 FROM generate_series('2019-01-01'::timestamptz, '2019-01-02'::timestamptz, '1 second') as t; +-- This table contains the content for precisely one batch of the copy fetcher. The fetch_size +-- will be set to 100 below and this table contains 99 tuples and the last element on the first +-- copy batch is the file trailer (#5323). +CREATE table one_batch(ts timestamptz NOT NULL, sensor_id int NOT NULL, value float NOT NULL); +SELECT create_distributed_hypertable('one_batch', 'ts'); + create_distributed_hypertable +------------------------------- + (2,public,one_batch,t) +(1 row) + +INSERT INTO one_batch SELECT '2023-01-01'::timestamptz AS time, sensor_id, random() AS value FROM generate_series(1, 99, 1) AS g1(sensor_id) ORDER BY time; +-- Same but for the DEFAULT_FDW_FETCH_SIZE (10000) +CREATE table one_batch_default(ts timestamptz NOT NULL, sensor_id int NOT NULL, value float NOT NULL); +SELECT create_distributed_hypertable('one_batch_default', 'ts'); + create_distributed_hypertable +-------------------------------- + (3,public,one_batch_default,t) +(1 row) + +INSERT INTO one_batch_default SELECT '2023-01-01'::timestamptz AS time, sensor_id, random() AS value FROM generate_series(1, 9999, 1) AS g1(sensor_id) ORDER BY time; SET client_min_messages TO error; -- Set a smaller fetch size to ensure that the result is split into -- mutliple batches. @@ -59,6 +79,10 @@ SELECT time_bucket('1 hour', time) AS time, device, avg(temp) FROM disttable GROUP BY 1,2 ORDER BY 1,2; +-- Test for #5323 - ensure that no NULL tuples are generated +-- if the last element of the batch is the file trailer. +SELECT count(*), count(value) FROM one_batch; +SELECT count(*), count(value) FROM one_batch_default; \o \set ON_ERROR_STOP 1 -- run queries using cursor fetcher @@ -74,6 +98,10 @@ SELECT time_bucket('1 hour', time) AS time, device, avg(temp) FROM disttable GROUP BY 1,2 ORDER BY 1,2; +-- Test for #5323 - ensure that no NULL tuples are generated +-- if the last element of the batch is the file trailer. +SELECT count(*), count(value) FROM one_batch; +SELECT count(*), count(value) FROM one_batch_default; \o -- compare results :DIFF_CMD diff --git a/tsl/test/sql/data_fetcher.sql b/tsl/test/sql/data_fetcher.sql index a786636b0e1..b4f871bfcba 100644 --- a/tsl/test/sql/data_fetcher.sql +++ b/tsl/test/sql/data_fetcher.sql @@ -31,6 +31,18 @@ INSERT INTO disttable SELECT t, (abs(timestamp_hash(t::timestamp)) % 10) + 1, random() * 10 FROM generate_series('2019-01-01'::timestamptz, '2019-01-02'::timestamptz, '1 second') as t; +-- This table contains the content for precisely one batch of the copy fetcher. The fetch_size +-- will be set to 100 below and this table contains 99 tuples and the last element on the first +-- copy batch is the file trailer (#5323). +CREATE table one_batch(ts timestamptz NOT NULL, sensor_id int NOT NULL, value float NOT NULL); +SELECT create_distributed_hypertable('one_batch', 'ts'); +INSERT INTO one_batch SELECT '2023-01-01'::timestamptz AS time, sensor_id, random() AS value FROM generate_series(1, 99, 1) AS g1(sensor_id) ORDER BY time; + +-- Same but for the DEFAULT_FDW_FETCH_SIZE (10000) +CREATE table one_batch_default(ts timestamptz NOT NULL, sensor_id int NOT NULL, value float NOT NULL); +SELECT create_distributed_hypertable('one_batch_default', 'ts'); +INSERT INTO one_batch_default SELECT '2023-01-01'::timestamptz AS time, sensor_id, random() AS value FROM generate_series(1, 9999, 1) AS g1(sensor_id) ORDER BY time; + SET client_min_messages TO error; -- Set a smaller fetch size to ensure that the result is split into diff --git a/tsl/test/sql/include/data_fetcher_run.sql b/tsl/test/sql/include/data_fetcher_run.sql index f42e1b6bdf1..4855b7f962e 100644 --- a/tsl/test/sql/include/data_fetcher_run.sql +++ b/tsl/test/sql/include/data_fetcher_run.sql @@ -10,3 +10,10 @@ SELECT time_bucket('1 hour', time) AS time, device, avg(temp) FROM disttable GROUP BY 1,2 ORDER BY 1,2; + +-- Test for #5323 - ensure that no NULL tuples are generated +-- if the last element of the batch is the file trailer. +SELECT count(*), count(value) FROM one_batch; + +SELECT count(*), count(value) FROM one_batch_default; +