Fix file trailer handling in the copy fetcher #5410

Merged
merged 1 commit into from Mar 9, 2023
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -13,6 +13,10 @@ accidentally triggering the load of a previous DB version.**

**Bugfixes**
* #5396 Fix SEGMENTBY columns predicates to be pushed down
* #5410 Fix file trailer handling in the COPY fetcher

**Thanks**
* @nikolaps for reporting an issue with the COPY fetcher

## 2.10.1 (2023-03-07)

90 changes: 60 additions & 30 deletions tsl/src/remote/copy_fetcher.c
@@ -333,6 +333,50 @@ copy_fetcher_read_fetch_response(CopyFetcher *fetcher)
PQclear(res);
}

/*
* Read the next chunk of data from the connection and store it in copy_data.
* Return false if no more data can be read (EOF), raise an error if the
* read fails, and return true otherwise.
*/
static bool
copy_fetcher_read_data(CopyFetcher *fetcher, PGconn *conn, char *volatile *dataptr,
StringInfoData *copy_data)
{
copy_data->len = PQgetCopyData(conn,
&copy_data->data,
/* async = */ false);

/* Set dataptr to ensure the data is freed with PQfreemem() in the
* PG_CATCH() clause in case an error is thrown. */
*dataptr = copy_data->data;

if (copy_data->len == -1)
{
/* Note: it is possible to get EOF without having received the
* file trailer in case there's e.g., a remote error. */
fetcher->state.eof = true;

/* Should read final result with PQgetResult() until it
* returns NULL. This happens later in end_copy. */
return false;
}
else if (copy_data->len == -2)
{
/*
* Error. The docs say: consult PQerrorMessage() for the reason.
* remote_connection_elog() will do this for us.
*/
remote_connection_elog(fetcher->state.conn, ERROR);

/* remote_connection_elog should raise an ERROR */
pg_unreachable();
}

copy_data->maxlen = copy_data->len;

return true;
}

/*
* Process response for ongoing async request
*/
@@ -378,34 +422,12 @@ copy_fetcher_complete(CopyFetcher *fetcher)
MemoryContextSwitchTo(fetcher->state.req_mctx);

StringInfoData copy_data = { 0 };
bool tuple_read = copy_fetcher_read_data(fetcher, conn, &dataptr, &copy_data);

copy_data.len = PQgetCopyData(conn,
&copy_data.data,
/* async = */ false);

/* Set dataptr to ensure data is freed with PQfreemem() in
* PG_CATCH() clause in case error is thrown. */
dataptr = copy_data.data;

if (copy_data.len == -1)
{
/* Note: it is possible to get EOF without having received the
* file trailer in case there's e.g., a remote error. */
fetcher->state.eof = true;
/* Should read final result with PQgetResult() until it
* returns NULL. This happens below. */
/* Were we able to fetch new data? */
if (!tuple_read)
break;
}
else if (copy_data.len == -2)
{
/*
* Error. The docs say: consult PQerrorMessage() for the reason.
* remote_connection_elog() will do this for us.
*/
remote_connection_elog(fetcher->state.conn, ERROR);
}

copy_data.maxlen = copy_data.len;
Assert(copy_data.cursor == 0);

if (fetcher->state.batch_count == 0 && row == 0)
@@ -432,7 +454,16 @@ copy_fetcher_complete(CopyFetcher *fetcher)
/* Next PQgetCopyData() should return -1, indicating EOF and
* that the remote side ended the copy. The final result
* (PGRES_COMMAND_OK) should then be read with
* PQgetResult(). */
* PQgetResult().
*
* Call copy_fetcher_read_data() directly in this branch because
* if row == state.fetch_size - 1 (i.e., the file trailer is the
* last tuple of the batch), the for loop body will not run again
* and PQgetCopyData() would never be called.
*/
tuple_read = copy_fetcher_read_data(fetcher, conn, &dataptr, &copy_data);
Assert(tuple_read == false);
break;
}
else
{
@@ -510,15 +541,14 @@ copy_fetcher_complete(CopyFetcher *fetcher)
dataptr = NULL;
}

/* Don't count the file trailer as a row if this was the last batch */
fetcher->state.num_tuples = fetcher->file_trailer_received ? row - 1 : row;
fetcher->state.num_tuples = row;
fetcher->state.next_tuple_idx = 0;

/* Must be EOF if we didn't get as many tuples as we asked for. */
#ifdef USE_ASSERT_CHECKING
if (fetcher->state.num_tuples < fetcher->state.fetch_size)
{
Assert(fetcher->state.eof);
}
#endif

fetcher->state.batch_count++;

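For context on the fix above, libpq's COPY-out contract is: PQgetCopyData() returns the length of the next data chunk, -1 once the remote side has ended the copy, and -2 on error; after EOF, the final command status must still be drained with PQgetResult(). The following is a minimal, self-contained sketch of that contract, illustrative only and not TimescaleDB code (error handling is simplified):

/* Minimal sketch of a libpq COPY-out read loop (not TimescaleDB code). */
#include <libpq-fe.h>
#include <stdio.h>

static void
drain_copy_out(PGconn *conn)
{
	char *buf = NULL;
	int len;
	PGresult *res;

	/* > 0: one data chunk; -1: remote side ended the copy; -2: error. */
	while ((len = PQgetCopyData(conn, &buf, /* async = */ 0)) > 0)
	{
		/* Process buf[0 .. len) here, then free it. */
		PQfreemem(buf);
	}

	if (len == -2)
		fprintf(stderr, "COPY failed: %s", PQerrorMessage(conn));

	/* After EOF (-1), the final status must still be read until
	 * PQgetResult() returns NULL, or the connection stays in COPY mode. */
	while ((res = PQgetResult(conn)) != NULL)
	{
		if (PQresultStatus(res) != PGRES_COMMAND_OK)
			fprintf(stderr, "COPY ended abnormally: %s", PQerrorMessage(conn));
		PQclear(res);
	}
}

The bug fixed here (#5323) was a missed step in that sequence: when the file trailer occupied the last slot of a batch, the per-batch loop exited without consuming the terminating -1, and the trailer could surface as a spurious NULL tuple.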
28 changes: 28 additions & 0 deletions tsl/test/expected/data_fetcher.out
@@ -41,6 +41,26 @@ SELECT setseed(1);
INSERT INTO disttable
SELECT t, (abs(timestamp_hash(t::timestamp)) % 10) + 1, random() * 10
FROM generate_series('2019-01-01'::timestamptz, '2019-01-02'::timestamptz, '1 second') as t;
-- This table contains the content for precisely one batch of the copy fetcher. The fetch_size
-- will be set to 100 below; the table contains 99 tuples, so the last element of the first
-- copy batch is the file trailer (#5323).
CREATE table one_batch(ts timestamptz NOT NULL, sensor_id int NOT NULL, value float NOT NULL);
SELECT create_distributed_hypertable('one_batch', 'ts');
create_distributed_hypertable
-------------------------------
(2,public,one_batch,t)
(1 row)

INSERT INTO one_batch SELECT '2023-01-01'::timestamptz AS time, sensor_id, random() AS value FROM generate_series(1, 99, 1) AS g1(sensor_id) ORDER BY time;
-- Same but for the DEFAULT_FDW_FETCH_SIZE (10000)
CREATE table one_batch_default(ts timestamptz NOT NULL, sensor_id int NOT NULL, value float NOT NULL);
SELECT create_distributed_hypertable('one_batch_default', 'ts');
create_distributed_hypertable
--------------------------------
(3,public,one_batch_default,t)
(1 row)

INSERT INTO one_batch_default SELECT '2023-01-01'::timestamptz AS time, sensor_id, random() AS value FROM generate_series(1, 9999, 1) AS g1(sensor_id) ORDER BY time;
SET client_min_messages TO error;
-- Set a smaller fetch size to ensure that the result is split into
-- multiple batches.
@@ -59,6 +79,10 @@ SELECT time_bucket('1 hour', time) AS time, device, avg(temp)
FROM disttable
GROUP BY 1,2
ORDER BY 1,2;
-- Test for #5323 - ensure that no NULL tuples are generated
-- if the last element of the batch is the file trailer.
SELECT count(*), count(value) FROM one_batch;
SELECT count(*), count(value) FROM one_batch_default;
\o
\set ON_ERROR_STOP 1
-- run queries using cursor fetcher
@@ -74,6 +98,10 @@ SELECT time_bucket('1 hour', time) AS time, device, avg(temp)
FROM disttable
GROUP BY 1,2
ORDER BY 1,2;
-- Test for #5323 - ensure that no NULL tuples are generated
-- if the last element of the batch is the file trailer.
SELECT count(*), count(value) FROM one_batch;
SELECT count(*), count(value) FROM one_batch_default;
\o
-- compare results
:DIFF_CMD
12 changes: 12 additions & 0 deletions tsl/test/sql/data_fetcher.sql
@@ -31,6 +31,18 @@ INSERT INTO disttable
SELECT t, (abs(timestamp_hash(t::timestamp)) % 10) + 1, random() * 10
FROM generate_series('2019-01-01'::timestamptz, '2019-01-02'::timestamptz, '1 second') as t;

-- This table contains the content for precisely one batch of the copy fetcher. The fetch_size
-- will be set to 100 below; the table contains 99 tuples, so the last element of the first
-- copy batch is the file trailer (#5323).
CREATE table one_batch(ts timestamptz NOT NULL, sensor_id int NOT NULL, value float NOT NULL);
SELECT create_distributed_hypertable('one_batch', 'ts');
INSERT INTO one_batch SELECT '2023-01-01'::timestamptz AS time, sensor_id, random() AS value FROM generate_series(1, 99, 1) AS g1(sensor_id) ORDER BY time;

-- Same but for the DEFAULT_FDW_FETCH_SIZE (10000)
CREATE table one_batch_default(ts timestamptz NOT NULL, sensor_id int NOT NULL, value float NOT NULL);
SELECT create_distributed_hypertable('one_batch_default', 'ts');
INSERT INTO one_batch_default SELECT '2023-01-01'::timestamptz AS time, sensor_id, random() AS value FROM generate_series(1, 9999, 1) AS g1(sensor_id) ORDER BY time;

SET client_min_messages TO error;

-- Set a smaller fetch size to ensure that the result is split into
-- multiple batches.
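A note on why the trailer can occupy a batch slot at all: in PostgreSQL's binary COPY format, the file trailer is a single 16-bit integer word containing -1, and it arrives as its own PQgetCopyData() chunk just like a data row. With fetch_size set to 100 and 99 rows in one_batch, that trailer chunk is the 100th read of the first batch, which is exactly the corner case this table exercises. A hypothetical detection helper, purely for illustration (this is not the fetcher's actual code):

/* Illustrative check for the binary COPY file trailer: a chunk that
 * consists of one 16-bit field count equal to -1. Hypothetical helper,
 * not TimescaleDB code; assumes the trailer arrives as its own chunk. */
#include <stdbool.h>
#include <stdint.h>
#include <string.h>
#include <arpa/inet.h> /* ntohs */

static bool
chunk_is_binary_copy_trailer(const char *data, int len)
{
	uint16_t fields_be;

	if (len != (int) sizeof(uint16_t))
		return false;

	memcpy(&fields_be, data, sizeof(uint16_t));
	return (int16_t) ntohs(fields_be) == -1;
}

The paired count(*) and count(value) queries in data_fetcher_run.sql below then catch any regression: count(value) skips NULLs, so a spurious all-NULL tuple would make the two counts diverge.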
7 changes: 7 additions & 0 deletions tsl/test/sql/include/data_fetcher_run.sql
@@ -10,3 +10,10 @@ SELECT time_bucket('1 hour', time) AS time, device, avg(temp)
FROM disttable
GROUP BY 1,2
ORDER BY 1,2;

-- Test for #5323 - ensure that no NULL tuples are generated
-- if the last element of the batch is the file trailer.
SELECT count(*), count(value) FROM one_batch;

SELECT count(*), count(value) FROM one_batch_default;