applier: process synchro requests without txn engine
Transaction processing code is rather heavy simply because
transactions carry various data and involve a number of
other mechanisms to proceed.

In turn, when we receive a confirm or rollback packet from
another node in the cluster, we just need to inspect the
limbo queue and write the packet into the WAL journal, so
calling a bunch of txn engine helpers is simply a waste of
cycles.

Thus let's handle them in a special lightweight way (sketched below):

 - allocate a synchro_entry structure which carries
   the journal entry itself and the encoded message
 - process the limbo queue to mark entries as
   confirmed/rolled back
 - finally write the synchro_entry into the journal

Which is way simpler.
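Roughly, the hot path becomes the following (a condensed
sketch of apply_synchro_row() from the diff below, with
error logging omitted):

    static int
    apply_synchro_row(struct xrow_header *row)
    {
        struct synchro_request req;
        /* Decode the CONFIRM/ROLLBACK request from the row. */
        if (xrow_decode_synchro(row, &req) != 0)
            return -1;
        /* Mark the affected limbo entries confirmed/rolled back. */
        if (txn_limbo_process(&txn_limbo, &req) != 0)
            return -1;
        /* Re-encode the request and write it to WAL asynchronously. */
        struct synchro_entry *entry = synchro_entry_new(row, &req);
        if (entry == NULL)
            return -1;
        return journal_write_async(&entry->journal_entry);
    }

No txn_begin()/txn_commit_stmt() round trip is needed anymore.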

Part-of #5129

Suggested-by: Vladislav Shpilevoy <v.shpilevoy@tarantool.org>
Co-developed-by: Vladislav Shpilevoy <v.shpilevoy@tarantool.org>
Signed-off-by: Cyrill Gorcunov <gorcunov@gmail.com>
cyrillos authored and Gerold103 committed Aug 24, 2020
1 parent 41b31ff commit cfccfd4

src/box/applier.cc: 148 additions, 52 deletions
@@ -51,8 +51,10 @@
#include "schema.h"
#include "txn.h"
#include "box.h"
#include "xrow.h"
#include "scoped_guard.h"
#include "txn_limbo.h"
#include "journal.h"

STRS(applier_state, applier_STATE);

@@ -268,45 +270,11 @@ process_nop(struct request *request)
return txn_commit_stmt(txn, request);
}

/*
* CONFIRM/ROLLBACK rows aren't dml requests and require special
* handling: instead of performing some operations on spaces,
* processing these requests requires txn_limbo to either confirm
* or rollback some of its entries.
*/
static int
process_synchro_row(struct request *request)
{
assert(iproto_type_is_synchro_request(request->header->type));
struct txn *txn = in_txn();

struct synchro_request syn_req;
if (xrow_decode_synchro(request->header, &syn_req) != 0)
return -1;
assert(txn->n_applier_rows == 0);
/*
* This is not really a transaction. It just uses txn API
* to put the data into WAL. And obviously it should not
* go to the limbo and block on the very same sync
* transaction which it tries to confirm now.
*/
txn_set_flag(txn, TXN_FORCE_ASYNC);

if (txn_begin_stmt(txn, NULL) != 0)
return -1;
if (txn_commit_stmt(txn, request) != 0)
return -1;
return txn_limbo_process(&txn_limbo, &syn_req);
}

static int
apply_row(struct xrow_header *row)
{
struct request request;
if (iproto_type_is_synchro_request(row->type)) {
request.header = row;
return process_synchro_row(&request);
}
assert(!iproto_type_is_synchro_request(row->type));
if (xrow_decode_dml(row, &request, dml_request_key_map(row->type)) != 0)
return -1;
if (request.type == IPROTO_NOP)
@@ -753,19 +721,9 @@ applier_read_tx(struct applier *applier, struct stailq *rows)
next)->row.is_commit);
}

static int
applier_txn_rollback_cb(struct trigger *trigger, void *event)
static void
applier_rollback_by_wal_io(void)
{
(void) trigger;
struct txn *txn = (struct txn *) event;
/*
* Synchronous transaction rollback due to receiving a
* ROLLBACK entry is a normal event and requires no
* special handling.
*/
if (txn->signature == TXN_SIGNATURE_SYNC_ROLLBACK)
return 0;

/*
* Setup shared applier diagnostic area.
*
@@ -774,9 +732,9 @@ applier_txn_rollback_cb(struct trigger *trigger, void *event)
* diag use per-applier diag instead all the time
* (which actually already present in the structure).
*
* But remember that transactions are asynchronous
* and a rollback may happen way later, after the
* transaction was passed to the journal engine.
*
* But remember that WAL writes are asynchronous and a
* rollback may happen way later, after the entry was
* passed to the journal engine.
*/
diag_set(ClientError, ER_WAL_IO);
diag_set_error(&replicaset.applier.diag,
@@ -787,6 +745,20 @@ applier_txn_rollback_cb(struct trigger *trigger, void *event)

/* Rollback applier vclock to the committed one. */
vclock_copy(&replicaset.applier.vclock, &replicaset.vclock);
}

static int
applier_txn_rollback_cb(struct trigger *trigger, void *event)
{
(void) trigger;
struct txn *txn = (struct txn *) event;
/*
* Synchronous transaction rollback due to receiving a
* ROLLBACK entry is a normal event and requires no
* special handling.
*/
if (txn->signature != TXN_SIGNATURE_SYNC_ROLLBACK)
applier_rollback_by_wal_io();
return 0;
}

@@ -800,6 +772,110 @@ applier_txn_wal_write_cb(struct trigger *trigger, void *event)
return 0;
}

struct synchro_entry {
/** Encoded form of a synchro record. */
struct synchro_body_bin body_bin;

/** xrow to write, used by the journal engine. */
struct xrow_header row;

/**
* The journal entry itself. Note: since it
* ends with a flexible array member, it must
* be the last field in the structure.
*/
struct journal_entry journal_entry;
};

static void
synchro_entry_delete(struct synchro_entry *entry)
{
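/* The entry, its row and its encoded body live in one allocation. */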
free(entry);
}

/**
* Completion callback for the asynchronous journal write.
*/
static void
apply_synchro_row_cb(struct journal_entry *entry)
{
assert(entry->complete_data != NULL);
struct synchro_entry *synchro_entry =
(struct synchro_entry *)entry->complete_data;
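/*
* On failure reuse the common WAL error handling; on
* success run the same on_wal_write triggers as regular
* transactions do.
*/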
if (entry->res < 0)
applier_rollback_by_wal_io();
else
trigger_run(&replicaset.applier.on_wal_write, NULL);

synchro_entry_delete(synchro_entry);
}

/**
* Allocate a new synchro_entry to be passed to
* the journal engine for an asynchronous write.
*/
static struct synchro_entry *
synchro_entry_new(struct xrow_header *applier_row,
struct synchro_request *req)
{
struct synchro_entry *entry;
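/*
* journal_entry ends with a flexible array of xrow_header
* pointers, so reserve room for the single row pointer
* this entry carries.
*/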
size_t size = sizeof(*entry) + sizeof(struct xrow_header *);

/*
* For simplicity we use malloc here, but we
* should probably provide a cache similar to
* the txn cache.
*/
entry = (struct synchro_entry *)malloc(size);
if (entry == NULL) {
diag_set(OutOfMemory, size, "malloc", "synchro_entry");
return NULL;
}

struct journal_entry *journal_entry = &entry->journal_entry;
struct synchro_body_bin *body_bin = &entry->body_bin;
struct xrow_header *row = &entry->row;

journal_entry->rows[0] = row;

xrow_encode_synchro(row, body_bin, req);

row->lsn = applier_row->lsn;
row->replica_id = applier_row->replica_id;

journal_entry_create(journal_entry, 1, xrow_approx_len(row),
apply_synchro_row_cb, entry);
return entry;
}

/** Process a synchro request. */
static int
apply_synchro_row(struct xrow_header *row)
{
assert(iproto_type_is_synchro_request(row->type));

struct synchro_request req;
if (xrow_decode_synchro(row, &req) != 0)
goto err;

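/* Mark the affected limbo entries as confirmed or rolled back. */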
if (txn_limbo_process(&txn_limbo, &req))
goto err;

struct synchro_entry *entry;
entry = synchro_entry_new(row, &req);
if (entry == NULL)
goto err;

if (journal_write_async(&entry->journal_entry) != 0) {
diag_set(ClientError, ER_WAL_IO);
goto err;
}
return 0;
err:
diag_log();
return -1;
}

/**
* Apply all rows in the rows queue as a single transaction.
*
@@ -847,13 +923,26 @@ applier_apply_tx(struct stailq *rows)
}
}

if (unlikely(iproto_type_is_synchro_request(first_row->type))) {
/*
* Synchro messages are not transactions in terms
* of DML. They are always sent and written in
* isolation from each other.
*/
assert(first_row == last_row);
if (apply_synchro_row(first_row) != 0)
diag_raise();
goto success;
}

/**
* Explicitly begin the transaction so that we can
* control fiber->gc life cycle and, in case of apply
* conflict safely access failed xrow object and allocate
* IPROTO_NOP on gc.
*/
struct txn *txn = txn_begin();
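/*
* The declaration is split from the initialization so
* that the goto success above does not jump over an
* initialized variable.
*/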
struct txn *txn;
txn = txn_begin();
struct applier_tx_row *item;
if (txn == NULL) {
latch_unlock(latch);
@@ -922,6 +1011,7 @@
if (txn_commit_async(txn) < 0)
goto fail;

success:
/*
* The transaction was sent to journal so promote vclock.
*
@@ -1089,7 +1179,13 @@ applier_subscribe(struct applier *applier)

applier->lag = TIMEOUT_INFINITY;

/* Register triggers to handle WAL writes and rollbacks. */
/*
* Register triggers to handle WAL writes and rollbacks.
*
* Note that they are used for synchronous packet
* handling as well, so when changing them make sure
* the synchro handling does not break.
*/
struct trigger on_wal_write;
trigger_create(&on_wal_write, applier_on_wal_write, applier, NULL);
trigger_add(&replicaset.applier.on_wal_write, &on_wal_write);