Skip to content

Commit

Permalink
replication: do not relay rows coming from a remote instance back to it
Browse files Browse the repository at this point in the history
We have a mechanism for restoring rows originating from an instance that
suffered a sudden power loss: remote masters resend the instance's rows
received before a certain point in time, defined by remote master vclock
at the moment of subscribe.
However, this is useful only on initial replication configuration, when
an instance has just recovered, so that it can receive what it has
relayed but hasn't synced to disk.
In other cases, when an instance is operating normally and master-master
replication is configured, the mechanism described above may lead to
instance re-applying its own rows, coming from a master it has just
subscribed to.
To fix the problem do not relay rows coming from a remote instance, if
the instance has already recovered.

Closes #4739
  • Loading branch information
sergepetrenko authored and Gerold103 committed Feb 28, 2020
1 parent 73f8d88 commit 1e2158d
Show file tree
Hide file tree
Showing 9 changed files with 145 additions and 3 deletions.
7 changes: 6 additions & 1 deletion src/box/applier.cc
Expand Up @@ -866,8 +866,13 @@ applier_subscribe(struct applier *applier)
struct vclock vclock;
vclock_create(&vclock);
vclock_copy(&vclock, &replicaset.vclock);
/*
* Stop accepting local rows coming from a remote
* instance as soon as local WAL starts accepting writes.
*/
uint32_t id_filter = box_is_orphan() ? 0 : 1 << instance_id;
xrow_encode_subscribe_xc(&row, &REPLICASET_UUID, &INSTANCE_UUID,
&vclock, replication_anon, 0);
&vclock, replication_anon, id_filter);
coio_write_xrow(coio, &row);

/* Read SUBSCRIBE response */
Expand Down
3 changes: 3 additions & 0 deletions src/box/wal.c
Expand Up @@ -1114,6 +1114,7 @@ wal_write_to_disk(struct cmsg *msg)
}
fiber_gc();
wal_notify_watchers(writer, WAL_EVENT_WRITE);
ERROR_INJECT_SLEEP(ERRINJ_RELAY_FASTER_THAN_TX);
}

/** WAL writer main loop. */
Expand Down Expand Up @@ -1325,6 +1326,8 @@ wal_watcher_notify(struct wal_watcher *watcher, unsigned events)
msg->events = events;
cmsg_init(&msg->cmsg, watcher->route);
cpipe_push(&watcher->watcher_pipe, &msg->cmsg);
ERROR_INJECT(ERRINJ_RELAY_FASTER_THAN_TX,
cpipe_deliver_now(&watcher->watcher_pipe));
}

static void
Expand Down
7 changes: 7 additions & 0 deletions src/lib/core/cbus.h
Expand Up @@ -176,6 +176,13 @@ cpipe_set_max_input(struct cpipe *pipe, int max_input)
pipe->max_input = max_input;
}

/**
 * Force delivery of any staged input in the pipe right away,
 * instead of waiting for the producer's event loop to pick up
 * the flush_input watcher on its own. Invokes the flush callback
 * synchronously via ev_invoke(); a no-op when nothing is staged.
 */
static inline void
cpipe_deliver_now(struct cpipe *pipe)
{
	if (pipe->n_input == 0)
		return;
	ev_invoke(pipe->producer, &pipe->flush_input, EV_CUSTOM);
}

/**
* Flush all staged messages into the pipe and eventually to the
* consumer.
Expand Down
3 changes: 2 additions & 1 deletion src/lib/core/errinj.h
Expand Up @@ -136,7 +136,8 @@ struct errinj {
_(ERRINJ_SWIM_FD_ONLY, ERRINJ_BOOL, {.bparam = false}) \
_(ERRINJ_DYN_MODULE_COUNT, ERRINJ_INT, {.iparam = 0}) \
_(ERRINJ_FIBER_MADVISE, ERRINJ_BOOL, {.bparam = false}) \
_(ERRINJ_FIBER_MPROTECT, ERRINJ_INT, {.iparam = -1})
_(ERRINJ_FIBER_MPROTECT, ERRINJ_INT, {.iparam = -1}) \
_(ERRINJ_RELAY_FASTER_THAN_TX, ERRINJ_BOOL, {.bparam = false}) \

ENUM0(errinj_id, ERRINJ_LIST);
extern struct errinj errinjs[];
Expand Down
1 change: 1 addition & 0 deletions test/box/errinj.result
Expand Up @@ -59,6 +59,7 @@ evals
- ERRINJ_PORT_DUMP: false
- ERRINJ_RELAY_BREAK_LSN: -1
- ERRINJ_RELAY_EXIT_DELAY: 0
- ERRINJ_RELAY_FASTER_THAN_TX: false
- ERRINJ_RELAY_FINAL_JOIN: false
- ERRINJ_RELAY_FINAL_SLEEP: false
- ERRINJ_RELAY_REPORT_INTERVAL: 0
Expand Down
88 changes: 88 additions & 0 deletions test/replication/gh-4739-vclock-assert.result
@@ -0,0 +1,88 @@
-- test-run result file version 2
env = require('test_run')
| ---
| ...
test_run = env.new()
| ---
| ...

SERVERS = {'rebootstrap1', 'rebootstrap2'}
| ---
| ...
test_run:create_cluster(SERVERS, "replication")
| ---
| ...
test_run:wait_fullmesh(SERVERS)
| ---
| ...

test_run:cmd('switch rebootstrap1')
| ---
| - true
| ...
fiber = require('fiber')
| ---
| ...
-- Stop updating replicaset vclock to simulate a situation, when
-- a row is already relayed to the remote master, but the local
-- vclock update hasn't happened yet.
box.error.injection.set('ERRINJ_RELAY_FASTER_THAN_TX', true)
| ---
| - ok
| ...
lsn = box.info.lsn
| ---
| ...
f = fiber.create(function() box.space._schema:replace{'something'} end)
| ---
| ...
f:status()
| ---
| - suspended
| ...
-- Vclock isn't updated.
box.info.lsn == lsn
| ---
| - true
| ...

-- Wait until the remote instance gets the row.
test_run:wait_cond(function()\
return test_run:get_vclock('rebootstrap2')[box.info.id] > lsn\
end, 10)
| ---
| - true
| ...

-- Restart the remote instance. This will make the first instance
-- resubscribe without entering orphan mode.
test_run:cmd('restart server rebootstrap2 with wait=False')
| ---
| - true
| ...
test_run:cmd('switch rebootstrap1')
| ---
| - true
| ...
-- Wait until resubscribe is sent
test_run:wait_cond(function()\
return box.info.replication[2].upstream.status == 'sync'\
end, 10)
| ---
| - true
| ...
box.error.injection.set('ERRINJ_RELAY_FASTER_THAN_TX', false)
| ---
| - ok
| ...
box.space._schema:get{'something'}
| ---
| - ['something']
| ...
test_run:cmd('switch default')
| ---
| - true
| ...
test_run:drop_cluster(SERVERS)
| ---
| ...
36 changes: 36 additions & 0 deletions test/replication/gh-4739-vclock-assert.test.lua
@@ -0,0 +1,36 @@
-- gh-4739: an instance must not re-apply its own rows relayed
-- back from a remote master after a resubscribe that happens
-- while the instance is operating normally (not orphan).
-- NOTE(review): test-run mirrors test lines into the .result
-- file, so this file must stay in sync with its .result twin.
env = require('test_run')
test_run = env.new()

SERVERS = {'rebootstrap1', 'rebootstrap2'}
test_run:create_cluster(SERVERS, "replication")
test_run:wait_fullmesh(SERVERS)

test_run:cmd('switch rebootstrap1')
fiber = require('fiber')
-- Stop updating replicaset vclock to simulate a situation, when
-- a row is already relayed to the remote master, but the local
-- vclock update hasn't happened yet.
box.error.injection.set('ERRINJ_RELAY_FASTER_THAN_TX', true)
lsn = box.info.lsn
f = fiber.create(function() box.space._schema:replace{'something'} end)
f:status()
-- Vclock isn't updated.
box.info.lsn == lsn

-- Wait until the remote instance gets the row.
test_run:wait_cond(function()\
return test_run:get_vclock('rebootstrap2')[box.info.id] > lsn\
end, 10)

-- Restart the remote instance. This will make the first instance
-- resubscribe without entering orphan mode.
test_run:cmd('restart server rebootstrap2 with wait=False')
test_run:cmd('switch rebootstrap1')
-- Wait until resubscribe is sent
test_run:wait_cond(function()\
return box.info.replication[2].upstream.status == 'sync'\
end, 10)
box.error.injection.set('ERRINJ_RELAY_FASTER_THAN_TX', false)
box.space._schema:get{'something'}
test_run:cmd('switch default')
test_run:drop_cluster(SERVERS)
1 change: 1 addition & 0 deletions test/replication/suite.cfg
Expand Up @@ -15,6 +15,7 @@
"gh-4402-info-errno.test.lua": {},
"gh-4605-empty-password.test.lua": {},
"gh-4606-admin-creds.test.lua": {},
"gh-4739-vclock-assert.test.lua": {},
"*": {
"memtx": {"engine": "memtx"},
"vinyl": {"engine": "vinyl"}
Expand Down
2 changes: 1 addition & 1 deletion test/replication/suite.ini
Expand Up @@ -3,7 +3,7 @@ core = tarantool
script = master.lua
description = tarantool/box, replication
disabled = consistent.test.lua
release_disabled = catch.test.lua errinj.test.lua gc.test.lua gc_no_space.test.lua before_replace.test.lua quorum.test.lua recover_missing_xlog.test.lua sync.test.lua long_row_timeout.test.lua
release_disabled = catch.test.lua errinj.test.lua gc.test.lua gc_no_space.test.lua before_replace.test.lua quorum.test.lua recover_missing_xlog.test.lua sync.test.lua long_row_timeout.test.lua gh-4739-vclock-assert.test.lua
config = suite.cfg
lua_libs = lua/fast_replica.lua lua/rlimit.lua
use_unix_sockets = True
Expand Down

0 comments on commit 1e2158d

Please sign in to comment.