Skip to content

Commit

Permalink
txn: undo commit/rollback triggers when reverting to savepoint
Browse files Browse the repository at this point in the history
When reverting to a savepoint inside a DDL transaction, apart from
undoing changes done by the DDL statements to the system spaces, we also
have to

 - Run rollback triggers installed after the savepoint was set, because
   otherwise changes done to the schema by DDL won't be undone.
 - Remove commit triggers installed after the savepoint, because they
   are not relevant anymore, apparently.

To achieve that, let's remember not only the last transaction statement
at the time when a savepoint is created, but also the state of commit
and rollback triggers. Since in contrast to txn statements, commit and
rollback triggers can actually be deleted from the list, we create dummy
triggers per each savepoint and use them as markers in the trigger
lists. We don't however create dummy triggers if no commit/rollback
triggers is installed, because in that case we would simply need to
clear the trigger lists.

While we are at it, let's also make sure that a rollback trigger doesn't
access an already freed tuple. To do that, we release statement tuples
after running rollback triggers, not before as we used to.

Closes #4364
Closes #4365
  • Loading branch information
locker committed Jul 26, 2019
1 parent 9a504f2 commit 1b76963
Show file tree
Hide file tree
Showing 6 changed files with 252 additions and 7 deletions.
73 changes: 66 additions & 7 deletions src/box/txn.c
Expand Up @@ -52,14 +52,24 @@ fiber_set_txn(struct fiber *fiber, struct txn *txn)
fiber->storage.txn = txn;
}

static void
dummy_trigger_cb(struct trigger *trigger, void *event)
{
(void)trigger;
(void)event;
}

/**
* A savepoint that points to the beginning of a transaction.
* Use it to rollback all statements of any transaction.
*/
static struct txn_savepoint zero_svp = {
/* .in_sub_stmt = */ 0,
/* .has_triggers = */ false,
/* .stmt = */ NULL,
/* .fk_deferred_count = */ 0,
/* .on_commit = */ {RLIST_LINK_INITIALIZER, 0, NULL, NULL},
/* .on_rollback = */ {RLIST_LINK_INITIALIZER, 0, NULL, NULL},
};

/**
Expand All @@ -71,10 +81,31 @@ txn_create_svp(struct txn *txn, struct txn_savepoint *svp)
{
svp->in_sub_stmt = txn->in_sub_stmt;
svp->stmt = stailq_last(&txn->stmts);
svp->has_triggers = txn->has_triggers;
if (svp->has_triggers) {
trigger_create(&svp->on_commit, dummy_trigger_cb, NULL, NULL);
trigger_add(&txn->on_commit, &svp->on_commit);
trigger_create(&svp->on_rollback, dummy_trigger_cb, NULL, NULL);
trigger_add(&txn->on_rollback, &svp->on_rollback);
}
if (txn->psql_txn != NULL)
svp->fk_deferred_count = txn->psql_txn->fk_deferred_count;
}

/**
* Destroy a savepoint that isn't going to be used anymore.
* This function clears dummy commit and rollback triggers
* installed by the savepoint.
*/
static void
txn_destroy_svp(struct txn_savepoint *svp)
{
if (svp->has_triggers) {
trigger_clear(&svp->on_commit);
trigger_clear(&svp->on_rollback);
}
}

static int
txn_add_redo(struct txn *txn, struct txn_stmt *stmt, struct request *request)
{
Expand Down Expand Up @@ -144,6 +175,11 @@ txn_stmt_unref_tuples(struct txn_stmt *stmt)
static void
txn_rollback_to_svp(struct txn *txn, struct txn_savepoint *svp)
{
/*
* Undo changes done to the database by the rolled back
* statements. Don't release the tuples as rollback triggers
* might still need them.
*/
struct txn_stmt *stmt;
struct stailq rollback;
stailq_cut_tail(&txn->stmts, svp->stmt, &rollback);
Expand All @@ -161,10 +197,35 @@ txn_rollback_to_svp(struct txn *txn, struct txn_savepoint *svp)
assert(txn->n_applier_rows > 0);
txn->n_applier_rows--;
}
txn_stmt_unref_tuples(stmt);
stmt->space = NULL;
stmt->row = NULL;
}
/*
* Remove commit and rollback triggers installed after
* the savepoint was set and run the rollback triggers.
*/
RLIST_HEAD(on_commit);
RLIST_HEAD(on_rollback);
if (svp->has_triggers) {
rlist_cut_before(&on_commit, &txn->on_commit,
&svp->on_commit.link);
rlist_cut_before(&on_rollback, &txn->on_rollback,
&svp->on_rollback.link);
} else if (txn->has_triggers) {
rlist_splice(&on_commit, &txn->on_commit);
rlist_splice(&on_rollback, &txn->on_rollback);
txn->has_triggers = false;
}
if (trigger_run(&on_rollback, txn) != 0) {
diag_log();
unreachable();
panic("rollback trigger failed");
}

/* Release the tuples. */
stailq_foreach_entry(stmt, &rollback, next)
txn_stmt_unref_tuples(stmt);

if (txn->psql_txn != NULL)
txn->psql_txn->fk_deferred_count = svp->fk_deferred_count;
}
Expand Down Expand Up @@ -362,6 +423,7 @@ txn_commit_stmt(struct txn *txn, struct request *request)
goto fail;
}
--txn->in_sub_stmt;
txn_destroy_svp(&txn->sub_stmt_begin[txn->in_sub_stmt]);
return 0;
fail:
txn_rollback_stmt(txn);
Expand Down Expand Up @@ -602,8 +664,9 @@ txn_rollback_stmt(struct txn *txn)
{
if (txn == NULL || txn->in_sub_stmt == 0)
return;
txn->in_sub_stmt--;
txn_rollback_to_svp(txn, &txn->sub_stmt_begin[txn->in_sub_stmt]);
struct txn_savepoint *svp = &txn->sub_stmt_begin[--txn->in_sub_stmt];
txn_rollback_to_svp(txn, svp);
txn_destroy_svp(svp);
}

void
Expand Down Expand Up @@ -803,9 +866,5 @@ txn_on_yield(struct trigger *trigger, void *event)
struct txn *txn = in_txn();
assert(txn != NULL && !txn->can_yield);
txn_rollback_to_svp(txn, &zero_svp);
if (txn->has_triggers) {
txn_run_rollback_triggers(txn);
txn->has_triggers = false;
}
txn->is_aborted_by_yield = true;
}
11 changes: 11 additions & 0 deletions src/box/txn.h
Expand Up @@ -89,6 +89,8 @@ struct txn_savepoint {
* creation.
*/
int in_sub_stmt;
/** True if commit/rollback triggers are set. */
bool has_triggers;
/**
* Statement, on which a savepoint is created. On rollback
* to this savepoint all newer statements are rolled back.
Expand All @@ -106,6 +108,15 @@ struct txn_savepoint {
* state violating any number of deferred FK constraints.
*/
uint32_t fk_deferred_count;
/**
* Dummy commit and rollback triggers installed when this
* savepoint was created. They are used on rollback to this
* savepoint in order to remove triggers set after this
* savepoint was created. We don't set the fake triggers
* if the transaction doesn't have any triggers, because
* in that case we have to remove all triggers on rollback.
*/
struct trigger on_commit, on_rollback;
};

extern double too_long_threshold;
Expand Down
39 changes: 39 additions & 0 deletions test/box/transaction.result
Expand Up @@ -778,3 +778,42 @@ box.begin() drop() box.rollback()
box.begin() drop() box.commit()
---
...
--
-- gh-4364: DDL doesn't care about savepoints.
--
test_run:cmd("setopt delimiter ';'")
---
- true
...
box.begin()
s1 = box.schema.create_space('test1')
save = box.savepoint()
s2 = box.schema.create_space('test2')
check1 = box.space.test1 ~= nil
check2 = box.space.test2 ~= nil
box.rollback_to_savepoint(save)
check3 = box.space.test1 ~= nil
check4 = box.space.test2 ~= nil
box.commit();
---
...
test_run:cmd("setopt delimiter ''");
---
- true
...
check1, check2, check3, check4
---
- true
- true
- true
- false
...
--
-- gh-4365: DDL reverted by yield triggers crash.
--
box.begin() box.execute([[CREATE TABLE test(id INTEGER PRIMARY KEY AUTOINCREMENT)]]) fiber.yield()
---
...
box.rollback()
---
...
23 changes: 23 additions & 0 deletions test/box/transaction.test.lua
Expand Up @@ -415,3 +415,26 @@ box.begin() create() box.rollback()
box.begin() create() box.commit()
box.begin() drop() box.rollback()
box.begin() drop() box.commit()

--
-- gh-4364: DDL doesn't care about savepoints.
--
test_run:cmd("setopt delimiter ';'")
box.begin()
s1 = box.schema.create_space('test1')
save = box.savepoint()
s2 = box.schema.create_space('test2')
check1 = box.space.test1 ~= nil
check2 = box.space.test2 ~= nil
box.rollback_to_savepoint(save)
check3 = box.space.test1 ~= nil
check4 = box.space.test2 ~= nil
box.commit();
test_run:cmd("setopt delimiter ''");
check1, check2, check3, check4

--
-- gh-4365: DDL reverted by yield triggers crash.
--
box.begin() box.execute([[CREATE TABLE test(id INTEGER PRIMARY KEY AUTOINCREMENT)]]) fiber.yield()
box.rollback()
74 changes: 74 additions & 0 deletions test/engine/transaction.result
Expand Up @@ -527,3 +527,77 @@ stmts
s:drop()
---
...
--
-- Check that on rollback to a savepoint we remove all commit
-- triggers and run all rollback triggers installed after the
-- savepoint.
--
s = box.schema.space.create('test', {engine = engine})
---
...
_ = s:create_index('pk')
---
...
commit = {}
---
...
rollback = {}
---
...
on_commit1 = function() table.insert(commit, 'on_commit_1') end
---
...
on_commit2 = function() table.insert(commit, 'on_commit_2') end
---
...
on_commit3 = function() table.insert(commit, 'on_commit_3') end
---
...
on_rollback1 = function() table.insert(rollback, 'on_rollback_1') end
---
...
on_rollback2 = function() table.insert(rollback, 'on_rollback_2') end
---
...
on_rollback3 = function() table.insert(rollback, 'on_rollback_3') end
---
...
inspector:cmd("setopt delimiter ';'")
---
- true
...
box.begin()
box.on_commit(on_commit1)
box.on_rollback(on_rollback1)
sv = box.savepoint()
s:insert{1}
box.on_commit(on_commit2)
box.on_rollback(on_rollback2)
s:insert{2}
box.on_commit(on_commit3)
box.on_rollback(on_rollback3)
box.rollback_to_savepoint(sv)
s:insert{3}
box.commit();
---
...
inspector:cmd("setopt delimiter ''");
---
- true
...
commit -- on_commit_1
---
- - on_commit_1
...
rollback -- on_rollback_3, on_rollback_2
---
- - on_rollback_3
- on_rollback_2
...
s:select() -- 3
---
- - [3]
...
s:drop()
---
...
39 changes: 39 additions & 0 deletions test/engine/transaction.test.lua
Expand Up @@ -228,3 +228,42 @@ box.begin() box.on_commit(on_commit) box.commit()
stmts

s:drop()

--
-- Check that on rollback to a savepoint we remove all commit
-- triggers and run all rollback triggers installed after the
-- savepoint.
--
s = box.schema.space.create('test', {engine = engine})
_ = s:create_index('pk')

commit = {}
rollback = {}
on_commit1 = function() table.insert(commit, 'on_commit_1') end
on_commit2 = function() table.insert(commit, 'on_commit_2') end
on_commit3 = function() table.insert(commit, 'on_commit_3') end
on_rollback1 = function() table.insert(rollback, 'on_rollback_1') end
on_rollback2 = function() table.insert(rollback, 'on_rollback_2') end
on_rollback3 = function() table.insert(rollback, 'on_rollback_3') end

inspector:cmd("setopt delimiter ';'")
box.begin()
box.on_commit(on_commit1)
box.on_rollback(on_rollback1)
sv = box.savepoint()
s:insert{1}
box.on_commit(on_commit2)
box.on_rollback(on_rollback2)
s:insert{2}
box.on_commit(on_commit3)
box.on_rollback(on_rollback3)
box.rollback_to_savepoint(sv)
s:insert{3}
box.commit();
inspector:cmd("setopt delimiter ''");

commit -- on_commit_1
rollback -- on_rollback_3, on_rollback_2
s:select() -- 3

s:drop()

0 comments on commit 1b76963

Please sign in to comment.