diff --git a/CHANGELOG.md b/CHANGELOG.md
index dc12d2f7361..df597e75d55 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -13,6 +13,10 @@ accidentally triggering the load of a previous DB version.**
 **Bugfixes**
 * #5396 Fix SEGMENTBY columns predicates to be pushed down
+* #5410 Fix file trailer handling in the COPY fetcher
+
+**Thanks**
+* @nikolaps for reporting an issue with the COPY fetcher
 
 ## 2.10.1 (2023-03-07)
diff --git a/tsl/src/remote/copy_fetcher.c b/tsl/src/remote/copy_fetcher.c
index 3c1ceab2863..3816a6c469b 100644
--- a/tsl/src/remote/copy_fetcher.c
+++ b/tsl/src/remote/copy_fetcher.c
@@ -333,6 +333,50 @@ copy_fetcher_read_fetch_response(CopyFetcher *fetcher)
 	PQclear(res);
 }
 
+/*
+ * Read the next data from the connection and store it in copy_data.
+ * Return false if no more data can be read (EOF), throw an error on
+ * failure, otherwise return true.
+ */
+static bool
+copy_fetcher_read_data(CopyFetcher *fetcher, PGconn *conn, char *volatile *dataptr,
+					   StringInfoData *copy_data)
+{
+	copy_data->len = PQgetCopyData(conn,
+								   &copy_data->data,
+								   /* async = */ false);
+
+	/* Set dataptr to ensure data is freed with PQfreemem() in
+	 * PG_CATCH() clause in case error is thrown. */
+	*dataptr = copy_data->data;
+
+	if (copy_data->len == -1)
+	{
+		/* Note: it is possible to get EOF without having received the
+		 * file trailer in case there's e.g., a remote error. */
+		fetcher->state.eof = true;
+
+		/* Should read final result with PQgetResult() until it
+		 * returns NULL. This happens later in end_copy. */
+		return false;
+	}
+	else if (copy_data->len == -2)
+	{
+		/*
+		 * Error. The docs say: consult PQerrorMessage() for the reason.
+		 * remote_connection_elog() will do this for us.
+		 */
+		remote_connection_elog(fetcher->state.conn, ERROR);
+
+		/* remote_connection_elog should raise an ERROR */
+		pg_unreachable();
+	}
+
+	copy_data->maxlen = copy_data->len;
+
+	return true;
+}
+
 /*
  * Process response for ongoing async request
  */
@@ -378,34 +422,12 @@ copy_fetcher_complete(CopyFetcher *fetcher)
 			MemoryContextSwitchTo(fetcher->state.req_mctx);
 
 			StringInfoData copy_data = { 0 };
+			bool tuple_read = copy_fetcher_read_data(fetcher, conn, &dataptr, &copy_data);
 
-			copy_data.len = PQgetCopyData(conn,
-										  &copy_data.data,
-										  /* async = */ false);
-
-			/* Set dataptr to ensure data is freed with PQfreemem() in
-			 * PG_CATCH() clause in case error is thrown. */
-			dataptr = copy_data.data;
-
-			if (copy_data.len == -1)
-			{
-				/* Note: it is possible to get EOF without having received the
-				 * file trailer in case there's e.g., a remote error. */
-				fetcher->state.eof = true;
-
-				/* Should read final result with PQgetResult() until it
-				 * returns NULL. This happens below. */
+			/* Were we able to fetch new data? */
+			if (!tuple_read)
 				break;
-			}
-			else if (copy_data.len == -2)
-			{
-				/*
-				 * Error. The docs say: consult PQerrorMessage() for the reason.
-				 * remote_connection_elog() will do this for us.
-				 */
-				remote_connection_elog(fetcher->state.conn, ERROR);
-			}
 
-			copy_data.maxlen = copy_data.len;
 			Assert(copy_data.cursor == 0);
 
 			if (fetcher->state.batch_count == 0 && row == 0)
@@ -432,7 +454,16 @@ copy_fetcher_complete(CopyFetcher *fetcher)
 				/* Next PQgetCopyData() should return -1, indicating EOF and
 				 * that the remote side ended the copy. The final result
 				 * (PGRES_COMMAND_OK) should then be read with
-				 * PQgetResult(). */
+				 * PQgetResult().
+				 *
+				 * Perform a PQgetCopyData directly in this branch because
+				 * if row == state.fetch_size - 1 (i.e., file_trailer is the last
+				 * tuple of the batch), the for loop will not be executed again
+				 * and PQgetCopyData would never be called.
+				 */
+				tuple_read = copy_fetcher_read_data(fetcher, conn, &dataptr, &copy_data);
+				Assert(tuple_read == false);
+				break;
 			}
 			else
 			{
@@ -510,15 +541,14 @@ copy_fetcher_complete(CopyFetcher *fetcher)
 		dataptr = NULL;
 	}
 
-	/* Don't count the file trailer as a row if this was the last batch */
-	fetcher->state.num_tuples = fetcher->file_trailer_received ? row - 1 : row;
+	fetcher->state.num_tuples = row;
 	fetcher->state.next_tuple_idx = 0;
 
 	/* Must be EOF if we didn't get as many tuples as we asked for. */
+#ifdef USE_ASSERT_CHECKING
 	if (fetcher->state.num_tuples < fetcher->state.fetch_size)
-	{
 		Assert(fetcher->state.eof);
-	}
+#endif
 
 	fetcher->state.batch_count++;
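Note on the protocol handled above: libpq's COPY OUT mode requires that PQgetCopyData() be called until it returns -1 (end of copy), even after the binary file trailer has been received, and only then is the final command status read with PQgetResult() until it returns NULL. The fix makes sure the fetcher always reaches that -1, even when the trailer lands exactly on a batch boundary. For reference, a minimal standalone sketch of this consumption loop follows; this is not TimescaleDB code, and the connection string and query are placeholders:

/* Minimal libpq COPY OUT drain loop (illustrative sketch only). */
#include <stdio.h>
#include <stdlib.h>
#include <libpq-fe.h>

int
main(void)
{
	PGconn *conn = PQconnectdb("dbname=postgres"); /* placeholder conninfo */

	if (PQstatus(conn) != CONNECTION_OK)
	{
		fprintf(stderr, "connection failed: %s", PQerrorMessage(conn));
		PQfinish(conn);
		return EXIT_FAILURE;
	}

	/* Placeholder query; the fetcher issues a COPY of the cursor data. */
	PGresult *res = PQexec(conn, "COPY (SELECT generate_series(1, 99)) TO STDOUT (FORMAT binary)");

	if (PQresultStatus(res) != PGRES_COPY_OUT)
	{
		fprintf(stderr, "COPY failed: %s", PQerrorMessage(conn));
		PQclear(res);
		PQfinish(conn);
		return EXIT_FAILURE;
	}
	PQclear(res);

	char *buf;
	int len;

	/* Drain the stream: -1 means end of copy, -2 means error. The last
	 * chunk before -1 is the binary file trailer, not a data row. */
	while ((len = PQgetCopyData(conn, &buf, /* async = */ 0)) > 0)
	{
		printf("received %d-byte chunk\n", len);
		PQfreemem(buf);
	}

	if (len == -2)
		fprintf(stderr, "COPY error: %s", PQerrorMessage(conn));

	/* Read the final command status; NULL means the command is done. */
	while ((res = PQgetResult(conn)) != NULL)
	{
		if (PQresultStatus(res) != PGRES_COMMAND_OK)
			fprintf(stderr, "unexpected status: %s", PQerrorMessage(conn));
		PQclear(res);
	}

	PQfinish(conn);
	return EXIT_SUCCESS;
}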
diff --git a/tsl/test/expected/data_fetcher.out b/tsl/test/expected/data_fetcher.out
index 64c9886aeda..faa57c77d95 100644
--- a/tsl/test/expected/data_fetcher.out
+++ b/tsl/test/expected/data_fetcher.out
@@ -41,6 +41,26 @@ SELECT setseed(1);
 INSERT INTO disttable
 SELECT t, (abs(timestamp_hash(t::timestamp)) % 10) + 1, random() * 10
 FROM generate_series('2019-01-01'::timestamptz, '2019-01-02'::timestamptz, '1 second') as t;
+-- This table contains the contents of precisely one batch for the copy fetcher: the fetch_size
+-- will be set to 100 below, the table contains 99 tuples, and the last element of the first
+-- copy batch is the file trailer (#5323).
+CREATE TABLE one_batch(ts timestamptz NOT NULL, sensor_id int NOT NULL, value float NOT NULL);
+SELECT create_distributed_hypertable('one_batch', 'ts');
+ create_distributed_hypertable
+-------------------------------
+ (2,public,one_batch,t)
+(1 row)
+
+INSERT INTO one_batch SELECT '2023-01-01'::timestamptz AS time, sensor_id, random() AS value FROM generate_series(1, 99, 1) AS g1(sensor_id) ORDER BY time;
+-- Same but for the DEFAULT_FDW_FETCH_SIZE (10000)
+CREATE TABLE one_batch_default(ts timestamptz NOT NULL, sensor_id int NOT NULL, value float NOT NULL);
+SELECT create_distributed_hypertable('one_batch_default', 'ts');
+ create_distributed_hypertable
+--------------------------------
+ (3,public,one_batch_default,t)
+(1 row)
+
+INSERT INTO one_batch_default SELECT '2023-01-01'::timestamptz AS time, sensor_id, random() AS value FROM generate_series(1, 9999, 1) AS g1(sensor_id) ORDER BY time;
 SET client_min_messages TO error;
 -- Set a smaller fetch size to ensure that the result is split into
 -- multiple batches.
@@ -59,6 +79,10 @@ SELECT time_bucket('1 hour', time) AS time, device, avg(temp)
 FROM disttable
 GROUP BY 1,2
 ORDER BY 1,2;
+-- Test for #5323 - ensure that no NULL tuples are generated
+-- if the last element of the batch is the file trailer.
+SELECT count(*), count(value) FROM one_batch;
+SELECT count(*), count(value) FROM one_batch_default;
 \o
 \set ON_ERROR_STOP 1
 -- run queries using cursor fetcher
@@ -74,6 +98,10 @@ SELECT time_bucket('1 hour', time) AS time, device, avg(temp)
 FROM disttable
 GROUP BY 1,2
 ORDER BY 1,2;
+-- Test for #5323 - ensure that no NULL tuples are generated
+-- if the last element of the batch is the file trailer.
+SELECT count(*), count(value) FROM one_batch;
+SELECT count(*), count(value) FROM one_batch_default;
 \o
 -- compare results
 :DIFF_CMD
diff --git a/tsl/test/sql/data_fetcher.sql b/tsl/test/sql/data_fetcher.sql
index a786636b0e1..b4f871bfcba 100644
--- a/tsl/test/sql/data_fetcher.sql
+++ b/tsl/test/sql/data_fetcher.sql
@@ -31,6 +31,18 @@ INSERT INTO disttable
 SELECT t, (abs(timestamp_hash(t::timestamp)) % 10) + 1, random() * 10
 FROM generate_series('2019-01-01'::timestamptz, '2019-01-02'::timestamptz, '1 second') as t;
+-- This table contains the contents of precisely one batch for the copy fetcher: the fetch_size
+-- will be set to 100 below, the table contains 99 tuples, and the last element of the first
+-- copy batch is the file trailer (#5323).
+CREATE TABLE one_batch(ts timestamptz NOT NULL, sensor_id int NOT NULL, value float NOT NULL);
+SELECT create_distributed_hypertable('one_batch', 'ts');
+INSERT INTO one_batch SELECT '2023-01-01'::timestamptz AS time, sensor_id, random() AS value FROM generate_series(1, 99, 1) AS g1(sensor_id) ORDER BY time;
+
+-- Same but for the DEFAULT_FDW_FETCH_SIZE (10000)
+CREATE TABLE one_batch_default(ts timestamptz NOT NULL, sensor_id int NOT NULL, value float NOT NULL);
+SELECT create_distributed_hypertable('one_batch_default', 'ts');
+INSERT INTO one_batch_default SELECT '2023-01-01'::timestamptz AS time, sensor_id, random() AS value FROM generate_series(1, 9999, 1) AS g1(sensor_id) ORDER BY time;
+
 SET client_min_messages TO error;
 -- Set a smaller fetch size to ensure that the result is split into
diff --git a/tsl/test/sql/include/data_fetcher_run.sql b/tsl/test/sql/include/data_fetcher_run.sql
index f42e1b6bdf1..4855b7f962e 100644
--- a/tsl/test/sql/include/data_fetcher_run.sql
+++ b/tsl/test/sql/include/data_fetcher_run.sql
@@ -10,3 +10,10 @@
 SELECT time_bucket('1 hour', time) AS time, device, avg(temp)
 FROM disttable
 GROUP BY 1,2
 ORDER BY 1,2;
+
+-- Test for #5323 - ensure that no NULL tuples are generated
+-- if the last element of the batch is the file trailer.
+SELECT count(*), count(value) FROM one_batch;
+
+SELECT count(*), count(value) FROM one_batch_default;
+
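Why the extra read in the file_trailer branch matters: when the trailer arrives as the last message of a batch (row == fetch_size - 1, the case the one_batch tables construct with 99 tuples and fetch_size 100), the per-batch for loop exits before the EOF from PQgetCopyData() is ever observed, and the removed `row - 1` adjustment could then miscount the batch, which is what the count(*) vs. count(value) checks exercise. The toy model below illustrates the boundary case; it is a hypothetical sketch, not the real fetcher: read_message() stands in for PQgetCopyData(), returning three data rows plus the trailer and then EOF, with a batch size of 4 instead of 100.

#include <stdbool.h>
#include <stdio.h>

#define FETCH_SIZE 4 /* stands in for state.fetch_size (100 in the tests) */

/* A canned stream: three data-row chunks followed by the 2-byte file
 * trailer. After these, read_message() reports EOF, modeling
 * PQgetCopyData() returning -1. */
static int stream[] = { 64, 64, 64, 2 };
static int stream_pos = 0;

static bool
read_message(int *len)
{
	if (stream_pos >= (int) (sizeof(stream) / sizeof(stream[0])))
		return false; /* EOF */
	*len = stream[stream_pos++];
	return true;
}

int
main(void)
{
	int row, len = 0;
	bool eof = false;

	for (row = 0; row < FETCH_SIZE; row++)
	{
		if (!read_message(&len))
		{
			eof = true; /* EOF reached before the batch filled up */
			break;
		}

		if (len == 2) /* models detecting the binary file trailer */
		{
			/* The trailer fills the last slot of the batch, so without
			 * this extra read the loop would end here and the EOF would
			 * never be consumed. This mirrors the new branch in
			 * copy_fetcher_complete(). */
			eof = !read_message(&len);
			break;
		}
		/* the real fetcher stores the tuple here */
	}

	/* Like the fixed code, only data rows are counted: num_tuples = row. */
	printf("num_tuples = %d, eof = %s\n", row, eof ? "true" : "false");
	return 0;
}

Running the model prints "num_tuples = 3, eof = true": the trailer slot is never counted as a row and the stream is fully drained, matching what the fixed fetcher now guarantees.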