Skip to content

Commit

Permalink
replication: recovery mixed transacrtions
Browse files Browse the repository at this point in the history
See the docbot request for details.

Closes tarantool#7932

@TarantoolBot document
Title: correct recovery of mixed transactions

In this patch implemented correct recovery of mixed transactions. To do
this, set  `box.cfg.force_recovery` to `true`. If you need to revert to
the old behavior, don't set the `force_recovery` option.

What to do when one node feeds the other a xlog with mixed transactions?
Let there be two nodes (`node#1` and `node#2`). And let the data be
replicated from `node#1` to `node#2`. Suppose that at some point in time,
`node#1` is restoring data from an xlog containing mixed transactions. To
replicate data from `node#1` to `node#2`, do the following:
1. Stop `node#2` and delete all xlog files from it
2. Restart `node#1` by setting `force_recovery` to `true`
3. Make `node#2` rejoin to `node#1` again.
  • Loading branch information
yanshtunder authored and sergepetrenko committed Jun 16, 2023
1 parent ba8eb14 commit 873e431
Show file tree
Hide file tree
Showing 6 changed files with 342 additions and 7 deletions.
5 changes: 5 additions & 0 deletions changelogs/unreleased/gh-7932-recovery-mixed-transactions.md
@@ -0,0 +1,5 @@
## feature/replication

* Implemented correct recovery of mixed transactions. To do this, set
`box.cfg.force_recovery` to `true`. If you need to revert to the old
behavior, don't set the `force_recovery` option (gh-7932).
180 changes: 173 additions & 7 deletions src/box/box.cc
Expand Up @@ -95,6 +95,7 @@
#include "wal_ext.h"
#include "mp_util.h"
#include "small/static.h"
#include "memory.h"
#include "node_name.h"
#include "tt_sort.h"

Expand Down Expand Up @@ -557,6 +558,10 @@ box_set_orphan(bool orphan)
struct wal_stream {
/** Base class. */
struct xstream base;
/** The lsregion for allocating rows. */
struct lsregion lsr;
/** The array of lists keeping rows from xlog. */
struct rlist nodes_rows[VCLOCK_MAX];
/** Current transaction ID. 0 when no transaction. */
int64_t tsn;
/**
Expand Down Expand Up @@ -642,18 +647,39 @@ wal_stream_abort(struct wal_stream *stream)
* checking even in release.
*/
static bool
wal_stream_has_tx(const struct wal_stream *stream)
wal_stream_has_tx_in_progress(const struct wal_stream *stream)
{
bool has = stream->tsn != 0;
assert(has == (in_txn() != NULL));
return has;
}

/**
* The function checks that the tail of the transaction from the journal
* has been read. A mixed transaction consists of multiple transactions
* that are mixed together. Need to make sure that after the restoration
* is completed, there are no unfinished transactions left in the
* nodes_rows array. wal_stream_has_tx_in_progress is a transaction which
* has all the data, but simply isn't committed yet.
*/
static bool
wal_stream_has_unfinished_tx(const struct wal_stream *stream)
{
if (wal_stream_has_tx_in_progress(stream))
return true;
const struct rlist *nodes_rows = stream->nodes_rows;
for (int i = 0; i < VCLOCK_MAX; i++) {
if (!rlist_empty((struct rlist *)&nodes_rows[i]))
return true;
}
return false;
}

static int
wal_stream_apply_synchro_row(struct wal_stream *stream, struct xrow_header *row)
{
assert(iproto_type_is_synchro_request(row->type));
if (wal_stream_has_tx(stream)) {
if (wal_stream_has_tx_in_progress(stream)) {
diag_set(XlogError, "found synchro request in a transaction");
return -1;
}
Expand All @@ -669,7 +695,7 @@ static int
wal_stream_apply_raft_row(struct wal_stream *stream, struct xrow_header *row)
{
assert(iproto_type_is_raft_request(row->type));
if (wal_stream_has_tx(stream)) {
if (wal_stream_has_tx_in_progress(stream)) {
diag_set(XlogError, "found raft request in a transaction");
return -1;
}
Expand Down Expand Up @@ -743,7 +769,7 @@ wal_stream_apply_dml_row(struct wal_stream *stream, struct xrow_header *row)
}
stream->has_global_row = true;
}
assert(wal_stream_has_tx(stream));
assert(wal_stream_has_tx_in_progress(stream));
/* Nops might appear at least after before_replace skipping rows. */
if (request.type != IPROTO_NOP) {
struct space *space = space_cache_find(request.space_id);
Expand Down Expand Up @@ -798,14 +824,138 @@ wal_stream_apply_dml_row(struct wal_stream *stream, struct xrow_header *row)
return -1;
}

/**
* Keeping added rows in a rlist to separate mixed transactions
*/
struct wal_row {
/** Base class. */
struct xrow_header row;
/** A link on the list of rows stacked in the nodes_rows array. */
struct rlist in_row_list;
/** A growing identifier to track lsregion allocations. */
int64_t lsr_id;
};

/**
* Callback to stash row and row bodies upon receipt, used to recover
* mixed transactions.
*/
static struct wal_row *
wal_stream_save_row(struct wal_stream *stream,
const struct xrow_header *row)
{
static int64_t lsr_id = 0;
struct lsregion *lsr = &stream->lsr;
struct wal_row *new_row = xlsregion_alloc_object(lsr, ++lsr_id,
typeof(*new_row));
new_row->lsr_id = lsr_id;
new_row->row = *row;

assert(new_row->row.bodycnt <= 1);
if (new_row->row.bodycnt == 1) {
size_t len = new_row->row.body[0].iov_len;
char *new_base = (char *)xlsregion_alloc(lsr, len, lsr_id);
memcpy(new_base, new_row->row.body[0].iov_base, len);
/* Adjust row body pointers. */
new_row->row.body[0].iov_base = new_base;
}
return new_row;
}

/**
* Find the min lsr_id that is still needed.
*/
static int64_t
wal_stream_find_min_lsr_id(struct wal_stream *stream)
{
int64_t min_lsr_id = INT64_MAX;
struct rlist *nodes_rows = stream->nodes_rows;
struct wal_row *item;
/*
* For each new row, lsr_id is incremented by 1. Only row from
* different replica_id can get mixed, so for each replica_id,
* the very first row has the smallest lsr_id of all row with that
* replica_id. Thus, for each transaction read to the end, to
* iterate through the nodes_rows array of lists and see which of
* the first `row` for each `replica_id` has the lowest lsr_id.
* This lsr_id is still needed, but anything less than this is not.
* So can free everything up to this lsr_id.
*/
for (uint32_t i = 0; i < VCLOCK_MAX; i++) {
if (!rlist_empty(&nodes_rows[i])) {
item = rlist_first_entry(&nodes_rows[i], wal_row,
in_row_list);
if (item->lsr_id <= min_lsr_id)
min_lsr_id = item->lsr_id - 1;
}
}
return min_lsr_id;
}

/**
* Deallocating memory for the wal_row
*/
static void
wal_stream_gc(struct wal_stream *stream)
{
struct lsregion *lsr = &stream->lsr;
int64_t lsr_id = wal_stream_find_min_lsr_id(stream);
lsregion_gc(lsr, lsr_id);
}

/**
* When restoring the log is read row-by-row. However, it is necessary
* to store the rows for correct further recovery. For example, with
* mixed transactions. The function saves the again coming string to the
* rlist. This rlist is stored in nodes_rows array in replica_id cell.
* As soon as a row arrives with the is_commit flag set, the corresponding
* transaction tries to apply. If an error occurs the transaction is
* rolled back. And we continue to work with the next transaction.
*/
static int
wal_stream_apply_mixed_dml_row(struct wal_stream *stream,
struct xrow_header *row)
{
/*
* A local transaction should not be written to nodes_rows with
* id equal to 0. It should be written to nodes_rows with the id
* of the node from which we are recovering. Since a local
* transaction can only exist on the node from which we are
* recovering.
*/
uint32_t id = row->replica_id ? row->replica_id : instance_id;
struct rlist *nodes_rows = stream->nodes_rows;

struct wal_row *save_row = wal_stream_save_row(stream, row);
rlist_add_tail_entry(&nodes_rows[id], save_row, in_row_list);

if (!row->is_commit)
return 0;

int rc = 0;
struct wal_row *item, *next_row;
rlist_foreach_entry_safe(item, &nodes_rows[id],
in_row_list, next_row) {
rlist_del_entry(item, in_row_list);
if (rc != 0)
continue;
rc = wal_stream_apply_dml_row(stream, &item->row);
}
/* deallocating the memory */
wal_stream_gc(stream);
assert(rlist_empty(&nodes_rows[id]));

return rc;
}

/**
* Yield once in a while, but not too often, mostly to allow signal handling to
* take place.
*/
static void
wal_stream_try_yield(struct wal_stream *stream)
{
if (wal_stream_has_tx(stream) || !stream->has_yield)
if (wal_stream_has_tx_in_progress(stream) || !stream->has_yield)
return;
stream->has_yield = false;
fiber_sleep(0);
Expand All @@ -822,6 +972,9 @@ wal_stream_apply_row(struct xstream *base, struct xrow_header *row)
} else if (iproto_type_is_raft_request(row->type)) {
if (wal_stream_apply_raft_row(stream, row) != 0)
goto end_error;
} else if (box_is_force_recovery) {
if (wal_stream_apply_mixed_dml_row(stream, row) != 0)
goto end_error;
} else if (wal_stream_apply_dml_row(stream, row) != 0) {
goto end_error;
}
Expand Down Expand Up @@ -851,12 +1004,23 @@ wal_stream_create(struct wal_stream *ctx)
{
xstream_create(&ctx->base, wal_stream_apply_row,
wal_stream_schedule_yield);
lsregion_create(&ctx->lsr, &runtime);
for (int i = 0; i < VCLOCK_MAX; i++) {
rlist_create(&ctx->nodes_rows[i]);
}
ctx->tsn = 0;
ctx->first_row_lsn = 0;
ctx->has_yield = false;
ctx->has_global_row = false;
}

static void
wal_stream_destroy(struct wal_stream *ctx)
{
lsregion_destroy(&ctx->lsr);
TRASH(ctx);
}

/* {{{ configuration bindings */

/*
Expand Down Expand Up @@ -4876,6 +5040,7 @@ local_recovery(const struct vclock *checkpoint_vclock)
wal_stream_create(&wal_stream);
auto stream_guard = make_scoped_guard([&]{
wal_stream_abort(&wal_stream);
wal_stream_destroy(&wal_stream);
});
struct recovery *recovery = recovery_new(
wal_dir(), box_is_force_recovery, checkpoint_vclock);
Expand Down Expand Up @@ -4961,7 +5126,7 @@ local_recovery(const struct vclock *checkpoint_vclock)

engine_begin_final_recovery_xc();
recover_remaining_wals(recovery, &wal_stream.base, NULL, false);
if (wal_stream_has_tx(&wal_stream)) {
if (wal_stream_has_unfinished_tx(&wal_stream)) {
diag_set(XlogError, "found a not finished transaction "
"in the log");
wal_stream_abort(&wal_stream);
Expand All @@ -4988,7 +5153,7 @@ local_recovery(const struct vclock *checkpoint_vclock)
}
recovery_stop_local(recovery);
recover_remaining_wals(recovery, &wal_stream.base, NULL, true);
if (wal_stream_has_tx(&wal_stream)) {
if (wal_stream_has_unfinished_tx(&wal_stream)) {
diag_set(XlogError, "found a not finished transaction "
"in the log in hot standby mode");
wal_stream_abort(&wal_stream);
Expand Down Expand Up @@ -5042,6 +5207,7 @@ local_recovery(const struct vclock *checkpoint_vclock)
}
stream_guard.is_active = false;
recovery_finalize(recovery);
wal_stream_destroy(&wal_stream);

/*
* We must enable WAL before finalizing engine recovery,
Expand Down
Binary file not shown.
Binary file not shown.
Binary file not shown.

0 comments on commit 873e431

Please sign in to comment.