Skip to content

Commit

Permalink
parallel rebalancer
Browse files Browse the repository at this point in the history
Closes #161
  • Loading branch information
Gerold103 committed Aug 27, 2019
1 parent 87f937f commit 13fcd40
Show file tree
Hide file tree
Showing 10 changed files with 300 additions and 29 deletions.
1 change: 1 addition & 0 deletions test/rebalancer/config.lua
Expand Up @@ -78,6 +78,7 @@ return {
bucket_count = 200,
sharding = sharding,
rebalancer_disbalance_threshold = 0.01,
rebalancer_worker_count = 5,
shard_index = 'vbucket',
replication_connect_quorum = 0,
replication_connect_timeout = 0.01,
Expand Down
2 changes: 1 addition & 1 deletion test/rebalancer/errinj.result
Expand Up @@ -133,7 +133,7 @@ errinj.set('ERRINJ_WAL_DELAY', false)
-- the bucket. On the destination the bucket is applied too long,
-- and the bucket_send gets timeout error.
--
while not test_run:grep_log('box_1_a', 'Can not apply routes') do vshard.storage.rebalancer_wakeup() fiber.sleep(0.01) end
while not test_run:grep_log('box_1_a', 'Error during rebalancer routes applying') do vshard.storage.rebalancer_wakeup() fiber.sleep(0.01) end
---
...
test_run:grep_log('box_1_a', 'Timeout exceeded') ~= nil
Expand Down
2 changes: 1 addition & 1 deletion test/rebalancer/errinj.test.lua
Expand Up @@ -69,7 +69,7 @@ errinj.set('ERRINJ_WAL_DELAY', false)
-- the bucket. On the destination the bucket is applied too long,
-- and the bucket_send gets timeout error.
--
while not test_run:grep_log('box_1_a', 'Can not apply routes') do vshard.storage.rebalancer_wakeup() fiber.sleep(0.01) end
while not test_run:grep_log('box_1_a', 'Error during rebalancer routes applying') do vshard.storage.rebalancer_wakeup() fiber.sleep(0.01) end
test_run:grep_log('box_1_a', 'Timeout exceeded') ~= nil

test_run:switch('box_2_a')
Expand Down
95 changes: 95 additions & 0 deletions test/rebalancer/workers.result
@@ -0,0 +1,95 @@
-- test-run result file version 2
test_run = require('test_run').new()
| ---
| ...

REPLICASET_1 = { 'box_1_a', 'box_1_b' }
| ---
| ...
REPLICASET_2 = { 'box_2_a', 'box_2_b' }
| ---
| ...
REPLICASET_3 = { 'box_3_a', 'box_3_b' }
| ---
| ...
REPLICASET_4 = { 'box_4_a', 'box_4_b' }
| ---
| ...
engine = 'memtx' -- test_run:get_cfg('engine')
| ---
| ...

test_run:create_cluster(REPLICASET_1, 'rebalancer')
| ---
| ...
test_run:create_cluster(REPLICASET_2, 'rebalancer')
| ---
| ...
test_run:create_cluster(REPLICASET_3, 'rebalancer')
| ---
| ...
test_run:create_cluster(REPLICASET_4, 'rebalancer')
| ---
| ...
util = require('util')
| ---
| ...
util.wait_master(test_run, REPLICASET_1, 'box_1_a')
| ---
| ...
util.wait_master(test_run, REPLICASET_2, 'box_2_a')
| ---
| ...
util.wait_master(test_run, REPLICASET_3, 'box_3_a')
| ---
| ...
util.wait_master(test_run, REPLICASET_4, 'box_4_a')
| ---
| ...
util.map_evals(test_run, {REPLICASET_1, REPLICASET_2, REPLICASET_3, \
REPLICASET_4}, 'bootstrap_storage(\'%s\')', engine)
| ---
| ...


util.map_evals(test_run, {REPLICASET_1, REPLICASET_2}, 'add_replicaset()')
| ---
| ...
util.map_evals(test_run, {REPLICASET_1, REPLICASET_2, REPLICASET_3}, 'add_second_replicaset()')
| ---
| ...
util.map_evals(test_run, {REPLICASET_1, REPLICASET_2, REPLICASET_3, REPLICASET_4}, [[\
cfg.rebalancer_max_receiving = 1\
vshard.storage.cfg(cfg, box.info.uuid)\
]])
| ---
| ...

test_run:switch('box_1_a')
| ---
| - true
| ...
vshard.storage.bucket_force_create(1, 200)
| ---
| - true
| ...
wait_rebalancer_state('The cluster is balanced ok', test_run)
| ---
| ...

test_run:switch('default')
| ---
| - true
| ...
test_run:drop_cluster(REPLICASET_4)
| ---
| ...
test_run:drop_cluster(REPLICASET_3)
| ---
| ...
test_run:drop_cluster(REPLICASET_2)
| ---
| ...
test_run:drop_cluster(REPLICASET_1)
| ---
| ...
40 changes: 40 additions & 0 deletions test/rebalancer/workers.test.lua
@@ -0,0 +1,40 @@
test_run = require('test_run').new()

REPLICASET_1 = { 'box_1_a', 'box_1_b' }
REPLICASET_2 = { 'box_2_a', 'box_2_b' }
REPLICASET_3 = { 'box_3_a', 'box_3_b' }
REPLICASET_4 = { 'box_4_a', 'box_4_b' }
engine = 'memtx' -- test_run:get_cfg('engine')

test_run:create_cluster(REPLICASET_1, 'rebalancer')
test_run:create_cluster(REPLICASET_2, 'rebalancer')
test_run:create_cluster(REPLICASET_3, 'rebalancer')
test_run:create_cluster(REPLICASET_4, 'rebalancer')
util = require('util')
util.wait_master(test_run, REPLICASET_1, 'box_1_a')
util.wait_master(test_run, REPLICASET_2, 'box_2_a')
util.wait_master(test_run, REPLICASET_3, 'box_3_a')
util.wait_master(test_run, REPLICASET_4, 'box_4_a')
util.map_evals(test_run, {REPLICASET_1, REPLICASET_2, REPLICASET_3, \
REPLICASET_4}, 'bootstrap_storage(\'%s\')', engine)


util.map_evals(test_run, {REPLICASET_1, REPLICASET_2}, 'add_replicaset()')
util.map_evals(test_run, {REPLICASET_1, REPLICASET_2, REPLICASET_3}, 'add_second_replicaset()')
util.map_evals(test_run, {REPLICASET_1, REPLICASET_2, REPLICASET_3, REPLICASET_4}, [[\
cfg.rebalancer_max_receiving = 1\
vshard.storage.cfg(cfg, box.info.uuid)\
]])

test_run:switch('box_1_a')
vshard.storage.bucket_force_create(1, 200)
t1 = fiber.time()
wait_rebalancer_state('The cluster is balanced ok', test_run)
t2 = fiber.time()
t2 - t1

test_run:switch('default')
test_run:drop_cluster(REPLICASET_4)
test_run:drop_cluster(REPLICASET_3)
test_run:drop_cluster(REPLICASET_2)
test_run:drop_cluster(REPLICASET_1)
40 changes: 40 additions & 0 deletions test/unit/config.result
Expand Up @@ -578,3 +578,43 @@ util.check_error(lcfg.check, cfg)
cfg.sharding = nil
---
...
--
-- gh-161: write a description
--
cfg.sharding = {}
---
...
cfg.rebalancer_worker_count = 'ten'
---
...
util.check_error(lcfg.check, cfg)
---
- Rebalancer worker fiber count must be positive integer
...
cfg.rebalancer_worker_count = 100
---
...
util.check_error(lcfg.check, cfg)
---
- Rebalancer worker fiber count must not be greater than 15
...
cfg.rebalancer_worker_count = 0
---
...
util.check_error(lcfg.check, cfg)
---
- Rebalancer worker fiber count must be positive integer
...
cfg.rebalancer_worker_count = 15
---
...
lcfg.check(cfg).rebalancer_worker_count
---
- 15
...
cfg.rebalancer_worker_count = nil
---
...
cfg.sharding = nil
---
...
15 changes: 15 additions & 0 deletions test/unit/config.test.lua
Expand Up @@ -230,3 +230,18 @@ replicaset = {replicas = {['id'] = replica}, weight = math.huge}
cfg.sharding = {rsid = replicaset}
util.check_error(lcfg.check, cfg)
cfg.sharding = nil

--
-- gh-161: write a description
--
cfg.sharding = {}
cfg.rebalancer_worker_count = 'ten'
util.check_error(lcfg.check, cfg)
cfg.rebalancer_worker_count = 100
util.check_error(lcfg.check, cfg)
cfg.rebalancer_worker_count = 0
util.check_error(lcfg.check, cfg)
cfg.rebalancer_worker_count = 15
lcfg.check(cfg).rebalancer_worker_count
cfg.rebalancer_worker_count = nil
cfg.sharding = nil
31 changes: 21 additions & 10 deletions vshard/cfg.lua
Expand Up @@ -57,32 +57,36 @@ local type_validate = {
local function validate_config(config, template, check_arg)
for key, template_value in pairs(template) do
local value = config[key]
local name = template_value.name
local expected_type = template_value.type
if not value then
if not template_value.is_optional then
error(string.format('%s must be specified',
template_value.name))
error(string.format('%s must be specified', name))
else
config[key] = template_value.default
end
else
if type(template_value.type) == 'string' then
if not type_validate[template_value.type](value) then
error(string.format('%s must be %s', template_value.name,
template_value.type))
if type(expected_type) == 'string' then
if not type_validate[expected_type](value) then
error(string.format('%s must be %s', name, expected_type))
end
local max = template_value.max
if max and value > max then
error(string.format('%s must not be greater than %s', name,
max))
end
else
local is_valid_type_found = false
for _, t in pairs(template_value.type) do
for _, t in pairs(expected_type) do
if type_validate[t](value) then
is_valid_type_found = true
break
end
end
if not is_valid_type_found then
local types = table.concat(template_value.type, ', ')
local types = table.concat(expected_type, ', ')
error(string.format('%s must be one of the following '..
'types: %s', template_value.name,
types))
'types: %s', name, types))
end
end
if template_value.check then
Expand Down Expand Up @@ -223,6 +227,13 @@ local cfg_template = {
name = 'Rebalancer max receiving bucket count', is_optional = true,
default = consts.DEFAULT_REBALANCER_MAX_RECEIVING
},
rebalancer_worker_count = {
type = 'positive integer',
name = 'Rebalancer worker fiber count',
is_optional = true,
default = consts.DEFAULT_REBALANCER_WORKER_COUNT,
max = consts.REBALANCER_WORKER_COUNT_MAX
},
collect_bucket_garbage_interval = {
type = 'positive number', name = 'Garbage bucket collect interval',
is_optional = true,
Expand Down
2 changes: 2 additions & 0 deletions vshard/consts.lua
Expand Up @@ -27,6 +27,8 @@ return {
REBALANCER_IDLE_INTERVAL = 60 * 60;
REBALANCER_WORK_INTERVAL = 10;
REBALANCER_CHUNK_TIMEOUT = 60 * 5;
DEFAULT_REBALANCER_WORKER_COUNT = 1;
REBALANCER_WORKER_COUNT_MAX = 15;
DEFAULT_REBALANCER_MAX_RECEIVING = 100;
CALL_TIMEOUT_MIN = 0.5;
CALL_TIMEOUT_MAX = 64;
Expand Down

0 comments on commit 13fcd40

Please sign in to comment.