diff --git a/src/box/CMakeLists.txt b/src/box/CMakeLists.txt index 63f98f6c89ca..b8b2689d207a 100644 --- a/src/box/CMakeLists.txt +++ b/src/box/CMakeLists.txt @@ -169,6 +169,7 @@ add_library(box STATIC session.cc port.c txn.c + txn_limbo.c box.cc gc.c checkpoint_schedule.c diff --git a/src/box/applier.cc b/src/box/applier.cc index df48b47962d2..40fb6a423ac8 100644 --- a/src/box/applier.cc +++ b/src/box/applier.cc @@ -825,6 +825,9 @@ applier_apply_tx(struct stailq *rows) trigger_create(on_commit, applier_txn_commit_cb, NULL, NULL); txn_on_commit(txn, on_commit); + /* Applier does not wait for ACK. It sends ACK. */ + txn_set_flag(txn, TXN_FORCE_ASYNC); + if (txn_commit_async(txn) < 0) goto fail; diff --git a/src/box/box.cc b/src/box/box.cc index e29f741a1792..32b2e014c65f 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -59,6 +59,7 @@ #include "index.h" #include "port.h" #include "txn.h" +#include "txn_limbo.h" #include "user.h" #include "cfg.h" #include "coio.h" @@ -2413,6 +2414,7 @@ box_init(void) if (tuple_init(lua_hash) != 0) diag_raise(); + txn_limbo_init(); sequence_init(); } diff --git a/src/box/errcode.h b/src/box/errcode.h index d1e4d02a95b0..019c582afe11 100644 --- a/src/box/errcode.h +++ b/src/box/errcode.h @@ -266,6 +266,7 @@ struct errcode_record { /*211 */_(ER_WRONG_QUERY_ID, "Prepared statement with id %u does not exist") \ /*212 */_(ER_SEQUENCE_NOT_STARTED, "Sequence '%s' is not started") \ /*213 */_(ER_NO_SUCH_SESSION_SETTING, "Session setting %s doesn't exist") \ + /*214 */_(ER_UNCOMMITTED_FOREIGN_SYNC_TXNS, "Found uncommitted sync transactions from other instance with id %u") \ /* * !IMPORTANT! Please follow instructions at start of the file diff --git a/src/box/relay.cc b/src/box/relay.cc index 2ad02cb8a7a2..36fc14b8c466 100644 --- a/src/box/relay.cc +++ b/src/box/relay.cc @@ -53,6 +53,7 @@ #include "xrow_io.h" #include "xstream.h" #include "wal.h" +#include "txn_limbo.h" /** * Cbus message to send status updates from relay to tx thread. 
@@ -399,6 +400,16 @@ tx_status_update(struct cmsg *msg) { struct relay_status_msg *status = (struct relay_status_msg *)msg; vclock_copy(&status->relay->tx.vclock, &status->vclock); + /* + * Let pending synchronous transactions know which of + * them were successfully sent to the replica. Acks are + * collected only by the transaction originator (which is + * the single master in 100% of cases so far). + */ + if (txn_limbo.instance_id == instance_id) { + txn_limbo_ack(&txn_limbo, status->relay->replica->id, + vclock_get(&status->vclock, instance_id)); + } static const struct cmsg_hop route[] = { {relay_status_update, NULL} }; diff --git a/src/box/txn.c b/src/box/txn.c index 123520166a5b..cbca76c2fbe8 100644 --- a/src/box/txn.c +++ b/src/box/txn.c @@ -29,6 +29,7 @@ * SUCH DAMAGE. */ #include "txn.h" +#include "txn_limbo.h" #include "engine.h" #include "tuple.h" #include "journal.h" @@ -433,7 +434,7 @@ txn_complete(struct txn *txn) engine_rollback(txn->engine, txn); if (txn_has_flag(txn, TXN_HAS_TRIGGERS)) txn_run_rollback_triggers(txn, &txn->on_rollback); - } else { + } else if (!txn_has_flag(txn, TXN_WAIT_SYNC)) { /* Commit the transaction. */ if (txn->engine != NULL) engine_commit(txn->engine, txn); @@ -448,6 +449,19 @@ txn_complete(struct txn *txn) txn->signature - n_rows + 1, stop_tm - txn->start_tm); } + } else { + /* + * Complete is called on every WAL operation + * authored by this transaction. There is not always + * just one, and one is not always enough for commit. + * In case the transaction is waiting for acks, it + * can't be committed right away. Give control + * back to the fiber owning the transaction so + * that it can decide what to do next. 
+ */ + if (txn->fiber != NULL && txn->fiber != fiber()) + fiber_wakeup(txn->fiber); + return; } /* * If there is no fiber waiting for the transaction then @@ -495,6 +509,7 @@ txn_journal_entry_new(struct txn *txn) struct xrow_header **remote_row = req->rows; struct xrow_header **local_row = req->rows + txn->n_applier_rows; + bool is_sync = false; stailq_foreach_entry(stmt, &txn->stmts, next) { if (stmt->has_triggers) { @@ -506,6 +521,9 @@ txn_journal_entry_new(struct txn *txn) if (stmt->row == NULL) continue; + is_sync = is_sync || (stmt->space != NULL && + stmt->space->def->opts.is_sync); + if (stmt->row->replica_id == 0) *local_row++ = stmt->row; else @@ -513,6 +531,19 @@ txn_journal_entry_new(struct txn *txn) req->approx_len += xrow_approx_len(stmt->row); } + /* + * There is no check for all-local rows, because a local + * space can't be synchronous. So if there is at least one + * synchronous space, the transaction is not local. + */ + if (!txn_has_flag(txn, TXN_FORCE_ASYNC)) { + if (is_sync) { + txn_set_flag(txn, TXN_WAIT_SYNC); + txn_set_flag(txn, TXN_WAIT_ACK); + } else if (!txn_limbo_is_empty(&txn_limbo)) { + txn_set_flag(txn, TXN_WAIT_SYNC); + } + } assert(remote_row == req->rows + txn->n_applier_rows); assert(local_row == remote_row + txn->n_new_rows); @@ -621,6 +652,7 @@ int txn_commit(struct txn *txn) { struct journal_entry *req; + struct txn_limbo_entry *limbo_entry; txn->fiber = fiber(); @@ -642,8 +674,31 @@ txn_commit(struct txn *txn) return -1; } + bool is_sync = txn_has_flag(txn, TXN_WAIT_SYNC); + if (is_sync) { + /* + * Remote rows, if any, come before local rows, so + * check for originating instance id here. + */ + uint32_t origin_id = req->rows[0]->replica_id; + + /* + * Append now, even before the WAL write is done. + * After the WAL write nothing should fail; even OOM + * wouldn't be acceptable. 
+ */ + limbo_entry = txn_limbo_append(&txn_limbo, origin_id, txn); + if (limbo_entry == NULL) { + txn_rollback(txn); + txn_free(txn); + return -1; + } + } + fiber_set_txn(fiber(), NULL); if (journal_write(req) != 0) { + if (is_sync) + txn_limbo_abort(&txn_limbo, limbo_entry); fiber_set_txn(fiber(), txn); txn_rollback(txn); txn_free(txn); @@ -652,7 +707,13 @@ txn_commit(struct txn *txn) diag_log(); return -1; } - + if (is_sync) { + int64_t lsn = req->rows[req->n_rows - 1]->lsn; + txn_limbo_assign_lsn(&txn_limbo, limbo_entry, lsn); + /* Local WAL write is the first 'ACK'. */ + txn_limbo_ack(&txn_limbo, txn_limbo.instance_id, lsn); + txn_limbo_wait_complete(&txn_limbo, limbo_entry); + } if (!txn_has_flag(txn, TXN_IS_DONE)) { txn->signature = req->res; txn_complete(txn); diff --git a/src/box/txn.h b/src/box/txn.h index 3f6d79d5c024..c600c7314dfd 100644 --- a/src/box/txn.h +++ b/src/box/txn.h @@ -66,6 +66,27 @@ enum txn_flag { TXN_CAN_YIELD, /** on_commit and/or on_rollback list is not empty. */ TXN_HAS_TRIGGERS, + /** + * Synchronous transaction that touched sync spaces, or an + * asynchronous transaction blocked by a sync one until it + * is confirmed. + */ + TXN_WAIT_SYNC, + /** + * Synchronous transaction 'waiting for ACKs' state before + * commit. In this state it waits until it is replicated + * onto a quorum of replicas, and only then finishes + * commit and returns success to a user. + * TXN_WAIT_SYNC is always set if TXN_WAIT_ACK is set. + */ + TXN_WAIT_ACK, + /** + * A transaction may be forced to be asynchronous, not + * wait for any ACKs, and not depend on preceding sync + * transactions. This happens in a few special cases. For + * example, when applier receives snapshot from master. + */ + TXN_FORCE_ASYNC, }; enum { diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c new file mode 100644 index 000000000000..2b4aae4777c0 --- /dev/null +++ b/src/box/txn_limbo.c @@ -0,0 +1,190 @@ +/* + * Copyright 2010-2020, Tarantool AUTHORS, please see AUTHORS file. 
+ * + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * 1. Redistributions of source code must retain the above + * copyright notice, this list of conditions and the + * following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL + * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR + * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. 
+ */ +#include "txn.h" +#include "txn_limbo.h" +#include "replication.h" + +struct txn_limbo txn_limbo; + +static inline void +txn_limbo_create(struct txn_limbo *limbo) +{ + rlist_create(&limbo->queue); + limbo->instance_id = REPLICA_ID_NIL; + vclock_create(&limbo->vclock); +} + +struct txn_limbo_entry * +txn_limbo_append(struct txn_limbo *limbo, uint32_t id, struct txn *txn) +{ + assert(txn_has_flag(txn, TXN_WAIT_SYNC)); + if (id == 0) + id = instance_id; + if (limbo->instance_id != id) { + if (limbo->instance_id == REPLICA_ID_NIL || + rlist_empty(&limbo->queue)) { + limbo->instance_id = id; + } else { + diag_set(ClientError, ER_UNCOMMITTED_FOREIGN_SYNC_TXNS, + limbo->instance_id); + return NULL; + } + } + size_t size; + struct txn_limbo_entry *e = region_alloc_object(&txn->region, + typeof(*e), &size); + if (e == NULL) { + diag_set(OutOfMemory, size, "region_alloc_object", "e"); + return NULL; + } + e->txn = txn; + e->lsn = -1; + e->ack_count = 0; + e->is_commit = false; + e->is_rollback = false; + rlist_add_tail_entry(&limbo->queue, e, in_queue); + return e; +} + +static inline void +txn_limbo_remove(struct txn_limbo *limbo, struct txn_limbo_entry *entry) +{ + assert(!rlist_empty(&entry->in_queue)); + assert(rlist_first_entry(&limbo->queue, struct txn_limbo_entry, + in_queue) == entry); + (void) limbo; + rlist_del_entry(entry, in_queue); +} + +void +txn_limbo_abort(struct txn_limbo *limbo, struct txn_limbo_entry *entry) +{ + entry->is_rollback = true; + txn_limbo_remove(limbo, entry); +} + +void +txn_limbo_assign_lsn(struct txn_limbo *limbo, struct txn_limbo_entry *entry, + int64_t lsn) +{ + assert(limbo->instance_id != REPLICA_ID_NIL); + assert(entry->lsn == -1); + assert(lsn > 0); + (void) limbo; + entry->lsn = lsn; +} + +static bool +txn_limbo_check_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry) +{ + if (txn_limbo_entry_is_complete(entry)) + return true; + struct vclock_iterator iter; + vclock_iterator_init(&iter, &limbo->vclock); + int 
ack_count = 0; + int64_t lsn = entry->lsn; + vclock_foreach(&iter, vc) + ack_count += vc.lsn >= lsn; + assert(ack_count >= entry->ack_count); + entry->ack_count = ack_count; + entry->is_commit = ack_count >= replication_synchro_quorum; + return entry->is_commit; +} + +void +txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry) +{ + struct txn *txn = entry->txn; + assert(entry->lsn > 0); + assert(!txn_has_flag(txn, TXN_IS_DONE)); + assert(txn_has_flag(txn, TXN_WAIT_SYNC)); + if (txn_limbo_check_complete(limbo, entry)) + goto complete; + bool cancellable = fiber_set_cancellable(false); + while (!txn_limbo_entry_is_complete(entry)) + fiber_yield(); + fiber_set_cancellable(cancellable); +complete: + // TODO: implement rollback. + // TODO: implement confirm. + assert(!entry->is_rollback); + txn_limbo_remove(limbo, entry); + txn_clear_flag(txn, TXN_WAIT_SYNC); + txn_clear_flag(txn, TXN_WAIT_ACK); +} + +void +txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn) +{ + if (rlist_empty(&limbo->queue)) + return; + assert(limbo->instance_id != REPLICA_ID_NIL); + int64_t prev_lsn = vclock_get(&limbo->vclock, replica_id); + vclock_follow(&limbo->vclock, replica_id, lsn); + struct txn_limbo_entry *e, *last_quorum = NULL; + rlist_foreach_entry(e, &limbo->queue, in_queue) { + if (e->lsn > lsn) + break; + if (e->lsn <= prev_lsn) + continue; + assert(e->ack_count <= VCLOCK_MAX); + /* + * Sync transactions need to collect acks. Async + * transactions are automatically committed right + * after all the previous sync transactions are. + */ + if (txn_has_flag(e->txn, TXN_WAIT_ACK)) { + if (++e->ack_count < replication_synchro_quorum) + continue; + } else { + assert(txn_has_flag(e->txn, TXN_WAIT_SYNC)); + if (last_quorum == NULL) + continue; + } + e->is_commit = true; + last_quorum = e; + } + if (last_quorum == NULL) + return; + // TODO: implement confirmation message. 
+ rlist_foreach_entry(e, &limbo->queue, in_queue) { + if (e->txn->fiber != fiber()) + fiber_wakeup(e->txn->fiber); + if (e == last_quorum) + break; + } +} + +void +txn_limbo_init(void) +{ + txn_limbo_create(&txn_limbo); +} diff --git a/src/box/txn_limbo.h b/src/box/txn_limbo.h new file mode 100644 index 000000000000..37fcfa880721 --- /dev/null +++ b/src/box/txn_limbo.h @@ -0,0 +1,174 @@ +#pragma once +/* + * Copyright 2010-2020, Tarantool AUTHORS, please see AUTHORS file. + * + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * 1. Redistributions of source code must retain the above + * copyright notice, this list of conditions and the + * following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL + * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR + * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + */ +#include "small/rlist.h" +#include "vclock.h" + +#include + +#if defined(__cplusplus) +extern "C" { +#endif /* defined(__cplusplus) */ + +struct txn; + +/** + * Transaction and its quorum metadata, to be stored in limbo. 
+ */ +struct txn_limbo_entry { + /** Link for limbo's queue. */ + struct rlist in_queue; + /** Transaction, waiting for a quorum. */ + struct txn *txn; + /** + * LSN of the transaction by the originator's vclock + * component. May be -1 in case the transaction is not + * written to WAL yet. + */ + int64_t lsn; + /** + * Number of ACKs. Or in other words - how many replicas + * confirmed receipt of the transaction. + */ + int ack_count; + /** + * Result flags. Only one of them can be true. But both + * can be false if the transaction is still waiting for + * its resolution. + */ + bool is_commit; + bool is_rollback; +}; + +static inline bool +txn_limbo_entry_is_complete(const struct txn_limbo_entry *e) +{ + return e->is_commit || e->is_rollback; +} + +/** + * Limbo is a place where transactions are stored, which are + * finished, but not committed nor rolled back. These are + * synchronous transactions in progress of collecting ACKs from + * replicas. + * Limbo's main purposes + * - maintain the transactions ordered by LSN of their emitter; + * - be a link between transaction and replication modules, so + * as they wouldn't depend on each other directly. + */ +struct txn_limbo { + /** + * Queue of limbo entries. Ordered by LSN. Some of the + * entries in the end may not have an LSN yet (their local + * WAL write is still in progress), but their order won't + * change anyway. Because WAL write completions will give + * them LSNs in the same order. + */ + struct rlist queue; + /** + * Instance ID of the owner of all the transactions in the + * queue. Strictly speaking, nothing prevents to store not + * own transactions here, originated from some other + * instance. But still the queue may contain only + * transactions of the same instance. Otherwise LSN order + * won't make sense - different nodes have own independent + * LSNs in their vclock components. 
+ */ + uint32_t instance_id; + /** + * All components of the vclock are versions of the limbo + * owner's LSN, how it is visible on other nodes. For + * example, assume instance ID of the limbo is 1. Then + * vclock[1] here is local LSN of the instance 1. + * vclock[2] is how replica with ID 2 sees LSN of + * instance 1. + * vclock[3] is how replica with ID 3 sees LSN of + * instance 1, and so on. + * In that way, by looking at this vclock it can always + * be said up to which LSN there is a sync quorum for + * transactions created on the limbo's owner node. + */ + struct vclock vclock; +}; + +/** + * Global limbo instance. So far an instance can have only one limbo, + * where master's transactions are stored. Eventually there may + * appear more than one limbo for master-master support. + */ +extern struct txn_limbo txn_limbo; + +static inline bool +txn_limbo_is_empty(struct txn_limbo *limbo) +{ + return rlist_empty(&limbo->queue); +} + +/** + * Allocate, create, and append a new transaction to the limbo. + * The limbo entry is allocated on the transaction's region. + */ +struct txn_limbo_entry * +txn_limbo_append(struct txn_limbo *limbo, uint32_t id, struct txn *txn); + +/** Remove the entry from the limbo, mark as rolled back. */ +void +txn_limbo_abort(struct txn_limbo *limbo, struct txn_limbo_entry *entry); + +/** + * Assign local LSN to the limbo entry. That happens when the + * transaction is added to the limbo, writes to WAL, and gets an + * LSN. + */ +void +txn_limbo_assign_lsn(struct txn_limbo *limbo, struct txn_limbo_entry *entry, + int64_t lsn); + +/** + * Ack all transactions up to the given LSN on behalf of the + * replica with the specified ID. + */ +void +txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn); + +/** + * Block the current fiber until the transaction in the limbo + * entry is either committed or rolled back. 
+ */ +void +txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry); + +void +txn_limbo_init(); + +#if defined(__cplusplus) +} +#endif /* defined(__cplusplus) */ diff --git a/test/box/error.result b/test/box/error.result index 2196fa541490..69c4710855bb 100644 --- a/test/box/error.result +++ b/test/box/error.result @@ -432,6 +432,7 @@ t; | 211: box.error.WRONG_QUERY_ID | 212: box.error.SEQUENCE_NOT_STARTED | 213: box.error.NO_SUCH_SESSION_SETTING + | 214: box.error.UNCOMMITTED_FOREIGN_SYNC_TXNS | ... test_run:cmd("setopt delimiter ''");