Skip to content

Commit

Permalink
crud: support vshard with master discovery
Browse files Browse the repository at this point in the history
In vshard 0.1.25 and Tarantool 3.0 new feature has been implemented.
The feature is related to automatic master discovery. If it has been
enabled, crud fails to bootstrap and work in several places.
This feature is enabled by Tarantool 3.0 if vshard cluster was
configured with specific 3.0 config flags.

Unfortunately, it is impossible to rework the code using only
vshard public API now [2].

1. tarantool/vshard#429
2. tarantool/vshard#467

Closes #409
  • Loading branch information
DifferentialOrange committed Jan 10, 2024
1 parent cdef96d commit 4d29aec
Show file tree
Hide file tree
Showing 10 changed files with 228 additions and 49 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.

### Fixed
* Compatibility with vshard configuration if UUIDs are omitted (#407).
* Compatibility with automatic master discovery in vshard (#409).

## [1.4.2] - 25-12-23

Expand Down
12 changes: 1 addition & 11 deletions crud.lua
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ local stats = require('crud.stats')
local readview = require('crud.readview')
local schema = require('crud.schema')

local luri = require('uri')

local crud = {}

-- @refer crud.version
Expand Down Expand Up @@ -173,15 +171,7 @@ function crud.init_storage()

local user = nil
if not box.info.ro then
local replicaset_key, replicaset = utils.get_self_vshard_replicaset()

if replicaset == nil or replicaset.master == nil then
error(string.format(
'Failed to find a vshard configuration ' ..
'for storage replicaset with key %q.',
replicaset_key))
end
user = luri.parse(replicaset.master.uri).login or 'guest'
user = utils.get_this_replica_user() or 'guest'
end

if rawget(_G, '_crud') == nil then
Expand Down
57 changes: 49 additions & 8 deletions crud/common/call.lua
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,25 @@ local function wrap_vshard_err(vshard_router, err, func_name, replicaset_id, buc
))
end

local function retry_call_with_master_discovery(replicaset, method, ...)
-- In case cluster was just bootstrapped with auto master discovery,
-- replicaset may miss master.

local resp, err = replicaset[method](replicaset, ...)

if err == nil then
return resp, err
end

if err.name == 'MISSING_MASTER' and replicaset.locate_master ~= nil then
replicaset:locate_master()
end

-- Retry only once: should be enough for initial discovery,
-- otherwise force user fix up cluster bootstrap.
return replicaset[method](replicaset, ...)
end

function call.map(vshard_router, func_name, func_args, opts)
dev_checks('table', 'string', '?table', {
mode = 'string',
Expand Down Expand Up @@ -111,7 +130,27 @@ function call.map(vshard_router, func_name, func_args, opts)
local call_opts = {is_async = true}
while iter:has_next() do
local args, replicaset, replicaset_id = iter:get()
local future = replicaset[vshard_call_name](replicaset, func_name, args, call_opts)

local future, err = retry_call_with_master_discovery(replicaset, vshard_call_name,
func_name, args, call_opts)

if err ~= nil then
local result_info = {
key = replicaset_id,
value = nil,
}

local err_info = {
err_wrapper = wrap_vshard_err,
err = err,
wrapper_args = {func_name, replicaset_id},
}

-- Enforce early exit on futures build fail.
postprocessor:collect(result_info, err_info)
return postprocessor:get()
end

futures_by_replicasets[replicaset_id] = future
end

Expand Down Expand Up @@ -157,12 +196,15 @@ function call.single(vshard_router, bucket_id, func_name, func_args, opts)
return nil, err
end

local timeout = opts.timeout or const.DEFAULT_VSHARD_CALL_TIMEOUT
local replicaset, err = vshard_router:route(bucket_id)
if err ~= nil then
return nil, CallError:new("Failed to get router replicaset: %s", err.err)
end

local res, err = vshard_router[vshard_call_name](vshard_router, bucket_id, func_name, func_args, {
timeout = timeout,
})
local timeout = opts.timeout or const.DEFAULT_VSHARD_CALL_TIMEOUT

local res, err = retry_call_with_master_discovery(replicaset, vshard_call_name,
func_name, func_args, {timeout = timeout})
if err ~= nil then
return nil, wrap_vshard_err(vshard_router, err, func_name, nil, bucket_id)
end
Expand All @@ -187,9 +229,8 @@ function call.any(vshard_router, func_name, func_args, opts)
end
local replicaset_id, replicaset = next(replicasets)

local res, err = replicaset:call(func_name, func_args, {
timeout = timeout,
})
local res, err = retry_call_with_master_discovery(replicaset, 'call',
func_name, func_args, {timeout = timeout})
if err ~= nil then
return nil, wrap_vshard_err(vshard_router, err, func_name, replicaset_id)
end
Expand Down
3 changes: 2 additions & 1 deletion crud/common/schema.lua
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ local function table_len(t)
end

local function call_reload_schema_on_replicaset(replicaset, channel)
replicaset.master.conn:reload_schema()
local master = utils.get_replicaset_master(replicaset, {cached = false})
master.conn:reload_schema()
channel:put(true)
end

Expand Down
31 changes: 20 additions & 11 deletions crud/common/utils.lua
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ local function get_replicaset_by_replica_id(replicasets, id)
end

function utils.get_spaces(vshard_router, timeout, replica_id)
local replicasets, replicaset, replicaset_id
local replicasets, replicaset, replicaset_id, master

timeout = timeout or const.DEFAULT_VSHARD_CALL_TIMEOUT
local deadline = fiber.clock() + timeout
Expand All @@ -134,7 +134,6 @@ function utils.get_spaces(vshard_router, timeout, replica_id)
fiber.clock() < deadline
) do
-- Try to get master with timeout.
fiber.yield()
replicasets = vshard_router:routeall()
if replica_id ~= nil then
-- Get the same replica on which the last DML operation was performed.
Expand All @@ -146,11 +145,16 @@ function utils.get_spaces(vshard_router, timeout, replica_id)
else
replicaset_id, replicaset = next(replicasets)
end
if replicaset ~= nil and
replicaset.master ~= nil and
replicaset.master.conn.error == nil then
break

if replicaset ~= nil then
-- Get cached, reload (if required) will be processed in other place.
master = utils.get_replicaset_master(replicaset, {cached = true})
if master ~= nil and master.conn.error == nil then
break
end
end

fiber.sleep(timeout / 100)
end

if replicaset == nil then
Expand All @@ -159,22 +163,24 @@ function utils.get_spaces(vshard_router, timeout, replica_id)
'perhaps other instances are unavailable or you have configured only the router')
end

if replicaset.master == nil then
master = utils.get_replicaset_master(replicaset, {cached = true})

if master == nil then
local error_msg = string.format(
'The master was not found in replicaset %s, ' ..
'check status of the master and repeat the operation later',
replicaset_id)
return nil, GetSpaceError:new(error_msg)
end

if replicaset.master.conn.error ~= nil then
if master.conn.error ~= nil then
local error_msg = string.format(
'The connection to the master of replicaset %s is not valid: %s',
replicaset_id, replicaset.master.conn.error)
replicaset_id, master.conn.error)
return nil, GetSpaceError:new(error_msg)
end

return replicaset.master.conn.space, nil, replicaset.master.conn.schema_version
return master.conn.space, nil, master.conn.schema_version
end

function utils.get_space(space_name, vshard_router, timeout, replica_id)
Expand Down Expand Up @@ -1155,10 +1161,13 @@ function utils.storage_info(opts)

for _, replicaset in pairs(replicasets) do
for replica_id, replica in pairs(replicaset.replicas) do
local master = utils.get_replicaset_master(replicaset, {cached = false})

replica_state_by_id[replica_id] = {
status = "error",
is_master = replicaset.master == replica
is_master = master == replica
}

local ok, res = pcall(replica.conn.call, replica.conn, CRUD_STORAGE_INFO_FUNC_NAME,
{}, async_opts)
if ok then
Expand Down
34 changes: 34 additions & 0 deletions crud/common/vshard_utils.lua
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
local luri = require('uri')

local vshard = require('vshard')

local vshard_utils = {}
Expand Down Expand Up @@ -53,4 +55,36 @@ function vshard_utils.get_vshard_identification_mode()
return vshard.storage.internal.current_cfg.identification_mode
end

function vshard_utils.get_this_replica_user()
local replicaset_key, replicaset = vshard_utils.get_self_vshard_replicaset()

if replicaset == nil or replicaset.master == nil then
error(string.format(
'Failed to find a vshard configuration ' ..
'for storage replicaset with key %q.',
replicaset_key))
end

local uri
if replicaset.master == 'auto' then
-- https://github.com/tarantool/vshard/issues/467.
uri = vshard.storage.internal.this_replica.uri
else
uri = replicaset.master.uri
end

return luri.parse(uri).login
end

function vshard_utils.get_replicaset_master(replicaset, opts)
opts = opts or {}
local cached = opts.cached or false

if (not cached) and replicaset.locate_master ~= nil then
replicaset:locate_master()
end

return replicaset.master
end

return vshard_utils
6 changes: 5 additions & 1 deletion crud/truncate.lua
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,11 @@ function truncate.call(space_name, opts)
return nil, TruncateError:new(err)
end

local replicasets = vshard_router:routeall()
local replicasets, err = vshard_router:routeall()
if err ~= nil then
return nil, TruncateError:new("Failed to get router replicasets: %s", err)
end

local _, err = call.map(vshard_router, CRUD_TRUNCATE_FUNC_NAME, {space_name}, {
mode = 'write',
replicasets = replicasets,
Expand Down
84 changes: 71 additions & 13 deletions test/helper.lua
Original file line number Diff line number Diff line change
Expand Up @@ -771,29 +771,87 @@ function helpers.is_name_supported_as_vshard_id()
return is_vshard_supports and is_tarantool_supports
end

function helpers.is_master_discovery_supported_in_vshard()
local vshard_version = helpers.parse_module_version(require('vshard')._VERSION)
local is_vshard_supports = luatest_utils.version_ge(vshard_version,
luatest_utils.version(0, 1, 25))

local tarantool_version = luatest_utils.get_tarantool_version()
local is_tarantool_supports = luatest_utils.version_ge(tarantool_version,
luatest_utils.version(3, 0, 0))
return is_vshard_supports and is_tarantool_supports
end

function helpers.remove_array_ascending_keys(arr, ascending_keys)
for i = #ascending_keys, 1, -1 do
local key = ascending_keys[i]
table.remove(arr, key)
end

return arr
end

function helpers.extend_vshard_matrix(backend_params, backend_cfg_key, backend_cfg_vals, opts)
assert(type(opts) == 'table')
assert(opts.mode == 'replace' or opts.mode == 'extend')

local old_vshard_backend_keys = {}
local old_vshard_backends = {}

for k, v in ipairs(backend_params) do
if v.backend == helpers.backend.VSHARD then
table.insert(old_vshard_backend_keys, k)
table.insert(old_vshard_backends, v)
end
end

if opts.mode == 'replace' then
helpers.remove_array_ascending_keys(backend_params, old_vshard_backend_keys)
end

for _, v in ipairs(old_vshard_backends) do
for _, cfg_v in ipairs(backend_cfg_vals) do
local new_v = table.deepcopy(v)

new_v.backend_cfg = new_v.backend_cfg or {}
new_v.backend_cfg[backend_cfg_key] = cfg_v

table.insert(backend_params, new_v)
end
end

return backend_params
end

function helpers.backend_matrix(base_matrix)
base_matrix = base_matrix or {{}}
local backend_params = {
{
backend = helpers.backend.CARTRIDGE,
backend_cfg = nil,
},
{
backend = helpers.backend.VSHARD,
backend_cfg = nil,
},
}

if helpers.is_name_supported_as_vshard_id() then
table.insert(backend_params, {
backend = helpers.backend.VSHARD,
backend_cfg = {identification_mode = 'uuid_as_key'},
})
table.insert(backend_params, {
backend = helpers.backend.VSHARD,
backend_cfg = {identification_mode = 'name_as_key'},
})
else
table.insert(backend_params, {
backend = helpers.backend.VSHARD,
backend_cfg = nil,
})
backend_params = helpers.extend_vshard_matrix(
backend_params,
'identification_mode',
{'uuid_as_key', 'name_as_key'},
{mode = 'replace'}
)
end

if helpers.is_master_discovery_supported_in_vshard() then
backend_params = helpers.extend_vshard_matrix(
backend_params,
'master',
{'auto'},
{mode = 'extend'}
)
end

local matrix = {}
Expand Down
4 changes: 4 additions & 0 deletions test/integration/updated_schema_test.lua
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ pgroup.before_each(function(g)
g.cluster.main_server.net_box:eval([[
local vshard = require('vshard')
for _, replicaset in pairs(vshard.router.routeall()) do
if replicaset.locate_master ~= nil then
replicaset:locate_master()
end
local master = replicaset.master
if not master.conn:ping({timeout = 3}) then
Expand Down

0 comments on commit 4d29aec

Please sign in to comment.