diff --git a/CHANGELOG.md b/CHANGELOG.md index d309bc7e..0d2ba55b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,10 +18,13 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ### Changed * Optimize select with known bucket_id (#234). +* Deprecate manual sharding schema reload (#212). ### Fixed * Fix processing storage error for tuple-merger implementation of select/pairs (#271). +* Do not change input tuple object in requests. +* Add automatic reload of DDL schema (#212). ## [0.10.0] - 01-12-21 diff --git a/README.md b/README.md index 7097b53a..4f14ec50 100644 --- a/README.md +++ b/README.md @@ -117,26 +117,11 @@ Table below describe what operations supports custom sharding key: Current limitations for using custom sharding key: -- It's not possible to update sharding keys automatically when schema is - updated on storages, see - [#212](https://github.com/tarantool/crud/issues/212). However it is possible - to do it manually with `require('crud.common.sharding_key').update_cache()` - (this function updates both caches: sharding key cache and sharding function - cache, but returned value is sharding key from cache). - No support of JSON path for sharding key, see [#219](https://github.com/tarantool/crud/issues/219). - `primary_index_fieldno_map` is not cached, see [#243](https://github.com/tarantool/crud/issues/243). -Current limitations for using custom sharding functions: - -- It's not possible to update sharding functions automatically when schema is - updated on storages, see - [#212](https://github.com/tarantool/crud/issues/212). However it is possible - to do it manually with `require('crud.common.sharding_func').update_cache()` - (this function updates both caches: sharding key cache and sharding function - cache, but returned value is sharding function from cache). 
- ### Insert ```lua diff --git a/crud/borders.lua b/crud/borders.lua index 4c4793d1..1c4a81c2 100644 --- a/crud/borders.lua +++ b/crud/borders.lua @@ -2,6 +2,7 @@ local checks = require('checks') local errors = require('errors') local vshard = require('vshard') +local const = require('crud.common.const') local dev_checks = require('crud.common.dev_checks') local call = require('crud.common.call') local utils = require('crud.common.utils') @@ -76,7 +77,7 @@ local function call_get_border_on_router(border_name, space_name, index_name, op local space = utils.get_space(space_name, vshard.router.routeall()) if space == nil then - return nil, BorderError:new("Space %q doesn't exist", space_name), true + return nil, BorderError:new("Space %q doesn't exist", space_name), const.NEED_SCHEMA_RELOAD end local index @@ -87,7 +88,9 @@ local function call_get_border_on_router(border_name, space_name, index_name, op end if index == nil then - return nil, BorderError:new("Index %q of space %q doesn't exist", index_name, space_name), true + return nil, + BorderError:new("Index %q of space %q doesn't exist", index_name, space_name), + const.NEED_SCHEMA_RELOAD end local primary_index = space.index[0] @@ -131,8 +134,14 @@ local function call_get_border_on_router(border_name, space_name, index_name, op for _, storage_result in pairs(results) do local storage_result = storage_result[1] if storage_result.err ~= nil then + local err_wrapped = BorderError:new("Failed to get %s: %s", border_name, storage_result.err) + local need_reload = schema.result_needs_reload(space, storage_result) - return nil, BorderError:new("Failed to get %s: %s", border_name, storage_result.err), need_reload + if need_reload then + return nil, err_wrapped, const.NEED_SCHEMA_RELOAD + end + + return nil, err_wrapped end local tuple = storage_result.res diff --git a/crud/common/call.lua b/crud/common/call.lua index a0919237..50bc2c4b 100644 --- a/crud/common/call.lua +++ b/crud/common/call.lua @@ -3,6 +3,7 @@ local 
errors = require('errors') local dev_checks = require('crud.common.dev_checks') local utils = require('crud.common.utils') +local sharding_utils = require('crud.common.sharding.utils') local fiber_clock = require('fiber').clock local CallError = errors.new_class('CallError') @@ -40,6 +41,11 @@ function call.get_vshard_call_name(mode, prefer_replica, balance) end local function wrap_vshard_err(err, func_name, replicaset_uuid, bucket_id) + -- Do not rewrite ShardingHashMismatchError class. + if err.class_name == sharding_utils.ShardingHashMismatchError.name then + return errors.wrap(err) + end + if err.type == 'ClientError' and type(err.message) == 'string' then if err.message == string.format("Procedure '%s' is not defined", func_name) then if func_name:startswith('_crud.') then diff --git a/crud/common/const.lua b/crud/common/const.lua index fc261269..d452e1bf 100644 --- a/crud/common/const.lua +++ b/crud/common/const.lua @@ -3,5 +3,9 @@ local const = {} const.RELOAD_RETRIES_NUM = 1 const.RELOAD_SCHEMA_TIMEOUT = 3 -- 3 seconds const.FETCH_SHARDING_METADATA_TIMEOUT = 3 -- 3 seconds +const.SHARDING_RELOAD_RETRIES_NUM = 1 + +const.NEED_SCHEMA_RELOAD = 0x0001000 +const.NEED_SHARDING_RELOAD = 0x0001001 return const diff --git a/crud/common/schema.lua b/crud/common/schema.lua index 25375ff4..85487d7f 100644 --- a/crud/common/schema.lua +++ b/crud/common/schema.lua @@ -87,7 +87,7 @@ function schema.wrap_func_reload(func, ...) while true do res, err, need_reload = func(...) 
- if err == nil or not need_reload then + if err == nil or need_reload ~= const.NEED_SCHEMA_RELOAD then break end diff --git a/crud/common/sharding/init.lua b/crud/common/sharding/init.lua index de669a9a..6edc7951 100644 --- a/crud/common/sharding/init.lua +++ b/crud/common/sharding/init.lua @@ -4,9 +4,12 @@ local errors = require('errors') local BucketIDError = errors.new_class("BucketIDError", {capture_stack = false}) local GetReplicasetsError = errors.new_class('GetReplicasetsError', {capture_stack = false}) +local const = require('crud.common.const') local utils = require('crud.common.utils') local dev_checks = require('crud.common.dev_checks') local sharding_metadata_module = require('crud.common.sharding.sharding_metadata') +local storage_metadata_cache = require('crud.common.sharding.storage_metadata_cache') +local sharding_utils = require('crud.common.sharding.utils') local sharding = {} @@ -25,37 +28,49 @@ function sharding.key_get_bucket_id(space_name, key, specified_bucket_id) dev_checks('string', '?', '?number|cdata') if specified_bucket_id ~= nil then - return specified_bucket_id + return { bucket_id = specified_bucket_id } end - local sharding_func, err = sharding_metadata_module.fetch_sharding_func_on_router(space_name) + local sharding_func_data, err = sharding_metadata_module.fetch_sharding_func_on_router(space_name) if err ~= nil then return nil, err end - if sharding_func ~= nil then - return sharding_func(key) + if sharding_func_data.value ~= nil then + return { + bucket_id = sharding_func_data.value(key), + sharding_func_hash = sharding_func_data.hash, + } end - return vshard.router.bucket_id_strcrc32(key) + return { bucket_id = vshard.router.bucket_id_strcrc32(key) } end function sharding.tuple_get_bucket_id(tuple, space, specified_bucket_id) if specified_bucket_id ~= nil then - return specified_bucket_id + return { bucket_id = specified_bucket_id } end local sharding_index_parts = space.index[0].parts - local sharding_key_as_index_obj, err = 
sharding_metadata_module.fetch_sharding_key_on_router(space.name) + local sharding_key_data, err = sharding_metadata_module.fetch_sharding_key_on_router(space.name) if err ~= nil then return nil, err end - if sharding_key_as_index_obj ~= nil then - sharding_index_parts = sharding_key_as_index_obj.parts + if sharding_key_data.value ~= nil then + sharding_index_parts = sharding_key_data.value.parts end local key = utils.extract_key(tuple, sharding_index_parts) - return sharding.key_get_bucket_id(space.name, key) + local bucket_id_data, err = sharding.key_get_bucket_id(space.name, key, nil) + if err ~= nil then + return nil, err + end + + return { + bucket_id = bucket_id_data.bucket_id, + sharding_func_hash = bucket_id_data.sharding_func_hash, + sharding_key_hash = sharding_key_data.hash + } end function sharding.tuple_set_and_return_bucket_id(tuple, space, specified_bucket_id) @@ -77,16 +92,95 @@ function sharding.tuple_set_and_return_bucket_id(tuple, space, specified_bucket_ end end - local bucket_id = tuple[bucket_id_fieldno] - if bucket_id == nil then - bucket_id, err = sharding.tuple_get_bucket_id(tuple, space) + local sharding_data = { bucket_id = tuple[bucket_id_fieldno] } + + if sharding_data.bucket_id == nil then + sharding_data, err = sharding.tuple_get_bucket_id(tuple, space) if err ~= nil then return nil, err end - tuple[bucket_id_fieldno] = bucket_id + tuple[bucket_id_fieldno] = sharding_data.bucket_id + end + + return sharding_data +end + +function sharding.check_sharding_hash(space_name, sharding_func_hash, sharding_key_hash, skip_sharding_hash_check) + if skip_sharding_hash_check == true then + return true + end + + local storage_func_hash = storage_metadata_cache.get_sharding_func_hash(space_name) + local storage_key_hash = storage_metadata_cache.get_sharding_key_hash(space_name) + + if storage_func_hash ~= sharding_func_hash or storage_key_hash ~= sharding_key_hash then + local err_msg = ('crud: Sharding hash mismatch for space %s. ' .. 
+ 'Sharding info will be refreshed after receiving this error. ' .. + 'Please retry your request.' + ):format(space_name) + return nil, sharding_utils.ShardingHashMismatchError:new(err_msg) + end + + return true +end + +function sharding.result_needs_sharding_reload(err) + return err.class_name == sharding_utils.ShardingHashMismatchError.name +end + +function sharding.wrap_method(method, space_name, ...) + local i = 0 + + local res, err, need_reload + while true do + res, err, need_reload = method(space_name, ...) + + if err == nil or need_reload ~= const.NEED_SHARDING_RELOAD then + break + end + + sharding_metadata_module.reload_sharding_cache(space_name) + + i = i + 1 + + if i > const.SHARDING_RELOAD_RETRIES_NUM then + break + end + end + + return res, err, need_reload +end + +-- This wrapper assumes reload is performed inside the method and +-- expect ShardingHashMismatchError error to be thrown. +function sharding.wrap_select_method(method, space_name, ...) + local i = 0 + + local ok, res, err + while true do + ok, res, err = pcall(method, space_name, ...) + + if ok == true then + break + end + + -- Error thrown from merger casted to string, + -- so the only way to identify it is string.find. + local str_err = tostring(res) + if (str_err:find(sharding_utils.ShardingHashMismatchError.name) == nil) then + error(res) + end + + -- Reload is performed inside the merger. 
+ + i = i + 1 + + if i > const.SHARDING_RELOAD_RETRIES_NUM then + error(res) + end end - return bucket_id + return res, err end return sharding diff --git a/crud/common/sharding/router_metadata_cache.lua b/crud/common/sharding/router_metadata_cache.lua new file mode 100644 index 00000000..a6496f00 --- /dev/null +++ b/crud/common/sharding/router_metadata_cache.lua @@ -0,0 +1,25 @@ +local fiber = require('fiber') + +local router_metadata_cache = {} + +router_metadata_cache.SHARDING_KEY_MAP_NAME = "sharding_key_as_index_obj_map" +router_metadata_cache.SHARDING_FUNC_MAP_NAME = "sharding_func_map" +router_metadata_cache.META_HASH_MAP_NAME = "sharding_meta_hash_map" +router_metadata_cache[router_metadata_cache.SHARDING_KEY_MAP_NAME] = nil +router_metadata_cache[router_metadata_cache.SHARDING_FUNC_MAP_NAME] = nil +router_metadata_cache[router_metadata_cache.META_HASH_MAP_NAME] = {} +router_metadata_cache.fetch_lock = fiber.channel(1) +router_metadata_cache.is_part_of_pk = {} + +function router_metadata_cache.drop_caches() + router_metadata_cache[router_metadata_cache.SHARDING_KEY_MAP_NAME] = nil + router_metadata_cache[router_metadata_cache.SHARDING_FUNC_MAP_NAME] = nil + router_metadata_cache[router_metadata_cache.META_HASH_MAP_NAME] = {} + if router_metadata_cache.fetch_lock ~= nil then + router_metadata_cache.fetch_lock:close() + end + router_metadata_cache.fetch_lock = fiber.channel(1) + router_metadata_cache.is_part_of_pk = {} +end + +return router_metadata_cache \ No newline at end of file diff --git a/crud/common/sharding/sharding_func.lua b/crud/common/sharding/sharding_func.lua index 8307d6d4..6a503b45 100644 --- a/crud/common/sharding/sharding_func.lua +++ b/crud/common/sharding/sharding_func.lua @@ -2,7 +2,7 @@ local errors = require('errors') local log = require('log') local dev_checks = require('crud.common.dev_checks') -local cache = require('crud.common.sharding.sharding_metadata_cache') +local cache = require('crud.common.sharding.router_metadata_cache') 
local utils = require('crud.common.utils') local ShardingFuncError = errors.new_class('ShardingFuncError', {capture_stack = false}) @@ -82,7 +82,12 @@ function sharding_func_module.construct_as_callable_obj_cache(metadata_map, spec local result_err - cache.sharding_func_map = {} + cache[cache.SHARDING_FUNC_MAP_NAME] = {} + local func_cache = cache[cache.SHARDING_FUNC_MAP_NAME] + + cache[cache.META_HASH_MAP_NAME][cache.SHARDING_FUNC_MAP_NAME] = {} + local func_hash_cache = cache[cache.META_HASH_MAP_NAME][cache.SHARDING_FUNC_MAP_NAME] + for space_name, metadata in pairs(metadata_map) do if metadata.sharding_func_def ~= nil then local sharding_func, err = as_callable_object(metadata.sharding_func_def, @@ -96,7 +101,8 @@ function sharding_func_module.construct_as_callable_obj_cache(metadata_map, spec end end - cache.sharding_func_map[space_name] = sharding_func + func_cache[space_name] = sharding_func + func_hash_cache[space_name] = metadata.sharding_func_hash end end diff --git a/crud/common/sharding/sharding_key.lua b/crud/common/sharding/sharding_key.lua index d38f561f..635c3e34 100644 --- a/crud/common/sharding/sharding_key.lua +++ b/crud/common/sharding/sharding_key.lua @@ -2,7 +2,7 @@ local errors = require('errors') local log = require('log') local dev_checks = require('crud.common.dev_checks') -local cache = require('crud.common.sharding.sharding_metadata_cache') +local cache = require('crud.common.sharding.router_metadata_cache') local utils = require('crud.common.utils') local ShardingKeyError = errors.new_class("ShardingKeyError", {capture_stack = false}) @@ -102,7 +102,12 @@ function sharding_key_module.construct_as_index_obj_cache(metadata_map, specifie local result_err - cache.sharding_key_as_index_obj_map = {} + cache[cache.SHARDING_KEY_MAP_NAME] = {} + local key_cache = cache[cache.SHARDING_KEY_MAP_NAME] + + cache[cache.META_HASH_MAP_NAME][cache.SHARDING_KEY_MAP_NAME] = {} + local key_hash_cache = 
cache[cache.META_HASH_MAP_NAME][cache.SHARDING_KEY_MAP_NAME] + for space_name, metadata in pairs(metadata_map) do if metadata.sharding_key_def ~= nil then local sharding_key_as_index_obj, err = as_index_object(space_name, @@ -117,7 +122,8 @@ function sharding_key_module.construct_as_index_obj_cache(metadata_map, specifie end end - cache.sharding_key_as_index_obj_map[space_name] = sharding_key_as_index_obj + key_cache[space_name] = sharding_key_as_index_obj + key_hash_cache[space_name] = metadata.sharding_key_hash end end diff --git a/crud/common/sharding/sharding_metadata.lua b/crud/common/sharding/sharding_metadata.lua index 3c31be7c..2716462c 100644 --- a/crud/common/sharding/sharding_metadata.lua +++ b/crud/common/sharding/sharding_metadata.lua @@ -1,12 +1,15 @@ local fiber = require('fiber') local errors = require('errors') +local log = require('log') local call = require('crud.common.call') local const = require('crud.common.const') local dev_checks = require('crud.common.dev_checks') -local cache = require('crud.common.sharding.sharding_metadata_cache') +local cache = require('crud.common.sharding.router_metadata_cache') +local storage_cache = require('crud.common.sharding.storage_metadata_cache') local sharding_func = require('crud.common.sharding.sharding_func') local sharding_key = require('crud.common.sharding.sharding_key') +local sharding_utils = require('crud.common.sharding.utils') local FetchShardingMetadataError = errors.new_class('FetchShardingMetadataError', {capture_stack = false}) @@ -39,25 +42,6 @@ local function locked(f) end end -local function extract_sharding_func_def(tuple) - if not tuple then - return nil - end - - local SPACE_SHARDING_FUNC_NAME_FIELDNO = 2 - local SPACE_SHARDING_FUNC_BODY_FIELDNO = 3 - - if tuple[SPACE_SHARDING_FUNC_BODY_FIELDNO] ~= nil then - return {body = tuple[SPACE_SHARDING_FUNC_BODY_FIELDNO]} - end - - if tuple[SPACE_SHARDING_FUNC_NAME_FIELDNO] ~= nil then - return tuple[SPACE_SHARDING_FUNC_NAME_FIELDNO] - end - - 
return nil -end - -- Return a map with metadata or nil when spaces box.space._ddl_sharding_key and -- box.space._ddl_sharding_func are not available on storage. function sharding_metadata_module.fetch_on_storage() @@ -68,17 +52,16 @@ function sharding_metadata_module.fetch_on_storage() return nil end - local SPACE_NAME_FIELDNO = 1 - local SPACE_SHARDING_KEY_FIELDNO = 2 local metadata_map = {} if sharding_key_space ~= nil then for _, tuple in sharding_key_space:pairs() do - local space_name = tuple[SPACE_NAME_FIELDNO] - local sharding_key_def = tuple[SPACE_SHARDING_KEY_FIELDNO] + local space_name = tuple[sharding_utils.SPACE_NAME_FIELDNO] + local sharding_key_def = tuple[sharding_utils.SPACE_SHARDING_KEY_FIELDNO] local space_format = box.space[space_name]:format() metadata_map[space_name] = { sharding_key_def = sharding_key_def, + sharding_key_hash = storage_cache.get_sharding_key_hash(space_name), space_format = space_format, } end @@ -86,10 +69,11 @@ function sharding_metadata_module.fetch_on_storage() if sharding_func_space ~= nil then for _, tuple in sharding_func_space:pairs() do - local space_name = tuple[SPACE_NAME_FIELDNO] - local sharding_func_def = extract_sharding_func_def(tuple) + local space_name = tuple[sharding_utils.SPACE_NAME_FIELDNO] + local sharding_func_def = sharding_utils.extract_sharding_func_def(tuple) metadata_map[space_name] = metadata_map[space_name] or {} metadata_map[space_name].sharding_func_def = sharding_func_def + metadata_map[space_name].sharding_func_hash = storage_cache.get_sharding_func_hash(space_name) end end @@ -102,10 +86,11 @@ end -- a sharding metadata by a single one, other fibers will wait while -- cache.fetch_lock become unlocked during timeout passed to -- _fetch_on_router(). +-- metadata_map_name == nil means forced reload. 
local _fetch_on_router = locked(function(timeout, space_name, metadata_map_name) - dev_checks('number', 'string', 'string') + dev_checks('number', 'string', '?string') - if cache[metadata_map_name] ~= nil then + if (metadata_map_name ~= nil) and (cache[metadata_map_name]) ~= nil then return end @@ -118,6 +103,10 @@ local _fetch_on_router = locked(function(timeout, space_name, metadata_map_name) if metadata_map == nil then cache[cache.SHARDING_KEY_MAP_NAME] = {} cache[cache.SHARDING_FUNC_MAP_NAME] = {} + cache[cache.META_HASH_MAP_NAME] = { + [cache.SHARDING_KEY_MAP_NAME] = {}, + [cache.SHARDING_FUNC_MAP_NAME] = {}, + } return end @@ -134,7 +123,10 @@ end) local function fetch_on_router(space_name, metadata_map_name, timeout) if cache[metadata_map_name] ~= nil then - return cache[metadata_map_name][space_name] + return { + value = cache[metadata_map_name][space_name], + hash = cache[cache.META_HASH_MAP_NAME][metadata_map_name][space_name] + } end local timeout = timeout or const.FETCH_SHARDING_METADATA_TIMEOUT @@ -144,7 +136,10 @@ local function fetch_on_router(space_name, metadata_map_name, timeout) end if cache[metadata_map_name] ~= nil then - return cache[metadata_map_name][space_name] + return { + value = cache[metadata_map_name][space_name], + hash = cache[cache.META_HASH_MAP_NAME][metadata_map_name][space_name], + } end return nil, FetchShardingMetadataError:new( @@ -193,6 +188,15 @@ function sharding_metadata_module.update_sharding_func_cache(space_name) return sharding_metadata_module.fetch_sharding_func_on_router(space_name) end +function sharding_metadata_module.reload_sharding_cache(space_name) + cache.drop_caches() + + local err = _fetch_on_router(const.FETCH_SHARDING_METADATA_TIMEOUT, space_name, nil) + if err ~= nil then + log.warn('Failed to reload sharding cache: %s', err) + end +end + function sharding_metadata_module.init() _G._crud.fetch_on_storage = sharding_metadata_module.fetch_on_storage end diff --git 
a/crud/common/sharding/sharding_metadata_cache.lua b/crud/common/sharding/sharding_metadata_cache.lua deleted file mode 100644 index 5b2fdcd6..00000000 --- a/crud/common/sharding/sharding_metadata_cache.lua +++ /dev/null @@ -1,22 +0,0 @@ -local fiber = require('fiber') - -local sharding_metadata_cache = {} - -sharding_metadata_cache.SHARDING_KEY_MAP_NAME = "sharding_key_as_index_obj_map" -sharding_metadata_cache.SHARDING_FUNC_MAP_NAME = "sharding_func_map" -sharding_metadata_cache[sharding_metadata_cache.SHARDING_KEY_MAP_NAME] = nil -sharding_metadata_cache[sharding_metadata_cache.SHARDING_FUNC_MAP_NAME] = nil -sharding_metadata_cache.fetch_lock = fiber.channel(1) -sharding_metadata_cache.is_part_of_pk = {} - -function sharding_metadata_cache.drop_caches() - sharding_metadata_cache[sharding_metadata_cache.SHARDING_KEY_MAP_NAME] = nil - sharding_metadata_cache[sharding_metadata_cache.SHARDING_FUNC_MAP_NAME] = nil - if sharding_metadata_cache.fetch_lock ~= nil then - sharding_metadata_cache.fetch_lock:close() - end - sharding_metadata_cache.fetch_lock = fiber.channel(1) - sharding_metadata_cache.is_part_of_pk = {} -end - -return sharding_metadata_cache diff --git a/crud/common/sharding/storage_metadata_cache.lua b/crud/common/sharding/storage_metadata_cache.lua new file mode 100644 index 00000000..f5cc238d --- /dev/null +++ b/crud/common/sharding/storage_metadata_cache.lua @@ -0,0 +1,95 @@ +local stash = require('crud.common.stash') +local utils = require('crud.common.sharding.utils') + +local storage_metadata_cache = {} + +local FUNC = 1 +local KEY = 2 + +local cache_data = { + [FUNC] = nil, + [KEY] = nil, +} + +local ddl_space = { + [FUNC] = '_ddl_sharding_func', + [KEY] = '_ddl_sharding_key', +} + +local trigger_stash = stash.get(stash.name.ddl_triggers) + +local function update_sharding_func_hash(tuple) + local space_name = tuple[utils.SPACE_NAME_FIELDNO] + local sharding_func_def = utils.extract_sharding_func_def(tuple) + cache_data[FUNC][space_name] = 
utils.compute_hash(sharding_func_def) +end + +local function update_sharding_key_hash(tuple) + local space_name = tuple[utils.SPACE_NAME_FIELDNO] + local sharding_key_def = tuple[utils.SPACE_SHARDING_KEY_FIELDNO] + cache_data[KEY][space_name] = utils.compute_hash(sharding_key_def) +end + +local update_hash = { + [FUNC] = update_sharding_func_hash, + [KEY] = update_sharding_key_hash, +} + +local function init_cache(section) + cache_data[section] = {} + + local space = box.space[ddl_space[section]] + + local update_hash_func = update_hash[section] + + -- Remove old trigger if there was some code reload. + -- It is possible that ddl space was dropped and created again, + -- so removing non-existing trigger will cause fail; + -- thus we use pcall. + pcall(space.on_replace, space, nil, trigger_stash[section]) + + trigger_stash[section] = space:on_replace( + function(_, new) + return update_hash_func(new) + end + ) + + for _, tuple in space:pairs() do + local space_name = tuple[utils.SPACE_NAME_FIELDNO] + -- If the cache record for a space is not nil, it means + -- that it was already set to up-to-date value with trigger. + -- It is more like an overcautiousness since the cycle + -- isn't expected to yield, but let it be here. + if cache_data[section][space_name] == nil then + update_hash_func(tuple) + end + end +end + +local function get_sharding_hash(space_name, section) + if box.space[ddl_space[section]] == nil then + return nil + end + + -- If one would drop and rebuild ddl spaces from scratch manually, + caching is likely to break. 
+ if cache_data[section] == nil then + init_cache(section) + end + + return cache_data[section][space_name] +end + +function storage_metadata_cache.get_sharding_func_hash(space_name) + return get_sharding_hash(space_name, FUNC) +end + +function storage_metadata_cache.get_sharding_key_hash(space_name) + return get_sharding_hash(space_name, KEY) +end + +function storage_metadata_cache.drop_caches() + cache_data = {} +end + +return storage_metadata_cache diff --git a/crud/common/sharding/utils.lua b/crud/common/sharding/utils.lua new file mode 100644 index 00000000..c6e8c2d2 --- /dev/null +++ b/crud/common/sharding/utils.lua @@ -0,0 +1,34 @@ +local digest = require('digest') +local errors = require('errors') +local msgpack = require('msgpack') + +local utils = {} + +utils.SPACE_NAME_FIELDNO = 1 +utils.SPACE_SHARDING_KEY_FIELDNO = 2 +utils.SPACE_SHARDING_FUNC_NAME_FIELDNO = 2 +utils.SPACE_SHARDING_FUNC_BODY_FIELDNO = 3 + +utils.ShardingHashMismatchError = errors.new_class("ShardingHashMismatchError", {capture_stack = false}) + +function utils.extract_sharding_func_def(tuple) + if not tuple then + return nil + end + + if tuple[utils.SPACE_SHARDING_FUNC_BODY_FIELDNO] ~= nil then + return {body = tuple[utils.SPACE_SHARDING_FUNC_BODY_FIELDNO]} + end + + if tuple[utils.SPACE_SHARDING_FUNC_NAME_FIELDNO] ~= nil then + return tuple[utils.SPACE_SHARDING_FUNC_NAME_FIELDNO] + end + + return nil +end + +function utils.compute_hash(val) + return digest.murmur(msgpack.encode(val)) +end + +return utils diff --git a/crud/common/sharding_func.lua b/crud/common/sharding_func.lua index a3aca4c1..501ca709 100644 --- a/crud/common/sharding_func.lua +++ b/crud/common/sharding_func.lua @@ -1,3 +1,5 @@ +local log = require('log') + local sharding_metadata_module = require('crud.common.sharding.sharding_metadata') local sharding_func_cache = {} @@ -9,6 +11,8 @@ local sharding_func_cache = {} -- This method provides similar behavior for -- sharding function cache. 
function sharding_func_cache.update_cache(space_name) + log.warn("require('crud.common.sharding_func').update_cache()" .. + " is deprecated and will be removed in future releases") return sharding_metadata_module.update_sharding_func_cache(space_name) end diff --git a/crud/common/sharding_key.lua b/crud/common/sharding_key.lua index 07c76f5d..30d75e7b 100644 --- a/crud/common/sharding_key.lua +++ b/crud/common/sharding_key.lua @@ -1,3 +1,5 @@ +local log = require('log') + local sharding_metadata_module = require('crud.common.sharding.sharding_metadata') local sharding_key_cache = {} @@ -7,6 +9,8 @@ local sharding_key_cache = {} -- for updating sharding key cache in their -- projects like `require('crud.common.sharding_key').update_cache()` function sharding_key_cache.update_cache(space_name) + log.warn("require('crud.common.sharding_key').update_cache()" .. + " is deprecated and will be removed in future releases") return sharding_metadata_module.update_sharding_key_cache(space_name) end diff --git a/crud/common/stash.lua b/crud/common/stash.lua index 0557cb40..0fef9af3 100644 --- a/crud/common/stash.lua +++ b/crud/common/stash.lua @@ -23,7 +23,8 @@ stash.name = { cfg = '__crud_cfg', stats_internal = '__crud_stats_internal', stats_local_registry = '__crud_stats_local_registry', - stats_metrics_registry = '__crud_stats_metrics_registry' + stats_metrics_registry = '__crud_stats_metrics_registry', + ddl_triggers = '__crud_ddl_spaces_triggers', } --- Setup Tarantool Cartridge reload. 
diff --git a/crud/common/utils.lua b/crud/common/utils.lua index 56c9e693..e86dd778 100644 --- a/crud/common/utils.lua +++ b/crud/common/utils.lua @@ -4,6 +4,7 @@ local vshard = require('vshard') local fun = require('fun') local bit = require('bit') +local const = require('crud.common.const') local schema = require('crud.common.schema') local dev_checks = require('crud.common.dev_checks') @@ -631,12 +632,12 @@ end local function flatten_obj(space_name, obj) local space_format, err = utils.get_space_format(space_name, vshard.router.routeall()) if err ~= nil then - return nil, FlattenError:new("Failed to get space format: %s", err), true + return nil, FlattenError:new("Failed to get space format: %s", err), const.NEED_SCHEMA_RELOAD end local tuple, err = utils.flatten(obj, space_format) if err ~= nil then - return nil, FlattenError:new("Object is specified in bad format: %s", err), true + return nil, FlattenError:new("Object is specified in bad format: %s", err), const.NEED_SCHEMA_RELOAD end return tuple diff --git a/crud/count.lua b/crud/count.lua index 09401a4d..93395293 100644 --- a/crud/count.lua +++ b/crud/count.lua @@ -4,6 +4,7 @@ local vshard = require('vshard') local fiber = require('fiber') local call = require('crud.common.call') +local const = require('crud.common.const') local utils = require('crud.common.utils') local sharding = require('crud.common.sharding') local filters = require('crud.compare.filters') @@ -27,6 +28,9 @@ local function count_on_storage(space_name, index_id, conditions, opts) tarantool_iter = 'number', yield_every = '?number', scan_condition_num = '?number', + sharding_func_hash = '?number', + sharding_key_hash = '?number', + skip_sharding_hash_check = '?boolean', }) opts = opts or {} @@ -38,6 +42,14 @@ local function count_on_storage(space_name, index_id, conditions, opts) return nil, CountError:new("Index with ID %s doesn't exist", index_id) end + local _, err = sharding.check_sharding_hash(space_name, + opts.sharding_func_hash, + 
opts.sharding_key_hash, + opts.skip_sharding_hash_check) + if err ~= nil then + return nil, err + end + local value = opts.scan_value local filter_func, err = filters.gen_func(space, conditions, { @@ -111,20 +123,20 @@ local function call_count_on_router(space_name, user_conditions, opts) local space = utils.get_space(space_name, replicasets) if space == nil then - return nil, CountError:new("Space %q doesn't exist", space_name), true + return nil, CountError:new("Space %q doesn't exist", space_name), const.NEED_SCHEMA_RELOAD end - local sharding_key_as_index_obj, err = sharding_metadata_module.fetch_sharding_key_on_router(space_name) + local sharding_key_data, err = sharding_metadata_module.fetch_sharding_key_on_router(space_name) if err ~= nil then return nil, err end -- plan count local plan, err = count_plan.new(space, conditions, { - sharding_key_as_index_obj = sharding_key_as_index_obj, + sharding_key_as_index_obj = sharding_key_data.value, }) if err ~= nil then - return nil, CountError:new("Failed to plan count: %s", err), true + return nil, CountError:new("Failed to plan count: %s", err), const.NEED_SCHEMA_RELOAD end -- set replicasets to count from @@ -159,21 +171,28 @@ local function call_count_on_router(space_name, user_conditions, opts) -- eye to resharding. However, AFAIU, the optimization -- does not make the result less consistent (sounds -- weird, huh?). 
+ local sharding_func_hash = nil + local skip_sharding_hash_check = nil + local perform_map_reduce = opts.force_map_call == true or (opts.bucket_id == nil and plan.sharding_key == nil) if not perform_map_reduce then - local bucket_id, err = sharding.key_get_bucket_id(space_name, plan.sharding_key, opts.bucket_id) + local bucket_id_data, err = sharding.key_get_bucket_id(space_name, plan.sharding_key, opts.bucket_id) if err ~= nil then return nil, err end - assert(bucket_id ~= nil) + assert(bucket_id_data.bucket_id ~= nil) + + sharding_func_hash = bucket_id_data.sharding_func_hash local err - replicasets_to_count, err = sharding.get_replicasets_by_bucket_id(bucket_id) + replicasets_to_count, err = sharding.get_replicasets_by_bucket_id(bucket_id_data.bucket_id) if err ~= nil then - return nil, err, true + return nil, err, const.NEED_SCHEMA_RELOAD end + else + skip_sharding_hash_check = true end local yield_every = opts.yield_every or DEFAULT_YIELD_EVERY @@ -191,6 +210,9 @@ local function call_count_on_router(space_name, user_conditions, opts) tarantool_iter = plan.tarantool_iter, yield_every = yield_every, scan_condition_num = plan.scan_condition_num, + sharding_func_hash = sharding_func_hash, + sharding_key_hash = sharding_key_data.hash, + skip_sharding_hash_check = skip_sharding_hash_check, } local results, err = call.map(COUNT_FUNC_NAME, { @@ -198,7 +220,13 @@ local function call_count_on_router(space_name, user_conditions, opts) }, call_opts) if err ~= nil then - return nil, CountError:new("Failed to call count on storage-side: %s", err) + local err_wrapped = CountError:new("Failed to call count on storage-side: %s", err) + + if sharding.result_needs_sharding_reload(err) then + return nil, err_wrapped, const.NEED_SHARDING_RELOAD + end + + return nil, err_wrapped end if results.err ~= nil then @@ -268,7 +296,8 @@ function count.call(space_name, user_conditions, opts) mode = '?string', }) - return schema.wrap_func_reload(call_count_on_router, space_name, 
user_conditions, opts) + return schema.wrap_func_reload(sharding.wrap_method, + call_count_on_router, space_name, user_conditions, opts) end return count diff --git a/crud/delete.lua b/crud/delete.lua index 01c061bf..6cdce0a8 100644 --- a/crud/delete.lua +++ b/crud/delete.lua @@ -3,6 +3,7 @@ local errors = require('errors') local vshard = require('vshard') local call = require('crud.common.call') +local const = require('crud.common.const') local utils = require('crud.common.utils') local sharding = require('crud.common.sharding') local sharding_key_module = require('crud.common.sharding.sharding_key') @@ -16,14 +17,29 @@ local delete = {} local DELETE_FUNC_NAME = '_crud.delete_on_storage' -local function delete_on_storage(space_name, key, field_names) - dev_checks('string', '?', '?table') +local function delete_on_storage(space_name, key, field_names, opts) + dev_checks('string', '?', '?table', { + sharding_key_hash = '?number', + sharding_func_hash = '?number', + skip_sharding_hash_check = '?boolean', + }) + + opts = opts or {} local space = box.space[space_name] if space == nil then return nil, DeleteError:new("Space %q doesn't exist", space_name) end + local _, err = sharding.check_sharding_hash(space_name, + opts.sharding_func_hash, + opts.sharding_key_hash, + opts.skip_sharding_hash_check) + + if err ~= nil then + return nil, err + end + -- add_space_schema_hash is false because -- reloading space format on router can't avoid delete error on storage return schema.wrap_box_space_func_result(space, 'delete', {key}, { @@ -50,47 +66,67 @@ local function call_delete_on_router(space_name, key, opts) local space = utils.get_space(space_name, vshard.router.routeall()) if space == nil then - return nil, DeleteError:new("Space %q doesn't exist", space_name), true + return nil, DeleteError:new("Space %q doesn't exist", space_name), const.NEED_SCHEMA_RELOAD end if box.tuple.is(key) then key = key:totable() end + local sharding_key_hash = nil + local 
skip_sharding_hash_check = nil + local sharding_key = key if opts.bucket_id == nil then local primary_index_parts = space.index[0].parts - local sharding_key_as_index_obj, err = sharding_metadata_module.fetch_sharding_key_on_router(space_name) + local sharding_key_data, err = sharding_metadata_module.fetch_sharding_key_on_router(space_name) if err ~= nil then return nil, err end sharding_key, err = sharding_key_module.extract_from_pk(space_name, - sharding_key_as_index_obj, + sharding_key_data.value, primary_index_parts, key) if err ~= nil then return nil, err end + + sharding_key_hash = sharding_key_data.hash + else + skip_sharding_hash_check = true end - local bucket_id, err = sharding.key_get_bucket_id(space_name, sharding_key, opts.bucket_id) + local bucket_id_data, err = sharding.key_get_bucket_id(space_name, sharding_key, opts.bucket_id) if err ~= nil then return nil, err end + local delete_on_storage_opts = { + sharding_func_hash = bucket_id_data.sharding_func_hash, + sharding_key_hash = sharding_key_hash, + skip_sharding_hash_check = skip_sharding_hash_check, + } + local call_opts = { mode = 'write', timeout = opts.timeout, } + local storage_result, err = call.single( - bucket_id, DELETE_FUNC_NAME, - {space_name, key, opts.fields}, + bucket_id_data.bucket_id, DELETE_FUNC_NAME, + {space_name, key, opts.fields, delete_on_storage_opts}, call_opts ) if err ~= nil then - return nil, DeleteError:new("Failed to call delete on storage-side: %s", err) + local err_wrapped = DeleteError:new("Failed to call delete on storage-side: %s", err) + + if sharding.result_needs_sharding_reload(err) then + return nil, err_wrapped, const.NEED_SHARDING_RELOAD + end + + return nil, err_wrapped end if storage_result.err ~= nil then @@ -130,7 +166,8 @@ function delete.call(space_name, key, opts) fields = '?table', }) - return schema.wrap_func_reload(call_delete_on_router, space_name, key, opts) + return schema.wrap_func_reload(sharding.wrap_method, + call_delete_on_router, space_name, 
key, opts) end return delete diff --git a/crud/get.lua b/crud/get.lua index 27e20193..dbf1304b 100644 --- a/crud/get.lua +++ b/crud/get.lua @@ -3,6 +3,7 @@ local errors = require('errors') local vshard = require('vshard') local call = require('crud.common.call') +local const = require('crud.common.const') local utils = require('crud.common.utils') local sharding = require('crud.common.sharding') local sharding_key_module = require('crud.common.sharding.sharding_key') @@ -16,14 +17,29 @@ local get = {} local GET_FUNC_NAME = '_crud.get_on_storage' -local function get_on_storage(space_name, key, field_names) - dev_checks('string', '?', '?table') +local function get_on_storage(space_name, key, field_names, opts) + dev_checks('string', '?', '?table', { + sharding_key_hash = '?number', + sharding_func_hash = '?number', + skip_sharding_hash_check = '?boolean', + }) + + opts = opts or {} local space = box.space[space_name] if space == nil then return nil, GetError:new("Space %q doesn't exist", space_name) end + local _, err = sharding.check_sharding_hash(space_name, + opts.sharding_func_hash, + opts.sharding_key_hash, + opts.skip_sharding_hash_check) + + if err ~= nil then + return nil, err + end + -- add_space_schema_hash is false because -- reloading space format on router can't avoid get error on storage return schema.wrap_box_space_func_result(space, 'get', {key}, { @@ -53,7 +69,7 @@ local function call_get_on_router(space_name, key, opts) local space = utils.get_space(space_name, vshard.router.routeall()) if space == nil then - return nil, GetError:new("Space %q doesn't exist", space_name), true + return nil, GetError:new("Space %q doesn't exist", space_name), const.NEED_SCHEMA_RELOAD end if box.tuple.is(key) then @@ -61,41 +77,61 @@ local function call_get_on_router(space_name, key, opts) end local sharding_key = key + local sharding_key_hash = nil + local skip_sharding_hash_check = nil + if opts.bucket_id == nil then local primary_index_parts = space.index[0].parts 
- local sharding_key_as_index_obj, err = sharding_metadata_module.fetch_sharding_key_on_router(space_name) + local sharding_key_data, err = sharding_metadata_module.fetch_sharding_key_on_router(space_name) if err ~= nil then return nil, err end sharding_key, err = sharding_key_module.extract_from_pk(space_name, - sharding_key_as_index_obj, + sharding_key_data.value, primary_index_parts, key) if err ~= nil then return nil, err end + + sharding_key_hash = sharding_key_data.hash + else + skip_sharding_hash_check = true end - local bucket_id, err = sharding.key_get_bucket_id(space_name, sharding_key, opts.bucket_id) + local bucket_id_data, err = sharding.key_get_bucket_id(space_name, sharding_key, opts.bucket_id) if err ~= nil then return nil, err end + local get_on_storage_opts = { + sharding_func_hash = bucket_id_data.sharding_func_hash, + sharding_key_hash = sharding_key_hash, + skip_sharding_hash_check = skip_sharding_hash_check, + } + local call_opts = { mode = opts.mode or 'read', prefer_replica = opts.prefer_replica, balance = opts.balance, timeout = opts.timeout, } + local storage_result, err = call.single( - bucket_id, GET_FUNC_NAME, - {space_name, key, opts.fields}, + bucket_id_data.bucket_id, GET_FUNC_NAME, + {space_name, key, opts.fields, get_on_storage_opts}, call_opts ) if err ~= nil then - return nil, GetError:new("Failed to call get on storage-side: %s", err) + local err_wrapped = GetError:new("Failed to call get on storage-side: %s", err) + + if sharding.result_needs_sharding_reload(err) then + return nil, err_wrapped, const.NEED_SHARDING_RELOAD + end + + return nil, err_wrapped end if storage_result.err ~= nil then @@ -149,7 +185,8 @@ function get.call(space_name, key, opts) mode = '?string', }) - return schema.wrap_func_reload(call_get_on_router, space_name, key, opts) + return schema.wrap_func_reload(sharding.wrap_method, + call_get_on_router, space_name, key, opts) end return get diff --git a/crud/insert.lua b/crud/insert.lua index 
fbb0e462..19daebfe 100644 --- a/crud/insert.lua +++ b/crud/insert.lua @@ -3,6 +3,7 @@ local errors = require('errors') local vshard = require('vshard') local call = require('crud.common.call') +local const = require('crud.common.const') local utils = require('crud.common.utils') local sharding = require('crud.common.sharding') local dev_checks = require('crud.common.dev_checks') @@ -18,6 +19,8 @@ local function insert_on_storage(space_name, tuple, opts) dev_checks('string', 'table', { add_space_schema_hash = '?boolean', fields = '?table', + sharding_key_hash = '?number', + sharding_func_hash = '?number', }) opts = opts or {} @@ -27,6 +30,15 @@ local function insert_on_storage(space_name, tuple, opts) return nil, InsertError:new("Space %q doesn't exist", space_name) end + local _, err = sharding.check_sharding_hash(space_name, + opts.sharding_func_hash, + opts.sharding_key_hash, + opts.skip_sharding_hash_check) + + if err ~= nil then + return nil, err + end + -- add_space_schema_hash is true only in case of insert_object -- the only one case when reloading schema can avoid insert error -- is flattening object on router @@ -43,7 +55,7 @@ end -- returns result, err, need_reload -- need_reload indicates if reloading schema could help -- see crud.common.schema.wrap_func_reload() -local function call_insert_on_router(space_name, tuple, opts) +local function call_insert_on_router(space_name, original_tuple, opts) dev_checks('string', 'table', { timeout = '?number', bucket_id = '?number|cdata', @@ -55,36 +67,52 @@ local function call_insert_on_router(space_name, tuple, opts) local space = utils.get_space(space_name, vshard.router.routeall()) if space == nil then - return nil, InsertError:new("Space %q doesn't exist", space_name), true + return nil, InsertError:new("Space %q doesn't exist", space_name), const.NEED_SCHEMA_RELOAD end - local bucket_id, err = sharding.tuple_set_and_return_bucket_id(tuple, space, opts.bucket_id) + local tuple = table.deepcopy(original_tuple) + 
+ local sharding_data, err = sharding.tuple_set_and_return_bucket_id(tuple, space, opts.bucket_id) if err ~= nil then - return nil, InsertError:new("Failed to get bucket ID: %s", err), true + return nil, InsertError:new("Failed to get bucket ID: %s", err), const.NEED_SCHEMA_RELOAD end local insert_on_storage_opts = { add_space_schema_hash = opts.add_space_schema_hash, fields = opts.fields, + sharding_func_hash = sharding_data.sharding_func_hash, + sharding_key_hash = sharding_data.sharding_key_hash, } local call_opts = { mode = 'write', timeout = opts.timeout, } + local storage_result, err = call.single( - bucket_id, INSERT_FUNC_NAME, + sharding_data.bucket_id, INSERT_FUNC_NAME, {space_name, tuple, insert_on_storage_opts}, call_opts ) if err ~= nil then - return nil, InsertError:new("Failed to call insert on storage-side: %s", err) + local err_wrapped = InsertError:new("Failed to call insert on storage-side: %s", err) + + if sharding.result_needs_sharding_reload(err) then + return nil, err_wrapped, const.NEED_SHARDING_RELOAD + end + + return nil, err_wrapped end if storage_result.err ~= nil then - local need_reload = schema.result_needs_reload(space, storage_result) - return nil, InsertError:new("Failed to insert: %s", storage_result.err), need_reload + local err_wrapped = InsertError:new("Failed to insert: %s", storage_result.err) + + if schema.result_needs_reload(space, storage_result) then + return nil, err_wrapped, const.NEED_SCHEMA_RELOAD + end + + return nil, err_wrapped end local tuple = storage_result.res @@ -121,7 +149,8 @@ function insert.tuple(space_name, tuple, opts) fields = '?table', }) - return schema.wrap_func_reload(call_insert_on_router, space_name, tuple, opts) + return schema.wrap_func_reload(sharding.wrap_method, + call_insert_on_router, space_name, tuple, opts) end --- Inserts an object to the specified space diff --git a/crud/replace.lua b/crud/replace.lua index 2069398e..0778d99a 100644 --- a/crud/replace.lua +++ b/crud/replace.lua @@ -3,6 
+3,7 @@ local errors = require('errors') local vshard = require('vshard') local call = require('crud.common.call') +local const = require('crud.common.const') local utils = require('crud.common.utils') local sharding = require('crud.common.sharding') local dev_checks = require('crud.common.dev_checks') @@ -18,6 +19,8 @@ local function replace_on_storage(space_name, tuple, opts) dev_checks('string', 'table', { add_space_schema_hash = '?boolean', fields = '?table', + sharding_key_hash = '?number', + sharding_func_hash = '?number', }) opts = opts or {} @@ -27,6 +30,15 @@ local function replace_on_storage(space_name, tuple, opts) return nil, ReplaceError:new("Space %q doesn't exist", space_name) end + local _, err = sharding.check_sharding_hash(space_name, + opts.sharding_func_hash, + opts.sharding_key_hash, + opts.skip_sharding_hash_check) + + if err ~= nil then + return nil, err + end + -- add_space_schema_hash is true only in case of replace_object -- the only one case when reloading schema can avoid insert error -- is flattening object on router @@ -43,7 +55,7 @@ end -- returns result, err, need_reload -- need_reload indicates if reloading schema could help -- see crud.common.schema.wrap_func_reload() -local function call_replace_on_router(space_name, tuple, opts) +local function call_replace_on_router(space_name, original_tuple, opts) dev_checks('string', 'table', { timeout = '?number', bucket_id = '?number|cdata', @@ -55,21 +67,25 @@ local function call_replace_on_router(space_name, tuple, opts) local space, err = utils.get_space(space_name, vshard.router.routeall()) if err ~= nil then - return nil, ReplaceError:new("Failed to get space %q: %s", space_name, err), true + return nil, ReplaceError:new("Failed to get space %q: %s", space_name, err), const.NEED_SCHEMA_RELOAD end if space == nil then - return nil, ReplaceError:new("Space %q doesn't exist", space_name), true + return nil, ReplaceError:new("Space %q doesn't exist", space_name), const.NEED_SCHEMA_RELOAD 
end - local bucket_id, err = sharding.tuple_set_and_return_bucket_id(tuple, space, opts.bucket_id) + local tuple = table.deepcopy(original_tuple) + + local sharding_data, err = sharding.tuple_set_and_return_bucket_id(tuple, space, opts.bucket_id) if err ~= nil then - return nil, ReplaceError:new("Failed to get bucket ID: %s", err), true + return nil, ReplaceError:new("Failed to get bucket ID: %s", err), const.NEED_SCHEMA_RELOAD end - local insert_on_storage_opts = { + local replace_on_storage_opts = { add_space_schema_hash = opts.add_space_schema_hash, fields = opts.fields, + sharding_func_hash = sharding_data.sharding_func_hash, + sharding_key_hash = sharding_data.sharding_key_hash, } local call_opts = { @@ -77,18 +93,29 @@ local function call_replace_on_router(space_name, tuple, opts) timeout = opts.timeout, } local storage_result, err = call.single( - bucket_id, REPLACE_FUNC_NAME, - {space_name, tuple, insert_on_storage_opts}, + sharding_data.bucket_id, REPLACE_FUNC_NAME, + {space_name, tuple, replace_on_storage_opts}, call_opts ) if err ~= nil then - return nil, ReplaceError:new("Failed to call replace on storage-side: %s", err) + local err_wrapped = ReplaceError:new("Failed to call replace on storage-side: %s", err) + + if sharding.result_needs_sharding_reload(err) then + return nil, err_wrapped, const.NEED_SHARDING_RELOAD + end + + return nil, err_wrapped end if storage_result.err ~= nil then - local need_reload = schema.result_needs_reload(space, storage_result) - return nil, ReplaceError:new("Failed to replace: %s", storage_result.err), need_reload + local err_wrapped = ReplaceError:new("Failed to replace: %s", storage_result.err) + + if schema.result_needs_reload(space, storage_result) then + return nil, err_wrapped, const.NEED_SCHEMA_RELOAD + end + + return nil, err_wrapped end local tuple = storage_result.res @@ -125,7 +152,8 @@ function replace.tuple(space_name, tuple, opts) fields = '?table', }) - return schema.wrap_func_reload(call_replace_on_router, 
space_name, tuple, opts) + return schema.wrap_func_reload(sharding.wrap_method, + call_replace_on_router, space_name, tuple, opts) end --- Insert or replace an object in the specified space diff --git a/crud/select.lua b/crud/select.lua index b0d1ef9b..55ab00ca 100644 --- a/crud/select.lua +++ b/crud/select.lua @@ -1,6 +1,7 @@ local errors = require('errors') local utils = require('crud.common.utils') +local sharding = require('crud.common.sharding') local select_executor = require('crud.select.executor') local select_filters = require('crud.compare.filters') local dev_checks = require('crud.common.dev_checks') @@ -38,6 +39,9 @@ local function select_on_storage(space_name, index_id, conditions, opts) limit = 'number', scan_condition_num = '?number', field_names = '?table', + sharding_key_hash = '?number', + sharding_func_hash = '?number', + skip_sharding_hash_check = '?boolean', }) local space = box.space[space_name] @@ -50,6 +54,15 @@ local function select_on_storage(space_name, index_id, conditions, opts) return nil, SelectError:new("Index with ID %s doesn't exist", index_id) end + local _, err = sharding.check_sharding_hash(space_name, + opts.sharding_func_hash, + opts.sharding_key_hash, + opts.skip_sharding_hash_check) + + if err ~= nil then + return nil, err + end + local filter_func, err = select_filters.gen_func(space, conditions, { tarantool_iter = opts.tarantool_iter, scan_condition_num = opts.scan_condition_num, diff --git a/crud/select/compat/select.lua b/crud/select/compat/select.lua index 926816b3..be500f5d 100644 --- a/crud/select/compat/select.lua +++ b/crud/select/compat/select.lua @@ -2,6 +2,7 @@ local checks = require('checks') local errors = require('errors') local vshard = require('vshard') +local const = require('crud.common.const') local utils = require('crud.common.utils') local sharding = require('crud.common.sharding') local dev_checks = require('crud.common.dev_checks') @@ -49,17 +50,22 @@ local function build_select_iterator(space_name, 
user_conditions, opts) local space = utils.get_space(space_name, replicasets) if space == nil then - return nil, SelectError:new("Space %q doesn't exist", space_name), true + return nil, SelectError:new("Space %q doesn't exist", space_name), const.NEED_SCHEMA_RELOAD end local space_format = space:format() - local sharding_key_as_index_obj = nil + local sharding_key_data = {} + local sharding_func_hash = nil + local skip_sharding_hash_check = nil + -- We don't need sharding info if bucket_id specified. if opts.bucket_id == nil then - sharding_key_as_index_obj, err = sharding_metadata_module.fetch_sharding_key_on_router(space_name) + sharding_key_data, err = sharding_metadata_module.fetch_sharding_key_on_router(space_name) if err ~= nil then return nil, err end + else + skip_sharding_hash_check = true end -- plan select @@ -67,12 +73,12 @@ local function build_select_iterator(space_name, user_conditions, opts) first = opts.first, after_tuple = opts.after, field_names = opts.field_names, - sharding_key_as_index_obj = sharding_key_as_index_obj, + sharding_key_as_index_obj = sharding_key_data.value, bucket_id = opts.bucket_id, }) if err ~= nil then - return nil, SelectError:new("Failed to plan select: %s", err), true + return nil, SelectError:new("Failed to plan select: %s", err), const.NEED_SCHEMA_RELOAD end -- set replicasets to select from @@ -110,20 +116,23 @@ local function build_select_iterator(space_name, user_conditions, opts) local perform_map_reduce = opts.force_map_call == true or (opts.bucket_id == nil and plan.sharding_key == nil) if not perform_map_reduce then - local bucket_id, err = sharding.key_get_bucket_id(space_name, plan.sharding_key, opts.bucket_id) + local bucket_id_data, err = sharding.key_get_bucket_id(space_name, plan.sharding_key, opts.bucket_id) if err ~= nil then return nil, err end - assert(bucket_id ~= nil) + assert(bucket_id_data.bucket_id ~= nil) local err - replicasets_to_select, err = sharding.get_replicasets_by_bucket_id(bucket_id) + 
replicasets_to_select, err = sharding.get_replicasets_by_bucket_id(bucket_id_data.bucket_id) if err ~= nil then - return nil, err, true + return nil, err, const.NEED_SCHEMA_RELOAD end + + sharding_func_hash = bucket_id_data.sharding_func_hash else stats.update_map_reduces(space_name) + skip_sharding_hash_check = true end local tuples_limit = opts.first @@ -150,6 +159,9 @@ local function build_select_iterator(space_name, user_conditions, opts) limit = batch_size, scan_condition_num = plan.scan_condition_num, field_names = plan.field_names, + sharding_func_hash = sharding_func_hash, + sharding_key_hash = sharding_key_data.hash, + skip_sharding_hash_check = skip_sharding_hash_check, } local merger = Merger.new(replicasets_to_select, space, plan.index_id, @@ -305,7 +317,8 @@ local function select_module_call_xc(space_name, user_conditions, opts) end function select_module.call(space_name, user_conditions, opts) - return SelectError:pcall(select_module_call_xc, space_name, user_conditions, opts) + return SelectError:pcall(sharding.wrap_select_method, + select_module_call_xc, space_name, user_conditions, opts) end return select_module diff --git a/crud/select/compat/select_old.lua b/crud/select/compat/select_old.lua index acb5692b..b5f82b94 100644 --- a/crud/select/compat/select_old.lua +++ b/crud/select/compat/select_old.lua @@ -4,6 +4,7 @@ local vshard = require('vshard') local fun = require('fun') local call = require('crud.common.call') +local const = require('crud.common.const') local utils = require('crud.common.utils') local sharding = require('crud.common.sharding') local dev_checks = require('crud.common.dev_checks') @@ -28,6 +29,7 @@ local function select_iteration(space_name, plan, opts) replicasets = 'table', limit = 'number', call_opts = 'table', + sharding_hash = 'table', }) local call_opts = opts.call_opts @@ -40,6 +42,9 @@ local function select_iteration(space_name, plan, opts) limit = opts.limit, scan_condition_num = plan.scan_condition_num, field_names 
= plan.field_names, + sharding_func_hash = opts.sharding_hash.sharding_func_hash, + sharding_key_hash = opts.sharding_hash.sharding_key_hash, + skip_sharding_hash_check = opts.sharding_hash.skip_sharding_hash_check, } local storage_select_args = { @@ -109,17 +114,23 @@ local function build_select_iterator(space_name, user_conditions, opts) local space = utils.get_space(space_name, replicasets) if space == nil then - return nil, SelectError:new("Space %q doesn't exist", space_name), true + return nil, SelectError:new("Space %q doesn't exist", space_name), const.NEED_SCHEMA_RELOAD end local space_format = space:format() + local sharding_hash = {} local sharding_key_as_index_obj = nil -- We don't need sharding info if bucket_id specified. if opts.bucket_id == nil then - sharding_key_as_index_obj, err = sharding_metadata_module.fetch_sharding_key_on_router(space_name) + local sharding_key_data, err = sharding_metadata_module.fetch_sharding_key_on_router(space_name) if err ~= nil then return nil, err end + + sharding_key_as_index_obj = sharding_key_data.value + sharding_hash.sharding_key_hash = sharding_key_data.hash + else + sharding_hash.skip_sharding_hash_check = true end -- plan select @@ -133,7 +144,7 @@ local function build_select_iterator(space_name, user_conditions, opts) }) if err ~= nil then - return nil, SelectError:new("Failed to plan select: %s", err), true + return nil, SelectError:new("Failed to plan select: %s", err), const.NEED_SCHEMA_RELOAD end -- set replicasets to select from @@ -144,20 +155,23 @@ local function build_select_iterator(space_name, user_conditions, opts) local perform_map_reduce = opts.force_map_call == true or (opts.bucket_id == nil and plan.sharding_key == nil) if not perform_map_reduce then - local bucket_id, err = sharding.key_get_bucket_id(space_name, plan.sharding_key, opts.bucket_id) + local bucket_id_data, err = sharding.key_get_bucket_id(space_name, plan.sharding_key, opts.bucket_id) if err ~= nil then return nil, err end - 
assert(bucket_id ~= nil) + assert(bucket_id_data.bucket_id ~= nil) local err - replicasets_to_select, err = sharding.get_replicasets_by_bucket_id(bucket_id) + replicasets_to_select, err = sharding.get_replicasets_by_bucket_id(bucket_id_data.bucket_id) if err ~= nil then - return nil, err, true + return nil, err, const.NEED_SCHEMA_RELOAD end + + sharding_hash.sharding_func_hash = bucket_id_data.sharding_func_hash else stats.update_map_reduces(space_name) + sharding_hash.skip_sharding_hash_check = true end -- generate tuples comparator @@ -196,6 +210,7 @@ local function build_select_iterator(space_name, user_conditions, opts) replicasets = replicasets_to_select, call_opts = opts.call_opts, + sharding_hash = sharding_hash, }) return iter @@ -254,6 +269,10 @@ function select_module.pairs(space_name, user_conditions, opts) local gen = function(_, iter) local tuple, err = iter:get() if err ~= nil then + if sharding.result_needs_sharding_reload(err) then + sharding_metadata_module.reload_sharding_cache(space_name) + end + error(string.format("Failed to get next object: %s", err)) end @@ -281,19 +300,8 @@ function select_module.pairs(space_name, user_conditions, opts) return gen, param, state end -function select_module.call(space_name, user_conditions, opts) - checks('string', '?table', { - after = '?table', - first = '?number', - timeout = '?number', - batch_size = '?number', - bucket_id = '?number|cdata', - force_map_call = '?boolean', - fields = '?table', - prefer_replica = '?boolean', - balance = '?boolean', - mode = '?vshard_call_mode', - }) +local function select_module_call_xc(space_name, user_conditions, opts) + dev_checks('string', '?table', '?table') opts = opts or {} @@ -330,6 +338,10 @@ function select_module.call(space_name, user_conditions, opts) while iter:has_next() do local tuple, err = iter:get() if err ~= nil then + if sharding.result_needs_sharding_reload(err) then + return nil, err, const.NEED_SHARDING_RELOAD + end + return nil, 
SelectError:new("Failed to get next object: %s", err) end @@ -350,4 +362,21 @@ function select_module.call(space_name, user_conditions, opts) } end +function select_module.call(space_name, user_conditions, opts) + checks('string', '?table', { + after = '?table', + first = '?number', + timeout = '?number', + batch_size = '?number', + bucket_id = '?number|cdata', + force_map_call = '?boolean', + fields = '?table', + prefer_replica = '?boolean', + balance = '?boolean', + mode = '?vshard_call_mode', + }) + + return sharding.wrap_method(select_module_call_xc, space_name, user_conditions, opts) +end + return select_module diff --git a/crud/select/iterator.lua b/crud/select/iterator.lua index df897e47..285df01c 100644 --- a/crud/select/iterator.lua +++ b/crud/select/iterator.lua @@ -2,6 +2,7 @@ local errors = require('errors') local fiber = require('fiber') local dev_checks = require('crud.common.dev_checks') +local sharding = require('crud.common.sharding') local utils = require('crud.common.utils') local UpdateTuplesError = errors.new_class('UpdateTuplesError') @@ -26,6 +27,7 @@ function Iterator.new(opts) replicasets = 'table', call_opts = 'table', + sharding_hash = 'table', }) local iter = { @@ -53,6 +55,8 @@ function Iterator.new(opts) update_tuples_channel = fiber.channel(1), wait_for_update = false, + + sharding_hash = opts.sharding_hash, } setmetatable(iter, Iterator) @@ -102,8 +106,13 @@ local function update_replicasets_tuples(iter, after_tuple, replicaset_uuid) limit = limit_per_storage_call, field_names = iter.field_names, call_opts = iter.call_opts, + sharding_hash = iter.sharding_hash, }) if err ~= nil then + if sharding.result_needs_sharding_reload(err) then + return false, err + end + return false, UpdateTuplesError:new('Failed to select tuples from storages: %s', err) end @@ -161,6 +170,10 @@ function Iterator:get() end if not res.ok then + if sharding.result_needs_sharding_reload(res.err) then + return false, res.err + end + return nil, 
GetTupleError:new("Failed to get tuples from storages: %s", res.err) end end diff --git a/crud/select/merger.lua b/crud/select/merger.lua index 5b46d622..d445f116 100644 --- a/crud/select/merger.lua +++ b/crud/select/merger.lua @@ -3,6 +3,8 @@ local errors = require('errors') local msgpack = require('msgpack') local ffi = require('ffi') local call = require('crud.common.call') +local sharding = require('crud.common.sharding') +local sharding_metadata_module = require('crud.common.sharding.sharding_metadata') local compat = require('crud.common.compat') local merger_lib = compat.require('tuple.merger', 'merger') @@ -118,7 +120,13 @@ local function fetch_chunk(context, state) local cursor, err = decode_metainfo(buf) if cursor == nil then -- Wrap net.box errors error to restore metatable. - error(errors.wrap(err)) + local wrapped_err = errors.wrap(err) + + if sharding.result_needs_sharding_reload(err) then + sharding_metadata_module.reload_sharding_cache(space_name) + end + + error(wrapped_err) end -- Extract stats info. 
diff --git a/crud/update.lua b/crud/update.lua index f78e0035..cea9fec9 100644 --- a/crud/update.lua +++ b/crud/update.lua @@ -3,6 +3,7 @@ local errors = require('errors') local vshard = require('vshard') local call = require('crud.common.call') +local const = require('crud.common.const') local utils = require('crud.common.utils') local sharding = require('crud.common.sharding') local sharding_key_module = require('crud.common.sharding.sharding_key') @@ -16,14 +17,29 @@ local update = {} local UPDATE_FUNC_NAME = '_crud.update_on_storage' -local function update_on_storage(space_name, key, operations, field_names) - dev_checks('string', '?', 'table', '?table') +local function update_on_storage(space_name, key, operations, field_names, opts) + dev_checks('string', '?', 'table', '?table', { + sharding_key_hash = '?number', + sharding_func_hash = '?number', + skip_sharding_hash_check = '?boolean', + }) + + opts = opts or {} local space = box.space[space_name] if space == nil then return nil, UpdateError:new("Space %q doesn't exist", space_name) end + local _, err = sharding.check_sharding_hash(space_name, + opts.sharding_func_hash, + opts.sharding_key_hash, + opts.skip_sharding_hash_check) + + if err ~= nil then + return nil, err + end + -- add_space_schema_hash is false because -- reloading space format on router can't avoid update error on storage local res, err = schema.wrap_box_space_func_result(space, 'update', {key, operations}, { @@ -72,11 +88,11 @@ local function call_update_on_router(space_name, key, user_operations, opts) local space, err = utils.get_space(space_name, vshard.router.routeall()) if err ~= nil then - return nil, UpdateError:new("Failed to get space %q: %s", space_name, err), true + return nil, UpdateError:new("Failed to get space %q: %s", space_name, err), const.NEED_SCHEMA_RELOAD end if space == nil then - return nil, UpdateError:new("Space %q doesn't exist", space_name), true + return nil, UpdateError:new("Space %q doesn't exist", space_name), 
const.NEED_SCHEMA_RELOAD end local space_format = space:format() @@ -86,47 +102,67 @@ local function call_update_on_router(space_name, key, user_operations, opts) end local sharding_key = key + local sharding_key_hash = nil + local skip_sharding_hash_check = nil + if opts.bucket_id == nil then local primary_index_parts = space.index[0].parts - local sharding_key_as_index_obj, err = sharding_metadata_module.fetch_sharding_key_on_router(space_name) + local sharding_key_data, err = sharding_metadata_module.fetch_sharding_key_on_router(space_name) if err ~= nil then return nil, err end sharding_key, err = sharding_key_module.extract_from_pk(space_name, - sharding_key_as_index_obj, + sharding_key_data.value, primary_index_parts, key) if err ~= nil then return nil, err end + + sharding_key_hash = sharding_key_data.hash + else + skip_sharding_hash_check = true end local operations = user_operations if not utils.tarantool_supports_fieldpaths() then operations, err = utils.convert_operations(user_operations, space_format) if err ~= nil then - return nil, UpdateError:new("Wrong operations are specified: %s", err), true + return nil, UpdateError:new("Wrong operations are specified: %s", err), const.NEED_SCHEMA_RELOAD end end - local bucket_id, err = sharding.key_get_bucket_id(space_name, sharding_key, opts.bucket_id) + local bucket_id_data, err = sharding.key_get_bucket_id(space_name, sharding_key, opts.bucket_id) if err ~= nil then return nil, err end + local update_on_storage_opts = { + sharding_func_hash = bucket_id_data.sharding_func_hash, + sharding_key_hash = sharding_key_hash, + skip_sharding_hash_check = skip_sharding_hash_check, + } + local call_opts = { mode = 'write', timeout = opts.timeout, } + local storage_result, err = call.single( - bucket_id, UPDATE_FUNC_NAME, - {space_name, key, operations, opts.fields}, + bucket_id_data.bucket_id, UPDATE_FUNC_NAME, + {space_name, key, operations, opts.fields, update_on_storage_opts}, call_opts ) if err ~= nil then - return 
nil, UpdateError:new("Failed to call update on storage-side: %s", err) + local err_wrapped = UpdateError:new("Failed to call update on storage-side: %s", err) + + if sharding.result_needs_sharding_reload(err) then + return nil, err_wrapped, const.NEED_SHARDING_RELOAD + end + + return nil, err_wrapped end if storage_result.err ~= nil then @@ -170,7 +206,8 @@ function update.call(space_name, key, user_operations, opts) fields = '?table', }) - return schema.wrap_func_reload(call_update_on_router, space_name, key, user_operations, opts) + return schema.wrap_func_reload(sharding.wrap_method, + call_update_on_router, space_name, key, user_operations, opts) end return update diff --git a/crud/upsert.lua b/crud/upsert.lua index 2ce3add9..bf75bad6 100644 --- a/crud/upsert.lua +++ b/crud/upsert.lua @@ -3,6 +3,7 @@ local errors = require('errors') local vshard = require('vshard') local call = require('crud.common.call') +local const = require('crud.common.const') local utils = require('crud.common.utils') local sharding = require('crud.common.sharding') local dev_checks = require('crud.common.dev_checks') @@ -17,6 +18,8 @@ local UPSERT_FUNC_NAME = '_crud.upsert_on_storage' local function upsert_on_storage(space_name, tuple, operations, opts) dev_checks('string', 'table', 'table', { add_space_schema_hash = '?boolean', + sharding_key_hash = '?number', + sharding_func_hash = '?number', }) opts = opts or {} @@ -26,6 +29,15 @@ local function upsert_on_storage(space_name, tuple, operations, opts) return nil, UpsertError:new("Space %q doesn't exist", space_name) end + local _, err = sharding.check_sharding_hash(space_name, + opts.sharding_func_hash, + opts.sharding_key_hash, + opts.skip_sharding_hash_check) + + if err ~= nil then + return nil, err + end + -- add_space_schema_hash is true only in case of upsert_object -- the only one case when reloading schema can avoid insert error -- is flattening object on router @@ -41,7 +53,7 @@ end -- returns result, err, need_reload -- 
need_reload indicates if reloading schema could help -- see crud.common.schema.wrap_func_reload() -local function call_upsert_on_router(space_name, tuple, user_operations, opts) +local function call_upsert_on_router(space_name, original_tuple, user_operations, opts) dev_checks('string', '?', 'table', { timeout = '?number', bucket_id = '?number|cdata', @@ -53,11 +65,11 @@ local function call_upsert_on_router(space_name, tuple, user_operations, opts) local space, err = utils.get_space(space_name, vshard.router.routeall()) if err ~= nil then - return nil, UpsertError:new("Failed to get space %q: %s", space_name, err), true + return nil, UpsertError:new("Failed to get space %q: %s", space_name, err), const.NEED_SCHEMA_RELOAD end if space == nil then - return nil, UpsertError:new("Space %q doesn't exist", space_name), true + return nil, UpsertError:new("Space %q doesn't exist", space_name), const.NEED_SCHEMA_RELOAD end local space_format = space:format() @@ -65,32 +77,53 @@ local function call_upsert_on_router(space_name, tuple, user_operations, opts) if not utils.tarantool_supports_fieldpaths() then operations, err = utils.convert_operations(user_operations, space_format) if err ~= nil then - return nil, UpsertError:new("Wrong operations are specified: %s", err), true + return nil, UpsertError:new("Wrong operations are specified: %s", err), const.NEED_SCHEMA_RELOAD end end - local bucket_id, err = sharding.tuple_set_and_return_bucket_id(tuple, space, opts.bucket_id) + local tuple = table.deepcopy(original_tuple) + + local sharding_data, err = sharding.tuple_set_and_return_bucket_id(tuple, space, opts.bucket_id) if err ~= nil then - return nil, UpsertError:new("Failed to get bucket ID: %s", err), true + return nil, UpsertError:new("Failed to get bucket ID: %s", err), const.NEED_SCHEMA_RELOAD end + local upsert_on_storage_opts = { + add_space_schema_hash = opts.add_space_schema_hash, + fields = opts.fields, + sharding_func_hash = sharding_data.sharding_func_hash, + 
sharding_key_hash = sharding_data.sharding_key_hash, + } + local call_opts = { mode = 'write', timeout = opts.timeout, } + local storage_result, err = call.single( - bucket_id, UPSERT_FUNC_NAME, - {space_name, tuple, operations}, + sharding_data.bucket_id, UPSERT_FUNC_NAME, + {space_name, tuple, operations, upsert_on_storage_opts}, call_opts ) if err ~= nil then - return nil, UpsertError:new("Failed to call upsert on storage-side: %s", err) + local err_wrapped = UpsertError:new("Failed to call upsert on storage-side: %s", err) + + if sharding.result_needs_sharding_reload(err) then + return nil, err_wrapped, const.NEED_SHARDING_RELOAD + end + + return nil, err_wrapped end if storage_result.err ~= nil then - local need_reload = schema.result_needs_reload(space, storage_result) - return nil, UpsertError:new("Failed to upsert: %s", storage_result.err), need_reload + local err_wrapped = UpsertError:new("Failed to upsert: %s", storage_result.err) + + if schema.result_needs_reload(space, storage_result) then + return nil, err_wrapped, const.NEED_SCHEMA_RELOAD + end + + return nil, err_wrapped end -- upsert always returns nil @@ -130,7 +163,8 @@ function upsert.tuple(space_name, tuple, user_operations, opts) fields = '?table', }) - return schema.wrap_func_reload(call_upsert_on_router, space_name, tuple, user_operations, opts) + return schema.wrap_func_reload(sharding.wrap_method, + call_upsert_on_router, space_name, tuple, user_operations, opts) end --- Update or insert an object in the specified space diff --git a/deps.sh b/deps.sh index 57c219de..96a7d330 100755 --- a/deps.sh +++ b/deps.sh @@ -26,7 +26,7 @@ tarantoolctl rocks install "${LUACOV_COVERALLS_ROCKSPEC_FILE}" rm "${LUACOV_COVERALLS_ROCKSPEC_FILE}" rmdir "${TMPDIR}" -tarantoolctl rocks install cartridge 2.7.3 +tarantoolctl rocks install cartridge 2.7.4 tarantoolctl rocks install ddl 1.6.0 tarantoolctl rocks make diff --git a/test/entrypoint/srv_ddl_reload.lua b/test/entrypoint/srv_ddl_reload.lua new file mode 
100755 index 00000000..dece80c8 --- /dev/null +++ b/test/entrypoint/srv_ddl_reload.lua @@ -0,0 +1,218 @@ +#!/usr/bin/env tarantool + +require('strict').on() +_G.is_initialized = function() return false end + +local log = require('log') +local errors = require('errors') +local cartridge = require('cartridge') +local ddl = require('ddl') + +package.preload['customers-storage'] = function() + local customers_module = { + sharding_func_default = function(key) + local id = key[1] + assert(id ~= nil) + + return id % 3000 + 1 + end, + sharding_func_new = function(key) + local id = key[1] + assert(id ~= nil) + + return (id + 42) % 3000 + 1 + end, + } + rawset(_G, 'customers_module', customers_module) + + return { + role_name = 'customers-storage', + init = function() + local engine = os.getenv('ENGINE') or 'memtx' + + local customers_schema_raw = { + engine = engine, + temporary = false, + is_local = false, + format = { + {name = 'id', is_nullable = false, type = 'unsigned'}, + {name = 'bucket_id', is_nullable = false, type = 'unsigned'}, + {name = 'name', is_nullable = false, type = 'string'}, + {name = 'age', is_nullable = false, type = 'number'}, + }, + indexes = { + { + name = 'id', + type = 'TREE', + unique = true, + parts = { + {path = 'id', is_nullable = false, type = 'unsigned'}, + }, + }, + { + name = 'bucket_id', + type = 'TREE', + unique = false, + parts = { + {path = 'bucket_id', is_nullable = false, type = 'unsigned'}, + }, + }, + { + name = 'name', + type = 'TREE', + unique = false, + parts = { + {path = 'name', is_nullable = false, type = 'string'}, + }, + }, + { + name = 'age', + type = 'TREE', + unique = false, + parts = { + {path = 'age', is_nullable = false, type = 'number'}, + }, + }, + }, + sharding_key = { 'name' }, + } + + local customers_schema = table.deepcopy(customers_schema_raw) + customers_schema.sharding_key = { 'name' } + + local customers_pk_schema = table.deepcopy(customers_schema_raw) + customers_pk_schema.sharding_key = { 'id' } + 
customers_pk_schema.sharding_func = 'customers_module.sharding_func_default' + + local schema = { + spaces = { + ['customers'] = customers_schema, + ['customers_pk'] = customers_pk_schema, + } + } + + + rawset(_G, 'reset_to_default_schema', function() + if box.info.ro == true then + return + end + + if box.space['_ddl_sharding_key'] ~= nil then + box.space['_ddl_sharding_key']:truncate() + box.space['_ddl_sharding_key']:insert{'customers', customers_schema.sharding_key} + box.space['_ddl_sharding_key']:insert{'customers_pk', customers_pk_schema.sharding_key} + end + + if box.space['_ddl_sharding_func'] ~= nil then + box.space['_ddl_sharding_func']:truncate() + box.space['_ddl_sharding_func']:insert{'customers_pk', customers_pk_schema.sharding_func, box.NULL} + end + + local _, err = ddl.set_schema(schema) + if err ~= nil then + error(err) + end + end) + + rawset(_G, 'set_sharding_key', function(space_name, sharding_key_def) + if box.info.ro == true then + return + end + + local current_schema, err = ddl.get_schema() + if err ~= nil then + error(err) + end + + box.space['_ddl_sharding_key']:replace{space_name, sharding_key_def} + current_schema.spaces[space_name].sharding_key = sharding_key_def + + local _, err = ddl.set_schema(current_schema) + if err ~= nil then + error(err) + end + end) + + rawset(_G, 'set_sharding_func_name', function(space_name, sharding_func_name) + if box.info.ro == true then + return + end + + local current_schema, err = ddl.get_schema() + if err ~= nil then + error(err) + end + + local t = {space_name, sharding_func_name, box.NULL} + box.space['_ddl_sharding_func']:replace(t) + current_schema.spaces[space_name].sharding_func = sharding_func_name + + local _, err = ddl.set_schema(current_schema) + if err ~= nil then + error(err) + end + end) + + rawset(_G, 'set_sharding_func_body', function(space_name, sharding_func_body) + if box.info.ro == true then + return + end + + local current_schema, err = ddl.get_schema() + if err ~= nil then + 
error(err) + end + + local t = {space_name, box.NULL, sharding_func_body} + box.space['_ddl_sharding_func']:replace(t) + current_schema.spaces[space_name].sharding_func = { body = sharding_func_body } + + local _, err = ddl.set_schema(current_schema) + if err ~= nil then + error(err) + end + end) + + rawset(_G, 'create_new_space', function() + if box.info.ro == true then + return + end + + local new_schema = table.deepcopy(schema) + new_schema.spaces['customers_new'] = table.deepcopy(customers_schema_raw) + new_schema.spaces['customers_new'].sharding_func = { + body = [[ + function(key) + local vshard = require('vshard') + return vshard.router.bucket_id_mpcrc32(key) + end + ]] + } + + local _, err = ddl.set_schema(new_schema) + if err ~= nil then + error(err) + end + end) + end, + } +end + +local ok, err = errors.pcall('CartridgeCfgError', cartridge.cfg, { + advertise_uri = 'localhost:3301', + http_port = 8081, + bucket_count = 3000, + roles = { + 'customers-storage', + 'cartridge.roles.crud-router', + 'cartridge.roles.crud-storage', + }, + roles_reload_allowed = true, +}) + +if not ok then + log.error('%s', err) + os.exit(1) +end + +_G.is_initialized = cartridge.is_healthy diff --git a/test/helper.lua b/test/helper.lua index ab1612fe..eb53e52e 100644 --- a/test/helper.lua +++ b/test/helper.lua @@ -334,7 +334,7 @@ end function helpers.get_sharding_key_cache(cluster) return cluster.main_server.net_box:eval([[ - local sharding_metadata_cache = require('crud.common.sharding.sharding_metadata_cache') + local sharding_metadata_cache = require('crud.common.sharding.router_metadata_cache') return sharding_metadata_cache[sharding_metadata_cache.SHARDING_KEY_MAP_NAME] ]]) @@ -362,7 +362,7 @@ end -- but not the cache itself function helpers.get_sharding_func_cache_size(cluster) return cluster.main_server.net_box:eval([[ - local sharding_metadata_cache = require('crud.common.sharding.sharding_metadata_cache') + local sharding_metadata_cache = 
require('crud.common.sharding.router_metadata_cache') local cache, err = sharding_metadata_cache[sharding_metadata_cache.SHARDING_FUNC_MAP_NAME] if cache == nil then @@ -480,4 +480,12 @@ function helpers.disable_dev_checks() os.setenv('DEV', 'OFF') end +function helpers.count_on_replace_triggers(server, space_name) + return server:eval([[ + local space = box.space[...] + assert(space ~= nil) + return #space:on_replace() + ]], {space_name}) +end + return helpers diff --git a/test/integration/ddl_sharding_info_reload_test.lua b/test/integration/ddl_sharding_info_reload_test.lua new file mode 100644 index 00000000..fe307779 --- /dev/null +++ b/test/integration/ddl_sharding_info_reload_test.lua @@ -0,0 +1,676 @@ +local fio = require('fio') +local t = require('luatest') + +local sharding_utils = require('crud.common.sharding.utils') + +local helpers = require('test.helper') + +local ok = pcall(require, 'ddl') +if not ok then + t.skip('Lua module ddl is required to run test') +end + +local pgroup_storage = t.group('ddl_storage_sharding_info', { + {engine = 'memtx'}, + {engine = 'vinyl'}, +}) + +local pgroup_new_space = t.group('ddl_sharding_info_on_new_space', { + {engine = 'memtx'}, + {engine = 'vinyl'}, +}) + +local pgroup_key_change = t.group('ddl_sharding_key_reload_after_schema_change', { + {engine = 'memtx'}, + {engine = 'vinyl'}, +}) + +local pgroup_func_change = t.group('ddl_sharding_func_reload_after_schema_change', { + {engine = 'memtx'}, + {engine = 'vinyl'}, +}) + +local function start_cluster(g) + g.cluster = helpers.Cluster:new({ + datadir = fio.tempdir(), + server_command = helpers.entrypoint('srv_ddl_reload'), + use_vshard = true, + replicasets = helpers.get_test_replicasets(), + env = { + ['ENGINE'] = g.params.engine, + }, + }) + g.cluster:start() +end + +local function stop_cluster(g) + helpers.stop_cluster(g.cluster) +end + +pgroup_storage.before_all(start_cluster) +pgroup_new_space.before_all(start_cluster) +pgroup_key_change.before_all(start_cluster) 
+pgroup_func_change.before_all(start_cluster) + +pgroup_storage.after_all(stop_cluster) +pgroup_new_space.after_all(stop_cluster) +pgroup_key_change.after_all(stop_cluster) +pgroup_func_change.after_all(stop_cluster) + +pgroup_storage.before_each(function(g) + helpers.call_on_storages(g.cluster, function(server) + server.net_box:call('reset_to_default_schema') + end) +end) + +pgroup_new_space.before_each(function(g) + helpers.drop_space_on_cluster(g.cluster, 'customers') + helpers.drop_space_on_cluster(g.cluster, 'customers_new') + + helpers.call_on_storages(g.cluster, function(server) + server.net_box:call('reset_to_default_schema') + end) + + -- Fetch metadata schema. + local obj, err = g.cluster.main_server.net_box:call( + 'crud.insert', {'customers', {0, box.NULL, 'Emma', 22}} + ) + + t.assert_is_not(obj, nil) + t.assert_equals(err, nil) + + -- Assert space doesn't exist. + local obj, err = g.cluster.main_server.net_box:call( + 'crud.insert', {'customers_new', {1, box.NULL, 'Emma', 22}} + ) + + t.assert_equals(obj, nil) + t.assert_is_not(err, nil) + t.assert_str_contains(err.err, "Space \\-\"customers_new\\-\" doesn't exist", true) +end) + +pgroup_key_change.before_each(function(g) + -- Clean up. + helpers.truncate_space_on_cluster(g.cluster, 'customers') + + helpers.call_on_storages(g.cluster, function(server) + server.net_box:call('reset_to_default_schema') + end) + + -- Assert schema is default: insert is sharded with default ddl info. 
+ local obj, err = g.cluster.main_server.net_box:call( + 'crud.insert', {'customers', {0, box.NULL, 'Emma', 22}} + ) + t.assert_is_not(obj, nil) + t.assert_equals(err, nil) + + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers']:get(0) + t.assert_equals(result, {0, 2861, 'Emma', 22}) + + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space['customers']:get(0) + t.assert_equals(result, nil) +end) + +pgroup_func_change.before_each(function(g) + -- Clean up. + helpers.truncate_space_on_cluster(g.cluster, 'customers_pk') + + helpers.call_on_storages(g.cluster, function(server) + server.net_box:call('reset_to_default_schema') + end) + + -- Assert schema is default: insert is sharded with default ddl info. + local obj, err = g.cluster.main_server.net_box:call( + 'crud.insert', {'customers_pk', {0, box.NULL, 'Emma', 22}} + ) + t.assert_is_not(obj, nil) + t.assert_equals(err, nil) + + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers_pk']:get(0) + t.assert_equals(result, nil) + + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space['customers_pk']:get(0) + t.assert_equals(result, {0, 1, 'Emma', 22}) +end) + + +-- Test storage sharding metainfo. + +local function get_hash(storage, func_name, space_name) + return storage:eval([[ + local func_name, space_name = ... 
+ local storage_cache = require('crud.common.sharding.storage_metadata_cache') + return storage_cache[func_name](space_name) + ]], {func_name, space_name}) +end + +local sharding_cases = { + sharding_func_hash = { + eval_func = 'get_sharding_func_hash', + ddl_space = '_ddl_sharding_func', + test_space = 'customers_pk', + test_case = 'test_sharding_func_hash_is_updated_when_ddl_is_updated', + }, + sharding_key_hash = { + eval_func = 'get_sharding_key_hash', + ddl_space = '_ddl_sharding_key', + test_space = 'customers', + test_case = 'test_sharding_key_hash_is_updated_when_ddl_is_updated', + }, +} + +pgroup_storage.test_sharding_key_hash_is_updated_when_ddl_is_updated = function(g) + local storage = g.cluster:server('s1-master') + local space = sharding_cases.sharding_func_hash.test_space + + -- Set up sharding key (equal to default one). + local sharding_key_v1 = {'name'} + local _, err = storage:call('set_sharding_key', {space, sharding_key_v1}) + t.assert_equals(err, nil) + + local hash, err = get_hash(storage, 'get_sharding_key_hash', space) + + t.assert_equals(err, nil) + t.assert_equals(hash, sharding_utils.compute_hash(sharding_key_v1)) + + -- Change sharding key value. + local sharding_key_v2 = {'age'} + local _, err = storage:call('set_sharding_key', {space, sharding_key_v2}) + t.assert_equals(err, nil) + + local hash, err = get_hash(storage, 'get_sharding_key_hash', space) + + t.assert_equals(err, nil) + t.assert_equals(hash, sharding_utils.compute_hash(sharding_key_v2)) +end + +pgroup_storage.test_sharding_func_hash_is_updated_when_ddl_is_updated = function(g) + local storage = g.cluster:server('s1-master') + local space = sharding_cases.sharding_key_hash.test_space + + -- Set up sharding func (equal to default one). 
+ local sharding_func_name = 'customers_module.sharding_func_default' + local _, err = storage:call('set_sharding_func_name', {space, sharding_func_name}) + t.assert_equals(err, nil) + + local hash, err = get_hash(storage, 'get_sharding_func_hash', space) + + t.assert_equals(err, nil) + t.assert_equals(hash, sharding_utils.compute_hash(sharding_func_name)) + + -- Change sharding func type and value. + local sharding_func_body = 'function() return 1 end' + local _, err = storage:call('set_sharding_func_body', {space, sharding_func_body}) + t.assert_equals(err, nil) + + local hash, err = get_hash(storage, 'get_sharding_func_hash', space) + + t.assert_equals(err, nil) + t.assert_equals(hash, sharding_utils.compute_hash({body = sharding_func_body})) +end + + +-- Test storage hash metadata mechanisms are ok after code reload. + +local reload_cases = { + module_reload = 'reload_package', + roles_reload = 'reload_roles' +} + +for sharding_case_name, sharding_case in pairs(sharding_cases) do + for reload_case_name, reload_case in pairs(reload_cases) do + + -- Test code reload do not break trigger logic. + + local test_name = ('test_%s_do_not_break_%s_update'):format( + reload_case_name, sharding_case_name) + + pgroup_storage[test_name] = function(g) + local storage = g.cluster:server('s1-master') + + -- Init the cache. + local _, err = get_hash(storage, sharding_case.eval_func, + sharding_case.test_space) + t.assert_equals(err, nil) + + -- Reload the code. + helpers[reload_case](storage) + + -- Execute test case from above to check that logic wasn't broken. + g[sharding_case.test_case](g) + end + end +end + +for _, sharding_case in pairs(sharding_cases) do + for reload_case_name, reload_case in pairs(reload_cases) do + + -- Test code reload cleans up redundant triggers. 
+ + local test_name = ('test_redundant_%s_triggers_cleaned_up_on_%s'):format( + sharding_case.ddl_space, reload_case_name) + + pgroup_storage[test_name] = function(g) + local storage = g.cluster:server('s1-master') + + -- Init the cache. + local _, err = get_hash(storage, sharding_case.eval_func, + sharding_case.test_space) + t.assert_equals(err, nil) + + local before_count = helpers.count_on_replace_triggers(storage, + sharding_case.ddl_space) + + -- Reload the code. + helpers[reload_case](storage) + + -- Reinit the cache. + local _, err = get_hash(storage, sharding_case.eval_func, + sharding_case.test_space) + t.assert_equals(err, nil) + + local after_count = helpers.count_on_replace_triggers(storage, + sharding_case.ddl_space) + t.assert_equals(after_count, before_count) + end + end +end + + +-- Test metainfo is updated on router if new space added to ddl. + +local test_tuple = {1, box.NULL, 'Emma', 22} +local test_object = { id = 1, bucket_id = box.NULL, name = 'Emma', age = 22 } + +-- Sharded by "name" and computed with custom sharding function. 
+local test_customers_new_result = { + s1 = {1, 2861, 'Emma', 22}, + s2 = nil, +} + +local new_space_cases = { + insert = { + func = 'crud.insert', + input = {'customers_new', test_tuple}, + result = test_customers_new_result, + }, + insert_object = { + func = 'crud.insert_object', + input = {'customers_new', test_object}, + result = test_customers_new_result, + }, + replace = { + func = 'crud.replace', + input = {'customers_new', test_tuple}, + result = test_customers_new_result, + }, + replace_object = { + func = 'crud.replace_object', + input = {'customers_new', test_object}, + result = test_customers_new_result, + }, + upsert = { + func = 'crud.upsert', + input = {'customers_new', test_tuple, {}}, + result = test_customers_new_result, + }, + upsert_object = { + func = 'crud.upsert_object', + input = {'customers_new', test_object, {}}, + result = test_customers_new_result, + }, +} + +for name, case in pairs(new_space_cases) do + local test_name = ('test_%s'):format(name) + + pgroup_new_space[test_name] = function(g) + -- Create space 'customers_new', sharded by 'name'. + helpers.call_on_storages(g.cluster, function(server) + server.net_box:call('create_new_space') + end) + + -- Assert it is now possible to call opertions for a new space. + local obj, err = g.cluster.main_server.net_box:call(case.func, case.input) + t.assert_is_not(obj, nil) + t.assert_equals(err, nil) + + -- Assert it is sharded based on updated ddl info. + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers_new']:get(1) + t.assert_equals(result, case.result.s1) + + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space['customers_new']:get(1) + t.assert_equals(result, case.result.s2) + end +end + + +-- Test using outdated sharding key info returns error. + +-- Sharded by "age". 
+local test_customers_age_tuple = {1, 655, 'Emma', 22} +local test_customers_age_result = { + s1 = nil, + s2 = test_customers_age_tuple, +} + +local function setup_customers_migrated_data(g) + if test_customers_age_result.s1 ~= nil then + local conn_s1 = g.cluster:server('s1-master').net_box + conn_s1.space['customers']:insert(test_customers_age_result.s1) + end + if test_customers_age_result.s2 ~= nil then + local conn_s2 = g.cluster:server('s2-master').net_box + conn_s2.space['customers']:insert(test_customers_age_result.s2) + end +end + +local schema_change_sharding_key_cases = { + insert = { + func = 'crud.insert', + input = {'customers', test_tuple}, + result = test_customers_age_result, + }, + insert_object = { + func = 'crud.insert_object', + input = {'customers', test_object}, + result = test_customers_age_result, + }, + replace = { + func = 'crud.replace', + input = {'customers', test_tuple}, + result = test_customers_age_result, + }, + replace_object = { + func = 'crud.replace_object', + input = {'customers', test_object}, + result = test_customers_age_result, + }, + upsert = { + func = 'crud.upsert', + input = {'customers', test_tuple, {}}, + result = test_customers_age_result, + }, + upsert_object = { + func = 'crud.upsert_object', + input = {'customers', test_object, {}}, + result = test_customers_age_result, + }, +} + +for name, case in pairs(schema_change_sharding_key_cases) do + local test_name = ('test_%s'):format(name) + + pgroup_key_change[test_name] = function(g) + -- Change schema to shard 'customers' by 'age'. + helpers.call_on_storages(g.cluster, function(server) + server.net_box:call('set_sharding_key', {'customers', {'age'}}) + end) + + -- Assert operation bucket_id is computed based on updated ddl info. 
+ local obj, err = g.cluster.main_server.net_box:call(case.func, case.input) + t.assert_is_not(obj, nil) + t.assert_equals(err, nil) + + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers']:get(1) + t.assert_equals(result, case.result.s1) + + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space['customers']:get(1) + t.assert_equals(result, case.result.s2) + end +end + +pgroup_key_change.before_test('test_select', setup_customers_migrated_data) + +pgroup_key_change.test_select = function(g) + -- Change schema to shard 'customers' by 'age'. + helpers.call_on_storages(g.cluster, function(server) + server.net_box:call('set_sharding_key', {'customers', {'age'}}) + end) + + -- Assert operation bucket_id is computed based on updated ddl info. + local obj, err = g.cluster.main_server.net_box:call( + 'crud.select', + { + 'customers', + {{'==', 'id', 1}, {'==', 'name', 'Emma'}, {'==', 'age', 22}}, + }) + t.assert_equals(err, nil) + t.assert_equals(obj.rows, {test_customers_age_tuple}) +end + +pgroup_key_change.before_test('test_count', setup_customers_migrated_data) + +pgroup_key_change.test_count = function(g) + -- Change schema to shard 'customers' by 'age'. + helpers.call_on_storages(g.cluster, function(server) + server.net_box:call('set_sharding_key', {'customers', {'age'}}) + end) + + -- Assert operation bucket_id is computed based on updated ddl info. + local obj, err = g.cluster.main_server.net_box:call( + 'crud.count', + { + 'customers', + {{'==', 'id', 1}, {'==', 'name', 'Emma'}, {'==', 'age', 22}}, + }) + t.assert_equals(err, nil) + t.assert_equals(obj, 1) +end + +local pairs_eval = [[ + local res = {} + for _, v in crud.pairs(...) do + table.insert(res, v) + end + return res +]] + +pgroup_key_change.before_test('test_pairs', setup_customers_migrated_data) + +pgroup_key_change.test_pairs = function(g) + -- Change schema to shard 'customers' by 'age'. 
+ helpers.call_on_storages(g.cluster, function(server) + server.net_box:call('set_sharding_key', {'customers', {'age'}}) + end) + + -- First pairs request fails and reloads sharding info. + t.assert_error_msg_contains( + "Please retry your request", + g.cluster.main_server.net_box.eval, + g.cluster.main_server.net_box, + pairs_eval, + { + 'customers', + {{'==', 'id', 1}, {'==', 'name', 'Emma'}, {'==', 'age', 22}} + }) + + -- Assert operation bucket_id is computed based on updated ddl info. + local obj, err = g.cluster.main_server.net_box:eval( + pairs_eval, + { + 'customers', + {{'==', 'id', 1}, {'==', 'name', 'Emma'}, {'==', 'age', 22}}, + }) + t.assert_equals(err, nil) + t.assert_equals(obj, {test_customers_age_tuple}) +end + + +-- Test using outdated sharding func info returns error. + +-- Sharded by 'id' with custom sharding function. +local test_customers_pk_func_tuple = {1, 44, "Emma", 22} +local test_customers_pk_func = { + s1 = nil, + s2 = test_customers_pk_func_tuple, +} + +local function setup_customers_pk_migrated_data(g) + if test_customers_pk_func.s1 ~= nil then + local conn_s1 = g.cluster:server('s1-master').net_box + conn_s1.space['customers_pk']:insert(test_customers_pk_func.s1) + end + if test_customers_pk_func.s2 ~= nil then + local conn_s2 = g.cluster:server('s2-master').net_box + conn_s2.space['customers_pk']:insert(test_customers_pk_func.s2) + end +end + +local schema_change_sharding_func_cases = { + insert = { + func = 'crud.insert', + input = {'customers_pk', test_tuple}, + result = test_customers_pk_func, + }, + insert_object = { + func = 'crud.insert_object', + input = {'customers_pk', test_object}, + result = test_customers_pk_func, + }, + replace = { + func = 'crud.replace', + input = {'customers_pk', test_tuple}, + result = test_customers_pk_func, + }, + replace_object = { + func = 'crud.replace_object', + input = {'customers_pk', test_object}, + result = test_customers_pk_func, + }, + upsert = { + func = 'crud.upsert', + input = 
{'customers_pk', test_tuple, {}}, + result = test_customers_pk_func, + }, + upsert_object = { + func = 'crud.upsert_object', + input = {'customers_pk', test_object, {}}, + result = test_customers_pk_func, + }, + delete = { + before_test = setup_customers_pk_migrated_data, + func = 'crud.delete', + input = {'customers_pk', 1}, + result = {}, + }, + update = { + before_test = setup_customers_pk_migrated_data, + func = 'crud.update', + input = {'customers_pk', 1, {{'+', 4, 1}}}, + result = { + s1 = nil, + s2 = {1, 44, "Emma", 23}, + }, + }, +} + +for name, case in pairs(schema_change_sharding_func_cases) do + local test_name = ('test_%s'):format(name) + + if case.before_test ~= nil then + pgroup_func_change.before_test(test_name, case.before_test) + end + + pgroup_func_change[test_name] = function(g) + -- Change schema to shard 'customers_pk' with another sharding function. + helpers.call_on_storages(g.cluster, function(server) + server.net_box:call('set_sharding_func_name', + {'customers_pk', 'customers_module.sharding_func_new'}) + end) + + -- Assert operation bucket_id is computed based on updated ddl info. + local obj, err = g.cluster.main_server.net_box:call(case.func, case.input) + t.assert_is_not(obj, nil) + t.assert_equals(err, nil) + + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers_pk']:get(1) + t.assert_equals(result, case.result.s1) + + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space['customers_pk']:get(1) + t.assert_equals(result, case.result.s2) + end +end + +pgroup_func_change.before_test('test_select', setup_customers_pk_migrated_data) + +pgroup_func_change.test_select = function(g) + -- Change schema to shard 'customers_pk' with another sharding function. 
+ helpers.call_on_storages(g.cluster, function(server) + server.net_box:call('set_sharding_func_name', + {'customers_pk', 'customers_module.sharding_func_new'}) + end) + + -- Assert operation bucket_id is computed based on updated ddl info. + local obj, err = g.cluster.main_server.net_box:call( + 'crud.select', + {'customers_pk', {{'==', 'id', 1}}}) + t.assert_equals(err, nil) + t.assert_equals(obj.rows, {test_customers_pk_func_tuple}) +end + +pgroup_func_change.before_test('test_get', setup_customers_pk_migrated_data) + +pgroup_func_change.test_get = function(g) + -- Change schema to shard 'customers_pk' with another sharding function. + helpers.call_on_storages(g.cluster, function(server) + server.net_box:call('set_sharding_func_name', + {'customers_pk', 'customers_module.sharding_func_new'}) + end) + + -- Assert operation bucket_id is computed based on updated ddl info. + local obj, err = g.cluster.main_server.net_box:call('crud.get', {'customers_pk', 1}) + t.assert_equals(err, nil) + t.assert_equals(obj.rows, {test_customers_pk_func_tuple}) +end + +pgroup_func_change.before_test('test_count', setup_customers_pk_migrated_data) + +pgroup_func_change.test_count = function(g) + -- Change schema to shard 'customers_pk' with another sharding function. + helpers.call_on_storages(g.cluster, function(server) + server.net_box:call('set_sharding_func_name', + {'customers_pk', 'customers_module.sharding_func_new'}) + end) + + -- Assert operation bucket_id is computed based on updated ddl info. + local obj, err = g.cluster.main_server.net_box:call( + 'crud.count', + {'customers_pk', {{'==', 'id', 1}}}) + t.assert_equals(err, nil) + t.assert_equals(obj, 1) +end + +pgroup_func_change.before_test('test_pairs', setup_customers_pk_migrated_data) + +pgroup_func_change.test_pairs = function(g) + -- Change schema to shard 'customers_pk' with another sharding function. 
+ helpers.call_on_storages(g.cluster, function(server) + server.net_box:call('set_sharding_func_name', + {'customers_pk', 'customers_module.sharding_func_new'}) + end) + + t.assert_error_msg_contains( + "Please retry your request", + g.cluster.main_server.net_box.eval, + g.cluster.main_server.net_box, + pairs_eval, + {'customers_pk', {{'==', 'id', 1}}}) + + -- Assert operation bucket_id is computed based on updated ddl info. + local obj, err = g.cluster.main_server.net_box:eval( + pairs_eval, + {'customers_pk', {{'==', 'id', 1}}}) + t.assert_equals(err, nil) + t.assert_equals(obj, {test_customers_pk_func_tuple}) +end diff --git a/test/integration/ddl_sharding_key_test.lua b/test/integration/ddl_sharding_key_test.lua index 28b2676f..9b5a0a24 100644 --- a/test/integration/ddl_sharding_key_test.lua +++ b/test/integration/ddl_sharding_key_test.lua @@ -665,36 +665,36 @@ end pgroup.test_update_cache = function(g) local space_name = 'customers_name_key' - local sharding_key_as_index_obj, err = helpers.update_sharding_key_cache(g.cluster, space_name) + local sharding_key_data, err = helpers.update_sharding_key_cache(g.cluster, space_name) t.assert_equals(err, nil) - t.assert_equals(sharding_key_as_index_obj, {parts = {{fieldno = 3}}}) + t.assert_equals(sharding_key_data.value, {parts = {{fieldno = 3}}}) helpers.call_on_servers(g.cluster, {'s1-master', 's2-master'}, function(server) server.net_box:call('set_sharding_key', {space_name, {'age'}}) end) - sharding_key_as_index_obj, err = helpers.update_sharding_key_cache(g.cluster, space_name) + sharding_key_data, err = helpers.update_sharding_key_cache(g.cluster, space_name) t.assert_equals(err, nil) - t.assert_equals(sharding_key_as_index_obj, {parts = {{fieldno = 4}}}) + t.assert_equals(sharding_key_data.value, {parts = {{fieldno = 4}}}) -- Recover sharding key. 
helpers.call_on_servers(g.cluster, {'s1-master', 's2-master'}, function(server) server.net_box:call('set_sharding_key', {space_name, {'name'}}) end) - sharding_key_as_index_obj, err = helpers.update_sharding_key_cache(g.cluster, space_name) + sharding_key_data, err = helpers.update_sharding_key_cache(g.cluster, space_name) t.assert_equals(err, nil) - t.assert_equals(sharding_key_as_index_obj, {parts = {{fieldno = 3}}}) + t.assert_equals(sharding_key_data.value, {parts = {{fieldno = 3}}}) end pgroup.test_update_cache_with_incorrect_key = function(g) -- get data from cache for space with correct sharding key local space_name = 'customers_name_key' - local sharding_key_as_index_obj, err = helpers.update_sharding_key_cache(g.cluster, space_name) + local sharding_key_data, err = helpers.update_sharding_key_cache(g.cluster, space_name) t.assert_equals(err, nil) - t.assert_equals(sharding_key_as_index_obj, {parts = {{fieldno = 3}}}) + t.assert_equals(sharding_key_data.value, {parts = {{fieldno = 3}}}) -- records for all spaces exist - sharding_key_as_index_obj = helpers.get_sharding_key_cache(g.cluster) + local sharding_key_as_index_obj = helpers.get_sharding_key_cache(g.cluster) t.assert_equals(sharding_key_as_index_obj, { customers = {parts = {{fieldno = 1}}}, customers_G_func = {parts = {{fieldno = 1}}}, @@ -715,9 +715,9 @@ pgroup.test_update_cache_with_incorrect_key = function(g) end) -- we get no error because we sent request for correct space - local sharding_key_as_index_obj, err = helpers.update_sharding_key_cache(g.cluster, 'customers_age_key') + local sharding_key_data, err = helpers.update_sharding_key_cache(g.cluster, 'customers_age_key') t.assert_equals(err, nil) - t.assert_equals(sharding_key_as_index_obj, {parts = {{fieldno = 4}}}) + t.assert_equals(sharding_key_data.value, {parts = {{fieldno = 4}}}) -- cache['customers_name_key'] == nil (space with incorrect key) -- other records for correct spaces exist in cache @@ -741,8 +741,8 @@ 
pgroup.test_update_cache_with_incorrect_key = function(g) end) -- we get an error because we sent request for incorrect space - local sharding_key_as_index_obj, err = helpers.update_sharding_key_cache(g.cluster, space_name) - t.assert_equals(sharding_key_as_index_obj, nil) + local sharding_key_data, err = helpers.update_sharding_key_cache(g.cluster, space_name) + t.assert_equals(sharding_key_data, nil) t.assert_str_contains(err.err, "No such field (non_existent_field) in a space format (customers_name_key)") -- cache['customers_name_key'] == nil (space with incorrect key) diff --git a/test/integration/simple_operations_test.lua b/test/integration/simple_operations_test.lua index 9ab7196d..a7be4200 100644 --- a/test/integration/simple_operations_test.lua +++ b/test/integration/simple_operations_test.lua @@ -1008,6 +1008,53 @@ pgroup.test_partial_result_bad_input = function(g) t.assert_str_contains(err.err, 'Space format doesn\'t contain field named "lastname"') end +pgroup.test_tuple_not_damaged = function(g) + -- insert + local insert_tuple = {22, box.NULL, 'Elizabeth', 24} + local new_insert_tuple, err = g.cluster.main_server:eval([[ + local crud = require('crud') + + local insert_tuple = ... + + local _, err = crud.insert('customers', insert_tuple) + + return insert_tuple, err + ]], {insert_tuple}) + + t.assert_equals(err, nil) + t.assert_equals(new_insert_tuple, insert_tuple) + + -- upsert + local upsert_tuple = {33, box.NULL, 'Peter', 35} + local new_upsert_tuple, err = g.cluster.main_server:eval([[ + local crud = require('crud') + + local upsert_tuple = ... + + local _, err = crud.upsert('customers', upsert_tuple, {{'+', 'age', 1}}) + + return upsert_tuple, err + ]], {upsert_tuple}) + + t.assert_equals(err, nil) + t.assert_equals(new_upsert_tuple, upsert_tuple) + + -- replace + local replace_tuple = {22, box.NULL, 'Elizabeth', 24} + local new_replace_tuple, err = g.cluster.main_server:eval([[ + local crud = require('crud') + + local replace_tuple = ... 
+ + local _, err = crud.replace('customers', replace_tuple) + + return replace_tuple, err + ]], {replace_tuple}) + + t.assert_equals(err, nil) + t.assert_equals(new_replace_tuple, replace_tuple) +end + pgroup.test_opts_not_damaged = function(g) -- insert local insert_opts = {timeout = 1, bucket_id = 655, fields = {'name', 'age'}} diff --git a/test/unit/sharding_metadata_test.lua b/test/unit/sharding_metadata_test.lua index 9c5bfda8..e049c28e 100644 --- a/test/unit/sharding_metadata_test.lua +++ b/test/unit/sharding_metadata_test.lua @@ -3,7 +3,9 @@ local ffi = require('ffi') local sharding_metadata_module = require('crud.common.sharding.sharding_metadata') local sharding_key_module = require('crud.common.sharding.sharding_key') local sharding_func_module = require('crud.common.sharding.sharding_func') -local cache = require('crud.common.sharding.sharding_metadata_cache') +local sharding_utils = require('crud.common.sharding.utils') +local cache = require('crud.common.sharding.router_metadata_cache') +local storage_cache = require('crud.common.sharding.storage_metadata_cache') local utils = require('crud.common.utils') local helpers = require('test.helper') @@ -55,6 +57,7 @@ g.after_each(function() box.space.fetch_on_storage:drop() cache.drop_caches() + storage_cache.drop_caches() end) g.test_as_index_object_positive = function() @@ -119,7 +122,9 @@ g.test_fetch_sharding_metadata_on_storage_positive = function() t.assert_equals(metadata_map, { [space_name] = { sharding_key_def = sharding_key_def, + sharding_key_hash = sharding_utils.compute_hash(sharding_key_def), sharding_func_def = sharding_func_def, + sharding_func_hash = sharding_utils.compute_hash(sharding_func_def), space_format = {} }, }) @@ -137,6 +142,7 @@ g.test_fetch_sharding_key_on_storage_positive = function() t.assert_equals(metadata_map, { [space_name] = { sharding_key_def = sharding_key_def, + sharding_key_hash = sharding_utils.compute_hash(sharding_key_def), space_format = {} }, }) @@ -154,6 +160,7 
@@ g.test_fetch_sharding_func_name_on_storage_positive = function() t.assert_equals(metadata_map, { [space_name] = { sharding_func_def = sharding_func_def, + sharding_func_hash = sharding_utils.compute_hash(sharding_func_def), }, }) end @@ -170,6 +177,7 @@ g.test_fetch_sharding_func_body_on_storage_positive = function() t.assert_equals(metadata_map, { [space_name] = { sharding_func_def = {body = sharding_func_def}, + sharding_func_hash = sharding_utils.compute_hash({body = sharding_func_def}), }, }) end