Skip to content

Commit

Permalink
Introduce macros for protocol characters.
Browse files Browse the repository at this point in the history
This commit introduces descriptively-named macros for the
identifiers used in wire protocol messages.  These new macros are
placed in a new header file so that they can be easily used by
third-party code.

Author: Dave Cramer
Reviewed-by: Alvaro Herrera, Tatsuo Ishii, Peter Smith, Robert Haas, Tom Lane, Peter Eisentraut, Michael Paquier
Discussion: https://postgr.es/m/CADK3HHKbBmK-PKf1bPNFoMC%2BoBt%2BpD9PH8h5nvmBQskEHm-Ehw%40mail.gmail.com
  • Loading branch information
nathan-bossart committed Aug 23, 2023
1 parent 7114791 commit f4b54e1
Show file tree
Hide file tree
Showing 25 changed files with 305 additions and 204 deletions.
5 changes: 3 additions & 2 deletions src/backend/access/common/printsimple.c
Expand Up @@ -20,6 +20,7 @@

#include "access/printsimple.h"
#include "catalog/pg_type.h"
#include "libpq/protocol.h"
#include "libpq/pqformat.h"
#include "utils/builtins.h"

Expand All @@ -32,7 +33,7 @@ printsimple_startup(DestReceiver *self, int operation, TupleDesc tupdesc)
StringInfoData buf;
int i;

pq_beginmessage(&buf, 'T'); /* RowDescription */
pq_beginmessage(&buf, PqMsg_RowDescription);
pq_sendint16(&buf, tupdesc->natts);

for (i = 0; i < tupdesc->natts; ++i)
Expand Down Expand Up @@ -65,7 +66,7 @@ printsimple(TupleTableSlot *slot, DestReceiver *self)
slot_getallattrs(slot);

/* Prepare and send message */
pq_beginmessage(&buf, 'D');
pq_beginmessage(&buf, PqMsg_DataRow);
pq_sendint16(&buf, tupdesc->natts);

for (i = 0; i < tupdesc->natts; ++i)
Expand Down
14 changes: 7 additions & 7 deletions src/backend/access/transam/parallel.c
Expand Up @@ -1127,7 +1127,7 @@ HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg)

switch (msgtype)
{
case 'K': /* BackendKeyData */
case PqMsg_BackendKeyData:
{
int32 pid = pq_getmsgint(msg, 4);

Expand All @@ -1137,8 +1137,8 @@ HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg)
break;
}

case 'E': /* ErrorResponse */
case 'N': /* NoticeResponse */
case PqMsg_ErrorResponse:
case PqMsg_NoticeResponse:
{
ErrorData edata;
ErrorContextCallback *save_error_context_stack;
Expand Down Expand Up @@ -1183,7 +1183,7 @@ HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg)
break;
}

case 'A': /* NotifyResponse */
case PqMsg_NotificationResponse:
{
/* Propagate NotifyResponse. */
int32 pid;
Expand Down Expand Up @@ -1217,7 +1217,7 @@ HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg)
break;
}

case 'X': /* Terminate, indicating clean exit */
case PqMsg_Terminate:
{
shm_mq_detach(pcxt->worker[i].error_mqh);
pcxt->worker[i].error_mqh = NULL;
Expand Down Expand Up @@ -1372,7 +1372,7 @@ ParallelWorkerMain(Datum main_arg)
* protocol message is defined, but it won't actually be used for anything
* in this case.
*/
pq_beginmessage(&msgbuf, 'K');
pq_beginmessage(&msgbuf, PqMsg_BackendKeyData);
pq_sendint32(&msgbuf, (int32) MyProcPid);
pq_sendint32(&msgbuf, (int32) MyCancelKey);
pq_endmessage(&msgbuf);
Expand Down Expand Up @@ -1550,7 +1550,7 @@ ParallelWorkerMain(Datum main_arg)
DetachSession();

/* Report success. */
pq_putmessage('X', NULL, 0);
pq_putmessage(PqMsg_Terminate, NULL, 0);
}

/*
Expand Down
16 changes: 8 additions & 8 deletions src/backend/backup/basebackup_copy.c
Expand Up @@ -152,7 +152,7 @@ bbsink_copystream_begin_backup(bbsink *sink)
SendTablespaceList(state->tablespaces);

/* Send a CommandComplete message */
pq_puttextmessage('C', "SELECT");
pq_puttextmessage(PqMsg_CommandComplete, "SELECT");

/* Begin COPY stream. This will be used for all archives + manifest. */
SendCopyOutResponse();
Expand All @@ -169,7 +169,7 @@ bbsink_copystream_begin_archive(bbsink *sink, const char *archive_name)
StringInfoData buf;

ti = list_nth(state->tablespaces, state->tablespace_num);
pq_beginmessage(&buf, 'd'); /* CopyData */
pq_beginmessage(&buf, PqMsg_CopyData);
pq_sendbyte(&buf, 'n'); /* New archive */
pq_sendstring(&buf, archive_name);
pq_sendstring(&buf, ti->path == NULL ? "" : ti->path);
Expand Down Expand Up @@ -220,7 +220,7 @@ bbsink_copystream_archive_contents(bbsink *sink, size_t len)
{
mysink->last_progress_report_time = now;

pq_beginmessage(&buf, 'd'); /* CopyData */
pq_beginmessage(&buf, PqMsg_CopyData);
pq_sendbyte(&buf, 'p'); /* Progress report */
pq_sendint64(&buf, state->bytes_done);
pq_endmessage(&buf);
Expand All @@ -246,7 +246,7 @@ bbsink_copystream_end_archive(bbsink *sink)

mysink->bytes_done_at_last_time_check = state->bytes_done;
mysink->last_progress_report_time = GetCurrentTimestamp();
pq_beginmessage(&buf, 'd'); /* CopyData */
pq_beginmessage(&buf, PqMsg_CopyData);
pq_sendbyte(&buf, 'p'); /* Progress report */
pq_sendint64(&buf, state->bytes_done);
pq_endmessage(&buf);
Expand All @@ -261,7 +261,7 @@ bbsink_copystream_begin_manifest(bbsink *sink)
{
StringInfoData buf;

pq_beginmessage(&buf, 'd'); /* CopyData */
pq_beginmessage(&buf, PqMsg_CopyData);
pq_sendbyte(&buf, 'm'); /* Manifest */
pq_endmessage(&buf);
}
Expand Down Expand Up @@ -318,7 +318,7 @@ SendCopyOutResponse(void)
{
StringInfoData buf;

pq_beginmessage(&buf, 'H');
pq_beginmessage(&buf, PqMsg_CopyOutResponse);
pq_sendbyte(&buf, 0); /* overall format */
pq_sendint16(&buf, 0); /* natts */
pq_endmessage(&buf);
Expand All @@ -330,7 +330,7 @@ SendCopyOutResponse(void)
static void
SendCopyDone(void)
{
pq_putemptymessage('c');
pq_putemptymessage(PqMsg_CopyDone);
}

/*
Expand Down Expand Up @@ -368,7 +368,7 @@ SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli)
end_tup_output(tstate);

/* Send a CommandComplete message */
pq_puttextmessage('C', "SELECT");
pq_puttextmessage(PqMsg_CommandComplete, "SELECT");
}

/*
Expand Down
2 changes: 1 addition & 1 deletion src/backend/commands/async.c
Expand Up @@ -2281,7 +2281,7 @@ NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid)
{
StringInfoData buf;

pq_beginmessage(&buf, 'A');
pq_beginmessage(&buf, PqMsg_NotificationResponse);
pq_sendint32(&buf, srcPid);
pq_sendstring(&buf, channel);
pq_sendstring(&buf, payload);
Expand Down
22 changes: 11 additions & 11 deletions src/backend/commands/copyfromparse.c
Expand Up @@ -174,7 +174,7 @@ ReceiveCopyBegin(CopyFromState cstate)
int16 format = (cstate->opts.binary ? 1 : 0);
int i;

pq_beginmessage(&buf, 'G');
pq_beginmessage(&buf, PqMsg_CopyInResponse);
pq_sendbyte(&buf, format); /* overall format */
pq_sendint16(&buf, natts);
for (i = 0; i < natts; i++)
Expand Down Expand Up @@ -279,13 +279,13 @@ CopyGetData(CopyFromState cstate, void *databuf, int minread, int maxread)
/* Validate message type and set packet size limit */
switch (mtype)
{
case 'd': /* CopyData */
case PqMsg_CopyData:
maxmsglen = PQ_LARGE_MESSAGE_LIMIT;
break;
case 'c': /* CopyDone */
case 'f': /* CopyFail */
case 'H': /* Flush */
case 'S': /* Sync */
case PqMsg_CopyDone:
case PqMsg_CopyFail:
case PqMsg_Flush:
case PqMsg_Sync:
maxmsglen = PQ_SMALL_MESSAGE_LIMIT;
break;
default:
Expand All @@ -305,20 +305,20 @@ CopyGetData(CopyFromState cstate, void *databuf, int minread, int maxread)
/* ... and process it */
switch (mtype)
{
case 'd': /* CopyData */
case PqMsg_CopyData:
break;
case 'c': /* CopyDone */
case PqMsg_CopyDone:
/* COPY IN correctly terminated by frontend */
cstate->raw_reached_eof = true;
return bytesread;
case 'f': /* CopyFail */
case PqMsg_CopyFail:
ereport(ERROR,
(errcode(ERRCODE_QUERY_CANCELED),
errmsg("COPY from stdin failed: %s",
pq_getmsgstring(cstate->fe_msgbuf))));
break;
case 'H': /* Flush */
case 'S': /* Sync */
case PqMsg_Flush:
case PqMsg_Sync:

/*
* Ignore Flush/Sync for the convenience of client
Expand Down
6 changes: 3 additions & 3 deletions src/backend/commands/copyto.c
Expand Up @@ -144,7 +144,7 @@ SendCopyBegin(CopyToState cstate)
int16 format = (cstate->opts.binary ? 1 : 0);
int i;

pq_beginmessage(&buf, 'H');
pq_beginmessage(&buf, PqMsg_CopyOutResponse);
pq_sendbyte(&buf, format); /* overall format */
pq_sendint16(&buf, natts);
for (i = 0; i < natts; i++)
Expand All @@ -159,7 +159,7 @@ SendCopyEnd(CopyToState cstate)
/* Shouldn't have any unsent data */
Assert(cstate->fe_msgbuf->len == 0);
/* Send Copy Done message */
pq_putemptymessage('c');
pq_putemptymessage(PqMsg_CopyDone);
}

/*----------
Expand Down Expand Up @@ -247,7 +247,7 @@ CopySendEndOfRow(CopyToState cstate)
CopySendChar(cstate, '\n');

/* Dump the accumulated row as one CopyData message */
(void) pq_putmessage('d', fe_msgbuf->data, fe_msgbuf->len);
(void) pq_putmessage(PqMsg_CopyData, fe_msgbuf->data, fe_msgbuf->len);
break;
case COPY_CALLBACK:
cstate->data_dest_cb(fe_msgbuf->data, fe_msgbuf->len);
Expand Down
2 changes: 1 addition & 1 deletion src/backend/libpq/auth-sasl.c
Expand Up @@ -87,7 +87,7 @@ CheckSASLAuth(const pg_be_sasl_mech *mech, Port *port, char *shadow_pass,
{
pq_startmsgread();
mtype = pq_getbyte();
if (mtype != 'p')
if (mtype != PqMsg_SASLResponse)
{
/* Only log error if client didn't disconnect. */
if (mtype != EOF)
Expand Down
8 changes: 4 additions & 4 deletions src/backend/libpq/auth.c
Expand Up @@ -665,7 +665,7 @@ sendAuthRequest(Port *port, AuthRequest areq, const char *extradata, int extrale

CHECK_FOR_INTERRUPTS();

pq_beginmessage(&buf, 'R');
pq_beginmessage(&buf, PqMsg_AuthenticationRequest);
pq_sendint32(&buf, (int32) areq);
if (extralen > 0)
pq_sendbytes(&buf, extradata, extralen);
Expand Down Expand Up @@ -698,7 +698,7 @@ recv_password_packet(Port *port)

/* Expect 'p' message type */
mtype = pq_getbyte();
if (mtype != 'p')
if (mtype != PqMsg_PasswordMessage)
{
/*
* If the client just disconnects without offering a password, don't
Expand Down Expand Up @@ -961,7 +961,7 @@ pg_GSS_recvauth(Port *port)
CHECK_FOR_INTERRUPTS();

mtype = pq_getbyte();
if (mtype != 'p')
if (mtype != PqMsg_GSSResponse)
{
/* Only log error if client didn't disconnect. */
if (mtype != EOF)
Expand Down Expand Up @@ -1232,7 +1232,7 @@ pg_SSPI_recvauth(Port *port)
{
pq_startmsgread();
mtype = pq_getbyte();
if (mtype != 'p')
if (mtype != PqMsg_GSSResponse)
{
if (sspictx != NULL)
{
Expand Down
2 changes: 1 addition & 1 deletion src/backend/postmaster/postmaster.c
Expand Up @@ -2357,7 +2357,7 @@ SendNegotiateProtocolVersion(List *unrecognized_protocol_options)
StringInfoData buf;
ListCell *lc;

pq_beginmessage(&buf, 'v'); /* NegotiateProtocolVersion */
pq_beginmessage(&buf, PqMsg_NegotiateProtocolVersion);
pq_sendint32(&buf, PG_PROTOCOL_LATEST);
pq_sendint32(&buf, list_length(unrecognized_protocol_options));
foreach(lc, unrecognized_protocol_options)
Expand Down
18 changes: 9 additions & 9 deletions src/backend/replication/walsender.c
Expand Up @@ -603,7 +603,7 @@ SendTimeLineHistory(TimeLineHistoryCmd *cmd)
dest->rStartup(dest, CMD_SELECT, tupdesc);

/* Send a DataRow message */
pq_beginmessage(&buf, 'D');
pq_beginmessage(&buf, PqMsg_DataRow);
pq_sendint16(&buf, 2); /* # of columns */
len = strlen(histfname);
pq_sendint32(&buf, len); /* col1 len */
Expand Down Expand Up @@ -801,7 +801,7 @@ StartReplication(StartReplicationCmd *cmd)
WalSndSetState(WALSNDSTATE_CATCHUP);

/* Send a CopyBothResponse message, and start streaming */
pq_beginmessage(&buf, 'W');
pq_beginmessage(&buf, PqMsg_CopyBothResponse);
pq_sendbyte(&buf, 0);
pq_sendint16(&buf, 0);
pq_endmessage(&buf);
Expand Down Expand Up @@ -1294,7 +1294,7 @@ StartLogicalReplication(StartReplicationCmd *cmd)
WalSndSetState(WALSNDSTATE_CATCHUP);

/* Send a CopyBothResponse message, and start streaming */
pq_beginmessage(&buf, 'W');
pq_beginmessage(&buf, PqMsg_CopyBothResponse);
pq_sendbyte(&buf, 0);
pq_sendint16(&buf, 0);
pq_endmessage(&buf);
Expand Down Expand Up @@ -1923,11 +1923,11 @@ ProcessRepliesIfAny(void)
/* Validate message type and set packet size limit */
switch (firstchar)
{
case 'd':
case PqMsg_CopyData:
maxmsglen = PQ_LARGE_MESSAGE_LIMIT;
break;
case 'c':
case 'X':
case PqMsg_CopyDone:
case PqMsg_Terminate:
maxmsglen = PQ_SMALL_MESSAGE_LIMIT;
break;
default:
Expand Down Expand Up @@ -1955,7 +1955,7 @@ ProcessRepliesIfAny(void)
/*
* 'd' means a standby reply wrapped in a CopyData packet.
*/
case 'd':
case PqMsg_CopyData:
ProcessStandbyMessage();
received = true;
break;
Expand All @@ -1964,7 +1964,7 @@ ProcessRepliesIfAny(void)
* CopyDone means the standby requested to finish streaming.
* Reply with CopyDone, if we had not sent that already.
*/
case 'c':
case PqMsg_CopyDone:
if (!streamingDoneSending)
{
pq_putmessage_noblock('c', NULL, 0);
Expand All @@ -1978,7 +1978,7 @@ ProcessRepliesIfAny(void)
/*
* 'X' means that the standby is closing down the socket.
*/
case 'X':
case PqMsg_Terminate:
proc_exit(0);

default:
Expand Down

0 comments on commit f4b54e1

Please sign in to comment.