Skip to content

Commit

Permalink
memtx: abort readers of rollbacked prepared
Browse files Browse the repository at this point in the history
There's case when a transaction is rolled back from prepared
state. This happens when WAL fails, synchronized replication
confirmation failure or perhaps in other similar cases. By design
other RW transactions and transactions with READ_COMMITTED
isolation level are allowed to read prepared state. All these
transactions must be aborted in case of rollback of prepared
transaction since they definitely have read no more possible
database state.

This patch implements this abortion.

Closed #8654

NO_DOC=bugfix

(cherry picked from commit 5498690)
  • Loading branch information
alyapunov committed Jun 21, 2023
1 parent fa176d4 commit e1ac9e0
Show file tree
Hide file tree
Showing 3 changed files with 156 additions and 6 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
## bugfix/core

* Now MVCC engine automatically aborts a transaction if it reads changes
of a prepared transaction and this transaction is aborted (gh-8654).
72 changes: 66 additions & 6 deletions src/box/memtx_tx.c
Original file line number Diff line number Diff line change
Expand Up @@ -731,6 +731,21 @@ memtx_tx_adjust_position_in_read_view_list(struct txn *txn)
rlist_add_tail(&prev_txn->in_read_view_txs, &txn->in_read_view_txs);
}

/**
* Mark @a victim as conflicted and abort it.
* Does nothing if the transaction is already aborted.
*/
static void
memtx_tx_abort_with_conflict(struct txn *victim)
{
if (victim->status == TXN_ABORTED)
return;
if (victim->status == TXN_IN_READ_VIEW)
rlist_del(&victim->in_read_view_txs);
victim->status = TXN_ABORTED;
txn_set_flags(victim, TXN_IS_CONFLICTED);
}

/**
* Handle conflict when @a victim has read and @a breaker has written the same
* key, and @a breaker is prepared. The functions must be called in two cases:
Expand Down Expand Up @@ -781,10 +796,7 @@ memtx_tx_handle_conflict(struct txn *breaker, struct txn *victim)
memtx_tx_adjust_position_in_read_view_list(victim);
} else {
/* Mark as conflicted. */
if (victim->status == TXN_IN_READ_VIEW)
rlist_del(&victim->in_read_view_txs);
victim->status = TXN_ABORTED;
txn_set_flags(victim, TXN_IS_CONFLICTED);
memtx_tx_abort_with_conflict(victim);
}
}

Expand Down Expand Up @@ -2174,6 +2186,18 @@ memtx_tx_history_remove_story_del_stmts(struct memtx_story *story)
}
}

/*
* Abort with conflict all transactions that have read @a story.
*/
static void
memtx_tx_abort_story_readers(struct memtx_story *story)
{
struct tx_read_tracker *tracker, *tmp;
rlist_foreach_entry_safe(tracker, &story->reader_list,
in_reader_list, tmp)
memtx_tx_abort_with_conflict(tracker->reader);
}

/*
* Rollback addition of story by statement.
*/
Expand All @@ -2185,7 +2209,8 @@ memtx_tx_history_rollback_added_story(struct txn_stmt *stmt)

/*
* In case of rollback of prepared statement we need to rollback
* preparation actions.
* preparation actions and abort other transactions that managed
* to read this prepared state.
*/
if (stmt->txn->psn != 0) {
/*
Expand Down Expand Up @@ -2225,6 +2250,12 @@ memtx_tx_history_rollback_added_story(struct txn_stmt *stmt)
add_story->add_psn = 0;
if (del_story != NULL)
del_story->del_psn = 0;

/*
* If a transaction managed to read this story it must
* be aborted.
*/
memtx_tx_abort_story_readers(add_story);
}

/* Unlink stories from the statement. */
Expand All @@ -2250,6 +2281,28 @@ memtx_tx_history_rollback_added_story(struct txn_stmt *stmt)
add_story->del_psn = MEMTX_TX_ROLLBACKED_PSN;
}

/*
* Abort with conflict all transactions that have read absence of @a story.
*/
static void
memtx_tx_abort_gap_readers(struct memtx_story *story)
{
for (uint32_t i = 0; i < story->index_count; i++) {
/*
* We rely on the fact that all gap trackers are stored in the
* top story of history chain.
*/
struct memtx_story *top = memtx_tx_story_find_top(story, i);
struct gap_item_base *item, *tmp;
rlist_foreach_entry_safe(item, &top->link[i].read_gaps,
in_read_gaps, tmp) {
if (item->type != GAP_INPLACE)
continue;
memtx_tx_abort_with_conflict(item->txn);
}
}
}

/*
* Rollback deletion of story by statement.
*/
Expand All @@ -2260,7 +2313,8 @@ memtx_tx_history_rollback_deleted_story(struct txn_stmt *stmt)

/*
* In case of rollback of prepared statement we need to rollback
* preparation actions.
* preparation actions and abort other transactions that managed
* to read this prepared state.
*/
if (stmt->txn->psn != 0) {
/*
Expand Down Expand Up @@ -2288,6 +2342,12 @@ memtx_tx_history_rollback_deleted_story(struct txn_stmt *stmt)

/* Revert psn assignment. */
del_story->del_psn = 0;

/*
* If a transaction managed to read absence this story it must
* be aborted.
*/
memtx_tx_abort_gap_readers(del_story);
}

/* Unlink the story from the statement. */
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
local server = require('luatest.server')
local t = require('luatest')

local pg = t.group(nil, t.helpers.matrix({engine = {'memtx', 'vinyl'},
idx = {0, 1}}))

pg.before_all(function(cg)
cg.server = server:new{
alias = 'default',
box_cfg = {memtx_use_mvcc_engine = true}
}
cg.server:start()
end)

pg.after_all(function(cg)
cg.server:drop()
end)

pg.before_each(function(cg)
cg.server:exec(function(engine)
local s = box.schema.create_space('test', {engine = engine})
s:create_index('pk')
s:create_index('sk', {parts = {{2, 'uint'}}})
end, {cg.params.engine})
end)

pg.after_each(function(cg)
cg.server:exec(function()
box.space.test:drop()
end)
end)

-- Check that if transaction with insertion is prepared and then aborted,
-- its readers are aborted too.
pg.test_abort_readers_of_insertion = function(cg)
t.tarantool.skip_if_not_debug()
cg.server:exec(function(idx)
local txn_proxy = require('test.box.lua.txn_proxy')
local fiber = require('fiber')

local tx1 = txn_proxy.new()
local tx2 = txn_proxy.new()
tx1:begin()
tx2:begin()
tx1('box.space.test:replace{1, 1}')
tx2('box.space.test:replace{2, 2}')
box.error.injection.set('ERRINJ_WAL_WRITE', true)
local fib = fiber.create(tx1.commit, tx1)
fib:set_joinable(true)
-- select by any index must read {1, 1} and lead to the same behavior.
t.assert_equals(tx2('box.space.test.index[' .. idx ..']:select{1}'),
{{{1, 1}}})
fib:join()
box.error.injection.set('ERRINJ_WAL_WRITE', false)
t.assert_equals(tx2:commit(),
{{error = "Transaction has been aborted by conflict"}})
end, {cg.params.idx})
end

-- Check that if transaction with deletion is prepared and then aborted,
-- its readers are aborted too.
pg.test_abort_readers_of_deletion = function(cg)
t.tarantool.skip_if_not_debug()
cg.server:exec(function(idx)
local txn_proxy = require('test.box.lua.txn_proxy')
local fiber = require('fiber')

box.space.test:replace{1, 1}
local tx1 = txn_proxy.new()
local tx2 = txn_proxy.new()
tx1:begin()
tx2:begin()
tx1('box.space.test:delete{1}')
tx2('box.space.test:replace{2, 2}')
box.error.injection.set('ERRINJ_WAL_WRITE', true)
local fib = fiber.create(tx1.commit, tx1)
fib:set_joinable(true)
-- select by any index must read {} and lead to the same behavior.
t.assert_equals(tx2('box.space.test.index[' .. idx ..']:select{1}'),
{{}})
fib:join()
box.error.injection.set('ERRINJ_WAL_WRITE', false)
t.assert_equals(tx2:commit(),
{{error = "Transaction has been aborted by conflict"}})
end, {cg.params.idx})
end

0 comments on commit e1ac9e0

Please sign in to comment.