Skip to content

Commit

Permalink
libpqwalreceiver: Convert to libpq-be-fe-helpers.h
Browse files Browse the repository at this point in the history
In contrast to the changes to dblink and postgres_fdw, this does not fix a
bug, as libpqwalreceiver did already process interrupts.

Besides reducing code duplication, the conversion leads to libpqwalreceiver
now using reserving file descriptors for libpq connections. While not strictly
required for the use in walreceiver, we are also using libpqwalreceiver for
logical replication, where it does seem more important.

Even if we eventually decide to backpatch the prior commits, there'd be no
need to backpatch this commit, due to not fixing an active bug.

Reviewed-by: Thomas Munro <thomas.munro@gmail.com>
Discussion: https://postgr.es/m/20220925232237.p6uskba2dw6fnwj2@awork3.anarazel.de
  • Loading branch information
anarazel committed Jan 24, 2023
1 parent e460248 commit 728f86f
Showing 1 changed file with 7 additions and 46 deletions.
53 changes: 7 additions & 46 deletions src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "common/connect.h"
#include "funcapi.h"
#include "libpq-fe.h"
#include "libpq/libpq-be-fe-helpers.h"
#include "mb/pg_wchar.h"
#include "miscadmin.h"
#include "pgstat.h"
Expand Down Expand Up @@ -125,7 +126,6 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname,
char **err)
{
WalReceiverConn *conn;
PostgresPollingStatusType status;
const char *keys[6];
const char *vals[6];
int i = 0;
Expand Down Expand Up @@ -172,49 +172,10 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname,
Assert(i < sizeof(keys));

conn = palloc0(sizeof(WalReceiverConn));
conn->streamConn = PQconnectStartParams(keys, vals,
/* expand_dbname = */ true);
if (PQstatus(conn->streamConn) == CONNECTION_BAD)
goto bad_connection_errmsg;

/*
* Poll connection until we have OK or FAILED status.
*
* Per spec for PQconnectPoll, first wait till socket is write-ready.
*/
status = PGRES_POLLING_WRITING;
do
{
int io_flag;
int rc;

if (status == PGRES_POLLING_READING)
io_flag = WL_SOCKET_READABLE;
#ifdef WIN32
/* Windows needs a different test while waiting for connection-made */
else if (PQstatus(conn->streamConn) == CONNECTION_STARTED)
io_flag = WL_SOCKET_CONNECTED;
#endif
else
io_flag = WL_SOCKET_WRITEABLE;

rc = WaitLatchOrSocket(MyLatch,
WL_EXIT_ON_PM_DEATH | WL_LATCH_SET | io_flag,
PQsocket(conn->streamConn),
0,
WAIT_EVENT_LIBPQWALRECEIVER_CONNECT);

/* Interrupted? */
if (rc & WL_LATCH_SET)
{
ResetLatch(MyLatch);
ProcessWalRcvInterrupts();
}

/* If socket is ready, advance the libpq state machine */
if (rc & io_flag)
status = PQconnectPoll(conn->streamConn);
} while (status != PGRES_POLLING_OK && status != PGRES_POLLING_FAILED);
conn->streamConn =
libpqsrv_connect_params(keys, vals,
/* expand_dbname = */ true,
WAIT_EVENT_LIBPQWALRECEIVER_CONNECT);

if (PQstatus(conn->streamConn) != CONNECTION_OK)
goto bad_connection_errmsg;
Expand Down Expand Up @@ -245,7 +206,7 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname,

/* error path, error already set */
bad_connection:
PQfinish(conn->streamConn);
libpqsrv_disconnect(conn->streamConn);
pfree(conn);
return NULL;
}
Expand Down Expand Up @@ -744,7 +705,7 @@ libpqrcv_PQgetResult(PGconn *streamConn)
static void
libpqrcv_disconnect(WalReceiverConn *conn)
{
PQfinish(conn->streamConn);
libpqsrv_disconnect(conn->streamConn);
PQfreemem(conn->recvBuf);
pfree(conn);
}
Expand Down

0 comments on commit 728f86f

Please sign in to comment.