rebalancer: parallel routes applying
Closes #161

@TarantoolBot document
Title: vshard rebalancer_max_sending

Since its first release, VShard has had quite a simple rebalancer:
a single process on one node calculates which node should send how
many buckets to whom. The nodes then applied these so-called
routes one by one, sequentially.

Unfortunately, such a simple scheme is not fast enough, especially
for Vinyl, where the cost of reading from disk is comparable to
the network cost. In fact, with Vinyl the rebalancer routes
applier was sleeping most of the time.

Now each node can send multiple buckets in parallel, in a
round-robin manner, to several destinations, or even to just one.

To set the degree of parallelism, a new option is added:
rebalancer_max_sending. It can be specified in a storage
configuration in the root table:

    cfg.rebalancer_max_sending = 5
    vshard.storage.cfg(cfg, box.info.uuid)

On routers the option is ignored. Note that max_sending = N
will probably not give an N-fold speedup. The effect depends on
the network, the disk, and the number of other fibers in the
system.

The default value is 1. The maximal value is 15.
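A sketch of what the new config validator accepts and rejects; the
error messages are taken from the unit tests in this patch, the other
cfg fields are hypothetical placeholders:

```lua
local cfg = { sharding = {}, bucket_count = 3000 }

-- Values outside [1, 15] are rejected by vshard.storage.cfg():
cfg.rebalancer_max_sending = 'ten' -- error: 'Rebalancer max sending
                                   -- bucket count must be positive integer'
cfg.rebalancer_max_sending = 0     -- error: same as above
cfg.rebalancer_max_sending = 100   -- error: 'Rebalancer max sending bucket
                                   -- count must not be greater than 15'

-- Any integer in [1, 15] is accepted:
cfg.rebalancer_max_sending = 5
vshard.storage.cfg(cfg, box.info.uuid)
```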

Another important point: from this moment on,
rebalancer_max_receiving is no longer useless. It can actually
limit the load on a single storage. Consider an example: you have
10 replicasets, and a new one is added. Now all 10 replicasets
will try to send buckets to the new one.

Assume each replicaset has max_sending = 5. In that case the new
replicaset will experience quite a high load of 50 buckets being
downloaded at once. If the node needs to do other work, such a big
load may be undesirable. Also, too many parallel buckets can lead
to timeouts in the rebalancing process itself.

To avoid that, you can set a lower rebalancer_max_sending on the
old replicasets, or decrease rebalancer_max_receiving on the new
one. In the latter case some workers on the old nodes will be
throttled, and you will see that in the logs.
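Either remedy is a one-line storage reconfiguration; the concrete
values below are illustrative, not recommendations:

```lua
-- On each old replicaset: send fewer buckets in parallel.
cfg.rebalancer_max_sending = 2
vshard.storage.cfg(cfg, box.info.uuid)

-- Or, on the new replicaset: accept fewer buckets in parallel.
cfg.rebalancer_max_receiving = 10
vshard.storage.cfg(cfg, box.info.uuid)
```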

rebalancer_max_sending is important if you have a restriction on
how many buckets can be read-only at once in the cluster.
Remember that while a bucket is being sent, it does not accept new
write requests. For example, you have 100000 buckets, each bucket
stores ~0.001% of your data, and the cluster has 10 replicasets.
If you can never afford more than 0.1% of the data to be locked
for writes, then you should not set rebalancer_max_sending > 10 on
these nodes. That guarantees the rebalancer won't send more than
100 buckets at once in the whole cluster.
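The arithmetic behind that limit, as a plain Lua sketch using the
numbers from the example above:

```lua
local bucket_count = 100000     -- total buckets in the cluster
local replicaset_count = 10
local max_locked_share = 0.001  -- at most 0.1% of data locked for writes

-- Each bucket holds ~1/bucket_count of the data, so the cluster-wide
-- budget of buckets that may be write-locked at once is:
local max_locked_buckets = bucket_count * max_locked_share       -- 100

-- Split evenly across all sending replicasets:
local max_sending_limit = max_locked_buckets / replicaset_count  -- 10
```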

Take into account that if max_sending is set too high while
max_receiving is too low, some buckets will try to relocate and
fail, consuming network resources and time. It is important to
configure these parameters so that they do not contradict each
other.
Gerold103 committed Aug 31, 2019
1 parent e2092b5 commit b9dc5fe
Showing 11 changed files with 346 additions and 28 deletions.
1 change: 1 addition & 0 deletions test/rebalancer/config.lua
@@ -78,6 +78,7 @@ return {
bucket_count = 200,
sharding = sharding,
rebalancer_disbalance_threshold = 0.01,
rebalancer_max_sending = 5,
shard_index = 'vbucket',
replication_connect_quorum = 0,
replication_connect_timeout = 0.01,
4 changes: 4 additions & 0 deletions test/rebalancer/engine.cfg
@@ -14,5 +14,9 @@
"stress_add_remove_several_rs.test.lua": {
"memtx": {"engine": "memtx"},
"vinyl": {"engine": "vinyl"}
},
"workers.test.lua": {
"memtx": {"engine": "memtx"},
"vinyl": {"engine": "vinyl"}
}
}
2 changes: 1 addition & 1 deletion test/rebalancer/errinj.result
@@ -133,7 +133,7 @@ errinj.set('ERRINJ_WAL_DELAY', false)
-- the bucket. On the destination the bucket is applied too long,
-- and the bucket_send gets timeout error.
--
while not test_run:grep_log('box_1_a', 'Can not apply routes') do vshard.storage.rebalancer_wakeup() fiber.sleep(0.01) end
while not test_run:grep_log('box_1_a', 'Error during rebalancer routes applying') do vshard.storage.rebalancer_wakeup() fiber.sleep(0.01) end
---
...
test_run:grep_log('box_1_a', 'Timeout exceeded') ~= nil
2 changes: 1 addition & 1 deletion test/rebalancer/errinj.test.lua
@@ -69,7 +69,7 @@ errinj.set('ERRINJ_WAL_DELAY', false)
-- the bucket. On the destination the bucket is applied too long,
-- and the bucket_send gets timeout error.
--
while not test_run:grep_log('box_1_a', 'Can not apply routes') do vshard.storage.rebalancer_wakeup() fiber.sleep(0.01) end
while not test_run:grep_log('box_1_a', 'Error during rebalancer routes applying') do vshard.storage.rebalancer_wakeup() fiber.sleep(0.01) end
test_run:grep_log('box_1_a', 'Timeout exceeded') ~= nil

test_run:switch('box_2_a')
116 changes: 116 additions & 0 deletions test/rebalancer/parallel.result
@@ -0,0 +1,116 @@
-- test-run result file version 2
test_run = require('test_run').new()
| ---
| ...

REPLICASET_1 = { 'box_1_a', 'box_1_b' }
| ---
| ...
REPLICASET_2 = { 'box_2_a', 'box_2_b' }
| ---
| ...
REPLICASET_3 = { 'box_3_a', 'box_3_b' }
| ---
| ...
REPLICASET_4 = { 'box_4_a', 'box_4_b' }
| ---
| ...
engine = test_run:get_cfg('engine')
| ---
| ...

test_run:create_cluster(REPLICASET_1, 'rebalancer')
| ---
| ...
test_run:create_cluster(REPLICASET_2, 'rebalancer')
| ---
| ...
test_run:create_cluster(REPLICASET_3, 'rebalancer')
| ---
| ...
test_run:create_cluster(REPLICASET_4, 'rebalancer')
| ---
| ...
util = require('util')
| ---
| ...
util.wait_master(test_run, REPLICASET_1, 'box_1_a')
| ---
| ...
util.wait_master(test_run, REPLICASET_2, 'box_2_a')
| ---
| ...
util.wait_master(test_run, REPLICASET_3, 'box_3_a')
| ---
| ...
util.wait_master(test_run, REPLICASET_4, 'box_4_a')
| ---
| ...
util.map_evals(test_run, {REPLICASET_1, REPLICASET_2, REPLICASET_3, \
REPLICASET_4}, 'bootstrap_storage(\'%s\')', engine)
| ---
| ...

--
-- The test is about parallel rebalancer. It is not very different
-- from a normal rebalancer except the problem of max receiving
-- bucket limit. Workers should correctly handle that, and of
-- course rebalancing should never totally stop.
--

util.map_evals(test_run, {REPLICASET_1, REPLICASET_2}, 'add_replicaset()')
| ---
| ...
util.map_evals(test_run, {REPLICASET_1, REPLICASET_2, REPLICASET_3}, 'add_second_replicaset()')
| ---
| ...
-- 4 replicasets, 1 sends to 3. It has 5 workers. It means, that
-- throttling is inevitable.
util.map_evals(test_run, {REPLICASET_1, REPLICASET_2, REPLICASET_3, REPLICASET_4}, [[\
cfg.rebalancer_max_receiving = 1\
vshard.storage.cfg(cfg, box.info.uuid)\
]])
| ---
| ...

test_run:switch('box_1_a')
| ---
| - true
| ...
vshard.storage.bucket_force_create(1, 200)
| ---
| - true
| ...
t1 = fiber.time()
| ---
| ...
wait_rebalancer_state('The cluster is balanced ok', test_run)
| ---
| ...
t2 = fiber.time()
| ---
| ...
-- Rebalancing should not stop. It can be checked by watching if
-- there was a sleep REBALANCER_WORK_INTERVAL (which is 10
-- seconds).
(t2 - t1 < 10) or {t1, t2}
| ---
| - true
| ...

test_run:switch('default')
| ---
| - true
| ...
test_run:drop_cluster(REPLICASET_4)
| ---
| ...
test_run:drop_cluster(REPLICASET_3)
| ---
| ...
test_run:drop_cluster(REPLICASET_2)
| ---
| ...
test_run:drop_cluster(REPLICASET_1)
| ---
| ...
51 changes: 51 additions & 0 deletions test/rebalancer/parallel.test.lua
@@ -0,0 +1,51 @@
test_run = require('test_run').new()

REPLICASET_1 = { 'box_1_a', 'box_1_b' }
REPLICASET_2 = { 'box_2_a', 'box_2_b' }
REPLICASET_3 = { 'box_3_a', 'box_3_b' }
REPLICASET_4 = { 'box_4_a', 'box_4_b' }
engine = test_run:get_cfg('engine')

test_run:create_cluster(REPLICASET_1, 'rebalancer')
test_run:create_cluster(REPLICASET_2, 'rebalancer')
test_run:create_cluster(REPLICASET_3, 'rebalancer')
test_run:create_cluster(REPLICASET_4, 'rebalancer')
util = require('util')
util.wait_master(test_run, REPLICASET_1, 'box_1_a')
util.wait_master(test_run, REPLICASET_2, 'box_2_a')
util.wait_master(test_run, REPLICASET_3, 'box_3_a')
util.wait_master(test_run, REPLICASET_4, 'box_4_a')
util.map_evals(test_run, {REPLICASET_1, REPLICASET_2, REPLICASET_3, \
REPLICASET_4}, 'bootstrap_storage(\'%s\')', engine)

--
-- The test is about parallel rebalancer. It is not very different
-- from a normal rebalancer except the problem of max receiving
-- bucket limit. Workers should correctly handle that, and of
-- course rebalancing should never totally stop.
--

util.map_evals(test_run, {REPLICASET_1, REPLICASET_2}, 'add_replicaset()')
util.map_evals(test_run, {REPLICASET_1, REPLICASET_2, REPLICASET_3}, 'add_second_replicaset()')
-- 4 replicasets, 1 sends to 3. It has 5 workers. It means, that
-- throttling is inevitable.
util.map_evals(test_run, {REPLICASET_1, REPLICASET_2, REPLICASET_3, REPLICASET_4}, [[\
cfg.rebalancer_max_receiving = 1\
vshard.storage.cfg(cfg, box.info.uuid)\
]])

test_run:switch('box_1_a')
vshard.storage.bucket_force_create(1, 200)
t1 = fiber.time()
wait_rebalancer_state('The cluster is balanced ok', test_run)
t2 = fiber.time()
-- Rebalancing should not stop. It can be checked by watching if
-- there was a sleep REBALANCER_WORK_INTERVAL (which is 10
-- seconds).
(t2 - t1 < 10) or {t1, t2}

test_run:switch('default')
test_run:drop_cluster(REPLICASET_4)
test_run:drop_cluster(REPLICASET_3)
test_run:drop_cluster(REPLICASET_2)
test_run:drop_cluster(REPLICASET_1)
40 changes: 40 additions & 0 deletions test/unit/config.result
@@ -578,3 +578,43 @@ util.check_error(lcfg.check, cfg)
cfg.sharding = nil
---
...
--
-- gh-161: parallel rebalancer and its option - max sending.
--
cfg.sharding = {}
---
...
cfg.rebalancer_max_sending = 'ten'
---
...
util.check_error(lcfg.check, cfg)
---
- Rebalancer max sending bucket count must be positive integer
...
cfg.rebalancer_max_sending = 100
---
...
util.check_error(lcfg.check, cfg)
---
- Rebalancer max sending bucket count must not be greater than 15
...
cfg.rebalancer_max_sending = 0
---
...
util.check_error(lcfg.check, cfg)
---
- Rebalancer max sending bucket count must be positive integer
...
cfg.rebalancer_max_sending = 15
---
...
lcfg.check(cfg).rebalancer_max_sending
---
- 15
...
cfg.rebalancer_max_sending = nil
---
...
cfg.sharding = nil
---
...
15 changes: 15 additions & 0 deletions test/unit/config.test.lua
@@ -230,3 +230,18 @@ replicaset = {replicas = {['id'] = replica}, weight = math.huge}
cfg.sharding = {rsid = replicaset}
util.check_error(lcfg.check, cfg)
cfg.sharding = nil

--
-- gh-161: parallel rebalancer and its option - max sending.
--
cfg.sharding = {}
cfg.rebalancer_max_sending = 'ten'
util.check_error(lcfg.check, cfg)
cfg.rebalancer_max_sending = 100
util.check_error(lcfg.check, cfg)
cfg.rebalancer_max_sending = 0
util.check_error(lcfg.check, cfg)
cfg.rebalancer_max_sending = 15
lcfg.check(cfg).rebalancer_max_sending
cfg.rebalancer_max_sending = nil
cfg.sharding = nil
31 changes: 21 additions & 10 deletions vshard/cfg.lua
@@ -57,32 +57,36 @@ local type_validate = {
local function validate_config(config, template, check_arg)
for key, template_value in pairs(template) do
local value = config[key]
local name = template_value.name
local expected_type = template_value.type
if not value then
if not template_value.is_optional then
error(string.format('%s must be specified',
template_value.name))
error(string.format('%s must be specified', name))
else
config[key] = template_value.default
end
else
if type(template_value.type) == 'string' then
if not type_validate[template_value.type](value) then
error(string.format('%s must be %s', template_value.name,
template_value.type))
if type(expected_type) == 'string' then
if not type_validate[expected_type](value) then
error(string.format('%s must be %s', name, expected_type))
end
local max = template_value.max
if max and value > max then
error(string.format('%s must not be greater than %s', name,
max))
end
else
local is_valid_type_found = false
for _, t in pairs(template_value.type) do
for _, t in pairs(expected_type) do
if type_validate[t](value) then
is_valid_type_found = true
break
end
end
if not is_valid_type_found then
local types = table.concat(template_value.type, ', ')
local types = table.concat(expected_type, ', ')
error(string.format('%s must be one of the following '..
'types: %s', template_value.name,
types))
'types: %s', name, types))
end
end
if template_value.check then
@@ -223,6 +227,13 @@ local cfg_template = {
name = 'Rebalancer max receiving bucket count', is_optional = true,
default = consts.DEFAULT_REBALANCER_MAX_RECEIVING
},
rebalancer_max_sending = {
type = 'positive integer',
name = 'Rebalancer max sending bucket count',
is_optional = true,
default = consts.DEFAULT_REBALANCER_MAX_SENDING,
max = consts.REBALANCER_MAX_SENDING_MAX
},
collect_bucket_garbage_interval = {
type = 'positive number', name = 'Garbage bucket collect interval',
is_optional = true,
2 changes: 2 additions & 0 deletions vshard/consts.lua
Expand Up @@ -27,6 +27,8 @@ return {
REBALANCER_IDLE_INTERVAL = 60 * 60;
REBALANCER_WORK_INTERVAL = 10;
REBALANCER_CHUNK_TIMEOUT = 60 * 5;
DEFAULT_REBALANCER_MAX_SENDING = 1;
REBALANCER_MAX_SENDING_MAX = 15;
DEFAULT_REBALANCER_MAX_RECEIVING = 100;
CALL_TIMEOUT_MIN = 0.5;
CALL_TIMEOUT_MAX = 64;
