Skip to content

Commit

Permalink
Make dblink interruptible, via new libpqsrv APIs.
Browse files Browse the repository at this point in the history
This replaces dblink's blocking libpq calls with interruptible ones, allowing
query cancellation and allowing DROP DATABASE (of a database not involved in
the query) to proceed.  Apart from explicit dblink_cancel_query() calls,
dblink still doesn't cancel the remote side.  The replacement for the
blocking calls consists of new, general-purpose query execution wrappers in
the libpqsrv facility.
Out-of-tree extensions should adopt these.  Use them in postgres_fdw,
replacing a local implementation from which the libpqsrv implementation
derives.  This is a bug fix for dblink.  Code inspection identified the
bug at least thirteen years ago, but user complaints have not appeared.
Hence, no back-patch for now.

Discussion: https://postgr.es/m/20231122012945.74@rfd.leadboat.com
  • Loading branch information
nmisch committed Jan 8, 2024
1 parent 0efc831 commit d3c5f37
Show file tree
Hide file tree
Showing 8 changed files with 180 additions and 95 deletions.
28 changes: 17 additions & 11 deletions contrib/dblink/dblink.c
Expand Up @@ -61,6 +61,7 @@
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/varlena.h"
#include "utils/wait_event.h"

PG_MODULE_MAGIC;

Expand Down Expand Up @@ -133,6 +134,7 @@ static HTAB *remoteConnHash = NULL;
/* custom wait event values, retrieved from shared memory */
static uint32 dblink_we_connect = 0;
static uint32 dblink_we_get_conn = 0;
static uint32 dblink_we_get_result = 0;

/*
* Following is list that holds multiple remote connections.
Expand Down Expand Up @@ -252,6 +254,9 @@ dblink_init(void)
{
if (!pconn)
{
if (dblink_we_get_result == 0)
dblink_we_get_result = WaitEventExtensionNew("DblinkGetResult");

pconn = (remoteConn *) MemoryContextAlloc(TopMemoryContext, sizeof(remoteConn));
pconn->conn = NULL;
pconn->openCursorCount = 0;
Expand Down Expand Up @@ -442,7 +447,7 @@ dblink_open(PG_FUNCTION_ARGS)
/* If we are not in a transaction, start one */
if (PQtransactionStatus(conn) == PQTRANS_IDLE)
{
res = PQexec(conn, "BEGIN");
res = libpqsrv_exec(conn, "BEGIN", dblink_we_get_result);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
dblink_res_internalerror(conn, res, "begin error");
PQclear(res);
Expand All @@ -461,7 +466,7 @@ dblink_open(PG_FUNCTION_ARGS)
(rconn->openCursorCount)++;

appendStringInfo(&buf, "DECLARE %s CURSOR FOR %s", curname, sql);
res = PQexec(conn, buf.data);
res = libpqsrv_exec(conn, buf.data, dblink_we_get_result);
if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
{
dblink_res_error(conn, conname, res, fail,
Expand Down Expand Up @@ -530,7 +535,7 @@ dblink_close(PG_FUNCTION_ARGS)
appendStringInfo(&buf, "CLOSE %s", curname);

/* close the cursor */
res = PQexec(conn, buf.data);
res = libpqsrv_exec(conn, buf.data, dblink_we_get_result);
if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
{
dblink_res_error(conn, conname, res, fail,
Expand All @@ -550,7 +555,7 @@ dblink_close(PG_FUNCTION_ARGS)
{
rconn->newXactForCursor = false;

res = PQexec(conn, "COMMIT");
res = libpqsrv_exec(conn, "COMMIT", dblink_we_get_result);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
dblink_res_internalerror(conn, res, "commit error");
PQclear(res);
Expand Down Expand Up @@ -632,7 +637,7 @@ dblink_fetch(PG_FUNCTION_ARGS)
* PGresult will be long-lived even though we are still in a short-lived
* memory context.
*/
res = PQexec(conn, buf.data);
res = libpqsrv_exec(conn, buf.data, dblink_we_get_result);
if (!res ||
(PQresultStatus(res) != PGRES_COMMAND_OK &&
PQresultStatus(res) != PGRES_TUPLES_OK))
Expand Down Expand Up @@ -780,7 +785,7 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
else
{
/* async result retrieval, do it the old way */
PGresult *res = PQgetResult(conn);
PGresult *res = libpqsrv_get_result(conn, dblink_we_get_result);

/* NULL means we're all done with the async results */
if (res)
Expand Down Expand Up @@ -1088,7 +1093,8 @@ materializeQueryResult(FunctionCallInfo fcinfo,
PQclear(sinfo.last_res);
PQclear(sinfo.cur_res);
/* and clear out any pending data in libpq */
while ((res = PQgetResult(conn)) != NULL)
while ((res = libpqsrv_get_result(conn, dblink_we_get_result)) !=
NULL)
PQclear(res);
PG_RE_THROW();
}
Expand All @@ -1115,7 +1121,7 @@ storeQueryResult(volatile storeInfo *sinfo, PGconn *conn, const char *sql)
{
CHECK_FOR_INTERRUPTS();

sinfo->cur_res = PQgetResult(conn);
sinfo->cur_res = libpqsrv_get_result(conn, dblink_we_get_result);
if (!sinfo->cur_res)
break;

Expand Down Expand Up @@ -1443,7 +1449,7 @@ dblink_exec(PG_FUNCTION_ARGS)
if (!conn)
dblink_conn_not_avail(conname);

res = PQexec(conn, sql);
res = libpqsrv_exec(conn, sql, dblink_we_get_result);
if (!res ||
(PQresultStatus(res) != PGRES_COMMAND_OK &&
PQresultStatus(res) != PGRES_TUPLES_OK))
Expand Down Expand Up @@ -2739,8 +2745,8 @@ dblink_res_error(PGconn *conn, const char *conname, PGresult *res,

/*
* If we don't get a message from the PGresult, try the PGconn. This is
* needed because for connection-level failures, PQexec may just return
* NULL, not a PGresult at all.
* needed because for connection-level failures, PQgetResult may just
* return NULL, not a PGresult at all.
*/
if (message_primary == NULL)
message_primary = pchomp(PQerrorMessage(conn));
Expand Down
88 changes: 17 additions & 71 deletions contrib/postgres_fdw/connection.c
Expand Up @@ -187,6 +187,10 @@ GetConnection(UserMapping *user, bool will_prep_stmt, PgFdwConnState **state)
{
HASHCTL ctl;

if (pgfdw_we_get_result == 0)
pgfdw_we_get_result =
WaitEventExtensionNew("PostgresFdwGetResult");

ctl.keysize = sizeof(ConnCacheKey);
ctl.entrysize = sizeof(ConnCacheEntry);
ConnectionHash = hash_create("postgres_fdw connections", 8,
Expand Down Expand Up @@ -716,7 +720,7 @@ do_sql_command_end(PGconn *conn, const char *sql, bool consume_input)
*/
if (consume_input && !PQconsumeInput(conn))
pgfdw_report_error(ERROR, NULL, conn, false, sql);
res = pgfdw_get_result(conn, sql);
res = pgfdw_get_result(conn);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
pgfdw_report_error(ERROR, res, conn, true, sql);
PQclear(res);
Expand Down Expand Up @@ -819,7 +823,9 @@ GetPrepStmtNumber(PGconn *conn)
/*
* Submit a query and wait for the result.
*
* This function is interruptible by signals.
* Since we don't use non-blocking mode, this can't process interrupts while
* pushing the query text to the server. That risk is relatively small, so we
* ignore it for now.
*
* Caller is responsible for the error handling on the result.
*/
Expand All @@ -830,81 +836,20 @@ pgfdw_exec_query(PGconn *conn, const char *query, PgFdwConnState *state)
if (state && state->pendingAreq)
process_pending_request(state->pendingAreq);

/*
* Submit a query. Since we don't use non-blocking mode, this also can
* block. But its risk is relatively small, so we ignore that for now.
*/
if (!PQsendQuery(conn, query))
pgfdw_report_error(ERROR, NULL, conn, false, query);

/* Wait for the result. */
return pgfdw_get_result(conn, query);
return NULL;
return pgfdw_get_result(conn);
}

/*
* Wait for the result from a prior asynchronous execution function call.
*
* This function offers quick responsiveness by checking for any interruptions.
*
* This function emulates PQexec()'s behavior of returning the last result
* when there are many.
* Wrap libpqsrv_get_result_last(), adding wait event.
*
* Caller is responsible for the error handling on the result.
*/
PGresult *
pgfdw_get_result(PGconn *conn, const char *query)
pgfdw_get_result(PGconn *conn)
{
PGresult *volatile last_res = NULL;

/* In what follows, do not leak any PGresults on an error. */
PG_TRY();
{
for (;;)
{
PGresult *res;

while (PQisBusy(conn))
{
int wc;

/* first time, allocate or get the custom wait event */
if (pgfdw_we_get_result == 0)
pgfdw_we_get_result = WaitEventExtensionNew("PostgresFdwGetResult");

/* Sleep until there's something to do */
wc = WaitLatchOrSocket(MyLatch,
WL_LATCH_SET | WL_SOCKET_READABLE |
WL_EXIT_ON_PM_DEATH,
PQsocket(conn),
-1L, pgfdw_we_get_result);
ResetLatch(MyLatch);

CHECK_FOR_INTERRUPTS();

/* Data available in socket? */
if (wc & WL_SOCKET_READABLE)
{
if (!PQconsumeInput(conn))
pgfdw_report_error(ERROR, NULL, conn, false, query);
}
}

res = PQgetResult(conn);
if (res == NULL)
break; /* query is complete */

PQclear(last_res);
last_res = res;
}
}
PG_CATCH();
{
PQclear(last_res);
PG_RE_THROW();
}
PG_END_TRY();

return last_res;
return libpqsrv_get_result_last(conn, pgfdw_we_get_result);
}

/*
Expand Down Expand Up @@ -945,8 +890,8 @@ pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,

/*
* If we don't get a message from the PGresult, try the PGconn. This
* is needed because for connection-level failures, PQexec may just
* return NULL, not a PGresult at all.
* is needed because for connection-level failures, PQgetResult may
* just return NULL, not a PGresult at all.
*/
if (message_primary == NULL)
message_primary = pchomp(PQerrorMessage(conn));
Expand Down Expand Up @@ -1046,7 +991,8 @@ pgfdw_xact_callback(XactEvent event, void *arg)
*/
if (entry->have_prep_stmt && entry->have_error)
{
res = PQexec(entry->conn, "DEALLOCATE ALL");
res = pgfdw_exec_query(entry->conn, "DEALLOCATE ALL",
NULL);
PQclear(res);
}
entry->have_prep_stmt = false;
Expand Down
2 changes: 1 addition & 1 deletion contrib/postgres_fdw/deparse.c
Expand Up @@ -3815,7 +3815,7 @@ appendOrderBySuffix(Oid sortop, Oid sortcoltype, bool nulls_first,
* Print the representation of a parameter to be sent to the remote side.
*
* Note: we always label the Param's type explicitly rather than relying on
* transmitting a numeric type OID in PQexecParams(). This allows us to
* transmitting a numeric type OID in PQsendQueryParams(). This allows us to
* avoid assuming that types have the same OIDs on the remote side as they
* do locally --- they need only have the same names.
*/
Expand Down
10 changes: 5 additions & 5 deletions contrib/postgres_fdw/postgres_fdw.c
Expand Up @@ -3760,7 +3760,7 @@ create_cursor(ForeignScanState *node)
* We don't use a PG_TRY block here, so be careful not to throw error
* without releasing the PGresult.
*/
res = pgfdw_get_result(conn, buf.data);
res = pgfdw_get_result(conn);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
pgfdw_report_error(ERROR, res, conn, true, fsstate->query);
PQclear(res);
Expand Down Expand Up @@ -3810,7 +3810,7 @@ fetch_more_data(ForeignScanState *node)
* The query was already sent by an earlier call to
* fetch_more_data_begin. So now we just fetch the result.
*/
res = pgfdw_get_result(conn, fsstate->query);
res = pgfdw_get_result(conn);
/* On error, report the original query, not the FETCH. */
if (PQresultStatus(res) != PGRES_TUPLES_OK)
pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
Expand Down Expand Up @@ -4159,7 +4159,7 @@ execute_foreign_modify(EState *estate,
* We don't use a PG_TRY block here, so be careful not to throw error
* without releasing the PGresult.
*/
res = pgfdw_get_result(fmstate->conn, fmstate->query);
res = pgfdw_get_result(fmstate->conn);
if (PQresultStatus(res) !=
(fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
Expand Down Expand Up @@ -4229,7 +4229,7 @@ prepare_foreign_modify(PgFdwModifyState *fmstate)
* We don't use a PG_TRY block here, so be careful not to throw error
* without releasing the PGresult.
*/
res = pgfdw_get_result(fmstate->conn, fmstate->query);
res = pgfdw_get_result(fmstate->conn);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
PQclear(res);
Expand Down Expand Up @@ -4571,7 +4571,7 @@ execute_dml_stmt(ForeignScanState *node)
* We don't use a PG_TRY block here, so be careful not to throw error
* without releasing the PGresult.
*/
dmstate->result = pgfdw_get_result(dmstate->conn, dmstate->query);
dmstate->result = pgfdw_get_result(dmstate->conn);
if (PQresultStatus(dmstate->result) !=
(dmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
pgfdw_report_error(ERROR, dmstate->result, dmstate->conn, true,
Expand Down
2 changes: 1 addition & 1 deletion contrib/postgres_fdw/postgres_fdw.h
Expand Up @@ -162,7 +162,7 @@ extern void ReleaseConnection(PGconn *conn);
extern unsigned int GetCursorNumber(PGconn *conn);
extern unsigned int GetPrepStmtNumber(PGconn *conn);
extern void do_sql_command(PGconn *conn, const char *sql);
extern PGresult *pgfdw_get_result(PGconn *conn, const char *query);
extern PGresult *pgfdw_get_result(PGconn *conn);
extern PGresult *pgfdw_exec_query(PGconn *conn, const char *query,
PgFdwConnState *state);
extern void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
Expand Down
9 changes: 9 additions & 0 deletions doc/src/sgml/dblink.sgml
Expand Up @@ -37,6 +37,15 @@
</para>
</listitem>
</varlistentry>

<varlistentry>
<term><literal>DblinkGetResult</literal></term>
<listitem>
<para>
Waiting to receive the results of a query from a remote server.
</para>
</listitem>
</varlistentry>
</variablelist>

<para>
Expand Down
9 changes: 3 additions & 6 deletions src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
Expand Up @@ -653,12 +653,9 @@ libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
* Send a query and wait for the results by using the asynchronous libpq
* functions and socket readiness events.
*
* We must not use the regular blocking libpq functions like PQexec()
* since they are uninterruptible by signals on some platforms, such as
* Windows.
*
* The function is modeled on PQexec() in libpq, but only implements
* those parts that are in use in the walreceiver api.
* The function is modeled on libpqsrv_exec(), with the behavior difference
* being that it calls ProcessWalRcvInterrupts(). As an optimization, it
* skips try/catch, since all errors terminate the process.
*
* May return NULL, rather than an error result, on failure.
*/
Expand Down

0 comments on commit d3c5f37

Please sign in to comment.