diff --git a/changelogs/unreleased/gh-7932-recovery-mixed-transactions.md b/changelogs/unreleased/gh-7932-recovery-mixed-transactions.md new file mode 100644 index 000000000000..a70b420da841 --- /dev/null +++ b/changelogs/unreleased/gh-7932-recovery-mixed-transactions.md @@ -0,0 +1,13 @@ +## feature/replication + +* Implemented correct recovery of mixed transactions. To enable it, set + `box.cfg.force_recovery` to `true`. To keep the old behavior, leave the + `force_recovery` option unset. + +* Consider two nodes, `node#1` and `node#2`, with data replicated from + `node#1` to `node#2`. Suppose that at some point `node#1` has to recover + from an xlog containing mixed transactions. To restore replication from + `node#1` to `node#2`, do the following: + 1. Stop `node#2` and delete all its xlog files. + 2. Restart `node#1` with `force_recovery` set to `true`. + 3. Make `node#2` rejoin `node#1` (gh-7932). diff --git a/src/box/box.cc b/src/box/box.cc index 4ea445c3710b..1507e75319fa 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -95,6 +95,7 @@ #include "wal_ext.h" #include "mp_util.h" #include "small/static.h" +#include "memory.h" static char status[64] = "unconfigured"; @@ -549,6 +550,10 @@ box_set_orphan(bool orphan) struct wal_stream { /** Base class. */ struct xstream base; + /** The lsregion for allocating rows. */ + struct lsregion lsr; + /** The array of lists keeping rows from xlog. */ + struct rlist nodes_rows[VCLOCK_MAX]; /** Current transaction ID. 0 when no transaction. */ int64_t tsn; /** @@ -634,18 +639,43 @@ wal_stream_abort(struct wal_stream *stream) * checking even in release.
*/
 static bool
-wal_stream_has_tx(const struct wal_stream *stream)
+wal_stream_has_tx_in_progress(const struct wal_stream *stream)
 {
 	bool has = stream->tsn != 0;
 	assert(has == (in_txn() != NULL));
 	return has;
 }
 
+/**
+ * Same debug check as wal_stream_has_tx_in_progress() (tsn being non-0 must be
+ * in sync with the fiber's txn being non-NULL), plus a scan of nodes_rows:
+ * returns true if a transaction is open or if any per-replica list still holds
+ * stashed rows. The tsn/txn part has nothing to do with the journal content,
+ * so it uses an assertion instead of rigorous error checking even in release.
+ */
+static bool
+wal_stream_has_unfinished_tx(const struct wal_stream *stream)
+{
+	bool has = stream->tsn != 0;
+	assert(has == (in_txn() != NULL));
+
+	bool unfinished = false;
+	const struct rlist *nodes_rows = stream->nodes_rows;
+	for (int i = 0; i < VCLOCK_MAX; i++) {
+		if (!rlist_empty((struct rlist *)&nodes_rows[i])) {
+			unfinished = true;
+			break;
+		}
+	}
+
+	return has || unfinished;
+}
+
 static int
 wal_stream_apply_synchro_row(struct wal_stream *stream, struct xrow_header *row)
 {
 	assert(iproto_type_is_synchro_request(row->type));
-	if (wal_stream_has_tx(stream)) {
+	if (wal_stream_has_tx_in_progress(stream)) {
 		diag_set(XlogError, "found synchro request in a transaction");
 		return -1;
 	}
@@ -661,7 +691,7 @@ static int
 wal_stream_apply_raft_row(struct wal_stream *stream, struct xrow_header *row)
 {
 	assert(iproto_type_is_raft_request(row->type));
-	if (wal_stream_has_tx(stream)) {
+	if (wal_stream_has_tx_in_progress(stream)) {
 		diag_set(XlogError, "found raft request in a transaction");
 		return -1;
 	}
@@ -735,7 +765,7 @@ wal_stream_apply_dml_row(struct wal_stream *stream, struct xrow_header *row)
 		}
 		stream->has_global_row = true;
 	}
-	assert(wal_stream_has_tx(stream));
+	assert(wal_stream_has_tx_in_progress(stream));
 	/* Nops might appear at least after before_replace skipping rows.
*/
 	if (request.type != IPROTO_NOP) {
 		struct space *space = space_cache_find(request.space_id);
@@ -790,6 +820,128 @@ wal_stream_apply_dml_row(struct wal_stream *stream, struct xrow_header *row)
 	return -1;
 }
 
+/* A stashed copy of an xlog row, queued until its transaction's commit row. */
+struct wal_row {
+	/* The saved row header (body pointers are re-targeted on save). */
+	struct xrow_header row;
+	/* Link in the per-replica list of stream->nodes_rows. */
+	struct rlist in_row_list;
+	/* The lsregion allocation id this row was saved under. */
+	int64_t lsr_id;
+};
+
+/*
+ * Copy the row header and its body (if any) into the stream's lsregion, since
+ * the caller's row does not outlive the call. Returns the stashed copy.
+ */
+static struct wal_row *
+wal_stream_save_row(struct wal_stream *stream, struct xrow_header *row)
+{
+	static int64_t lsr_id = 0;
+	struct lsregion *lsr = &stream->lsr;
+	struct wal_row *new_row =
+		(struct wal_row *)xlsregion_alloc(lsr, sizeof(struct wal_row),
+						  ++lsr_id);
+	new_row->lsr_id = lsr_id;
+	new_row->row = *row;
+
+	assert(new_row->row.bodycnt <= 1);
+	if (new_row->row.bodycnt == 1) {
+		size_t len = new_row->row.body[0].iov_len;
+		char *new_base = (char *)xlsregion_alloc(lsr, len, lsr_id);
+		memcpy(new_base, new_row->row.body[0].iov_base, len);
+		/* Adjust row body pointers. */
+		new_row->row.body[0].iov_base = new_base;
+	}
+	return new_row;
+}
+
+/*
+ * Return the largest lsr_id that no stashed row needs (lsregion_gc() bound).
+ */
+static int64_t
+find_min_lsr_id(struct wal_stream *stream)
+{
+	int64_t min_used_id = INT64_MAX;
+	struct rlist *nodes_rows = stream->nodes_rows;
+	struct wal_row *item;
+	/*
+	 * lsr_id grows by 1 with every saved row and rows are appended to
+	 * the tail of their replica's list, so the head of each non-empty
+	 * list carries the smallest lsr_id of that list. The minimum over
+	 * all heads is the oldest allocation still in use; everything
+	 * strictly below it can be freed. Track the minimum itself and
+	 * subtract 1 only at the end: comparing against an already
+	 * decremented bound would miss a head whose id equals that bound
+	 * and free a row that is still linked.
+	 */
+	for (uint32_t i = 0; i < VCLOCK_MAX; i++) {
+		if (!rlist_empty(&nodes_rows[i])) {
+			item = rlist_first_entry(&nodes_rows[i], wal_row,
+						 in_row_list);
+			if (item->lsr_id < min_used_id)
+				min_used_id = item->lsr_id;
+		}
+	}
+	return min_used_id - 1;
+}
+
+/*
+ * Free the lsregion memory of rows that are no longer in any pending list.
+ */
+static void
+wal_stream_gc(struct wal_stream *stream)
+{
+	struct lsregion *lsr = &stream->lsr;
+	int64_t lsr_id = find_min_lsr_id(stream);
+	lsregion_gc(lsr, lsr_id);
+}
+
+/*
+ * Used for recovery with force_recovery: rows of different replicas may be
+ * interleaved (mixed transactions), so each incoming row is copied and
+ * queued in nodes_rows[row->replica_id] instead of being applied at once.
+ * When a row with is_commit arrives, all rows queued for that replica are
+ * applied in order via wal_stream_apply_dml_row(); after the first failure
+ * the remaining rows of the transaction are dropped unapplied. Returns 0 on
+ * success or when the row was only queued, otherwise the failed apply result.
+ * NOTE(review): replica_id is not range-checked against VCLOCK_MAX here.
+ */
+static int
+wal_stream_apply_mixed_dml_row(struct wal_stream *stream,
+			       struct xrow_header *row)
+{
+	int rc = 0;
+	uint32_t id = row->replica_id;
+	struct rlist *nodes_rows = stream->nodes_rows;
+
+	/*
+	 * The caller reuses the same row object (it lives on the stack) for
+	 * every row, so the row has to be copied to survive until commit.
+	 * The body pointed to by row->body[0].iov_base is copied likewise.
+	 */
+	struct wal_row *save_row = wal_stream_save_row(stream, row);
+	rlist_add_tail_entry(&nodes_rows[id], save_row, in_row_list);
+
+	if (!row->is_commit) {
+		return rc;
+	} else {
+		struct wal_row *item, *next_row;
+		rlist_foreach_entry_safe(item, &nodes_rows[id],
+					 in_row_list, next_row) {
+			rlist_del_entry(item, in_row_list);
+			if (rc != 0)
+				continue;
+			rc = wal_stream_apply_dml_row(stream, &item->row);
+		}
+		/* Free the rows that no pending transaction needs anymore. */
+		wal_stream_gc(stream);
+		assert(rlist_empty(&nodes_rows[id]));
+
+		return rc;
+	}
+}
+
 /**
  * Yield once in a while, but not too often, mostly to allow signal handling to
  * take place.
@@ -797,7 +949,7 @@ wal_stream_apply_dml_row(struct wal_stream *stream, struct xrow_header *row)
 static void
 wal_stream_try_yield(struct wal_stream *stream)
 {
-	if (wal_stream_has_tx(stream) || !stream->has_yield)
+	if (wal_stream_has_tx_in_progress(stream) || !stream->has_yield)
 		return;
 	stream->has_yield = false;
 	fiber_sleep(0);
@@ -814,6 +966,9 @@ wal_stream_apply_row(struct xstream *base, struct xrow_header *row)
 	} else if (iproto_type_is_raft_request(row->type)) {
 		if (wal_stream_apply_raft_row(stream, row) != 0)
 			goto end_error;
+	} else if (box_is_force_recovery) {
+		if (wal_stream_apply_mixed_dml_row(stream, row) != 0)
+			goto end_error;
 	} else if (wal_stream_apply_dml_row(stream, row) != 0) {
 		goto end_error;
 	}
@@ -843,12 +998,23 @@ wal_stream_create(struct wal_stream *ctx)
 {
 	xstream_create(&ctx->base, wal_stream_apply_row,
 		       wal_stream_schedule_yield);
+	lsregion_create(&ctx->lsr, &runtime);
+	for (int i = 0; i < VCLOCK_MAX; i++) {
+		rlist_create(&ctx->nodes_rows[i]);
+	}
 	ctx->tsn = 0;
 	ctx->first_row_lsn = 0;
 	ctx->has_yield = false;
 	ctx->has_global_row = false;
 }
 
+static void
+wal_stream_destroy(struct wal_stream *ctx)
+{
+	lsregion_destroy(&ctx->lsr);
+	memset(ctx, 0, sizeof(*ctx));
+}
+
 /* {{{ configuration bindings */
 
 /*
@@ -4572,6 +4738,7 @@ local_recovery(const struct tt_uuid *instance_uuid,
 	wal_stream_create(&wal_stream);
 	auto
stream_guard = make_scoped_guard([&]{ wal_stream_abort(&wal_stream); + wal_stream_destroy(&wal_stream); }); struct recovery *recovery = recovery_new( wal_dir(), box_is_force_recovery, checkpoint_vclock); @@ -4657,7 +4824,7 @@ local_recovery(const struct tt_uuid *instance_uuid, engine_begin_final_recovery_xc(); recover_remaining_wals(recovery, &wal_stream.base, NULL, false); - if (wal_stream_has_tx(&wal_stream)) { + if (wal_stream_has_unfinished_tx(&wal_stream)) { diag_set(XlogError, "found a not finished transaction " "in the log"); wal_stream_abort(&wal_stream); @@ -4684,7 +4851,7 @@ local_recovery(const struct tt_uuid *instance_uuid, } recovery_stop_local(recovery); recover_remaining_wals(recovery, &wal_stream.base, NULL, true); - if (wal_stream_has_tx(&wal_stream)) { + if (wal_stream_has_unfinished_tx(&wal_stream)) { diag_set(XlogError, "found a not finished transaction " "in the log in hot standby mode"); wal_stream_abort(&wal_stream); diff --git a/test/replication-luatest/gh_7932_data/bad_xlog/00000000000000000000.xlog b/test/replication-luatest/gh_7932_data/bad_xlog/00000000000000000000.xlog new file mode 100644 index 000000000000..3d28648ff270 Binary files /dev/null and b/test/replication-luatest/gh_7932_data/bad_xlog/00000000000000000000.xlog differ diff --git a/test/replication-luatest/gh_7932_data/good_xlog/00000000000000000000.xlog b/test/replication-luatest/gh_7932_data/good_xlog/00000000000000000000.xlog new file mode 100644 index 000000000000..30f6f5bee567 Binary files /dev/null and b/test/replication-luatest/gh_7932_data/good_xlog/00000000000000000000.xlog differ diff --git a/test/replication-luatest/gh_7932_data/not_finished_xlog/00000000000000000000.xlog b/test/replication-luatest/gh_7932_data/not_finished_xlog/00000000000000000000.xlog new file mode 100644 index 000000000000..7a36674d6573 Binary files /dev/null and b/test/replication-luatest/gh_7932_data/not_finished_xlog/00000000000000000000.xlog differ diff --git 
a/test/replication-luatest/gh_7932_recovery_mixed_transactions_test.lua b/test/replication-luatest/gh_7932_recovery_mixed_transactions_test.lua
new file mode 100644
index 000000000000..0bde8a5c160f
--- /dev/null
+++ b/test/replication-luatest/gh_7932_recovery_mixed_transactions_test.lua
@@ -0,0 +1,127 @@
+local t = require('luatest')
+local cluster = require('luatest.replica_set')
+local fio = require('fio')
+
+local g = t.group('gh_7932')
+
+g.before_each(function(cg)
+    cg.cluster = cluster:new({})
+    cg.server = cg.cluster:build_and_add_server{
+        alias = 'server',
+        box_cfg = {
+            instance_uuid = '1a685e63-24cd-4fc5-9532-bf85f649e0ab',
+        },
+    }
+    cg.cluster:start()
+end)
+
+g.after_each(function(cg)
+    cg.cluster:drop()
+end)
+
+g.test_good_xlog_with_force_recovery = function(cg)
+    -- Stop the server and delete all its *.xlog files.
+    cg.server:stop()
+    local glob = fio.pathjoin(cg.server.workdir, '*.xlog')
+    local xlog = fio.glob(glob)
+    for _, file in pairs(xlog) do fio.unlink(file) end
+    -- Copy the prepared xlog into the server's workdir.
+    xlog = 'test/replication-luatest/gh_7932_data/good_xlog/'..
+           '00000000000000000000.xlog'
+    fio.copyfile(xlog, cg.server.workdir)
+
+    cg.server.box_cfg.force_recovery = true
+    cg.server:start()
+    -- Both transactions are applied: the first one, and then the
+    -- second one.
+    t.assert_equals(cg.server:exec(function()
+        return box.space.test:select()
+    end), {{1, 2}, {2, 2}, {3, 2}})
+end
+
+g.test_good_xlog_without_force_recovery = function(cg)
+    -- Stop the server and delete all its *.xlog files.
+    cg.server:stop()
+    local glob = fio.pathjoin(cg.server.workdir, '*.xlog')
+    local xlog = fio.glob(glob)
+    for _, file in pairs(xlog) do fio.unlink(file) end
+    -- Copy the prepared xlog into the server's workdir.
+    xlog = 'test/replication-luatest/gh_7932_data/good_xlog/'..
+           '00000000000000000000.xlog'
+    fio.copyfile(xlog, cg.server.workdir)
+
+    cg.server.box_cfg.force_recovery = false
+    cg.server:start({wait_until_ready = false})
+    local logfile = fio.pathjoin(cg.server.workdir, 'server.log')
+    -- Without force_recovery the mixed transactions aren't applied.
+    t.helpers.retrying({}, function()
+        t.assert(cg.server:grep_log("error at request", nil,
+            {filename = logfile}), "{type: 'REPLACE', replica_id: 2, "..
+            "lsn: 32050, space_id: 512, index_id: 0, tuple: [1, 2]}")
+        t.assert(cg.server:grep_log("XlogError", nil,
+            {filename = logfile}), "found a next transaction with the "..
+            "previous one not yet committed")
+    end)
+end
+
+g.test_bad_xlog_with_force_recovery = function(cg)
+    -- Stop the server and delete all its *.xlog files.
+    cg.server:stop()
+    local glob = fio.pathjoin(cg.server.workdir, '*.xlog')
+    local xlog = fio.glob(glob)
+    for _, file in pairs(xlog) do fio.unlink(file) end
+    -- Copy the prepared xlog into the server's workdir.
+    xlog = 'test/replication-luatest/gh_7932_data/bad_xlog/'..
+           '00000000000000000000.xlog'
+    fio.copyfile(xlog, cg.server.workdir)
+
+    cg.server.box_cfg.force_recovery = true
+    cg.server:start()
+    local logfile = fio.pathjoin(cg.server.workdir, 'server.log')
+
+    -- The second transaction cannot be applied because the first global
+    -- row in the transaction has an LSN/TSN mismatch.
+    t.helpers.retrying({}, function()
+        t.assert(cg.server:grep_log("error at request", nil,
+            {filename = logfile}), "{type: 'REPLACE', replica_id: 2, "..
+            "lsn: 32051, space_id: 512, index_id: 0, tuple: [2, 2]}")
+        t.assert(cg.server:grep_log("skipping row {2: 32052}", nil,
+            {filename = logfile}), "skipping row")
+        t.assert(cg.server:grep_log("XlogError", nil,
+            {filename = logfile}), "found a first global row in a "..
+            "transaction with LSN/TSN mismatch")
+    end)
+
+    -- Only the first transaction is applied.
+    t.assert_equals(cg.server:exec(function()
+        return box.space.test:select()
+    end), {{1, 3}, {2, 3}, {3, 3}})
+end
+
+g.test_not_finished_transaction = function(cg)
+    -- Stop the server and delete all its *.xlog files.
+    cg.server:stop()
+    local glob = fio.pathjoin(cg.server.workdir, '*.xlog')
+    local xlog = fio.glob(glob)
+    for _, file in pairs(xlog) do fio.unlink(file) end
+    -- Copy the prepared xlog into the server's workdir.
+    xlog = 'test/replication-luatest/gh_7932_data/not_finished_xlog/'..
+           '00000000000000000000.xlog'
+    fio.copyfile(xlog, cg.server.workdir)
+
+    cg.server.box_cfg.force_recovery = true
+    cg.server:start()
+    local logfile = fio.pathjoin(cg.server.workdir, 'server.log')
+
+    -- The second transaction in the xlog is not finished.
+    t.helpers.retrying({}, function()
+        t.assert(cg.server:grep_log("XlogError", nil,
+            {filename = logfile}), "found a not finished transaction " ..
+            "in the log")
+    end)
+
+    -- Only the first transaction is applied.
+    t.assert_equals(cg.server:exec(function()
+        return box.space.test:select()
+    end), {{1, 3}, {2, 3}, {3, 3}})
+end