From 40caa163149f95855a8334eb749cf2396bbd374e Mon Sep 17 00:00:00 2001 From: Yan Shtunder Date: Tue, 16 May 2023 20:25:44 +0300 Subject: [PATCH] replication: recovery of mixed transactions See the docbot request for details. Closes #7932 @TarantoolBot document Title: correct recovery of mixed transactions Transaction boundaries in WAL were introduced in 2.1.2 (`872d6f1`), but the replication applier started to apply transactions atomically only in 2.2.1 (`e94ba2e`), so within this range it is possible that transactions are mixed in WAL. There was no problem with them until 2.8.1 because the recovery process was still working row-by-row, but in 2.8.1 (`9311113`) it became transactional as well, so starting from 2.8.1 tarantool fails to handle mixed transactions from WAL. Mixed transactions are obviously one of the unexpected scenarios in modern tarantool, so we should not handle them in normal mode, and we take care of them only in force recovery mode. But the current 'force recovery' treats all the rows of the unexpected transaction as unexpected and thus just skips the entire transaction row-by-row. However, we have enough information in this case to handle it in a more data-friendly way. Note: Let there be two nodes (`node#1` and `node#2`). And let the data be replicated from `node#1` to `node#2`. Suppose that at some point in time, `node#1` is restoring data from an xlog containing mixed transactions. To replicate data from `node#1` to `node#2`, do the following: 1. Stop `node#2` and delete all xlog files from it 2. Restart `node#1` by setting `force_recovery` to `true` 3. Make `node#2` rejoin to `node#1` again (gh-7932). 
--- .../gh-7932-recovery-mixed-transactions.md | 13 ++ src/box/box.cc | 181 +++++++++++++++++- .../bad_xlog/00000000000000000000.xlog | Bin 0 -> 597 bytes .../good_xlog/00000000000000000000.xlog | Bin 0 -> 646 bytes .../00000000000000000000.xlog | Bin 0 -> 595 bytes ..._7932_recovery_mixed_transactions_test.lua | 127 ++++++++++++ 6 files changed, 314 insertions(+), 7 deletions(-) create mode 100644 changelogs/unreleased/gh-7932-recovery-mixed-transactions.md create mode 100644 test/replication-luatest/gh_7932_data/bad_xlog/00000000000000000000.xlog create mode 100644 test/replication-luatest/gh_7932_data/good_xlog/00000000000000000000.xlog create mode 100644 test/replication-luatest/gh_7932_data/not_finished_xlog/00000000000000000000.xlog create mode 100644 test/replication-luatest/gh_7932_recovery_mixed_transactions_test.lua diff --git a/changelogs/unreleased/gh-7932-recovery-mixed-transactions.md b/changelogs/unreleased/gh-7932-recovery-mixed-transactions.md new file mode 100644 index 000000000000..a70b420da841 --- /dev/null +++ b/changelogs/unreleased/gh-7932-recovery-mixed-transactions.md @@ -0,0 +1,13 @@ +## feature/replication + +* Implemented correct recovery of mixed transactions. To do this, set + `box.cfg.force_recovery` to `true`. If you need to revert to the old + behavior, don't set the `force_recovery` option. + +* Let there be two nodes (`node#1` and `node#2`). And let the data be + replicated from `node#1` to `node#2`. Suppose that at some point in time, + `node#1` is restoring data from an xlog containing mixed transactions. To + replicate data from `node#1` to `node#2`, do the following: + 1. Stop `node#2` and delete all xlog files from it + 2. Restart `node#1` by setting `force_recovery` to `true` + 3. Make `node#2` rejoin to `node#1` again (gh-7932). 
diff --git a/src/box/box.cc b/src/box/box.cc index 4ea445c3710b..1507e75319fa 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -95,6 +95,7 @@ #include "wal_ext.h" #include "mp_util.h" #include "small/static.h" +#include "memory.h" static char status[64] = "unconfigured"; @@ -549,6 +550,10 @@ box_set_orphan(bool orphan) struct wal_stream { /** Base class. */ struct xstream base; + /** The lsregion for allocating rows. */ + struct lsregion lsr; + /** The array of lists keeping rows from xlog. */ + struct rlist nodes_rows[VCLOCK_MAX]; /** Current transaction ID. 0 when no transaction. */ int64_t tsn; /** @@ -634,18 +639,43 @@ wal_stream_abort(struct wal_stream *stream) * checking even in release. */ static bool -wal_stream_has_tx(const struct wal_stream *stream) +wal_stream_has_tx_in_progress(const struct wal_stream *stream) { bool has = stream->tsn != 0; assert(has == (in_txn() != NULL)); return has; } +/** + * The wrapper exists only for the debug purposes, to ensure tsn being non-0 is + * in sync with the fiber's txn being non-NULL. In addition, there is a check + * for the completion of the transaction.It has nothing to do with the journal + * content, and therefore can use assertions instead of rigorous error checking + * even in release. 
+ */ +static bool +wal_stream_has_unfinished_tx(const struct wal_stream *stream) +{ + bool has = stream->tsn != 0; + assert(has == (in_txn() != NULL)); + + bool unfinished = false; + const struct rlist *nodes_rows = stream->nodes_rows; + for (int i = 0; i < VCLOCK_MAX; i++) { + if (!rlist_empty((struct rlist *)&nodes_rows[i])) { + unfinished = true; + break; + } + } + + return has || unfinished; +} + static int wal_stream_apply_synchro_row(struct wal_stream *stream, struct xrow_header *row) { assert(iproto_type_is_synchro_request(row->type)); - if (wal_stream_has_tx(stream)) { + if (wal_stream_has_tx_in_progress(stream)) { diag_set(XlogError, "found synchro request in a transaction"); return -1; } @@ -661,7 +691,7 @@ static int wal_stream_apply_raft_row(struct wal_stream *stream, struct xrow_header *row) { assert(iproto_type_is_raft_request(row->type)); - if (wal_stream_has_tx(stream)) { + if (wal_stream_has_tx_in_progress(stream)) { diag_set(XlogError, "found raft request in a transaction"); return -1; } @@ -735,7 +765,7 @@ wal_stream_apply_dml_row(struct wal_stream *stream, struct xrow_header *row) } stream->has_global_row = true; } - assert(wal_stream_has_tx(stream)); + assert(wal_stream_has_tx_in_progress(stream)); /* Nops might appear at least after before_replace skipping rows. */ if (request.type != IPROTO_NOP) { struct space *space = space_cache_find(request.space_id); @@ -790,6 +820,128 @@ wal_stream_apply_dml_row(struct wal_stream *stream, struct xrow_header *row) return -1; } +/* Storage, adding rows to a rlist to separate mixed transactions. */ +struct wal_row { + /* Base class. */ + struct xrow_header row; + /* A link on the list of rows stacked in the nodes_rows array. */ + struct rlist in_row_list; + /* A growing identifier to track lsregion allocations. */ + int64_t lsr_id; +}; + +/* + * Callback to stash row and row bodies upon receipt, used to recover + * mixed transactions. 
+ */
+static struct wal_row *
+wal_stream_save_row(struct wal_stream *stream, struct xrow_header *row)
+{
+	static int64_t lsr_id = 0;
+	struct lsregion *lsr = &stream->lsr;
+	struct wal_row *new_row =
+		(struct wal_row *)xlsregion_alloc(lsr, sizeof(struct wal_row),
+						  ++lsr_id);
+	new_row->lsr_id = lsr_id;
+	memcpy(new_row, row, sizeof(struct xrow_header));
+
+	assert(new_row->row.bodycnt <= 1);
+	if (new_row->row.bodycnt == 1) {
+		size_t len = new_row->row.body[0].iov_len;
+		char *new_base = (char *)xlsregion_alloc(lsr, len, lsr_id);
+		memcpy(new_base, new_row->row.body[0].iov_base, len);
+		/* Adjust row body pointers. */
+		new_row->row.body[0].iov_base = new_base;
+	}
+	return new_row;
+}
+
+/*
+ * Find the highest lsr_id that no stashed row needs anymore.
+ */
+static int64_t
+find_min_lsr_id(struct wal_stream *stream)
+{
+	int64_t min_lsr_id = stream->base.row_count;
+	struct rlist *nodes_rows = stream->nodes_rows;
+	struct wal_row *item;
+	/*
+	 * For each new row, lsr_id is incremented by 1. Only rows from
+	 * different replica_ids can get mixed, so for each replica_id the
+	 * first stashed row has the smallest lsr_id of all its rows. The
+	 * lowest lsr_id among these first rows is still needed, so the
+	 * result is kept at that lsr_id minus one. Since min_lsr_id then
+	 * already holds `lowest - 1`, the comparison must be `<=`: with
+	 * `<` a first row whose lsr_id equals the current bound would be
+	 * skipped and its own lsr_id handed to lsregion_gc() as garbage.
+	 */
+	for (uint32_t i = 0; i < VCLOCK_MAX; i++) {
+		if (!rlist_empty(&nodes_rows[i])) {
+			item = rlist_first_entry(&nodes_rows[i], wal_row,
+						 in_row_list);
+			if (item->lsr_id <= min_lsr_id)
+				min_lsr_id = item->lsr_id - 1;
+		}
+	}
+	return min_lsr_id;
+}
+
+/*
+ * Deallocating memory for the wal_row
+ */
+static void
+wal_stream_gc(struct wal_stream *stream)
+{
+	struct lsregion *lsr = &stream->lsr;
+	int64_t lsr_id = find_min_lsr_id(stream);
+	lsregion_gc(lsr, lsr_id);
+}
+
+/*
+ * When restoring the log is read row-by-row.
However, it is necessary + * to store the rows for correct further recovery. For example, with + * mixed transactions. The following function saves the newly arrived + * row to the rlist, which is stored in array is nodes_rows in cell is + * replica_id. As soon as a row arrives with the is_commit flag set, the + * corresponding transaction tries to apply. If an error occurs the + * transaction is rolled back. And we continue to work with the next + * transaction. + */ +static int +wal_stream_apply_mixed_dml_row(struct wal_stream *stream, + struct xrow_header *row) +{ + int rc = 0; + uint32_t id = row->replica_id; + struct rlist *nodes_rows = stream->nodes_rows; + + /* + * When a new row arrives, it has the same address. Because the + * row is on the stack. Therefore, it needs to allocate memory on + * the heap. We do with row->body[0].iov_base analogically. + */ + struct wal_row *save_row = wal_stream_save_row(stream, row); + rlist_add_tail_entry(&nodes_rows[id], save_row, in_row_list); + + if (!row->is_commit) { + return rc; + } else { + struct wal_row *item, *next_row; + rlist_foreach_entry_safe(item, &nodes_rows[id], + in_row_list, next_row) { + rlist_del_entry(item, in_row_list); + if (rc != 0) + continue; + rc = wal_stream_apply_dml_row(stream, &item->row); + } + /* deallocating the memory */ + wal_stream_gc(stream); + assert(rlist_empty(&nodes_rows[id])); + + return rc; + } +} + /** * Yield once in a while, but not too often, mostly to allow signal handling to * take place. 
@@ -797,7 +949,7 @@ wal_stream_apply_dml_row(struct wal_stream *stream, struct xrow_header *row) static void wal_stream_try_yield(struct wal_stream *stream) { - if (wal_stream_has_tx(stream) || !stream->has_yield) + if (wal_stream_has_tx_in_progress(stream) || !stream->has_yield) return; stream->has_yield = false; fiber_sleep(0); @@ -814,6 +966,9 @@ wal_stream_apply_row(struct xstream *base, struct xrow_header *row) } else if (iproto_type_is_raft_request(row->type)) { if (wal_stream_apply_raft_row(stream, row) != 0) goto end_error; + } else if (box_is_force_recovery) { + if (wal_stream_apply_mixed_dml_row(stream, row) != 0) + goto end_error; } else if (wal_stream_apply_dml_row(stream, row) != 0) { goto end_error; } @@ -843,12 +998,23 @@ wal_stream_create(struct wal_stream *ctx) { xstream_create(&ctx->base, wal_stream_apply_row, wal_stream_schedule_yield); + lsregion_create(&ctx->lsr, &runtime); + for (int i = 0; i < VCLOCK_MAX; i++) { + rlist_create(&ctx->nodes_rows[i]); + } ctx->tsn = 0; ctx->first_row_lsn = 0; ctx->has_yield = false; ctx->has_global_row = false; } +static void +wal_stream_destroy(struct wal_stream *ctx) +{ + lsregion_destroy(&ctx->lsr); + memset(ctx, 0, sizeof(*ctx)); +} + /* {{{ configuration bindings */ /* @@ -4572,6 +4738,7 @@ local_recovery(const struct tt_uuid *instance_uuid, wal_stream_create(&wal_stream); auto stream_guard = make_scoped_guard([&]{ wal_stream_abort(&wal_stream); + wal_stream_destroy(&wal_stream); }); struct recovery *recovery = recovery_new( wal_dir(), box_is_force_recovery, checkpoint_vclock); @@ -4657,7 +4824,7 @@ local_recovery(const struct tt_uuid *instance_uuid, engine_begin_final_recovery_xc(); recover_remaining_wals(recovery, &wal_stream.base, NULL, false); - if (wal_stream_has_tx(&wal_stream)) { + if (wal_stream_has_unfinished_tx(&wal_stream)) { diag_set(XlogError, "found a not finished transaction " "in the log"); wal_stream_abort(&wal_stream); @@ -4684,7 +4851,7 @@ local_recovery(const struct tt_uuid 
*instance_uuid, } recovery_stop_local(recovery); recover_remaining_wals(recovery, &wal_stream.base, NULL, true); - if (wal_stream_has_tx(&wal_stream)) { + if (wal_stream_has_unfinished_tx(&wal_stream)) { diag_set(XlogError, "found a not finished transaction " "in the log in hot standby mode"); wal_stream_abort(&wal_stream); diff --git a/test/replication-luatest/gh_7932_data/bad_xlog/00000000000000000000.xlog b/test/replication-luatest/gh_7932_data/bad_xlog/00000000000000000000.xlog new file mode 100644 index 0000000000000000000000000000000000000000..3d28648ff2705d8cbcc13da92fadfd0792c01f92 GIT binary patch literal 597 zcma#>@ptDk&@(jV3QH|2&dkrVQZUjp)HBvK&`meCNHI?_OEyj6^2{qPNz6-5wNfxl zG_x>GH8a*VGD%L+HAzc0)wMJ=HquQ>voK9FGqFrHNKE1ibI!?6&bCshuI1vox{G_Y zBEvbK>V_-J86cpAftiVsnUUqR<4wlg2+l2vO#){bEfl9RGAt>|&q-xw0qN0TICtH9 z-a~{Q7N8y`h@R-;-@jUbdIUrn6(%mrO{|E|Oi`RTd7(BFBS@mQG6A(R z!?ga_0$C@aIQ=XW1LKmC)Z&t*xv9A&6$}j%K$`6s&YfMKjHa0ds(Ibj5RhgC#c4pz z49g3OGIJA)DwmWLrKUD6E6vL+EKNN;aT3Fd(!Ao#^t{v*kbyD`=Sr6Bc7VC273g_p znCB-(TYu+Z02;}}pg4(<8LGVI&c!9D%Ax+A7;O{9!3b5(1XeD`aBlY2)B8ilmw*WCwv-ki2 literal 0 HcmV?d00001 diff --git a/test/replication-luatest/gh_7932_data/good_xlog/00000000000000000000.xlog b/test/replication-luatest/gh_7932_data/good_xlog/00000000000000000000.xlog new file mode 100644 index 0000000000000000000000000000000000000000..30f6f5bee567e9b8d95fbacef2eebb222f6390a5 GIT binary patch literal 646 zcma#>@ptDk&@(jV3QH|2&dkrVQZUjp)HBvK&`meCNHI?_OEyj6^2{qPNz6-5wNfxl zG_x>GH8a*VGD%L+HAzc0)wMJ=HquQ>voK9FGqFrHNKE1ibI!?6&bCshuI1vox{G_Y zBEvbK>V_-J86cpAftiVsnUUqR<4wlg2+l2vO#){bEfl9RGAt>|&q-xw0qN0TICtH9 z-a~{Q7N8y`h@R-;-@jUbdIUrn6(%mrO{|E|Oi`RTd7(BFBS@mQG6A(R z!?ga_0$C@aIQ=XW1LKmC)Z&t*xv9A&6$}j%K$`6s&YfMKjHa0ds(Ibj5RhgC#c4pz z49g3OGIJA)DwmWLrKUD6E6vL+EKNN;aT3Fd(!Ao#^t{v*kbyD`=Sr6Bc7VC273g_p znCB-(TYu+Z02;}}pg4(<8Khi};aunT9#&-KOw4C%jUX zVF3kH9>u{3HJ=G){_L%(1<1-l<{QJ!pN7x?HXrC9Uw4*iFb!>B_d`PyX#Q0WCQg_Q g%wQW77|vxb_#}X=0c3*-%m!NlWGzfr1=hX=0QJJbZvX%Q literal 0 HcmV?d00001 diff --git 
a/test/replication-luatest/gh_7932_data/not_finished_xlog/00000000000000000000.xlog b/test/replication-luatest/gh_7932_data/not_finished_xlog/00000000000000000000.xlog new file mode 100644 index 0000000000000000000000000000000000000000..7a36674d6573e13b31b46329781f4c154cf00274 GIT binary patch literal 595 zcma#>@ptDk&@(jV3QH|2&dkrVQZUjp)HBvK&`meCNHI?_OEyj6^2{qPNz6-5wNfxl zG_x>GH8a*VGD%L+HAzc0)wMJ=HquQ>voK9FGqFrHNKE1ibI!?6&bCshuI1vox{G_Y zBEvbK>V_-J86cpAftiVsnUUqR<4wlg2+l2vO#){bEfl9RGAt>|&q-xw0qN0TICtH9 z-a~{Q7N8y`h@R-;-@jUbdIUrn6(%mrO{|E|Oi`RTd7(BFBS@mQG6A(R z!?ga_0$C@aIQ=XW1LKmC)Z&t*xv9A&6$}j%K$`6s&YfMKjHa0ds(Ibj5RhgC#c4pz z49g3OGIJA)DwmWLrKUD6E6vL+EKNN;aT3Fd(!Ao#^t{v*kbyD`=Sr6Bc7VC273g_p znCB-(TYu+Z02;}}pg4(<8Khi};aunT9#&-KOw4C%jUX zVF3kH9>u{3HJ=G){_L%(1<1-l<{QJ!pN7x?HXrC9Uw4*iFb!>B_d`PyX#Q0WCQg_Q L%**1iP**N3ye literal 0 HcmV?d00001 diff --git a/test/replication-luatest/gh_7932_recovery_mixed_transactions_test.lua b/test/replication-luatest/gh_7932_recovery_mixed_transactions_test.lua new file mode 100644 index 000000000000..0bde8a5c160f --- /dev/null +++ b/test/replication-luatest/gh_7932_recovery_mixed_transactions_test.lua @@ -0,0 +1,127 @@ +local t = require('luatest') +local cluster = require('luatest.replica_set') +local fio = require('fio') + +local g = t.group('gh_7932') + +g.before_each(function(cg) + cg.cluster = cluster:new({}) + cg.server = cg.cluster:build_and_add_server{ + alias = 'server', + box_cfg = { + instance_uuid = '1a685e63-24cd-4fc5-9532-bf85f649e0ab', + }, + } + cg.cluster:start() +end) + +g.after_each(function(cg) + cg.cluster:drop() +end) + +g.test_good_xlog_with_force_recovery = function(cg) + --Delete all *.xlogs on the server + cg.server:stop() + local glob = fio.pathjoin(cg.server.workdir, '*.xlog') + local xlog = fio.glob(glob) + for _, file in pairs(xlog) do fio.unlink(file) end + --Copying the prepared log to the server + xlog = 'test/replication-luatest/gh_7932_data/good_xlog/'.. 
+ '00000000000000000000.xlog' + fio.copyfile(xlog, cg.server.workdir) + + cg.server.box_cfg.force_recovery = true + cg.server:start() + -- The first transaction is applied and after the second transaction + -- is applied. + t.assert_equals(cg.server:exec(function() + return box.space.test:select() + end), {{1, 2}, {2, 2}, {3, 2}}) +end + +g.test_good_xlog_without_force_recovery = function(cg) + -- Delete all *.xlogs on the server + cg.server:stop() + local glob = fio.pathjoin(cg.server.workdir, '*.xlog') + local xlog = fio.glob(glob) + for _, file in pairs(xlog) do fio.unlink(file) end + -- Copying the prepared log to the server + xlog = 'test/replication-luatest/gh_7932_data/good_xlog/'.. + '00000000000000000000.xlog' + fio.copyfile(xlog, cg.server.workdir) + + cg.server.box_cfg.force_recovery = false + cg.server:start({wait_until_ready = false}) + local logfile = fio.pathjoin(cg.server.workdir, 'server.log') + -- The transactions aren't applied without force_recovery + t.helpers.retrying({}, function() + t.assert(cg.server:grep_log("error at request", nil, + {filename = logfile}), "{type: 'REPLACE', replica_id: 2, ".. + "lsn: 32050, space_id: 512, index_id: 0, tuple: [1, 2]}") + t.assert(cg.server:grep_log("XlogError", nil, + {filename = logfile}), "found a next transaction with the ".. + "previous one not yet committed") + end) +end + +g.test_bad_xlog_with_force_recovery = function(cg) + --Delete all *.xlogs on the server + cg.server:stop() + local glob = fio.pathjoin(cg.server.workdir, '*.xlog') + local xlog = fio.glob(glob) + for _, file in pairs(xlog) do fio.unlink(file) end + --Copying the prepared log to the server + xlog = 'test/replication-luatest/gh_7932_data/bad_xlog/'.. 
+ '00000000000000000000.xlog' + fio.copyfile(xlog, cg.server.workdir) + + cg.server.box_cfg.force_recovery = true + cg.server:start() + local logfile = fio.pathjoin(cg.server.workdir, 'server.log') + + -- The second transaction cannot be applied because the first global + -- row in the transaction has an LSN/TSN mismatch. + t.helpers.retrying({}, function() + t.assert(cg.server:grep_log("error at request", nil, + {filename = logfile}), "{type: 'REPLACE', replica_id: 2, ".. + "lsn: 32051, space_id: 512, index_id: 0, tuple: [2, 2]}") + t.assert(cg.server:grep_log("skipping row {2: 32052}", nil, + {filename = logfile}), "skipping row") + t.assert(cg.server:grep_log("XlogError", nil, + {filename = logfile}), "found a first global row in a ".. + "transaction with LSN/TSN mismatch") + end) + + -- Only the first transaction is applied + t.assert_equals(cg.server:exec(function() + return box.space.test:select() + end), {{1, 3}, {2, 3}, {3, 3}}) +end + +g.test_not_finished_transaction = function(cg) + --Delete all *.xlogs on the server + cg.server:stop() + local glob = fio.pathjoin(cg.server.workdir, '*.xlog') + local xlog = fio.glob(glob) + for _, file in pairs(xlog) do fio.unlink(file) end + --Copying the prepared log to the server + xlog = 'test/replication-luatest/gh_7932_data/not_finished_xlog/'.. + '00000000000000000000.xlog' + fio.copyfile(xlog, cg.server.workdir) + + cg.server.box_cfg.force_recovery = true + cg.server:start() + local logfile = fio.pathjoin(cg.server.workdir, 'server.log') + + -- The second transaction is not completed + t.helpers.retrying({}, function() + t.assert(cg.server:grep_log("XlogError", nil, + {filename = logfile}), "found a not finished transaction " .. + "in the log") + end) + + -- Only the first transaction is applied + t.assert_equals(cg.server:exec(function() + return box.space.test:select() + end), {{1, 3}, {2, 3}, {3, 3}}) +end