replication: do not ignore replication_connect_quorum.
On bootstrap and after replication reconfiguration,
replication_connect_quorum was ignored: the instance tried to connect to
every replica listed in the replication parameter and raised an error if
that wasn't possible.
This patch alters the behaviour. The instance still tries to connect to
every node listed in replication, but no longer raises an error as long
as it manages to connect to at least replication_connect_quorum instances.

Closes #3428

@TarantoolBot document
Title: replication_connect_quorum is not ignored.
On replica set bootstrap, an instance no longer has to connect
to every replica listed in box.cfg.replication; connecting to
replication_connect_quorum replicas is enough. If, after
replication_connect_timeout seconds, the instance is not connected
to at least replication_connect_quorum other instances, an error
is raised.
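
To illustrate the documented behaviour, here is a minimal, hypothetical
configuration (the URIs, credentials and the quorum value are invented for
the example): with three upstreams and replication_connect_quorum = 2,
box.cfg{} now succeeds as soon as any two of them become reachable within
replication_connect_timeout, instead of requiring all three.

-- hypothetical example, not part of the patch
box.cfg({
    listen = 3301,
    replication = {
        'replicator:pass@host1:3301',
        'replicator:pass@host2:3301',
        'replicator:pass@host3:3301',
    },
    replication_connect_quorum = 2,    -- two reachable peers are enough
    replication_connect_timeout = 30,  -- new default, seconds
})
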
sergepetrenko committed Aug 10, 2018
1 parent f29466c commit 1919b22
Showing 44 changed files with 300 additions and 88 deletions.
10 changes: 5 additions & 5 deletions src/box/box.cc
@@ -595,7 +595,7 @@ cfg_get_replication(int *p_count)
* don't start appliers.
*/
static void
box_sync_replication(double timeout, bool connect_all)
box_sync_replication(bool connect_quorum)
{
int count = 0;
struct applier **appliers = cfg_get_replication(&count);
@@ -607,7 +607,7 @@ box_sync_replication(double timeout, bool connect_all)
applier_delete(appliers[i]); /* doesn't affect diag */
});

replicaset_connect(appliers, count, timeout, connect_all);
replicaset_connect(appliers, count, connect_quorum);

guard.is_active = false;
}
@@ -626,7 +626,7 @@ box_set_replication(void)

box_check_replication();
/* Try to connect to all replicas within the timeout period */
box_sync_replication(replication_connect_timeout, true);
box_sync_replication(true);
/* Follow replica */
replicaset_follow();
}
@@ -1866,7 +1866,7 @@ box_cfg_xc(void)
title("orphan");

/* Wait for the cluster to start up */
box_sync_replication(replication_connect_timeout, false);
box_sync_replication(false);
} else {
if (!tt_uuid_is_nil(&instance_uuid))
INSTANCE_UUID = instance_uuid;
@@ -1888,7 +1888,7 @@ box_cfg_xc(void)
* receive the same replica set UUID when a new cluster
* is deployed.
*/
box_sync_replication(TIMEOUT_INFINITY, true);
box_sync_replication(true);
/* Bootstrap a new master */
bootstrap(&replicaset_uuid, &is_bootstrap_leader);
}
16 changes: 11 additions & 5 deletions src/box/replication.cc
@@ -46,7 +46,7 @@ struct tt_uuid INSTANCE_UUID;
struct tt_uuid REPLICASET_UUID;

double replication_timeout = 1.0; /* seconds */
double replication_connect_timeout = 4.0; /* seconds */
double replication_connect_timeout = 30.0; /* seconds */
int replication_connect_quorum = REPLICATION_CONNECT_QUORUM_ALL;
double replication_sync_lag = 10.0; /* seconds */

@@ -540,7 +540,7 @@ applier_on_connect_f(struct trigger *trigger, void *event)

void
replicaset_connect(struct applier **appliers, int count,
double timeout, bool connect_all)
bool connect_quorum)
{
if (count == 0) {
/* Cleanup the replica set. */
@@ -557,7 +557,7 @@ replicaset_connect(struct applier **appliers, int count,
* - register a trigger in each applier to wake up our
* fiber via this channel when the remote peer becomes
* connected and a UUID is received;
* - wait up to CONNECT_TIMEOUT seconds for `count` messages;
* - wait up to REPLICATION_CONNECT_TIMEOUT seconds for `count` messages;
* - on timeout, raise a CFG error, cancel and destroy
* the freshly created appliers (done in a guard);
* - an success, unregister the trigger, check the UUID set
@@ -571,6 +571,8 @@ replicaset_connect(struct applier **appliers, int count,
state.connected = state.failed = 0;
fiber_cond_create(&state.wakeup);

double timeout = replication_connect_timeout;

/* Add triggers and start simultaneous connection to remote peers */
for (int i = 0; i < count; i++) {
struct applier *applier = appliers[i];
@@ -587,15 +589,19 @@
double wait_start = ev_monotonic_now(loop());
if (fiber_cond_wait_timeout(&state.wakeup, timeout) != 0)
break;
if (state.failed > 0 && connect_all)
if (count - state.failed <
MIN(count, replication_connect_quorum))
break;
timeout -= ev_monotonic_now(loop()) - wait_start;
}
if (state.connected < count) {
say_crit("failed to connect to %d out of %d replicas",
count - state.connected, count);
/* Timeout or connection failure. */
if (connect_all)
if (connect_quorum && state.connected <
MIN(count, replication_connect_quorum))
goto error;
} else {
say_verbose("connected to %d replicas", state.connected);
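
The give-up condition added above can be read in isolation: the loop stops
waiting as soon as so many appliers have failed that even the remaining ones
cannot reach the quorum. A rough Lua restatement of that arithmetic
(illustration only, not Tarantool API and not part of the patch):

-- illustration of the wait-loop decision
local function must_give_up(count, failed, quorum)
    -- the effective quorum can never exceed the number of upstreams
    local effective_quorum = math.min(count, quorum)
    -- if the peers that have not failed cannot reach the quorum,
    -- there is no point in waiting for the timeout to expire
    return count - failed < effective_quorum
end

-- e.g. three upstreams, quorum = 2: one failure is tolerated,
-- a second failure aborts the wait immediately
assert(must_give_up(3, 1, 2) == false)
assert(must_give_up(3, 2, 2) == true)
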
8 changes: 4 additions & 4 deletions src/box/replication.h
@@ -356,13 +356,13 @@ replicaset_add(uint32_t replica_id, const struct tt_uuid *instance_uuid);
*
* \param appliers the array of appliers
* \param count size of appliers array
* \param timeout connection timeout
* \param connect_all if this flag is set, fail unless all
* appliers have successfully connected
* \param connect_quorum if this flag is set, fail unless at
* least replication_connect_quorum
* appliers have successfully connected.
*/
void
replicaset_connect(struct applier **appliers, int count,
double timeout, bool connect_all);
bool connect_quorum);

/**
* Resume all appliers registered with the replica set.
2 changes: 1 addition & 1 deletion test/replication-py/init_storage.test.py
@@ -57,7 +57,7 @@

server.stop()
replica = TarantoolServer(server.ini)
replica.script = 'replication/replica.lua'
replica.script = 'replication-py/replica.lua'
replica.vardir = server.vardir #os.path.join(server.vardir, 'replica')
replica.rpl_master = master
replica.deploy(wait=False)
2 changes: 2 additions & 0 deletions test/replication-py/master.lua
@@ -3,6 +3,8 @@ os = require('os')
box.cfg({
listen = os.getenv("LISTEN"),
memtx_memory = 107374182,
replication_connect_timeout = 1.0,
replication_timeout = 0.3
})

require('console').listen(os.getenv('ADMIN'))
2 changes: 2 additions & 0 deletions test/replication-py/replica.lua
@@ -7,6 +7,8 @@ box.cfg({
listen = os.getenv("LISTEN"),
replication = os.getenv("MASTER"),
memtx_memory = 107374182,
replication_connect_timeout = 1.0,
replication_timeout = 0.3
})

box_cfg_done = true
8 changes: 7 additions & 1 deletion test/replication/autobootstrap.lua
@@ -5,6 +5,10 @@ local INSTANCE_ID = string.match(arg[0], "%d")
local USER = 'cluster'
local PASSWORD = 'somepassword'
local SOCKET_DIR = require('fio').cwd()
local TIMEOUT = tonumber(arg[1])
local CON_TIMEOUT = arg[2] and tonumber(arg[2]) or TIMEOUT * 3


local function instance_uri(instance_id)
--return 'localhost:'..(3310 + instance_id)
return SOCKET_DIR..'/autobootstrap'..instance_id..'.sock';
@@ -21,7 +25,9 @@ box.cfg({
USER..':'..PASSWORD..'@'..instance_uri(2);
USER..':'..PASSWORD..'@'..instance_uri(3);
};
replication_connect_timeout = 0.5,
replication_timeout = TIMEOUT;
replication_connect_timeout = CON_TIMEOUT;
replication_timeout = 0.1;
})

box.once("bootstrap", function()
7 changes: 5 additions & 2 deletions test/replication/autobootstrap.result
@@ -7,13 +7,16 @@ vclock_diff = require('fast_replica').vclock_diff
test_run = env.new()
---
...
create_cluster_with_args = require('cluster_cmd').create_cluster_with_args
---
...
SERVERS = { 'autobootstrap1', 'autobootstrap2', 'autobootstrap3' }
---
...
--
-- Start servers
--
test_run:create_cluster(SERVERS)
create_cluster_with_args(test_run, SERVERS, "0.1 0.5")
---
...
--
@@ -161,7 +164,7 @@ box.space.test_u:select()
_ = test_run:cmd("switch autobootstrap1")
---
...
_ = test_run:cmd("restart server autobootstrap1 with cleanup=1")
_ = test_run:cmd("restart server autobootstrap1 with cleanup=1, args ='0.1 0.5'")
_ = box.space.test_u:replace({5, 6, 7, 8})
---
...
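
create_cluster_with_args used above comes from a new cluster_cmd test helper
that is part of this changeset but not shown in the excerpt. A minimal sketch
of what such a helper might look like, assuming it simply forwards the extra
args string to the test-run commands already used in these tests (the real
module may differ):

-- hypothetical sketch of test/replication/cluster_cmd.lua
local function create_cluster_with_args(test_run, servers, args)
    for _, name in ipairs(servers) do
        test_run:cmd("create server " .. name ..
                     " with script='replication/" .. name .. ".lua'")
        -- start without waiting so that all instances can bootstrap
        -- together; args carries e.g. "0.1 0.5" into arg[1]/arg[2]
        test_run:cmd("start server " .. name ..
                     " with wait=False, wait_load=False, args='" .. args .. "'")
    end
end

return { create_cluster_with_args = create_cluster_with_args }
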
5 changes: 3 additions & 2 deletions test/replication/autobootstrap.test.lua
@@ -2,13 +2,14 @@ env = require('test_run')
vclock_diff = require('fast_replica').vclock_diff
test_run = env.new()

create_cluster_with_args = require('cluster_cmd').create_cluster_with_args

SERVERS = { 'autobootstrap1', 'autobootstrap2', 'autobootstrap3' }

--
-- Start servers
--
test_run:create_cluster(SERVERS)
create_cluster_with_args(test_run, SERVERS, "0.1 0.5")

--
-- Wait for full mesh
@@ -76,7 +77,7 @@ box.space.test_u:select()
-- Rebootstrap one node and check that others follow.
--
_ = test_run:cmd("switch autobootstrap1")
_ = test_run:cmd("restart server autobootstrap1 with cleanup=1")
_ = test_run:cmd("restart server autobootstrap1 with cleanup=1, args ='0.1 0.5'")

_ = box.space.test_u:replace({5, 6, 7, 8})
box.space.test_u:select()
7 changes: 6 additions & 1 deletion test/replication/autobootstrap_guest.lua
@@ -4,6 +4,10 @@
local INSTANCE_ID = string.match(arg[0], "%d")

local SOCKET_DIR = require('fio').cwd()

local TIMEOUT = tonumber(arg[1])
local CON_TIMEOUT = arg[2] and tonumber(arg[2]) or TIMEOUT * 3

local function instance_uri(instance_id)
--return 'localhost:'..(3310 + instance_id)
return SOCKET_DIR..'/autobootstrap_guest'..instance_id..'.sock';
@@ -20,7 +24,8 @@ box.cfg({
instance_uri(2);
instance_uri(3);
};
replication_connect_timeout = 0.5,
replication_timeout = TIMEOUT;
replication_connect_timeout = CON_TIMEOUT;
})

box.once("bootstrap", function()
10 changes: 9 additions & 1 deletion test/replication/autobootstrap_guest.result
@@ -7,13 +7,21 @@ vclock_diff = require('fast_replica').vclock_diff
test_run = env.new()
---
...
create_cluster_with_args = require('cluster_cmd').create_cluster_with_args
---
...
SERVERS = { 'autobootstrap_guest1', 'autobootstrap_guest2', 'autobootstrap_guest3' }
---
...
--
-- Start servers
--
test_run:create_cluster(SERVERS)
-- For some reason create_cluster_with_args, just like the
-- usual test_run:create_cluster, can hang for an arbitrarily
-- long time, so we have to pass a rather large value for
-- replication_connect_timeout (4 seconds).
-- In most cases, 1 second is enough, though.
create_cluster_with_args(test_run, SERVERS, "0.3 4.0")
---
...
--
9 changes: 8 additions & 1 deletion test/replication/autobootstrap_guest.test.lua
@@ -2,12 +2,19 @@ env = require('test_run')
vclock_diff = require('fast_replica').vclock_diff
test_run = env.new()

create_cluster_with_args = require('cluster_cmd').create_cluster_with_args

SERVERS = { 'autobootstrap_guest1', 'autobootstrap_guest2', 'autobootstrap_guest3' }

--
-- Start servers
--
test_run:create_cluster(SERVERS)
-- For some reason create_cluster_with_args, just like the
-- usual test_run:create_cluster, can hang for an arbitrarily
-- long time, so we have to pass a rather large value for
-- replication_connect_timeout (4 seconds).
-- In most cases, 1 second is enough, though.
create_cluster_with_args(test_run, SERVERS, "0.3 4.0")

--
-- Wait for full mesh
11 changes: 7 additions & 4 deletions test/replication/before_replace.result
@@ -7,11 +7,14 @@ env = require('test_run')
test_run = env.new()
---
...
create_cluster_with_args = require('cluster_cmd').create_cluster_with_args
---
...
SERVERS = { 'autobootstrap1', 'autobootstrap2', 'autobootstrap3' }
---
...
-- Deploy a cluster.
test_run:create_cluster(SERVERS)
create_cluster_with_args(test_run, SERVERS, "0.5 4.0")
---
...
test_run:wait_fullmesh(SERVERS)
@@ -125,7 +128,7 @@ box.space.test:select()
- [9, 90]
- [10, 100]
...
test_run:cmd('restart server autobootstrap1')
test_run:cmd('restart server autobootstrap1 with args="0.3 1.0"')
box.space.test:select()
---
- - [1, 10]
@@ -156,7 +159,7 @@ box.space.test:select()
- [9, 90]
- [10, 100]
...
test_run:cmd('restart server autobootstrap2')
test_run:cmd('restart server autobootstrap2 with args="0.3 1.0"')
box.space.test:select()
---
- - [1, 10]
@@ -187,7 +190,7 @@ box.space.test:select()
- [9, 90]
- [10, 100]
...
test_run:cmd('restart server autobootstrap3')
test_run:cmd('restart server autobootstrap3 with args="0.3 1.0"')
box.space.test:select()
---
- - [1, 10]
10 changes: 6 additions & 4 deletions test/replication/before_replace.test.lua
@@ -4,10 +4,12 @@
env = require('test_run')
test_run = env.new()

create_cluster_with_args = require('cluster_cmd').create_cluster_with_args

SERVERS = { 'autobootstrap1', 'autobootstrap2', 'autobootstrap3' }

-- Deploy a cluster.
test_run:create_cluster(SERVERS)
create_cluster_with_args(test_run, SERVERS, "0.5 4.0")
test_run:wait_fullmesh(SERVERS)

-- Setup space:before_replace trigger on all replicas.
@@ -54,15 +56,15 @@ vclock2 = test_run:wait_cluster_vclock(SERVERS, vclock)
-- and the state persists after restart.
test_run:cmd("switch autobootstrap1")
box.space.test:select()
test_run:cmd('restart server autobootstrap1')
test_run:cmd('restart server autobootstrap1 with args="0.3 1.0"')
box.space.test:select()
test_run:cmd("switch autobootstrap2")
box.space.test:select()
test_run:cmd('restart server autobootstrap2')
test_run:cmd('restart server autobootstrap2 with args="0.3 1.0"')
box.space.test:select()
test_run:cmd("switch autobootstrap3")
box.space.test:select()
test_run:cmd('restart server autobootstrap3')
test_run:cmd('restart server autobootstrap3 with args="0.3 1.0"')
box.space.test:select()


2 changes: 1 addition & 1 deletion test/replication/catch.result
@@ -23,7 +23,7 @@ test_run:cmd("create server replica with rpl_master=default, script='replication
---
- true
...
test_run:cmd("start server replica with args='1'")
test_run:cmd("start server replica with args='0.3'")
---
- true
...
2 changes: 1 addition & 1 deletion test/replication/catch.test.lua
@@ -9,7 +9,7 @@ errinj = box.error.injection

box.schema.user.grant('guest', 'replication')
test_run:cmd("create server replica with rpl_master=default, script='replication/replica_timeout.lua'")
test_run:cmd("start server replica with args='1'")
test_run:cmd("start server replica with args='0.3'")
test_run:cmd("switch replica")

test_run:cmd("switch default")
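
replica_timeout.lua, referenced above, is not among the files shown in this
excerpt. Presumably it consumes the start argument ('0.3') the same way the
other instance scripts in this patch do; a sketch under that assumption (the
real script may differ):

-- hypothetical sketch of test/replication/replica_timeout.lua
local TIMEOUT = tonumber(arg[1])

box.cfg({
    listen = os.getenv("LISTEN"),
    replication = os.getenv("MASTER"),
    memtx_memory = 107374182,
    replication_timeout = TIMEOUT,
    -- keep the connect timeout proportionally larger, as in
    -- autobootstrap.lua above
    replication_connect_timeout = TIMEOUT * 3,
})

require('console').listen(os.getenv('ADMIN'))
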
