Skip to content

Commit

Permalink
Provide an API to get storages initialization state
Browse files Browse the repository at this point in the history
There is an issue with using CRUD functionality if not all storages are
up. New function is added to get the information about storages
state: initialized or not. So, a user can poll state and wait for
storages to be initialized before making CRUD calls.

Resolves #229
  • Loading branch information
psergee committed Jul 11, 2022
1 parent 3b4609b commit f3aa7c9
Show file tree
Hide file tree
Showing 7 changed files with 233 additions and 5 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
## [Unreleased]

### Added
* `crud.storage_info` function to get storages status (#229).

### Changed

Expand Down
42 changes: 42 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ It also provides the `crud-storage` and `crud-router` roles for
- [Cut extra objects](#cut-extra-objects)
- [Truncate](#truncate)
- [Len](#len)
- [Storage info](#storage-info)
- [Count](#count)
- [Call options for crud methods](#call-options-for-crud-methods)
- [Statistics](#statistics)
Expand Down Expand Up @@ -1074,6 +1075,47 @@ crud.len('customers')
...
```

### Storage info

```lua
-- Get storages status
local result, err = crud.storage_info(opts)
```

where:

* `opts`:
* `timeout` (`?number`) - maximum time (in seconds) to wait for response from
cluster instances.

Returns a table of statuses keyed by instance UUID, or `nil` together with an error.
Each status entry has the following fields:
* `status` — a string describing the instance state:
  * `"running"` — the storage is initialized;
  * `"uninitialized"` — the storage is not initialized or is disabled;
  * `"error"` — the status could not be fetched from the remote instance
    (for example, due to a connection error).
* `is_master` — `true` if the instance is a master, `false` otherwise.


**Example:**

```lua
crud.storage_info()
```
```
---
- 5c3392a3-ce89-4aec-83f3-6cb5f18e60c3:
status: error
is_master: true
376435fc-7871-4686-9817-75df1a093e41:
status: running
is_master: false
afe7f578-943f-4bd9-b636-6356760f6586:
status: uninitialized
is_master: true
...
```


### Count

`CRUD` supports multi-conditional count, treating a cluster as a single space.
Expand Down
6 changes: 6 additions & 0 deletions crud.lua
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,10 @@ crud.stats = stats.get
-- @function reset_stats
crud.reset_stats = stats.reset

-- @refer utils.storage_info
-- @function storage_info
crud.storage_info = utils.storage_info

--- Initializes crud on node
--
-- Exports all functions that are used for calls
Expand Down Expand Up @@ -165,6 +169,8 @@ function crud.init_storage()
count.init()
borders.init()
sharding_metadata.init()

_G._crud.is_initialized = function() return true end
end

function crud.init_router()
Expand Down
9 changes: 4 additions & 5 deletions crud/common/call.lua
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ local dev_checks = require('crud.common.dev_checks')
local utils = require('crud.common.utils')
local sharding_utils = require('crud.common.sharding.utils')
local fiber_clock = require('fiber').clock
local const = require('crud.common.const')

local BaseIterator = require('crud.common.map_call_cases.base_iter')
local BasePostprocessor = require('crud.common.map_call_cases.base_postprocessor')
Expand All @@ -13,8 +14,6 @@ local CallError = errors.new_class('CallError')

local call = {}

call.DEFAULT_VSHARD_CALL_TIMEOUT = 2

function call.get_vshard_call_name(mode, prefer_replica, balance)
dev_checks('string', '?boolean', '?boolean')

Expand Down Expand Up @@ -84,7 +83,7 @@ function call.map(func_name, func_args, opts)
return nil, err
end

local timeout = opts.timeout or call.DEFAULT_VSHARD_CALL_TIMEOUT
local timeout = opts.timeout or const.DEFAULT_VSHARD_CALL_TIMEOUT

local iter = opts.iter
if iter == nil then
Expand Down Expand Up @@ -149,7 +148,7 @@ function call.single(bucket_id, func_name, func_args, opts)
return nil, err
end

local timeout = opts.timeout or call.DEFAULT_VSHARD_CALL_TIMEOUT
local timeout = opts.timeout or const.DEFAULT_VSHARD_CALL_TIMEOUT

local res, err = vshard.router[vshard_call_name](bucket_id, func_name, func_args, {
timeout = timeout,
Expand All @@ -171,7 +170,7 @@ function call.any(func_name, func_args, opts)
timeout = '?number',
})

local timeout = opts.timeout or call.DEFAULT_VSHARD_CALL_TIMEOUT
local timeout = opts.timeout or const.DEFAULT_VSHARD_CALL_TIMEOUT

local replicasets, err = vshard.router.routeall()
if replicasets == nil then
Expand Down
2 changes: 2 additions & 0 deletions crud/common/const.lua
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,6 @@ const.SHARDING_RELOAD_RETRIES_NUM = 1
const.NEED_SCHEMA_RELOAD = 0x0001000
const.NEED_SHARDING_RELOAD = 0x0001001

const.DEFAULT_VSHARD_CALL_TIMEOUT = 2

return const
64 changes: 64 additions & 0 deletions crud/common/utils.lua
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ local ffi = require('ffi')
local vshard = require('vshard')
local fun = require('fun')
local bit = require('bit')
local log = require('log')

local const = require('crud.common.const')
local schema = require('crud.common.schema')
Expand All @@ -15,6 +16,8 @@ local ShardingError = errors.new_class('ShardingError', {capture_stack = false})
local GetSpaceFormatError = errors.new_class('GetSpaceFormatError', {capture_stack = false})
local FilterFieldsError = errors.new_class('FilterFieldsError', {capture_stack = false})
local NotInitializedError = errors.new_class('NotInitialized')
local GetReplicaStateError = errors.new_class('GetStorageStateError')
local fiber_clock = require('fiber').clock

local utils = {}

Expand Down Expand Up @@ -748,4 +751,65 @@ function utils.list_slice(list, start_index, end_index)
return slice
end

--- Polls every replica in the cluster for its crud storage state.
--
-- Sends an asynchronous `_crud.is_initialized` call to each replica of
-- each replicaset, then collects all responses under a single shared
-- deadline, so the whole poll takes at most `opts.timeout` seconds.
--
-- @function storage_info
--
-- @tparam ?table opts
--
-- @tparam ?number opts.timeout
--  Function call timeout in seconds; defaults to
--  const.DEFAULT_VSHARD_CALL_TIMEOUT
--
-- @return a table of storage states keyed by replica uuid, each entry
--  of the form {status = "running"|"uninitialized"|"error",
--  is_master = boolean}; or nil plus an error object when the
--  replicaset list cannot be obtained from the vshard router.
function utils.storage_info(opts)
    local replicasets, err = vshard.router.routeall()
    if replicasets == nil then
        return nil, GetReplicaStateError:new("Failed to get all replicasets: %s", err.err)
    end

    opts = opts or {}

    local futures_by_replicas = {}
    local replica_state_by_uuid = {}
    local async_opts = {is_async = true}
    local timeout = opts.timeout or const.DEFAULT_VSHARD_CALL_TIMEOUT

    -- Phase 1: fire one async call per replica. Every replica starts in
    -- the "error" state and is upgraded below once a response arrives.
    for _, replicaset in pairs(replicasets) do
        for replica_uuid, replica in pairs(replicaset.replicas) do
            replica_state_by_uuid[replica_uuid] = {status = "error",
                                                  is_master = replicaset.master == replica}
            -- pcall: `replica.conn` may be nil or the call may raise
            -- (e.g. the connection is closed); such replicas keep the
            -- default "error" status and the failure is only logged.
            local ok, res = pcall(replica.conn.call, replica.conn, "_crud.is_initialized",
                                  {}, async_opts)
            if ok then
                futures_by_replicas[replica_uuid] = res
            elseif res ~= nil then
                log.error("Error getting storage info for %s: %s", replica_uuid, res)
            end
        end
    end

    -- Phase 2: wait for the futures against a shared deadline so the
    -- total wait over all replicas never exceeds `timeout`.
    local deadline = fiber_clock() + timeout
    for replica_uuid, future in pairs(futures_by_replicas) do
        -- Remaining time budget; clamped to 0 once the deadline passed.
        local wait_timeout = deadline - fiber_clock()
        if wait_timeout < 0 then
            wait_timeout = 0
        end

        local result, err = future:wait_result(wait_timeout)
        if result == nil then
            -- No result in time (or an error): drop the pending future
            -- and classify the failure.
            future:discard()
            if err ~= nil then
                local str_err = tostring(err)
                -- "... is not defined" means the remote instance has no
                -- `_crud.is_initialized` function exported, i.e. the
                -- storage role is not initialized (or disabled) there.
                if (string.find(str_err, " is not defined") ~= nil) then
                    replica_state_by_uuid[replica_uuid].status = "uninitialized"
                else
                    log.error("Error getting storage info for %s: %s", replica_uuid, err)
                end
            end
        else
            -- result[1] is the boolean returned by `_crud.is_initialized`.
            replica_state_by_uuid[replica_uuid].status = result[1] and "running" or "uninitialized"
        end
    end

    return replica_state_by_uuid
end

return utils
114 changes: 114 additions & 0 deletions test/integration/storages_state_test.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
local fio = require('fio')

local t = require('luatest')

local helpers = require('test.helper')

local fiber = require("fiber")

local pgroup = t.group('replicas_state', {
{engine = 'memtx'}
})

local all_storages_initialized = false

-- Polls `crud.storage_info` on the router until every instance reports
-- the "running" status, retrying up to 5 times with a 1-second pause
-- after each failed attempt. Returns true once all storages are up,
-- false if the retry budget is exhausted.
local function wait_storages_init(g)
    local max_attempts = 5
    local retry_delay = 1

    for _ = 1, max_attempts do
        local results, err = g.cluster.main_server.net_box:call("crud.storage_info", {})
        t.assert_equals(err, nil, "Error getting storage status")

        local all_running = true
        for _, state in pairs(results) do
            if state.status ~= "running" then
                all_running = false
                break
            end
        end

        if all_running then
            return true
        end
        fiber.sleep(retry_delay)
    end

    return false
end

pgroup.before_all(function(g)
    -- Build a fresh vshard-enabled test cluster with the standard test
    -- replicasets; the storage engine comes from the pgroup parameters.
    g.cluster = helpers.Cluster:new({
        datadir = fio.tempdir(),
        server_command = helpers.entrypoint('srv_select'),
        use_vshard = true,
        replicasets = helpers.get_test_replicasets(),
        env = {
            ['ENGINE'] = g.params.engine,
        },
    })
    g.cluster:start()

    -- Wait for storages to initialize and remember the outcome;
    -- the tests assert on it before exercising crud.storage_info.
    all_storages_initialized = wait_storages_init(g)
end)

pgroup.after_all(function(g)
    -- Tear down the cluster and remove its temporary working directory.
    helpers.stop_cluster(g.cluster)
    fio.rmtree(g.cluster.datadir)
end)

-- Stopping one replica must surface as status == "error" for exactly
-- that instance, while the rest of the cluster still reports "running".
-- Fixes: typo in the assert message ("storags states") and a duplicate
-- `local instance` declaration that shadowed the first one.
pgroup.test_crud_storage_status_of_stopped_servers = function(g)
    t.assert_equals(all_storages_initialized, true)

    g.cluster:server("s2-replica"):stop()

    local results, err = g.cluster.main_server.net_box:call("crud.storage_info", {})
    t.assert_equals(err, nil, "Error getting storage states")

    local instance = results[helpers.uuid('b', 1)]
    t.assert_equals(instance.status, "running")
    t.assert_equals(instance.is_master, true)

    instance = results[helpers.uuid('b', 2)]
    t.assert_equals(instance.is_master, false)

    instance = results[helpers.uuid('c', 1)]
    t.assert_equals(instance.status, "running")
    t.assert_equals(instance.is_master, true)

    instance = results[helpers.uuid('c', 2)]
    t.assert_equals(instance.status, "error") -- peer closed
    t.assert_equals(instance.is_master, false)

    -- Restart the stopped replica so later tests see a healthy cluster.
    g.cluster:server("s2-replica"):start()
end

-- Disabling the crud-storage role on one replica must surface as
-- status == "uninitialized" for that instance only.
-- Fixes: typos in the assert messages ("storags states", "Fail to
-- disable") and a `results` local that was immediately shadowed by a
-- second `local results` declaration.
pgroup.test_disabled_storage_role = function(g)
    t.assert_equals(wait_storages_init(g), true)

    -- Stop the crud-storage role on one replica via the cartridge
    -- service registry; eval returns true on success.
    local server = g.cluster:server("s1-replica")
    local stopped = server.net_box:eval([[
    local serviceregistry = require("cartridge.service-registry")
    serviceregistry.get("crud-storage").stop()
    return true
    ]])

    t.assert_not_equals(stopped, nil, "Failed to disable storage role")

    local results, err = g.cluster.main_server.net_box:call("crud.storage_info", {})
    t.assert_equals(err, nil, "Error getting storage states")

    local instance = results[helpers.uuid('b', 1)]
    t.assert_equals(instance.status, "running")
    t.assert_equals(instance.is_master, true)

    instance = results[helpers.uuid('b', 2)]
    t.assert_equals(instance.status, "uninitialized")
    t.assert_equals(instance.is_master, false)

    instance = results[helpers.uuid('c', 1)]
    t.assert_equals(instance.status, "running")
    t.assert_equals(instance.is_master, true)

    instance = results[helpers.uuid('c', 2)]
    t.assert_equals(instance.status, "running")
    t.assert_equals(instance.is_master, false)
end

0 comments on commit f3aa7c9

Please sign in to comment.