replication: recovery of mixed transactions
See the docbot request for details.

Closes tarantool#7932

@TarantoolBot document
Title: correct recovery of mixed transactions

Transaction boundaries in WAL were introduced in 2.1.2 (`872d6f1`), but
the replication applier started to apply transactions atomically only in
2.2.1 (`e94ba2e`), so a WAL written by a version within this range may
contain mixed transactions. This caused no problems until 2.8.1 because
the recovery process still worked row-by-row, but in 2.8.1 (`9311113`)
recovery became transactional as well, so starting from 2.8.1 Tarantool
fails to handle mixed transactions from WAL.
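
To make "mixed" concrete: a log is mixed when a row of another transaction shows up before the commit row of the transaction that is currently open. The following is a minimal, self-contained C sketch of that check. The `scan_row` structure and the `scan_find_mixed_row` function are hypothetical and only mimic the fields used here (`replica_id`, `tsn`, `is_commit`); they are not Tarantool's `xrow_header` or its recovery code, and the sketch assumes that `tsn` is never 0 for a real row.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical, simplified view of a WAL row (not struct xrow_header). */
struct scan_row {
	uint32_t replica_id;
	int64_t lsn;
	/* ID of the transaction the row belongs to; assumed to be non-zero. */
	int64_t tsn;
	/* Set on the last row of a transaction. */
	bool is_commit;
};

/*
 * Return the index of the first row that belongs to a transaction other
 * than the currently open one, or -1 if the transactions are not mixed.
 */
static int
scan_find_mixed_row(const struct scan_row *rows, size_t count)
{
	assert(rows != NULL || count == 0);
	/* 0 means that no transaction is open. */
	int64_t open_tsn = 0;
	uint32_t open_replica_id = 0;
	for (size_t i = 0; i < count; i++) {
		const struct scan_row *row = &rows[i];
		if (open_tsn == 0) {
			/* The row starts a new transaction. */
			open_tsn = row->tsn;
			open_replica_id = row->replica_id;
		} else if (row->tsn != open_tsn ||
			   row->replica_id != open_replica_id) {
			/* A foreign row arrived before the commit row. */
			return (int)i;
		}
		if (row->is_commit)
			open_tsn = 0;
	}
	return -1;
}
```

For a log with rows from replica 1 and replica 2 interleaved before either transaction commits, the function reports the first row of replica 2. This is roughly the situation in which transactional recovery without `force_recovery` stops with an error.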

Mixed transactions are clearly an unexpected scenario in modern
Tarantool, so we do not handle them in normal mode and take care of
them only in force recovery mode. However, the current force recovery
treats all rows of an unexpected transaction as unexpected and thus skips
the entire transaction row-by-row. In this case we have enough
information to handle it in a more data-friendly way.
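
The data-friendly handling can be pictured as buffering rows per `replica_id` and applying a buffered transaction once its commit row arrives. Below is a minimal sketch of that idea in plain C. It is not the code of this patch: fixed-size arrays stand in for the `rlist` and `lsregion` structures used in `box.cc`, "applying" a row only records its LSN, and every `demo_*` name and `DEMO_*` limit is hypothetical.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Illustrative limits of the sketch, not Tarantool constants. */
enum {
	DEMO_REPLICA_MAX = 32,
	DEMO_TX_ROWS_MAX = 16,
	DEMO_APPLIED_MAX = 64,
};

/* Hypothetical, simplified row: only the fields the sketch needs. */
struct demo_row {
	uint32_t replica_id;
	int64_t lsn;
	bool is_commit;
};

struct demo_stream {
	/* Rows of the not yet committed transaction of each replica. */
	struct demo_row pending[DEMO_REPLICA_MAX][DEMO_TX_ROWS_MAX];
	size_t pending_count[DEMO_REPLICA_MAX];
	/* LSNs in the order in which they were "applied". */
	int64_t applied[DEMO_APPLIED_MAX];
	size_t applied_count;
};

static void
demo_stream_create(struct demo_stream *s)
{
	memset(s, 0, sizeof(*s));
}

/*
 * Buffer the row. If it carries the commit flag, apply all buffered rows
 * of its replica in arrival order. Return 0 on success, -1 when one of
 * the fixed-size buffers of the sketch is full.
 */
static int
demo_stream_feed(struct demo_stream *s, const struct demo_row *row)
{
	assert(row->replica_id < DEMO_REPLICA_MAX);
	uint32_t id = row->replica_id;
	size_t count = s->pending_count[id];
	if (count == DEMO_TX_ROWS_MAX)
		return -1;
	if (row->is_commit &&
	    s->applied_count + count + 1 > DEMO_APPLIED_MAX)
		return -1;
	s->pending[id][count++] = *row;
	s->pending_count[id] = count;
	if (!row->is_commit)
		return 0;
	for (size_t i = 0; i < count; i++)
		s->applied[s->applied_count++] = s->pending[id][i].lsn;
	s->pending_count[id] = 0;
	return 0;
}

/* True if some replica still has buffered rows without a commit row. */
static bool
demo_stream_has_unfinished(const struct demo_stream *s)
{
	for (size_t i = 0; i < DEMO_REPLICA_MAX; i++) {
		if (s->pending_count[i] != 0)
			return true;
	}
	return false;
}
```

Feeding the rows (replica 1, LSN 10), (replica 2, LSN 20), (replica 1, LSN 11, commit), (replica 2, LSN 21, commit) applies LSNs 10 and 11 first and 20 and 21 afterwards, so each transaction stays atomic even though its rows were interleaved in the log. A transaction whose commit row never arrives remains in the buffer, which corresponds to the "not finished transaction" case.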

Note:
Suppose there are two nodes, `node#1` and `node#2`, and data is
replicated from `node#1` to `node#2`. If at some point `node#1` has to
recover from an xlog containing mixed transactions, do the following to
replicate the data from `node#1` to `node#2`:
1) Disable `node#2`
2) Start `node#1` by setting `force_recovery` to `true`
3) Connect `node#2`
yanshtunder committed May 16, 2023
1 parent bc5a086 commit a9044f6
Showing 6 changed files with 310 additions and 1 deletion.
14 changes: 14 additions & 0 deletions changelogs/unreleased/gh-7932-recovery-mixed-transactions.md
@@ -0,0 +1,14 @@
## feature/replication

* Implemented correct recovery of mixed transactions. To do this, set
`box.cfg.force_recovery` to `true`. If you need to revert to the old
behavior, don't set the `force_recovery` option.

* Note:
Let there be two nodes (`node#1` and `node#2`). And let the data be
replicated from `node#1` to `node#2`. Suppose that at some point in time,
`node#1` is restoring data from an xlog containing mixed transactions. To
replicate data from `node#1` to `node#2`, do the following:
1. Disable `node#2`
2. Start `node#1` by setting `force_recovery` to `true`
3. Connect `node#2` (gh-7932).
170 changes: 169 additions & 1 deletion src/box/box.cc
@@ -95,6 +95,7 @@
#include "wal_ext.h"
#include "mp_util.h"
#include "small/static.h"
#include "memory.h"

static char status[64] = "unconfigured";

@@ -549,6 +550,10 @@ box_set_orphan(bool orphan)
struct wal_stream {
/** Base class. */
struct xstream base;
/** The lsregion for allocating rows. */
struct lsregion lsr;
/** The array of lists keeping rows from xlog. */
struct rlist nodes_rows[VCLOCK_MAX];
/** Current transaction ID. 0 when no transaction. */
int64_t tsn;
/**
@@ -638,7 +643,16 @@ wal_stream_has_tx(const struct wal_stream *stream)
{
bool has = stream->tsn != 0;
assert(has == (in_txn() != NULL));
return has;

bool unfinished = false;
const struct rlist *nodes_rows = stream->nodes_rows;
for (int i = 0; i < VCLOCK_MAX; i++) {
if (!rlist_empty((struct rlist *)&nodes_rows[i])) {
unfinished = true;
break;
}
}
return has || unfinished;
}

static int
@@ -790,6 +804,141 @@ wal_stream_apply_dml_row(struct wal_stream *stream, struct xrow_header *row)
return -1;
}

/*
* A saved xlog row. Saved rows are kept in per-replica lists to separate
* mixed transactions.
*/
struct wal_row {
/* Base class. */
struct xrow_header row;
/* Link in the per-replica list of the nodes_rows array. */
struct rlist in_row_list;
/* A growing identifier to track lsregion allocations. */
int64_t lsr_id;
};

/*
* Callback to stash row and row bodies upon receipt, used to recover
* mixed transactions.
*/
static struct wal_row *
wal_stream_save_row(struct wal_stream *stream, struct xrow_header *row)
{
static int64_t lsr_id = 0;
struct lsregion *lsr = &stream->lsr;
struct wal_row *new_row = (struct wal_row *)xlsregion_alloc(lsr,
sizeof(struct wal_row), ++lsr_id);
new_row->lsr_id = lsr_id;
memcpy(new_row, row, sizeof(struct xrow_header));

assert(new_row->row.bodycnt <= 1);
if (new_row->row.bodycnt == 1) {
size_t len = new_row->row.body[0].iov_len;
char *new_base = (char *)xlsregion_alloc(lsr, len, lsr_id);
memcpy(new_base, new_row->row.body[0].iov_base, len);
/* Adjust row body pointers. */
new_row->row.body[0].iov_base = new_base;
}
return new_row;
}

/*
* Free the memory of all saved rows whose lsr_id is not greater than
* the given one.
*/
static void
deallocate_wal_row(struct wal_stream *stream, int64_t lsr_id)
{
struct lsregion *lsr = &stream->lsr;
lsregion_gc(lsr, lsr_id);
}

/*
* Find the max lsr_id that can be freed: all rows of other replicas that
* are still waiting for their commit must stay allocated.
*/
static int64_t
find_min_lsr_id(struct wal_stream *stream, struct wal_row *row)
{
int64_t min_lsr_id = row->lsr_id;
struct rlist *nodes_rows = stream->nodes_rows;
struct wal_row *item;
for (uint32_t i = 0; i < VCLOCK_MAX; i++) {
/* Skip the replica whose transaction is being committed. */
if (i == row->row.replica_id)
continue;
if (!rlist_empty(&nodes_rows[i])) {
item = rlist_first_entry(&nodes_rows[i], wal_row,
in_row_list);
/* Keep the oldest pending row of this replica. */
if (item->lsr_id <= min_lsr_id)
min_lsr_id = item->lsr_id - 1;
}
}
return min_lsr_id;
}

/*
* During recovery the log is read row-by-row. However, the rows have to
* be stored to recover correctly, for example, when transactions are
* mixed. The following function saves a newly arrived row to the rlist
* stored in the nodes_rows array in the cell indexed by replica_id. As
* soon as a row arrives with the is_commit flag set, the corresponding
* transaction is applied. If an error occurs, the transaction is rolled
* back, and we continue with the next transaction.
*/
static int
wal_stream_apply_mixed_dml_row(struct wal_stream *stream,
struct xrow_header *row)
{
int rc = 0;
uint32_t id = row->replica_id;
struct rlist *nodes_rows = stream->nodes_rows;

/* Every new row arrives at the same address because the row is
* on the stack. Therefore, it has to be copied to separately
* allocated memory. The same is done for row->body[0].iov_base.
*/
struct wal_row *save_row = wal_stream_save_row(stream, row);
rlist_add_tail_entry(&nodes_rows[id], save_row, in_row_list);

if (!row->is_commit) {
return 0;
} else {
int64_t lsr_id = find_min_lsr_id(stream, save_row);
struct wal_row *item, *next_row;
rlist_foreach_entry_safe(item, &nodes_rows[id],
in_row_list, next_row) {
rlist_del_entry(item, in_row_list);
if (rc == 0) {
rc = wal_stream_apply_dml_row
(stream, (struct xrow_header *)item);
}
}
/* deallocating the memory */
deallocate_wal_row(stream, lsr_id);
assert(rlist_empty(&nodes_rows[id]));

return rc;
}
}

/*
* To find which transaction is not completed, you need to go through all
* the cells of the array, and then free the memory.
*/
static void
clear_unfinished_txn(struct wal_stream *stream)
{
struct rlist *nodes_rows = stream->nodes_rows;
for (int i = 0; i < VCLOCK_MAX; i++) {
if (!rlist_empty(&nodes_rows[i])) {
struct wal_row *item, *next_row;
rlist_foreach_entry_safe(item, &nodes_rows[i],
in_row_list, next_row) {
rlist_del_entry(item, in_row_list);
}
assert(rlist_empty(&nodes_rows[i]));
}
}
int64_t lsr_id = stream->base.row_count;
/* deallocating the memory */
deallocate_wal_row(stream, lsr_id);
}

/**
* Yield once in a while, but not too often, mostly to allow signal handling to
* take place.
@@ -814,6 +963,9 @@ wal_stream_apply_row(struct xstream *base, struct xrow_header *row)
} else if (iproto_type_is_raft_request(row->type)) {
if (wal_stream_apply_raft_row(stream, row) != 0)
goto end_error;
} else if (box_is_force_recovery) {
if (wal_stream_apply_mixed_dml_row(stream, row) != 0)
goto end_error;
} else if (wal_stream_apply_dml_row(stream, row) != 0) {
goto end_error;
}
@@ -843,6 +995,21 @@ wal_stream_create(struct wal_stream *ctx)
{
xstream_create(&ctx->base, wal_stream_apply_row,
wal_stream_schedule_yield);
lsregion_create(&ctx->lsr, &runtime);
for (int i = 0; i < VCLOCK_MAX; i++) {
rlist_create(&ctx->nodes_rows[i]);
}
ctx->tsn = 0;
ctx->first_row_lsn = 0;
ctx->has_yield = false;
ctx->has_global_row = false;
}

static void
wal_stream_destroy(struct wal_stream *ctx)
{
clear_unfinished_txn(ctx);
lsregion_destroy(&ctx->lsr);
ctx->tsn = 0;
ctx->first_row_lsn = 0;
ctx->has_yield = false;
@@ -4661,6 +4828,7 @@ local_recovery(const struct tt_uuid *instance_uuid,
diag_set(XlogError, "found a not finished transaction "
"in the log");
wal_stream_abort(&wal_stream);
wal_stream_destroy(&wal_stream);
if (!box_is_force_recovery)
diag_raise();
diag_log();
Binary file not shown.
Binary file not shown.
Binary file not shown.
127 changes: 127 additions & 0 deletions test/replication-luatest/gh_7932_recovery_mixed_transactions_test.lua
@@ -0,0 +1,127 @@
local t = require('luatest')
local cluster = require('luatest.replica_set')
local fio = require('fio')

local g = t.group('gh_7932')

g.before_each(function(cg)
cg.cluster = cluster:new({})
cg.server = cg.cluster:build_and_add_server{
alias = 'server',
box_cfg = {
instance_uuid = '1a685e63-24cd-4fc5-9532-bf85f649e0ab',
},
}
cg.cluster:start()
end)

g.after_each(function(cg)
cg.cluster:drop()
end)

g.test_good_xlog_with_force_recovery = function(cg)
--Delete all *.xlogs on the server
cg.server:stop()
local glob = fio.pathjoin(cg.server.workdir, '*.xlog')
local xlog = fio.glob(glob)
for _, file in pairs(xlog) do fio.unlink(file) end
--Copying the prepared log to the server
xlog = 'test/replication-luatest/gh_7932_data/good_xlog/'..
'00000000000000000000.xlog'
fio.copyfile(xlog, cg.server.workdir)

cg.server.box_cfg.force_recovery = true
cg.server:start()
-- The first transaction is applied, and then the second transaction
-- is applied.
t.assert_equals(cg.server:exec(function()
return box.space.test:select()
end), {{1, 2}, {2, 2}, {3, 2}})
end

g.test_good_xlog_without_force_recovery = function(cg)
-- Delete all *.xlogs on the server
cg.server:stop()
local glob = fio.pathjoin(cg.server.workdir, '*.xlog')
local xlog = fio.glob(glob)
for _, file in pairs(xlog) do fio.unlink(file) end
-- Copying the prepared log to the server
xlog = 'test/replication-luatest/gh_7932_data/good_xlog/'..
'00000000000000000000.xlog'
fio.copyfile(xlog, cg.server.workdir)

cg.server.box_cfg.force_recovery = false
cg.server:start({wait_until_ready = false})
local logfile = fio.pathjoin(cg.server.workdir, 'server.log')
-- The transactions aren't applied without force_recovery
t.helpers.retrying({}, function()
t.assert(cg.server:grep_log("error at request", nil,
{filename = logfile}), "{type: 'REPLACE', replica_id: 2, "..
"lsn: 32050, space_id: 512, index_id: 0, tuple: [1, 2]}")
t.assert(cg.server:grep_log("XlogError", nil,
{filename = logfile}), "found a next transaction with the "..
"previous one not yet committed")
end)
end

g.test_bad_xlog_with_force_recovery = function(cg)
--Delete all *.xlogs on the server
cg.server:stop()
local glob = fio.pathjoin(cg.server.workdir, '*.xlog')
local xlog = fio.glob(glob)
for _, file in pairs(xlog) do fio.unlink(file) end
--Copying the prepared log to the server
xlog = 'test/replication-luatest/gh_7932_data/bad_xlog/'..
'00000000000000000000.xlog'
fio.copyfile(xlog, cg.server.workdir)

cg.server.box_cfg.force_recovery = true
cg.server:start()
local logfile = fio.pathjoin(cg.server.workdir, 'server.log')

-- The second transaction cannot be applied because the first global
-- row in the transaction has an LSN/TSN mismatch.
t.helpers.retrying({}, function()
t.assert(cg.server:grep_log("error at request", nil,
{filename = logfile}), "{type: 'REPLACE', replica_id: 2, "..
"lsn: 32051, space_id: 512, index_id: 0, tuple: [2, 2]}")
t.assert(cg.server:grep_log("skipping row {2: 32052}", nil,
{filename = logfile}), "skipping row")
t.assert(cg.server:grep_log("XlogError", nil,
{filename = logfile}), "found a first global row in a "..
"transaction with LSN/TSN mismatch")
end)

-- Only the first transaction is applied
t.assert_equals(cg.server:exec(function()
return box.space.test:select()
end), {{1, 3}, {2, 3}, {3, 3}})
end

g.test_not_finished_transaction = function(cg)
--Delete all *.xlogs on the server
cg.server:stop()
local glob = fio.pathjoin(cg.server.workdir, '*.xlog')
local xlog = fio.glob(glob)
for _, file in pairs(xlog) do fio.unlink(file) end
--Copying the prepared log to the server
xlog = 'test/replication-luatest/gh_7932_data/not_finished_xlog/'..
'00000000000000000000.xlog'
fio.copyfile(xlog, cg.server.workdir)

cg.server.box_cfg.force_recovery = true
cg.server:start()
local logfile = fio.pathjoin(cg.server.workdir, 'server.log')

-- The second transaction is not completed
t.helpers.retrying({}, function()
t.assert(cg.server:grep_log("XlogError", nil,
{filename = logfile}), "found a not finished transaction " ..
"in the log")
end)

-- Only the first transaction is applied
t.assert_equals(cg.server:exec(function()
return box.space.test:select()
end), {{1, 3}, {2, 3}, {3, 3}})
end
