Skip to content

Commit

Permalink
box: fix force recovery for transactions with local rows
Browse files Browse the repository at this point in the history
Force recovery first tries to collect all rows of a transaction into a
single list, and only then applies those rows.

The problem was that it collected rows based on the row replica_id. For
local rows replica_id is set to 0, but actually such rows can be part
of a transaction coming from any instance.

Fix recovery of such rows

Follow-up tarantool#8746
Follow-up tarantool#7932

NO_DOC=bugfix
NO_CHANGELOG=the broken behaviour couldn't be seen due to bug tarantool#8746
  • Loading branch information
sergepetrenko committed Sep 13, 2023
1 parent 7921409 commit ad42cb4
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 6 deletions.
24 changes: 18 additions & 6 deletions src/box/box.cc
Expand Up @@ -917,14 +917,26 @@ wal_stream_apply_mixed_dml_row(struct wal_stream *stream,
struct xrow_header *row)
{
/*
* A local transaction should not be written to nodes_rows with
* id equal to 0. It should be written to nodes_rows with the id
* of the node from which we are recovering. Since a local
* transaction can only exist on the node from which we are
* recovering.
* A local row can be part of any node's transaction. Try to find the
* right transaction by tsn. If this fails, assume the row belongs to
* this node.
*/
uint32_t id = row->replica_id ? row->replica_id : instance_id;
uint32_t id;
struct rlist *nodes_rows = stream->nodes_rows;
if (row->replica_id == 0) {
id = instance_id;
for (uint32_t i = 0; i < VCLOCK_MAX; i++) {
struct wal_row *tmp =
rlist_last_entry(&nodes_rows[i], typeof(*tmp),
in_row_list);
if (tmp->row.tsn == row->tsn) {
id = i;
break;
}
}
} else {
id = row->replica_id;
}

struct wal_row *save_row = wal_stream_save_row(stream, row);
rlist_add_tail_entry(&nodes_rows[id], save_row, in_row_list);
Expand Down
Expand Up @@ -15,6 +15,7 @@ end)
local function prepare(cg)
cg.replica_set:start()
cg.master:exec(function()
box.ctl.wait_rw()
box.schema.space.create('test')
box.space.test:create_index('pk')
box.schema.space.create('loc', {is_local = true})
Expand Down Expand Up @@ -80,3 +81,45 @@ g.test_replication_to_master = function(cg)
cg.master:assert_follows_upstream(cg.replica:get_instance_id())
end)
end

g.before_test('test_replica_force_recovery', function(cg)
cg.master = cg.replica_set:build_and_add_server{
alias = 'master',
box_cfg = {
replication_timeout = 0.1,
},
}
cg.replica = cg.replica_set:build_and_add_server{
alias = 'replica',
box_cfg = {
read_only = true,
replication = server.build_listen_uri('master', cg.replica_set.id),
replication_timeout = 0.1,
force_recovery = true,
},
}
end)

g.test_replica_force_recovery = function(cg)
prepare(cg)
cg.master:exec(function()
box.begin()
box.space.test:replace{2}
box.space.test:replace{3}
box.commit()
end)
cg.replica:wait_for_vclock_of(cg.master)
cg.replica:exec(function()
box.cfg{read_only = false}
box.begin()
box.space.test:replace{4}
box.space.loc:replace{5}
box.space.test:replace{6}
box.commit()
end)
cg.replica:restart()
cg.replica:exec(function()
t.assert_equals(box.space.test:select{}, {{1}, {2}, {3}, {4}, {6}})
t.assert_equals(box.space.loc:select{}, {{1}, {2}, {3}, {4}, {5}, {6}})
end)
end

0 comments on commit ad42cb4

Please sign in to comment.