Skip to content

Commit

Permalink
replication: make sync transactions wait quorum
Browse files Browse the repository at this point in the history
Synchronous transaction (which changes anything in a synchronous
space) before commit waits until it is replicated onto a quorum
of replicas.

When there is a not committed synchronous transaction, any attempt
to commit a next transaction is suspended, even if it is an async
transaction.

This restriction comes from the theoretically possible dependency
of what is written in the async transactions on what was written
in the previous sync transactions.

So far all the 'synchronousness' is basically the same as the well
known 'wait_lsn' technique. With the exception, that the
transaction really is not committed until replicated.

Problem of wait_lsn is still present though, in case master
restarts. Because there is no a 'confirm' record in WAL telling
which transactions are replicated and can be applied.

Closes #4844
Closes #4845
  • Loading branch information
Gerold103 committed Jul 5, 2020
1 parent dbc2781 commit 68c4460
Show file tree
Hide file tree
Showing 10 changed files with 467 additions and 2 deletions.
1 change: 1 addition & 0 deletions src/box/CMakeLists.txt
Expand Up @@ -169,6 +169,7 @@ add_library(box STATIC
session.cc
port.c
txn.c
txn_limbo.c
box.cc
gc.c
checkpoint_schedule.c
Expand Down
3 changes: 3 additions & 0 deletions src/box/applier.cc
Expand Up @@ -825,6 +825,9 @@ applier_apply_tx(struct stailq *rows)
trigger_create(on_commit, applier_txn_commit_cb, NULL, NULL);
txn_on_commit(txn, on_commit);

/* Applier does not wait for ACK. It sends ACK. */
txn_set_flag(txn, TXN_FORCE_ASYNC);

if (txn_commit_async(txn) < 0)
goto fail;

Expand Down
2 changes: 2 additions & 0 deletions src/box/box.cc
Expand Up @@ -59,6 +59,7 @@
#include "index.h"
#include "port.h"
#include "txn.h"
#include "txn_limbo.h"
#include "user.h"
#include "cfg.h"
#include "coio.h"
Expand Down Expand Up @@ -2413,6 +2414,7 @@ box_init(void)
if (tuple_init(lua_hash) != 0)
diag_raise();

txn_limbo_init();
sequence_init();
}

Expand Down
1 change: 1 addition & 0 deletions src/box/errcode.h
Expand Up @@ -266,6 +266,7 @@ struct errcode_record {
/*211 */_(ER_WRONG_QUERY_ID, "Prepared statement with id %u does not exist") \
/*212 */_(ER_SEQUENCE_NOT_STARTED, "Sequence '%s' is not started") \
/*213 */_(ER_NO_SUCH_SESSION_SETTING, "Session setting %s doesn't exist") \
/*214 */_(ER_UNCOMMITTED_FOREIGN_SYNC_TXNS, "Found uncommitted sync transactions from other instance with id %u") \

/*
* !IMPORTANT! Please follow instructions at start of the file
Expand Down
11 changes: 11 additions & 0 deletions src/box/relay.cc
Expand Up @@ -53,6 +53,7 @@
#include "xrow_io.h"
#include "xstream.h"
#include "wal.h"
#include "txn_limbo.h"

/**
* Cbus message to send status updates from relay to tx thread.
Expand Down Expand Up @@ -399,6 +400,16 @@ tx_status_update(struct cmsg *msg)
{
struct relay_status_msg *status = (struct relay_status_msg *)msg;
vclock_copy(&status->relay->tx.vclock, &status->vclock);
/*
* Let pending synchronous transactions know, which of
* them were successfully sent to the replica. Acks are
* collected only by the transactions originator (which is
* the single master in 100% so far).
*/
if (txn_limbo.instance_id == instance_id) {
txn_limbo_ack(&txn_limbo, status->relay->replica->id,
vclock_get(&status->vclock, instance_id));
}
static const struct cmsg_hop route[] = {
{relay_status_update, NULL}
};
Expand Down
65 changes: 63 additions & 2 deletions src/box/txn.c
Expand Up @@ -29,6 +29,7 @@
* SUCH DAMAGE.
*/
#include "txn.h"
#include "txn_limbo.h"
#include "engine.h"
#include "tuple.h"
#include "journal.h"
Expand Down Expand Up @@ -433,7 +434,7 @@ txn_complete(struct txn *txn)
engine_rollback(txn->engine, txn);
if (txn_has_flag(txn, TXN_HAS_TRIGGERS))
txn_run_rollback_triggers(txn, &txn->on_rollback);
} else {
} else if (!txn_has_flag(txn, TXN_WAIT_SYNC)) {
/* Commit the transaction. */
if (txn->engine != NULL)
engine_commit(txn->engine, txn);
Expand All @@ -448,6 +449,19 @@ txn_complete(struct txn *txn)
txn->signature - n_rows + 1,
stop_tm - txn->start_tm);
}
} else {
/*
* Complete is called on every WAL operation
* authored by this transaction. And it not always
* is one. And not always is enough for commit.
* In case the transaction is waiting for acks, it
* can't be committed right away. Give control
* back to the fiber, owning the transaction so as
* it could decide what to do next.
*/
if (txn->fiber != NULL && txn->fiber != fiber())
fiber_wakeup(txn->fiber);
return;
}
/*
* If there is no fiber waiting for the transaction then
Expand Down Expand Up @@ -495,6 +509,7 @@ txn_journal_entry_new(struct txn *txn)

struct xrow_header **remote_row = req->rows;
struct xrow_header **local_row = req->rows + txn->n_applier_rows;
bool is_sync = false;

stailq_foreach_entry(stmt, &txn->stmts, next) {
if (stmt->has_triggers) {
Expand All @@ -506,13 +521,29 @@ txn_journal_entry_new(struct txn *txn)
if (stmt->row == NULL)
continue;

is_sync = is_sync || (stmt->space != NULL &&
stmt->space->def->opts.is_sync);

if (stmt->row->replica_id == 0)
*local_row++ = stmt->row;
else
*remote_row++ = stmt->row;

req->approx_len += xrow_approx_len(stmt->row);
}
/*
* There is no a check for all-local rows, because a local
* space can't be synchronous. So if there is at least one
* synchronous space, the transaction is not local.
*/
if (!txn_has_flag(txn, TXN_FORCE_ASYNC)) {
if (is_sync) {
txn_set_flag(txn, TXN_WAIT_SYNC);
txn_set_flag(txn, TXN_WAIT_ACK);
} else if (!txn_limbo_is_empty(&txn_limbo)) {
txn_set_flag(txn, TXN_WAIT_SYNC);
}
}

assert(remote_row == req->rows + txn->n_applier_rows);
assert(local_row == remote_row + txn->n_new_rows);
Expand Down Expand Up @@ -621,6 +652,7 @@ int
txn_commit(struct txn *txn)
{
struct journal_entry *req;
struct txn_limbo_entry *limbo_entry;

txn->fiber = fiber();

Expand All @@ -642,8 +674,31 @@ txn_commit(struct txn *txn)
return -1;
}

bool is_sync = txn_has_flag(txn, TXN_WAIT_SYNC);
if (is_sync) {
/*
* Remote rows, if any, come before local rows, so
* check for originating instance id here.
*/
uint32_t origin_id = req->rows[0]->replica_id;

/*
* Append now. Before even WAL write is done.
* After WAL write nothing should fail, even OOM
* wouldn't be acceptable.
*/
limbo_entry = txn_limbo_append(&txn_limbo, origin_id, txn);
if (limbo_entry == NULL) {
txn_rollback(txn);
txn_free(txn);
return -1;
}
}

fiber_set_txn(fiber(), NULL);
if (journal_write(req) != 0) {
if (is_sync)
txn_limbo_abort(&txn_limbo, limbo_entry);
fiber_set_txn(fiber(), txn);
txn_rollback(txn);
txn_free(txn);
Expand All @@ -652,7 +707,13 @@ txn_commit(struct txn *txn)
diag_log();
return -1;
}

if (is_sync) {
int64_t lsn = req->rows[req->n_rows - 1]->lsn;
txn_limbo_assign_lsn(&txn_limbo, limbo_entry, lsn);
/* Local WAL write is a first 'ACK'. */
txn_limbo_ack(&txn_limbo, txn_limbo.instance_id, lsn);
txn_limbo_wait_complete(&txn_limbo, limbo_entry);
}
if (!txn_has_flag(txn, TXN_IS_DONE)) {
txn->signature = req->res;
txn_complete(txn);
Expand Down
21 changes: 21 additions & 0 deletions src/box/txn.h
Expand Up @@ -66,6 +66,27 @@ enum txn_flag {
TXN_CAN_YIELD,
/** on_commit and/or on_rollback list is not empty. */
TXN_HAS_TRIGGERS,
/**
* Synchronous transaction touched sync spaces, or an
* asynchronous transaction blocked by a sync one until it
* is confirmed.
*/
TXN_WAIT_SYNC,
/**
* Synchronous transaction 'waiting for ACKs' state before
* commit. In this state it waits until it is replicated
* onto a quorum of replicas, and only then finishes
* commit and returns success to a user.
* TXN_WAIT_SYNC is always set, if TXN_WAIT_ACK is set.
*/
TXN_WAIT_ACK,
/**
* A transaction may be forced to be asynchronous, not
* wait for any ACKs, and not depend on prepending sync
* transactions. This happens in a few special cases. For
* example, when applier receives snapshot from master.
*/
TXN_FORCE_ASYNC,
};

enum {
Expand Down

0 comments on commit 68c4460

Please sign in to comment.