diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c index a2452525299b0..2d774567e089c 100644 --- a/src/backend/replication/logical/proto.c +++ b/src/backend/replication/logical/proto.c @@ -145,15 +145,15 @@ logicalrep_read_begin_prepare(StringInfo in, LogicalRepPreparedTxnData *begin_da } /* - * Write PREPARE to the output stream. + * The core functionality for logicalrep_write_prepare. */ -void -logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn, - XLogRecPtr prepare_lsn) +static void +logicalrep_write_prepare_common(StringInfo out, LogicalRepMsgType type, + ReorderBufferTXN *txn, XLogRecPtr prepare_lsn) { uint8 flags = 0; - pq_sendbyte(out, LOGICAL_REP_MSG_PREPARE); + pq_sendbyte(out, type); /* * This should only ever happen for two-phase commit transactions, in @@ -161,6 +161,7 @@ logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn, */ Assert(txn->gid != NULL); Assert(rbtxn_prepared(txn)); + Assert(TransactionIdIsValid(txn->xid)); /* send the flags field */ pq_sendbyte(out, flags); @@ -176,24 +177,36 @@ logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn, } /* - * Read transaction PREPARE from the stream. + * Write PREPARE to the output stream. */ void -logicalrep_read_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data) +logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn) +{ + logicalrep_write_prepare_common(out, LOGICAL_REP_MSG_PREPARE, + txn, prepare_lsn); +} + +/* + * The core functionality for logicalrep_read_prepare. + */ +static void +logicalrep_read_prepare_common(StringInfo in, char *msgtype, + LogicalRepPreparedTxnData *prepare_data) { /* read flags */ uint8 flags = pq_getmsgbyte(in); if (flags != 0) - elog(ERROR, "unrecognized flags %u in prepare message", flags); + elog(ERROR, "unrecognized flags %u in %s message", flags, msgtype); /* read fields */ prepare_data->prepare_lsn = pq_getmsgint64(in); if (prepare_data->prepare_lsn == InvalidXLogRecPtr) - elog(ERROR, "prepare_lsn is not set in prepare message"); + elog(ERROR, "prepare_lsn is not set in %s message", msgtype); prepare_data->end_lsn = pq_getmsgint64(in); if (prepare_data->end_lsn == InvalidXLogRecPtr) - elog(ERROR, "end_lsn is not set in prepare message"); + elog(ERROR, "end_lsn is not set in %s message", msgtype); prepare_data->prepare_time = pq_getmsgint64(in); prepare_data->xid = pq_getmsgint(in, 4); @@ -201,6 +214,15 @@ logicalrep_read_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data) strlcpy(prepare_data->gid, pq_getmsgstring(in), sizeof(prepare_data->gid)); } +/* + * Read transaction PREPARE from the stream. + */ +void +logicalrep_read_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data) +{ + logicalrep_read_prepare_common(in, "prepare", prepare_data); +} + /* * Write COMMIT PREPARED to the output stream. */ diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index b9a7a7ffbb32d..3f499b11f72d9 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -333,6 +333,8 @@ static void apply_handle_tuple_routing(ApplyExecutionData *edata, /* Compute GID for two_phase transactions */ static void TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid); +/* Common streaming function to apply all the spooled messages */ +static void apply_spooled_messages(TransactionId xid, XLogRecPtr lsn); /* * Should this worker apply changes for given relation. @@ -884,6 +886,40 @@ apply_handle_begin_prepare(StringInfo s) pgstat_report_activity(STATE_RUNNING, NULL); } +/* + * Common function to prepare the GID. + */ +static void +apply_handle_prepare_internal(LogicalRepPreparedTxnData *prepare_data) +{ + char gid[GIDSIZE]; + + /* + * Compute unique GID for two_phase transactions. We don't use GID of + * prepared transaction sent by server as that can lead to deadlock when + * we have multiple subscriptions from same node point to publications on + * the same node. See comments atop worker.c + */ + TwoPhaseTransactionGid(MySubscription->oid, prepare_data->xid, + gid, sizeof(gid)); + + /* + * BeginTransactionBlock is necessary to balance the EndTransactionBlock + * called within the PrepareTransactionBlock below. + */ + BeginTransactionBlock(); + CommitTransactionCommand(); /* Completes the preceding Begin command. */ + + /* + * Update origin state so we can restart streaming from correct position + * in case of crash. + */ + replorigin_session_origin_lsn = prepare_data->end_lsn; + replorigin_session_origin_timestamp = prepare_data->prepare_time; + + PrepareTransactionBlock(gid); +} + /* * Handle PREPARE message. */ @@ -891,7 +927,6 @@ static void apply_handle_prepare(StringInfo s) { LogicalRepPreparedTxnData prepare_data; - char gid[GIDSIZE]; logicalrep_read_prepare(s, &prepare_data); @@ -902,15 +937,6 @@ apply_handle_prepare(StringInfo s) LSN_FORMAT_ARGS(prepare_data.prepare_lsn), LSN_FORMAT_ARGS(remote_final_lsn)))); - /* - * Compute unique GID for two_phase transactions. We don't use GID of - * prepared transaction sent by server as that can lead to deadlock when - * we have multiple subscriptions from same node point to publications on - * the same node. See comments atop worker.c - */ - TwoPhaseTransactionGid(MySubscription->oid, prepare_data.xid, - gid, sizeof(gid)); - /* * Unlike commit, here, we always prepare the transaction even though no * change has happened in this transaction. It is done this way because at @@ -923,21 +949,8 @@ apply_handle_prepare(StringInfo s) */ begin_replication_step(); - /* - * BeginTransactionBlock is necessary to balance the EndTransactionBlock - * called within the PrepareTransactionBlock below. - */ - BeginTransactionBlock(); - CommitTransactionCommand(); /* Completes the preceding Begin command. */ - - /* - * Update origin state so we can restart streaming from correct position - * in case of crash. - */ - replorigin_session_origin_lsn = prepare_data.end_lsn; - replorigin_session_origin_timestamp = prepare_data.prepare_time; + apply_handle_prepare_internal(&prepare_data); - PrepareTransactionBlock(gid); end_replication_step(); CommitTransactionCommand(); pgstat_report_stat(false); @@ -1256,30 +1269,19 @@ apply_handle_stream_abort(StringInfo s) } /* - * Handle STREAM COMMIT message. + * Common spoolfile processing. */ static void -apply_handle_stream_commit(StringInfo s) +apply_spooled_messages(TransactionId xid, XLogRecPtr lsn) { - TransactionId xid; StringInfoData s2; int nchanges; char path[MAXPGPATH]; char *buffer = NULL; - LogicalRepCommitData commit_data; StreamXidHash *ent; MemoryContext oldcxt; BufFile *fd; - if (in_streamed_transaction) - ereport(ERROR, - (errcode(ERRCODE_PROTOCOL_VIOLATION), - errmsg_internal("STREAM COMMIT message without STREAM STOP"))); - - xid = logicalrep_read_stream_commit(s, &commit_data); - - elog(DEBUG1, "received commit for streamed transaction %u", xid); - /* Make sure we have an open transaction */ begin_replication_step(); @@ -1311,7 +1313,7 @@ apply_handle_stream_commit(StringInfo s) MemoryContextSwitchTo(oldcxt); - remote_final_lsn = commit_data.commit_lsn; + remote_final_lsn = lsn; /* * Make sure the handle apply_dispatch methods are aware we're in a remote @@ -1390,6 +1392,29 @@ apply_handle_stream_commit(StringInfo s) elog(DEBUG1, "replayed %d (all) changes from file \"%s\"", nchanges, path); + return; +} + +/* + * Handle STREAM COMMIT message. + */ +static void +apply_handle_stream_commit(StringInfo s) +{ + TransactionId xid; + LogicalRepCommitData commit_data; + + if (in_streamed_transaction) + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg_internal("STREAM COMMIT message without STREAM STOP"))); + + xid = logicalrep_read_stream_commit(s, &commit_data); + + elog(DEBUG1, "received commit for streamed transaction %u", xid); + + apply_spooled_messages(xid, commit_data.commit_lsn); + apply_handle_commit_internal(s, &commit_data); /* unlink the files with serialized changes and subxact info */