Skip to content

Commit

Permalink
Merge branch 'main' into dev/pallavi/flag_test_for_release
Browse files Browse the repository at this point in the history
  • Loading branch information
pallavisontakke committed Mar 3, 2023
2 parents 771d898 + 386d31b commit 1e35942
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 18 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ accidentally triggering the load of a previous DB version.**

**Bugfixes**
* #5364 Fix num_chunks inconsistency in hypertables view
* #5362 Make copy fetcher more async
* #5336 Use NameData and namestrcpy for names
* #5317 Fix some incorrect memory handling

Expand Down
60 changes: 43 additions & 17 deletions tsl/src/remote/copy_fetcher.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ typedef struct CopyFetcher
Datum *batch_values;
bool *batch_nulls;
bool file_trailer_received;
AsyncRequest *req;
} CopyFetcher;

static void copy_fetcher_send_fetch_request(DataFetcher *df);
Expand Down Expand Up @@ -61,6 +62,13 @@ copy_fetcher_reset(CopyFetcher *fetcher)
{
fetcher->state.open = false;
fetcher->file_trailer_received = false;

if (fetcher->req != NULL)
{
pfree(fetcher->req);
fetcher->req = NULL;
}

data_fetcher_reset(&fetcher->state);
}

Expand Down Expand Up @@ -113,24 +121,8 @@ copy_fetcher_send_fetch_request(DataFetcher *df)
" Use cursor fetcher instead.")));
}

PGresult *res = PQgetResult(remote_connection_get_pg_conn(fetcher->state.conn));
if (res == NULL)
{
/* Shouldn't really happen but technically possible. */
TSConnectionError err;
remote_connection_get_error(fetcher->state.conn, &err);
remote_connection_error_elog(&err, ERROR);
}
if (PQresultStatus(res) != PGRES_COPY_OUT)
{
TSConnectionError err;
remote_connection_get_result_error(res, &err);
remote_connection_error_elog(&err, ERROR);
}

fetcher->state.open = true;
PQclear(res);
pfree(req);
fetcher->req = req;
}
PG_CATCH();
{
Expand Down Expand Up @@ -310,6 +302,37 @@ end_copy_before_eof(CopyFetcher *fetcher)
end_copy(fetcher, true);
}

static void
copy_fetcher_read_fetch_response(CopyFetcher *fetcher)
{
PGconn *conn = remote_connection_get_pg_conn(fetcher->state.conn);
PGresult *res;

if (fetcher->req == NULL)
return;

res = PQgetResult(conn);
pfree(fetcher->req);
fetcher->req = NULL;

if (res == NULL)
{
/* Shouldn't really happen but technically possible. */
TSConnectionError err;
remote_connection_get_error(fetcher->state.conn, &err);
remote_connection_error_elog(&err, ERROR);
}
if (PQresultStatus(res) != PGRES_COPY_OUT)
{
TSConnectionError err;
remote_connection_get_result_error(res, &err);
PQclear(res);
remote_connection_error_elog(&err, ERROR);
}

PQclear(res);
}

/*
* Process response for ongoing async request
*/
Expand All @@ -325,6 +348,9 @@ copy_fetcher_complete(CopyFetcher *fetcher)
Assert(fetcher->state.open);
data_fetcher_validate(&fetcher->state);

if (fetcher->req != NULL)
copy_fetcher_read_fetch_response(fetcher);

/*
* We'll store the tuples in the batch_mctx. First, flush the previous
* batch.
Expand Down
File renamed without changes.
2 changes: 1 addition & 1 deletion tsl/test/sql/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ set(TEST_FILES
partialize_finalize.sql
reorder.sql
skip_scan.sql
size_utils.sql)
size_utils_tsl.sql)

if(CMAKE_BUILD_TYPE MATCHES Debug)
list(
Expand Down
File renamed without changes.

0 comments on commit 1e35942

Please sign in to comment.