Fix file trailer handling in the COPY fetcher
The COPY fetcher fetches tuples in batches. When the last element of a
batch was the file trailer, the trailer was not handled correctly: the
existing logic did not perform a PQgetCopyData() in that case, so the
fetcher's state was never set to EOF and the copy operation was not
finished properly at this point.

Fixes: #5323
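
For illustration, a minimal sketch of the pre-fix control flow (hypothetical
and simplified; only PQgetCopyData(), PQerrorMessage(), and PQfreemem() are
real libpq calls, everything else is a stand-in):

#include <stdbool.h>
#include <libpq-fe.h>

/* Sketch of the buggy batch loop: when the file trailer arrives as the
 * last element of a batch (row == fetch_size - 1), the loop exits without
 * a further PQgetCopyData() call, so the -1 (end-of-copy) return value is
 * never observed and the fetcher's EOF flag is never set. */
static void
buggy_batch_loop_sketch(PGconn *conn, int fetch_size)
{
	bool eof = false;

	for (int row = 0; row < fetch_size; row++)
	{
		char *data = NULL;
		int len = PQgetCopyData(conn, &data, /* async = */ 0);

		if (len == -1)
		{
			eof = true; /* remote side ended the copy */
			break;
		}
		if (len == -2)
			break; /* error: consult PQerrorMessage(conn) */

		/* If `data` holds the file trailer and this is the last
		 * iteration (row == fetch_size - 1), the loop ends here with
		 * `eof` still false: the -1 return is never seen. */
		PQfreemem(data);
	}
	(void) eof; /* the real fetcher stores this in its state */
}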
jnidzwetzki committed Mar 9, 2023
1 parent a854b27 commit 7b8177a
Showing 5 changed files with 111 additions and 30 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -13,6 +13,10 @@ accidentally triggering the load of a previous DB version.**

**Bugfixes**
* #5396 Fix SEGMENTBY columns predicates to be pushed down
* #5410 Fix file trailer handling in the COPY fetcher

**Thanks**
* @nikolaps for reporting an issue with the COPY fetcher

## 2.10.1 (2023-03-07)

90 changes: 60 additions & 30 deletions tsl/src/remote/copy_fetcher.c
@@ -333,6 +333,50 @@ copy_fetcher_read_fetch_response(CopyFetcher *fetcher)
PQclear(res);
}

/*
 * Read the next data message from the connection and store it in
 * copy_data. Return false if no more data can be read (EOF), raise an
 * error on failure, and return true otherwise.
 */
static bool
copy_fetcher_read_data(CopyFetcher *fetcher, PGconn *conn, char *volatile *dataptr,
StringInfoData *copy_data)
{
copy_data->len = PQgetCopyData(conn,
&copy_data->data,
/* async = */ false);

/* Set dataptr to ensure data is freed with PQfreemem() in
* PG_CATCH() clause in case error is thrown. */
*dataptr = copy_data->data;

if (copy_data->len == -1)
{
/* Note: it is possible to get EOF without having received the
* file trailer in case there's e.g., a remote error. */
fetcher->state.eof = true;

/* Should read final result with PQgetResult() until it
* returns NULL. This happens later in end_copy. */
return false;
}
else if (copy_data->len == -2)
{
/*
* Error. The docs say: consult PQerrorMessage() for the reason.
* remote_connection_elog() will do this for us.
*/
remote_connection_elog(fetcher->state.conn, ERROR);

/* remote_connection_elog should raise an ERROR */
pg_unreachable();
}

copy_data->maxlen = copy_data->len;

return true;
}
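
For reference, the return-value contract handled above is libpq's documented
one: PQgetCopyData() yields the message length (> 0) per data row, -1 at end
of copy, and -2 on error, after which the final command status must be drained
with PQgetResult(). A standalone sketch of that contract (not TimescaleDB
code; drain_copy_out is a hypothetical name):

#include <stdbool.h>
#include <stdio.h>
#include <libpq-fe.h>

/* Drain one COPY TO STDOUT response on an established connection. */
static bool
drain_copy_out(PGconn *conn)
{
	char *buf = NULL;
	int len;

	while ((len = PQgetCopyData(conn, &buf, /* async = */ 0)) > 0)
	{
		/* ... consume `len` bytes at `buf` ... */
		PQfreemem(buf);
	}

	if (len == -2)
	{
		fprintf(stderr, "COPY failed: %s", PQerrorMessage(conn));
		return false;
	}

	/* len == -1: the copy stream ended; now read the final result(s)
	 * until PQgetResult() returns NULL (cf. end_copy in the fetcher). */
	PGresult *res;
	while ((res = PQgetResult(conn)) != NULL)
	{
		if (PQresultStatus(res) != PGRES_COMMAND_OK)
			fprintf(stderr, "unexpected copy result status\n");
		PQclear(res);
	}

	return true;
}

The new copy_fetcher_read_data() folds exactly this -1/-2 handling into one
place so that both call sites in copy_fetcher_complete() can share it.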

/*
* Process response for ongoing async request
*/
@@ -378,34 +422,12 @@
MemoryContextSwitchTo(fetcher->state.req_mctx);

StringInfoData copy_data = { 0 };
bool tuple_read = copy_fetcher_read_data(fetcher, conn, &dataptr, &copy_data);

copy_data.len = PQgetCopyData(conn,
&copy_data.data,
/* async = */ false);

/* Set dataptr to ensure data is freed with PQfreemem() in
* PG_CATCH() clause in case error is thrown. */
dataptr = copy_data.data;

if (copy_data.len == -1)
{
/* Note: it is possible to get EOF without having received the
* file trailer in case there's e.g., a remote error. */
fetcher->state.eof = true;
/* Should read final result with PQgetResult() until it
* returns NULL. This happens below. */
/* Were we able to fetch new data? */
if (!tuple_read)
break;
}
else if (copy_data.len == -2)
{
/*
* Error. The docs say: consult PQerrorMessage() for the reason.
* remote_connection_elog() will do this for us.
*/
remote_connection_elog(fetcher->state.conn, ERROR);
}

copy_data.maxlen = copy_data.len;
Assert(copy_data.cursor == 0);

if (fetcher->state.batch_count == 0 && row == 0)
@@ -432,7 +454,16 @@ copy_fetcher_complete(CopyFetcher *fetcher)
/* Next PQgetCopyData() should return -1, indicating EOF and
* that the remote side ended the copy. The final result
* (PGRES_COMMAND_OK) should then be read with
* PQgetResult(). */
* PQgetResult().
*
* Perform a PQgetCopyData() directly in this branch because
* if row == state.fetch_size - 1 (i.e., the file trailer is the
* last tuple of the batch), the for loop will not execute again
* and PQgetCopyData() would never be called.
*/
tuple_read = copy_fetcher_read_data(fetcher, conn, &dataptr, &copy_data);
Assert(tuple_read == false);
break;
}
else
{
@@ -510,15 +541,14 @@ copy_fetcher_complete(CopyFetcher *fetcher)
dataptr = NULL;
}

/* Don't count the file trailer as a row if this was the last batch */
fetcher->state.num_tuples = fetcher->file_trailer_received ? row - 1 : row;
fetcher->state.num_tuples = row;
fetcher->state.next_tuple_idx = 0;

/* Must be EOF if we didn't get as many tuples as we asked for. */
#ifdef USE_ASSERT_CHECKING
if (fetcher->state.num_tuples < fetcher->state.fetch_size)
{
Assert(fetcher->state.eof);
}
#endif

fetcher->state.batch_count++;

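Taken together, the fixed loop has roughly this shape (a simplified sketch
under the assumptions above; is_file_trailer() is a hypothetical stand-in for
the fetcher's trailer detection, and in PostgreSQL's binary COPY format the
trailer is a 16-bit field count of -1):

#include <assert.h>
#include <stdbool.h>
#include <libpq-fe.h>

/* Hypothetical stand-in: the binary COPY file trailer is a single
 * 16-bit integer containing -1, i.e. a 2-byte message 0xff 0xff. */
static bool
is_file_trailer(const char *data, int len)
{
	return len == 2 && data[0] == (char) 0xff && data[1] == (char) 0xff;
}

/* Fixed loop shape: the trailer branch performs the extra read itself
 * and breaks, so EOF is detected even when the trailer is the last
 * element of the batch, and `row` never counts the trailer (hence the
 * simplified num_tuples assignment above). */
static int
fixed_batch_loop_sketch(PGconn *conn, int fetch_size, bool *eof)
{
	int row;

	for (row = 0; row < fetch_size; row++)
	{
		char *data = NULL;
		int len = PQgetCopyData(conn, &data, /* async = */ 0);

		if (len == -1)
		{
			*eof = true;
			break;
		}
		if (len == -2)
			break; /* error: consult PQerrorMessage(conn) */

		if (is_file_trailer(data, len))
		{
			PQfreemem(data);
			/* The fix: read once more right here; otherwise, with
			 * the trailer as the batch's last element, the -1
			 * return would never be seen. */
			len = PQgetCopyData(conn, &data, /* async = */ 0);
			assert(len == -1);
			(void) len;
			*eof = true;
			break;
		}

		/* ... parse and store the tuple ... */
		PQfreemem(data);
	}

	return row; /* number of real tuples in this batch */
}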
28 changes: 28 additions & 0 deletions tsl/test/expected/data_fetcher.out
@@ -41,6 +41,26 @@ SELECT setseed(1);
INSERT INTO disttable
SELECT t, (abs(timestamp_hash(t::timestamp)) % 10) + 1, random() * 10
FROM generate_series('2019-01-01'::timestamptz, '2019-01-02'::timestamptz, '1 second') as t;
-- This table contains the content for precisely one batch of the copy fetcher. The fetch_size
-- is set to 100 below; the table contains 99 tuples, so the last element of the first
-- copy batch is the file trailer (#5323).
CREATE table one_batch(ts timestamptz NOT NULL, sensor_id int NOT NULL, value float NOT NULL);
SELECT create_distributed_hypertable('one_batch', 'ts');
create_distributed_hypertable
-------------------------------
(2,public,one_batch,t)
(1 row)

INSERT INTO one_batch SELECT '2023-01-01'::timestamptz AS time, sensor_id, random() AS value FROM generate_series(1, 99, 1) AS g1(sensor_id) ORDER BY time;
-- Same but for the DEFAULT_FDW_FETCH_SIZE (10000)
CREATE table one_batch_default(ts timestamptz NOT NULL, sensor_id int NOT NULL, value float NOT NULL);
SELECT create_distributed_hypertable('one_batch_default', 'ts');
create_distributed_hypertable
--------------------------------
(3,public,one_batch_default,t)
(1 row)

INSERT INTO one_batch_default SELECT '2023-01-01'::timestamptz AS time, sensor_id, random() AS value FROM generate_series(1, 9999, 1) AS g1(sensor_id) ORDER BY time;
SET client_min_messages TO error;
-- Set a smaller fetch size to ensure that the result is split into
-- multiple batches.
@@ -59,6 +79,10 @@ SELECT time_bucket('1 hour', time) AS time, device, avg(temp)
FROM disttable
GROUP BY 1,2
ORDER BY 1,2;
-- Test for #5323 - ensure that no NULL tuples are generated
-- if the last element of the batch is the file trailer. count(value)
-- ignores NULLs, so it would differ from count(*) in that case.
SELECT count(*), count(value) FROM one_batch;
SELECT count(*), count(value) FROM one_batch_default;
\o
\set ON_ERROR_STOP 1
-- run queries using cursor fetcher
@@ -74,6 +98,10 @@ SELECT time_bucket('1 hour', time) AS time, device, avg(temp)
FROM disttable
GROUP BY 1,2
ORDER BY 1,2;
-- Test for #5323 - ensure that no NULL tuples are generated
-- if the last element of the batch is the file trailer. count(value)
-- ignores NULLs, so it would differ from count(*) in that case.
SELECT count(*), count(value) FROM one_batch;
SELECT count(*), count(value) FROM one_batch_default;
\o
-- compare results
:DIFF_CMD
12 changes: 12 additions & 0 deletions tsl/test/sql/data_fetcher.sql
@@ -31,6 +31,18 @@ INSERT INTO disttable
SELECT t, (abs(timestamp_hash(t::timestamp)) % 10) + 1, random() * 10
FROM generate_series('2019-01-01'::timestamptz, '2019-01-02'::timestamptz, '1 second') as t;

-- This table contains the content for precisely one batch of the copy fetcher. The fetch_size
-- is set to 100 below; the table contains 99 tuples, so the last element of the first
-- copy batch is the file trailer (#5323).
CREATE table one_batch(ts timestamptz NOT NULL, sensor_id int NOT NULL, value float NOT NULL);
SELECT create_distributed_hypertable('one_batch', 'ts');
INSERT INTO one_batch SELECT '2023-01-01'::timestamptz AS time, sensor_id, random() AS value FROM generate_series(1, 99, 1) AS g1(sensor_id) ORDER BY time;

-- Same but for the DEFAULT_FDW_FETCH_SIZE (10000)
CREATE table one_batch_default(ts timestamptz NOT NULL, sensor_id int NOT NULL, value float NOT NULL);
SELECT create_distributed_hypertable('one_batch_default', 'ts');
INSERT INTO one_batch_default SELECT '2023-01-01'::timestamptz AS time, sensor_id, random() AS value FROM generate_series(1, 9999, 1) AS g1(sensor_id) ORDER BY time;

SET client_min_messages TO error;

-- Set a smaller fetch size to ensure that the result is split into
7 changes: 7 additions & 0 deletions tsl/test/sql/include/data_fetcher_run.sql
Expand Up @@ -10,3 +10,10 @@ SELECT time_bucket('1 hour', time) AS time, device, avg(temp)
FROM disttable
GROUP BY 1,2
ORDER BY 1,2;

-- Test for #5323 - ensure that no NULL tuples are generated
-- if the last element of the batch is the file trailer. count(value)
-- ignores NULLs, so it would differ from count(*) in that case.
SELECT count(*), count(value) FROM one_batch;

SELECT count(*), count(value) FROM one_batch_default;
