Skip to content

Commit

Permalink
wal: fix transaction boundaries for replicated transactions
Browse files Browse the repository at this point in the history
Transaction boundaries were not updated correctly for transactions in
which local space writes were made from a replication trigger. Existing
transaction boundaries and row flags from the master were written as is
on the replica. Actually, the replica should recalculate transaction
boundaries and even WAIT_SYNC/WAIT_ACK flags.

Transaction boundaries should be recalculated when a replica appends a
local write at the end of the master's transaction, and
WAIT_SYNC/WAIT_ACK should be overwritten when nopifying synchronous
transactions coming from an old term.

The latter fix has uncovered the bug in skipping outdated synchronous
transactions: if one replica replaces a transaction from an old term
with NOPs and then passes that transaction to the other replica, the
other replica raises a split brain error. It believes the NOPs are an
async transaction from an old term. This worked before the fix, because
the rows were written with the original WAIT_ACK = true bit. Now this
is fixed properly: we allow fully NOP async transactions from the old
term.

Closes tarantool#8746

NO_DOC=bugfix
NO_CHANGELOG=covered by the next commit
  • Loading branch information
sergepetrenko committed Sep 8, 2023
1 parent ae5964a commit 0878c30
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 22 deletions.
12 changes: 9 additions & 3 deletions src/box/applier.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1536,15 +1536,21 @@ applier_synchro_filter_tx(struct stailq *rows)
*/
struct xrow_header *last_row =
&stailq_last_entry(rows, struct applier_tx_row, next)->row;
struct applier_tx_row *item;
if (!last_row->wait_sync) {
if (iproto_type_is_dml(last_row->type) &&
txn_limbo.owner_id != REPLICA_ID_NIL) {
if (!iproto_type_is_dml(last_row->type) ||
txn_limbo.owner_id == REPLICA_ID_NIL) {
return;
}
stailq_foreach_entry(item, rows, next) {
row = &item->row;
if (row->type == IPROTO_NOP)
continue;
tnt_raise(ClientError, ER_SPLIT_BRAIN,
"got an async transaction from an old term");
}
return;
}
struct applier_tx_row *item;
stailq_foreach_entry(item, rows, next) {
row = &item->row;
row->type = IPROTO_NOP;
Expand Down
38 changes: 19 additions & 19 deletions src/box/wal.c
Original file line number Diff line number Diff line change
Expand Up @@ -958,10 +958,10 @@ wal_assign_lsn(struct vclock *vclock_diff, struct vclock *base,
{
int64_t tsn = 0;
struct xrow_header **start = entry->rows;
struct xrow_header **end = entry->rows + entry->n_rows;
struct xrow_header **end = entry->rows + entry->n_rows - 1;
struct xrow_header **first_glob_row = entry->rows;
/** Assign LSN to all local rows. */
for (struct xrow_header **row = start; row < end; row++) {
for (struct xrow_header **row = start; row <= end; row++) {
if ((*row)->replica_id == 0) {
/*
* All rows representing local space data
Expand All @@ -976,23 +976,6 @@ wal_assign_lsn(struct vclock *vclock_diff, struct vclock *base,

(*row)->lsn = vclock_inc(vclock_diff, (*row)->replica_id) +
vclock_get(base, (*row)->replica_id);
/*
* Use lsn of the first global row as
* transaction id.
*/
if ((*row)->group_id != GROUP_LOCAL && tsn == 0) {
tsn = (*row)->lsn;
/*
* Remember the tail being processed.
*/
first_glob_row = row;
}
(*row)->tsn = tsn == 0 ? (*start)->lsn : tsn;
/* Tx meta is stored in the last tx row. */
if (row == end - 1) {
(*row)->flags = entry->flags;
(*row)->is_commit = true;
}
} else {
int64_t diff = (*row)->lsn - vclock_get(base, (*row)->replica_id);
if (diff <= vclock_get(vclock_diff,
Expand All @@ -1009,7 +992,21 @@ wal_assign_lsn(struct vclock *vclock_diff, struct vclock *base,
} else {
vclock_follow(vclock_diff, (*row)->replica_id, diff);
}
/* Reset row flags taken from the remote instance. */
(*row)->flags = 0;
}
/*
* Use lsn of the first global row as
* transaction id.
*/
if (tsn == 0 && (*row)->group_id != GROUP_LOCAL) {
tsn = (*row)->lsn;
/*
* Remember the tail being processed.
*/
first_glob_row = row;
}
(*row)->tsn = tsn == 0 ? (*start)->lsn : tsn;
}

/*
Expand All @@ -1019,6 +1016,9 @@ wal_assign_lsn(struct vclock *vclock_diff, struct vclock *base,
*/
for (struct xrow_header **row = start; row < first_glob_row; row++)
(*row)->tsn = tsn;
/* Tx meta is stored in the last tx row. */
(*end)->flags = entry->flags;
(*end)->is_commit = true;
}

static void
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
local t = require('luatest')
local server = require('luatest.server')
local replica_set = require('luatest.replica_set')

-- Test group for gh-8746 (transaction boundaries of replicated transactions).
local g = t.group('gh-8746-transaction-boundaries')

-- Store a newly constructed replica set object on the group context so that
-- each test starts with its own one.
local function create_replica_set(cg)
    cg.replica_set = replica_set:new({})
end

-- Call drop() on the replica set created for the test that has just finished.
local function drop_replica_set(cg)
    cg.replica_set:drop()
end

g.before_each(create_replica_set)

g.after_each(drop_replica_set)

-- Start the replica set and produce one replicated write that is accompanied
-- by a write to a local space made from an on_replace trigger on the replica.
--
-- Expects cg.replica_set, cg.master and cg.replica to be already set.
-- The callbacks handed to exec() are kept free of upvalues on purpose:
-- NOTE(review): they are presumably shipped to the remote instance - confirm.
local function prepare(cg)
    cg.replica_set:start()

    local master = cg.master
    local replica = cg.replica

    -- On the master: a regular space and an is_local one, each with a 'pk'
    -- index.
    local function create_spaces()
        local test_space = box.schema.space.create('test')
        test_space:create_index('pk')
        local local_space = box.schema.space.create('loc', {is_local = true})
        local_space:create_index('pk')
    end

    -- On the replica: every replace in 'test' is repeated in 'loc'.
    local function install_trigger()
        local function copy_to_local(_, new_tuple)
            box.space.loc:replace(new_tuple)
        end
        box.space.test:on_replace(copy_to_local)
    end

    -- On the master: the write which fires the replica's trigger once it is
    -- applied there.
    local function write_row()
        box.space.test:replace({1})
    end

    master:exec(create_spaces)
    replica:wait_for_vclock_of(master)
    replica:exec(install_trigger)
    master:exec(write_row)
    replica:wait_for_vclock_of(master)
end

-- Build (but do not start) the two servers used by test_replica_recovery:
-- a master and a read-only replica whose replication source is the master.
g.before_test('test_replica_recovery', function(cg)
    local rs = cg.replica_set

    local master_cfg = {
        replication_timeout = 0.1,
    }
    cg.master = rs:build_and_add_server({
        alias = 'master',
        box_cfg = master_cfg,
    })

    -- The replica follows the master through the master's listen URI.
    local master_uri = server.build_listen_uri('master', rs.id)
    local replica_cfg = {
        read_only = true,
        replication = master_uri,
        replication_timeout = 0.1,
    }
    cg.replica = rs:build_and_add_server({
        alias = 'replica',
        box_cfg = replica_cfg,
    })
end)

-- Regression check for gh-8746: after the replica has applied a master
-- transaction extended with a local write from its trigger, restart it and
-- wait for it to reach the master's vclock again.
-- NOTE(review): presumably the restart fails or hangs when the transaction
-- was logged with wrong boundaries - confirm against the issue.
function g.test_replica_recovery(cg)
    prepare(cg)

    local replica = cg.replica
    replica:restart()
    replica:wait_for_vclock_of(cg.master)
end

0 comments on commit 0878c30

Please sign in to comment.