Skip to content

Commit

Permalink
Make copy fetcher more async
Browse files Browse the repository at this point in the history
Make the copy fetcher more asynchronous by separating the sending of
the request for data from the receiving of the response. By doing
that, the async append node can send the request to each data node
before it starts reading the first response. This can massively
improve the performance because the response isn't returned until the
remote node has finished executing the query and is ready to return
the first tuple.
  • Loading branch information
erimatnor committed Mar 2, 2023
1 parent 750e69e commit 386d31b
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 17 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ accidentally triggering the load of a previous DB version.**

**Bugfixes**
* #5364 Fix num_chunks inconsistency in hypertables view
* #5362 Make copy fetcher more async
* #5336 Use NameData and namestrcpy for names
* #5317 Fix some incorrect memory handling

Expand Down
60 changes: 43 additions & 17 deletions tsl/src/remote/copy_fetcher.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ typedef struct CopyFetcher
Datum *batch_values;
bool *batch_nulls;
bool file_trailer_received;
AsyncRequest *req;
} CopyFetcher;

static void copy_fetcher_send_fetch_request(DataFetcher *df);
Expand Down Expand Up @@ -61,6 +62,13 @@ copy_fetcher_reset(CopyFetcher *fetcher)
{
fetcher->state.open = false;
fetcher->file_trailer_received = false;

if (fetcher->req != NULL)
{
pfree(fetcher->req);
fetcher->req = NULL;
}

data_fetcher_reset(&fetcher->state);
}

Expand Down Expand Up @@ -113,24 +121,8 @@ copy_fetcher_send_fetch_request(DataFetcher *df)
" Use cursor fetcher instead.")));
}

PGresult *res = PQgetResult(remote_connection_get_pg_conn(fetcher->state.conn));
if (res == NULL)
{
/* Shouldn't really happen but technically possible. */
TSConnectionError err;
remote_connection_get_error(fetcher->state.conn, &err);
remote_connection_error_elog(&err, ERROR);
}
if (PQresultStatus(res) != PGRES_COPY_OUT)
{
TSConnectionError err;
remote_connection_get_result_error(res, &err);
remote_connection_error_elog(&err, ERROR);
}

fetcher->state.open = true;
PQclear(res);
pfree(req);
fetcher->req = req;
}
PG_CATCH();
{
Expand Down Expand Up @@ -310,6 +302,37 @@ end_copy_before_eof(CopyFetcher *fetcher)
end_copy(fetcher, true);
}

/*
 * Read the response to a previously sent fetch request, if one is pending.
 *
 * Does nothing when fetcher->req is NULL. Otherwise a single result is read
 * from the fetcher's connection, and the pending request is freed and
 * cleared before that result is inspected. A missing result, or a result
 * whose status is not PGRES_COPY_OUT, is reported through
 * remote_connection_error_elog() at ERROR level. The result is released
 * with PQclear() on both the success path and the bad-status path.
 */
static void
copy_fetcher_read_fetch_response(CopyFetcher *fetcher)
{
	PGconn *pg_conn = remote_connection_get_pg_conn(fetcher->state.conn);
	PGresult *result;
	TSConnectionError conn_err;

	/* No request outstanding, so there is no response to read. */
	if (NULL == fetcher->req)
		return;

	result = PQgetResult(pg_conn);

	/*
	 * The request is dropped before the result is checked, so it is already
	 * cleared on the error paths below.
	 */
	pfree(fetcher->req);
	fetcher->req = NULL;

	if (NULL == result)
	{
		/*
		 * Shouldn't really happen but technically possible. With no result
		 * to inspect, the error is taken from the connection itself.
		 * NOTE(review): the code below uses the result unconditionally, so
		 * this presumably relies on the ERROR-level report not returning.
		 */
		remote_connection_get_error(fetcher->state.conn, &conn_err);
		remote_connection_error_elog(&conn_err, ERROR);
	}

	if (PQresultStatus(result) != PGRES_COPY_OUT)
	{
		/*
		 * Extract the error details first, then release the result before
		 * the error is reported.
		 */
		remote_connection_get_result_error(result, &conn_err);
		PQclear(result);
		remote_connection_error_elog(&conn_err, ERROR);
	}

	PQclear(result);
}

/*
* Process response for ongoing async request
*/
Expand All @@ -325,6 +348,9 @@ copy_fetcher_complete(CopyFetcher *fetcher)
Assert(fetcher->state.open);
data_fetcher_validate(&fetcher->state);

if (fetcher->req != NULL)
copy_fetcher_read_fetch_response(fetcher);

/*
* We'll store the tuples in the batch_mctx. First, flush the previous
* batch.
Expand Down

0 comments on commit 386d31b

Please sign in to comment.