replication: recovery of mixed transactions
See the docbot request for details.

Closes tarantool#7932

@TarantoolBot document
Title: correct recovery of mixed transactions

Transaction boundaries in the WAL were introduced in 2.1.2 (`872d6f1`), but
the replication applier started to apply transactions atomically only in
2.2.1 (`e94ba2e`), so within this range it is possible that transactions
are mixed in the WAL. This caused no problems until 2.8.1, because the
recovery process still worked row-by-row, but in 2.8.1 (`9311113`) recovery
became transactional as well, so starting from 2.8.1 tarantool fails to
handle mixed transactions found in the WAL.

Mixed transactions are clearly one of the unexpected scenarios for modern
tarantool, so we do not handle them in normal mode and take care of them
only in force recovery mode. However, the current 'force recovery' treats
all rows of such a transaction as unexpected and simply skips the entire
transaction row by row, even though in this case we have enough information
to handle it in a more data-friendly way.

Note:
Let there be two nodes (`node#1` and `node#2`), with data replicated from
`node#1` to `node#2`. Suppose that at some point in time `node#1` is
restoring data from an xlog containing mixed transactions. To replicate the
data from `node#1` to `node#2`, do the following (see the sketch below):
1. Stop `node#2` and delete all xlog files from it
2. Restart `node#1` with `force_recovery` set to `true`
3. Make `node#2` rejoin `node#1` again (gh-7932).
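
A minimal Lua sketch of these steps, assuming both instances are configured
directly via `box.cfg`; the ports, replication URI, and credentials below are
illustrative only and are not part of the patch:

```lua
-- Step 1 (on node#2): stop the instance and remove its *.xlog files
-- from its wal_dir (done outside of Lua).

-- Step 2 (on node#1): restart with force recovery enabled, so xlogs
-- containing mixed transactions are recovered transaction by
-- transaction instead of aborting recovery.
box.cfg{
    listen = 3301,
    force_recovery = true,
}

-- Step 3 (on node#2): rejoin node#1 from scratch.
box.cfg{
    listen = 3302,
    replication = {'replicator:secret@node1.example:3301'},  -- illustrative URI
}
```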
yanshtunder committed May 22, 2023
1 parent 0e19a96 commit 8e0ce96
Showing 6 changed files with 314 additions and 7 deletions.
13 changes: 13 additions & 0 deletions changelogs/unreleased/gh-7932-recovery-mixed-transactions.md
@@ -0,0 +1,13 @@
## feature/replication

* Implemented correct recovery of mixed transactions. To do this, set
`box.cfg.force_recovery` to `true`. If you need to revert to the old
behavior, don't set the `force_recovery` option.

* Let there be two nodes (`node#1` and `node#2`), with data replicated from
`node#1` to `node#2`. Suppose that at some point in time `node#1` is
restoring data from an xlog containing mixed transactions. To replicate the
data from `node#1` to `node#2`, do the following:
1. Stop `node#2` and delete all xlog files from it
2. Restart `node#1` with `force_recovery` set to `true`
3. Make `node#2` rejoin `node#1` again (gh-7932).
181 changes: 174 additions & 7 deletions src/box/box.cc
@@ -95,6 +95,7 @@
#include "wal_ext.h"
#include "mp_util.h"
#include "small/static.h"
#include "memory.h"
#include "node_name.h"

static char status[64] = "unconfigured";
@@ -556,6 +557,10 @@ box_set_orphan(bool orphan)
struct wal_stream {
/** Base class. */
struct xstream base;
/** The lsregion for allocating rows. */
struct lsregion lsr;
/** The array of lists keeping rows from xlog. */
struct rlist nodes_rows[VCLOCK_MAX];
/** Current transaction ID. 0 when no transaction. */
int64_t tsn;
/**
@@ -641,18 +646,43 @@ wal_stream_abort(struct wal_stream *stream)
* checking even in release.
*/
static bool
wal_stream_has_tx(const struct wal_stream *stream)
wal_stream_has_tx_in_progress(const struct wal_stream *stream)
{
bool has = stream->tsn != 0;
assert(has == (in_txn() != NULL));
return has;
}

/**
* The wrapper exists only for debug purposes, to ensure tsn being non-0 is
* in sync with the fiber's txn being non-NULL. In addition, it checks whether
* any stashed rows of unfinished transactions remain. It has nothing to do
* with the journal content, and therefore can use assertions instead of
* rigorous error checking even in release.
*/
static bool
wal_stream_has_unfinished_tx(const struct wal_stream *stream)
{
bool has = stream->tsn != 0;
assert(has == (in_txn() != NULL));

bool unfinished = false;
const struct rlist *nodes_rows = stream->nodes_rows;
for (int i = 0; i < VCLOCK_MAX; i++) {
if (!rlist_empty((struct rlist *)&nodes_rows[i])) {
unfinished = true;
break;
}
}

return has || unfinished;
}

static int
wal_stream_apply_synchro_row(struct wal_stream *stream, struct xrow_header *row)
{
assert(iproto_type_is_synchro_request(row->type));
if (wal_stream_has_tx(stream)) {
if (wal_stream_has_tx_in_progress(stream)) {
diag_set(XlogError, "found synchro request in a transaction");
return -1;
}
@@ -668,7 +698,7 @@ static int
wal_stream_apply_raft_row(struct wal_stream *stream, struct xrow_header *row)
{
assert(iproto_type_is_raft_request(row->type));
if (wal_stream_has_tx(stream)) {
if (wal_stream_has_tx_in_progress(stream)) {
diag_set(XlogError, "found raft request in a transaction");
return -1;
}
@@ -742,7 +772,7 @@ wal_stream_apply_dml_row(struct wal_stream *stream, struct xrow_header *row)
}
stream->has_global_row = true;
}
assert(wal_stream_has_tx(stream));
assert(wal_stream_has_tx_in_progress(stream));
/* Nops might appear at least after before_replace skipping rows. */
if (request.type != IPROTO_NOP) {
struct space *space = space_cache_find(request.space_id);
@@ -797,14 +827,136 @@ wal_stream_apply_dml_row(struct wal_stream *stream, struct xrow_header *row)
return -1;
}

/* A stashed row, kept in an rlist to separate mixed transactions. */
struct wal_row {
/* Base class. */
struct xrow_header row;
/* A link in the list of rows stashed in the nodes_rows array. */
struct rlist in_row_list;
/* A growing identifier to track lsregion allocations. */
int64_t lsr_id;
};

/*
* Helper to stash a row and its body upon receipt; used to recover
* mixed transactions.
*/
static struct wal_row *
wal_stream_save_row(struct wal_stream *stream, struct xrow_header *row)
{
static int64_t lsr_id = 0;
struct lsregion *lsr = &stream->lsr;
struct wal_row *new_row =
(struct wal_row *)xlsregion_alloc(lsr, sizeof(struct wal_row),
++lsr_id);
new_row->lsr_id = lsr_id;
memcpy(new_row, row, sizeof(struct xrow_header));

assert(new_row->row.bodycnt <= 1);
if (new_row->row.bodycnt == 1) {
size_t len = new_row->row.body[0].iov_len;
char *new_base = (char *)xlsregion_alloc(lsr, len, lsr_id);
memcpy(new_base, new_row->row.body[0].iov_base, len);
/* Adjust row body pointers. */
new_row->row.body[0].iov_base = new_base;
}
return new_row;
}

/*
* Find the min lsr_id that is still needed.
*/
static int64_t
find_min_lsr_id(struct wal_stream *stream)
{
int64_t min_lsr_id = stream->base.row_count;
struct rlist *nodes_rows = stream->nodes_rows;
struct wal_row *item;
/*
* For each new row, lsr_id is incremented by 1. Only rows from
* different replica_ids can get mixed, so for each replica_id the
* very first row in its list has the smallest lsr_id of all rows
* with that replica_id. Thus, once a transaction has been read to
* the end, we iterate through the nodes_rows array of lists and see
* which of the first rows of each replica_id has the lowest lsr_id.
* That lsr_id is still needed, but anything less than it is not, so
* everything below it can be freed.
*/
for (uint32_t i = 0; i < VCLOCK_MAX; i++) {
if (!rlist_empty(&nodes_rows[i])) {
item = rlist_first_entry(&nodes_rows[i], wal_row,
in_row_list);
if (item->lsr_id < min_lsr_id)
min_lsr_id = item->lsr_id - 1;
}
}
return min_lsr_id;
}

/*
* Deallocate memory for stashed wal_rows that are no longer needed.
*/
static void
wal_stream_gc(struct wal_stream *stream)
{
struct lsregion *lsr = &stream->lsr;
int64_t lsr_id = find_min_lsr_id(stream);
lsregion_gc(lsr, lsr_id);
}

/*
* During recovery the log is read row by row. However, the rows have
* to be stored for correct further recovery, for example, of mixed
* transactions. The following function saves a newly arrived row into
* the rlist stored in the nodes_rows array at index replica_id. As
* soon as a row arrives with the is_commit flag set, we try to apply
* the corresponding transaction. If an error occurs, the transaction
* is rolled back, and we continue with the next transaction.
*/
static int
wal_stream_apply_mixed_dml_row(struct wal_stream *stream,
struct xrow_header *row)
{
int rc = 0;
uint32_t id = row->replica_id;
struct rlist *nodes_rows = stream->nodes_rows;

/*
* A newly arrived row always has the same address because it lives
* on the stack, so it has to be copied into separately allocated
* memory. The same is done for row->body[0].iov_base.
*/
struct wal_row *save_row = wal_stream_save_row(stream, row);
rlist_add_tail_entry(&nodes_rows[id], save_row, in_row_list);

if (!row->is_commit) {
return rc;
} else {
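/*
* The commit row has arrived: replay all rows stashed for this
* replica_id, in order.
*/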
struct wal_row *item, *next_row;
rlist_foreach_entry_safe(item, &nodes_rows[id],
in_row_list, next_row) {
rlist_del_entry(item, in_row_list);
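/*
* After the first failure keep unlinking the remaining rows so
* the list ends up empty, but do not apply them.
*/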
if (rc != 0)
continue;
rc = wal_stream_apply_dml_row(stream, &item->row);
}
/* Free stashed rows that are no longer needed. */
wal_stream_gc(stream);
assert(rlist_empty(&nodes_rows[id]));

return rc;
}
}

/**
* Yield once in a while, but not too often, mostly to allow signal handling to
* take place.
*/
static void
wal_stream_try_yield(struct wal_stream *stream)
{
if (wal_stream_has_tx(stream) || !stream->has_yield)
if (wal_stream_has_tx_in_progress(stream) || !stream->has_yield)
return;
stream->has_yield = false;
fiber_sleep(0);
@@ -821,6 +973,9 @@ wal_stream_apply_row(struct xstream *base, struct xrow_header *row)
} else if (iproto_type_is_raft_request(row->type)) {
if (wal_stream_apply_raft_row(stream, row) != 0)
goto end_error;
} else if (box_is_force_recovery) {
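/*
* In force recovery mode DML rows may belong to mixed
* transactions, so stash them and apply each transaction only
* once its commit row arrives.
*/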
if (wal_stream_apply_mixed_dml_row(stream, row) != 0)
goto end_error;
} else if (wal_stream_apply_dml_row(stream, row) != 0) {
goto end_error;
}
@@ -850,12 +1005,23 @@ wal_stream_create(struct wal_stream *ctx)
{
xstream_create(&ctx->base, wal_stream_apply_row,
wal_stream_schedule_yield);
lsregion_create(&ctx->lsr, &runtime);
for (int i = 0; i < VCLOCK_MAX; i++) {
rlist_create(&ctx->nodes_rows[i]);
}
ctx->tsn = 0;
ctx->first_row_lsn = 0;
ctx->has_yield = false;
ctx->has_global_row = false;
}

static void
wal_stream_destroy(struct wal_stream *ctx)
{
lsregion_destroy(&ctx->lsr);
memset(ctx, 0, sizeof(*ctx));
}

/* {{{ configuration bindings */

/*
@@ -4839,6 +5005,7 @@ local_recovery(const struct vclock *checkpoint_vclock)
wal_stream_create(&wal_stream);
auto stream_guard = make_scoped_guard([&]{
wal_stream_abort(&wal_stream);
wal_stream_destroy(&wal_stream);
});
struct recovery *recovery = recovery_new(
wal_dir(), box_is_force_recovery, checkpoint_vclock);
@@ -4924,7 +5091,7 @@ local_recovery(const struct vclock *checkpoint_vclock)

engine_begin_final_recovery_xc();
recover_remaining_wals(recovery, &wal_stream.base, NULL, false);
if (wal_stream_has_tx(&wal_stream)) {
if (wal_stream_has_unfinished_tx(&wal_stream)) {
diag_set(XlogError, "found a not finished transaction "
"in the log");
wal_stream_abort(&wal_stream);
@@ -4951,7 +5118,7 @@ local_recovery(const struct vclock *checkpoint_vclock)
}
recovery_stop_local(recovery);
recover_remaining_wals(recovery, &wal_stream.base, NULL, true);
if (wal_stream_has_tx(&wal_stream)) {
if (wal_stream_has_unfinished_tx(&wal_stream)) {
diag_set(XlogError, "found a not finished transaction "
"in the log in hot standby mode");
wal_stream_abort(&wal_stream);
Binary file not shown.
Binary file not shown.
Binary file not shown.
