Skip to content

Commit

Permalink
Improve SQLSTATE reporting in some replication-related code.
Browse files Browse the repository at this point in the history
I started out with the goal of reporting ERRCODE_CONNECTION_FAILURE
when walrcv_connect() fails, but as I looked around I realized that
whoever wrote this code was of the opinion that errcodes are purely
optional.  That's not my understanding of our project policy.  Hence,
make sure that an errcode is provided in each ereport that (a) is
ERROR or higher level and (b) isn't arguably an internal logic error.
Also fix some very dubious existing errcode assignments.

While the existing code is not per policy, fixing it is also largely
cosmetic, since few of these cases could get reported to applications.
So I don't feel a need to back-patch.

Discussion: https://postgr.es/m/2189704.1623512522@sss.pgh.pa.us
  • Loading branch information
tglsfdc committed Jun 16, 2021
1 parent d0303bc commit 6b787d9
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 47 deletions.
29 changes: 17 additions & 12 deletions src/backend/commands/subscriptioncmds.c
Expand Up @@ -468,7 +468,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
wrconn = walrcv_connect(conninfo, true, stmt->subname, &err);
if (!wrconn)
ereport(ERROR,
(errmsg("could not connect to the publisher: %s", err)));
(errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("could not connect to the publisher: %s", err)));

PG_TRY();
{
Expand Down Expand Up @@ -565,7 +566,8 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
wrconn = walrcv_connect(sub->conninfo, true, sub->name, &err);
if (!wrconn)
ereport(ERROR,
(errmsg("could not connect to the publisher: %s", err)));
(errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("could not connect to the publisher: %s", err)));

PG_TRY();
{
Expand Down Expand Up @@ -820,7 +822,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
{
if (sub->enabled && !slotname)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("cannot set %s for enabled subscription",
"slot_name = NONE")));

Expand Down Expand Up @@ -876,7 +878,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)

if (!sub->slotname && enabled)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("cannot enable subscription that does not have a slot name")));

values[Anum_pg_subscription_subenabled - 1] =
Expand Down Expand Up @@ -928,7 +930,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
{
if (!sub->enabled)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION ... WITH (refresh = false).")));

Expand Down Expand Up @@ -976,7 +978,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
{
if (!sub->enabled)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION ... WITH (refresh = false).")));

Expand All @@ -997,7 +999,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)

if (!sub->enabled)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions")));

parse_subscription_options(stmt->options,
Expand Down Expand Up @@ -1354,7 +1356,8 @@ ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missi
{
/* ERROR. */
ereport(ERROR,
(errmsg("could not drop replication slot \"%s\" on publisher: %s",
(errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("could not drop replication slot \"%s\" on publisher: %s",
slotname, res->err)));
}

Expand Down Expand Up @@ -1505,7 +1508,8 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications)

if (res->status != WALRCV_OK_TUPLES)
ereport(ERROR,
(errmsg("could not receive list of replicated tables from the publisher: %s",
(errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("could not receive list of replicated tables from the publisher: %s",
res->err)));

/* Process tables. */
Expand Down Expand Up @@ -1569,7 +1573,8 @@ ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err)
}

ereport(ERROR,
(errmsg("could not connect to publisher when attempting to "
(errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("could not connect to publisher when attempting to "
"drop replication slot \"%s\": %s", slotname, err),
/* translator: %s is an SQL ALTER command */
errhint("Use %s to disassociate the subscription from the slot.",
Expand Down Expand Up @@ -1601,7 +1606,7 @@ check_duplicates_in_publist(List *publist, Datum *datums)

if (strcmp(name, pname) == 0)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
(errcode(ERRCODE_DUPLICATE_OBJECT),
errmsg("publication name \"%s\" used more than once",
pname)));
}
Expand Down Expand Up @@ -1659,7 +1664,7 @@ merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *
oldpublist = lappend(oldpublist, makeString(name));
else if (!addpub && !found)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
errmsg("publication \"%s\" is not in subscription \"%s\"",
name, subname)));
}
Expand Down
60 changes: 40 additions & 20 deletions src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
Expand Up @@ -278,7 +278,8 @@ libpqrcv_get_conninfo(WalReceiverConn *conn)

if (conn_opts == NULL)
ereport(ERROR,
(errmsg("could not parse connection string: %s",
(errcode(ERRCODE_OUT_OF_MEMORY),
errmsg("could not parse connection string: %s",
_("out of memory"))));

/* build a clean connection string from pieces */
Expand Down Expand Up @@ -350,7 +351,8 @@ libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli)
{
PQclear(res);
ereport(ERROR,
(errmsg("could not receive database system identifier and timeline ID from "
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("could not receive database system identifier and timeline ID from "
"the primary server: %s",
pchomp(PQerrorMessage(conn->streamConn)))));
}
Expand All @@ -361,7 +363,8 @@ libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli)

PQclear(res);
ereport(ERROR,
(errmsg("invalid response from primary server"),
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("invalid response from primary server"),
errdetail("Could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields.",
ntuples, nfields, 3, 1)));
}
Expand Down Expand Up @@ -437,13 +440,15 @@ libpqrcv_startstreaming(WalReceiverConn *conn,
pubnames_str = stringlist_to_identifierstr(conn->streamConn, pubnames);
if (!pubnames_str)
ereport(ERROR,
(errmsg("could not start WAL streaming: %s",
(errcode(ERRCODE_OUT_OF_MEMORY), /* likely guess */
errmsg("could not start WAL streaming: %s",
pchomp(PQerrorMessage(conn->streamConn)))));
pubnames_literal = PQescapeLiteral(conn->streamConn, pubnames_str,
strlen(pubnames_str));
if (!pubnames_literal)
ereport(ERROR,
(errmsg("could not start WAL streaming: %s",
(errcode(ERRCODE_OUT_OF_MEMORY), /* likely guess */
errmsg("could not start WAL streaming: %s",
pchomp(PQerrorMessage(conn->streamConn)))));
appendStringInfo(&cmd, ", publication_names %s", pubnames_literal);
PQfreemem(pubnames_literal);
Expand Down Expand Up @@ -472,7 +477,8 @@ libpqrcv_startstreaming(WalReceiverConn *conn,
{
PQclear(res);
ereport(ERROR,
(errmsg("could not start WAL streaming: %s",
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("could not start WAL streaming: %s",
pchomp(PQerrorMessage(conn->streamConn)))));
}
PQclear(res);
Expand All @@ -495,7 +501,8 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
if (PQputCopyEnd(conn->streamConn, NULL) <= 0 ||
PQflush(conn->streamConn))
ereport(ERROR,
(errmsg("could not send end-of-streaming message to primary: %s",
(errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("could not send end-of-streaming message to primary: %s",
pchomp(PQerrorMessage(conn->streamConn)))));

*next_tli = 0;
Expand All @@ -517,7 +524,8 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
*/
if (PQnfields(res) < 2 || PQntuples(res) != 1)
ereport(ERROR,
(errmsg("unexpected result set after end-of-streaming")));
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("unexpected result set after end-of-streaming")));
*next_tli = pg_strtoint32(PQgetvalue(res, 0, 0));
PQclear(res);

Expand All @@ -531,7 +539,8 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
/* End the copy */
if (PQendcopy(conn->streamConn))
ereport(ERROR,
(errmsg("error while shutting down streaming COPY: %s",
(errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("error while shutting down streaming COPY: %s",
pchomp(PQerrorMessage(conn->streamConn)))));

/* CommandComplete should follow */
Expand All @@ -540,15 +549,17 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)

if (PQresultStatus(res) != PGRES_COMMAND_OK)
ereport(ERROR,
(errmsg("error reading result of streaming command: %s",
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("error reading result of streaming command: %s",
pchomp(PQerrorMessage(conn->streamConn)))));
PQclear(res);

/* Verify that there are no more results */
res = libpqrcv_PQgetResult(conn->streamConn);
if (res != NULL)
ereport(ERROR,
(errmsg("unexpected result after CommandComplete: %s",
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("unexpected result after CommandComplete: %s",
pchomp(PQerrorMessage(conn->streamConn)))));
}

Expand All @@ -574,7 +585,8 @@ libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
{
PQclear(res);
ereport(ERROR,
(errmsg("could not receive timeline history file from "
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("could not receive timeline history file from "
"the primary server: %s",
pchomp(PQerrorMessage(conn->streamConn)))));
}
Expand All @@ -585,7 +597,8 @@ libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,

PQclear(res);
ereport(ERROR,
(errmsg("invalid response from primary server"),
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("invalid response from primary server"),
errdetail("Expected 1 tuple with 2 fields, got %d tuples with %d fields.",
ntuples, nfields)));
}
Expand Down Expand Up @@ -746,7 +759,8 @@ libpqrcv_receive(WalReceiverConn *conn, char **buffer,
/* Try consuming some data. */
if (PQconsumeInput(conn->streamConn) == 0)
ereport(ERROR,
(errmsg("could not receive data from WAL stream: %s",
(errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("could not receive data from WAL stream: %s",
pchomp(PQerrorMessage(conn->streamConn)))));

/* Now that we've consumed some input, try again */
Expand Down Expand Up @@ -782,7 +796,8 @@ libpqrcv_receive(WalReceiverConn *conn, char **buffer,
return -1;

ereport(ERROR,
(errmsg("unexpected result after CommandComplete: %s",
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("unexpected result after CommandComplete: %s",
PQerrorMessage(conn->streamConn))));
}

Expand All @@ -797,13 +812,15 @@ libpqrcv_receive(WalReceiverConn *conn, char **buffer,
{
PQclear(res);
ereport(ERROR,
(errmsg("could not receive data from WAL stream: %s",
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("could not receive data from WAL stream: %s",
pchomp(PQerrorMessage(conn->streamConn)))));
}
}
if (rawlen < -1)
ereport(ERROR,
(errmsg("could not receive data from WAL stream: %s",
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("could not receive data from WAL stream: %s",
pchomp(PQerrorMessage(conn->streamConn)))));

/* Return received messages to caller */
Expand All @@ -822,7 +839,8 @@ libpqrcv_send(WalReceiverConn *conn, const char *buffer, int nbytes)
if (PQputCopyData(conn->streamConn, buffer, nbytes) <= 0 ||
PQflush(conn->streamConn))
ereport(ERROR,
(errmsg("could not send data to WAL stream: %s",
(errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("could not send data to WAL stream: %s",
pchomp(PQerrorMessage(conn->streamConn)))));
}

Expand Down Expand Up @@ -875,7 +893,8 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
{
PQclear(res);
ereport(ERROR,
(errmsg("could not create replication slot \"%s\": %s",
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("could not create replication slot \"%s\": %s",
slotname, pchomp(PQerrorMessage(conn->streamConn)))));
}

Expand Down Expand Up @@ -920,7 +939,8 @@ libpqrcv_processTuples(PGresult *pgres, WalRcvExecResult *walres,
/* Make sure we got expected number of fields. */
if (nfields != nRetTypes)
ereport(ERROR,
(errmsg("invalid query response"),
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("invalid query response"),
errdetail("Expected %d fields, got %d fields.",
nRetTypes, nfields)));

Expand Down
21 changes: 14 additions & 7 deletions src/backend/replication/logical/tablesync.c
Expand Up @@ -723,13 +723,15 @@ fetch_remote_table_info(char *nspname, char *relname,

if (res->status != WALRCV_OK_TUPLES)
ereport(ERROR,
(errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s",
(errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s",
nspname, relname, res->err)));

slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot))
ereport(ERROR,
(errmsg("table \"%s.%s\" not found on publisher",
(errcode(ERRCODE_UNDEFINED_OBJECT),
errmsg("table \"%s.%s\" not found on publisher",
nspname, relname)));

lrel->remoteid = DatumGetObjectId(slot_getattr(slot, 1, &isnull));
Expand Down Expand Up @@ -764,7 +766,8 @@ fetch_remote_table_info(char *nspname, char *relname,

if (res->status != WALRCV_OK_TUPLES)
ereport(ERROR,
(errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s",
(errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s",
nspname, relname, res->err)));

/* We don't know the number of rows coming, so allocate enough space. */
Expand Down Expand Up @@ -851,7 +854,8 @@ copy_table(Relation rel)
pfree(cmd.data);
if (res->status != WALRCV_OK_COPY_OUT)
ereport(ERROR,
(errmsg("could not start initial contents copy for table \"%s.%s\": %s",
(errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("could not start initial contents copy for table \"%s.%s\": %s",
lrel.nspname, lrel.relname, res->err)));
walrcv_clear_result(res);

Expand Down Expand Up @@ -967,7 +971,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
walrcv_connect(MySubscription->conninfo, true, slotname, &err);
if (LogRepWorkerWalRcvConn == NULL)
ereport(ERROR,
(errmsg("could not connect to the publisher: %s", err)));
(errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("could not connect to the publisher: %s", err)));

Assert(MyLogicalRepWorker->relstate == SUBREL_STATE_INIT ||
MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC ||
Expand Down Expand Up @@ -1050,7 +1055,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
0, NULL);
if (res->status != WALRCV_OK_COMMAND)
ereport(ERROR,
(errmsg("table copy could not start transaction on publisher: %s",
(errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("table copy could not start transaction on publisher: %s",
res->err)));
walrcv_clear_result(res);

Expand Down Expand Up @@ -1110,7 +1116,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
res = walrcv_exec(LogRepWorkerWalRcvConn, "COMMIT", 0, NULL);
if (res->status != WALRCV_OK_COMMAND)
ereport(ERROR,
(errmsg("table copy could not finish transaction on publisher: %s",
(errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("table copy could not finish transaction on publisher: %s",
res->err)));
walrcv_clear_result(res);

Expand Down
6 changes: 4 additions & 2 deletions src/backend/replication/logical/worker.c
Expand Up @@ -2388,7 +2388,8 @@ LogicalRepApplyLoop(XLogRecPtr last_received)

if (now >= timeout)
ereport(ERROR,
(errmsg("terminating logical replication worker due to timeout")));
(errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("terminating logical replication worker due to timeout")));

/* Check to see if it's time for a ping. */
if (!ping_sent)
Expand Down Expand Up @@ -3207,7 +3208,8 @@ ApplyWorkerMain(Datum main_arg)
MySubscription->name, &err);
if (LogRepWorkerWalRcvConn == NULL)
ereport(ERROR,
(errmsg("could not connect to the publisher: %s", err)));
(errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("could not connect to the publisher: %s", err)));

/*
* We don't really use the output identify_system for anything but it
Expand Down

0 comments on commit 6b787d9

Please sign in to comment.