Skip to content

Commit

Permalink
Refactor to make common functions in proto.c and worker.c.
Browse files Browse the repository at this point in the history
This is a non-functional change only to refactor code to extract some
replication logic into static functions.

This is done as preparation for the 2PC streaming patch which also shares
this common logic.

Author: Peter Smith
Reviewed-By: Amit Kapila
Discussion: https://postgr.es/m/CAHut+PuiSA8AiLcE2N5StzSKs46SQEP_vDOUD5fX2XCVtfZ7mQ@mail.gmail.com
  • Loading branch information
Amit Kapila committed Jul 29, 2021
1 parent 454ae15 commit 91f9861
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 48 deletions.
42 changes: 32 additions & 10 deletions src/backend/replication/logical/proto.c
Expand Up @@ -145,22 +145,23 @@ logicalrep_read_begin_prepare(StringInfo in, LogicalRepPreparedTxnData *begin_da
}

/*
* Write PREPARE to the output stream.
* The core functionality for logicalrep_write_prepare.
*/
void
logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn,
XLogRecPtr prepare_lsn)
static void
logicalrep_write_prepare_common(StringInfo out, LogicalRepMsgType type,
ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
{
uint8 flags = 0;

pq_sendbyte(out, LOGICAL_REP_MSG_PREPARE);
pq_sendbyte(out, type);

/*
* This should only ever happen for two-phase commit transactions, in
* which case we expect to have a valid GID.
*/
Assert(txn->gid != NULL);
Assert(rbtxn_prepared(txn));
Assert(TransactionIdIsValid(txn->xid));

/* send the flags field */
pq_sendbyte(out, flags);
Expand All @@ -176,31 +177,52 @@ logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn,
}

/*
* Read transaction PREPARE from the stream.
* Write PREPARE to the output stream.
*/
void
logicalrep_read_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn,
XLogRecPtr prepare_lsn)
{
logicalrep_write_prepare_common(out, LOGICAL_REP_MSG_PREPARE,
txn, prepare_lsn);
}

/*
* The core functionality for logicalrep_read_prepare.
*/
static void
logicalrep_read_prepare_common(StringInfo in, char *msgtype,
LogicalRepPreparedTxnData *prepare_data)
{
/* read flags */
uint8 flags = pq_getmsgbyte(in);

if (flags != 0)
elog(ERROR, "unrecognized flags %u in prepare message", flags);
elog(ERROR, "unrecognized flags %u in %s message", flags, msgtype);

/* read fields */
prepare_data->prepare_lsn = pq_getmsgint64(in);
if (prepare_data->prepare_lsn == InvalidXLogRecPtr)
elog(ERROR, "prepare_lsn is not set in prepare message");
elog(ERROR, "prepare_lsn is not set in %s message", msgtype);
prepare_data->end_lsn = pq_getmsgint64(in);
if (prepare_data->end_lsn == InvalidXLogRecPtr)
elog(ERROR, "end_lsn is not set in prepare message");
elog(ERROR, "end_lsn is not set in %s message", msgtype);
prepare_data->prepare_time = pq_getmsgint64(in);
prepare_data->xid = pq_getmsgint(in, 4);

/* read gid (copy it into a pre-allocated buffer) */
strlcpy(prepare_data->gid, pq_getmsgstring(in), sizeof(prepare_data->gid));
}

/*
* Read transaction PREPARE from the stream.
*/
void
logicalrep_read_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
{
logicalrep_read_prepare_common(in, "prepare", prepare_data);
}

/*
* Write COMMIT PREPARED to the output stream.
*/
Expand Down
101 changes: 63 additions & 38 deletions src/backend/replication/logical/worker.c
Expand Up @@ -333,6 +333,8 @@ static void apply_handle_tuple_routing(ApplyExecutionData *edata,
/* Compute GID for two_phase transactions */
static void TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid);

/* Common streaming function to apply all the spooled messages */
static void apply_spooled_messages(TransactionId xid, XLogRecPtr lsn);

/*
* Should this worker apply changes for given relation.
Expand Down Expand Up @@ -884,14 +886,47 @@ apply_handle_begin_prepare(StringInfo s)
pgstat_report_activity(STATE_RUNNING, NULL);
}

/*
* Common function to prepare the GID.
*/
static void
apply_handle_prepare_internal(LogicalRepPreparedTxnData *prepare_data)
{
char gid[GIDSIZE];

/*
* Compute unique GID for two_phase transactions. We don't use GID of
* prepared transaction sent by server as that can lead to deadlock when
* we have multiple subscriptions from same node point to publications on
* the same node. See comments atop worker.c
*/
TwoPhaseTransactionGid(MySubscription->oid, prepare_data->xid,
gid, sizeof(gid));

/*
* BeginTransactionBlock is necessary to balance the EndTransactionBlock
* called within the PrepareTransactionBlock below.
*/
BeginTransactionBlock();
CommitTransactionCommand(); /* Completes the preceding Begin command. */

/*
* Update origin state so we can restart streaming from correct position
* in case of crash.
*/
replorigin_session_origin_lsn = prepare_data->end_lsn;
replorigin_session_origin_timestamp = prepare_data->prepare_time;

PrepareTransactionBlock(gid);
}

/*
* Handle PREPARE message.
*/
static void
apply_handle_prepare(StringInfo s)
{
LogicalRepPreparedTxnData prepare_data;
char gid[GIDSIZE];

logicalrep_read_prepare(s, &prepare_data);

Expand All @@ -902,15 +937,6 @@ apply_handle_prepare(StringInfo s)
LSN_FORMAT_ARGS(prepare_data.prepare_lsn),
LSN_FORMAT_ARGS(remote_final_lsn))));

/*
* Compute unique GID for two_phase transactions. We don't use GID of
* prepared transaction sent by server as that can lead to deadlock when
* we have multiple subscriptions from same node point to publications on
* the same node. See comments atop worker.c
*/
TwoPhaseTransactionGid(MySubscription->oid, prepare_data.xid,
gid, sizeof(gid));

/*
* Unlike commit, here, we always prepare the transaction even though no
* change has happened in this transaction. It is done this way because at
Expand All @@ -923,21 +949,8 @@ apply_handle_prepare(StringInfo s)
*/
begin_replication_step();

/*
* BeginTransactionBlock is necessary to balance the EndTransactionBlock
* called within the PrepareTransactionBlock below.
*/
BeginTransactionBlock();
CommitTransactionCommand(); /* Completes the preceding Begin command. */

/*
* Update origin state so we can restart streaming from correct position
* in case of crash.
*/
replorigin_session_origin_lsn = prepare_data.end_lsn;
replorigin_session_origin_timestamp = prepare_data.prepare_time;
apply_handle_prepare_internal(&prepare_data);

PrepareTransactionBlock(gid);
end_replication_step();
CommitTransactionCommand();
pgstat_report_stat(false);
Expand Down Expand Up @@ -1256,30 +1269,19 @@ apply_handle_stream_abort(StringInfo s)
}

/*
* Handle STREAM COMMIT message.
* Common spoolfile processing.
*/
static void
apply_handle_stream_commit(StringInfo s)
apply_spooled_messages(TransactionId xid, XLogRecPtr lsn)
{
TransactionId xid;
StringInfoData s2;
int nchanges;
char path[MAXPGPATH];
char *buffer = NULL;
LogicalRepCommitData commit_data;
StreamXidHash *ent;
MemoryContext oldcxt;
BufFile *fd;

if (in_streamed_transaction)
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg_internal("STREAM COMMIT message without STREAM STOP")));

xid = logicalrep_read_stream_commit(s, &commit_data);

elog(DEBUG1, "received commit for streamed transaction %u", xid);

/* Make sure we have an open transaction */
begin_replication_step();

Expand Down Expand Up @@ -1311,7 +1313,7 @@ apply_handle_stream_commit(StringInfo s)

MemoryContextSwitchTo(oldcxt);

remote_final_lsn = commit_data.commit_lsn;
remote_final_lsn = lsn;

/*
* Make sure the handle apply_dispatch methods are aware we're in a remote
Expand Down Expand Up @@ -1390,6 +1392,29 @@ apply_handle_stream_commit(StringInfo s)
elog(DEBUG1, "replayed %d (all) changes from file \"%s\"",
nchanges, path);

return;
}

/*
* Handle STREAM COMMIT message.
*/
static void
apply_handle_stream_commit(StringInfo s)
{
TransactionId xid;
LogicalRepCommitData commit_data;

if (in_streamed_transaction)
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg_internal("STREAM COMMIT message without STREAM STOP")));

xid = logicalrep_read_stream_commit(s, &commit_data);

elog(DEBUG1, "received commit for streamed transaction %u", xid);

apply_spooled_messages(xid, commit_data.commit_lsn);

apply_handle_commit_internal(s, &commit_data);

/* unlink the files with serialized changes and subxact info */
Expand Down

0 comments on commit 91f9861

Please sign in to comment.