applier: apply transaction in parallel
Appliers now use asynchronous transactions to batch journal writes. All
appliers share replicaset.applier.vclock, which reflects the vclock
that has been applied but not necessarily written to the journal.
Appliers use a trigger to coordinate in case of failure, i.e. when a
transaction is going to be rolled back.

Closes: #1254
Georgy Kirichenko authored and locker committed Jun 25, 2019
1 parent 18ea440 commit 8c84932
Showing 4 changed files with 180 additions and 44 deletions.
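
Before the diff itself, a condensed sketch of the new write path may help. It is only a sketch: it reuses calls that appear in this commit (txn_begin(), region_alloc(), trigger_create(), txn_on_rollback(), txn_write(), vclock_follow()), omits the latch, the on_commit twin trigger, and most error paths, and the function name is invented for illustration.

#include "txn.h"
#include "trigger.h"
#include "replication.h"

/* Illustrative condensation of applier_apply_tx() below; not verbatim. */
static int
apply_tx_sketch(struct stailq *rows, struct xrow_header *first_row)
{
        struct txn *txn = txn_begin();
        if (txn == NULL)
                return -1;
        (void) rows; /* ... apply_row() on every row of the transaction ... */
        struct trigger *on_rollback = (struct trigger *)
                region_alloc(&txn->region, sizeof(*on_rollback));
        if (on_rollback == NULL) {
                txn_rollback(txn);
                return -1;
        }
        /* A failed write is broadcast to all appliers by this callback. */
        trigger_create(on_rollback, applier_txn_rollback_cb, NULL, NULL);
        txn_on_rollback(txn, on_rollback);
        /* Submit to the journal; the outcome arrives via the txn triggers. */
        if (txn_write(txn) < 0)
                return -1;
        /* Advance the "applied but not necessarily written" vclock. */
        vclock_follow(&replicaset.applier.vclock,
                      first_row->replica_id, first_row->lsn);
        return 0;
}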
201 changes: 157 additions & 44 deletions src/box/applier.cc
@@ -50,6 +50,7 @@
#include "schema.h"
#include "txn.h"
#include "box.h"
#include "scoped_guard.h"

STRS(applier_state, applier_STATE);

@@ -108,6 +109,27 @@ applier_log_error(struct applier *applier, struct error *e)
applier->last_logged_errcode = errcode;
}

/**
* A helper function which switches the applier to FOLLOW state
* if it has synchronized with its master.
*/
static inline void
applier_check_sync(struct applier *applier)
{
/*
* Stay 'orphan' until appliers catch up with
* the remote vclock at the time of SUBSCRIBE
* and the lag is less than configured.
*/
if (applier->state == APPLIER_SYNC &&
applier->lag <= replication_sync_lag &&
vclock_compare(&applier->remote_vclock_at_subscribe,
&replicaset.vclock) <= 0) {
/* Applier is synced, switch to "follow". */
applier_set_state(applier, APPLIER_FOLLOW);
}
}

/*
* Fiber function to write vclock to replication master.
* To track connection status, replica answers master
@@ -135,6 +157,14 @@ applier_writer_f(va_list ap)
else
fiber_cond_wait_timeout(&applier->writer_cond,
replication_timeout);
/*
* A writer fiber is woken up after a commit or a heartbeat
* message, so this is an appropriate place to update the
* applier status: the update may yield and hence cannot be
* done in a commit trigger.
*/
applier_check_sync(applier);

/* Send ACKs only when in FOLLOW mode. */
if (applier->state != APPLIER_SYNC &&
applier->state != APPLIER_FOLLOW)
@@ -574,6 +604,27 @@ applier_read_tx(struct applier *applier, struct stailq *rows)
next)->row.is_commit);
}

static void
applier_txn_rollback_cb(struct trigger *trigger, void *event)
{
(void) trigger;
/* Set up the shared applier diagnostic area. */
diag_set(ClientError, ER_WAL_IO);
diag_move(&fiber()->diag, &replicaset.applier.diag);
/* Broadcast the rollback event across all appliers. */
trigger_run(&replicaset.applier.on_rollback, event);
/* Roll back the applier vclock to the committed one. */
vclock_copy(&replicaset.applier.vclock, &replicaset.vclock);
}

static void
applier_txn_commit_cb(struct trigger *trigger, void *event)
{
(void) trigger;
/* Broadcast the commit event across all appliers. */
trigger_run(&replicaset.applier.on_commit, event);
}

/**
* Apply all rows in the rows queue as a single transaction.
*
@@ -582,6 +633,24 @@ applier_read_tx(struct applier *applier, struct stailq *rows)
static int
applier_apply_tx(struct stailq *rows)
{
struct xrow_header *first_row = &stailq_first_entry(rows,
struct applier_tx_row, next)->row;
struct replica *replica = replica_by_id(first_row->replica_id);
/*
* In a full mesh topology, the same set of changes
* may arrive via two concurrently running appliers.
* Hence we need a latch to strictly order all changes
* that belong to the same server id.
*/
struct latch *latch = (replica ? &replica->order_latch :
&replicaset.applier.order_latch);
latch_lock(latch);
if (vclock_get(&replicaset.applier.vclock,
first_row->replica_id) >= first_row->lsn) {
latch_unlock(latch);
return 0;
}

/**
* Explicitly begin the transaction so that we can
* control fiber->gc life cycle and, in case of apply
@@ -590,8 +659,10 @@ applier_apply_tx(struct stailq *rows)
*/
struct txn *txn = txn_begin();
struct applier_tx_row *item;
if (txn == NULL)
diag_raise();
if (txn == NULL) {
latch_unlock(latch);
return -1;
}
stailq_foreach_entry(item, rows, next) {
struct xrow_header *row = &item->row;
int res = apply_row(row);
@@ -632,14 +703,66 @@ applier_apply_tx(struct stailq *rows)
"Replication", "distributed transactions");
goto rollback;
}
return txn_commit(txn);

/* We are ready to submit the txn to the WAL. */
struct trigger *on_rollback, *on_commit;
on_rollback = (struct trigger *)region_alloc(&txn->region,
sizeof(struct trigger));
on_commit = (struct trigger *)region_alloc(&txn->region,
sizeof(struct trigger));
if (on_rollback == NULL || on_commit == NULL)
goto rollback;

trigger_create(on_rollback, applier_txn_rollback_cb, NULL, NULL);
txn_on_rollback(txn, on_rollback);

trigger_create(on_commit, applier_txn_commit_cb, NULL, NULL);
txn_on_commit(txn, on_commit);

if (txn_write(txn) < 0)
goto fail;

/* The transaction was sent to the journal, so promote the vclock. */
vclock_follow(&replicaset.applier.vclock,
first_row->replica_id, first_row->lsn);
latch_unlock(latch);
return 0;
rollback:
txn_rollback(txn);
fail:
latch_unlock(latch);
fiber_gc();
return -1;
}
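
The early-return check at the top of applier_apply_tx() is what makes duplicate delivery in a full mesh harmless. Here is a toy, self-contained illustration with made-up values, using only the vclock primitives from this patch (vclock_create(), vclock_follow(), vclock_get()):

#include <assert.h>
#include "vclock.h"

static void
dedup_example(void)
{
        struct vclock applied;
        vclock_create(&applied);
        /* The first applier wins the latch and applies row {id 2, lsn 10}. */
        vclock_follow(&applied, 2, 10);
        /* The second applier then sees the row as applied and skips it. */
        assert(vclock_get(&applied, 2) >= 10);
}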

/*
* A trigger to update the applier state after a replication commit.
*/
static void
applier_on_commit(struct trigger *trigger, void *event)
{
(void) event;
struct applier *applier = (struct applier *)trigger->data;
fiber_cond_signal(&applier->writer_cond);
}

/*
* A trigger to update the applier state after a replication rollback.
*/
static void
applier_on_rollback(struct trigger *trigger, void *event)
{
(void) event;
struct applier *applier = (struct applier *)trigger->data;
/* Propagate the shared error to this applier. */
if (!diag_is_empty(&replicaset.applier.diag)) {
diag_add_error(&applier->diag,
diag_last_error(&replicaset.applier.diag));
}
/* Stop the applier fiber. */
fiber_cancel(applier->reader);
}

/**
* Execute and process SUBSCRIBE request (follow updates from a master).
*/
@@ -650,7 +773,6 @@ applier_subscribe(struct applier *applier)
struct ev_io *coio = &applier->io;
struct ibuf *ibuf = &applier->ibuf;
struct xrow_header row;
struct vclock remote_vclock_at_subscribe;
struct tt_uuid cluster_id = uuid_nil;

struct vclock vclock;
@@ -677,10 +799,9 @@
* the replica, and replica has to check whether
* its and master's cluster ids match.
*/
vclock_create(&remote_vclock_at_subscribe);
xrow_decode_subscribe_response_xc(&row,
&cluster_id,
&remote_vclock_at_subscribe);
vclock_create(&applier->remote_vclock_at_subscribe);
xrow_decode_subscribe_response_xc(&row, &cluster_id,
&applier->remote_vclock_at_subscribe);
/*
* If master didn't send us its cluster id
* assume that it has done all the checks.
@@ -695,7 +816,7 @@

say_info("subscribed");
say_info("remote vclock %s local vclock %s",
vclock_to_string(&remote_vclock_at_subscribe),
vclock_to_string(&applier->remote_vclock_at_subscribe),
vclock_to_string(&vclock));
}
/*
@@ -744,6 +865,20 @@

applier->lag = TIMEOUT_INFINITY;

/* Register triggers to handle replication commits and rollbacks. */
struct trigger on_commit;
trigger_create(&on_commit, applier_on_commit, applier, NULL);
trigger_add(&replicaset.applier.on_commit, &on_commit);

struct trigger on_rollback;
trigger_create(&on_rollback, applier_on_rollback, applier, NULL);
trigger_add(&replicaset.applier.on_rollback, &on_rollback);

auto trigger_guard = make_scoped_guard([&] {
trigger_clear(&on_commit);
trigger_clear(&on_rollback);
});

/*
* Process a stream of rows from the binary log.
*/
@@ -756,47 +891,20 @@ applier_subscribe(struct applier *applier)
applier_set_state(applier, APPLIER_FOLLOW);
}

/*
* Stay 'orphan' until appliers catch up with
* the remote vclock at the time of SUBSCRIBE
* and the lag is less than configured.
*/
if (applier->state == APPLIER_SYNC &&
applier->lag <= replication_sync_lag &&
vclock_compare(&remote_vclock_at_subscribe,
&replicaset.vclock) <= 0) {
/* Applier is synced, switch to "follow". */
applier_set_state(applier, APPLIER_FOLLOW);
}

struct stailq rows;
applier_read_tx(applier, &rows);

struct xrow_header *first_row =
&stailq_first_entry(&rows, struct applier_tx_row,
next)->row;
applier->last_row_time = ev_monotonic_now(loop());
struct replica *replica = replica_by_id(first_row->replica_id);
struct latch *latch = (replica ? &replica->order_latch :
&replicaset.applier.order_latch);
/*
* In a full mesh topology, the same set of changes
* may arrive via two concurrently running appliers.
* Hence we need a latch to strictly order all changes
* that belong to the same server id.
* In case of a heartbeat message, wake the writer up
* and check the applier state.
*/
latch_lock(latch);
if (vclock_get(&replicaset.vclock, first_row->replica_id) <
first_row->lsn &&
applier_apply_tx(&rows) != 0) {
latch_unlock(latch);
if (stailq_first_entry(&rows, struct applier_tx_row,
next)->row.lsn == 0)
fiber_cond_signal(&applier->writer_cond);
else if (applier_apply_tx(&rows) != 0)
diag_raise();
}
latch_unlock(latch);

if (applier->state == APPLIER_SYNC ||
applier->state == APPLIER_FOLLOW)
fiber_cond_signal(&applier->writer_cond);
if (ibuf_used(ibuf) == 0)
ibuf_reset(ibuf);
fiber_gc();
@@ -881,6 +989,11 @@ applier_f(va_list ap)
return -1;
}
} catch (FiberIsCancelled *e) {
if (!diag_is_empty(&applier->diag)) {
diag_move(&applier->diag, &fiber()->diag);
applier_disconnect(applier, APPLIER_STOPPED);
break;
}
applier_disconnect(applier, APPLIER_OFF);
break;
} catch (SocketError *e) {
@@ -969,6 +1082,7 @@ applier_new(const char *uri)
rlist_create(&applier->on_state);
fiber_cond_create(&applier->resume_cond);
fiber_cond_create(&applier->writer_cond);
diag_create(&applier->diag);

return applier;
}
@@ -980,8 +1094,7 @@ applier_delete(struct applier *applier)
ibuf_destroy(&applier->ibuf);
assert(applier->io.fd == -1);
trigger_destroy(&applier->on_state);
fiber_cond_destroy(&applier->resume_cond);
fiber_cond_destroy(&applier->writer_cond);
diag_destroy(&applier->diag);
free(applier);
}

4 changes: 4 additions & 0 deletions src/box/applier.h
@@ -114,6 +114,10 @@ struct applier {
bool is_paused;
/** Condition variable signaled to resume the applier. */
struct fiber_cond resume_cond;
/** Diagnostic area carrying the error to be raised in the applier fiber. */
struct diag diag;
/** Master's vclock at the time of SUBSCRIBE. */
struct vclock remote_vclock_at_subscribe;
};

/**
8 changes: 8 additions & 0 deletions src/box/replication.cc
@@ -90,6 +90,13 @@ replication_init(void)
fiber_cond_create(&replicaset.applier.cond);
replicaset.replica_by_id = (struct replica **)calloc(VCLOCK_MAX, sizeof(struct replica *));
latch_create(&replicaset.applier.order_latch);

vclock_create(&replicaset.applier.vclock);
vclock_copy(&replicaset.applier.vclock, &replicaset.vclock);
rlist_create(&replicaset.applier.on_rollback);
rlist_create(&replicaset.applier.on_commit);

diag_create(&replicaset.applier.diag);
}

void
@@ -103,6 +110,7 @@ replication_free(void)
replicaset_foreach(replica)
relay_cancel(replica->relay);

diag_destroy(&replicaset.applier.diag);
free(replicaset.replica_by_id);
}

11 changes: 11 additions & 0 deletions src/box/replication.h
@@ -232,6 +232,17 @@ struct replicaset {
* struct replica object).
*/
struct latch order_latch;
/**
* Vclock of the last transaction which was read by
* an applier and processed by tx.
*/
struct vclock vclock;
/** Triggers fired when a replication request fails to apply. */
struct rlist on_rollback;
/** Triggers fired when a replication request is committed to the WAL. */
struct rlist on_commit;
/** Shared applier diagnostic area. */
struct diag diag;
} applier;
/** Map of all known replica ids to the corresponding replicas. */
struct replica **replica_by_id;
