Skip to content

Commit

Permalink
txm: disallow yields after DDL operation in TX
Browse files Browse the repository at this point in the history
To avoid sharing (ergo phantom reads) for different transaction in MVCC
mode, let's do following things. Firstly, after modification of any
shared objects (like space/user/function caches) disallow transaction to
yield to prevent other transactions see that changes. Yield is permitted
til the transaction is committed. Secondly, on committing transaction
that provides DDL changes let's abort all other transaction since they
may refer to obsolete schema. The last restriction may seem too strict,
but it is OK as primitive workaround until transactional DDL is
introduced.

Closes #5998
Closes #6140
Workaround for #6138
  • Loading branch information
Korablev77 committed Jul 16, 2021
1 parent acf8b3f commit cd8ff74
Show file tree
Hide file tree
Showing 11 changed files with 506 additions and 84 deletions.
25 changes: 24 additions & 1 deletion src/box/alter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
#include "sequence.h"
#include "sql.h"
#include "constraint_id.h"
#include "memtx_tx.h"

/* {{{ Auxiliary functions and methods. */

Expand Down Expand Up @@ -2500,6 +2501,10 @@ on_replace_dd_space(struct trigger * /* trigger */, void *event)
}
alter_guard.is_active = false;
}
/*
* After cache modification transaction is now allowed to yield.
*/
memtx_tx_acquire_ddl(txn);
return 0;
}

Expand Down Expand Up @@ -2829,6 +2834,7 @@ on_replace_dd_index(struct trigger * /* trigger */, void *event)
return -1;
}
scoped_guard.is_active = false;
memtx_tx_acquire_ddl(txn);
return 0;
}

Expand Down Expand Up @@ -2918,6 +2924,7 @@ on_replace_dd_truncate(struct trigger * /* trigger */, void *event)
return -1;
}
scoped_guard.is_active = false;
memtx_tx_acquire_ddl(txn);
return 0;
}

Expand Down Expand Up @@ -3210,6 +3217,7 @@ on_replace_dd_user(struct trigger * /* trigger */, void *event)
return -1;
txn_stmt_on_rollback(stmt, on_rollback);
}
memtx_tx_acquire_ddl(txn);
return 0;
}

Expand Down Expand Up @@ -3584,6 +3592,7 @@ on_replace_dd_func(struct trigger * /* trigger */, void *event)
return -1;
}
}
memtx_tx_acquire_ddl(txn);
return 0;
}

Expand Down Expand Up @@ -3812,6 +3821,7 @@ on_replace_dd_collation(struct trigger * /* trigger */, void *event)
diag_set(ClientError, ER_UNSUPPORTED, "collation", "alter");
return -1;
}
memtx_tx_acquire_ddl(txn);
return 0;
}

Expand Down Expand Up @@ -4134,6 +4144,7 @@ on_replace_dd_priv(struct trigger * /* trigger */, void *event)
return -1;
txn_stmt_on_rollback(stmt, on_rollback);
}
memtx_tx_acquire_ddl(txn);
return 0;
}

Expand Down Expand Up @@ -4172,6 +4183,7 @@ on_replace_dd_schema(struct trigger * /* trigger */, void *event)
REPLICASET_UUID = uu;
say_info("cluster uuid %s", tt_uuid_str(&uu));
}
memtx_tx_acquire_ddl(txn);
return 0;
}

Expand Down Expand Up @@ -4304,6 +4316,7 @@ on_replace_dd_cluster(struct trigger *trigger, void *event)
return -1;
txn_stmt_on_commit(stmt, on_commit);
}
memtx_tx_acquire_ddl(txn);
return 0;
}

Expand Down Expand Up @@ -4527,6 +4540,7 @@ on_replace_dd_sequence(struct trigger * /* trigger */, void *event)
def_guard.is_active = false;
if (trigger_run(&on_alter_sequence, seq) != 0)
return -1;
memtx_tx_acquire_ddl(txn);
return 0;
}

Expand Down Expand Up @@ -4591,6 +4605,7 @@ on_replace_dd_sequence_data(struct trigger * /* trigger */, void *event)
txn_stmt_on_rollback(stmt, on_rollback);
sequence_reset(seq);
}
memtx_tx_acquire_ddl(txn);
return 0;
}

Expand Down Expand Up @@ -4807,6 +4822,7 @@ on_replace_dd_space_sequence(struct trigger * /* trigger */, void *event)
}
if (trigger_run(&on_alter_space, space) != 0)
return -1;
memtx_tx_acquire_ddl(txn);
return 0;
}

Expand Down Expand Up @@ -4982,6 +4998,7 @@ on_replace_dd_trigger(struct trigger * /* trigger */, void *event)
txn_stmt_on_rollback(stmt, on_rollback);
txn_stmt_on_commit(stmt, on_commit);
++schema_version;
memtx_tx_acquire_ddl(txn);
return 0;
}

Expand Down Expand Up @@ -5509,6 +5526,7 @@ on_replace_dd_fk_constraint(struct trigger * /* trigger*/, void *event)
space_reset_fk_constraint_mask(parent_space);
}
++schema_version;
memtx_tx_acquire_ddl(txn);
return 0;
}

Expand Down Expand Up @@ -5698,6 +5716,7 @@ on_replace_dd_ck_constraint(struct trigger * /* trigger*/, void *event)
old_def->is_enabled = ck_def->is_enabled;
if (trigger_run(&on_alter_space, space) != 0)
return -1;
memtx_tx_acquire_ddl(txn);
return 0;
}
}
Expand Down Expand Up @@ -5765,6 +5784,7 @@ on_replace_dd_ck_constraint(struct trigger * /* trigger*/, void *event)
if (trigger_run(&on_alter_space, space) != 0)
return -1;
++schema_version;
memtx_tx_acquire_ddl(txn);
return 0;
}

Expand Down Expand Up @@ -5840,8 +5860,10 @@ on_replace_dd_func_index(struct trigger *trigger, void *event)
* Index is already initialized for corresponding
* function. Index rebuild is not required.
*/
if (index_def_get_func(index->def) == func)
if (index_def_get_func(index->def) == func) {
memtx_tx_acquire_ddl(txn);
return 0;
}

alter = alter_space_new(space);
if (alter == NULL)
Expand All @@ -5866,6 +5888,7 @@ on_replace_dd_func_index(struct trigger *trigger, void *event)
}

scoped_guard.is_active = false;
memtx_tx_acquire_ddl(txn);
return 0;
}

Expand Down
2 changes: 2 additions & 0 deletions src/box/memtx_engine.c
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,8 @@ memtx_engine_prepare(struct engine *engine, struct txn *txn)
if (stmt->add_story != NULL || stmt->del_story != NULL)
memtx_tx_history_prepare_stmt(stmt);
}
if (txn->is_schema_changed)
memtx_tx_abort_all_for_ddl(txn);
return 0;
}

Expand Down
20 changes: 20 additions & 0 deletions src/box/memtx_tx.c
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,26 @@ memtx_tx_register_tx(struct txn *tx)
rlist_add_tail(&txm.all_txs, &tx->in_all_txs);
}

void
memtx_tx_acquire_ddl(struct txn *tx)
{
tx->is_schema_changed = true;
(void) txn_can_yield(tx, false);
}

void
memtx_tx_abort_all_for_ddl(struct txn *ddl_owner)
{
struct txn *to_be_aborted;
rlist_foreach_entry(to_be_aborted, &txm.all_txs, in_all_txs) {
if (to_be_aborted == ddl_owner)
continue;
to_be_aborted->status = TXN_CONFLICTED;
say_warn("Transaction committing DDL (id=%d) has aborted "
"another TX (id=%d)", ddl_owner->id,to_be_aborted->id);
}
}

int
memtx_tx_cause_conflict(struct txn *breaker, struct txn *victim)
{
Expand Down
15 changes: 15 additions & 0 deletions src/box/memtx_tx.h
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,21 @@ memtx_tx_manager_init();
void
memtx_tx_manager_free();

/**
* Transaction providing DDL changes is disallowed to yield after
* modifications of internal caches (i.e. after ALTER operation finishes).
*/
void
memtx_tx_acquire_ddl(struct txn *tx);

/**
* Mark all transactions except for a given as aborted due to conflict:
* when DDL operation is about to be committed other transactions are
* considered to use obsolete schema so that should be aborted.
*/
void
memtx_tx_abort_all_for_ddl(struct txn *ddl_owner);

/**
* Notify TX manager that if transaction @a breaker is committed then the
* transaction @a victim must be aborted due to conflict. It is achieved
Expand Down
1 change: 1 addition & 0 deletions src/box/txn.c
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,7 @@ txn_begin(void)
txn->engine = NULL;
txn->engine_tx = NULL;
txn->fk_deferred_count = 0;
txn->is_schema_changed = false;
rlist_create(&txn->savepoints);
memtx_tx_register_tx(txn);
txn->fiber = NULL;
Expand Down
2 changes: 2 additions & 0 deletions src/box/txn.h
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,8 @@ struct txn {
struct rlist gap_list;
/** Link in tx_manager::all_txs. */
struct rlist in_all_txs;
/** True in case transaction provides any DDL change. */
bool is_schema_changed;
};

static inline bool
Expand Down

0 comments on commit cd8ff74

Please sign in to comment.