From 41f42cc0788ae4edca1eb180679adb2c83f323b9 Mon Sep 17 00:00:00 2001
From: psergee
Date: Wed, 22 Jun 2022 14:22:01 +0300
Subject: [PATCH] Provide an API to get storages initialization state

CRUD calls may fail while not all storages are up yet. This patch adds
a new function that reports whether each storage is initialized, so a
user can poll the state and wait for all storages to be initialized
before making CRUD calls.

Resolves #229
---
 CHANGELOG.md                             |   1 +
 README.md                                |  47 +++++++
 crud.lua                                 |   6 +
 crud/common/call.lua                     |   9 +-
 crud/common/const.lua                    |   2 +
 crud/common/utils.lua                    |  84 +++++++++++
 test/integration/storages_state_test.lua | 170 +++++++++++++++++++++++
 7 files changed, 314 insertions(+), 5 deletions(-)
 create mode 100644 test/integration/storages_state_test.lua

diff --git a/CHANGELOG.md b/CHANGELOG.md
index dd50a2687..55e5ee7df 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -8,6 +8,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
 ## [Unreleased]
 
 ### Added
+* `crud.storage_info` function to get storages status (#229).
 
 ### Changed
diff --git a/README.md b/README.md
index 429c7a3c4..59cec8fed 100644
--- a/README.md
+++ b/README.md
@@ -32,6 +32,7 @@ It also provides the `crud-storage` and `crud-router` roles for
   - [Cut extra objects](#cut-extra-objects)
   - [Truncate](#truncate)
   - [Len](#len)
+  - [Storage info](#storage-info)
   - [Count](#count)
   - [Call options for crud methods](#call-options-for-crud-methods)
   - [Statistics](#statistics)
@@ -1074,6 +1075,52 @@ crud.len('customers')
 ...
 ```
 
+### Storage info
+
+```lua
+-- Get storages status.
+local result, err = crud.storage_info(opts)
+```
+
+where:
+
+* `opts`:
+  * `timeout` (`?number`) - maximum time (in seconds) to wait for a response
+    from cluster instances.
+
+Returns a table of storage statuses keyed by instance UUID, or `nil` with an
+error. Each status table has the following fields:
+
+* `status` - a string describing the storage state:
+  * `"running"` - the storage is initialized and running;
+  * `"uninitialized"` - the storage is not initialized or is disabled;
+  * `"error"` - the status could not be obtained from the storage
+    (for example, due to a connection error).
+* `is_master` - `true` if the instance is a master, `false` otherwise.
+* `message` - `nil` unless obtaining the storage status failed.
+
+**Example:**
+
+```lua
+crud.storage_info()
+```
+```
+---
+- fe1b5bd9-42d4-4955-816c-3aa015e0eb81:
+    status: running
+    is_master: true
+  a1eefe51-9869-4c4c-9676-76431b08c97a:
+    status: running
+    is_master: true
+  777415f4-d656-440e-8834-7124b7267b6d:
+    status: uninitialized
+    is_master: false
+  e1b2e202-b0f7-49cd-b0a2-6b3a584f995e:
+    status: error
+    message: 'connect, called on fd 36, aka 127.0.0.1:49762: Connection refused'
+    is_master: false
+...
+```
+
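+Since CRUD calls may fail while some storages are still starting up, a client
+can poll `crud.storage_info` until every storage reports `running`. A minimal
+polling sketch (the helper name and the retry parameters are illustrative,
+not part of the API):
+
+```lua
+local fiber = require('fiber')
+
+-- Returns true once every storage reports "running",
+-- false if that does not happen within `attempts` tries.
+local function wait_storages_running(attempts, delay)
+    for _ = 1, attempts do
+        local info, err = crud.storage_info({timeout = 1})
+        if err == nil then
+            local all_running = true
+            for _, replica in pairs(info) do
+                if replica.status ~= 'running' then
+                    all_running = false
+                    break
+                end
+            end
+            if all_running then
+                return true
+            end
+        end
+        fiber.sleep(delay)
+    end
+    return false
+end
+```
+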
 
 ### Count
 
 `CRUD` supports multi-conditional count, treating a cluster as a single space.
diff --git a/crud.lua b/crud.lua
index 05d36ec4a..b190b630b 100644
--- a/crud.lua
+++ b/crud.lua
@@ -138,6 +138,10 @@ crud.stats = stats.get
 -- @function reset_stats
 crud.reset_stats = stats.reset
 
+-- @refer utils.storage_info
+-- @function storage_info
+crud.storage_info = utils.storage_info
+
 --- Initializes crud on node
 --
 -- Exports all functions that are used for calls
@@ -165,6 +169,8 @@ function crud.init_storage()
     count.init()
     borders.init()
     sharding_metadata.init()
+
+    _G._crud.storage_info_on_storage = utils.storage_info_on_storage
 end
 
 function crud.init_router()
diff --git a/crud/common/call.lua b/crud/common/call.lua
index a4659e4a2..a89c414b2 100644
--- a/crud/common/call.lua
+++ b/crud/common/call.lua
@@ -5,6 +5,7 @@ local dev_checks = require('crud.common.dev_checks')
 local utils = require('crud.common.utils')
 local sharding_utils = require('crud.common.sharding.utils')
 local fiber_clock = require('fiber').clock
+local const = require('crud.common.const')
 
 local BaseIterator = require('crud.common.map_call_cases.base_iter')
 local BasePostprocessor = require('crud.common.map_call_cases.base_postprocessor')
@@ -13,8 +14,6 @@ local CallError = errors.new_class('CallError')
 
 local call = {}
 
-call.DEFAULT_VSHARD_CALL_TIMEOUT = 2
-
 function call.get_vshard_call_name(mode, prefer_replica, balance)
     dev_checks('string', '?boolean', '?boolean')
 
@@ -84,7 +83,7 @@ function call.map(func_name, func_args, opts)
         return nil, err
     end
 
-    local timeout = opts.timeout or call.DEFAULT_VSHARD_CALL_TIMEOUT
+    local timeout = opts.timeout or const.DEFAULT_VSHARD_CALL_TIMEOUT
 
     local iter = opts.iter
     if iter == nil then
@@ -149,7 +148,7 @@ function call.single(bucket_id, func_name, func_args, opts)
         return nil, err
     end
 
-    local timeout = opts.timeout or call.DEFAULT_VSHARD_CALL_TIMEOUT
+    local timeout = opts.timeout or const.DEFAULT_VSHARD_CALL_TIMEOUT
 
     local res, err = vshard.router[vshard_call_name](bucket_id, func_name, func_args, {
         timeout = timeout,
@@ -171,7 +170,7 @@ function call.any(func_name, func_args, opts)
         timeout = '?number',
     })
 
-    local timeout = opts.timeout or call.DEFAULT_VSHARD_CALL_TIMEOUT
+    local timeout = opts.timeout or const.DEFAULT_VSHARD_CALL_TIMEOUT
 
     local replicasets, err = vshard.router.routeall()
     if replicasets == nil then
diff --git a/crud/common/const.lua b/crud/common/const.lua
index d452e1bf5..7a13cf91a 100644
--- a/crud/common/const.lua
+++ b/crud/common/const.lua
@@ -8,4 +8,6 @@ const.SHARDING_RELOAD_RETRIES_NUM = 1
 const.NEED_SCHEMA_RELOAD = 0x0001000
 const.NEED_SHARDING_RELOAD = 0x0001001
 
+const.DEFAULT_VSHARD_CALL_TIMEOUT = 2
+
 return const
diff --git a/crud/common/utils.lua b/crud/common/utils.lua
index 5b3614b25..345c407ad 100644
--- a/crud/common/utils.lua
+++ b/crud/common/utils.lua
@@ -3,6 +3,7 @@ local ffi = require('ffi')
 local vshard = require('vshard')
 local fun = require('fun')
 local bit = require('bit')
+local log = require('log')
 
 local const = require('crud.common.const')
 local schema = require('crud.common.schema')
@@ -15,6 +16,8 @@ local ShardingError = errors.new_class('ShardingError', {capture_stack = false})
 local GetSpaceFormatError = errors.new_class('GetSpaceFormatError', {capture_stack = false})
 local FilterFieldsError = errors.new_class('FilterFieldsError', {capture_stack = false})
 local NotInitializedError = errors.new_class('NotInitialized')
+local StorageInfoError = errors.new_class('StorageInfoError')
+local fiber_clock = require('fiber').clock
 
 local utils = {}
 
@@ -748,4 +751,85 @@ function utils.list_slice(list, start_index, end_index)
     return slice
 end
 
+--- Polls replicas for storage state
+--
+-- @function storage_info
+--
+-- @tparam ?number opts.timeout
+--  Function call timeout
+--
+-- @return a table of storage states by replica uuid.
+function utils.storage_info(opts)
+    local replicasets, err = vshard.router.routeall()
+    if replicasets == nil then
+        return nil, StorageInfoError:new("Failed to get all replicasets: %s", err.err)
+    end
+
+    opts = opts or {}
+
+    local futures_by_replicas = {}
+    local replica_state_by_uuid = {}
+    local async_opts = {is_async = true}
+    local timeout = opts.timeout or const.DEFAULT_VSHARD_CALL_TIMEOUT
+
+    -- Fire off an async status request to every replica first;
+    -- the results are collected below under a common deadline.
+    for _, replicaset in pairs(replicasets) do
+        for replica_uuid, replica in pairs(replicaset.replicas) do
+            -- Assume "error" until a successful response arrives.
+            replica_state_by_uuid[replica_uuid] = {status = "error",
+                                                   is_master = replicaset.master == replica}
+            local ok, res = pcall(replica.conn.call, replica.conn, "_crud.storage_info_on_storage",
+                                  {}, async_opts)
+            if ok then
+                futures_by_replicas[replica_uuid] = res
+            else
+                local err_msg = string.format("Error getting storage info for %s", replica_uuid)
+                if res ~= nil then
+                    log.error("%s: %s", err_msg, res)
+                    replica_state_by_uuid[replica_uuid].message = tostring(res)
+                else
+                    log.error(err_msg)
+                    replica_state_by_uuid[replica_uuid].message = err_msg
+                end
+            end
+        end
+    end
+
+    -- All futures share one deadline, so the total wait time
+    -- does not exceed `timeout`.
+    local deadline = fiber_clock() + timeout
+    for replica_uuid, future in pairs(futures_by_replicas) do
+        local wait_timeout = deadline - fiber_clock()
+        if wait_timeout < 0 then
+            wait_timeout = 0
+        end
+
+        local result, err = future:wait_result(wait_timeout)
+        if result == nil then
+            future:discard()
+            local err_msg = string.format("Error getting storage info for %s", replica_uuid)
+            if err ~= nil then
+                -- A missing remote function means the storage is not initialized yet.
+                if err.type == 'ClientError' and err.code == box.error.NO_SUCH_PROC then
+                    replica_state_by_uuid[replica_uuid].status = "uninitialized"
+                else
+                    log.error("%s: %s", err_msg, err)
+                    replica_state_by_uuid[replica_uuid].message = tostring(err)
+                end
+            else
+                log.error(err_msg)
+                replica_state_by_uuid[replica_uuid].message = err_msg
+            end
+        else
+            replica_state_by_uuid[replica_uuid].status = result[1].status or "uninitialized"
+        end
+    end
+
+    return replica_state_by_uuid
+end
+
+--- Storage status information.
+--
+-- @function storage_info_on_storage
+--
+-- @return a table with storage status.
+function utils.storage_info_on_storage()
+    return {status = "running"}
+end
+
 return utils
diff --git a/test/integration/storages_state_test.lua b/test/integration/storages_state_test.lua
new file mode 100644
index 000000000..b511d09be
--- /dev/null
+++ b/test/integration/storages_state_test.lua
@@ -0,0 +1,170 @@
+local fio = require('fio')
+
+local t = require('luatest')
+
+local helpers = require('test.helper')
+
+local fiber = require('fiber')
+
+local pgroup = t.group('storage_info', {
+    {engine = 'memtx'}
+})
+
+local all_storages_initialized = false
+
+local function wait_storages_init(g)
+    local storages_initialized = false
+    local attempts_left = 5
+    local wait_for_init_timeout = 1
+    while attempts_left > 0 and not storages_initialized do
+        local results, err = g.cluster.main_server.net_box:call("crud.storage_info", {})
+        t.assert_equals(err, nil, "Error getting storage status")
+
+        storages_initialized = true
+        for _, v in pairs(results) do
+            if v.status ~= "running" then
+                storages_initialized = false
+            end
+        end
+
+        if not storages_initialized then
+            fiber.sleep(wait_for_init_timeout)
+            attempts_left = attempts_left - 1
+        end
+    end
+    return storages_initialized
+end
+
+pgroup.before_all(function(g)
+    g.cluster = helpers.Cluster:new({
+        datadir = fio.tempdir(),
+        server_command = helpers.entrypoint('srv_select'),
+        use_vshard = true,
+        replicasets = helpers.get_test_replicasets(),
+        env = {
+            ['ENGINE'] = g.params.engine,
+        },
+    })
+    g.cluster:start()
+
+    --[[ Wait for storages to initialize.
+         This is a workaround for "peer closed" errors for some connections
+         right after the cluster start. Retrying gives them a small timeout
+         to reconnect.
+    ]]--
+    all_storages_initialized = wait_storages_init(g)
+end)
+
+pgroup.after_all(function(g)
+    helpers.stop_cluster(g.cluster)
+    fio.rmtree(g.cluster.datadir)
+end)
+
+pgroup.test_crud_storage_status_of_stopped_servers = function(g)
+    t.assert_equals(all_storages_initialized, true)
+
+    g.cluster:server("s2-replica"):stop()
+
+    local results, err = g.cluster.main_server.net_box:call("crud.storage_info", {})
+    t.assert_equals(err, nil, "Error getting storages info")
+    t.assert_equals(results, {
+        [helpers.uuid('b', 1)] = {
+            status = "running",
+            is_master = true
+        },
+        [helpers.uuid('b', 2)] = {
+            status = "running",
+            is_master = false
+        },
+        [helpers.uuid('c', 1)] = {
+            status = "running",
+            is_master = true
+        },
+        [helpers.uuid('c', 2)] = {
+            status = "error",
+            is_master = false,
+            message = "Peer closed"
+        }
+    })
+
+    g.cluster:server("s2-replica"):start()
+end
+
+pgroup.test_disabled_storage_role = function(g)
+    t.assert_equals(wait_storages_init(g), true)
+
+    -- Stop the crud storage role on one replica.
+    local server = g.cluster:server("s1-replica")
+    local results = server.net_box:eval([[
+        local serviceregistry = require("cartridge.service-registry")
+        serviceregistry.get("crud-storage").stop()
+        return true
+    ]])
+
+    t.assert_not_equals(results, nil, "Failed to disable the storage role")
+
+    local results, err = g.cluster.main_server.net_box:call("crud.storage_info", {})
+    t.assert_equals(err, nil, "Error getting storages info")
+
+    t.assert_equals(results, {
+        [helpers.uuid('b', 1)] = {
+            status = "running",
+            is_master = true
+        },
+        [helpers.uuid('b', 2)] = {
+            status = "uninitialized",
+            is_master = false
+        },
+        [helpers.uuid('c', 1)] = {
+            status = "running",
+            is_master = true
+        },
+        [helpers.uuid('c', 2)] = {
+            status = "running",
+            is_master = false
+        }
+    })
+
+    -- Re-enable the role so that other tests see a healthy cluster.
+    local results = server.net_box:eval([[
+        local serviceregistry = require("cartridge.service-registry")
+        serviceregistry.get("crud-storage").init()
+        return true
+    ]])
+
+    t.assert_not_equals(results, nil, "Failed to enable the storage role")
+end
+
+pgroup.test_storage_call_failure = function(g)
+    t.assert_equals(wait_storages_init(g), true)
+
+    -- Break the storage info handler on one replica: replacing the function
+    -- with a table makes the remote call fail.
+    local server = g.cluster:server("s2-replica")
+    local results = server.net_box:eval([[
+        _crud.storage_info_on_storage = {}
+        return true
+    ]])
+
+    t.assert_not_equals(results, nil, "Eval failed")
+
+    local results, err = g.cluster.main_server.net_box:call("crud.storage_info", {})
+    t.assert_equals(err, nil, "Error getting storages info")
+
+    t.assert_equals(results, {
+        [helpers.uuid('b', 1)] = {
+            status = "running",
+            is_master = true
+        },
+        [helpers.uuid('b', 2)] = {
+            status = "running",
+            is_master = false
+        },
+        [helpers.uuid('c', 1)] = {
+            status = "running",
+            is_master = true
+        },
+        [helpers.uuid('c', 2)] = {
+            status = "error",
+            is_master = false,
+            message = "attempt to call a table value"
+        }
+    })
+end
\ No newline at end of file