Skip to content

Commit

Permalink
Provide an API to get storages initialization state
Browse files Browse the repository at this point in the history
CRUD calls can fail if not all storages are up. A new function is added
that reports the state of each storage: initialized or not. A user can
poll this state and wait for all storages to be initialized before
making CRUD calls.

Resolves #229
  • Loading branch information
psergee committed Jul 1, 2022
1 parent 3b4609b commit 428b629
Show file tree
Hide file tree
Showing 7 changed files with 203 additions and 5 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
## [Unreleased]

### Added
* `crud.storage_info` function to get storages status (#229).

### Changed

Expand Down
38 changes: 38 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ It also provides the `crud-storage` and `crud-router` roles for
- [Cut extra objects](#cut-extra-objects)
- [Truncate](#truncate)
- [Len](#len)
- [Storage info](#storage-info)
- [Count](#count)
- [Call options for crud methods](#call-options-for-crud-methods)
- [Statistics](#statistics)
Expand Down Expand Up @@ -1074,6 +1075,43 @@ crud.len('customers')
...
```

### Storage info

```lua
-- Get storages status
local result, err = crud.storage_info(opts)
```

where:

* `opts`:
* `timeout` (`?number`) - maximum time (in seconds) to wait for a response from
cluster instances. Defaults to 2 seconds.

Returns a table with the status of each storage, keyed by instance UUID, or `nil`
and an error. The `status` field of each entry is a string with one of the following values:
* "running" - storage is initialized.
* "uninitialized" - storage is not initialized or disabled.
* "error" - error getting the status from a remote instance. Connection error, for example.


**Example:**

```lua
crud.storage_info()
```
```
---
- 5c3392a3-ce89-4aec-83f3-6cb5f18e60c3:
status: error
376435fc-7871-4686-9817-75df1a093e41:
status: running
afe7f578-943f-4bd9-b636-6356760f6586:
status: uninitialized
...
```


### Count

`CRUD` supports multi-conditional count, treating a cluster as a single space.
Expand Down
6 changes: 6 additions & 0 deletions crud.lua
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,10 @@ crud.stats = stats.get
-- @function reset_stats
crud.reset_stats = stats.reset

-- @refer utils.storage_info
-- @function storage_info
crud.storage_info = utils.storage_info

--- Initializes crud on node
--
-- Exports all functions that are used for calls
Expand Down Expand Up @@ -165,6 +169,8 @@ function crud.init_storage()
count.init()
borders.init()
sharding_metadata.init()

_G._crud.is_initialized = function() return true end
end

function crud.init_router()
Expand Down
9 changes: 4 additions & 5 deletions crud/common/call.lua
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ local dev_checks = require('crud.common.dev_checks')
local utils = require('crud.common.utils')
local sharding_utils = require('crud.common.sharding.utils')
local fiber_clock = require('fiber').clock
local const = require('crud.common.const')

local BaseIterator = require('crud.common.map_call_cases.base_iter')
local BasePostprocessor = require('crud.common.map_call_cases.base_postprocessor')
Expand All @@ -13,8 +14,6 @@ local CallError = errors.new_class('CallError')

local call = {}

call.DEFAULT_VSHARD_CALL_TIMEOUT = 2

function call.get_vshard_call_name(mode, prefer_replica, balance)
dev_checks('string', '?boolean', '?boolean')

Expand Down Expand Up @@ -84,7 +83,7 @@ function call.map(func_name, func_args, opts)
return nil, err
end

local timeout = opts.timeout or call.DEFAULT_VSHARD_CALL_TIMEOUT
local timeout = opts.timeout or const.DEFAULT_VSHARD_CALL_TIMEOUT

local iter = opts.iter
if iter == nil then
Expand Down Expand Up @@ -149,7 +148,7 @@ function call.single(bucket_id, func_name, func_args, opts)
return nil, err
end

local timeout = opts.timeout or call.DEFAULT_VSHARD_CALL_TIMEOUT
local timeout = opts.timeout or const.DEFAULT_VSHARD_CALL_TIMEOUT

local res, err = vshard.router[vshard_call_name](bucket_id, func_name, func_args, {
timeout = timeout,
Expand All @@ -171,7 +170,7 @@ function call.any(func_name, func_args, opts)
timeout = '?number',
})

local timeout = opts.timeout or call.DEFAULT_VSHARD_CALL_TIMEOUT
local timeout = opts.timeout or const.DEFAULT_VSHARD_CALL_TIMEOUT

local replicasets, err = vshard.router.routeall()
if replicasets == nil then
Expand Down
2 changes: 2 additions & 0 deletions crud/common/const.lua
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,6 @@ const.SHARDING_RELOAD_RETRIES_NUM = 1
const.NEED_SCHEMA_RELOAD = 0x0001000
const.NEED_SHARDING_RELOAD = 0x0001001

const.DEFAULT_VSHARD_CALL_TIMEOUT = 2

return const
63 changes: 63 additions & 0 deletions crud/common/utils.lua
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ local ffi = require('ffi')
local vshard = require('vshard')
local fun = require('fun')
local bit = require('bit')
local log = require('log')

local const = require('crud.common.const')
local schema = require('crud.common.schema')
Expand All @@ -15,6 +16,8 @@ local ShardingError = errors.new_class('ShardingError', {capture_stack = false})
local GetSpaceFormatError = errors.new_class('GetSpaceFormatError', {capture_stack = false})
local FilterFieldsError = errors.new_class('FilterFieldsError', {capture_stack = false})
local NotInitializedError = errors.new_class('NotInitialized')
local GetReplicaStateError = errors.new_class('GetStorageStateError')
local fiber_clock = require('fiber').clock

local utils = {}

Expand Down Expand Up @@ -748,4 +751,64 @@ function utils.list_slice(list, start_index, end_index)
return slice
end

--- Polls replicas for storage state
--
-- Sends an asynchronous `_crud.is_initialized` call to every replica of
-- every replicaset returned by the vshard router, then collects the
-- replies against a single shared deadline.
--
-- @function storage_info
--
-- @tparam ?table opts
--  Call options
--
-- @tparam ?number opts.timeout
--  Function call timeout in seconds, shared by all replicas
--  (default: const.DEFAULT_VSHARD_CALL_TIMEOUT)
--
-- @return[1] a table of storage states by replica uuid; every value has the
--  form `{status = <string>}` where status is "running", "uninitialized"
--  or "error"
-- @treturn[2] nil
-- @treturn[2] table Error description
function utils.storage_info(opts)
    local replicasets, routeall_err = vshard.router.routeall()
    if replicasets == nil then
        -- The error value may be absent or may lack an `err` field, so it is
        -- not indexed blindly: fall back to its string representation.
        local reason
        if type(routeall_err) == 'table' and routeall_err.err ~= nil then
            reason = routeall_err.err
        else
            reason = tostring(routeall_err)
        end
        return nil, GetReplicaStateError:new("Failed to get all replicasets: %s", reason)
    end

    opts = opts or {}

    local futures_by_replicas = {}
    local replica_state_by_uuid = {}
    local async_opts = {is_async = true}
    local timeout = opts.timeout or const.DEFAULT_VSHARD_CALL_TIMEOUT

    for _, replicaset in pairs(replicasets) do
        for replica_uuid, replica in pairs(replicaset.replicas) do
            -- Pessimistic default: the status stays "error" unless a reply
            -- received below proves otherwise.
            replica_state_by_uuid[replica_uuid] = {status = "error"}

            -- The closure keeps the `replica.conn` dereference inside the
            -- protected call, so a missing connection object is reported as
            -- an "error" status instead of aborting the whole poll.
            local ok, res = pcall(function()
                return replica.conn:call("_crud.is_initialized", {}, async_opts)
            end)
            if ok then
                futures_by_replicas[replica_uuid] = res
            elseif res ~= nil then
                log.error("Error getting storage info for %s: %s", replica_uuid, res)
            else
                -- Do not swallow a failure that carries no error value.
                log.error("Error getting storage info for %s", replica_uuid)
            end
        end
    end

    local deadline = fiber_clock() + timeout
    for replica_uuid, future in pairs(futures_by_replicas) do
        -- All futures share one deadline; once it has passed, the remaining
        -- ones are only checked with a zero timeout.
        local wait_timeout = math.max(deadline - fiber_clock(), 0)

        local result, wait_err = future:wait_result(wait_timeout)
        if result == nil then
            future:discard()
            if wait_err ~= nil then
                -- A reply saying the function "is not defined" is treated as
                -- "uninitialized" (presumably the storage has not exported
                -- `_crud.is_initialized`). Plain-text search: the needle is
                -- a literal, not a Lua pattern.
                if string.find(tostring(wait_err), " is not defined", 1, true) ~= nil then
                    replica_state_by_uuid[replica_uuid].status = "uninitialized"
                else
                    log.error("Error getting storage info for %s: %s", replica_uuid, wait_err)
                end
            end
        else
            replica_state_by_uuid[replica_uuid].status = result[1] and "running" or "uninitialized"
        end
    end

    return replica_state_by_uuid
end

return utils
89 changes: 89 additions & 0 deletions test/integration/storages_state_test.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
local fio = require('fio')

local t = require('luatest')

local helpers = require('test.helper')

local fiber = require("fiber")

local pgroup = t.group('replicas_state', {
{engine = 'memtx'}
})

local all_storages_initialized = false

-- Polls `crud.storage_info` through the main server until every reported
-- storage has the "running" status.
--
-- Makes at most 5 attempts and sleeps for 1 second after each unsuccessful
-- one. Fails the test right away if the call itself returns an error.
--
-- @param g luatest group object holding the started cluster in `g.cluster`
-- @return true if all storages reported "running", false otherwise
local function wait_storages_init(g)
    local max_attempts = 5
    local retry_delay = 1

    for _ = 1, max_attempts do
        local states, call_err = g.cluster.main_server.net_box:call("crud.storage_info", {})
        t.assert_equals(call_err, nil, "Error getting storage status")

        local all_running = true
        for _, state in pairs(states) do
            if state.status ~= "running" then
                all_running = false
            end
        end

        if all_running then
            return true
        end

        fiber.sleep(retry_delay)
    end

    return false
end

-- Group setup: starts a vshard-enabled cluster and records whether all
-- storages reached the "running" state, so each test can assert on it.
pgroup.before_all(function(g)
    local cluster_config = {
        datadir = fio.tempdir(),
        server_command = helpers.entrypoint('srv_select'),
        use_vshard = true,
        replicasets = helpers.get_test_replicasets(),
        env = {
            ENGINE = g.params.engine,
        },
    }

    g.cluster = helpers.Cluster:new(cluster_config)
    g.cluster:start()

    -- wait for storages to initialize
    all_storages_initialized = wait_storages_init(g)
end)

-- Group teardown: stops the cluster and removes its data directory.
pgroup.after_all(function(g)
    local cluster = g.cluster

    helpers.stop_cluster(cluster)
    fio.rmtree(cluster.datadir)
end)

-- Checks that a stopped instance is reported with the "error" status while
-- the instances that are still up are reported as "running".
pgroup.test_crud_storage_status_of_stopped_servers = function(g)
    t.assert_equals(all_storages_initialized, true)

    local stopped_replica = g.cluster:server("s2-replica")
    stopped_replica:stop()

    local states, call_err = g.cluster.main_server.net_box:call("crud.storage_info", {})
    t.assert_equals(call_err, nil, "Error getting storags states")

    -- The uuid is resolved right before each assertion, in the listed order.
    local function assert_status(replicaset_letter, instance_no, expected)
        t.assert_equals(states[helpers.uuid(replicaset_letter, instance_no)].status, expected)
    end

    assert_status('b', 1, "running")
    assert_status('c', 1, "running")
    assert_status('c', 2, "error") -- peer closed
end

-- Checks that an instance whose `crud-storage` role has been stopped is
-- reported as "uninitialized" while the other checked instances stay
-- "running".
pgroup.test_disabled_storage_role = function(g)
    t.assert_equals(all_storages_initialized, true)

    -- stop crud storage role on one replica
    local server = g.cluster:server("s1-replica")
    -- Named separately from the storage_info result below so the two values
    -- do not share (and shadow) one local name.
    local role_stopped = server.net_box:eval([[
        local serviceregistry = require("cartridge.service-registry")
        serviceregistry.get("crud-storage").stop()
        return true
    ]])

    t.assert_not_equals(role_stopped, nil, "Fail to disable storage role")

    local results, err = g.cluster.main_server.net_box:call("crud.storage_info", {})
    t.assert_equals(err, nil, "Error getting storags states")

    t.assert_equals(results[helpers.uuid('b', 1)].status, "running")
    -- presumably helpers.uuid('b', 2) is the "s1-replica" instance stopped above
    t.assert_equals(results[helpers.uuid('b', 2)].status, "uninitialized")
    t.assert_equals(results[helpers.uuid('c', 1)].status, "running")
end

0 comments on commit 428b629

Please sign in to comment.