Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.

### Changed
* Optimize select with known bucket_id (#234).
* Deprecate manual sharding schema reload (#212).

### Fixed
* Fix processing storage error for tuple-merger implementation of
select/pairs (#271).
* Do not change input tuple object in requests.
* Add automatic reload of DDL schema (#212).

## [0.10.0] - 01-12-21

Expand Down
15 changes: 0 additions & 15 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -117,26 +117,11 @@ Table below describe what operations supports custom sharding key:

Current limitations for using custom sharding key:

- It's not possible to update sharding keys automatically when schema is
updated on storages, see
[#212](https://github.com/tarantool/crud/issues/212). However it is possible
to do it manually with `require('crud.common.sharding_key').update_cache()`
(this function updates both caches: sharding key cache and sharding function
cache, but returned value is sharding key from cache).
- No support of JSON path for sharding key, see
[#219](https://github.com/tarantool/crud/issues/219).
- `primary_index_fieldno_map` is not cached, see
[#243](https://github.com/tarantool/crud/issues/243).

Current limitations for using custom sharding functions:

- It's not possible to update sharding functions automatically when schema is
updated on storages, see
[#212](https://github.com/tarantool/crud/issues/212). However it is possible
to do it manually with `require('crud.common.sharding_func').update_cache()`
(this function updates both caches: sharding key cache and sharding function
cache, but returned value is sharding function from cache).

### Insert

```lua
Expand Down
15 changes: 12 additions & 3 deletions crud/borders.lua
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ local checks = require('checks')
local errors = require('errors')
local vshard = require('vshard')

local const = require('crud.common.const')
local dev_checks = require('crud.common.dev_checks')
local call = require('crud.common.call')
local utils = require('crud.common.utils')
Expand Down Expand Up @@ -76,7 +77,7 @@ local function call_get_border_on_router(border_name, space_name, index_name, op

local space = utils.get_space(space_name, vshard.router.routeall())
if space == nil then
return nil, BorderError:new("Space %q doesn't exist", space_name), true
return nil, BorderError:new("Space %q doesn't exist", space_name), const.NEED_SCHEMA_RELOAD
end

local index
Expand All @@ -87,7 +88,9 @@ local function call_get_border_on_router(border_name, space_name, index_name, op
end

if index == nil then
return nil, BorderError:new("Index %q of space %q doesn't exist", index_name, space_name), true
return nil,
BorderError:new("Index %q of space %q doesn't exist", index_name, space_name),
const.NEED_SCHEMA_RELOAD
end

local primary_index = space.index[0]
Expand Down Expand Up @@ -131,8 +134,14 @@ local function call_get_border_on_router(border_name, space_name, index_name, op
for _, storage_result in pairs(results) do
local storage_result = storage_result[1]
if storage_result.err ~= nil then
local err_wrapped = BorderError:new("Failed to get %s: %s", border_name, storage_result.err)

local need_reload = schema.result_needs_reload(space, storage_result)
return nil, BorderError:new("Failed to get %s: %s", border_name, storage_result.err), need_reload
if need_reload then
return nil, err_wrapped, const.NEED_SCHEMA_RELOAD
end

return nil, err_wrapped
end

local tuple = storage_result.res
Expand Down
6 changes: 6 additions & 0 deletions crud/common/call.lua
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ local errors = require('errors')

local dev_checks = require('crud.common.dev_checks')
local utils = require('crud.common.utils')
local sharding_utils = require('crud.common.sharding.utils')
local fiber_clock = require('fiber').clock

local CallError = errors.new_class('CallError')
Expand Down Expand Up @@ -40,6 +41,11 @@ function call.get_vshard_call_name(mode, prefer_replica, balance)
end

local function wrap_vshard_err(err, func_name, replicaset_uuid, bucket_id)
-- Do not rewrite ShardingHashMismatchError class.
if err.class_name == sharding_utils.ShardingHashMismatchError.name then
return errors.wrap(err)
end

if err.type == 'ClientError' and type(err.message) == 'string' then
if err.message == string.format("Procedure '%s' is not defined", func_name) then
if func_name:startswith('_crud.') then
Expand Down
4 changes: 4 additions & 0 deletions crud/common/const.lua
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,9 @@ local const = {}
-- Retry budget for DDL-schema reload attempts -- presumably consumed by
-- schema.wrap_func_reload; TODO confirm against crud/common/schema.lua.
const.RELOAD_RETRIES_NUM = 1
const.RELOAD_SCHEMA_TIMEOUT = 3 -- 3 seconds
const.FETCH_SHARDING_METADATA_TIMEOUT = 3 -- 3 seconds
-- Retry budget for sharding-metadata reloads (used by sharding.wrap_method
-- and sharding.wrap_select_method).
const.SHARDING_RELOAD_RETRIES_NUM = 1

-- Sentinel values returned as the third result of router-side calls to tell
-- the retry wrappers which cache must be refreshed before retrying:
-- schema.wrap_func_reload checks NEED_SCHEMA_RELOAD, sharding.wrap_method
-- checks NEED_SHARDING_RELOAD.
const.NEED_SCHEMA_RELOAD = 0x0001000
const.NEED_SHARDING_RELOAD = 0x0001001

return const
2 changes: 1 addition & 1 deletion crud/common/schema.lua
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ function schema.wrap_func_reload(func, ...)
while true do
res, err, need_reload = func(...)

if err == nil or not need_reload then
if err == nil or need_reload ~= const.NEED_SCHEMA_RELOAD then
break
end

Expand Down
124 changes: 109 additions & 15 deletions crud/common/sharding/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@ local errors = require('errors')
local BucketIDError = errors.new_class("BucketIDError", {capture_stack = false})
local GetReplicasetsError = errors.new_class('GetReplicasetsError', {capture_stack = false})

local const = require('crud.common.const')
local utils = require('crud.common.utils')
local dev_checks = require('crud.common.dev_checks')
local sharding_metadata_module = require('crud.common.sharding.sharding_metadata')
local storage_metadata_cache = require('crud.common.sharding.storage_metadata_cache')
local sharding_utils = require('crud.common.sharding.utils')

local sharding = {}

-- Resolve the bucket_id for a key of the given space.
--
-- Returns a table with field `bucket_id` and, when a space-specific sharding
-- function was used, its `sharding_func_hash`. On failure returns nil, err.
-- NOTE(review): the original span contained interleaved removed/added diff
-- lines (two consecutive `return` statements), which is invalid Lua; this is
-- the reconstructed post-change version.
function sharding.key_get_bucket_id(space_name, key, specified_bucket_id)
    dev_checks('string', '?', '?number|cdata')

    -- A caller-specified bucket_id takes priority; no sharding function is
    -- consulted, so no hash accompanies it.
    if specified_bucket_id ~= nil then
        return { bucket_id = specified_bucket_id }
    end

    local sharding_func_data, err = sharding_metadata_module.fetch_sharding_func_on_router(space_name)
    if err ~= nil then
        return nil, err
    end

    -- A space-specific sharding function exists: use it and report its hash
    -- so the storage side can validate metadata consistency.
    if sharding_func_data.value ~= nil then
        return {
            bucket_id = sharding_func_data.value(key),
            sharding_func_hash = sharding_func_data.hash,
        }
    end

    -- Fall back to the default vshard sharding function.
    return { bucket_id = vshard.router.bucket_id_strcrc32(key) }
end

-- Resolve the bucket_id for a tuple of the given space.
--
-- Extracts the sharding key from the tuple (using the space-specific
-- sharding key definition when one is cached, otherwise the primary index
-- parts) and delegates to sharding.key_get_bucket_id. Returns a table with
-- `bucket_id`, `sharding_func_hash` and `sharding_key_hash`; on failure
-- returns nil, err.
-- NOTE(review): the original span contained interleaved removed/added diff
-- lines (two consecutive `return` statements), which is invalid Lua; this is
-- the reconstructed post-change version.
function sharding.tuple_get_bucket_id(tuple, space, specified_bucket_id)
    if specified_bucket_id ~= nil then
        return { bucket_id = specified_bucket_id }
    end

    -- Default to the primary index parts; override with the space-specific
    -- sharding key definition when one is available.
    local sharding_index_parts = space.index[0].parts
    local sharding_key_data, err = sharding_metadata_module.fetch_sharding_key_on_router(space.name)
    if err ~= nil then
        return nil, err
    end
    if sharding_key_data.value ~= nil then
        sharding_index_parts = sharding_key_data.value.parts
    end
    local key = utils.extract_key(tuple, sharding_index_parts)

    local bucket_id_data, err = sharding.key_get_bucket_id(space.name, key, nil)
    if err ~= nil then
        return nil, err
    end

    return {
        bucket_id = bucket_id_data.bucket_id,
        sharding_func_hash = bucket_id_data.sharding_func_hash,
        sharding_key_hash = sharding_key_data.hash
    }
end

function sharding.tuple_set_and_return_bucket_id(tuple, space, specified_bucket_id)
Expand All @@ -77,16 +92,95 @@ function sharding.tuple_set_and_return_bucket_id(tuple, space, specified_bucket_
end
end

local bucket_id = tuple[bucket_id_fieldno]
if bucket_id == nil then
bucket_id, err = sharding.tuple_get_bucket_id(tuple, space)
local sharding_data = { bucket_id = tuple[bucket_id_fieldno] }

if sharding_data.bucket_id == nil then
sharding_data, err = sharding.tuple_get_bucket_id(tuple, space)
if err ~= nil then
return nil, err
end
tuple[bucket_id_fieldno] = bucket_id
tuple[bucket_id_fieldno] = sharding_data.bucket_id
end

return sharding_data
end

-- Verify that the sharding metadata hashes supplied by the router match the
-- ones cached on this storage for the given space.
--
-- Returns true on match (or when the check is explicitly skipped); otherwise
-- returns nil and a ShardingHashMismatchError.
function sharding.check_sharding_hash(space_name, sharding_func_hash, sharding_key_hash, skip_sharding_hash_check)
    -- Callers may opt out of the consistency check entirely.
    if skip_sharding_hash_check == true then
        return true
    end

    local stored_func_hash = storage_metadata_cache.get_sharding_func_hash(space_name)
    local stored_key_hash = storage_metadata_cache.get_sharding_key_hash(space_name)

    local func_hash_matches = (stored_func_hash == sharding_func_hash)
    local key_hash_matches = (stored_key_hash == sharding_key_hash)

    if func_hash_matches and key_hash_matches then
        return true
    end

    local err_msg = ('crud: Sharding hash mismatch for space %s. ' ..
        'Sharding info will be refreshed after receiving this error. ' ..
        'Please retry your request.'
    ):format(space_name)
    return nil, sharding_utils.ShardingHashMismatchError:new(err_msg)
end

-- Tell whether an error signals that the sharding metadata cache is stale
-- and must be reloaded (i.e. it is a ShardingHashMismatchError).
function sharding.result_needs_sharding_reload(err)
    local mismatch_class_name = sharding_utils.ShardingHashMismatchError.name
    return err.class_name == mismatch_class_name
end

-- Invoke `method(space_name, ...)`, reloading the sharding metadata cache
-- and retrying (up to const.SHARDING_RELOAD_RETRIES_NUM times) whenever the
-- method reports const.NEED_SHARDING_RELOAD as its third result.
--
-- Returns whatever the last invocation of `method` returned.
function sharding.wrap_method(method, space_name, ...)
    local res, err, need_reload
    local attempt = 0

    repeat
        res, err, need_reload = method(space_name, ...)

        -- Only a failure explicitly flagged with NEED_SHARDING_RELOAD
        -- warrants a cache refresh and another attempt.
        local retry = (err ~= nil) and (need_reload == const.NEED_SHARDING_RELOAD)
        if not retry then
            return res, err, need_reload
        end

        sharding_metadata_module.reload_sharding_cache(space_name)
        attempt = attempt + 1
    until attempt > const.SHARDING_RELOAD_RETRIES_NUM

    return res, err, need_reload
end

-- This wrapper assumes the reload is performed inside the method and
-- expects a ShardingHashMismatchError error to be thrown on a stale cache.
--
-- Calls `method(space_name, ...)` under pcall, retrying up to
-- const.SHARDING_RELOAD_RETRIES_NUM times on a hash-mismatch error and
-- rethrowing any other error unchanged.
-- NOTE(review): the original span contained a stray diff-artifact line
-- (`return bucket_id` directly before `return res, err`), which is a Lua
-- syntax error; it is removed here.
function sharding.wrap_select_method(method, space_name, ...)
    local retries = 0

    local ok, res, err
    while true do
        ok, res, err = pcall(method, space_name, ...)

        if ok == true then
            break
        end

        -- Error thrown from merger casted to string,
        -- so the only way to identify it is string.find.
        local str_err = tostring(res)
        if (str_err:find(sharding_utils.ShardingHashMismatchError.name) == nil) then
            -- Not a hash mismatch: propagate the original error object.
            error(res)
        end

        -- Reload is performed inside the merger.

        retries = retries + 1

        if retries > const.SHARDING_RELOAD_RETRIES_NUM then
            error(res)
        end
    end

    return res, err
end

return sharding
25 changes: 25 additions & 0 deletions crud/common/sharding/router_metadata_cache.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
local fiber = require('fiber')

-- Router-side cache holding sharding metadata maps (sharding keys, sharding
-- functions, and their hashes) together with the lock used while refilling.
local router_metadata_cache = {}

-- Public names of the map slots kept directly on this table.
router_metadata_cache.SHARDING_KEY_MAP_NAME = "sharding_key_as_index_obj_map"
router_metadata_cache.SHARDING_FUNC_MAP_NAME = "sharding_func_map"
router_metadata_cache.META_HASH_MAP_NAME = "sharding_meta_hash_map"

-- Initial state: key/func maps unset, hash map empty.
router_metadata_cache[router_metadata_cache.SHARDING_KEY_MAP_NAME] = nil
router_metadata_cache[router_metadata_cache.SHARDING_FUNC_MAP_NAME] = nil
router_metadata_cache[router_metadata_cache.META_HASH_MAP_NAME] = {}

-- Single-slot channel -- presumably used as a mutex around metadata fetches;
-- TODO confirm against the fetch code.
router_metadata_cache.fetch_lock = fiber.channel(1)
router_metadata_cache.is_part_of_pk = {}

-- Discard all cached metadata, closing the previous fetch-lock channel
-- before recreating it, and reset the primary-key membership map.
function router_metadata_cache.drop_caches()
    local self = router_metadata_cache

    self[self.SHARDING_KEY_MAP_NAME] = nil
    self[self.SHARDING_FUNC_MAP_NAME] = nil
    self[self.META_HASH_MAP_NAME] = {}

    local old_lock = self.fetch_lock
    if old_lock ~= nil then
        old_lock:close()
    end
    self.fetch_lock = fiber.channel(1)

    self.is_part_of_pk = {}
end

return router_metadata_cache
12 changes: 9 additions & 3 deletions crud/common/sharding/sharding_func.lua
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ local errors = require('errors')
local log = require('log')

local dev_checks = require('crud.common.dev_checks')
local cache = require('crud.common.sharding.sharding_metadata_cache')
local cache = require('crud.common.sharding.router_metadata_cache')
local utils = require('crud.common.utils')

local ShardingFuncError = errors.new_class('ShardingFuncError', {capture_stack = false})
Expand Down Expand Up @@ -82,7 +82,12 @@ function sharding_func_module.construct_as_callable_obj_cache(metadata_map, spec

local result_err

cache.sharding_func_map = {}
cache[cache.SHARDING_FUNC_MAP_NAME] = {}
local func_cache = cache[cache.SHARDING_FUNC_MAP_NAME]

cache[cache.META_HASH_MAP_NAME][cache.SHARDING_FUNC_MAP_NAME] = {}
local func_hash_cache = cache[cache.META_HASH_MAP_NAME][cache.SHARDING_FUNC_MAP_NAME]

for space_name, metadata in pairs(metadata_map) do
if metadata.sharding_func_def ~= nil then
local sharding_func, err = as_callable_object(metadata.sharding_func_def,
Expand All @@ -96,7 +101,8 @@ function sharding_func_module.construct_as_callable_obj_cache(metadata_map, spec
end
end

cache.sharding_func_map[space_name] = sharding_func
func_cache[space_name] = sharding_func
func_hash_cache[space_name] = metadata.sharding_func_hash
end
end

Expand Down
12 changes: 9 additions & 3 deletions crud/common/sharding/sharding_key.lua
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ local errors = require('errors')
local log = require('log')

local dev_checks = require('crud.common.dev_checks')
local cache = require('crud.common.sharding.sharding_metadata_cache')
local cache = require('crud.common.sharding.router_metadata_cache')
local utils = require('crud.common.utils')

local ShardingKeyError = errors.new_class("ShardingKeyError", {capture_stack = false})
Expand Down Expand Up @@ -102,7 +102,12 @@ function sharding_key_module.construct_as_index_obj_cache(metadata_map, specifie

local result_err

cache.sharding_key_as_index_obj_map = {}
cache[cache.SHARDING_KEY_MAP_NAME] = {}
local key_cache = cache[cache.SHARDING_KEY_MAP_NAME]

cache[cache.META_HASH_MAP_NAME][cache.SHARDING_KEY_MAP_NAME] = {}
local key_hash_cache = cache[cache.META_HASH_MAP_NAME][cache.SHARDING_KEY_MAP_NAME]

for space_name, metadata in pairs(metadata_map) do
if metadata.sharding_key_def ~= nil then
local sharding_key_as_index_obj, err = as_index_object(space_name,
Expand All @@ -117,7 +122,8 @@ function sharding_key_module.construct_as_index_obj_cache(metadata_map, specifie
end
end

cache.sharding_key_as_index_obj_map[space_name] = sharding_key_as_index_obj
key_cache[space_name] = sharding_key_as_index_obj
key_hash_cache[space_name] = metadata.sharding_key_hash
end
end

Expand Down
Loading