diff --git a/.luacheckrc b/.luacheckrc index 271cb551f..036d0f6a8 100644 --- a/.luacheckrc +++ b/.luacheckrc @@ -11,3 +11,10 @@ exclude_files = { 'cartridge/graphql.lua', 'cartridge/graphql/*.lua', } +new_read_globals = { + box = { fields = { + session = { fields = { + storage = {read_only = false, other_fields = true} + }} + }} +} diff --git a/CHANGELOG.md b/CHANGELOG.md index 2cfaaa470..3b29338c6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. can be either absolute or relative - in the later case it's calculated relative to `cartridge.workdir`. +- Implement stateful failover mode. + ### Deprecated Lua API: diff --git a/CMakeLists.txt b/CMakeLists.txt index d128289f3..9fef6bfd6 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -165,6 +165,11 @@ install( DESTINATION ${TARANTOOL_INSTALL_LUADIR} ) +install( + FILES ${CMAKE_CURRENT_SOURCE_DIR}/kingdom.lua + DESTINATION ${TARANTOOL_INSTALL_BINDIR} +) + install( FILES ${CMAKE_CURRENT_BINARY_DIR}/VERSION.lua diff --git a/cartridge-scm-1.rockspec b/cartridge-scm-1.rockspec index 9bd1575c0..ee2ad9eee 100644 --- a/cartridge-scm-1.rockspec +++ b/cartridge-scm-1.rockspec @@ -29,6 +29,7 @@ build = { TARANTOOL_DIR = '$(TARANTOOL_DIR)', TARANTOOL_INSTALL_LIBDIR = '$(LIBDIR)', TARANTOOL_INSTALL_LUADIR = '$(LUADIR)', + TARANTOOL_INSTALL_BINDIR = '$(BINDIR)', }, copy_directories = {'doc'}, } diff --git a/cartridge.lua b/cartridge.lua index 13aa5e669..7fdb25e0f 100644 --- a/cartridge.lua +++ b/cartridge.lua @@ -509,6 +509,10 @@ local function cfg(opts, box_opts) service_registry.set('httpd', httpd) end + local ok, err = roles.register_role('cartridge.roles.coordinator') + if not ok then + return nil, err + end for _, role in ipairs(opts.roles or {}) do local ok, err = roles.register_role(role) if not ok then diff --git a/cartridge/failover.lua b/cartridge/failover.lua index c2b64c494..a837226ea 100644 --- a/cartridge/failover.lua +++ b/cartridge/failover.lua @@ -1,19 +1,19 @@ -#!/usr/bin/env tarantool - ---- Make decisions regarding instances leadership. +--- Gather information regarding instances leadership. -- -- Failover can operate in two modes: -- -- * In `disabled` mode the leader is the first server configured in --- `topology.replicsets[].master` array. +-- `topology.replicasets[].master` array. -- * In `eventual` mode the leader isn't elected consistently. -- Instead, every instance in cluster thinks the leader is the -- first **healthy** server in replicaset, while instance health is -- determined according to membership status (the SWIM protocol). +-- * In `stateful` mode leaders appointments are polled from the +-- external storage. (**Added** in v2.0.2-2) -- -- This module behavior depends on the instance state. -- --- From the very beginig it reports `is_rw() == false`, +-- From the very beginning it reports `is_rw() == false`, -- `is_leader() == false`, `get_active_leaders() == {}`. 
-- -- The module is configured when the instance enters `ConfiguringRoles` @@ -29,6 +29,7 @@ local log = require('log') local fiber = require('fiber') local checks = require('checks') local errors = require('errors') +local netbox = require('net.box') local membership = require('membership') local vars = require('cartridge.vars').new('cartridge.failover') @@ -38,28 +39,53 @@ local service_registry = require('cartridge.service-registry') local FailoverError = errors.new_class('FailoverError') local ApplyConfigError = errors.new_class('ApplyConfigError') +local NetboxConnectError = errors.new_class('NetboxConnectError') local ValidateConfigError = errors.new_class('ValidateConfigError') -vars:new('mode', 'disabled') -- disabled | eventual -vars:new('notification', membership.subscribe()) +vars:new('membership_notification', membership.subscribe()) vars:new('clusterwide_config') vars:new('failover_fiber') vars:new('cache', { - active_leaders = nil, + active_leaders = {--[[ [replicaset_uuid] = leader_uuid ]]}, is_leader = false, is_rw = false, }) +vars:new('options', { + LONGPOLL_TIMEOUT = 30, + NETBOX_CALL_TIMEOUT = 1, +}) -local function _get_health_map(topology_cfg, mode) - checks('table', 'string') - assert(topology_cfg.replicasets ~= nil) +--- Generate appointments according to clusterwide configuration. +-- Used in 'disabled' failover mode. +-- @function _get_appointments_disabled_mode +-- @local +local function _get_appointments_disabled_mode(topology_cfg) + checks('table') + local replicasets = assert(topology_cfg.replicasets) - local ret = { - active_leaders = {--[[ [replicaset_uuid] = leader_uuid ]]}, - potential_leaders = {--[[ [instance_uuid] = true|false ]]}, - } + local appointments = {} - for replicaset_uuid, _ in pairs(topology_cfg.replicasets) do + for replicaset_uuid, _ in pairs(replicasets) do + local leaders = topology.get_leaders_order( + topology_cfg, replicaset_uuid + ) + appointments[replicaset_uuid] = leaders[1] + end + + return appointments +end + +--- Generate appointments according to membership status. +-- Used in 'eventual' failover mode. +-- @function _get_appointments_eventual_mode +-- @local +local function _get_appointments_eventual_mode(topology_cfg) + checks('table') + local replicasets = assert(topology_cfg.replicasets) + + local appointments = {} + + for replicaset_uuid, _ in pairs(replicasets) do local leaders = topology.get_leaders_order( topology_cfg, replicaset_uuid ) @@ -75,43 +101,76 @@ local function _get_health_map(topology_cfg, mode) member.payload.state == 'ConfiguringRoles' or member.payload.state == 'RolesConfigured' ) then - if ret.active_leaders[replicaset_uuid] == nil - and mode == 'eventual' - then - ret.active_leaders[replicaset_uuid] = instance_uuid - end - ret.potential_leaders[instance_uuid] = true - else - ret.potential_leaders[instance_uuid] = false + appointments[replicaset_uuid] = instance_uuid + break end end - if ret.active_leaders[replicaset_uuid] == nil then - ret.active_leaders[replicaset_uuid] = leaders[1] + if appointments[replicaset_uuid] == nil then + appointments[replicaset_uuid] = leaders[1] end end - return ret + return appointments end -local function refresh_cache() +--- Get appointments from external storage. +-- Used in 'stateful' failover mode. 
+-- @function _get_appointments_stateful_mode +-- @local +local function _get_appointments_stateful_mode(conn, timeout) + checks('table', 'number') + local appointments, err = errors.netbox_call( + -- Server will answer in `timeout` seconds (maybe) + conn, 'longpoll', {timeout}, + -- But if it doesn't, we give it another spare second. + {timeout = timeout + vars.options.NETBOX_CALL_TIMEOUT} + ) + + if appointments == nil then + return nil, err + end + + return appointments +end + +--- Accept new appointments. +-- +-- Get appointments wherever they come from and put them into cache. +-- +-- @function accept_appointments +-- @local +-- @tparam {[string]=string} replicaset_uuid to leader_uuid map +-- @treturn boolean Whether leadership map has changed +local function accept_appointments(appointments) + checks('table') local topology_cfg = vars.clusterwide_config:get_readonly('topology') - local health_map = _get_health_map(topology_cfg, vars.mode) + local replicasets = assert(topology_cfg.replicasets) - local leader_uuid = health_map.active_leaders[box.info.cluster.uuid] - local replicasets = topology_cfg.replicasets - local all_rw = replicasets[box.info.cluster.uuid].all_rw + local old_leaders = table.copy(vars.cache.active_leaders) - vars.cache.active_leaders = health_map.active_leaders - vars.cache.is_leader = box.info.uuid == leader_uuid - vars.cache.is_rw = vars.cache.is_leader or all_rw + -- Merge new appointments into cache + for replicaset_uuid, leader_uuid in pairs(appointments) do + vars.cache.active_leaders[replicaset_uuid] = leader_uuid + end - if utils.deepcmp(health_map, vars.cache.health_map) then - return false + -- Remove replicasets that aren't listed in topology + for replicaset_uuid, _ in pairs(vars.cache.active_leaders) do + if replicasets[replicaset_uuid] == nil then + vars.cache.active_leaders[replicaset_uuid] = nil + end end - vars.cache.health_map = health_map - return true + -- Constitute oneself + if vars.cache.active_leaders[box.info.cluster.uuid] == box.info.uuid then + vars.cache.is_leader = true + vars.cache.is_rw = true + else + vars.cache.is_leader = false + vars.cache.is_rw = replicasets[box.info.cluster.uuid].all_rw + end + + return not utils.deepcmp(old_leaders, vars.cache.active_leaders) end local function apply_config(mod) @@ -141,25 +200,32 @@ local function apply_config(mod) ) end -local function failover_loop(notification) +--- Repeatedly fetch new appointments and reconfigure roles. +-- +-- @function failover_loop +-- @local +local function failover_loop(args) + checks({ + get_appointments = 'function', + }) local confapplier = require('cartridge.confapplier') local all_roles = require('cartridge.roles').get_all_roles() ::start_over:: - if vars.mode == 'disabled' then - refresh_cache() - notification:wait() - goto start_over - elseif vars.mode == 'eventual' then - if not refresh_cache() then - -- Nothing changed. - -- Wait for the next event. - notification:wait() + while pcall(fiber.testcancel) do + local appointments, err = FailoverError:pcall(args.get_appointments) + if appointments == nil then + log.warn('%s', err.err) goto start_over end - -- The event may arrive during two-pahse commit is in progress. + if not accept_appointments(appointments) then + -- nothing changed + goto start_over + end + + -- The event may arrive while a two-phase commit is in progress. -- We should wait for the appropriate state.
local state = confapplier.wish_state('RolesConfigured', math.huge) if state ~= 'RolesConfigured' then @@ -167,10 +233,10 @@ local function failover_loop(notification) goto start_over end + log.info('Failover triggered') confapplier.set_state('ConfiguringRoles') - local ok, err = FailoverError:pcall(function() - log.info('Failover triggered') + local ok, err = FailoverError:pcall(function() box.cfg({ read_only = not vars.cache.is_rw, }) @@ -186,47 +252,92 @@ local function failover_loop(notification) return true end) + if ok then log.info('Failover step finished') else log.warn('Failover step failed: %s', err) end confapplier.set_state('RolesConfigured') - - goto start_over end end +------------------------------------------------------------------------ + --- Initialize the failover module. -- @function cfg -- @local local function cfg(clusterwide_config) checks('ClusterwideConfig') - local topology_cfg = clusterwide_config:get_readonly('topology') - assert(topology_cfg ~= nil) - local failover_cfg = topology.get_failover_params(topology_cfg) - local new_mode = failover_cfg.mode - if vars.mode ~= new_mode then - vars.notification:signal() - log.info('Failover mode set to %q', new_mode) + if vars.failover_fiber ~= nil then + vars.failover_fiber:cancel() + vars.failover_fiber = nil end vars.clusterwide_config = clusterwide_config - vars.mode = new_mode + local topology_cfg = clusterwide_config:get_readonly('topology') + local failover_cfg = topology.get_failover_params(topology_cfg) + local first_appointments + + if failover_cfg.mode == 'disabled' then + log.info('Failover disabled') + first_appointments = _get_appointments_disabled_mode(topology_cfg) + + elseif failover_cfg.mode == 'eventual' then + log.info('Eventual failover enabled') + first_appointments = _get_appointments_eventual_mode(topology_cfg) + + vars.failover_fiber = fiber.new(failover_loop, { + get_appointments = function() + vars.membership_notification:wait() + return _get_appointments_eventual_mode(topology_cfg) + end, + }) + vars.failover_fiber:name('cartridge.eventual-failover') + + elseif failover_cfg.mode == 'stateful' and failover_cfg.state_provider == 'tarantool' then + local params = assert(failover_cfg.tarantool_params) + local conn, err = NetboxConnectError:pcall( + netbox.connect, assert(params.uri), { + wait_connected = false, + reconnect_after = 1.0, + user = 'client', + password = params.password, + }) + + if conn == nil then + log.warn('Stateful failover not enabled: %s', err) + else + log.info( + 'Stateful failover enabled with external storage at %s', + params.uri + ) + end + -- WARNING: network yields + first_appointments = _get_appointments_stateful_mode(conn, 0) + if first_appointments == nil then + first_appointments = {} + end - if vars.failover_fiber == nil - or vars.failover_fiber:status() == 'dead' - then - vars.failover_fiber = fiber.new(failover_loop, vars.notification) - vars.failover_fiber:name('cartridge.failover') + vars.failover_fiber = fiber.new(failover_loop, { + get_appointments = function() + return _get_appointments_stateful_mode(conn, + vars.options.LONGPOLL_TIMEOUT + ) + end, + }) + vars.failover_fiber:name('cartridge.stateful-failover') + else + error('Unknown failover mode') end - refresh_cache() + accept_appointments(first_appointments) box.cfg({ read_only = not vars.cache.is_rw, }) + return true end @@ -235,32 +346,27 @@ end -- @local -- @return {[replicaset_uuid] = instance_uuid,...} local function get_active_leaders() - if vars.cache.active_leaders ~= nil then - return 
vars.cache.active_leaders - end + return vars.cache.active_leaders +end +--- Check current instance leadership. +-- @function is_leader +-- @local +-- @treturn boolean true / false +local function is_leader() + return vars.cache.is_leader +end - local confapplier = require('cartridge.confapplier') - local topology_cfg = confapplier.get_readonly('topology') - if topology_cfg == nil then - return {} - end - local health_map = _get_health_map(topology_cfg, 'disabled') - return health_map.active_leaders +--- Check current instance writability. +-- @function is_rw +-- @local +-- @treturn boolean true / false +local function is_rw() + return vars.cache.is_rw end return { cfg = cfg, get_active_leaders = get_active_leaders, - - --- Check current instance leadership. - -- @function is_leader - -- @local - -- @treturn boolean true / false - is_leader = function() return vars.cache.is_leader end, - - --- Check current instance writability. - -- @function is_rw - -- @local - -- @treturn boolean true / false - is_rw = function() return vars.cache.is_rw end, + is_leader = is_leader, + is_rw = is_rw, } diff --git a/cartridge/lua-api/get-topology.lua b/cartridge/lua-api/get-topology.lua index 0d42bc4d0..5573fd934 100644 --- a/cartridge/lua-api/get-topology.lua +++ b/cartridge/lua-api/get-topology.lua @@ -123,6 +123,7 @@ local function get_topology() local replicasets = {} local known_roles = roles.get_known_roles() local leaders_order = {} + local failover_cfg = topology.get_failover_params(topology_cfg) --- Replicaset general information. -- @tfield @@ -162,8 +163,18 @@ local function get_topology() uuid = replicaset_uuid, roles = {}, status = 'healthy', - master = nil, - active_master = nil, + master = { + uri = 'void', + uuid = 'void', + status = 'void', + message = 'void', + }, + active_master = { + uri = 'void', + uuid = 'void', + status = 'void', + message = 'void', + }, weight = nil, vshard_group = replicaset.vshard_group, servers = {}, @@ -197,9 +208,14 @@ local function get_topology() srv.replicaset = replicasets[server.replicaset_uuid] if leaders_order[server.replicaset_uuid][1] == instance_uuid then - srv.replicaset.master = srv + if failover_cfg.mode ~= 'stateful' then + srv.replicaset.master = srv + end end if active_leaders[server.replicaset_uuid] == instance_uuid then + if failover_cfg.mode == 'stateful' then + srv.replicaset.master = srv + end srv.replicaset.active_master = srv end if srv.status ~= 'healthy' then diff --git a/cartridge/roles/coordinator.lua b/cartridge/roles/coordinator.lua new file mode 100644 index 000000000..ab37f2c22 --- /dev/null +++ b/cartridge/roles/coordinator.lua @@ -0,0 +1,318 @@ +local log = require('log') +local fiber = require('fiber') +local checks = require('checks') +local errors = require('errors') +local netbox = require('net.box') +local membership = require('membership') +local uri_lib = require('uri') + +local vars = require('cartridge.vars').new('cartridge.roles.coordinator') +local topology = require('cartridge.topology') +local confapplier = require('cartridge.confapplier') + +local AppointmentError = errors.new_class('AppointmentError') +local CoordinatorError = errors.new_class('CoordinatorError') +local NetboxConnectError = errors.new_class('NetboxConnectError') + +vars:new('membership_notification', membership.subscribe()) +vars:new('connect_fiber', nil) +vars:new('topology_cfg', nil) +vars:new('conn', nil) +vars:new('options', { + RECONNECT_PERIOD = 5, + IMMUNITY_TIMEOUT = 15, + NETBOX_CALL_TIMEOUT = 1, +}) + +-- The healthcheck function is put 
into vars for easier +-- monkeypatching and further extending. +vars:new('healthcheck', function(members, instance_uuid) + checks('table', 'string') + assert(vars.topology_cfg ~= nil) + assert(vars.topology_cfg.servers ~= nil) + + local server = vars.topology_cfg.servers[instance_uuid] + if server == nil or not topology.not_disabled(instance_uuid, server) then + return false + end + local member = members[server.uri] + + if member ~= nil + and (member.status == 'alive' or member.status == 'suspect') + and (member.payload.uuid == instance_uuid) + then + return true + end + + return false +end) + +local function pack_decision(leader_uuid) + checks('string') + return { + leader = leader_uuid, + immunity = fiber.time() + vars.options.IMMUNITY_TIMEOUT, + } +end + +local function make_decision(ctx, replicaset_uuid) + checks({members = 'table', decisions = 'table'}, 'string') + + local current_decision = ctx.decisions[replicaset_uuid] + if current_decision ~= nil then + if fiber.time() < current_decision.immunity + or vars.healthcheck(ctx.members, current_decision.leader) + then + return nil + end + end + + local candidates = topology.get_leaders_order( + vars.topology_cfg, replicaset_uuid + ) + + if current_decision == nil then + -- This is a case when new replicaset is created. + -- First appointment is always made according to `topology_cfg` + -- without regard to the healthcheck + local decision = pack_decision(candidates[1]) + ctx.decisions[replicaset_uuid] = decision + return decision + end + + for _, instance_uuid in ipairs(candidates) do + if vars.healthcheck(ctx.members, instance_uuid) then + local decision = pack_decision(instance_uuid) + ctx.decisions[replicaset_uuid] = decision + return decision + end + end +end + +local function control_loop(conn) + local leaders, err = errors.netbox_call(conn, 'get_leaders', + nil, {timeout = vars.options.NETBOX_CALL_TIMEOUT} + ) + if leaders == nil then + log.error('%s', err) + return + end + + local ctx = { + members = nil, + decisions = {}, + } + + for replicaset_uuid, leader_uuid in pairs(leaders) do + ctx.decisions[replicaset_uuid] = pack_decision(leader_uuid) + end + + repeat + ctx.members = membership.members() + + local updates = {} + + for replicaset_uuid, _ in pairs(vars.topology_cfg.replicasets) do + local decision = make_decision(ctx, replicaset_uuid) + if decision ~= nil then + table.insert(updates, {replicaset_uuid, decision.leader}) + log.info('Appoint new leader %s -> %s (%q)', + replicaset_uuid, decision.leader, + vars.topology_cfg.servers[decision.leader].uri + ) + end + end + + if next(updates) ~= nil then + local ok, err = errors.netbox_call(conn, 'set_leaders', + {updates}, {timeout = vars.options.NETBOX_CALL_TIMEOUT} + ) + if ok == nil then + log.error('%s', err) + break + end + end + + local now = fiber.time() + local next_moment = math.huge + for _, decision in pairs(ctx.decisions) do + if (now < decision.immunity) + and (decision.immunity < next_moment) + then + next_moment = decision.immunity + end + end + + vars.membership_notification:wait(next_moment - now) + until not pcall(fiber.testcancel) +end + +local function take_control(uri) + checks('string') + local conn, err = NetboxConnectError:pcall(netbox.connect, uri) + if conn == nil then + return nil, err + elseif not conn:is_connected() then + return nil, NetboxConnectError:new('%q: %s', uri, conn.error) + end + + local lock_delay, err = errors.netbox_call(conn, 'get_lock_delay', + nil, {timeout = vars.options.NETBOX_CALL_TIMEOUT} + ) + if lock_delay == nil then + 
return nil, err + end + + local lock_args = { + confapplier.get_instance_uuid(), + confapplier.get_advertise_uri() + } + + local ok, err = errors.netbox_call(conn, 'acquire_lock', + lock_args, {timeout = vars.options.NETBOX_CALL_TIMEOUT} + ) + + if ok == nil then + return nil, err + end + + if ok ~= true then + return false + end + + log.info('Lock acquired') + vars.conn = conn + local control_fiber = fiber.new(control_loop, conn) + control_fiber:name('failover-coordinate') + + repeat + if not pcall(fiber.sleep, lock_delay/2) then + break + end + + if control_fiber:status() == 'dead' + or not errors.netbox_call(conn, 'acquire_lock', + lock_args, {timeout = vars.options.NETBOX_CALL_TIMEOUT} + ) then + break + end + until not pcall(fiber.testcancel) + + conn:close() + vars.conn = nil + if control_fiber:status() ~= 'dead' then + control_fiber:cancel() + end + + log.info('Lock released') + return true +end + +local function connect_loop(uri) + checks('string') + repeat + local t1 = fiber.time() + local ok, err = CoordinatorError:pcall(take_control, uri) + local t2 = fiber.time() + + if ok == nil then + log.error('%s', type(err) == 'table' and err.err or err) + end + + if ok ~= true then + fiber.sleep(t1 + vars.options.RECONNECT_PERIOD - t2) + end + + until not pcall(fiber.testcancel) +end + +local function stop() + if vars.connect_fiber == nil then + return + elseif vars.connect_fiber:status() ~= 'dead' then + vars.connect_fiber:cancel() + end + + vars.connect_fiber = nil +end + +local function apply_config(conf, _) + vars.topology_cfg = conf.topology + local failover_cfg = topology.get_failover_params(conf.topology) + + if failover_cfg.mode ~= 'stateful' then + stop() + return true + end + + if failover_cfg.state_provider ~= 'tarantool' then + local err = string.format( + 'assertion failed! unknown state_provider %s', + failover_cfg.state_provider + ) + error(err) + end + + if vars.connect_fiber == nil + or vars.connect_fiber:status() == 'dead' + then + log.info( + 'Starting failover coordinator' .. + ' with external storage at %s', + failover_cfg.tarantool_params.uri + ) + + local parts = uri_lib.parse(failover_cfg.tarantool_params.uri) + parts.login = 'client' + parts.password = failover_cfg.tarantool_params.password + local storage_uri = uri_lib.format(parts, true) + + vars.connect_fiber = fiber.new(connect_loop, storage_uri) + vars.connect_fiber:name('failover-connect-kv') + end + vars.membership_notification:broadcast() + return true +end + +--- Manually set leaders. +-- @function appoint_leaders +-- @tparam {[string]=string,...} replicaset_uuid to leader_uuid mapping +-- @treturn[1] boolean true +-- @treturn[2] nil +-- @treturn[2] table Error description +local function appoint_leaders(leaders) + checks('table') + local updates = {} + for k, v in pairs(leaders) do + if type(k) ~= 'string' or type(v) ~= 'string' then + error('bad argument #1 to appoint_leaders' .. 
+ ' (keys and values must be strings)', 2 + ) + end + table.insert(updates, {k, v}) + end + + if vars.conn == nil then + return nil, AppointmentError:new("Lock not acquired") + end + + local ok, err = errors.netbox_call(vars.conn, 'set_leaders', + {updates}, {timeout = vars.options.NETBOX_CALL_TIMEOUT} + ) + if ok == nil then + return nil, AppointmentError:new( + type(err) == 'table' and err.err or err + ) + end + + return true +end + +return { + role_name = 'failover-coordinator', + apply_config = apply_config, + stop = stop, + + -- rpc + appoint_leaders = appoint_leaders, +} diff --git a/kingdom.lua b/kingdom.lua new file mode 100755 index 000000000..df19deb50 --- /dev/null +++ b/kingdom.lua @@ -0,0 +1,242 @@ +#!/usr/bin/env tarantool + +local log = require('log') +local fio = require('fio') +local clock = require('clock') +local fiber = require('fiber') +local checks = require('checks') +local argparse = require('cartridge.argparse') + +local opts = argparse.get_opts({ + listen = 'string', + workdir = 'string', + password = 'string', + lock_delay = 'number', +}) + +local ok, err = fio.mktree(opts.workdir) +if not ok then error(err, 0) end + +local LOCK_DELAY = opts.lock_delay or 10 +if LOCK_DELAY == nil then + error("Invalid TARANTOOL_LOCK_DELAY value") +end +function _G.get_lock_delay() + return LOCK_DELAY +end + +box.cfg({work_dir = opts.workdir}) + +box.schema.user.create('client', {if_not_exists = true}) +box.schema.user.passwd('client', opts.password) + +------------------------------------------------------------------------ +box.schema.func.create('get_coordinator', {if_not_exists = true}) +box.schema.func.create('get_lock_delay', {if_not_exists = true}) +box.schema.func.create('acquire_lock', {if_not_exists = true}) +box.schema.func.create('set_leaders', {if_not_exists = true}) +box.schema.func.create('get_leaders', {if_not_exists = true}) +box.schema.func.create('longpoll', {if_not_exists = true}) + +------------------------------------------------------------------------ +box.schema.sequence.create('coordinator_audit', { + if_not_exists = true +}) +box.schema.space.create('coordinator_audit', { + format = { + {name = 'ordinal', type = 'unsigned', is_nullable = false}, + {name = 'time', type = 'number', is_nullable = false}, + {name = 'uuid', type = 'string', is_nullable = false}, + {name = 'uri', type = 'string', is_nullable = false}, + }, + if_not_exists = true, +}) + +box.space.coordinator_audit:create_index('ordinal', { + unique = true, + type = 'TREE', + parts = {{field = 'ordinal', type = 'unsigned'}}, + sequence = 'coordinator_audit', + if_not_exists = true, +}) + +------------------------------------------------------------------------ +box.schema.sequence.create('leader_audit', { + if_not_exists = true +}) +box.schema.space.create('leader_audit', { + format = { + {name = 'ordinal', type = 'unsigned', is_nullable = false}, + {name = 'time', type = 'number', is_nullable = false}, + {name = 'replicaset_uuid', type = 'string', is_nullable = false}, + {name = 'instance_uuid', type = 'string', is_nullable = false}, + }, + if_not_exists = true, +}) + +box.space.leader_audit:create_index('ordinal', { + unique = true, + type = 'TREE', + parts = { + {field = 'ordinal', type = 'unsigned'}, + {field = 'replicaset_uuid', type = 'string'}, + }, + if_not_exists = true, +}) + +------------------------------------------------------------------------ +box.schema.space.create('leader', { + format = { + {name = 'replicaset_uuid', type = 'string', is_nullable = false}, + {name = 
'instance_uuid', type = 'string', is_nullable = false}, + }, + if_not_exists = true, +}) + +box.space.leader:create_index('replicaset_uuid', { + unique = true, + type = 'TREE', + parts = {{field = 'replicaset_uuid', type = 'string'}}, + if_not_exists = true, +}) + +------------------------------------------------------------------------ + +box.schema.user.grant('client', 'read,write', 'space', 'coordinator_audit', {if_not_exists = true}) +box.schema.user.grant('client', 'read,write', 'space', 'leader_audit', {if_not_exists = true}) +box.schema.user.grant('client', 'read,write', 'space', 'leader', {if_not_exists = true}) +box.schema.user.grant('client', 'read,write', 'sequence', 'coordinator_audit', {if_not_exists = true}) +box.schema.user.grant('client', 'read,write', 'sequence', 'leader_audit', {if_not_exists = true}) +box.schema.user.grant('client', 'execute', 'function', 'get_coordinator', {if_not_exists = true}) +box.schema.user.grant('client', 'execute', 'function', 'get_lock_delay', {if_not_exists = true}) +box.schema.user.grant('client', 'execute', 'function', 'acquire_lock', {if_not_exists = true}) +box.schema.user.grant('client', 'execute', 'function', 'set_leaders', {if_not_exists = true}) +box.schema.user.grant('client', 'execute', 'function', 'get_leaders', {if_not_exists = true}) +box.schema.user.grant('client', 'execute', 'function', 'longpoll', {if_not_exists = true}) + +-- Enable listen port only after all spaces are set up +box.cfg({listen = opts.listen}) + +------------------------------------------------------------------------ + +local notification = fiber.cond() +local lock = { + coordinator = nil, + session_id = 0, + session_expiry = 0, +} + +function _G.acquire_lock(uuid, uri) + checks('string', 'string') + local now = clock.monotonic() + + if box.session.id() ~= lock.session_id + and box.session.exists(lock.session_id) + and now < lock.session_expiry then + return false + end + + lock.session_id = box.session.id() + lock.session_expiry = now + LOCK_DELAY + + if lock.coordinator == nil + or lock.coordinator.uuid ~= uuid + or lock.coordinator.uri ~= uri + then + box.space.coordinator_audit:insert({nil, fiber.time(), uuid, uri}) + log.info('Long live the coordinator %q (%s)!', uri, uuid) + lock.coordinator = { + uuid = uuid, + uri = uri, + } + end + + return true +end + +local function set_leaders(leaders) + if type(leaders) ~= 'table' then + local err = string.format( + "bad argument #1 to set_leaders" .. + " (table expected, got %s)", type(leaders) + ) + error(err, 2) + end + + if lock.session_id ~= box.session.id() then + return nil, 'You are not holding the lock' + end + + local ordinal = box.sequence.leader_audit:next() + for _, leader in ipairs(leaders) do + local replicaset_uuid, instance_uuid = unpack(leader) + box.space.leader:upsert( + {replicaset_uuid, instance_uuid}, {{'=', 2, instance_uuid}} + ) + box.space.leader_audit:insert({ + ordinal, fiber.time(), replicaset_uuid, instance_uuid + }) + end + notification:broadcast() + return true +end +function _G.set_leaders(...) + return box.atomic(set_leaders, ...)
+end + +function _G.get_leaders() + local ret = {} + for _, v in box.space.leader:pairs() do + ret[v.replicaset_uuid] = v.instance_uuid + end + + return ret +end + +function _G.longpoll(timeout) + checks('?number') + if timeout == nil then + timeout = 1 + end + + local latest_audit = box.space.leader_audit.index.ordinal:max() + local latest_ordinal = latest_audit and latest_audit.ordinal or 0 + local session = box.session.storage + + if session.ordinal == nil then + session.ordinal = latest_ordinal + return _G.get_leaders() + elseif session.ordinal > latest_ordinal then + error('Impossibru! (session_ordinal > latest_ordinal)') + elseif session.ordinal == latest_ordinal then + notification:wait(timeout) + end + + local ret = {} + for _, v in box.space.leader_audit:pairs({session.ordinal}, {iterator = 'GT'}) do + ret[v.replicaset_uuid] = v.instance_uuid + session.ordinal = v.ordinal + end + return ret +end + +function _G.get_coordinator() + if lock ~= nil + and box.session.exists(lock.session_id) + and clock.monotonic() < lock.session_expiry then + return lock.coordinator + end +end + +fiber.new(function() + fiber.self():name('audit-log') + -- It's not good to print logs inside a transaction + -- thus logging is performed in the separate fiber + _G.longpoll(0) + + while true do + for replicaset_uuid, instance_uuid in pairs(_G.longpoll(10)) do + log.info('New leader %s -> %s', replicaset_uuid, instance_uuid) + end + end +end) diff --git a/test/integration/failover_test.lua b/test/integration/failover_eventual_test.lua similarity index 100% rename from test/integration/failover_test.lua rename to test/integration/failover_eventual_test.lua diff --git a/test/integration/failover_stateful_test.lua b/test/integration/failover_stateful_test.lua new file mode 100644 index 000000000..aefcab44d --- /dev/null +++ b/test/integration/failover_stateful_test.lua @@ -0,0 +1,291 @@ +local log = require('log') +local fio = require('fio') +local t = require('luatest') +local g = t.group() + +local helpers = require('test.helper') + +local storage_uuid = helpers.uuid('b') +local storage_1_uuid = helpers.uuid('b', 'b', 1) +local storage_2_uuid = helpers.uuid('b', 'b', 2) +local storage_3_uuid = helpers.uuid('b', 'b', 3) + +g.before_all(function() + g.datadir = fio.tempdir() + + fio.mktree(fio.pathjoin(g.datadir, 'kingdom')) + local kvpassword = require('digest').urandom(6):hex() + g.kingdom = require('luatest.server'):new({ + command = fio.pathjoin(helpers.project_root, 'kingdom.lua'), + workdir = fio.pathjoin(g.datadir, 'kingdom'), + net_box_port = 14401, + net_box_credentials = { + user = 'client', + password = kvpassword, + }, + env = { + TARANTOOL_LOCK_DELAY = 1, + TARANTOOL_PASSWORD = kvpassword, + }, + }) + g.kingdom:start() + helpers.retrying({}, function() + g.kingdom:connect_net_box() + end) + + g.cluster = helpers.Cluster:new({ + datadir = g.datadir, + use_vshard = true, + server_command = helpers.entrypoint('srv_basic'), + cookie = require('digest').urandom(6):hex(), + env = { + TARANTOOL_SWIM_SUSPECT_TIMEOUT_SECONDS = 0, + }, + replicasets = { + { + alias = 'router', + uuid = helpers.uuid('a'), + roles = {'vshard-router', 'failover-coordinator'}, + servers = { + {alias = 'router', instance_uuid = helpers.uuid('a', 'a', 1)}, + }, + }, + { + alias = 'storage', + uuid = storage_uuid, + roles = {'vshard-router', 'vshard-storage'}, + servers = { + {alias = 'storage-1', instance_uuid = storage_1_uuid}, + {alias = 'storage-2', instance_uuid = storage_2_uuid}, + {alias = 'storage-3', instance_uuid = 
storage_3_uuid}, + }, + }, + }, + }) + + g.cluster:start() + g.cluster.main_server.net_box:eval([[ + local vars = require('cartridge.vars').new('cartridge.roles.coordinator') + vars.options.IMMUNITY_TIMEOUT = 0 + ]]) + g.cluster.main_server.net_box:call( + 'package.loaded.cartridge.failover_set_params', + {{ + mode = 'stateful', + state_provider = 'tarantool', + tarantool_params = { + uri = g.kingdom.net_box_uri, + password = kvpassword, + }, + }} + ) + helpers.retrying({}, function() + g.kingdom:connect_net_box() + t.assert_covers( + g.kingdom.net_box:call('get_leaders'), + {[storage_uuid] = storage_1_uuid} + ) + end) +end) + +g.after_all(function() + g.cluster:stop() + g.kingdom:stop() + fio.rmtree(g.datadir) +end) + +local q_leadership = string.format([[ + local failover = require('cartridge.failover') + return failover.get_active_leaders()[%q] +]], storage_uuid) +local q_readonliness = [[ + return box.info.ro +]] +local function eval(alias, ...) + return g.cluster:server(alias).net_box:eval(...) +end + +function g.test_kingdom_restart() + fio.rmtree(g.kingdom.workdir) + g.kingdom:stop() + g.kingdom:start() + helpers.retrying({}, function() + g.kingdom:connect_net_box() + t.assert_covers( + g.kingdom.net_box:call('get_leaders'), + {[storage_uuid] = storage_1_uuid} + ) + end) + + helpers.retrying({}, function() + t.assert_equals(eval('router', q_leadership), storage_1_uuid) + t.assert_equals(eval('storage-1', q_leadership), storage_1_uuid) + t.assert_equals(eval('storage-2', q_leadership), storage_1_uuid) + t.assert_equals(eval('storage-3', q_leadership), storage_1_uuid) + end) + + t.assert_equals(eval('storage-1', q_readonliness), false) + t.assert_equals(eval('storage-2', q_readonliness), true) + t.assert_equals(eval('storage-3', q_readonliness), true) +end + +function g.test_leader_restart() + t.assert_equals( + g.kingdom.net_box:call('longpoll', {0}), + { + [helpers.uuid('a')] = helpers.uuid('a', 'a', 1), + [storage_uuid] = storage_1_uuid, + } + ) + + ----------------------------------------------------- + g.cluster:server('storage-1'):stop() + t.assert_equals( + g.kingdom.net_box:call('longpoll', {3}), + {[storage_uuid] = storage_2_uuid} + ) + + helpers.retrying({}, function() + t.assert_equals(eval('router', q_leadership), storage_2_uuid) + t.assert_equals(eval('storage-2', q_leadership), storage_2_uuid) + t.assert_equals(eval('storage-3', q_leadership), storage_2_uuid) + end) + + t.assert_equals(eval('router', q_readonliness), false) + t.assert_equals(eval('storage-2', q_readonliness), false) + t.assert_equals(eval('storage-3', q_readonliness), true) + + ----------------------------------------------------- + -- After old s1 recovers it doesn't take leadership + g.cluster:server('storage-1'):start() + g.cluster:wait_until_healthy(g.cluster.main_server) + + t.assert_equals(eval('router', q_leadership), storage_2_uuid) + t.assert_equals(eval('storage-1', q_leadership), storage_2_uuid) + t.assert_equals(eval('storage-2', q_leadership), storage_2_uuid) + t.assert_equals(eval('storage-3', q_leadership), storage_2_uuid) + + t.assert_equals(eval('router', q_readonliness), false) + t.assert_equals(eval('storage-1', q_readonliness), true) + t.assert_equals(eval('storage-2', q_readonliness), false) + t.assert_equals(eval('storage-3', q_readonliness), true) + + ----------------------------------------------------- + -- And even applying config doesn't change leadership + g.cluster.main_server:graphql({ + query = [[ + mutation( + $uuid: String! + $master_uuid: [String!]! 
+ ) { + edit_replicaset( + uuid: $uuid + master: $master_uuid + ) + } + ]], + variables = { + uuid = storage_uuid, + master_uuid = {storage_3_uuid}, + }, + }) + + t.assert_equals(eval('router', q_leadership), storage_2_uuid) + t.assert_equals(eval('storage-1', q_leadership), storage_2_uuid) + t.assert_equals(eval('storage-2', q_leadership), storage_2_uuid) + t.assert_equals(eval('storage-3', q_leadership), storage_2_uuid) + + ----------------------------------------------------- + -- Switching leadership is accomplised by the coordinator rpc + + log.info('--------------------------------------------------------') + local ok, err = eval('router', [[ + local cartridge = require('cartridge') + local coordinator = cartridge.service_get('failover-coordinator') + return coordinator.appoint_leaders(...) + ]], {{[storage_uuid] = storage_1_uuid}}) + t.assert_equals({ok, err}, {true, nil}) + + helpers.retrying({}, function() + t.assert_equals(eval('router', q_leadership), storage_1_uuid) + t.assert_equals(eval('storage-1', q_leadership), storage_1_uuid) + t.assert_equals(eval('storage-2', q_leadership), storage_1_uuid) + t.assert_equals(eval('storage-3', q_leadership), storage_1_uuid) + end) +end + +function g.test_leaderless() + g.kingdom:stop() + local router = g.cluster:server('router') + -- restart both router (which is a failover coordinator) + -- and storage-1 (which is a leader among storages) + for _, s in pairs({'router', 'storage-1'}) do + g.cluster:server(s):stop() + g.cluster:server(s):start() + end + + ----------------------------------------------------- + -- Chack that replicaset without leaders can exist + g.cluster:wait_until_healthy(g.cluster.main_server) + t.assert_equals(eval('router', q_leadership), box.NULL) + t.assert_equals(eval('storage-1', q_leadership), box.NULL) + t.assert_equals(eval('storage-2', q_leadership), storage_1_uuid) + t.assert_equals(eval('storage-3', q_leadership), storage_1_uuid) + + t.assert_equals(eval('router', q_readonliness), true) + t.assert_equals(eval('storage-1', q_readonliness), true) + t.assert_equals(eval('storage-2', q_readonliness), true) + t.assert_equals(eval('storage-3', q_readonliness), true) + + local ret, err = g.cluster.main_server.net_box:call( + 'package.loaded.vshard.router.callrw', + {1, 'box.space.test:insert', {{1, 'one'}}} + ) + t.assert_equals(ret, nil) + t.assert_covers(err, { + name = "MISSING_MASTER", + type = "ShardingError", + replicaset_uuid = storage_uuid, + message = "Master is not configured for replicaset " .. 
storage_uuid, + }) + + t.assert_items_equals( + router:graphql({ + query = [[{ + replicasets { + uuid + master { uuid } + active_master { uuid } + } + }]] + }).data.replicasets, + {{ + uuid = router.replicaset_uuid, + master = {uuid = 'void'}, + active_master = {uuid = 'void'}, + }, { + uuid = storage_uuid, + master = {uuid = 'void'}, + active_master = {uuid = 'void'}, + }} + ) + + ----------------------------------------------------- + -- Check cluster repairs + + g.kingdom:start() + local q_waitrw = 'return {pcall(box.ctl.wait_rw, 3)}' + + t.assert_equals(eval('router', q_waitrw), {true}) + t.assert_equals(eval('storage-1', q_waitrw), {true}) + + t.assert_equals(eval('router', q_leadership), storage_1_uuid) + t.assert_equals(eval('storage-1', q_leadership), storage_1_uuid) + t.assert_equals(eval('storage-2', q_leadership), storage_1_uuid) + t.assert_equals(eval('storage-3', q_leadership), storage_1_uuid) + + t.assert_equals(eval('router', q_readonliness), false) + t.assert_equals(eval('storage-1', q_readonliness), false) + t.assert_equals(eval('storage-2', q_readonliness), true) + t.assert_equals(eval('storage-3', q_readonliness), true) +end diff --git a/test/integration/hidden_roles_test.lua b/test/integration/hidden_roles_test.lua index 94333562a..ab947537d 100644 --- a/test/integration/hidden_roles_test.lua +++ b/test/integration/hidden_roles_test.lua @@ -42,6 +42,7 @@ function g.test_graphql_known_roles() }) t.assert_equals(response.data.cluster.known_roles, { + {name = 'failover-coordinator', dependencies = {}}, {name = 'vshard-storage', dependencies = {}}, {name = 'vshard-router', dependencies = {}}, {name = 'myrole-dependency', dependencies = {}}, diff --git a/test/integration/kingdom_test.lua b/test/integration/kingdom_test.lua new file mode 100644 index 000000000..4bec3a8ee --- /dev/null +++ b/test/integration/kingdom_test.lua @@ -0,0 +1,147 @@ +local fio = require('fio') +local uuid = require('uuid') +local netbox = require('net.box') +local t = require('luatest') +local g = t.group() + +local helpers = require('test.helper') + +g.before_each(function() + g.datadir = fio.tempdir() + local password = require('digest').urandom(6):hex() + + fio.mktree(fio.pathjoin(g.datadir, 'kingdom')) + g.kingdom = require('luatest.server'):new({ + command = fio.pathjoin(helpers.project_root, 'kingdom.lua'), + workdir = fio.pathjoin(g.datadir), + net_box_port = 13301, + net_box_credentials = { + user = 'client', + password = password, + }, + env = { + TARANTOOL_PASSWORD = password, + TARANTOOL_LOCK_DELAY = 40, + }, + }) + g.kingdom:start() + helpers.retrying({}, function() + g.kingdom:connect_net_box() + end) +end) + +g.after_each(function() + g.kingdom:stop() + fio.rmtree(g.datadir) +end) + +local function connect(srv) + return netbox.connect(srv.net_box_port, srv.net_box_credentials) +end + +function g.test_locks() + local c1 = connect(g.kingdom) + local c2 = connect(g.kingdom) + local kid = uuid.str() + + t.assert_equals( + c1:call('acquire_lock', {kid, 'localhost:9'}), + true + ) + t.assert_equals( + c1:call('get_coordinator'), + {uuid = kid, uri = 'localhost:9'} + ) + + t.assert_equals( + c2:call('acquire_lock', {uuid.str(), 'localhost:11'}), + false + ) + t.assert_equals( + {c2:call('set_leaders', {{'A', 'a1'}})}, + {box.NULL, 'You are not holding the lock'} + ) + t.assert_equals( + c2:call('get_coordinator'), + {uuid = kid, uri = 'localhost:9'} + ) + + c1:close() + local kid = uuid.str() + helpers.retrying({}, function() + t.assert_equals(c2:call('get_coordinator'), box.NULL) + end) + + 
t.assert_equals( + c2:call('acquire_lock', {kid, 'localhost:11'}), + true + ) + t.assert_equals( + c2:call('get_coordinator'), + {uuid = kid, uri = 'localhost:11'} + ) +end + +function g.test_appointments() + local c = connect(g.kingdom) + local kid = uuid.str() + t.assert_equals( + c:call('acquire_lock', {kid, 'localhost:9'}), + true + ) + + t.assert_equals( + c:call('set_leaders', {{{'A', 'a1'}, {'B', 'b1'}}}), + true + ) + + t.assert_equals( + c:call('get_leaders'), + {A = 'a1', B = 'b1'} + ) + + t.assert_error_msg_equals( + "Duplicate key exists in unique index 'ordinal'" .. + " in space 'leader_audit'", + c.call, c, 'set_leaders', {{{'A', 'a2'}, {'A', 'a3'}}} + ) +end + +function g.test_longpolling() + local c1 = connect(g.kingdom) + local kid = uuid.str() + t.assert_equals( + c1:call('acquire_lock', {kid, 'localhost:9'}), + true + ) + c1:call('set_leaders', {{{'A', 'a1'}, {'B', 'b1'}}}) + + local c2 = connect(g.kingdom) + t.assert_equals(c2:call('longpoll'), {A = 'a1', B = 'b1'}) + local future = c2:call('longpoll', {0.2}, {is_async = true}) + c1:call('set_leaders', {{{'A', 'a2'}}}) + + local ret, err = future:wait_result(0.1) -- err is cdata + t.assert_equals({ret, tostring(err)}, { {{A = 'a2'}}, 'nil' }) + + local future = c2:call('longpoll', {0.2}, {is_async = true}) + local ret, err = future:wait_result(0.1) -- err is cdata + t.assert_equals({ret, tostring(err)}, {nil, 'Timeout exceeded'}) + + local ret, err = future:wait_result(0.2) -- err is cdata + t.assert_equals({ret, tostring(err)}, { {{}}, 'nil' }) +end + +function g.test_passwd() + local new_password = require('digest').urandom(6):hex() + + g.kingdom:stop() + g.kingdom.env.TARANTOOL_PASSWORD = new_password + g.kingdom.net_box_credentials.password = new_password + g.kingdom:start() + helpers.retrying({}, function() + g.kingdom:connect_net_box() + end) + + t.assert_equals(g.kingdom.net_box:call('get_lock_delay'), 40) +end diff --git a/test/integration/myrole_test.lua b/test/integration/myrole_test.lua index 4842d6544..1e67f2396 100644 --- a/test/integration/myrole_test.lua +++ b/test/integration/myrole_test.lua @@ -44,6 +44,7 @@ function g.test_api() }) t.assert_equals(res['data']['cluster']['known_roles'], { + {['name'] = 'failover-coordinator', ['dependencies'] = {}}, {['name'] = 'vshard-storage', ['dependencies'] = {}}, {['name'] = 'vshard-router', ['dependencies'] = {}}, {['name'] = 'myrole-dependency', ['dependencies'] = {}},
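For reference, a minimal usage sketch (not part of the patch) of how an application could switch the cluster to the new stateful mode; it mirrors the call made in test/integration/failover_stateful_test.lua. The URI and password below are placeholders for a kingdom.lua instance started with the TARANTOOL_LISTEN, TARANTOOL_WORKDIR, TARANTOOL_PASSWORD and TARANTOOL_LOCK_DELAY environment variables.

-- Hypothetical snippet: run on any configured cluster instance.
-- 'localhost:14401' and 'change-me' are placeholders; they must match
-- the listen URI and password of the running kingdom.lua storage.
local cartridge = require('cartridge')
local ok, err = cartridge.failover_set_params({
    mode = 'stateful',
    state_provider = 'tarantool',
    tarantool_params = {
        uri = 'localhost:14401',
        password = 'change-me',
    },
})
assert(ok, err)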