diff --git a/README.md b/README.md index 803ca6f..9730870 100644 --- a/README.md +++ b/README.md @@ -65,8 +65,11 @@ cartridge.cfg({ ... }) ``` -3. Queue API will be available on all nodes where sharded_queue.api is enabled - +3. Enable the `sharded_queue.storage` role on all storage nodes. Be careful: + there must be no replicasets with the `cartridge.roles.vshard-storage` role + but without the `sharded_queue.storage` role. +4. Queue API will be available on all nodes where the `sharded_queue.api` role + is enabled. ## Usage as a ready-to-deploy service diff --git a/sharded-queue-scm-1.rockspec b/sharded-queue-scm-1.rockspec index b4210c4..7b8dcfb 100755 --- a/sharded-queue-scm-1.rockspec +++ b/sharded-queue-scm-1.rockspec @@ -30,6 +30,15 @@ build = { ['sharded_queue.stash'] = 'sharded_queue/stash.lua', ['sharded_queue.state'] = 'sharded_queue/state.lua', ['sharded_queue.stats.storage'] = 'sharded_queue/stats/storage.lua', + ['sharded_queue.router.config'] = 'sharded_queue/router/config.lua', + ['sharded_queue.router.metrics'] = 'sharded_queue/router/metrics.lua', + ['sharded_queue.router.queue'] = 'sharded_queue/router/queue.lua', + ['sharded_queue.router.tube'] = 'sharded_queue/router/tube.lua', + ['sharded_queue.storage.config'] = 'sharded_queue/storage/config.lua', + ['sharded_queue.storage.drivers'] = 'sharded_queue/storage/drivers.lua', + ['sharded_queue.storage.methods'] = 'sharded_queue/storage/methods.lua', + ['sharded_queue.storage.metrics'] = 'sharded_queue/storage/metrics.lua', + ['sharded_queue.storage.tubes'] = 'sharded_queue/storage/tubes.lua', ['sharded_queue.version'] = 'sharded_queue/version.lua', }, }, diff --git a/sharded_queue/api.lua b/sharded_queue/api.lua index 548d3da..e8a076e 100644 --- a/sharded_queue/api.lua +++ b/sharded_queue/api.lua @@ -1,454 +1,11 @@ local cartridge = require('cartridge') -local vshard = require('vshard') -local fiber = require('fiber') -local log = require('log') -local metrics = require('sharded_queue.metrics') 
-local stash = require('sharded_queue.stash') -local state = require('sharded_queue.state') -local time = require('sharded_queue.time') +local config = require('sharded_queue.router.config') +local metrics = require('sharded_queue.router.metrics') local utils = require('sharded_queue.utils') +local queue = require('sharded_queue.router.queue') -local cartridge_pool = require('cartridge.pool') -local cartridge_rpc = require('cartridge.rpc') - -local stash_names = { - cfg = '__sharded_queue_api_cfg', - metrics_stats = '__sharded_queue_api_metrics_stats', -} -stash.setup(stash_names) - -local remote_call = function(method, instance_uri, args, timeout) - local conn = cartridge_pool.connect(instance_uri) - return conn:call(method, { args }, { timeout = timeout }) -end - -local function validate_options(options) - if not options then return true end - - if options.wait_factor then - if type(options.wait_factor) ~= 'number' - or options.wait_factor < 1 - then - return false, "wait_factor must be number greater than or equal to 1" - end - end - - local _, err = utils.normalize.log_request(options.log_request) - if err then - return false, err - end - - if options.wait_max ~= nil then - local err - options.wait_max, err = utils.normalize.wait_max(options.wait_max) - if err ~= nil then - return false, err - end - end - - return true -end - -local function log_task(op_name, task) - local task_id = type(task) == 'table' and task[1] or task - log.info(string.format([[Router[%d] %s: task %s]], fiber.self():id(), op_name, task_id)) -end - -local sharded_tube = {} - -function sharded_tube.put(self, data, options) - local bucket_count = vshard.router.bucket_count() - local bucket_id = math.random(bucket_count) - - options = table.deepcopy(options or {}) - - if options.priority == nil and options.pri ~= nil then - options.priority = options.pri - end - - options.data = data - options.tube_name = self.tube_name - options.bucket_id = bucket_id - options.bucket_count = bucket_count - - 
options.extra = { - log_request = utils.normalize.log_request(options.log_request) or self.log_request, - } - - local task, err = vshard.router.call(bucket_id, - 'write', 'tube_put', { options }) - -- re-raise storage errors - if err ~= nil then error(err) end - - if options.extra.log_request then - log_task('put', task) - end - - return task -end - --- function for try get task from instance -- -local function take_task(storages, options, take_timeout, call_timeout) - for _, instance_uri in ipairs(storages) do - if take_timeout == 0 then - break - end - local begin = time.cur() - - -- try take task from all instance - local ok, ret = pcall(remote_call, 'tube_take', - instance_uri, - options, - call_timeout - ) - - if ret ~= nil and ok then - return ret - end - - local duration = time.cur() - begin - take_timeout = take_timeout > duration and take_timeout - duration or 0 - end -end - -function sharded_tube.take(self, timeout, options) - -- take task from tube -- - options = table.deepcopy(options or {}) - options.tube_name = self.tube_name - - options.extra = { - log_request = utils.normalize.log_request(options.log_request) or self.log_request, - } - - local remote_call_timeout = time.MIN_NET_BOX_CALL_TIMEOUT - if timeout ~= nil and timeout > time.MIN_NET_BOX_CALL_TIMEOUT then - remote_call_timeout = timeout - end - - local take_timeout = time.nano(timeout) or time.TIMEOUT_INFINITY - - local frequency = 1000 - local wait_part = 0.01 -- maximum waiting time in second - local wait_max = utils.normalize.wait_max(options.wait_max) - or self.wait_max or time.MAX_TIMEOUT - - local wait_factor = self.wait_factor - - local calc_part = time.sec(take_timeout / frequency) - - if calc_part < wait_part then - wait_part = tonumber(calc_part) - end - - if options.extra.log_request then - log.info(("Router[%d] take: start attempts"):format(fiber.self():id())) - end - - while take_timeout ~= 0 do - local begin = time.cur() - - local storages = cartridge_rpc.get_candidates( - 
'sharded_queue.storage', - { leader_only = true }) - - utils.array_shuffle(storages) - - local task = take_task(storages, options, take_timeout, remote_call_timeout) - - if task ~= nil then - if options.extra.log_request then - log_task('take', task) - end - return task - end - - if take_timeout < time.nano(wait_part) then - if options.extra.log_request then - log.info(("Router[%d] take: next attemt will be after timeout") - :format(fiber.self():id())) - end - return nil - end - - fiber.sleep(wait_part) - - wait_part = wait_part * wait_factor - if wait_part > wait_max then - wait_part = wait_max - end - - local duration = time.cur() - begin - - take_timeout = take_timeout > duration and take_timeout - duration or 0 - end - - if options.extra.log_request then - log.info(("Router[%d] take: timeout"):format(fiber.self():id())) - end -end - -function sharded_tube.delete(self, task_id, options) - -- task delete from tube -- - - local bucket_count = vshard.router.bucket_count() - local bucket_id, _ = utils.unpack_task_id(task_id, bucket_count) - - options = table.deepcopy(options or {}) - options.tube_name = self.tube_name - options.task_id = task_id - - options.extra = { - log_request = utils.normalize.log_request(options.log_request) or self.log_request, - } - - local task, err = vshard.router.call(bucket_id, 'write', 'tube_delete', { - options - }) - -- re-raise storage errors - if err ~= nil then error(err) end - - if options.extra.log_request then - log_task('delete', task) - end - - return task -end - -function sharded_tube.release(self, task_id, options) - -- task release from tube -- - - local bucket_count = vshard.router.bucket_count() - local bucket_id, _ = utils.unpack_task_id(task_id, bucket_count) - - options = table.deepcopy(options or {}) - options.tube_name = self.tube_name - options.task_id = task_id - - options.extra = { - log_request = utils.normalize.log_request(options.log_request) or self.log_request, - } - - local task, err = 
vshard.router.call(bucket_id, 'write', 'tube_release', { - options - }) - -- re-raise storage errors - if err ~= nil then error(err) end - - if options.extra.log_request then - log_task('release', task) - end - - return task -end - -function sharded_tube.touch(self, task_id, delta, options) - if delta == nil or delta <= 0 then - return - end - - if delta >= time.MAX_TIMEOUT then - delta = time.TIMEOUT_INFINITY - else - delta = time.nano(delta) - end - - local bucket_count = vshard.router.bucket_count() - local bucket_id, _ = utils.unpack_task_id(task_id, bucket_count) - - options = table.deepcopy(options or {}) - options.tube_name = self.tube_name - options.task_id = task_id - options.delta = delta - - options.extra = { - log_request = utils.normalize.log_request(options.log_request) or self.log_request, - } - - local task, err = vshard.router.call(bucket_id, 'write', 'tube_touch', { - options - }) - -- re-raise storage errors - if err ~= nil then error(err) end - - if options.extra.log_request then - log_task('touch', task) - end - - return task -end - -function sharded_tube.ack(self, task_id, options) - -- task delete from tube -- - - local bucket_count = vshard.router.bucket_count() - local bucket_id, _ = utils.unpack_task_id(task_id, bucket_count) - - options = table.deepcopy(options or {}) - options.tube_name = self.tube_name - options.task_id = task_id - - options.extra = { - log_request = utils.normalize.log_request(options.log_request) or self.log_request, - } - - if options.extra.log_request then - log.info(("Router[%d] ack: call id %d, bucket %d") - :format(fiber.self():id(), task_id, bucket_id)) - end - - local task, err = vshard.router.call(bucket_id, 'write', 'tube_ack', { - options - }) - -- re-raise storage errors - if err ~= nil then error(err) end - - if options.extra.log_request then - log_task('ack', task) - end - - return task -end - -function sharded_tube.bury(self, task_id, options) - -- task bury -- - - local bucket_count = 
vshard.router.bucket_count() - local bucket_id, _ = utils.unpack_task_id(task_id, bucket_count) - - options = table.deepcopy(options or {}) - options.tube_name = self.tube_name - options.task_id = task_id - - options.extra = { - log_request = utils.normalize.log_request(options.log_request) or self.log_request, - } - - local task, err = vshard.router.call(bucket_id, 'write', 'tube_bury', { - options - }) - -- re-raise storage errors - if err ~= nil then error(err) end - - if options.extra.log_request then - log_task('bury', task) - end - - return task -end - -function sharded_tube.kick(self, count, options) - -- try kick few tasks -- - - local storages = cartridge_rpc.get_candidates( - 'sharded_queue.storage', - { - leader_only = true - }) - - local kicked_count = 0 -- count kicked task - for _, instance_uri in ipairs(storages) do - local opts = table.deepcopy(options or {}) - opts.tube_name = self.tube_name - opts.count = count - kicked_count - - local ok, k = pcall(remote_call, 'tube_kick', instance_uri, opts) - if not ok then - log.error(k) - return kicked_count - end - - kicked_count = kicked_count + k - - if kicked_count == count then - break - end - end - - return kicked_count -end - -function sharded_tube.peek(self, task_id, options) - local bucket_count = vshard.router.bucket_count() - local bucket_id, _ = utils.unpack_task_id(task_id, bucket_count) - - options = table.deepcopy(options or {}) - options.tube_name = self.tube_name - options.task_id = task_id - - options.extra = { - log_request = utils.normalize.log_request(options.log_request) or self.log_request, - } - - local task, err = vshard.router.call(bucket_id, 'write', 'tube_peek', { - options - }) - -- re-raise storage errors - if err ~= nil then error(err) end - - if options.extra.log_request then - log_task('peek', task) - end - - return task -end - -function sharded_tube.drop(self) - local tubes = cartridge.config_get_deepcopy('tubes') or {} - - tubes[self.tube_name] = nil - 
cartridge.config_patch_clusterwide({ tubes = tubes }) -end - -local sharded_queue = { - tube = {}, - cfg = stash.get(stash_names.cfg), - metrics_stats = metrics.init(stash.get(stash_names.metrics_stats)), -} - -if sharded_queue.cfg.metrics == nil then - sharded_queue.cfg.metrics = true -end - -if sharded_queue.cfg.metrics then - sharded_queue.cfg.metrics = metrics.is_supported() -end - -local function wrap_sharded_queue_call_counters(call, fun) - return function(self, ...) - local before = fiber.clock() - local ok, ret = pcall(fun, self, ...) - local after = fiber.clock() - - if sharded_queue.cfg.metrics then - sharded_queue.metrics_stats:observe(after - before, - self.tube_name, call, ok) - end - - if not ok then - error(ret) - end - - return ret - end -end - -for call, fun in pairs(sharded_tube) do - sharded_tube[call] = wrap_sharded_queue_call_counters(call, fun) -end - -local function metrics_enable() - local get_statistic = function(tube) - return sharded_queue.statistics(tube) - end - sharded_queue.metrics_stats:enable('api', sharded_queue.tube, get_statistic) -end - -local function metrics_disable() - sharded_queue.metrics_stats:disable() -end - -function sharded_queue.cfg_call(_, options) +function cfg_call(_, options) options = options or {} if options.metrics == nil then return @@ -458,7 +15,7 @@ function sharded_queue.cfg_call(_, options) error('"metrics" must be a boolean') end - if sharded_queue.cfg.metrics ~= options.metrics then + if config.metrics ~= options.metrics then local tubes = cartridge.config_get_deepcopy('tubes') or {} if tubes['cfg'] ~= nil and tubes['cfg'].metrics == nil then @@ -471,78 +28,17 @@ function sharded_queue.cfg_call(_, options) end end -function sharded_queue.statistics(tube_name) - if not tube_name then - return - end - -- collect stats from all storages - local storages = cartridge_rpc.get_candidates( - 'sharded_queue.storage', - { - leader_only = true - }) - - local stats_collection, err = 
cartridge_pool.map_call('tube_statistic', - {{ tube_name = tube_name }}, {uri_list=storages}) - if err ~= nil then - return nil, err - end - - if type(stats_collection) ~= 'table' then - return nil, 'No stats retrieved' - end - - if next(stats_collection) == nil then - return nil - end - - -- merge - local stat = { tasks = {}, calls = {} } - for _, uri_stat in pairs(stats_collection) do - for name, count in pairs(uri_stat.tasks) do - stat.tasks[name] = (stat.tasks[name] or 0) + count - end - for name, count in pairs(uri_stat.calls) do - stat.calls[name] = (stat.calls[name] or 0) + count - end - end - - return stat -end - -function sharded_queue.create_tube(tube_name, options) - local tubes = cartridge.config_get_deepcopy('tubes') or {} - - if tube_name == 'cfg' then - error('a tube name "cfg" is reserved') - end - - if tubes[tube_name] ~= nil then - -- already exist -- - return nil - end - - local ok , err = validate_options(options) - if not ok then error(err) end - - options = table.deepcopy(options or {}) - if options.priority == nil and options.pri ~= nil then - options.priority = options.pri - end - - tubes[tube_name] = options - ok, err = cartridge.config_patch_clusterwide({ tubes = tubes }) - if not ok then error(err) end - - return sharded_queue.tube[tube_name] -end - local function init(opts) - rawset(_G, 'queue', sharded_queue) + queue.export_globals() end local function validate_config(cfg) - return utils.validate_config_cfg(cfg) + local cfg_tubes = cfg.tubes or {} + local ok, err = utils.validate_tubes(cfg_tubes, false) + if not ok then + return ok, err + end + return utils.validate_cfg(cfg_tubes['cfg']) end local function apply_config(cfg, opts) @@ -551,43 +47,30 @@ local function apply_config(cfg, opts) for tube_name, options in pairs(cfg_tubes) do if tube_name == 'cfg' then if options.metrics ~= nil then - sharded_queue.cfg.metrics = options.metrics and true or false + config.metrics = options.metrics and true or false end - elseif 
sharded_queue.tube[tube_name] == nil then - local self = setmetatable({ - tube_name = tube_name, - wait_max = options.wait_max, - wait_factor = options.wait_factor or time.DEFAULT_WAIT_FACTOR, - log_request = utils.normalize.log_request(options.log_request), - }, { - __index = sharded_tube, - }) - sharded_queue.tube[tube_name] = self + elseif queue.map()[tube_name] == nil then + queue.add(tube_name, metrics, options) end end -- try drop tubes -- - for tube_name, _ in pairs(sharded_queue.tube) do + for tube_name, _ in pairs(queue.map()) do if tube_name ~= 'cfg' and cfg_tubes[tube_name] == nil then - setmetatable(sharded_queue.tube[tube_name], nil) - sharded_queue.tube[tube_name] = nil + queue.remove(tube_name) end end - if sharded_queue.cfg.metrics then - metrics_enable() + if config.metrics then + metrics.enable(queue) else - metrics_disable() + metrics.disable() end end --- FIXME: Remove when https://github.com/tarantool/cartridge/issues/308 resolved local function queue_action_wrapper(action) return function(name, ...) - if not sharded_queue.tube[name] then - return nil, string.format('No queue "%s" initialized yet', name) - end - return sharded_queue.tube[name][action](sharded_queue.tube[name], ...) + return queue.call(name, action, ...) 
end end @@ -608,19 +91,15 @@ return { drop = queue_action_wrapper('drop'), cfg = setmetatable({}, { - __index = sharded_queue.cfg, + __index = config, __newindex = function() error("Use api.cfg() instead", 2) end, - __call = sharded_queue.cfg_call, - __serialize = function() return sharded_queue.cfg end, + __call = cfg_call, + __serialize = function() return config end, }), - statistics = sharded_queue.statistics, + statistics = queue.statistics, _VERSION = require('sharded_queue.version'), dependencies = { 'cartridge.roles.vshard-router', }, - - __private = { - sharded_tube = sharded_tube, - } } diff --git a/sharded_queue/router/config.lua b/sharded_queue/router/config.lua new file mode 100644 index 0000000..26061ca --- /dev/null +++ b/sharded_queue/router/config.lua @@ -0,0 +1,19 @@ +local metrics = require('sharded_queue.metrics') +local stash = require('sharded_queue.stash') + +local stash_names = { + config = '__sharded_queue_router_config', +} +stash.setup(stash_names) + +local config = stash.get(stash_names.config) + +if config.metrics == nil then + config.metrics = true +end + +if config.metrics then + config.metrics = metrics.is_supported() +end + +return config diff --git a/sharded_queue/router/metrics.lua b/sharded_queue/router/metrics.lua new file mode 100644 index 0000000..e11361b --- /dev/null +++ b/sharded_queue/router/metrics.lua @@ -0,0 +1,31 @@ +local metrics = require('sharded_queue.metrics') +local stash = require('sharded_queue.stash') +local stats_storage = require('sharded_queue.stats.storage') + +local stash_names = { + metrics_stats = '__sharded_queue_router_metrics_stats', +} +stash.setup(stash_names) + +local metrics_stats = metrics.init(stash.get(stash_names.metrics_stats)) + +local function enable(queue) + local get_statistic = function(tube) + return queue.statistics(tube) + end + metrics_stats:enable('api', queue.map(), get_statistic) +end + +local function observe(latency, tube, method, ok) + metrics_stats:observe(latency, tube, 
method, ok) +end + +local function disable() + metrics_stats:disable() +end + +return { + enable = enable, + observe = observe, + disable = disable, +} diff --git a/sharded_queue/router/queue.lua b/sharded_queue/router/queue.lua new file mode 100644 index 0000000..c92d86f --- /dev/null +++ b/sharded_queue/router/queue.lua @@ -0,0 +1,142 @@ +local is_cartridge_package, cartridge = pcall(require, 'cartridge') +local vshard = require('vshard') +local tube = require('sharded_queue.router.tube') +local utils = require('sharded_queue.utils') + +local queue_global = { + tube = {}, +} + +function queue_global.statistics(tube_name) + if not tube_name then + return + end + + local stats_collection, err = vshard.router.map_callrw('tube_statistic', + {{ tube_name = tube_name }}) + if err ~= nil then + return nil, err + end + + if type(stats_collection) ~= 'table' then + return nil, 'No stats retrieved' + end + + if next(stats_collection) == nil then + return nil + end + + local stat = { tasks = {}, calls = {} } + for _, replicaset_stats in pairs(stats_collection) do + if type(replicaset_stats) ~= 'table' or next(replicaset_stats) == nil then + return nil, 'Invalid stats' + end + + for name, count in pairs(replicaset_stats[1].tasks) do + stat.tasks[name] = (stat.tasks[name] or 0) + count + end + for name, count in pairs(replicaset_stats[1].calls) do + stat.calls[name] = (stat.calls[name] or 0) + count + end + end + + return stat +end + +-- Tarantool 3.0 does not support updating a configuration dynamically, so +-- a user must update the configuration themselves. 
+if is_cartridge_package then + local function validate_options(options) + if not options then return true end + + if options.wait_factor then + if type(options.wait_factor) ~= 'number' + or options.wait_factor < 1 + then + return false, "wait_factor must be number greater than or equal to 1" + end + end + + local _, err = utils.normalize.log_request(options.log_request) + if err then + return false, err + end + + if options.wait_max ~= nil then + local err + options.wait_max, err = utils.normalize.wait_max(options.wait_max) + if err ~= nil then + return false, err + end + end + + return true + end + + queue_global.create_tube = function(tube_name, options) + local tubes = cartridge.config_get_deepcopy('tubes') or {} + + if tube_name == 'cfg' then + error('a tube name "cfg" is reserved') + end + + if tubes[tube_name] ~= nil then + return nil + end + + local ok , err = validate_options(options) + if not ok then error(err) end + + options = table.deepcopy(options or {}) + if options.priority == nil and options.pri ~= nil then + options.priority = options.pri + end + + tubes[tube_name] = options + ok, err = cartridge.config_patch_clusterwide({ tubes = tubes }) + if not ok then + error(err) + end + + return queue_global.tube[tube_name] + end +end + +local function export_globals() + rawset(_G, 'queue', queue_global) +end + +local function add(name, metrics, options) + queue_global.tube[name] = tube.new(name, metrics, options) +end + +local function call(tube, action, ...) + if queue_global.tube[tube] == nil then + return nil, string.format('No queue "%s" initialized yet', tube) + end + if queue_global.tube[tube][action] == nil then + return nil, string.format('Queue %s has no action %s', tube, action) + end + return queue_global.tube[tube][action](queue_global.tube[tube], ...) 
+end + +local function map() + return queue_global.tube +end + +local function remove(tube) + if queue_global.tube[tube] ~= nil then + setmetatable(queue_global.tube[tube], nil) + queue_global.tube[tube] = nil + end +end + +return { + export_globals = export_globals, + add = add, + call = call, + map = map, + statistics = queue_global.statistics, + remove = remove, +} diff --git a/sharded_queue/router/tube.lua b/sharded_queue/router/tube.lua new file mode 100644 index 0000000..032be32 --- /dev/null +++ b/sharded_queue/router/tube.lua @@ -0,0 +1,415 @@ +local is_cartridge_package, cartridge = pcall(require, 'cartridge') +local fiber = require('fiber') +local vshard = require('vshard') +local log = require('log') + +local time = require('sharded_queue.time') +local utils = require('sharded_queue.utils') + +local function log_task(op_name, task) + local task_id = type(task) == 'table' and task[1] or task + log.info(string.format([[Router[%d] %s: task %s]], fiber.self():id(), op_name, task_id)) +end + +local function remote_call(method, replicaset, args, timeout) + return replicaset:callrw(method, { args }, { timeout = timeout }) +end + +local function take_task(replicasets, options, take_timeout, call_timeout) + for _, replicaset in ipairs(replicasets) do + if take_timeout == 0 then + break + end + local begin = time.cur() + + -- Try to take a task from all instances. 
+ local ok, ret = pcall(remote_call, 'tube_take', + replicaset, + options, + call_timeout + ) + + if ret ~= nil and ok then + return ret + end + + local duration = time.cur() - begin + take_timeout = take_timeout > duration and take_timeout - duration or 0 + end +end + +function put(self, data, options) + local bucket_count = vshard.router.bucket_count() + local bucket_id = math.random(bucket_count) + + options = table.deepcopy(options or {}) + + if options.priority == nil and options.pri ~= nil then + options.priority = options.pri + end + + options.data = data + options.tube_name = self.tube_name + options.bucket_id = bucket_id + options.bucket_count = bucket_count + + options.extra = { + log_request = utils.normalize.log_request(options.log_request) or self.log_request, + } + + local task, err = vshard.router.call(bucket_id, + 'write', 'tube_put', { options }) + -- Re-raise storage errors. + if err ~= nil then error(err) end + + if options.extra.log_request then + log_task('put', task) + end + + return task +end + +function take(self, timeout, options) + options = table.deepcopy(options or {}) + options.tube_name = self.tube_name + + options.extra = { + log_request = utils.normalize.log_request(options.log_request) or self.log_request, + } + + local remote_call_timeout = time.MIN_NET_BOX_CALL_TIMEOUT + if timeout ~= nil and timeout > time.MIN_NET_BOX_CALL_TIMEOUT then + remote_call_timeout = timeout + end + + local take_timeout = time.nano(timeout) or time.TIMEOUT_INFINITY + + local frequency = 1000 + local wait_part = 0.01 -- maximum waiting time in second + local wait_max = utils.normalize.wait_max(options.wait_max) + or self.wait_max or time.MAX_TIMEOUT + + local wait_factor = self.wait_factor + + local calc_part = time.sec(take_timeout / frequency) + + if calc_part < wait_part then + wait_part = tonumber(calc_part) + end + + if options.extra.log_request then + log.info(("Router[%d] take: start attempts"):format(fiber.self():id())) + end + + while 
take_timeout ~= 0 do + local begin = time.cur() + + local shards, err = vshard.router.routeall() + if err ~= nil then + error(err) + end + + local replicasets = {} + for _, replicaset in pairs(shards) do + table.insert(replicasets, replicaset) + end + utils.array_shuffle(replicasets) + + local task = take_task(replicasets, + options, take_timeout, remote_call_timeout) + + if task ~= nil then + if options.extra.log_request then + log_task('take', task) + end + return task + end + + if take_timeout < time.nano(wait_part) then + if options.extra.log_request then + log.info(("Router[%d] take: next attemt will be after timeout") + :format(fiber.self():id())) + end + return nil + end + + fiber.sleep(wait_part) + + wait_part = wait_part * wait_factor + if wait_part > wait_max then + wait_part = wait_max + end + + local duration = time.cur() - begin + + take_timeout = take_timeout > duration and take_timeout - duration or 0 + end + + if options.extra.log_request then + log.info(("Router[%d] take: timeout"):format(fiber.self():id())) + end +end + +function delete(self, task_id, options) + local bucket_count = vshard.router.bucket_count() + local bucket_id, _ = utils.unpack_task_id(task_id, bucket_count) + + options = table.deepcopy(options or {}) + options.tube_name = self.tube_name + options.task_id = task_id + + options.extra = { + log_request = utils.normalize.log_request(options.log_request) or self.log_request, + } + + local task, err = vshard.router.call(bucket_id, 'write', 'tube_delete', { + options + }) + -- Re-raise storage errors. 
+ if err ~= nil then error(err) end + + if options.extra.log_request then + log_task('delete', task) + end + + return task +end + +function release(self, task_id, options) + local bucket_count = vshard.router.bucket_count() + local bucket_id, _ = utils.unpack_task_id(task_id, bucket_count) + + options = table.deepcopy(options or {}) + options.tube_name = self.tube_name + options.task_id = task_id + + options.extra = { + log_request = utils.normalize.log_request(options.log_request) or self.log_request, + } + + local task, err = vshard.router.call(bucket_id, 'write', 'tube_release', { + options + }) + -- Re-raise storage errors. + if err ~= nil then error(err) end + + if options.extra.log_request then + log_task('release', task) + end + + return task +end + +function touch(self, task_id, delta, options) + if delta == nil or delta <= 0 then + return + end + + if delta >= time.MAX_TIMEOUT then + delta = time.TIMEOUT_INFINITY + else + delta = time.nano(delta) + end + + local bucket_count = vshard.router.bucket_count() + local bucket_id, _ = utils.unpack_task_id(task_id, bucket_count) + + options = table.deepcopy(options or {}) + options.tube_name = self.tube_name + options.task_id = task_id + options.delta = delta + + options.extra = { + log_request = utils.normalize.log_request(options.log_request) or self.log_request, + } + + local task, err = vshard.router.call(bucket_id, 'write', 'tube_touch', { + options + }) + -- Re-raise storage errors. 
+ if err ~= nil then error(err) end + + if options.extra.log_request then + log_task('touch', task) + end + + return task +end + +function ack(self, task_id, options) + local bucket_count = vshard.router.bucket_count() + local bucket_id, _ = utils.unpack_task_id(task_id, bucket_count) + + options = table.deepcopy(options or {}) + options.tube_name = self.tube_name + options.task_id = task_id + + options.extra = { + log_request = utils.normalize.log_request(options.log_request) or self.log_request, + } + + if options.extra.log_request then + log.info(("Router[%d] ack: call id %d, bucket %d") + :format(fiber.self():id(), task_id, bucket_id)) + end + + local task, err = vshard.router.call(bucket_id, 'write', 'tube_ack', { + options + }) + -- Re-raise storage errors. + if err ~= nil then error(err) end + + if options.extra.log_request then + log_task('ack', task) + end + + return task +end + +function bury(self, task_id, options) + local bucket_count = vshard.router.bucket_count() + local bucket_id, _ = utils.unpack_task_id(task_id, bucket_count) + + options = table.deepcopy(options or {}) + options.tube_name = self.tube_name + options.task_id = task_id + + options.extra = { + log_request = utils.normalize.log_request(options.log_request) or self.log_request, + } + + local task, err = vshard.router.call(bucket_id, 'write', 'tube_bury', { + options + }) + -- Re-raise storage errors. 
+ if err ~= nil then error(err) end + + if options.extra.log_request then + log_task('bury', task) + end + + return task +end + +function kick(self, count, options) + local kicked_count = 0 -- number of kicked tasks + local shards, err = vshard.router.routeall() + if err ~= nil then + error(err) + end + + for _, replicaset in pairs(shards) do + local opts = table.deepcopy(options or {}) + opts.tube_name = self.tube_name + opts.count = count - kicked_count + + local ok, k = pcall(remote_call, 'tube_kick', replicaset, opts) + if not ok then + log.error(k) + return kicked_count + end + + kicked_count = kicked_count + k + + if kicked_count == count then + break + end + end + + return kicked_count +end + +function peek(self, task_id, options) + local bucket_count = vshard.router.bucket_count() + local bucket_id, _ = utils.unpack_task_id(task_id, bucket_count) + + options = table.deepcopy(options or {}) + options.tube_name = self.tube_name + options.task_id = task_id + + options.extra = { + log_request = utils.normalize.log_request(options.log_request) or self.log_request, + } + + local task, err = vshard.router.call(bucket_id, 'write', 'tube_peek', { + options + }) + -- Re-raise storage errors. + if err ~= nil then error(err) end + + if options.extra.log_request then + log_task('peek', task) + end + + return task +end + +function drop(self) + local tubes = cartridge.config_get_deepcopy('tubes') or {} + tubes[self.tube_name] = nil + cartridge.config_patch_clusterwide({ tubes = tubes }) +end + +local methods = { + put = put, + take = take, + delete = delete, + release = release, + touch = touch, + ack = ack, + bury = bury, + kick = kick, + peek = peek, +} + +-- Tarantool 3.0 does not support updating a configuration dynamically, so +-- a user must update the configuration themselves. 
+if is_cartridge_package then + methods.drop = drop +end + +local function new_metrics_metatable(metrics) + local mt = { + __index = {}, + } + + for call, fun in pairs(methods) do + mt.__index[call] = function(self, ...) + local before = fiber.clock() + local ok, ret = pcall(fun, self, ...) + local latency = fiber.clock() - before + + metrics.observe(latency, self.tube_name, call, ok) + + if not ok then + error(ret) + end + + return ret + end + end + + return mt +end + +local function new(name, metrics, options) + return setmetatable({ + tube_name = name, + wait_max = options.wait_max, + wait_factor = options.wait_factor or time.DEFAULT_WAIT_FACTOR, + log_request = utils.normalize.log_request(options.log_request), + }, new_metrics_metatable(metrics)) +end + +local function get_methods() + local list = {} + for method, _ in pairs(methods) do + table.insert(list, method) + end + return list +end + +return { + new = new, + get_methods = get_methods, +} diff --git a/sharded_queue/storage.lua b/sharded_queue/storage.lua index c3ecdb5..f33d03f 100644 --- a/sharded_queue/storage.lua +++ b/sharded_queue/storage.lua @@ -1,208 +1,61 @@ -local fiber = require('fiber') -local json = require('json') -local log = require('log') - -local cartridge = require('cartridge') - -local metrics = require('sharded_queue.metrics') -local stash = require('sharded_queue.stash') -local state = require('sharded_queue.state') +local config = require('sharded_queue.storage.config') +local methods = require('sharded_queue.storage.methods') +local metrics = require('sharded_queue.storage.metrics') local stats_storage = require('sharded_queue.stats.storage') +local tubes = require('sharded_queue.storage.tubes').new() local utils = require('sharded_queue.utils') -local DEFAULT_DRIVER = 'sharded_queue.drivers.fifottl' - -local stash_names = { - cfg = '__sharded_queue_storage_cfg', - metrics_stats = '__sharded_queue_storage_metrics_stats', -} -stash.setup(stash_names) - -local methods = { - 
'statistic', - 'put', - 'take', - 'delete', - 'touch', - 'ack', - 'peek', - 'release', - 'bury', - 'kick', -} - -local storage = { - cfg = stash.get(stash_names.cfg), - metrics_stats = metrics.init(stash.get(stash_names.metrics_stats)), -} - -if storage.cfg.metrics == nil then - storage.cfg.metrics = true -end - -if storage.cfg.metrics then - storage.cfg.metrics = metrics.is_supported() -end +local function init(opts) -local queue_drivers = {} -local function get_driver(driver_name) - if queue_drivers[driver_name] == nil then - queue_drivers[driver_name] = require(driver_name) - end - return queue_drivers[driver_name] end -local tubes = {} - -local function map_tubes(cfg_tubes) - local result = {} - for tube_name, tube_opts in pairs(cfg_tubes) do - if tube_name['cfg'] ~= nil or tube_opts.enable == nil then - -- do not add 'cfg' as a tube - local driver_name = tube_opts.driver or DEFAULT_DRIVER - result[tube_name] = get_driver(driver_name) - end - end - return result -end +local function validate_config(cfg) + local cfg_tubes = cfg.tubes or {} -local function metrics_enable() - local get_statistic = function(tube) - return stats_storage.get(tube) + local ok, err = utils.validate_tubes(cfg_tubes, true) + if not ok then + return ok, err end - - storage.metrics_stats:enable('storage', tubes, get_statistic) + return utils.validate_cfg(cfg_tubes['cfg']) end -local function metrics_disable() - storage.metrics_stats:disable() -end - -local function validate_config(cfg) - local cfg_tubes = cfg.tubes or {} - for tube_name, tube_opts in pairs(cfg_tubes) do - if tube_opts.driver ~= nil then - if type('tube_opts.driver') ~= 'string' then - return nil, 'Driver name must be a valid module name for tube' .. 
tube_name - end - local ok, _ = pcall(require, tube_opts.driver) - if not ok then - return nil, ('Driver %s could not be loaded for tube %s'):format(tube_opts.driver, tube_name) - end +local function apply_config(cfg, opts) + local cfg_tubes = table.deepcopy(cfg.tubes or {}) + if cfg_tubes['cfg'] ~= nil then + local options = cfg_tubes['cfg'] + if options.metrics ~= nil then + config.metrics = options.metrics and true or false end + cfg_tubes['cfg'] = nil end - return utils.validate_config_cfg(cfg) -end - -local function apply_config(cfg, opts) if opts.is_master then stats_storage.init() - local cfg_tubes = cfg.tubes or {} - if cfg_tubes['cfg'] ~= nil then - local options = cfg_tubes['cfg'] - if options.metrics ~= nil then - storage.cfg.metrics = options.metrics and true or false - end - end - - local existing_tubes = tubes - - tubes = map_tubes(cfg_tubes) - - -- try create tube -- - for tube_name, driver in pairs(tubes) do - if existing_tubes[tube_name] == nil then - tubes[tube_name].create({ - name = tube_name, - options = cfg_tubes[tube_name] - }) - stats_storage.reset(tube_name) - end - end - - -- try drop tube -- - for tube_name, driver in pairs(existing_tubes) do - if tubes[tube_name] == nil then - driver.drop(tube_name) - end + local new = tubes:update(cfg_tubes) + for _, tube in ipairs(new) do + stats_storage.reset(tube) end - -- register tube methods -- - for _, name in pairs(methods) do - local func = function(args) - if args == nil then args = {} end - args.options = cfg_tubes[args.tube_name] or {} - - local tube_name = args.tube_name - if tubes[tube_name].method[name] == nil then error(('Method %s not implemented in tube %s'):format(name, tube_name)) end - - local before = fiber.clock() - local ok, ret, err = pcall(tubes[tube_name].method[name], args) - local after = fiber.clock() - - if storage.cfg.metrics then - storage.metrics_stats:observe(after - before, - tube_name, name, ok and err == nil) - end - - if not ok then - error(ret) - end - - return 
ret, err - end - - local global_name = 'tube_' .. name - rawset(_G, global_name, func) - box.schema.func.create(global_name, { if_not_exists = true }) - end - - local tube_statistic_func = function(args) - local before = fiber.clock() - local ok, ret, err = pcall(stats_storage.get, args.tube_name) - local after = fiber.clock() - if storage.cfg.metrics then - storage.metrics_stats:observe(after - before, - args.tube_name, 'statistic', ok and err == nil) - end - - if not ok then - error(ret) - end - - return ret, err - end - - rawset(_G, 'tube_statistic', tube_statistic_func) - box.schema.func.create('tube_statistic', { if_not_exists = true }) + methods.init(metrics, tubes) end - if storage.cfg.metrics then - metrics_enable() + if config.metrics then + metrics.enable(tubes:map()) else - metrics_disable() + metrics.disable() end return true end -local function init(opts) - -end - return { init = init, - apply_config = apply_config, validate_config = validate_config, + apply_config = apply_config, _VERSION = require('sharded_queue.version'), dependencies = { 'cartridge.roles.vshard-storage', }, - - __private = { - methods = methods, - } } diff --git a/sharded_queue/storage/config.lua b/sharded_queue/storage/config.lua new file mode 100644 index 0000000..409c342 --- /dev/null +++ b/sharded_queue/storage/config.lua @@ -0,0 +1,19 @@ +local metrics = require('sharded_queue.metrics') +local stash = require('sharded_queue.stash') + +local stash_names = { + config = '__sharded_queue_storage_config', +} +stash.setup(stash_names) + +local config = stash.get(stash_names.config) + +if config.metrics == nil then + config.metrics = true +end + +if config.metrics then + config.metrics = metrics.is_supported() +end + +return config diff --git a/sharded_queue/storage/drivers.lua b/sharded_queue/storage/drivers.lua new file mode 100644 index 0000000..62b989b --- /dev/null +++ b/sharded_queue/storage/drivers.lua @@ -0,0 +1,15 @@ +local DEFAULT_DRIVER = 'sharded_queue.drivers.fifottl' 
+local queue_drivers = {} + +local function get_driver(driver_name) + driver_name = driver_name or DEFAULT_DRIVER + + if queue_drivers[driver_name] == nil then + queue_drivers[driver_name] = require(driver_name) + end + return queue_drivers[driver_name] +end + +return { + get = get_driver, +} diff --git a/sharded_queue/storage/methods.lua b/sharded_queue/storage/methods.lua new file mode 100644 index 0000000..63e8283 --- /dev/null +++ b/sharded_queue/storage/methods.lua @@ -0,0 +1,67 @@ +local fiber = require('fiber') +local stats_storage = require('sharded_queue.stats.storage') + +local methods = { + 'statistic', + 'put', + 'take', + 'delete', + 'touch', + 'ack', + 'peek', + 'release', + 'bury', + 'kick', +} + +local function init(metrics, tubes) + for _, method in pairs(methods) do + local func = function(args) + args = args or {} + args.options = tubes:get_options(args.tube_name) or {} + + local tube_name = args.tube_name + local before = fiber.clock() + local ok, ret, err = pcall(tubes.call, tubes, tube_name, method, args) + local latency = fiber.clock() - before + + metrics.observe(latency, tube_name, method, ok and err == nil) + + if not ok then + error(ret) + end + + return ret, err + end + + local global_name = 'tube_' .. 
method + rawset(_G, global_name, func) + box.schema.func.create(global_name, { if_not_exists = true }) + end + + local tube_statistic_func = function(args) + local before = fiber.clock() + local ok, ret, err = pcall(stats_storage.get, args.tube_name) + local latency = fiber.clock() - before + + metrics.observe(latency, args.tube_name, 'statistic', ok and err == nil) + + if not ok then + error(ret) + end + + return ret, err + end + + rawset(_G, 'tube_statistic', tube_statistic_func) + box.schema.func.create('tube_statistic', { if_not_exists = true }) +end + +local function get_list() + return methods +end + +return { + init = init, + get_list = get_list, +} diff --git a/sharded_queue/storage/metrics.lua b/sharded_queue/storage/metrics.lua new file mode 100644 index 0000000..67ed4d8 --- /dev/null +++ b/sharded_queue/storage/metrics.lua @@ -0,0 +1,32 @@ +local metrics = require('sharded_queue.metrics') +local stash = require('sharded_queue.stash') +local stats_storage = require('sharded_queue.stats.storage') + +local stash_names = { + metrics_stats = '__sharded_queue_storage_metrics_stats', +} +stash.setup(stash_names) + +local metrics_stats = metrics.init(stash.get(stash_names.metrics_stats)) + +local function enable(tubes) + local get_statistic = function(tube) + return stats_storage.get(tube) + end + + metrics_stats:enable('storage', tubes, get_statistic) +end + +local function observe(latency, tube, method, ok) + metrics_stats:observe(latency, tube, method, ok) +end + +local function disable() + metrics_stats:disable() +end + +return { + enable = enable, + observe = observe, + disable = disable, +} diff --git a/sharded_queue/storage/tubes.lua b/sharded_queue/storage/tubes.lua new file mode 100644 index 0000000..f29417c --- /dev/null +++ b/sharded_queue/storage/tubes.lua @@ -0,0 +1,85 @@ +local drivers = require('sharded_queue.storage.drivers') + +local tubes = {} + +local function map_tubes(cfg_tubes) + cfg_tubes = cfg_tubes or {} + + local result = {} + for 
tube_name, tube_opts in pairs(cfg_tubes) do + if tube_opts.enable == nil or tube_opts.enable == true then + result[tube_name] = drivers.get(tube_opts.driver) + end + end + return result +end + +local function call(self, tube, method, args) + if self.tubes[tube] == nil then + error(('Tube %s not exist'):format(tube)) + end + if self.tubes[tube].method[method] == nil then + error(('Method %s not implemented in tube %s'):format(method, tube)) + end + return self.tubes[tube].method[method](args) +end + +local function map(self) + return self.tubes +end + +local function get_options(self, tube) + return self.options[tube] +end + +local function update(self, cfg_tubes) + local existing_tubes = self:map() + + self.options = cfg_tubes or {} + self.tubes = map_tubes(cfg_tubes) + + -- Create new. + local new = {} + for tube_name, driver in pairs(self.tubes) do + if existing_tubes[tube_name] == nil then + self.tubes[tube_name].create({ + name = tube_name, + options = cfg_tubes[tube_name] + }) + table.insert(new, tube_name) + end + end + + -- Remove old. 
+ local old = {} + for tube_name, driver in pairs(existing_tubes) do + if self.tubes[tube_name] == nil then + driver.drop(tube_name) + table.insert(old, tube_name) + end + end + + return new, old +end + +local mt = { + __index = { + call = call, + get_options = get_options, + map = map, + update = update, + }, +} + +local function new() + local ret = { + tubes = {}, + options = {}, + } + setmetatable(ret, mt) + return ret +end + +return { + new = new, +} diff --git a/sharded_queue/utils.lua b/sharded_queue/utils.lua index 3f483b4..e1999e3 100644 --- a/sharded_queue/utils.lua +++ b/sharded_queue/utils.lua @@ -74,13 +74,31 @@ function utils.normalize.wait_max(wait_max) return wait_max end -function utils.validate_config_cfg(cfg) - cfg = cfg.tubes or {} - if cfg['cfg'] == nil then +function utils.validate_tubes(tubes, on_storage) + for tube_name, tube_opts in pairs(tubes) do + if tube_opts.driver ~= nil then + if type(tube_opts.driver) ~= 'string' then + local msg = 'Driver name must be a valid module name for tube %s' + return nil, msg:format(tube_name) + end + if on_storage then + local ok, _ = pcall(require, tube_opts.driver) + if not ok then + local msg = 'Driver %s could not be loaded for tube %s' + return nil, msg:format(tube_opts.driver, tube_name) + end + end + end + end + + return true +end + +function utils.validate_cfg(cfg) + if cfg == nil then return true end - cfg = cfg['cfg'] if type(cfg) ~= 'table' then return nil, '"cfg" must be a table' end diff --git a/test/api_test.lua b/test/api_test.lua index e161752..fdf7794 100644 --- a/test/api_test.lua +++ b/test/api_test.lua @@ -6,6 +6,7 @@ local g = t.group('api') local api = require('sharded_queue.api') local config = require('test.helper.config') local utils = require('test.helper.utils') +local tube = require('sharded_queue.router.tube') local is_metrics_supported = utils.is_metrics_supported() g.before_all(function() @@ -19,7 +20,7 @@ g.after_each(function() end) g.test_exported_api = function() - for 
method, _ in pairs(api.__private.sharded_tube) do + for _, method in pairs(tube.get_methods()) do t.assert_type(api[method], 'function', string.format('api role has method "%s" exported', method)) end diff --git a/test/storage_test.lua b/test/storage_test.lua index ad10ff5..521a3d5 100644 --- a/test/storage_test.lua +++ b/test/storage_test.lua @@ -3,9 +3,8 @@ local t = require('luatest') local g = t.group('storage') -local storage = require('sharded_queue.storage') local config = require('test.helper.config') - +local methods = require('sharded_queue.storage.methods') g.before_all(function() g.storage_master = config.cluster:server('queue-storage-1-0').net_box @@ -17,7 +16,7 @@ g.test_storage_methods = function() local ro = g.storage_ro:eval("return box.cfg.read_only") t.assert_equals(ro, true) - for _,method in pairs(storage.__private.methods) do + for _, method in pairs(methods.get_list()) do local global_name = 'tube_' .. method -- Master storage t.assert_equals(g.storage_master:eval(string.format('return box.schema.func.exists("%s")', global_name)), true)