Skip to content

Commit

Permalink
replication: do not ignore replication_connect_quorum.
Browse files Browse the repository at this point in the history
On bootstrap and after replication reconfiguration
replication_connect_quorum was ignored. The instance tried to connect to
every replica listed in replication parameter, and errored if it wasn't
possible.
The patch alters this behaviour. The instance still tries to connect to
every node listed in replication, but does not raise an error if it was
able to connect to at least replication_connect_quorum instances.

Closes #3428

@TarantoolBot document
Title: replication_connect_quorum is not ignored.
Now on replica set bootstrap instance tries to connect
not to every replica, listed in box.cfg.replication, but
only to replication_connect_quorum replicas. If after
replication_connect_timeout seconds instance is not connected to
at least replication_connect_quorum other instances, we
throw an error.
  • Loading branch information
sergepetrenko committed Aug 9, 2018
1 parent f29466c commit 1c768e9
Show file tree
Hide file tree
Showing 19 changed files with 45 additions and 33 deletions.
10 changes: 5 additions & 5 deletions src/box/box.cc
Original file line number Diff line number Diff line change
Expand Up @@ -595,7 +595,7 @@ cfg_get_replication(int *p_count)
* don't start appliers.
*/
static void
box_sync_replication(double timeout, bool connect_all)
box_sync_replication(bool connect_quorum)
{
int count = 0;
struct applier **appliers = cfg_get_replication(&count);
Expand All @@ -607,7 +607,7 @@ box_sync_replication(double timeout, bool connect_all)
applier_delete(appliers[i]); /* doesn't affect diag */
});

replicaset_connect(appliers, count, timeout, connect_all);
replicaset_connect(appliers, count, connect_quorum);

guard.is_active = false;
}
Expand All @@ -626,7 +626,7 @@ box_set_replication(void)

box_check_replication();
/* Try to connect to all replicas within the timeout period */
box_sync_replication(replication_connect_timeout, true);
box_sync_replication(true);
/* Follow replica */
replicaset_follow();
}
Expand Down Expand Up @@ -1866,7 +1866,7 @@ box_cfg_xc(void)
title("orphan");

/* Wait for the cluster to start up */
box_sync_replication(replication_connect_timeout, false);
box_sync_replication(false);
} else {
if (!tt_uuid_is_nil(&instance_uuid))
INSTANCE_UUID = instance_uuid;
Expand All @@ -1888,7 +1888,7 @@ box_cfg_xc(void)
* receive the same replica set UUID when a new cluster
* is deployed.
*/
box_sync_replication(TIMEOUT_INFINITY, true);
box_sync_replication(true);
/* Bootstrap a new master */
bootstrap(&replicaset_uuid, &is_bootstrap_leader);
}
Expand Down
13 changes: 7 additions & 6 deletions src/box/replication.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ struct tt_uuid INSTANCE_UUID;
struct tt_uuid REPLICASET_UUID;

double replication_timeout = 1.0; /* seconds */
double replication_connect_timeout = 4.0; /* seconds */
double replication_connect_timeout = 30.0; /* seconds */
int replication_connect_quorum = REPLICATION_CONNECT_QUORUM_ALL;
double replication_sync_lag = 10.0; /* seconds */

Expand Down Expand Up @@ -540,7 +540,7 @@ applier_on_connect_f(struct trigger *trigger, void *event)

void
replicaset_connect(struct applier **appliers, int count,
double timeout, bool connect_all)
bool connect_quorum)
{
if (count == 0) {
/* Cleanup the replica set. */
Expand All @@ -557,7 +557,7 @@ replicaset_connect(struct applier **appliers, int count,
* - register a trigger in each applier to wake up our
* fiber via this channel when the remote peer becomes
* connected and a UUID is received;
* - wait up to CONNECT_TIMEOUT seconds for `count` messages;
* - wait up to REPLICATION_CONNECT_TIMEOUT seconds for `count` messages;
* - on timeout, raise a CFG error, cancel and destroy
* the freshly created appliers (done in a guard);
* - an success, unregister the trigger, check the UUID set
Expand All @@ -571,6 +571,8 @@ replicaset_connect(struct applier **appliers, int count,
state.connected = state.failed = 0;
fiber_cond_create(&state.wakeup);

double timeout = replication_connect_timeout;

/* Add triggers and start simulations connection to remote peers */
for (int i = 0; i < count; i++) {
struct applier *applier = appliers[i];
Expand All @@ -587,15 +589,14 @@ replicaset_connect(struct applier **appliers, int count,
double wait_start = ev_monotonic_now(loop());
if (fiber_cond_wait_timeout(&state.wakeup, timeout) != 0)
break;
if (state.failed > 0 && connect_all)
break;
timeout -= ev_monotonic_now(loop()) - wait_start;
}
if (state.connected < count) {
say_crit("failed to connect to %d out of %d replicas",
count - state.connected, count);
/* Timeout or connection failure. */
if (connect_all)
if (connect_quorum && state.connected <
MIN(count, replication_connect_quorum))
goto error;
} else {
say_verbose("connected to %d replicas", state.connected);
Expand Down
8 changes: 4 additions & 4 deletions src/box/replication.h
Original file line number Diff line number Diff line change
Expand Up @@ -356,13 +356,13 @@ replicaset_add(uint32_t replica_id, const struct tt_uuid *instance_uuid);
*
* \param appliers the array of appliers
* \param count size of appliers array
* \param timeout connection timeout
* \param connect_all if this flag is set, fail unless all
* appliers have successfully connected
* \param connect_quorum if this flag is set, fail unless at
* least replication_connect_quorum
* appliers have successfully connected.
*/
void
replicaset_connect(struct applier **appliers, int count,
double timeout, bool connect_all);
bool connect_quorum);

/**
* Resume all appliers registered with the replica set.
Expand Down
2 changes: 1 addition & 1 deletion test/replication-py/init_storage.test.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@

server.stop()
replica = TarantoolServer(server.ini)
replica.script = 'replication/replica.lua'
replica.script = 'replication-py/replica.lua'
replica.vardir = server.vardir #os.path.join(server.vardir, 'replica')
replica.rpl_master = master
replica.deploy(wait=False)
Expand Down
2 changes: 2 additions & 0 deletions test/replication-py/master.lua
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ os = require('os')
box.cfg({
listen = os.getenv("LISTEN"),
memtx_memory = 107374182,
replication_connect_timeout = 1.0,
replication_timeout = 0.3
})

require('console').listen(os.getenv('ADMIN'))
2 changes: 2 additions & 0 deletions test/replication-py/replica.lua
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ box.cfg({
listen = os.getenv("LISTEN"),
replication = os.getenv("MASTER"),
memtx_memory = 107374182,
replication_connect_timeout = 1.0,
replication_timeout = 0.3
})

box_cfg_done = true
3 changes: 2 additions & 1 deletion test/replication/autobootstrap.lua
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ box.cfg({
USER..':'..PASSWORD..'@'..instance_uri(2);
USER..':'..PASSWORD..'@'..instance_uri(3);
};
replication_connect_timeout = 0.5,
replication_connect_timeout = 3.0;
replication_timeout = 0.5;
})

box.once("bootstrap", function()
Expand Down
2 changes: 1 addition & 1 deletion test/replication/autobootstrap_guest.lua
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ box.cfg({
instance_uri(2);
instance_uri(3);
};
replication_connect_timeout = 0.5,
replication_connect_timeout = 5,
})

box.once("bootstrap", function()
Expand Down
3 changes: 2 additions & 1 deletion test/replication/ddl.lua
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ box.cfg({
USER..':'..PASSWORD..'@'..instance_uri(3);
USER..':'..PASSWORD..'@'..instance_uri(4);
};
replication_connect_timeout = 0.5,
replication_timeout = 0.1,
replication_connect_timeout = 2.0,
})

box.once("bootstrap", function()
Expand Down
6 changes: 3 additions & 3 deletions test/replication/errinj.result
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,7 @@ test_run:cmd("create server replica_timeout with rpl_master=default, script='rep
---
- true
...
test_run:cmd("start server replica_timeout with args='0.01'")
test_run:cmd("start server replica_timeout with args='0.1, 0.5'")
---
- true
...
Expand Down Expand Up @@ -474,7 +474,7 @@ errinj.set("ERRINJ_RELAY_REPORT_INTERVAL", 0)
...
-- Check replica's ACKs don't prevent the master from sending
-- heartbeat messages (gh-3160).
test_run:cmd("start server replica_timeout with args='0.009'")
test_run:cmd("start server replica_timeout with args='0.009, 0.5'")
---
- true
...
Expand Down Expand Up @@ -522,7 +522,7 @@ for i = 0, 9999 do box.space.test:replace({i, 4, 5, 'test'}) end
-- during the join stage, i.e. a replica with a minuscule
-- timeout successfully bootstraps and breaks connection only
-- after subscribe.
test_run:cmd("start server replica_timeout with args='0.00001'")
test_run:cmd("start server replica_timeout with args='0.00001, 0.5'")
---
- true
...
Expand Down
6 changes: 3 additions & 3 deletions test/replication/errinj.test.lua
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ errinj.set("ERRINJ_RELAY_EXIT_DELAY", 0)
box.cfg{replication_timeout = 0.01}

test_run:cmd("create server replica_timeout with rpl_master=default, script='replication/replica_timeout.lua'")
test_run:cmd("start server replica_timeout with args='0.01'")
test_run:cmd("start server replica_timeout with args='0.1, 0.5'")
test_run:cmd("switch replica_timeout")

fiber = require('fiber')
Expand All @@ -199,7 +199,7 @@ errinj.set("ERRINJ_RELAY_REPORT_INTERVAL", 0)
-- Check replica's ACKs don't prevent the master from sending
-- heartbeat messages (gh-3160).

test_run:cmd("start server replica_timeout with args='0.009'")
test_run:cmd("start server replica_timeout with args='0.009, 0.5'")
test_run:cmd("switch replica_timeout")

fiber = require('fiber')
Expand All @@ -219,7 +219,7 @@ for i = 0, 9999 do box.space.test:replace({i, 4, 5, 'test'}) end
-- during the join stage, i.e. a replica with a minuscule
-- timeout successfully bootstraps and breaks connection only
-- after subscribe.
test_run:cmd("start server replica_timeout with args='0.00001'")
test_run:cmd("start server replica_timeout with args='0.00001, 0.5'")
test_run:cmd("switch replica_timeout")
fiber = require('fiber')
while box.info.replication[1].upstream.message ~= 'timed out' do fiber.sleep(0.0001) end
Expand Down
1 change: 1 addition & 0 deletions test/replication/master.lua
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ box.cfg({
listen = os.getenv("LISTEN"),
memtx_memory = 107374182,
replication_connect_timeout = 0.5,
replication_timeout = 0.1
})

require('console').listen(os.getenv('ADMIN'))
3 changes: 2 additions & 1 deletion test/replication/master_quorum.lua
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ box.cfg({
instance_uri(2);
};
replication_connect_quorum = 0;
replication_connect_timeout = 0.1;
replication_timeout = 0.5;
replication_connect_timeout = 2.0;
})

test_run = require('test_run').new()
Expand Down
3 changes: 2 additions & 1 deletion test/replication/on_replace.lua
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ box.cfg({
USER..':'..PASSWORD..'@'..instance_uri(1);
USER..':'..PASSWORD..'@'..instance_uri(2);
};
replication_connect_timeout = 0.5,
replication_timeout = 0.5,
replication_connect_timeout = 1.0,
})

env = require('test_run')
Expand Down
4 changes: 2 additions & 2 deletions test/replication/quorum.lua
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ require('console').listen(os.getenv('ADMIN'))
box.cfg({
listen = instance_uri(INSTANCE_ID);
replication_timeout = 0.05;
replication_sync_lag = 0.01;
replication_connect_timeout = 0.1;
replication_sync_lag = 0.1;
replication_connect_timeout = 3.0;
replication_connect_quorum = 3;
replication = {
instance_uri(1);
Expand Down
2 changes: 1 addition & 1 deletion test/replication/rebootstrap.lua
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ box.cfg({
listen = instance_uri(INSTANCE_ID),
instance_uuid = '12345678-abcd-1234-abcd-123456789ef' .. INSTANCE_ID,
replication_timeout = 0.1,
replication_connect_timeout = 0.5,
replication_connect_timeout = 2.0,
replication = {
instance_uri(1);
instance_uri(2);
Expand Down
3 changes: 2 additions & 1 deletion test/replication/replica_no_quorum.lua
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ box.cfg({
replication = os.getenv("MASTER"),
memtx_memory = 107374182,
replication_connect_quorum = 0,
replication_connect_timeout = 0.1,
replication_timeout = 0.1,
replication_connect_timeout = 0.5,
})

require('console').listen(os.getenv('ADMIN'))
3 changes: 2 additions & 1 deletion test/replication/replica_timeout.lua
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
#!/usr/bin/env tarantool

local TIMEOUT = tonumber(arg[1])
local CON_TIMEOUT = arg[2] and tonumber(arg[2]) or TIMEOUT * 3

box.cfg({
listen = os.getenv("LISTEN"),
replication = os.getenv("MASTER"),
memtx_memory = 107374182,
replication_timeout = TIMEOUT,
replication_connect_timeout = TIMEOUT * 3,
replication_connect_timeout = CON_TIMEOUT,
})

require('console').listen(os.getenv('ADMIN'))
2 changes: 1 addition & 1 deletion test/replication/replica_uuid_ro.lua
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ box.cfg({
USER..':'..PASSWORD..'@'..instance_uri(2);
};
read_only = (INSTANCE_ID ~= '1' and true or false);
replication_connect_timeout = 0.5,
replication_connect_timeout = 5,
})

box.once("bootstrap", function()
Expand Down

0 comments on commit 1c768e9

Please sign in to comment.