diff --git a/changelogs/unreleased/gh-7932-recovery-mixed-transactions.md b/changelogs/unreleased/gh-7932-recovery-mixed-transactions.md new file mode 100644 index 000000000000..d7f691be64f9 --- /dev/null +++ b/changelogs/unreleased/gh-7932-recovery-mixed-transactions.md @@ -0,0 +1,5 @@ +## feature/replication + +* Implemented correct recovery of mixed transactions. To enable it, set + `box.cfg.force_recovery` to `true`. If you need to revert to the old + behavior, don't set the `force_recovery` option (gh-7932). diff --git a/src/box/box.cc b/src/box/box.cc index b9a5e0c665a1..4a0536f7b7ad 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -95,6 +95,7 @@ #include "wal_ext.h" #include "mp_util.h" #include "small/static.h" +#include "memory.h" #include "node_name.h" #include "tt_sort.h" @@ -557,6 +558,10 @@ box_set_orphan(bool orphan) struct wal_stream { /** Base class. */ struct xstream base; + /** The lsregion for allocating rows. */ + struct lsregion lsr; + /** The array of lists keeping rows from xlog. */ + struct rlist nodes_rows[VCLOCK_MAX]; /** Current transaction ID. 0 when no transaction. */ int64_t tsn; /** @@ -642,18 +647,39 @@ wal_stream_abort(struct wal_stream *stream) * checking even in release. */ static bool -wal_stream_has_tx(const struct wal_stream *stream) +wal_stream_has_tx_in_progress(const struct wal_stream *stream) { bool has = stream->tsn != 0; assert(has == (in_txn() != NULL)); return has; } +/** + * The function checks whether the log ended before the tail of some + * transaction was read. A mixed transaction consists of multiple + * transactions whose rows are mixed together, so after recovery is + * completed there must be no unfinished transactions left in the + * nodes_rows array. wal_stream_has_tx_in_progress() covers a transaction + * which has all the data, but simply isn't committed yet. 
+ */ +static bool +wal_stream_has_unfinished_tx(const struct wal_stream *stream) +{ + if (wal_stream_has_tx_in_progress(stream)) + return true; + const struct rlist *nodes_rows = stream->nodes_rows; + for (int i = 0; i < VCLOCK_MAX; i++) { + if (!rlist_empty((struct rlist *)&nodes_rows[i])) + return true; + } + return false; +} + static int wal_stream_apply_synchro_row(struct wal_stream *stream, struct xrow_header *row) { assert(iproto_type_is_synchro_request(row->type)); - if (wal_stream_has_tx(stream)) { + if (wal_stream_has_tx_in_progress(stream)) { diag_set(XlogError, "found synchro request in a transaction"); return -1; } @@ -669,7 +695,7 @@ static int wal_stream_apply_raft_row(struct wal_stream *stream, struct xrow_header *row) { assert(iproto_type_is_raft_request(row->type)); - if (wal_stream_has_tx(stream)) { + if (wal_stream_has_tx_in_progress(stream)) { diag_set(XlogError, "found raft request in a transaction"); return -1; } @@ -743,7 +769,7 @@ wal_stream_apply_dml_row(struct wal_stream *stream, struct xrow_header *row) } stream->has_global_row = true; } - assert(wal_stream_has_tx(stream)); + assert(wal_stream_has_tx_in_progress(stream)); /* Nops might appear at least after before_replace skipping rows. */ if (request.type != IPROTO_NOP) { struct space *space = space_cache_find(request.space_id); @@ -798,6 +824,130 @@ wal_stream_apply_dml_row(struct wal_stream *stream, struct xrow_header *row) return -1; } +/** + * Keeping added rows in a rlist to separate mixed transactions + */ +struct wal_row { + /** Base class. */ + struct xrow_header row; + /** A link on the list of rows stacked in the nodes_rows array. */ + struct rlist in_row_list; + /** A growing identifier to track lsregion allocations. */ + int64_t lsr_id; +}; + +/** + * Callback to stash row and row bodies upon receipt, used to recover + * mixed transactions. 
+ */ +static struct wal_row * +wal_stream_save_row(struct wal_stream *stream, + const struct xrow_header *row) +{ + static int64_t lsr_id = 0; + struct lsregion *lsr = &stream->lsr; + struct wal_row *new_row = xlsregion_alloc_object(lsr, ++lsr_id, + typeof(*new_row)); + new_row->lsr_id = lsr_id; + new_row->row = *row; + + assert(new_row->row.bodycnt <= 1); + if (new_row->row.bodycnt == 1) { + size_t len = new_row->row.body[0].iov_len; + char *new_base = (char *)xlsregion_alloc(lsr, len, lsr_id); + memcpy(new_base, new_row->row.body[0].iov_base, len); + /* Adjust row body pointers. */ + new_row->row.body[0].iov_base = new_base; + } + return new_row; +} + +/** + * Find the largest lsr_id that can be freed (min needed lsr_id - 1). + */ +static int64_t +wal_stream_find_min_lsr_id(struct wal_stream *stream) +{ + int64_t min_lsr_id = INT64_MAX; + struct rlist *nodes_rows = stream->nodes_rows; + struct wal_row *item; + /* + * For each new row, lsr_id is incremented by 1. Only rows from + * different replica_ids can get mixed, so for each replica_id + * the very first row has the smallest lsr_id of all rows with that + * replica_id. Thus, once a transaction is read to the end, we + * iterate through the nodes_rows array of lists and see which of + * the first rows for each replica_id has the lowest lsr_id. + * This lsr_id is still needed, but anything less than this is not, + * so we can free everything below this lsr_id. + */ + for (uint32_t i = 0; i < VCLOCK_MAX; i++) { + if (!rlist_empty(&nodes_rows[i])) { + item = rlist_first_entry(&nodes_rows[i], wal_row, + in_row_list); + if (item->lsr_id <= min_lsr_id) + min_lsr_id = item->lsr_id - 1; + } + } + return min_lsr_id; +} + +/** + * Deallocate memory of the wal_rows that are no longer needed. + */ +static void +wal_stream_gc(struct wal_stream *stream) +{ + struct lsregion *lsr = &stream->lsr; + int64_t lsr_id = wal_stream_find_min_lsr_id(stream); + lsregion_gc(lsr, lsr_id); +} + +/** + * During recovery the log is read row-by-row. 
However, it is necessary + * to store the rows for correct further recovery, for example, of + * mixed transactions. The function appends each newly read row to an + * rlist. This rlist is stored in the nodes_rows array at index replica_id. + * As soon as a row arrives with the is_commit flag set, the corresponding + * transaction is applied. If an error occurs, the transaction is + * rolled back and we continue to work with the next transaction. + */ +static int +wal_stream_apply_mixed_dml_row(struct wal_stream *stream, + struct xrow_header *row) +{ + /* + * A local transaction should not be written to nodes_rows with + * id equal to 0. It should be written to nodes_rows with the id + * of the node from which we are recovering, since a local + * transaction can only exist on the node from which we are + * recovering. + */ + uint32_t id = row->replica_id ? row->replica_id : instance_id; + struct rlist *nodes_rows = stream->nodes_rows; + + struct wal_row *save_row = wal_stream_save_row(stream, row); + rlist_add_tail_entry(&nodes_rows[id], save_row, in_row_list); + + if (!row->is_commit) + return 0; + + int rc = 0; + struct wal_row *item, *next_row; + rlist_foreach_entry_safe(item, &nodes_rows[id], + in_row_list, next_row) { + rlist_del_entry(item, in_row_list); + if (rc != 0) + continue; + rc = wal_stream_apply_dml_row(stream, &item->row); + } + /* Deallocate the memory. */ + wal_stream_gc(stream); + assert(rlist_empty(&nodes_rows[id])); + + return rc; +} + /** * Yield once in a while, but not too often, mostly to allow signal handling to * take place. 
@@ -805,7 +955,7 @@ wal_stream_apply_dml_row(struct wal_stream *stream, struct xrow_header *row) static void wal_stream_try_yield(struct wal_stream *stream) { - if (wal_stream_has_tx(stream) || !stream->has_yield) + if (wal_stream_has_tx_in_progress(stream) || !stream->has_yield) return; stream->has_yield = false; fiber_sleep(0); @@ -822,6 +972,9 @@ wal_stream_apply_row(struct xstream *base, struct xrow_header *row) } else if (iproto_type_is_raft_request(row->type)) { if (wal_stream_apply_raft_row(stream, row) != 0) goto end_error; + } else if (box_is_force_recovery) { + if (wal_stream_apply_mixed_dml_row(stream, row) != 0) + goto end_error; } else if (wal_stream_apply_dml_row(stream, row) != 0) { goto end_error; } @@ -851,12 +1004,23 @@ wal_stream_create(struct wal_stream *ctx) { xstream_create(&ctx->base, wal_stream_apply_row, wal_stream_schedule_yield); + lsregion_create(&ctx->lsr, &runtime); + for (int i = 0; i < VCLOCK_MAX; i++) { + rlist_create(&ctx->nodes_rows[i]); + } ctx->tsn = 0; ctx->first_row_lsn = 0; ctx->has_yield = false; ctx->has_global_row = false; } +static void +wal_stream_destroy(struct wal_stream *ctx) +{ + lsregion_destroy(&ctx->lsr); + TRASH(ctx); +} + /* {{{ configuration bindings */ /* @@ -4876,6 +5040,7 @@ local_recovery(const struct vclock *checkpoint_vclock) wal_stream_create(&wal_stream); auto stream_guard = make_scoped_guard([&]{ wal_stream_abort(&wal_stream); + wal_stream_destroy(&wal_stream); }); struct recovery *recovery = recovery_new( wal_dir(), box_is_force_recovery, checkpoint_vclock); @@ -4961,7 +5126,7 @@ local_recovery(const struct vclock *checkpoint_vclock) engine_begin_final_recovery_xc(); recover_remaining_wals(recovery, &wal_stream.base, NULL, false); - if (wal_stream_has_tx(&wal_stream)) { + if (wal_stream_has_unfinished_tx(&wal_stream)) { diag_set(XlogError, "found a not finished transaction " "in the log"); wal_stream_abort(&wal_stream); @@ -4988,7 +5153,7 @@ local_recovery(const struct vclock *checkpoint_vclock) } 
recovery_stop_local(recovery); recover_remaining_wals(recovery, &wal_stream.base, NULL, true); - if (wal_stream_has_tx(&wal_stream)) { + if (wal_stream_has_unfinished_tx(&wal_stream)) { diag_set(XlogError, "found a not finished transaction " "in the log in hot standby mode"); wal_stream_abort(&wal_stream); @@ -5042,6 +5207,7 @@ local_recovery(const struct vclock *checkpoint_vclock) } stream_guard.is_active = false; recovery_finalize(recovery); + wal_stream_destroy(&wal_stream); /* * We must enable WAL before finalizing engine recovery, diff --git a/test/replication-luatest/gh_7932_data/bad_xlog/00000000000000000000.xlog b/test/replication-luatest/gh_7932_data/bad_xlog/00000000000000000000.xlog new file mode 100644 index 000000000000..fabf5569d4ae Binary files /dev/null and b/test/replication-luatest/gh_7932_data/bad_xlog/00000000000000000000.xlog differ diff --git a/test/replication-luatest/gh_7932_data/good_xlog/00000000000000000000.xlog b/test/replication-luatest/gh_7932_data/good_xlog/00000000000000000000.xlog new file mode 100644 index 000000000000..ef93c5c4c88b Binary files /dev/null and b/test/replication-luatest/gh_7932_data/good_xlog/00000000000000000000.xlog differ diff --git a/test/replication-luatest/gh_7932_data/not_finished_xlog/00000000000000000000.xlog b/test/replication-luatest/gh_7932_data/not_finished_xlog/00000000000000000000.xlog new file mode 100644 index 000000000000..6bc9c2fdb219 Binary files /dev/null and b/test/replication-luatest/gh_7932_data/not_finished_xlog/00000000000000000000.xlog differ diff --git a/test/replication-luatest/gh_7932_recovery_mixed_transactions_test.lua b/test/replication-luatest/gh_7932_recovery_mixed_transactions_test.lua new file mode 100644 index 000000000000..12eb13b5393a --- /dev/null +++ b/test/replication-luatest/gh_7932_recovery_mixed_transactions_test.lua @@ -0,0 +1,164 @@ +local t = require('luatest') +local cluster = require('luatest.replica_set') +local fio = require('fio') + +local g = t.group('gh_7932') 
+ +g.before_each(function(cg) + cg.cluster = cluster:new({}) + cg.server = cg.cluster:build_and_add_server{ + alias = 'server', + box_cfg = { + instance_uuid = '1a685e63-24cd-4fc5-9532-bf85f649e0ab', + }, + } + cg.cluster:start() +end) + +g.after_each(function(cg) + cg.cluster:drop() +end) + +-- Check that transactions with both local and global spaces recover fine +-- with this patch +g.test_local_and_global_spaces_recover = function(cg) + cg.server:exec(function() + box.schema.space.create('loc', {is_local = true}) + box.space.loc:create_index('pk') + box.schema.space.create('glob') + box.space.glob:create_index('pk') + box.begin() + for i = 1,3 do + box.space.glob:replace{i, box.info.replication[1].id} + box.space.loc:replace{i, 0} + end + box.commit() + end) + + t.assert_equals(cg.server:exec(function() + return box.space.glob:select() + end), {{1, 1}, {2, 1}, {3, 1}}) + t.assert_equals(cg.server:exec(function() + return box.space.loc:select() + end), {{1, 0}, {2, 0}, {3, 0}}) + + cg.server:stop() + + cg.server.box_cfg.force_recovery = true + cg.server:start() + -- Check that the entire transaction has been applied + t.assert_equals(cg.server:exec(function() + return box.space.glob:select() + end), {{1, 1}, {2, 1}, {3, 1}}) + t.assert_equals(cg.server:exec(function() + return box.space.loc:select() + end), {{1, 0}, {2, 0}, {3, 0}}) +end + + +g.test_good_xlog_with_force_recovery = function(cg) + -- Delete all *.xlogs on the server + cg.server:stop() + local glob = fio.pathjoin(cg.server.workdir, '*.xlog') + local xlog = fio.glob(glob) + for _, file in pairs(xlog) do fio.unlink(file) end + -- Copying the prepared log to the server + xlog = 'test/replication-luatest/gh_7932_data/good_xlog/'.. + '00000000000000000000.xlog' + fio.copyfile(xlog, cg.server.workdir) + + cg.server.box_cfg.force_recovery = true + cg.server:start() + -- The first transaction is applied and after the second transaction + -- is applied. 
+ t.assert_equals(cg.server:exec(function() + return box.space.test:select() + end), {{1, 2}, {2, 2}, {3, 2}}) +end + +g.test_good_xlog_without_force_recovery = function(cg) + -- Delete all *.xlogs on the server + cg.server:stop() + local glob = fio.pathjoin(cg.server.workdir, '*.xlog') + local xlog = fio.glob(glob) + for _, file in pairs(xlog) do fio.unlink(file) end + -- Copying the prepared log to the server + xlog = 'test/replication-luatest/gh_7932_data/good_xlog/'.. + '00000000000000000000.xlog' + fio.copyfile(xlog, cg.server.workdir) + + cg.server.box_cfg.force_recovery = false + cg.server:start({wait_until_ready = false}) + local logfile = fio.pathjoin(cg.server.workdir, 'server.log') + -- The transactions aren't applied without force_recovery + t.helpers.retrying({}, function() + t.assert(cg.server:grep_log("error at request", nil, + {filename = logfile}), "{type: 'REPLACE', replica_id: 2, ".. + "lsn: 32050, space_id: 512, index_id: 0, tuple: [1, 2]}") + t.assert(cg.server:grep_log("XlogError", nil, + {filename = logfile}), "found a next transaction with the ".. + "previous one not yet committed") + end) +end + +g.test_bad_xlog_with_force_recovery = function(cg) + -- Delete all *.xlogs on the server + cg.server:stop() + local glob = fio.pathjoin(cg.server.workdir, '*.xlog') + local xlog = fio.glob(glob) + for _, file in pairs(xlog) do fio.unlink(file) end + -- Copying the prepared log to the server + xlog = 'test/replication-luatest/gh_7932_data/bad_xlog/'.. + '00000000000000000000.xlog' + fio.copyfile(xlog, cg.server.workdir) + + cg.server.box_cfg.force_recovery = true + cg.server:start() + local logfile = fio.pathjoin(cg.server.workdir, 'server.log') + + -- The second transaction cannot be applied because the first global + -- row in the transaction has an LSN/TSN mismatch. + t.helpers.retrying({}, function() + t.assert(cg.server:grep_log("error at request", nil, + {filename = logfile}), "{type: 'REPLACE', replica_id: 2, ".. 
+ "lsn: 32051, space_id: 512, index_id: 0, tuple: [2, 2]}") + t.assert(cg.server:grep_log("skipping row {2: 32052}", nil, + {filename = logfile}), "skipping row") + t.assert(cg.server:grep_log("XlogError", nil, + {filename = logfile}), "found a first global row in a ".. + "transaction with LSN/TSN mismatch") + end) + + -- Only the first transaction is applied + t.assert_equals(cg.server:exec(function() + return box.space.test:select() + end), {{1, 3}, {2, 3}, {3, 3}}) +end + +g.test_not_finished_transaction = function(cg) + -- Delete all *.xlogs on the server + cg.server:stop() + local glob = fio.pathjoin(cg.server.workdir, '*.xlog') + local xlog = fio.glob(glob) + for _, file in pairs(xlog) do fio.unlink(file) end + -- Copying the prepared log to the server + xlog = 'test/replication-luatest/gh_7932_data/not_finished_xlog/'.. + '00000000000000000000.xlog' + fio.copyfile(xlog, cg.server.workdir) + + cg.server.box_cfg.force_recovery = true + cg.server:start() + local logfile = fio.pathjoin(cg.server.workdir, 'server.log') + + -- The second transaction is not completed + t.helpers.retrying({}, function() + t.assert(cg.server:grep_log("XlogError", nil, + {filename = logfile}), "found a not finished transaction " .. + "in the log") + end) + + -- Only the first transaction is applied + t.assert_equals(cg.server:exec(function() + return box.space.test:select() + end), {{1, 3}, {2, 3}, {3, 3}}) +end