Skip to content

Commit

Permalink
txn: undo commit/rollback triggers when reverting to savepoint
Browse files Browse the repository at this point in the history
When reverting to a savepoint inside a DDL transaction, apart from
undoing changes done by the DDL statements to the system spaces, we also
have to

 - Run rollback triggers installed after the savepoint was set, because
   otherwise changes done to the schema by DDL won't be undone.
 - Remove commit triggers installed after the savepoint, because they
   are not relevant anymore, apparently.

To achieve that let's append DDL triggers right to txn statements.
This allows us to easily discard commit triggers and run rollback
triggers when a statement is rolled back.

Note, txn commit/rollback triggers are not removed, because they are
still used by applier and Lua box.on_commit/on_rollback functions.

Closes #4364
Closes #4365
  • Loading branch information
locker committed Jul 30, 2019
1 parent 09e9572 commit 7ad7169
Show file tree
Hide file tree
Showing 5 changed files with 183 additions and 56 deletions.
90 changes: 45 additions & 45 deletions src/box/alter.cc
Expand Up @@ -895,7 +895,7 @@ alter_space_rollback(struct trigger *trigger, void * /* event */)
* On rollback, the new space is deleted.
*/
static void
alter_space_do(struct txn *txn, struct alter_space *alter)
alter_space_do(struct txn_stmt *stmt, struct alter_space *alter)
{
/**
* AlterSpaceOp::prepare() may perform a potentially long
Expand Down Expand Up @@ -977,8 +977,8 @@ alter_space_do(struct txn *txn, struct alter_space *alter)
* finish or rollback the DDL depending on the results of
* writing to WAL.
*/
txn_on_commit(txn, on_commit);
txn_on_rollback(txn, on_rollback);
txn_stmt_on_commit(stmt, on_commit);
txn_stmt_on_rollback(stmt, on_rollback);
}

/* }}} */
Expand Down Expand Up @@ -1882,7 +1882,7 @@ on_replace_dd_space(struct trigger * /* trigger */, void *event)
*/
struct trigger *on_rollback =
txn_alter_trigger_new(on_create_space_rollback, space);
txn_on_rollback(txn, on_rollback);
txn_stmt_on_rollback(stmt, on_rollback);
if (def->opts.is_view) {
struct Select *select = sql_view_compile(sql_get(),
def->opts.sql);
Expand All @@ -1906,11 +1906,11 @@ on_replace_dd_space(struct trigger * /* trigger */, void *event)
struct trigger *on_commit_view =
txn_alter_trigger_new(on_create_view_commit,
select);
txn_on_commit(txn, on_commit_view);
txn_stmt_on_commit(stmt, on_commit_view);
struct trigger *on_rollback_view =
txn_alter_trigger_new(on_create_view_rollback,
select);
txn_on_rollback(txn, on_rollback_view);
txn_stmt_on_rollback(stmt, on_rollback_view);
select_guard.is_active = false;
}
} else if (new_tuple == NULL) { /* DELETE */
Expand Down Expand Up @@ -1967,10 +1967,10 @@ on_replace_dd_space(struct trigger * /* trigger */, void *event)
++schema_version;
struct trigger *on_commit =
txn_alter_trigger_new(on_drop_space_commit, old_space);
txn_on_commit(txn, on_commit);
txn_stmt_on_commit(stmt, on_commit);
struct trigger *on_rollback =
txn_alter_trigger_new(on_drop_space_rollback, old_space);
txn_on_rollback(txn, on_rollback);
txn_stmt_on_rollback(stmt, on_rollback);
if (old_space->def->opts.is_view) {
struct Select *select =
sql_view_compile(sql_get(),
Expand All @@ -1983,11 +1983,11 @@ on_replace_dd_space(struct trigger * /* trigger */, void *event)
struct trigger *on_commit_view =
txn_alter_trigger_new(on_drop_view_commit,
select);
txn_on_commit(txn, on_commit_view);
txn_stmt_on_commit(stmt, on_commit_view);
struct trigger *on_rollback_view =
txn_alter_trigger_new(on_drop_view_rollback,
select);
txn_on_rollback(txn, on_rollback_view);
txn_stmt_on_rollback(stmt, on_rollback_view);
update_view_references(select, -1, true, NULL);
select_guard.is_active = false;
}
Expand Down Expand Up @@ -2057,7 +2057,7 @@ on_replace_dd_space(struct trigger * /* trigger */, void *event)
alter_space_move_indexes(alter, 0, old_space->index_id_max + 1);
/* Remember to update schema_version. */
(void) new UpdateSchemaVersion(alter);
alter_space_do(txn, alter);
alter_space_do(stmt, alter);
alter_guard.is_active = false;
}
}
Expand Down Expand Up @@ -2295,7 +2295,7 @@ on_replace_dd_index(struct trigger * /* trigger */, void *event)
(void) new MoveCkConstraints(alter);
/* Add an op to update schema_version on commit. */
(void) new UpdateSchemaVersion(alter);
alter_space_do(txn, alter);
alter_space_do(stmt, alter);
scoped_guard.is_active = false;
}

Expand Down Expand Up @@ -2371,7 +2371,7 @@ on_replace_dd_truncate(struct trigger * /* trigger */, void *event)
}

(void) new MoveCkConstraints(alter);
alter_space_do(txn, alter);
alter_space_do(stmt, alter);
scoped_guard.is_active = false;
}

Expand Down Expand Up @@ -2559,7 +2559,7 @@ on_replace_dd_user(struct trigger * /* trigger */, void *event)
def_guard.is_active = false;
struct trigger *on_rollback =
txn_alter_trigger_new(user_cache_remove_user, new_tuple);
txn_on_rollback(txn, on_rollback);
txn_stmt_on_rollback(stmt, on_rollback);
} else if (new_tuple == NULL) { /* DELETE */
access_check_ddl(old_user->def->name, old_user->def->uid,
old_user->def->owner, old_user->def->type,
Expand All @@ -2581,7 +2581,7 @@ on_replace_dd_user(struct trigger * /* trigger */, void *event)
user_cache_delete(uid);
struct trigger *on_rollback =
txn_alter_trigger_new(user_cache_alter_user, old_tuple);
txn_on_rollback(txn, on_rollback);
txn_stmt_on_rollback(stmt, on_rollback);
} else { /* UPDATE, REPLACE */
assert(old_user != NULL && new_tuple != NULL);
/*
Expand All @@ -2597,7 +2597,7 @@ on_replace_dd_user(struct trigger * /* trigger */, void *event)
def_guard.is_active = false;
struct trigger *on_rollback =
txn_alter_trigger_new(user_cache_alter_user, old_tuple);
txn_on_rollback(txn, on_rollback);
txn_stmt_on_rollback(stmt, on_rollback);
}
}

Expand Down Expand Up @@ -2848,7 +2848,7 @@ on_replace_dd_func(struct trigger * /* trigger */, void *event)
def_guard.is_active = false;
func_cache_insert(func);
on_rollback->data = func;
txn_on_rollback(txn, on_rollback);
txn_stmt_on_rollback(stmt, on_rollback);
trigger_run_xc(&on_alter_func, func);
} else if (new_tuple == NULL) { /* DELETE */
uint32_t uid;
Expand Down Expand Up @@ -2876,8 +2876,8 @@ on_replace_dd_func(struct trigger * /* trigger */, void *event)
struct trigger *on_rollback =
txn_alter_trigger_new(on_drop_func_rollback, old_func);
func_cache_delete(old_func->def->fid);
txn_on_commit(txn, on_commit);
txn_on_rollback(txn, on_rollback);
txn_stmt_on_commit(stmt, on_commit);
txn_stmt_on_rollback(stmt, on_rollback);
trigger_run_xc(&on_alter_func, old_func);
} else { /* UPDATE, REPLACE */
assert(new_tuple != NULL && old_tuple != NULL);
Expand Down Expand Up @@ -3062,8 +3062,8 @@ on_replace_dd_collation(struct trigger * /* trigger */, void *event)
coll_id_cache_delete(old_coll_id);
on_rollback->data = old_coll_id;
on_commit->data = old_coll_id;
txn_on_rollback(txn, on_rollback);
txn_on_commit(txn, on_commit);
txn_stmt_on_rollback(stmt, on_rollback);
txn_stmt_on_commit(stmt, on_commit);
} else if (new_tuple != NULL && old_tuple == NULL) {
/* INSERT */
struct trigger *on_rollback =
Expand All @@ -3082,7 +3082,7 @@ on_replace_dd_collation(struct trigger * /* trigger */, void *event)
}
assert(replaced_id == NULL);
on_rollback->data = new_coll_id;
txn_on_rollback(txn, on_rollback);
txn_stmt_on_rollback(stmt, on_rollback);
} else {
/* UPDATE */
assert(new_tuple != NULL && old_tuple != NULL);
Expand Down Expand Up @@ -3335,7 +3335,7 @@ on_replace_dd_priv(struct trigger * /* trigger */, void *event)
grant_or_revoke(&priv);
struct trigger *on_rollback =
txn_alter_trigger_new(revoke_priv, new_tuple);
txn_on_rollback(txn, on_rollback);
txn_stmt_on_rollback(stmt, on_rollback);
} else if (new_tuple == NULL) { /* revoke */
assert(old_tuple);
priv_def_create_from_tuple(&priv, old_tuple);
Expand All @@ -3344,14 +3344,14 @@ on_replace_dd_priv(struct trigger * /* trigger */, void *event)
grant_or_revoke(&priv);
struct trigger *on_rollback =
txn_alter_trigger_new(modify_priv, old_tuple);
txn_on_rollback(txn, on_rollback);
txn_stmt_on_rollback(stmt, on_rollback);
} else { /* modify */
priv_def_create_from_tuple(&priv, new_tuple);
priv_def_check(&priv, PRIV_GRANT);
grant_or_revoke(&priv);
struct trigger *on_rollback =
txn_alter_trigger_new(modify_priv, old_tuple);
txn_on_rollback(txn, on_rollback);
txn_stmt_on_rollback(stmt, on_rollback);
}
}

Expand Down Expand Up @@ -3481,7 +3481,7 @@ on_replace_dd_cluster(struct trigger *trigger, void *event)
struct trigger *on_commit;
on_commit = txn_alter_trigger_new(register_replica,
new_tuple);
txn_on_commit(txn, on_commit);
txn_stmt_on_commit(stmt, on_commit);
}
} else {
/*
Expand All @@ -3496,7 +3496,7 @@ on_replace_dd_cluster(struct trigger *trigger, void *event)
struct trigger *on_commit;
on_commit = txn_alter_trigger_new(unregister_replica,
old_tuple);
txn_on_commit(txn, on_commit);
txn_stmt_on_commit(stmt, on_commit);
}
}

Expand Down Expand Up @@ -3619,7 +3619,7 @@ on_replace_dd_sequence(struct trigger * /* trigger */, void *event)
seq = sequence_new_xc(new_def);
sequence_cache_insert(seq);
on_rollback->data = seq;
txn_on_rollback(txn, on_rollback);
txn_stmt_on_rollback(stmt, on_rollback);
} else if (old_tuple != NULL && new_tuple == NULL) { /* DELETE */
uint32_t id = tuple_field_u32_xc(old_tuple,
BOX_SEQUENCE_DATA_FIELD_ID);
Expand All @@ -3641,8 +3641,8 @@ on_replace_dd_sequence(struct trigger * /* trigger */, void *event)
struct trigger *on_rollback =
txn_alter_trigger_new(on_drop_sequence_rollback, seq);
sequence_cache_delete(seq->def->id);
txn_on_commit(txn, on_commit);
txn_on_rollback(txn, on_rollback);
txn_stmt_on_commit(stmt, on_commit);
txn_stmt_on_rollback(stmt, on_rollback);
} else { /* UPDATE */
new_def = sequence_def_new_from_tuple(new_tuple,
ER_ALTER_SEQUENCE);
Expand All @@ -3655,8 +3655,8 @@ on_replace_dd_sequence(struct trigger * /* trigger */, void *event)
struct trigger *on_rollback =
txn_alter_trigger_new(on_alter_sequence_rollback, seq->def);
seq->def = new_def;
txn_on_commit(txn, on_commit);
txn_on_rollback(txn, on_rollback);
txn_stmt_on_commit(stmt, on_commit);
txn_stmt_on_rollback(stmt, on_rollback);
}

def_guard.is_active = false;
Expand Down Expand Up @@ -3709,7 +3709,7 @@ on_replace_dd_sequence_data(struct trigger * /* trigger */, void *event)
*/
struct trigger *on_rollback = txn_alter_trigger_new(
on_drop_sequence_data_rollback, old_tuple);
txn_on_rollback(txn, on_rollback);
txn_stmt_on_rollback(stmt, on_rollback);
sequence_reset(seq);
}
}
Expand Down Expand Up @@ -3864,7 +3864,7 @@ on_replace_dd_space_sequence(struct trigger * /* trigger */, void *event)
free(space->sequence_path);
space->sequence_path = sequence_path;
sequence_path_guard.is_active = false;
txn_on_rollback(txn, on_rollback);
txn_stmt_on_rollback(stmt, on_rollback);
} else { /* DELETE */
struct trigger *on_rollback;
on_rollback = txn_alter_trigger_new(set_space_sequence,
Expand All @@ -3875,7 +3875,7 @@ on_replace_dd_space_sequence(struct trigger * /* trigger */, void *event)
space->sequence_fieldno = 0;
free(space->sequence_path);
space->sequence_path = NULL;
txn_on_rollback(txn, on_rollback);
txn_stmt_on_rollback(stmt, on_rollback);
}
trigger_run_xc(&on_alter_space, space);
}
Expand Down Expand Up @@ -4036,8 +4036,8 @@ on_replace_dd_trigger(struct trigger * /* trigger */, void *event)
new_trigger_guard.is_active = false;
}

txn_on_rollback(txn, on_rollback);
txn_on_commit(txn, on_commit);
txn_stmt_on_rollback(stmt, on_rollback);
txn_stmt_on_commit(stmt, on_commit);
}

/**
Expand Down Expand Up @@ -4441,7 +4441,7 @@ on_replace_dd_fk_constraint(struct trigger * /* trigger*/, void *event)
struct trigger *on_rollback =
txn_alter_trigger_new(on_create_fk_constraint_rollback,
fk);
txn_on_rollback(txn, on_rollback);
txn_stmt_on_rollback(stmt, on_rollback);
fk_constraint_set_mask(fk,
&parent_space->fk_constraint_mask,
FIELD_LINK_PARENT);
Expand All @@ -4459,11 +4459,11 @@ on_replace_dd_fk_constraint(struct trigger * /* trigger*/, void *event)
struct trigger *on_rollback =
txn_alter_trigger_new(on_replace_fk_constraint_rollback,
old_fk);
txn_on_rollback(txn, on_rollback);
txn_stmt_on_rollback(stmt, on_rollback);
struct trigger *on_commit =
txn_alter_trigger_new(on_drop_or_replace_fk_constraint_commit,
old_fk);
txn_on_commit(txn, on_commit);
txn_stmt_on_commit(stmt, on_commit);
space_reset_fk_constraint_mask(child_space);
space_reset_fk_constraint_mask(parent_space);
}
Expand All @@ -4485,11 +4485,11 @@ on_replace_dd_fk_constraint(struct trigger * /* trigger*/, void *event)
struct trigger *on_commit =
txn_alter_trigger_new(on_drop_or_replace_fk_constraint_commit,
old_fk);
txn_on_commit(txn, on_commit);
txn_stmt_on_commit(stmt, on_commit);
struct trigger *on_rollback =
txn_alter_trigger_new(on_drop_fk_constraint_rollback,
old_fk);
txn_on_rollback(txn, on_rollback);
txn_stmt_on_rollback(stmt, on_rollback);
space_reset_fk_constraint_mask(child_space);
space_reset_fk_constraint_mask(parent_space);
}
Expand Down Expand Up @@ -4713,8 +4713,8 @@ on_replace_dd_ck_constraint(struct trigger * /* trigger*/, void *event)
on_rollback->run = on_drop_ck_constraint_rollback;
}

txn_on_rollback(txn, on_rollback);
txn_on_commit(txn, on_commit);
txn_stmt_on_rollback(stmt, on_rollback);
txn_stmt_on_commit(stmt, on_commit);

trigger_run_xc(&on_alter_space, space);
}
Expand Down Expand Up @@ -4772,7 +4772,7 @@ on_replace_dd_func_index(struct trigger *trigger, void *event)
space->index_id_max + 1);
(void) new MoveCkConstraints(alter);
(void) new UpdateSchemaVersion(alter);
alter_space_do(txn, alter);
alter_space_do(stmt, alter);

scoped_guard.is_active = false;
}
Expand Down

0 comments on commit 7ad7169

Please sign in to comment.