From 6b787d9e32005867ee3660d1ea20f447810a403d Mon Sep 17 00:00:00 2001 From: Tom Lane Date: Wed, 16 Jun 2021 11:52:05 -0400 Subject: [PATCH] Improve SQLSTATE reporting in some replication-related code. I started out with the goal of reporting ERRCODE_CONNECTION_FAILURE when walrcv_connect() fails, but as I looked around I realized that whoever wrote this code was of the opinion that errcodes are purely optional. That's not my understanding of our project policy. Hence, make sure that an errcode is provided in each ereport that (a) is ERROR or higher level and (b) isn't arguably an internal logic error. Also fix some very dubious existing errcode assignments. While this is not per policy, it's also largely cosmetic, since few of these cases could get reported to applications. So I don't feel a need to back-patch. Discussion: https://postgr.es/m/2189704.1623512522@sss.pgh.pa.us --- src/backend/commands/subscriptioncmds.c | 29 +++++---- .../libpqwalreceiver/libpqwalreceiver.c | 60 ++++++++++++------- src/backend/replication/logical/tablesync.c | 21 ++++--- src/backend/replication/logical/worker.c | 6 +- src/backend/replication/walreceiver.c | 19 ++++-- 5 files changed, 88 insertions(+), 47 deletions(-) diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 8aa6de17850af..75e195f286e8c 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -468,7 +468,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) wrconn = walrcv_connect(conninfo, true, stmt->subname, &err); if (!wrconn) ereport(ERROR, - (errmsg("could not connect to the publisher: %s", err))); + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("could not connect to the publisher: %s", err))); PG_TRY(); { @@ -565,7 +566,8 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data) wrconn = walrcv_connect(sub->conninfo, true, sub->name, &err); if (!wrconn) ereport(ERROR, - (errmsg("could not connect to the publisher: %s", err))); + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("could not connect to the publisher: %s", err))); PG_TRY(); { @@ -820,7 +822,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) { if (sub->enabled && !slotname) ereport(ERROR, - (errcode(ERRCODE_SYNTAX_ERROR), + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("cannot set %s for enabled subscription", "slot_name = NONE"))); @@ -876,7 +878,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) if (!sub->slotname && enabled) ereport(ERROR, - (errcode(ERRCODE_SYNTAX_ERROR), + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("cannot enable subscription that does not have a slot name"))); values[Anum_pg_subscription_subenabled - 1] = @@ -928,7 +930,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) { if (!sub->enabled) ereport(ERROR, - (errcode(ERRCODE_SYNTAX_ERROR), + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"), errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION ... WITH (refresh = false)."))); @@ -976,7 +978,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) { if (!sub->enabled) ereport(ERROR, - (errcode(ERRCODE_SYNTAX_ERROR), + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"), errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION ... WITH (refresh = false)."))); @@ -997,7 +999,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) if (!sub->enabled) ereport(ERROR, - (errcode(ERRCODE_SYNTAX_ERROR), + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions"))); parse_subscription_options(stmt->options, @@ -1354,7 +1356,8 @@ ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missi { /* ERROR. */ ereport(ERROR, - (errmsg("could not drop replication slot \"%s\" on publisher: %s", + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("could not drop replication slot \"%s\" on publisher: %s", slotname, res->err))); } @@ -1505,7 +1508,8 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications) if (res->status != WALRCV_OK_TUPLES) ereport(ERROR, - (errmsg("could not receive list of replicated tables from the publisher: %s", + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("could not receive list of replicated tables from the publisher: %s", res->err))); /* Process tables. */ @@ -1569,7 +1573,8 @@ ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err) } ereport(ERROR, - (errmsg("could not connect to publisher when attempting to " + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("could not connect to publisher when attempting to " "drop replication slot \"%s\": %s", slotname, err), /* translator: %s is an SQL ALTER command */ errhint("Use %s to disassociate the subscription from the slot.", @@ -1601,7 +1606,7 @@ check_duplicates_in_publist(List *publist, Datum *datums) if (strcmp(name, pname) == 0) ereport(ERROR, - (errcode(ERRCODE_SYNTAX_ERROR), + (errcode(ERRCODE_DUPLICATE_OBJECT), errmsg("publication name \"%s\" used more than once", pname))); } @@ -1659,7 +1664,7 @@ merge_publications(List *oldpublist, List *newpublist, bool addpub, const char * oldpublist = lappend(oldpublist, makeString(name)); else if (!addpub && !found) ereport(ERROR, - (errcode(ERRCODE_SYNTAX_ERROR), + (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), errmsg("publication \"%s\" is not in subscription \"%s\"", name, subname))); } diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index 021c1b36f3ecb..6eaa84a0315a2 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -278,7 +278,8 @@ libpqrcv_get_conninfo(WalReceiverConn *conn) if (conn_opts == NULL) ereport(ERROR, - (errmsg("could not parse connection string: %s", + (errcode(ERRCODE_OUT_OF_MEMORY), + errmsg("could not parse connection string: %s", _("out of memory")))); /* build a clean connection string from pieces */ @@ -350,7 +351,8 @@ libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli) { PQclear(res); ereport(ERROR, - (errmsg("could not receive database system identifier and timeline ID from " + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("could not receive database system identifier and timeline ID from " "the primary server: %s", pchomp(PQerrorMessage(conn->streamConn))))); } @@ -361,7 +363,8 @@ libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli) PQclear(res); ereport(ERROR, - (errmsg("invalid response from primary server"), + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("invalid response from primary server"), errdetail("Could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields.", ntuples, nfields, 3, 1))); } @@ -437,13 +440,15 @@ libpqrcv_startstreaming(WalReceiverConn *conn, pubnames_str = stringlist_to_identifierstr(conn->streamConn, pubnames); if (!pubnames_str) ereport(ERROR, - (errmsg("could not start WAL streaming: %s", + (errcode(ERRCODE_OUT_OF_MEMORY), /* likely guess */ + errmsg("could not start WAL streaming: %s", pchomp(PQerrorMessage(conn->streamConn))))); pubnames_literal = PQescapeLiteral(conn->streamConn, pubnames_str, strlen(pubnames_str)); if (!pubnames_literal) ereport(ERROR, - (errmsg("could not start WAL streaming: %s", + (errcode(ERRCODE_OUT_OF_MEMORY), /* likely guess */ + errmsg("could not start WAL streaming: %s", pchomp(PQerrorMessage(conn->streamConn))))); appendStringInfo(&cmd, ", publication_names %s", pubnames_literal); PQfreemem(pubnames_literal); @@ -472,7 +477,8 @@ libpqrcv_startstreaming(WalReceiverConn *conn, { PQclear(res); ereport(ERROR, - (errmsg("could not start WAL streaming: %s", + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("could not start WAL streaming: %s", pchomp(PQerrorMessage(conn->streamConn))))); } PQclear(res); @@ -495,7 +501,8 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli) if (PQputCopyEnd(conn->streamConn, NULL) <= 0 || PQflush(conn->streamConn)) ereport(ERROR, - (errmsg("could not send end-of-streaming message to primary: %s", + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("could not send end-of-streaming message to primary: %s", pchomp(PQerrorMessage(conn->streamConn))))); *next_tli = 0; @@ -517,7 +524,8 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli) */ if (PQnfields(res) < 2 || PQntuples(res) != 1) ereport(ERROR, - (errmsg("unexpected result set after end-of-streaming"))); + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("unexpected result set after end-of-streaming"))); *next_tli = pg_strtoint32(PQgetvalue(res, 0, 0)); PQclear(res); @@ -531,7 +539,8 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli) /* End the copy */ if (PQendcopy(conn->streamConn)) ereport(ERROR, - (errmsg("error while shutting down streaming COPY: %s", + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("error while shutting down streaming COPY: %s", pchomp(PQerrorMessage(conn->streamConn))))); /* CommandComplete should follow */ @@ -540,7 +549,8 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli) if (PQresultStatus(res) != PGRES_COMMAND_OK) ereport(ERROR, - (errmsg("error reading result of streaming command: %s", + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("error reading result of streaming command: %s", pchomp(PQerrorMessage(conn->streamConn))))); PQclear(res); @@ -548,7 +558,8 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli) res = libpqrcv_PQgetResult(conn->streamConn); if (res != NULL) ereport(ERROR, - (errmsg("unexpected result after CommandComplete: %s", + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("unexpected result after CommandComplete: %s", pchomp(PQerrorMessage(conn->streamConn))))); } @@ -574,7 +585,8 @@ libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn, { PQclear(res); ereport(ERROR, - (errmsg("could not receive timeline history file from " + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("could not receive timeline history file from " "the primary server: %s", pchomp(PQerrorMessage(conn->streamConn))))); } @@ -585,7 +597,8 @@ libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn, PQclear(res); ereport(ERROR, - (errmsg("invalid response from primary server"), + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("invalid response from primary server"), errdetail("Expected 1 tuple with 2 fields, got %d tuples with %d fields.", ntuples, nfields))); } @@ -746,7 +759,8 @@ libpqrcv_receive(WalReceiverConn *conn, char **buffer, /* Try consuming some data. */ if (PQconsumeInput(conn->streamConn) == 0) ereport(ERROR, - (errmsg("could not receive data from WAL stream: %s", + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("could not receive data from WAL stream: %s", pchomp(PQerrorMessage(conn->streamConn))))); /* Now that we've consumed some input, try again */ @@ -782,7 +796,8 @@ libpqrcv_receive(WalReceiverConn *conn, char **buffer, return -1; ereport(ERROR, - (errmsg("unexpected result after CommandComplete: %s", + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("unexpected result after CommandComplete: %s", PQerrorMessage(conn->streamConn)))); } @@ -797,13 +812,15 @@ libpqrcv_receive(WalReceiverConn *conn, char **buffer, { PQclear(res); ereport(ERROR, - (errmsg("could not receive data from WAL stream: %s", + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("could not receive data from WAL stream: %s", pchomp(PQerrorMessage(conn->streamConn))))); } } if (rawlen < -1) ereport(ERROR, - (errmsg("could not receive data from WAL stream: %s", + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("could not receive data from WAL stream: %s", pchomp(PQerrorMessage(conn->streamConn))))); /* Return received messages to caller */ @@ -822,7 +839,8 @@ libpqrcv_send(WalReceiverConn *conn, const char *buffer, int nbytes) if (PQputCopyData(conn->streamConn, buffer, nbytes) <= 0 || PQflush(conn->streamConn)) ereport(ERROR, - (errmsg("could not send data to WAL stream: %s", + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("could not send data to WAL stream: %s", pchomp(PQerrorMessage(conn->streamConn))))); } @@ -875,7 +893,8 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname, { PQclear(res); ereport(ERROR, - (errmsg("could not create replication slot \"%s\": %s", + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("could not create replication slot \"%s\": %s", slotname, pchomp(PQerrorMessage(conn->streamConn))))); } @@ -920,7 +939,8 @@ libpqrcv_processTuples(PGresult *pgres, WalRcvExecResult *walres, /* Make sure we got expected number of fields. */ if (nfields != nRetTypes) ereport(ERROR, - (errmsg("invalid query response"), + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("invalid query response"), errdetail("Expected %d fields, got %d fields.", nRetTypes, nfields))); diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 67f907cdd968f..cc50eb875b1d1 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -723,13 +723,15 @@ fetch_remote_table_info(char *nspname, char *relname, if (res->status != WALRCV_OK_TUPLES) ereport(ERROR, - (errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s", + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s", nspname, relname, res->err))); slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot)) ereport(ERROR, - (errmsg("table \"%s.%s\" not found on publisher", + (errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("table \"%s.%s\" not found on publisher", nspname, relname))); lrel->remoteid = DatumGetObjectId(slot_getattr(slot, 1, &isnull)); @@ -764,7 +766,8 @@ fetch_remote_table_info(char *nspname, char *relname, if (res->status != WALRCV_OK_TUPLES) ereport(ERROR, - (errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s", + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s", nspname, relname, res->err))); /* We don't know the number of rows coming, so allocate enough space. */ @@ -851,7 +854,8 @@ copy_table(Relation rel) pfree(cmd.data); if (res->status != WALRCV_OK_COPY_OUT) ereport(ERROR, - (errmsg("could not start initial contents copy for table \"%s.%s\": %s", + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("could not start initial contents copy for table \"%s.%s\": %s", lrel.nspname, lrel.relname, res->err))); walrcv_clear_result(res); @@ -967,7 +971,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) walrcv_connect(MySubscription->conninfo, true, slotname, &err); if (LogRepWorkerWalRcvConn == NULL) ereport(ERROR, - (errmsg("could not connect to the publisher: %s", err))); + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("could not connect to the publisher: %s", err))); Assert(MyLogicalRepWorker->relstate == SUBREL_STATE_INIT || MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC || @@ -1050,7 +1055,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) 0, NULL); if (res->status != WALRCV_OK_COMMAND) ereport(ERROR, - (errmsg("table copy could not start transaction on publisher: %s", + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("table copy could not start transaction on publisher: %s", res->err))); walrcv_clear_result(res); @@ -1110,7 +1116,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) res = walrcv_exec(LogRepWorkerWalRcvConn, "COMMIT", 0, NULL); if (res->status != WALRCV_OK_COMMAND) ereport(ERROR, - (errmsg("table copy could not finish transaction on publisher: %s", + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("table copy could not finish transaction on publisher: %s", res->err))); walrcv_clear_result(res); diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 4b112593c65ee..bbb659dad063e 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -2388,7 +2388,8 @@ LogicalRepApplyLoop(XLogRecPtr last_received) if (now >= timeout) ereport(ERROR, - (errmsg("terminating logical replication worker due to timeout"))); + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("terminating logical replication worker due to timeout"))); /* Check to see if it's time for a ping. */ if (!ping_sent) @@ -3207,7 +3208,8 @@ ApplyWorkerMain(Datum main_arg) MySubscription->name, &err); if (LogRepWorkerWalRcvConn == NULL) ereport(ERROR, - (errmsg("could not connect to the publisher: %s", err))); + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("could not connect to the publisher: %s", err))); /* * We don't really use the output identify_system for anything but it diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index b94910bfe9ad5..faeea9f0cc563 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -279,10 +279,13 @@ WalReceiverMain(void) PG_SETMASK(&UnBlockSig); /* Establish the connection to the primary for XLOG streaming */ - wrconn = walrcv_connect(conninfo, false, cluster_name[0] ? cluster_name : "walreceiver", &err); + wrconn = walrcv_connect(conninfo, false, + cluster_name[0] ? cluster_name : "walreceiver", + &err); if (!wrconn) ereport(ERROR, - (errmsg("could not connect to the primary server: %s", err))); + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("could not connect to the primary server: %s", err))); /* * Save user-visible connection string. This clobbers the original @@ -328,7 +331,8 @@ WalReceiverMain(void) if (strcmp(primary_sysid, standby_sysid) != 0) { ereport(ERROR, - (errmsg("database system identifier differs between the primary and standby"), + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("database system identifier differs between the primary and standby"), errdetail("The primary's identifier is %s, the standby's identifier is %s.", primary_sysid, standby_sysid))); } @@ -339,7 +343,8 @@ WalReceiverMain(void) */ if (primaryTLI < startpointTLI) ereport(ERROR, - (errmsg("highest timeline %u of the primary is behind recovery timeline %u", + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("highest timeline %u of the primary is behind recovery timeline %u", primaryTLI, startpointTLI))); /* @@ -425,7 +430,8 @@ WalReceiverMain(void) */ if (!RecoveryInProgress()) ereport(FATAL, - (errmsg("cannot continue WAL streaming, recovery has already ended"))); + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot continue WAL streaming, recovery has already ended"))); /* Process any requests or signals received recently */ ProcessWalRcvInterrupts(); @@ -551,7 +557,8 @@ WalReceiverMain(void) if (now >= timeout) ereport(ERROR, - (errmsg("terminating walreceiver due to timeout"))); + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("terminating walreceiver due to timeout"))); /* * We didn't receive anything new, for half of