Skip to content

Commit

Permalink
crud: support vshard with no UUIDs in config
Browse files Browse the repository at this point in the history
PR #404 has introduced vshard 0.1.25 + Tarantool 3.0 "name as key"
identification mode based on UUIDs extraction. If works fine if
vshard configuration (or Tarantool 3.0 configuration which builds
vshard one) provides UUIDs, but fails if it isn't. Since UUIDs are
optional and won't be provided in most cases, it makes crud fails
to work on most Tarantool 3.0 vshard clusters. This patch fixes
the issue.

Now the code uses name as key, if corresponding mode is enabled, and
uuid otherwise. Patch doesn't cover `select_old` since it runs only
on pre-3.0 Tarantool. Unfortunately, code relies on vshard internals
since now there is no other way [1].

This patch covers new mode support for readview code as well. It likely
was broken before this patch even if UUIDs were provided.

1. tarantool/vshard#460

Follows #404
Closes #407
  • Loading branch information
DifferentialOrange committed Jan 10, 2024
1 parent 45a362a commit 2495a6f
Show file tree
Hide file tree
Showing 21 changed files with 429 additions and 202 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).

## Unreleased

### Fixed
* Compatibility with vshard configuration if UUIDs are omitted (#407).

## [1.4.2] - 25-12-23

### Added
Expand Down
9 changes: 5 additions & 4 deletions crud.lua
Original file line number Diff line number Diff line change
Expand Up @@ -173,12 +173,13 @@ function crud.init_storage()

local user = nil
if not box.info.ro then
local replicaset_uuid, replicaset = utils.get_self_vshard_replicaset()
local replicaset_key, replicaset = utils.get_self_vshard_replicaset()

if replicaset == nil or replicaset.master == nil then
error(string.format('Failed to find a vshard configuration for ' ..
' replicaset with replicaset_uuid %s.',
replicaset_uuid))
error(string.format(
'Failed to find a vshard configuration ' ..
'for storage replicaset with key %q.',
replicaset_key))
end
user = luri.parse(replicaset.master.uri).login or 'guest'
end
Expand Down
30 changes: 18 additions & 12 deletions crud/common/call.lua
Original file line number Diff line number Diff line change
Expand Up @@ -40,28 +40,34 @@ function call.get_vshard_call_name(mode, prefer_replica, balance)
return 'callbre'
end

local function wrap_vshard_err(vshard_router, err, func_name, replicaset_uuid, bucket_id)
local function wrap_vshard_err(vshard_router, err, func_name, replicaset_id, bucket_id)
-- Do not rewrite ShardingHashMismatchError class.
if err.class_name == sharding_utils.ShardingHashMismatchError.name then
return errors.wrap(err)
end

if replicaset_uuid == nil then
if replicaset_id == nil then
local replicaset, _ = vshard_router:route(bucket_id)
if replicaset == nil then
return CallError:new(
"Function returned an error, but we couldn't figure out the replicaset: %s", err
)
end

replicaset_uuid = replicaset.uuid
replicaset_id = utils.get_replicaset_id(vshard_router, replicaset)

if replicaset_id == nil then
return CallError:new(
"Function returned an error, but we couldn't figure out the replicaset id: %s", err
)
end
end

err = utils.update_storage_call_error_description(err, func_name, replicaset_uuid)
err = utils.update_storage_call_error_description(err, func_name, replicaset_id)
err = errors.wrap(err)

return CallError:new(utils.format_replicaset_error(
replicaset_uuid, "Function returned an error: %s", err
replicaset_id, "Function returned an error: %s", err
))
end

Expand Down Expand Up @@ -104,13 +110,13 @@ function call.map(vshard_router, func_name, func_args, opts)
local futures_by_replicasets = {}
local call_opts = {is_async = true}
while iter:has_next() do
local args, replicaset = iter:get()
local args, replicaset, replicaset_id = iter:get()
local future = replicaset[vshard_call_name](replicaset, func_name, args, call_opts)
futures_by_replicasets[replicaset.uuid] = future
futures_by_replicasets[replicaset_id] = future
end

local deadline = fiber_clock() + timeout
for replicaset_uuid, future in pairs(futures_by_replicasets) do
for replicaset_id, future in pairs(futures_by_replicasets) do
local wait_timeout = deadline - fiber_clock()
if wait_timeout < 0 then
wait_timeout = 0
Expand All @@ -119,14 +125,14 @@ function call.map(vshard_router, func_name, func_args, opts)
local result, err = future:wait_result(wait_timeout)

local result_info = {
key = replicaset_uuid,
key = replicaset_id,
value = result,
}

local err_info = {
err_wrapper = wrap_vshard_err,
err = err,
wrapper_args = {func_name, replicaset_uuid},
wrapper_args = {func_name, replicaset_id},
}

local early_exit = postprocessor:collect(result_info, err_info)
Expand Down Expand Up @@ -179,13 +185,13 @@ function call.any(vshard_router, func_name, func_args, opts)
if replicasets == nil then
return nil, CallError:new("Failed to get router replicasets: %s", err.err)
end
local replicaset = select(2, next(replicasets))
local replicaset_id, replicaset = next(replicasets)

local res, err = replicaset:call(func_name, func_args, {
timeout = timeout,
})
if err ~= nil then
return nil, wrap_vshard_err(vshard_router, err, func_name, replicaset.uuid)
return nil, wrap_vshard_err(vshard_router, err, func_name, replicaset_id)
end

if res == box.NULL then
Expand Down
4 changes: 3 additions & 1 deletion crud/common/map_call_cases/base_iter.lua
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,13 @@ end
--
-- @return[1] table func_args
-- @return[2] table replicaset
-- @return[3] string replicaset_id
function BaseIterator:get()
local replicaset_id = self.next_index
local replicaset = self.next_replicaset
self.next_index, self.next_replicaset = next(self.replicasets, self.next_index)

return self.func_args, replicaset
return self.func_args, replicaset, replicaset_id
end

return BaseIterator
10 changes: 6 additions & 4 deletions crud/common/map_call_cases/batch_insert_iter.lua
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ function BatchInsertIterator:new(opts)
return nil, SplitTuplesError:new("Failed to split tuples by replicaset: %s", err.err)
end

local next_replicaset, next_batch = next(sharding_data.batches)
local next_index, next_batch = next(sharding_data.batches)

local execute_on_storage_opts = opts.execute_on_storage_opts
execute_on_storage_opts.sharding_func_hash = sharding_data.sharding_func_hash
Expand All @@ -51,7 +51,7 @@ function BatchInsertIterator:new(opts)
space_name = opts.space.name,
opts = execute_on_storage_opts,
batches_by_replicasets = sharding_data.batches,
next_index = next_replicaset,
next_index = next_index,
next_batch = next_batch,
}

Expand All @@ -67,8 +67,10 @@ end
--
-- @return[1] table func_args
-- @return[2] table replicaset
-- @return[3] string replicaset_id
function BatchInsertIterator:get()
local replicaset = self.next_index
local replicaset_id = self.next_index
local replicaset = self.next_batch.replicaset
local func_args = {
self.space_name,
self.next_batch.tuples,
Expand All @@ -77,7 +79,7 @@ function BatchInsertIterator:get()

self.next_index, self.next_batch = next(self.batches_by_replicasets, self.next_index)

return func_args, replicaset
return func_args, replicaset, replicaset_id
end

return BatchInsertIterator
10 changes: 6 additions & 4 deletions crud/common/map_call_cases/batch_upsert_iter.lua
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ function BatchUpsertIterator:new(opts)
return nil, SplitTuplesError:new("Failed to split tuples by replicaset: %s", err.err)
end

local next_replicaset, next_batch = next(sharding_data.batches)
local next_index, next_batch = next(sharding_data.batches)

local execute_on_storage_opts = opts.execute_on_storage_opts
execute_on_storage_opts.sharding_func_hash = sharding_data.sharding_func_hash
Expand All @@ -59,7 +59,7 @@ function BatchUpsertIterator:new(opts)
space_name = opts.space.name,
opts = execute_on_storage_opts,
batches_by_replicasets = sharding_data.batches,
next_index = next_replicaset,
next_index = next_index,
next_batch = next_batch,
}

Expand All @@ -75,8 +75,10 @@ end
--
-- @return[1] table func_args
-- @return[2] table replicaset
-- @return[3] string replicaset_id
function BatchUpsertIterator:get()
local replicaset = self.next_index
local replicaset_id = self.next_index
local replicaset = self.next_batch.replicaset
local func_args = {
self.space_name,
self.next_batch.tuples,
Expand All @@ -86,7 +88,7 @@ function BatchUpsertIterator:get()

self.next_index, self.next_batch = next(self.batches_by_replicasets, self.next_index)

return func_args, replicaset
return func_args, replicaset, replicaset_id
end

return BatchUpsertIterator
4 changes: 3 additions & 1 deletion crud/common/schema.lua
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ local ReloadSchemaError = errors.new_class('ReloadSchemaError', {capture_stack =

local const = require('crud.common.const')
local dev_checks = require('crud.common.dev_checks')
local utils = require('crud.common.vshard_utils')

local schema = {}

Expand Down Expand Up @@ -234,7 +235,8 @@ function schema.wrap_func_result(space, func, args, opts)
replica_schema_version = box.internal.schema_version()
end
result.storage_info = {
replica_uuid = box.info().uuid,
replica_uuid = box.info().uuid, -- Backward compatibility.
replica_id = utils.get_self_vshard_replica_id(), -- Replacement for replica_uuid.
replica_schema_version = replica_schema_version,
}
end
Expand Down
25 changes: 20 additions & 5 deletions crud/common/sharding/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,13 @@ function sharding.get_replicasets_by_bucket_id(vshard_router, bucket_id)
return nil, GetReplicasetsError:new("Failed to get replicaset for bucket_id %s: %s", bucket_id, err.err)
end

local replicaset_id = utils.get_replicaset_id(vshard_router, replicaset)
if replicaset_id == nil then
return nil, GetReplicasetsError:new("Failed to get replicaset id for bucket_id %s replicaset", bucket_id)
end

return {
[replicaset.uuid] = replicaset,
[replicaset_id] = replicaset,
}
end

Expand Down Expand Up @@ -206,8 +211,8 @@ end
-- Specified space
--
-- @return[1] batches
-- Map where key is a replicaset and value
-- is table of tuples related to this replicaset
-- Map where key is a replicaset id and value
-- is replicaset and table of tuples related to this replicaset
function sharding.split_tuples_by_replicaset(vshard_router, tuples, space, opts)
dev_checks('table', 'table', 'table', {
operations = '?table',
Expand Down Expand Up @@ -247,15 +252,25 @@ function sharding.split_tuples_by_replicaset(vshard_router, tuples, space, opts)
sharding_data.bucket_id, err.err)
end

local record_by_replicaset = batches[replicaset] or {tuples = {}}
local replicaset_id = utils.get_replicaset_id(vshard_router, replicaset)
if replicaset_id == nil then
return nil, GetReplicasetsError:new(
"Failed to get replicaset id for bucket_id %s replicaset",
sharding_data.bucket_id)
end

local record_by_replicaset = batches[replicaset_id] or {
replicaset = replicaset,
tuples = {},
}
table.insert(record_by_replicaset.tuples, tuple)

if opts.operations ~= nil then
record_by_replicaset.operations = record_by_replicaset.operations or {}
table.insert(record_by_replicaset.operations, opts.operations[i])
end

batches[replicaset] = record_by_replicaset
batches[replicaset_id] = record_by_replicaset
end

return {
Expand Down

0 comments on commit 2495a6f

Please sign in to comment.