diff --git a/changelogs/unreleased/gh-8505-synchro-triggers-fail.md b/changelogs/unreleased/gh-8505-synchro-triggers-fail.md new file mode 100644 index 000000000000..9581aa8a46b9 --- /dev/null +++ b/changelogs/unreleased/gh-8505-synchro-triggers-fail.md @@ -0,0 +1,5 @@ +## bugfix/core + +* Fixed a bug causing the `ER_CURSOR_NO_TRANSACTION` failure for transactions + on synchronous spaces when the `on_commit/on_rollback` triggers are set + (gh-8505). diff --git a/src/box/txn.c b/src/box/txn.c index 8a93e64239f3..2da9e2133d74 100644 --- a/src/box/txn.c +++ b/src/box/txn.c @@ -751,6 +751,7 @@ txn_complete_fail(struct txn *txn) assert(!txn_has_flag(txn, TXN_IS_DONE)); assert(txn->signature < 0); assert(txn->signature != TXN_SIGNATURE_UNKNOWN); + assert(in_txn() == txn); if (txn->limbo_entry != NULL) { assert(txn_has_flag(txn, TXN_WAIT_SYNC)); txn_limbo_abort(&txn_limbo, txn->limbo_entry); @@ -783,6 +784,7 @@ txn_complete_success(struct txn *txn) assert(!txn_has_flag(txn, TXN_IS_DONE)); assert(!txn_has_flag(txn, TXN_WAIT_SYNC)); assert(txn->signature >= 0); + assert(in_txn() == txn); txn->status = TXN_COMMITTED; if (txn->engine != NULL) engine_commit(txn->engine, txn); diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c index 2d838413f6e1..463ba661ffcb 100644 --- a/src/box/txn_limbo.c +++ b/src/box/txn_limbo.c @@ -72,6 +72,22 @@ txn_limbo_is_ro(struct txn_limbo *limbo) (limbo->owner_id != instance_id || txn_limbo_is_frozen(limbo)); } +void +txn_limbo_complete(struct txn *txn, bool is_success) +{ + /* + * Some rollback/commit triggers require the in_txn fiber + * variable to be set. + */ + assert(in_txn() == NULL); + fiber_set_txn(fiber(), txn); + if (is_success) + txn_complete_success(txn); + else + txn_complete_fail(txn); + fiber_set_txn(fiber(), NULL); +} + struct txn_limbo_entry * txn_limbo_last_synchro_entry(struct txn_limbo *limbo) { @@ -287,7 +303,7 @@ txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry) e->txn->limbo_entry = NULL; txn_limbo_abort(limbo, e); txn_clear_flags(e->txn, TXN_WAIT_SYNC | TXN_WAIT_ACK); - txn_complete_fail(e->txn); + txn_limbo_complete(e->txn, false); if (e == entry) break; fiber_wakeup(e->txn->fiber); @@ -461,7 +477,7 @@ txn_limbo_read_confirm(struct txn_limbo *limbo, int64_t lsn) * after the affected transactions. */ assert(e->txn->signature >= 0); - txn_complete_success(e->txn); + txn_limbo_complete(e->txn, true); } /* * Track CONFIRM lsn on replica in order to detect split-brain by @@ -514,7 +530,7 @@ txn_limbo_read_rollback(struct txn_limbo *limbo, int64_t lsn) assert(e->txn->signature >= 0); e->txn->signature = TXN_SIGNATURE_SYNC_ROLLBACK; e->txn->limbo_entry = NULL; - txn_complete_fail(e->txn); + txn_limbo_complete(e->txn, false); if (e == last_rollback) break; } diff --git a/test/replication-luatest/gh_8505_synchro_triggers_test.lua b/test/replication-luatest/gh_8505_synchro_triggers_test.lua new file mode 100644 index 000000000000..205443ec62d0 --- /dev/null +++ b/test/replication-luatest/gh_8505_synchro_triggers_test.lua @@ -0,0 +1,80 @@ +local t = require('luatest') +local replica_set = require('luatest.replica_set') +local server = require('luatest.server') +local proxy = require('luatest.replica_proxy') + +local g = t.group('gh-8505-synchro-triggers') + +g.before_all(function(g) + g.replica_set = replica_set:new({}) + local rs_id = g.replica_set.id + g.box_cfg = { + replication_timeout = 0.01, + election_fencing_mode = 'off', + replication = { + server.build_listen_uri('server1', rs_id), + server.build_listen_uri('server2', rs_id), + }, + } + + g.box_cfg.election_mode = 'candidate' + g.server1 = g.replica_set:build_and_add_server{ + alias = 'server1', + box_cfg = g.box_cfg, + } + + g.proxy1 = proxy:new{ + client_socket_path = server.build_listen_uri('server1_proxy'), + server_socket_path = server.build_listen_uri('server1', rs_id), + } + t.assert(g.proxy1:start{force = true}, 'Proxy from 2 to 1 started') + g.box_cfg.replication[1] = server.build_listen_uri('server1_proxy') + g.box_cfg.election_mode = 'voter' + g.server2 = g.replica_set:build_and_add_server{ + alias = 'server2', + box_cfg = g.box_cfg, + } + + g.replica_set:start() + g.server1:wait_for_election_leader() + g.server1:exec(function() + box.schema.create_space('test', {is_sync = true}):create_index('pk') + end) + g.server2:wait_for_vclock_of(g.server1) +end) + +g.after_all(function(g) + g.replica_set:drop() +end) + +g.test_on_commit_trigger = function(g) + g.server1:exec(function() + box.begin() + box.on_commit(function(iter) iter() end) + box.space.test:upsert({1}, {{'=', 1, 1}}) + box.commit() + end) +end + +g.test_on_rollback_trigger = function(g) + -- Force ACK gathering to fail and cause rollback. It's not enough + -- to set a small timeout, as a transaction can be committed anyway: + -- fibers don't yield so often, compared to such a tiny timeout, ACKs + -- can be processed before the transaction's rollback happens due to + -- a timeout error. So, let's break connection with proxy. + g.server1:update_box_cfg({ replication_synchro_timeout = 1e-9 }) + g.server1:wait_for_election_leader() + g.proxy1:pause() + + g.server1:exec(function() + box.begin() + box.on_rollback(function(iter) iter() end) + box.space.test:upsert({1}, {{'=', 1, 1}}) + local _, err = pcall(box.commit) + t.assert_equals(err.code, box.error.SYNC_QUORUM_TIMEOUT) + end) + + g.proxy1:resume() + g.server1:update_box_cfg({ replication_synchro_timeout = 5 }) + g.server1:wait_for_election_leader() +end