Skip to content

Commit

Permalink
replication: fix extraneous split-brain alerting
Browse files Browse the repository at this point in the history
Current split-brain detector implementation raises an error each time a
CONFIRM or ROLLBACK entry is received from the previous synchronous
transaction queue owner. It is assumed that the new queue owner must
have witnessed all the previous CONFIRMS. Besides, according to Raft,
ROLLBACK should never happen.

Actually there is a case when a CONFIRM from an old term is legal: it's
possible that during leader transition old leader writes a CONFIRM for
the same transaction that is confirmed by the new leader's PROMOTE. If
PROMOTE and CONFIRM lsns match there is nothing bad about such
situation.

Symmetrically, when an old leader issues a ROLLBACK with the lsn right
after the new leader's PROMOTE lsn, it is not a split-brain.

Allow such cases by tracking the last confirmed lsn for each synchronous
transaction queue owner and silently nopifying CONFIRMs with an lsn less
than the one recorded and ROLLBACKs with lsn greater than that.

Additionally, persist the confirmed lsns of all previous synchronous
transaction queue owners for the sake of correct filtering after
restarts.

Closes tarantool#9138

NO_DOC=bugfix
  • Loading branch information
sergepetrenko committed Nov 14, 2023
1 parent 028d7b2 commit 1a4aede
Show file tree
Hide file tree
Showing 11 changed files with 211 additions and 52 deletions.
6 changes: 6 additions & 0 deletions changelogs/unreleased/gh-9138-relax-split-brain-check.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
## bugfix/replication

* Fixed a false-positive split-brain error when an old synchronous transaction
queue owner confirmed the same transactions which were already confirmed by
the new queue owner, or rolled back the same transactions which were rolled
back by the new queue owner (gh-9138).
41 changes: 31 additions & 10 deletions src/box/applier.cc
Original file line number Diff line number Diff line change
Expand Up @@ -795,8 +795,11 @@ applier_wait_snapshot(struct applier *applier)
xrow_decode_error_xc(&row);
} else if (iproto_type_is_promote_request(row.type)) {
struct synchro_request req;
if (xrow_decode_synchro(&row, &req) != 0)
struct vclock limbo_vclock;
if (xrow_decode_synchro(&row, &req,
&limbo_vclock) != 0) {
diag_raise();
}
if (txn_limbo_process(&txn_limbo, &req) != 0)
diag_raise();
} else if (iproto_type_is_raft_request(row.type)) {
Expand Down Expand Up @@ -1104,7 +1107,7 @@ applier_parse_tx_row(struct applier_tx_row *tx_row)
diag_raise();
}
} else if (iproto_type_is_synchro_request(type)) {
if (xrow_decode_synchro(row, &tx_row->req.synchro) != 0) {
if (xrow_decode_synchro(row, &tx_row->req.synchro, NULL) != 0) {
diag_raise();
}
} else if (iproto_type_is_raft_request(type)) {
Expand Down Expand Up @@ -1516,14 +1519,14 @@ applier_synchro_filter_tx(struct stailq *rows)
* It may happen that we receive the instance's rows via some third
* node, so cannot check for applier->instance_id here.
*/
row = &stailq_first_entry(rows, struct applier_tx_row, next)->row;
row = &stailq_last_entry(rows, struct applier_tx_row, next)->row;
uint64_t term = txn_limbo_replica_term(&txn_limbo, row->replica_id);
assert(term <= txn_limbo.promote_greatest_term);
if (term == txn_limbo.promote_greatest_term)
return;

/*
* We do not nopify promotion/demotion and confirm/rollback.
* We do not nopify promotion/demotion and most of confirm/rollback.
* Such syncrhonous requests should be filtered by txn_limbo to detect
* possible split brain situations.
*
Expand All @@ -1534,14 +1537,10 @@ applier_synchro_filter_tx(struct stailq *rows)
* claimed by someone is a marker of split-brain by itself: consider it
* a synchronous transaction, which is committed with quorum 1.
*/
struct xrow_header *last_row =
&stailq_last_entry(rows, struct applier_tx_row, next)->row;
struct applier_tx_row *item;
if (!last_row->wait_sync) {
if (!iproto_type_is_dml(last_row->type) ||
txn_limbo.owner_id == REPLICA_ID_NIL) {
if (iproto_type_is_dml(row->type) && !row->wait_sync) {
if (txn_limbo.owner_id == REPLICA_ID_NIL)
return;
}
stailq_foreach_entry(item, rows, next) {
row = &item->row;
if (row->type == IPROTO_NOP)
Expand All @@ -1550,6 +1549,28 @@ applier_synchro_filter_tx(struct stailq *rows)
"got an async transaction from an old term");
}
return;
} else if (iproto_type_is_synchro_request(row->type)) {
item = stailq_last_entry(rows, typeof(*item), next);
struct synchro_request req = item->req.synchro;
/* Note! Might be different from row->replica_id. */
uint32_t owner_id = req.replica_id;
int64_t confirmed_lsn =
txn_limbo_replica_confirmed_lsn(&txn_limbo, owner_id);
switch (row->type) {
case IPROTO_RAFT_PROMOTE:
case IPROTO_RAFT_DEMOTE:
return;
case IPROTO_RAFT_CONFIRM:
if (req.lsn > confirmed_lsn)
return;
break;
case IPROTO_RAFT_ROLLBACK:
if (req.lsn <= confirmed_lsn)
return;
break;
default:
unreachable();
}
}
stailq_foreach_entry(item, rows, next) {
row = &item->row;
Expand Down
2 changes: 1 addition & 1 deletion src/box/box.cc
Original file line number Diff line number Diff line change
Expand Up @@ -702,7 +702,7 @@ wal_stream_apply_synchro_row(struct wal_stream *stream, struct xrow_header *row)
return -1;
}
struct synchro_request syn_req;
if (xrow_decode_synchro(row, &syn_req) != 0) {
if (xrow_decode_synchro(row, &syn_req, NULL) != 0) {
say_error("couldn't decode a synchro request");
return -1;
}
Expand Down
10 changes: 7 additions & 3 deletions src/box/memtx_engine.cc
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,8 @@ memtx_engine_recover_synchro(const struct xrow_header *row)
{
assert(row->type == IPROTO_RAFT_PROMOTE);
struct synchro_request req;
if (xrow_decode_synchro(row, &req) != 0)
struct vclock synchro_vclock;
if (xrow_decode_synchro(row, &req, &synchro_vclock) != 0)
return -1;
/*
* Origin id cannot be deduced from row.replica_id in a checkpoint,
Expand Down Expand Up @@ -789,6 +790,8 @@ struct checkpoint {
struct raft_request raft;
/** Synchro request to be written to the snapshot file. */
struct synchro_request synchro_state;
/** The limbo confirmed vclock at the moment of checkpoint creation. */
struct vclock synchro_vclock;
/**
* Do nothing, just touch the snapshot file - the
* checkpoint already exists.
Expand Down Expand Up @@ -886,7 +889,8 @@ checkpoint_new(const char *snap_dirname, uint64_t snap_io_rate_limit)
xlog_clear(&ckpt->snap);
vclock_create(&ckpt->vclock);
box_raft_checkpoint_local(&ckpt->raft);
txn_limbo_checkpoint(&txn_limbo, &ckpt->synchro_state);
txn_limbo_checkpoint(&txn_limbo, &ckpt->synchro_state,
&ckpt->synchro_vclock);
ckpt->touch = false;
return ckpt;
}
Expand Down Expand Up @@ -939,7 +943,7 @@ static int
checkpoint_write_synchro(struct xlog *l, const struct synchro_request *req)
{
struct xrow_header row;
char body[XROW_SYNCHRO_BODY_LEN_MAX];
char body[XROW_BODY_LEN_MAX];
xrow_encode_synchro(&row, body, req);
return checkpoint_write_row(l, &row);
}
Expand Down
7 changes: 4 additions & 3 deletions src/box/relay.cc
Original file line number Diff line number Diff line change
Expand Up @@ -489,7 +489,8 @@ relay_initial_join(struct iostream *io, uint64_t sync, struct vclock *vclock,

struct synchro_request req;
struct raft_request raft_req;
txn_limbo_checkpoint(&txn_limbo, &req);
struct vclock limbo_vclock;
txn_limbo_checkpoint(&txn_limbo, &req, &limbo_vclock);
box_raft_checkpoint_local(&raft_req);

/* Respond to the JOIN request with the current vclock. */
Expand All @@ -511,7 +512,7 @@ relay_initial_join(struct iostream *io, uint64_t sync, struct vclock *vclock,
xrow_encode_raft(&row, &fiber()->gc, &raft_req);
xstream_write(&relay->stream, &row);

char body[XROW_SYNCHRO_BODY_LEN_MAX];
char body[XROW_BODY_LEN_MAX];
xrow_encode_synchro(&row, body, &req);
row.replica_id = req.replica_id;
xstream_write(&relay->stream, &row);
Expand Down Expand Up @@ -1279,7 +1280,7 @@ relay_filter_row(struct relay *relay, struct xrow_header *packet)
*/
if (iproto_type_is_promote_request(packet->type)) {
struct synchro_request req;
xrow_decode_synchro(packet, &req);
xrow_decode_synchro(packet, &req, NULL);
while (relay->sent_raft_term < req.term) {
if (fiber_is_cancelled()) {
diag_set(FiberIsCancelled);
Expand Down
57 changes: 39 additions & 18 deletions src/box/txn_limbo.c
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ txn_limbo_create(struct txn_limbo *limbo)
fiber_cond_create(&limbo->wait_cond);
vclock_create(&limbo->vclock);
vclock_create(&limbo->promote_term_map);
vclock_create(&limbo->confirmed_vclock);
limbo->promote_greatest_term = 0;
latch_create(&limbo->promote_latch);
limbo->confirmed_lsn = 0;
Expand Down Expand Up @@ -355,13 +356,16 @@ txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
}

void
txn_limbo_checkpoint(const struct txn_limbo *limbo,
struct synchro_request *req)
txn_limbo_checkpoint(const struct txn_limbo *limbo, struct synchro_request *req,
struct vclock *vclock)
{
req->type = IPROTO_RAFT_PROMOTE;
req->replica_id = limbo->owner_id;
req->lsn = limbo->confirmed_lsn;
req->term = limbo->promote_greatest_term;
if (vclock != NULL)
vclock_copy(vclock, &limbo->confirmed_vclock);
req->confirmed_vclock = vclock;
}

/** Write a request to WAL. */
Expand All @@ -372,7 +376,7 @@ synchro_request_write(const struct synchro_request *req)
* This is a synchronous commit so we can
* allocate everything on a stack.
*/
char body[XROW_SYNCHRO_BODY_LEN_MAX];
char body[XROW_BODY_LEN_MAX];
struct xrow_header row;
char buf[sizeof(struct journal_entry) +
sizeof(struct xrow_header *)];
Expand Down Expand Up @@ -408,7 +412,7 @@ synchro_request_write(const struct synchro_request *req)
/** Create a request for a specific limbo and write it to WAL. */
static void
txn_limbo_write_synchro(struct txn_limbo *limbo, uint16_t type, int64_t lsn,
uint64_t term)
uint64_t term, struct vclock *vclock)
{
assert(lsn >= 0);

Expand All @@ -417,6 +421,7 @@ txn_limbo_write_synchro(struct txn_limbo *limbo, uint16_t type, int64_t lsn,
.replica_id = limbo->owner_id,
.lsn = lsn,
.term = term,
.confirmed_vclock = vclock,
};
synchro_request_write(&req);
}
Expand All @@ -431,7 +436,8 @@ txn_limbo_write_confirm(struct txn_limbo *limbo, int64_t lsn)
assert(lsn > limbo->confirmed_lsn);
assert(!limbo->is_in_rollback);
limbo->confirmed_lsn = lsn;
txn_limbo_write_synchro(limbo, IPROTO_RAFT_CONFIRM, lsn, 0);
vclock_follow(&limbo->confirmed_vclock, limbo->owner_id, lsn);
txn_limbo_write_synchro(limbo, IPROTO_RAFT_CONFIRM, lsn, 0, NULL);
}

/** Confirm all the entries <= @a lsn. */
Expand Down Expand Up @@ -503,8 +509,10 @@ txn_limbo_read_confirm(struct txn_limbo *limbo, int64_t lsn)
* comparing existing confirm_lsn with the one arriving from a remote
* instance.
*/
if (limbo->confirmed_lsn < lsn)
if (limbo->confirmed_lsn < lsn) {
limbo->confirmed_lsn = lsn;
vclock_follow(&limbo->confirmed_vclock, limbo->owner_id, lsn);
}
}

/**
Expand All @@ -518,7 +526,7 @@ txn_limbo_write_rollback(struct txn_limbo *limbo, int64_t lsn)
assert(lsn > limbo->confirmed_lsn);
assert(!limbo->is_in_rollback);
limbo->is_in_rollback = true;
txn_limbo_write_synchro(limbo, IPROTO_RAFT_ROLLBACK, lsn, 0);
txn_limbo_write_synchro(limbo, IPROTO_RAFT_ROLLBACK, lsn, 0, NULL);
limbo->is_in_rollback = false;
}

Expand Down Expand Up @@ -572,6 +580,11 @@ txn_limbo_write_promote(struct txn_limbo *limbo, int64_t lsn, uint64_t term)
.origin_id = instance_id,
.lsn = lsn,
.term = term,
/*
* Confirmed_vclock is only persisted in checkpoints. It doesn't
* appear in WALs and replication.
*/
.confirmed_vclock = NULL,
};
if (txn_limbo_req_prepare(limbo, &req) < 0)
return -1;
Expand All @@ -586,19 +599,19 @@ txn_limbo_write_promote(struct txn_limbo *limbo, int64_t lsn, uint64_t term)
*/
static void
txn_limbo_read_promote(struct txn_limbo *limbo, uint32_t replica_id,
		       uint32_t prev_id, int64_t lsn,
		       struct vclock *confirmed_vclock)
{
	(void)prev_id;
	/*
	 * Remember the confirmed lsns of all previous limbo owners when
	 * the request carries them (only checkpoint-sourced PROMOTEs
	 * do), so that stale CONFIRM/ROLLBACK filtering keeps working
	 * after restart.
	 */
	if (confirmed_vclock != NULL)
		vclock_copy(&limbo->confirmed_vclock, confirmed_vclock);
	txn_limbo_read_confirm(limbo, lsn);
	txn_limbo_read_rollback(limbo, lsn + 1);
	assert(txn_limbo_is_empty(limbo));
	limbo->owner_id = replica_id;
	box_update_ro_summary();
	/*
	 * The new owner's last confirmed lsn is tracked in
	 * confirmed_vclock, so restore it from there instead of
	 * nullifying it on every ownership change.
	 */
	limbo->confirmed_lsn = vclock_get(&limbo->confirmed_vclock,
					  replica_id);
}

int
Expand All @@ -614,6 +627,7 @@ txn_limbo_write_demote(struct txn_limbo *limbo, int64_t lsn, uint64_t term)
.origin_id = instance_id,
.lsn = lsn,
.term = term,
.confirmed_vclock = NULL,
};
if (txn_limbo_req_prepare(limbo, &req) < 0)
return -1;
Expand All @@ -628,9 +642,11 @@ txn_limbo_write_demote(struct txn_limbo *limbo, int64_t lsn, uint64_t term)
* @sa txn_limbo_read_promote.
*/
static void
txn_limbo_read_demote(struct txn_limbo *limbo, uint32_t prev_id, int64_t lsn,
		      struct vclock *confirmed_vclock)
{
	/*
	 * DEMOTE is a PROMOTE of "nobody": ownership goes to
	 * REPLICA_ID_NIL. No `return` before the call — returning a
	 * void expression from a void function is a C99 constraint
	 * violation (§6.8.6.4).
	 */
	txn_limbo_read_promote(limbo, REPLICA_ID_NIL, prev_id, lsn,
			       confirmed_vclock);
}

void
Expand Down Expand Up @@ -1108,6 +1124,8 @@ txn_limbo_req_prepare(struct txn_limbo *limbo,
assert(limbo->svp_confirmed_lsn == -1);
limbo->svp_confirmed_lsn = limbo->confirmed_lsn;
limbo->confirmed_lsn = req->lsn;
vclock_reset(&limbo->confirmed_vclock, limbo->owner_id,
req->lsn);
break;
}
/*
Expand All @@ -1129,6 +1147,8 @@ txn_limbo_req_rollback(struct txn_limbo *limbo,
assert(limbo->is_in_rollback);
assert(limbo->svp_confirmed_lsn >= 0);
limbo->confirmed_lsn = limbo->svp_confirmed_lsn;
vclock_reset(&limbo->confirmed_vclock, limbo->owner_id,
limbo->svp_confirmed_lsn);
limbo->svp_confirmed_lsn = -1;
limbo->is_in_rollback = false;
break;
Expand Down Expand Up @@ -1195,10 +1215,11 @@ txn_limbo_req_commit(struct txn_limbo *limbo, const struct synchro_request *req)
break;
case IPROTO_RAFT_PROMOTE:
txn_limbo_read_promote(limbo, req->origin_id, req->replica_id,
lsn);
lsn, req->confirmed_vclock);
break;
case IPROTO_RAFT_DEMOTE:
txn_limbo_read_demote(limbo, req->replica_id, lsn);
txn_limbo_read_demote(limbo, req->replica_id, lsn,
req->confirmed_vclock);
break;
default:
unreachable();
Expand Down
19 changes: 17 additions & 2 deletions src/box/txn_limbo.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,11 @@ struct txn_limbo {
* except outdated nodes.
*/
struct vclock promote_term_map;
/**
* A vclock containing biggest known confirmed lsns for each previous
* limbo owner.
*/
struct vclock confirmed_vclock;
/**
* The biggest PROMOTE term seen by the instance and persisted in WAL.
* It is related to raft term, but not the same. Synchronous replication
Expand Down Expand Up @@ -267,6 +272,16 @@ txn_limbo_replica_term(const struct txn_limbo *limbo, uint32_t replica_id)
return vclock_get(&limbo->promote_term_map, replica_id);
}

/**
 * Look up the most recent confirmed lsn recorded for the limbo owner
 * with id @a replica_id.
 */
static inline int64_t
txn_limbo_replica_confirmed_lsn(const struct txn_limbo *limbo,
				uint32_t replica_id)
{
	const struct vclock *confirmed = &limbo->confirmed_vclock;
	return vclock_get(confirmed, replica_id);
}

/**
* Return the last synchronous transaction in the limbo or NULL when it is
* empty.
Expand Down Expand Up @@ -408,8 +423,8 @@ txn_limbo_wait_empty(struct txn_limbo *limbo, double timeout);
* Persist limbo state to a given synchro request.
*/
void
txn_limbo_checkpoint(const struct txn_limbo *limbo,
struct synchro_request *req);
txn_limbo_checkpoint(const struct txn_limbo *limbo, struct synchro_request *req,
struct vclock *vclock);

/**
* Write a PROMOTE request, which has the same effect as CONFIRM(@a lsn) and
Expand Down

0 comments on commit 1a4aede

Please sign in to comment.