diff --git a/changelogs/unreleased/gh-7932-recovery-mixed-transactions.md b/changelogs/unreleased/gh-7932-recovery-mixed-transactions.md new file mode 100644 index 000000000000..2fd64416558f --- /dev/null +++ b/changelogs/unreleased/gh-7932-recovery-mixed-transactions.md @@ -0,0 +1,14 @@ +## feature/replication + +* Implemented correct recovery of mixed transactions. To enable it, set + `box.cfg.force_recovery` to `true`. To keep the old behavior, leave the + `force_recovery` option unset. + +* Note: + Suppose there are two nodes, `node#1` and `node#2`, and data is + replicated from `node#1` to `node#2`. If at some point `node#1` has to + recover from an xlog containing mixed transactions, do the following + to replicate the data from `node#1` to `node#2`: + 1. Disable `node#2`. + 2. Start `node#1` with `force_recovery` set to `true`. + 3. Connect `node#2` (gh-7932). diff --git a/src/box/box.cc b/src/box/box.cc index 4ea445c3710b..53c45cda60e5 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -95,6 +95,7 @@ #include "wal_ext.h" #include "mp_util.h" #include "small/static.h" +#include "memory.h" static char status[64] = "unconfigured"; @@ -549,6 +550,10 @@ box_set_orphan(bool orphan) struct wal_stream { /** Base class. */ struct xstream base; + /** The lsregion for allocating rows. */ + struct lsregion lsr; + /** The array of lists keeping rows from xlog. */ + struct rlist nodes_rows[VCLOCK_MAX]; /** Current transaction ID. 0 when no transaction. 
*/
 	int64_t tsn;
 	/**
@@ -638,7 +643,16 @@ wal_stream_has_tx(const struct wal_stream *stream)
 {
 	bool has = stream->tsn != 0;
 	assert(has == (in_txn() != NULL));
-	return has;
+
+	if (has)
+		return true;
+	/* Rows still queued for a replica mean an unfinished transaction. */
+	for (int i = 0; i < VCLOCK_MAX; i++) {
+		const struct rlist *rows = &stream->nodes_rows[i];
+		if (!rlist_empty((struct rlist *)rows))
+			return true;
+	}
+	return false;
 }
 
 static int
@@ -790,6 +804,141 @@ wal_stream_apply_dml_row(struct wal_stream *stream, struct xrow_header *row)
 	return -1;
 }
 
+/* A copy of an xlog row, queued to separate mixed transactions. */
+struct wal_row {
+	/* Base class. */
+	struct xrow_header row;
+	/* Link in one of the lists of the nodes_rows array. */
+	struct rlist in_row_list;
+	/* A growing identifier to track lsregion allocations. */
+	int64_t lsr_id;
+};
+
+/*
+ * Copy a row and its body into the lsregion of the stream. Used to
+ * stash rows upon receipt when recovering mixed transactions.
+ */
+static struct wal_row *
+wal_stream_save_row(struct wal_stream *stream, struct xrow_header *row)
+{
+	/* Every saved row takes the next id, its body shares that id. */
+	static int64_t lsr_id = 0;
+	int64_t id = ++lsr_id;
+	struct wal_row *copy = (struct wal_row *)
+		xlsregion_alloc(&stream->lsr, sizeof(*copy), id);
+	copy->row = *row;
+	copy->lsr_id = id;
+	assert(copy->row.bodycnt <= 1);
+	if (copy->row.bodycnt != 1)
+		return copy;
+	/* Copy the body as well and point the saved header at the copy. */
+	struct iovec *body = &copy->row.body[0];
+	char *base = (char *)xlsregion_alloc(&stream->lsr, body->iov_len, id);
+	memcpy(base, body->iov_base, body->iov_len);
+	body->iov_base = base;
+	return copy;
+}
+
+/*
+ * Pass lsr_id to lsregion_gc() of the stream's lsregion to release
+ * the memory of saved rows that are not needed anymore.
+ */
+static void
+deallocate_wal_row(struct wal_stream *stream, int64_t lsr_id)
+{
+	lsregion_gc(&stream->lsr, lsr_id);
+}
+
+/*
+ * Find the min lsr_id that is still needed.
+ */
+static int64_t
+find_min_lsr_id(struct wal_stream *stream, struct wal_row *row)
+{
+	int64_t min_lsr_id = row->lsr_id;
+	struct rlist *nodes_rows = stream->nodes_rows;
+	for (uint32_t i = 0; i < VCLOCK_MAX; i++) {
+		/* Skip, not stop at, the replica that is being committed. */
+		if (i == row->row.replica_id || rlist_empty(&nodes_rows[i]))
+			continue;
+		struct wal_row *item = rlist_first_entry(&nodes_rows[i],
+							 wal_row, in_row_list);
+		/* `<=`: a queued row with id == min_lsr_id must survive. */
+		if (item->lsr_id <= min_lsr_id)
+			min_lsr_id = item->lsr_id - 1;
+	}
+	return min_lsr_id;
+}
+
+/*
+ * While recovering, the log is read row by row, but the rows of
+ * transactions coming from different replicas may be interleaved.
+ * So each row is copied and queued in nodes_rows[replica_id]. Once a
+ * row with the is_commit flag arrives, the rows queued for that
+ * replica are applied in order. If applying one of them fails, the
+ * rest of that transaction is dropped without being applied and the
+ * error is returned. Rows queued for other replicas are not touched.
+ * Returns 0 on success (or when the row was only queued), else != 0.
+ */
+static int
+wal_stream_apply_mixed_dml_row(struct wal_stream *stream,
+			       struct xrow_header *row)
+{
+	uint32_t id = row->replica_id;
+	/* The id indexes nodes_rows[], so never trust the one in the log. */
+	if (id >= VCLOCK_MAX) {
+		diag_set(XlogError, "found a row with invalid replica id");
+		return -1;
+	}
+	struct rlist *nodes_rows = stream->nodes_rows;
+
+	/*
+	 * Every new row arrives at the same address (the caller keeps it
+	 * on the stack), so the row and its body are copied to be queued.
+	 */
+	struct wal_row *save_row = wal_stream_save_row(stream, row);
+	rlist_add_tail_entry(&nodes_rows[id], save_row, in_row_list);
+	if (!row->is_commit)
+		return 0;
+
+	int64_t lsr_id = find_min_lsr_id(stream, save_row);
+	int rc = 0;
+	struct wal_row *item, *next_row;
+	rlist_foreach_entry_safe(item, &nodes_rows[id], in_row_list,
+				 next_row) {
+		rlist_del_entry(item, in_row_list);
+		/* After the first failure the rest is only unlinked. */
+		if (rc == 0)
+			rc = wal_stream_apply_dml_row(stream, &item->row);
+	}
+	assert(rlist_empty(&nodes_rows[id]));
+	deallocate_wal_row(stream, lsr_id);
+	return rc;
+}
+
+/*
+ * Drop the rows of all transactions that never got their commit row:
+ * unlink them from every list of the array and free the memory.
+ */
+static void
+clear_unfinished_txn(struct wal_stream *stream)
+{
+	struct rlist *nodes_rows = stream->nodes_rows;
+	for (int i = 0; i < VCLOCK_MAX; i++) {
+		struct wal_row *item, *next_row;
+		rlist_foreach_entry_safe(item, &nodes_rows[i],
+					 in_row_list, next_row) {
+			rlist_del_entry(item, in_row_list);
+		}
+		assert(rlist_empty(&nodes_rows[i]));
+	}
+	/*
+	 * Nothing is queued anymore, so free every allocation. The ids are
+	 * private to wal_stream_save_row(), base.row_count is unrelated.
+	 */
+	deallocate_wal_row(stream, INT64_MAX);
+}
+
 /**
  * Yield once in a while, but not too often, mostly to allow signal handling to
  * take place.
@@ -814,6 +963,9 @@ wal_stream_apply_row(struct xstream *base, struct xrow_header *row)
 	} else if (iproto_type_is_raft_request(row->type)) {
 		if (wal_stream_apply_raft_row(stream, row) != 0)
 			goto end_error;
+	} else if (box_is_force_recovery) { /* Queue rows per replica. */
+		if (wal_stream_apply_mixed_dml_row(stream, row) != 0)
+			goto end_error;
 	} else if (wal_stream_apply_dml_row(stream, row) != 0) {
 		goto end_error;
 	}
@@ -843,6 +995,21 @@ wal_stream_create(struct wal_stream *ctx)
 {
 	xstream_create(&ctx->base, wal_stream_apply_row,
 		       wal_stream_schedule_yield);
+	lsregion_create(&ctx->lsr, &runtime); /* Memory for saved rows. */
+	for (int i = 0; i < VCLOCK_MAX; i++) {
+		rlist_create(&ctx->nodes_rows[i]); /* No rows queued yet. */
+	}
+	ctx->tsn = 0;
+	ctx->first_row_lsn = 0;
+	ctx->has_yield = false;
+	ctx->has_global_row = false;
+}
+
+static void
+wal_stream_destroy(struct wal_stream *ctx)
+{ /* NOTE(review): the only call visible here is in local_recovery(). */
+	clear_unfinished_txn(ctx); /* Unlink and free rows still queued. */
+	lsregion_destroy(&ctx->lsr);
 	ctx->tsn = 0;
 	ctx->first_row_lsn = 0;
 	ctx->has_yield = false;
@@ -4661,6 +4828,7 @@ local_recovery(const struct tt_uuid *instance_uuid,
 		diag_set(XlogError, "found a not finished transaction "
 			 "in the log");
 		wal_stream_abort(&wal_stream);
+		wal_stream_destroy(&wal_stream); /* TODO: other paths? */
 		if (!box_is_force_recovery)
 			diag_raise();
 		diag_log();
diff --git a/test/replication-luatest/gh_7932_data/bad_xlog/00000000000000000000.xlog b/test/replication-luatest/gh_7932_data/bad_xlog/00000000000000000000.xlog
new file mode 100644
index 000000000000..3d28648ff270
Binary files /dev/null and b/test/replication-luatest/gh_7932_data/bad_xlog/00000000000000000000.xlog differ
diff --git a/test/replication-luatest/gh_7932_data/good_xlog/00000000000000000000.xlog b/test/replication-luatest/gh_7932_data/good_xlog/00000000000000000000.xlog
new file mode 100644
index 000000000000..30f6f5bee567
Binary files /dev/null and b/test/replication-luatest/gh_7932_data/good_xlog/00000000000000000000.xlog differ
diff --git a/test/replication-luatest/gh_7932_data/not_finished_xlog/00000000000000000000.xlog
b/test/replication-luatest/gh_7932_data/not_finished_xlog/00000000000000000000.xlog
new file mode 100644
index 000000000000..7a36674d6573
Binary files /dev/null and b/test/replication-luatest/gh_7932_data/not_finished_xlog/00000000000000000000.xlog differ
diff --git a/test/replication-luatest/gh_7932_recovery_mixed_transactions_test.lua b/test/replication-luatest/gh_7932_recovery_mixed_transactions_test.lua
new file mode 100644
index 000000000000..0bde8a5c160f
--- /dev/null
+++ b/test/replication-luatest/gh_7932_recovery_mixed_transactions_test.lua
@@ -0,0 +1,100 @@
+local t = require('luatest')
+local cluster = require('luatest.replica_set')
+local fio = require('fio')
+
+local g = t.group('gh_7932')
+
+-- Directory with the prepared xlogs, relative to the test run directory.
+local DATA_DIR = 'test/replication-luatest/gh_7932_data'
+
+-- Stops the server, replaces all its xlogs with the prepared one from
+-- DATA_DIR/<name> and returns the path to the server's log file.
+local function replace_xlogs(cg, name)
+    cg.server:stop()
+    local glob = fio.pathjoin(cg.server.workdir, '*.xlog')
+    for _, file in pairs(fio.glob(glob)) do
+        fio.unlink(file)
+    end
+    local xlog = fio.pathjoin(DATA_DIR, name, '00000000000000000000.xlog')
+    fio.copyfile(xlog, cg.server.workdir)
+    return fio.pathjoin(cg.server.workdir, 'server.log')
+end
+
+g.before_each(function(cg)
+    cg.cluster = cluster:new({})
+    cg.server = cg.cluster:build_and_add_server{
+        alias = 'server',
+        box_cfg = {
+            instance_uuid = '1a685e63-24cd-4fc5-9532-bf85f649e0ab',
+        },
+    }
+    cg.cluster:start()
+end)
+
+g.after_each(function(cg)
+    cg.cluster:drop()
+end)
+
+g.test_good_xlog_with_force_recovery = function(cg)
+    replace_xlogs(cg, 'good_xlog')
+    cg.server.box_cfg.force_recovery = true
+    cg.server:start()
+    -- Both mixed transactions are applied: the first one, then the second.
+    t.assert_equals(cg.server:exec(function()
+        return box.space.test:select()
+    end), {{1, 2}, {2, 2}, {3, 2}})
+end
+
+g.test_good_xlog_without_force_recovery = function(cg)
+    local logfile = replace_xlogs(cg, 'good_xlog')
+    cg.server.box_cfg.force_recovery = false
+    cg.server:start({wait_until_ready = false})
+    -- The transactions aren't applied without force_recovery. The second
+    -- argument of t.assert() is only the message shown on failure.
+    t.helpers.retrying({}, function()
+        t.assert(cg.server:grep_log("error at request", nil,
+            {filename = logfile}), "{type: 'REPLACE', replica_id: 2, "..
+            "lsn: 32050, space_id: 512, index_id: 0, tuple: [1, 2]}")
+        t.assert(cg.server:grep_log("XlogError", nil,
+            {filename = logfile}), "found a next transaction with the "..
+            "previous one not yet committed")
+    end)
+end
+
+g.test_bad_xlog_with_force_recovery = function(cg)
+    local logfile = replace_xlogs(cg, 'bad_xlog')
+    cg.server.box_cfg.force_recovery = true
+    cg.server:start()
+    -- The second transaction cannot be applied because the first global
+    -- row in the transaction has an LSN/TSN mismatch.
+    t.helpers.retrying({}, function()
+        t.assert(cg.server:grep_log("error at request", nil,
+            {filename = logfile}), "{type: 'REPLACE', replica_id: 2, "..
+            "lsn: 32051, space_id: 512, index_id: 0, tuple: [2, 2]}")
+        t.assert(cg.server:grep_log("skipping row {2: 32052}", nil,
+            {filename = logfile}), "skipping row")
+        t.assert(cg.server:grep_log("XlogError", nil,
+            {filename = logfile}), "found a first global row in a "..
+            "transaction with LSN/TSN mismatch")
+    end)
+    -- Only the first transaction is applied.
+    t.assert_equals(cg.server:exec(function()
+        return box.space.test:select()
+    end), {{1, 3}, {2, 3}, {3, 3}})
+end
+
+g.test_not_finished_transaction = function(cg)
+    local logfile = replace_xlogs(cg, 'not_finished_xlog')
+    cg.server.box_cfg.force_recovery = true
+    cg.server:start()
+    -- The second transaction is not completed.
+    t.helpers.retrying({}, function()
+        t.assert(cg.server:grep_log("XlogError", nil,
+            {filename = logfile}), "found a not finished transaction " ..
+            "in the log")
+    end)
+    -- Only the first transaction is applied.
+    t.assert_equals(cg.server:exec(function()
+        return box.space.test:select()
+    end), {{1, 3}, {2, 3}, {3, 3}})
+end