Skip to content

Commit

Permalink
replication: persist confirmed vclock on replicas
Browse files Browse the repository at this point in the history
Previously the replicas only persisted the confirmed lsn of the current
synchronous transaction queue owner. As soon as the onwer changed, the
info about which lsn was confirmed by the previous owner was lost.

Actually, this info is needed to correctly filter synchro requests
coming from the old term, so start tracking confirmed vclock instead of
the confirmed lsn on replicas.

In-scope of #9138

NO_TEST=covered by the next commit
NO_CHANGELOG=internal change

@TarantoolBot document
Title: Document new IPROTO_RAFT_PROMOTE request field

IPROTO_RAFT_PROMOTE and IPROTO_RAFT_DEMOTE requests receive a new key
value pair:

IPROTO_VCLOCK : MP_MAP

The vclock holds a confirmed vclock of the node sending the request.

(cherry picked from commit c4415d4)
  • Loading branch information
sergepetrenko committed Dec 2, 2023
1 parent f4027bf commit d925e1b
Show file tree
Hide file tree
Showing 9 changed files with 92 additions and 35 deletions.
7 changes: 5 additions & 2 deletions src/box/applier.cc
Original file line number Diff line number Diff line change
Expand Up @@ -483,8 +483,11 @@ applier_wait_snapshot(struct applier *applier)
xrow_decode_error_xc(&row);
} else if (iproto_type_is_promote_request(row.type)) {
struct synchro_request req;
if (xrow_decode_synchro(&row, &req) != 0)
struct vclock limbo_vclock;
if (xrow_decode_synchro(&row, &req,
&limbo_vclock) != 0) {
diag_raise();
}
if (txn_limbo_process(&txn_limbo, &req) != 0)
diag_raise();
} else if (iproto_type_is_raft_request(row.type)) {
Expand Down Expand Up @@ -784,7 +787,7 @@ applier_parse_tx_row(struct applier_tx_row *tx_row)
diag_raise();
}
} else if (iproto_type_is_synchro_request(type)) {
if (xrow_decode_synchro(row, &tx_row->req.synchro) != 0) {
if (xrow_decode_synchro(row, &tx_row->req.synchro, NULL) != 0) {
diag_raise();
}
} else if (iproto_type_is_raft_request(type)) {
Expand Down
2 changes: 1 addition & 1 deletion src/box/box.cc
Original file line number Diff line number Diff line change
Expand Up @@ -538,7 +538,7 @@ wal_stream_apply_synchro_row(struct wal_stream *stream, struct xrow_header *row)
return -1;
}
struct synchro_request syn_req;
if (xrow_decode_synchro(row, &syn_req) != 0) {
if (xrow_decode_synchro(row, &syn_req, NULL) != 0) {
say_error("couldn't decode a synchro request");
return -1;
}
Expand Down
8 changes: 6 additions & 2 deletions src/box/memtx_engine.cc
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,8 @@ memtx_engine_recover_synchro(const struct xrow_header *row)
{
assert(row->type == IPROTO_RAFT_PROMOTE);
struct synchro_request req;
if (xrow_decode_synchro(row, &req) != 0)
struct vclock synchro_vclock;
if (xrow_decode_synchro(row, &req, &synchro_vclock) != 0)
return -1;
/*
* Origin id cannot be deduced from row.replica_id in a checkpoint,
Expand Down Expand Up @@ -759,6 +760,8 @@ struct checkpoint {
struct xdir dir;
struct raft_request raft;
struct synchro_request synchro_state;
/** The limbo confirmed vclock at the moment of checkpoint creation. */
struct vclock synchro_vclock;
/**
* Do nothing, just touch the snapshot file - the
* checkpoint already exists.
Expand All @@ -784,7 +787,8 @@ checkpoint_new(const char *snap_dirname, uint64_t snap_io_rate_limit)
xdir_create(&ckpt->dir, snap_dirname, SNAP, &INSTANCE_UUID, &opts);
vclock_create(&ckpt->vclock);
box_raft_checkpoint_local(&ckpt->raft);
txn_limbo_checkpoint(&txn_limbo, &ckpt->synchro_state);
txn_limbo_checkpoint(&txn_limbo, &ckpt->synchro_state,
&ckpt->synchro_vclock);
ckpt->touch = false;
return ckpt;
}
Expand Down
5 changes: 3 additions & 2 deletions src/box/relay.cc
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,8 @@ relay_initial_join(struct iostream *io, uint64_t sync, struct vclock *vclock,

struct synchro_request req;
struct raft_request raft_req;
txn_limbo_checkpoint(&txn_limbo, &req);
struct vclock limbo_vclock;
txn_limbo_checkpoint(&txn_limbo, &req, &limbo_vclock);
box_raft_checkpoint_local(&raft_req);

/* Respond to the JOIN request with the current vclock. */
Expand Down Expand Up @@ -1230,7 +1231,7 @@ relay_filter_row(struct relay *relay, struct xrow_header *packet)
*/
if (iproto_type_is_promote_request(packet->type)) {
struct synchro_request req;
xrow_decode_synchro(packet, &req);
xrow_decode_synchro(packet, &req, NULL);
while (relay->sent_raft_term < req.term) {
if (fiber_is_cancelled()) {
diag_set(FiberIsCancelled);
Expand Down
51 changes: 33 additions & 18 deletions src/box/txn_limbo.c
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ txn_limbo_create(struct txn_limbo *limbo)
fiber_cond_create(&limbo->wait_cond);
vclock_create(&limbo->vclock);
vclock_create(&limbo->promote_term_map);
vclock_create(&limbo->confirmed_vclock);
limbo->promote_greatest_term = 0;
latch_create(&limbo->promote_latch);
limbo->confirmed_lsn = 0;
Expand Down Expand Up @@ -358,13 +359,16 @@ txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
}

void
txn_limbo_checkpoint(const struct txn_limbo *limbo,
struct synchro_request *req)
txn_limbo_checkpoint(const struct txn_limbo *limbo, struct synchro_request *req,
struct vclock *vclock)
{
req->type = IPROTO_RAFT_PROMOTE;
req->replica_id = limbo->owner_id;
req->lsn = limbo->confirmed_lsn;
req->term = limbo->promote_greatest_term;
if (vclock != NULL)
vclock_copy(vclock, &limbo->confirmed_vclock);
req->confirmed_vclock = vclock;
}

/** Write a request to WAL. */
Expand Down Expand Up @@ -411,7 +415,7 @@ synchro_request_write(const struct synchro_request *req)
/** Create a request for a specific limbo and write it to WAL. */
static void
txn_limbo_write_synchro(struct txn_limbo *limbo, uint16_t type, int64_t lsn,
uint64_t term)
uint64_t term, struct vclock *vclock)
{
assert(lsn >= 0);

Expand All @@ -420,6 +424,7 @@ txn_limbo_write_synchro(struct txn_limbo *limbo, uint16_t type, int64_t lsn,
.replica_id = limbo->owner_id,
.lsn = lsn,
.term = term,
.confirmed_vclock = vclock,
};
synchro_request_write(&req);
}
Expand All @@ -434,7 +439,8 @@ txn_limbo_write_confirm(struct txn_limbo *limbo, int64_t lsn)
assert(lsn > limbo->confirmed_lsn);
assert(!limbo->is_in_rollback);
limbo->confirmed_lsn = lsn;
txn_limbo_write_synchro(limbo, IPROTO_RAFT_CONFIRM, lsn, 0);
vclock_follow(&limbo->confirmed_vclock, limbo->owner_id, lsn);
txn_limbo_write_synchro(limbo, IPROTO_RAFT_CONFIRM, lsn, 0, NULL);
}

/** Confirm all the entries <= @a lsn. */
Expand Down Expand Up @@ -506,8 +512,10 @@ txn_limbo_read_confirm(struct txn_limbo *limbo, int64_t lsn)
* comparing existing confirm_lsn with the one arriving from a remote
* instance.
*/
if (limbo->confirmed_lsn < lsn)
if (limbo->confirmed_lsn < lsn) {
limbo->confirmed_lsn = lsn;
vclock_follow(&limbo->confirmed_vclock, limbo->owner_id, lsn);
}
}

/**
Expand All @@ -521,7 +529,7 @@ txn_limbo_write_rollback(struct txn_limbo *limbo, int64_t lsn)
assert(lsn > limbo->confirmed_lsn);
assert(!limbo->is_in_rollback);
limbo->is_in_rollback = true;
txn_limbo_write_synchro(limbo, IPROTO_RAFT_ROLLBACK, lsn, 0);
txn_limbo_write_synchro(limbo, IPROTO_RAFT_ROLLBACK, lsn, 0, NULL);
limbo->is_in_rollback = false;
}

Expand Down Expand Up @@ -575,6 +583,11 @@ txn_limbo_write_promote(struct txn_limbo *limbo, int64_t lsn, uint64_t term)
.origin_id = instance_id,
.lsn = lsn,
.term = term,
/*
* Confirmed_vclock is only persisted in checkpoints. It doesn't
* appear in WALs and replication.
*/
.confirmed_vclock = NULL,
};
if (txn_limbo_req_prepare(limbo, &req) < 0)
return -1;
Expand All @@ -589,19 +602,15 @@ txn_limbo_write_promote(struct txn_limbo *limbo, int64_t lsn, uint64_t term)
*/
static void
txn_limbo_read_promote(struct txn_limbo *limbo, uint32_t replica_id,
uint32_t prev_id, int64_t lsn)
int64_t lsn)
{
txn_limbo_read_confirm(limbo, lsn);
txn_limbo_read_rollback(limbo, lsn + 1);
assert(txn_limbo_is_empty(limbo));
limbo->owner_id = replica_id;
limbo->confirmed_lsn = vclock_get(&limbo->confirmed_vclock,
replica_id);
box_update_ro_summary();
/*
* Only nullify confirmed_lsn when the new value is unknown. I.e. when
* prev_id != replica_id.
*/
if (replica_id != prev_id)
limbo->confirmed_lsn = 0;
}

int
Expand All @@ -617,6 +626,7 @@ txn_limbo_write_demote(struct txn_limbo *limbo, int64_t lsn, uint64_t term)
.origin_id = instance_id,
.lsn = lsn,
.term = term,
.confirmed_vclock = NULL,
};
if (txn_limbo_req_prepare(limbo, &req) < 0)
return -1;
Expand All @@ -631,9 +641,9 @@ txn_limbo_write_demote(struct txn_limbo *limbo, int64_t lsn, uint64_t term)
* @sa txn_limbo_read_promote.
*/
static void
txn_limbo_read_demote(struct txn_limbo *limbo, uint32_t prev_id, int64_t lsn)
txn_limbo_read_demote(struct txn_limbo *limbo, int64_t lsn)
{
return txn_limbo_read_promote(limbo, REPLICA_ID_NIL, prev_id, lsn);
return txn_limbo_read_promote(limbo, REPLICA_ID_NIL, lsn);
}

void
Expand Down Expand Up @@ -1116,6 +1126,8 @@ txn_limbo_req_prepare(struct txn_limbo *limbo,
assert(limbo->svp_confirmed_lsn == -1);
limbo->svp_confirmed_lsn = limbo->confirmed_lsn;
limbo->confirmed_lsn = req->lsn;
vclock_reset(&limbo->confirmed_vclock, limbo->owner_id,
req->lsn);
break;
}
/*
Expand All @@ -1137,6 +1149,8 @@ txn_limbo_req_rollback(struct txn_limbo *limbo,
assert(limbo->is_in_rollback);
assert(limbo->svp_confirmed_lsn >= 0);
limbo->confirmed_lsn = limbo->svp_confirmed_lsn;
vclock_reset(&limbo->confirmed_vclock, limbo->owner_id,
limbo->svp_confirmed_lsn);
limbo->svp_confirmed_lsn = -1;
limbo->is_in_rollback = false;
break;
Expand Down Expand Up @@ -1192,6 +1206,8 @@ txn_limbo_req_commit(struct txn_limbo *limbo, const struct synchro_request *req)
}
}
}
if (req->confirmed_vclock != NULL)
vclock_copy(&limbo->confirmed_vclock, req->confirmed_vclock);

int64_t lsn = req->lsn;
switch (req->type) {
Expand All @@ -1202,11 +1218,10 @@ txn_limbo_req_commit(struct txn_limbo *limbo, const struct synchro_request *req)
txn_limbo_read_rollback(limbo, lsn);
break;
case IPROTO_RAFT_PROMOTE:
txn_limbo_read_promote(limbo, req->origin_id, req->replica_id,
lsn);
txn_limbo_read_promote(limbo, req->origin_id, lsn);
break;
case IPROTO_RAFT_DEMOTE:
txn_limbo_read_demote(limbo, req->replica_id, lsn);
txn_limbo_read_demote(limbo, lsn);
break;
default:
unreachable();
Expand Down
9 changes: 7 additions & 2 deletions src/box/txn_limbo.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,11 @@ struct txn_limbo {
* except outdated nodes.
*/
struct vclock promote_term_map;
/**
* A vclock containing biggest known confirmed lsns for each previous
* limbo owner.
*/
struct vclock confirmed_vclock;
/**
* The biggest PROMOTE term seen by the instance and persisted in WAL.
* It is related to raft term, but not the same. Synchronous replication
Expand Down Expand Up @@ -401,8 +406,8 @@ txn_limbo_wait_empty(struct txn_limbo *limbo, double timeout);
* Persist limbo state to a given synchro request.
*/
void
txn_limbo_checkpoint(const struct txn_limbo *limbo,
struct synchro_request *req);
txn_limbo_checkpoint(const struct txn_limbo *limbo, struct synchro_request *req,
struct vclock *vclock);

/**
* Write a PROMOTE request, which has the same effect as CONFIRM(@a lsn) and
Expand Down
29 changes: 25 additions & 4 deletions src/box/xrow.c
Original file line number Diff line number Diff line change
Expand Up @@ -1093,20 +1093,32 @@ xrow_encode_synchro(struct xrow_header *row, char *body,

char *pos = body;

pos = mp_encode_map(pos,
iproto_type_is_promote_request(req->type) ? 3 : 2);
/* Skip one byte for the map. */
pos++;
uint32_t map_size = 0;

pos = mp_encode_uint(pos, IPROTO_REPLICA_ID);
pos = mp_encode_uint(pos, req->replica_id);
map_size++;

pos = mp_encode_uint(pos, IPROTO_LSN);
pos = mp_encode_uint(pos, req->lsn);
map_size++;

if (iproto_type_is_promote_request(req->type)) {
if (req->term != 0) {
pos = mp_encode_uint(pos, IPROTO_TERM);
pos = mp_encode_uint(pos, req->term);
map_size++;
}

if (req->confirmed_vclock != NULL) {
pos = mp_encode_uint(pos, IPROTO_VCLOCK);
pos = mp_encode_vclock_ignore0(pos, req->confirmed_vclock);
map_size++;
}

mp_encode_map(body, map_size);

assert(pos - body < XROW_BODY_LEN_MAX);

memset(row, 0, sizeof(*row));
Expand All @@ -1117,7 +1129,8 @@ xrow_encode_synchro(struct xrow_header *row, char *body,
}

int
xrow_decode_synchro(const struct xrow_header *row, struct synchro_request *req)
xrow_decode_synchro(const struct xrow_header *row, struct synchro_request *req,
struct vclock *vclock)
{
if (row->bodycnt == 0) {
diag_set(ClientError, ER_INVALID_MSGPACK, "request body");
Expand All @@ -1144,6 +1157,7 @@ xrow_decode_synchro(const struct xrow_header *row, struct synchro_request *req)
uint8_t key = mp_decode_uint(&d);
if (key < IPROTO_KEY_MAX &&
iproto_key_type[key] != mp_typeof(*d)) {
bad_msgpack:
xrow_on_decode_err(row, ER_INVALID_MSGPACK,
"request body");
return -1;
Expand All @@ -1158,6 +1172,13 @@ xrow_decode_synchro(const struct xrow_header *row, struct synchro_request *req)
case IPROTO_TERM:
req->term = mp_decode_uint(&d);
break;
case IPROTO_VCLOCK:
if (vclock == NULL)
mp_next(&d);
else if (mp_decode_vclock_ignore0(&d, vclock) != 0)
goto bad_msgpack;
req->confirmed_vclock = vclock;
break;
default:
mp_next(&d);
}
Expand Down
11 changes: 9 additions & 2 deletions src/box/xrow.h
Original file line number Diff line number Diff line change
Expand Up @@ -282,9 +282,14 @@ struct synchro_request {
int64_t lsn;
/**
* The new term the instance issuing this request is in. Only used for
* PROMOTE request.
* PROMOTE and DEMOTE requests.
*/
uint64_t term;
/**
* Confirmed lsns of all the previous limbo owners. Only used for
* PROMOTE and DEMOTE requests.
*/
struct vclock *confirmed_vclock;
};

/**
Expand All @@ -301,11 +306,13 @@ xrow_encode_synchro(struct xrow_header *row, char *body,
* Decode synchronous replication request.
* @param row xrow header.
* @param[out] req Request parameters.
* @param[out] vclock Storage for request vclock.
* @retval -1 on error.
* @retval 0 success.
*/
int
xrow_decode_synchro(const struct xrow_header *row, struct synchro_request *req);
xrow_decode_synchro(const struct xrow_header *row, struct synchro_request *req,
struct vclock *vclock);

/**
* Raft request. It repeats Raft message to the letter, but can be extended in
Expand Down
5 changes: 3 additions & 2 deletions test/unit/xrow.cc
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,8 @@ test_xrow_decode_unknown_key(void)

struct synchro_request synchro;
header.type = IPROTO_RAFT_PROMOTE;
is(xrow_decode_synchro(&header, &synchro), 0, "xrow_decode_synchro");
is(xrow_decode_synchro(&header, &synchro, NULL), 0,
"xrow_decode_synchro");

struct raft_request raft;
header.type = IPROTO_RAFT;
Expand Down Expand Up @@ -451,7 +452,7 @@ test_xrow_decode_synchro_types(void)
header.body[0].iov_len = mp_format(buf, sizeof(buf), "{%u%s}",
IPROTO_INSTANCE_UUID, "someuuid");
struct synchro_request synchro;
is(xrow_decode_synchro(&header, &synchro), 0,
is(xrow_decode_synchro(&header, &synchro, NULL), 0,
"xrow_decode_synchro correctly handles key types");

check_plan();
Expand Down

0 comments on commit d925e1b

Please sign in to comment.