diff --git a/src/box/applier.cc b/src/box/applier.cc
index 91135342572c..78f3d8a73e05 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -866,8 +866,16 @@ applier_subscribe(struct applier *applier)
 	struct vclock vclock;
 	vclock_create(&vclock);
 	vclock_copy(&vclock, &replicaset.vclock);
+	/*
+	 * Stop accepting local rows coming from a remote
+	 * instance as soon as local WAL starts accepting writes.
+	 * The filter is a bit mask with one bit per instance id,
+	 * so the shift is done on an unsigned operand: shifting a
+	 * signed 1 into the top bit would overflow int.
+	 */
+	uint32_t id_filter = box_is_orphan() ? 0 : 1U << instance_id;
 	xrow_encode_subscribe_xc(&row, &REPLICASET_UUID, &INSTANCE_UUID,
-				 &vclock, replication_anon, 0);
+				 &vclock, replication_anon, id_filter);
 	coio_write_xrow(coio, &row);
 
 	/* Read SUBSCRIBE response */
diff --git a/src/box/wal.c b/src/box/wal.c
index b08d62b2938a..e5b9fbdce439 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -1114,6 +1114,7 @@ wal_write_to_disk(struct cmsg *msg)
 	}
 	fiber_gc();
 	wal_notify_watchers(writer, WAL_EVENT_WRITE);
+	ERROR_INJECT_SLEEP(ERRINJ_RELAY_FASTER_THAN_TX);
 }
 
 /** WAL writer main loop. */
@@ -1325,6 +1326,8 @@ wal_watcher_notify(struct wal_watcher *watcher, unsigned events)
 	msg->events = events;
 	cmsg_init(&msg->cmsg, watcher->route);
 	cpipe_push(&watcher->watcher_pipe, &msg->cmsg);
+	ERROR_INJECT(ERRINJ_RELAY_FASTER_THAN_TX,
+		     cpipe_deliver_now(&watcher->watcher_pipe));
 }
 
 static void
diff --git a/src/lib/core/cbus.h b/src/lib/core/cbus.h
index 16d122779176..f0101cb8b6b1 100644
--- a/src/lib/core/cbus.h
+++ b/src/lib/core/cbus.h
@@ -176,6 +176,18 @@ cpipe_set_max_input(struct cpipe *pipe, int max_input)
 	pipe->max_input = max_input;
 }
 
+/**
+ * Invoke the pipe's flush_input watcher on the producer loop
+ * right away rather than waiting for the loop to trigger it.
+ * Does nothing unless pipe->n_input is positive.
+ */
+static inline void
+cpipe_deliver_now(struct cpipe *pipe)
+{
+	if (pipe->n_input > 0)
+		ev_invoke(pipe->producer, &pipe->flush_input, EV_CUSTOM);
+}
+
 /**
  * Flush all staged messages into the pipe and eventually to the
  * consumer.
diff --git a/src/lib/core/errinj.h b/src/lib/core/errinj.h
index ed0cba90342b..d8cdf3f27b08 100644
--- a/src/lib/core/errinj.h
+++ b/src/lib/core/errinj.h
@@ -136,7 +136,8 @@ struct errinj {
 	_(ERRINJ_SWIM_FD_ONLY, ERRINJ_BOOL, {.bparam = false}) \
 	_(ERRINJ_DYN_MODULE_COUNT, ERRINJ_INT, {.iparam = 0}) \
 	_(ERRINJ_FIBER_MADVISE, ERRINJ_BOOL, {.bparam = false}) \
-	_(ERRINJ_FIBER_MPROTECT, ERRINJ_INT, {.iparam = -1})
+	_(ERRINJ_FIBER_MPROTECT, ERRINJ_INT, {.iparam = -1}) \
+	_(ERRINJ_RELAY_FASTER_THAN_TX, ERRINJ_BOOL, {.bparam = false}) \
 
 ENUM0(errinj_id, ERRINJ_LIST);
 extern struct errinj errinjs[];
diff --git a/test/box/errinj.result b/test/box/errinj.result
index bf1dd6fa1112..4569f0219916 100644
--- a/test/box/errinj.result
+++ b/test/box/errinj.result
@@ -57,6 +57,7 @@ evals
   - ERRINJ_PORT_DUMP: false
   - ERRINJ_RELAY_BREAK_LSN: -1
   - ERRINJ_RELAY_EXIT_DELAY: 0
+  - ERRINJ_RELAY_FASTER_THAN_TX: false
   - ERRINJ_RELAY_FINAL_JOIN: false
   - ERRINJ_RELAY_FINAL_SLEEP: false
   - ERRINJ_RELAY_REPORT_INTERVAL: 0
diff --git a/test/replication/gh-4739-vclock-assert.result b/test/replication/gh-4739-vclock-assert.result
new file mode 100644
index 000000000000..83896c4e16e9
--- /dev/null
+++ b/test/replication/gh-4739-vclock-assert.result
@@ -0,0 +1,88 @@
+-- test-run result file version 2
+env = require('test_run')
+ | ---
+ | ...
+test_run = env.new()
+ | ---
+ | ...
+
+SERVERS = {'rebootstrap1', 'rebootstrap2'}
+ | ---
+ | ...
+test_run:create_cluster(SERVERS, "replication")
+ | ---
+ | ...
+test_run:wait_fullmesh(SERVERS)
+ | ---
+ | ...
+
+test_run:cmd('switch rebootstrap1')
+ | ---
+ | - true
+ | ...
+fiber = require('fiber')
+ | ---
+ | ...
+-- Stop updating replicaset vclock to simulate a situation in which
+-- a row is already relayed to the remote master, but the local
+-- vclock update hasn't happened yet.
+box.error.injection.set('ERRINJ_RELAY_FASTER_THAN_TX', true)
+ | ---
+ | - ok
+ | ...
+lsn = box.info.lsn
+ | ---
+ | ...
+f = fiber.create(function() box.space._schema:replace{'something'} end)
+ | ---
+ | ...
+f:status()
+ | ---
+ | - suspended
+ | ...
+-- Vclock isn't updated while the injection is on.
+box.info.lsn == lsn
+ | ---
+ | - true
+ | ...
+
+-- Wait until the remote instance gets the row.
+test_run:wait_cond(function()\
+    return test_run:get_vclock('rebootstrap2')[box.info.id] > lsn\
+end, 10)
+ | ---
+ | - true
+ | ...
+
+-- Restart the remote instance. This will make the first instance
+-- resubscribe without entering orphan mode.
+test_run:cmd('restart server rebootstrap2 with wait=False')
+ | ---
+ | - true
+ | ...
+test_run:cmd('switch rebootstrap1')
+ | ---
+ | - true
+ | ...
+-- Wait until the resubscribe request is sent.
+test_run:wait_cond(function()\
+    return box.info.replication[2].upstream.status == 'sync'\
+end, 10)
+ | ---
+ | - true
+ | ...
+box.error.injection.set('ERRINJ_RELAY_FASTER_THAN_TX', false)
+ | ---
+ | - ok
+ | ...
+box.space._schema:get{'something'}
+ | ---
+ | - ['something']
+ | ...
+test_run:cmd('switch default')
+ | ---
+ | - true
+ | ...
+test_run:drop_cluster(SERVERS)
+ | ---
+ | ...
diff --git a/test/replication/gh-4739-vclock-assert.test.lua b/test/replication/gh-4739-vclock-assert.test.lua
new file mode 100644
index 000000000000..5755ad75285a
--- /dev/null
+++ b/test/replication/gh-4739-vclock-assert.test.lua
@@ -0,0 +1,36 @@
+env = require('test_run')
+test_run = env.new()
+
+SERVERS = {'rebootstrap1', 'rebootstrap2'}
+test_run:create_cluster(SERVERS, "replication")
+test_run:wait_fullmesh(SERVERS)
+
+test_run:cmd('switch rebootstrap1')
+fiber = require('fiber')
+-- Stop updating replicaset vclock to simulate a situation in which
+-- a row is already relayed to the remote master, but the local
+-- vclock update hasn't happened yet.
+box.error.injection.set('ERRINJ_RELAY_FASTER_THAN_TX', true)
+lsn = box.info.lsn
+f = fiber.create(function() box.space._schema:replace{'something'} end)
+f:status()
+-- Vclock isn't updated while the injection is on.
+box.info.lsn == lsn
+
+-- Wait until the remote instance gets the row.
+test_run:wait_cond(function()\
+    return test_run:get_vclock('rebootstrap2')[box.info.id] > lsn\
+end, 10)
+
+-- Restart the remote instance. This will make the first instance
+-- resubscribe without entering orphan mode.
+test_run:cmd('restart server rebootstrap2 with wait=False')
+test_run:cmd('switch rebootstrap1')
+-- Wait until the resubscribe request is sent.
+test_run:wait_cond(function()\
+    return box.info.replication[2].upstream.status == 'sync'\
+end, 10)
+box.error.injection.set('ERRINJ_RELAY_FASTER_THAN_TX', false)
+box.space._schema:get{'something'}
+test_run:cmd('switch default')
+test_run:drop_cluster(SERVERS)
diff --git a/test/replication/suite.cfg b/test/replication/suite.cfg
index 429c64df31eb..90fd53ca6283 100644
--- a/test/replication/suite.cfg
+++ b/test/replication/suite.cfg
@@ -15,6 +15,7 @@
     "gh-4402-info-errno.test.lua": {},
     "gh-4605-empty-password.test.lua": {},
     "gh-4606-admin-creds.test.lua": {},
+    "gh-4739-vclock-assert.test.lua": {},
     "*": {
         "memtx": {"engine": "memtx"},
         "vinyl": {"engine": "vinyl"}
diff --git a/test/replication/suite.ini b/test/replication/suite.ini
index ed1de31405e5..b4e09744a654 100644
--- a/test/replication/suite.ini
+++ b/test/replication/suite.ini
@@ -3,7 +3,7 @@ core = tarantool
 script = master.lua
 description = tarantool/box, replication
 disabled = consistent.test.lua
-release_disabled = catch.test.lua errinj.test.lua gc.test.lua gc_no_space.test.lua before_replace.test.lua quorum.test.lua recover_missing_xlog.test.lua sync.test.lua long_row_timeout.test.lua
+release_disabled = catch.test.lua errinj.test.lua gc.test.lua gc_no_space.test.lua before_replace.test.lua quorum.test.lua recover_missing_xlog.test.lua sync.test.lua long_row_timeout.test.lua gh-4739-vclock-assert.test.lua
 config = suite.cfg
 lua_libs = lua/fast_replica.lua lua/rlimit.lua
 use_unix_sockets = True