Skip to content

Commit

Permalink
Provide an API to get storages initialization state
Browse files Browse the repository at this point in the history
There is an issue with using CRUD functionality if not all storages are
up. New function is added to get the information about storages
state: initialized or not. So, a user can poll state and wait for
storages to be initialized before making CRUD calls.

Resolves #229
  • Loading branch information
psergee committed Jul 21, 2022
1 parent 3b4609b commit 25c6594
Show file tree
Hide file tree
Showing 7 changed files with 318 additions and 5 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
## [Unreleased]

### Added
* `crud.storage_info` function to get storages status (#229, PR #299).

### Changed

Expand Down
46 changes: 46 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ It also provides the `crud-storage` and `crud-router` roles for
- [Cut extra objects](#cut-extra-objects)
- [Truncate](#truncate)
- [Len](#len)
- [Storage info](#storage-info)
- [Count](#count)
- [Call options for crud methods](#call-options-for-crud-methods)
- [Statistics](#statistics)
Expand Down Expand Up @@ -1074,6 +1075,51 @@ crud.len('customers')
...
```

### Storage info

```lua
-- Get storages status
local result, err = crud.storage_info(opts)
```

where:

* `opts`:
* `timeout` (`?number`) - maximum time (in seconds) to wait for response from
cluster instances.

Returns storages status table by instance UUID or nil with error. Status table fields:
"status" contains a string representing the status:
* "running" - storage is initialized and running.
* "uninitialized" - storage is not initialized or disabled.
* "error" - error getting the status from a storage. Connection error, for example.
"is_master" is true if an instance is a master. False - otherwise.
"message" is nil unless a problem occurs with getting storage status.


**Example:**

```lua
crud.storage_info()
```
```
---
- fe1b5bd9-42d4-4955-816c-3aa015e0eb81:
status: running
is_master: true
a1eefe51-9869-4c4c-9676-76431b08c97a:
status: running
is_master: true
777415f4-d656-440e-8834-7124b7267b6d:
status: uninitialized
is_master: false
e1b2e202-b0f7-49cd-b0a2-6b3a584f995e:
status: error
message: 'connect, called on fd 36, aka 127.0.0.1:49762: Connection refused'
is_master: false
...
```

### Count

`CRUD` supports multi-conditional count, treating a cluster as a single space.
Expand Down
6 changes: 6 additions & 0 deletions crud.lua
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,10 @@ crud.stats = stats.get
-- @function reset_stats
crud.reset_stats = stats.reset

-- @refer utils.storage_info
-- @function storage_info
crud.storage_info = utils.storage_info

--- Initializes crud on node
--
-- Exports all functions that are used for calls
Expand Down Expand Up @@ -165,6 +169,8 @@ function crud.init_storage()
count.init()
borders.init()
sharding_metadata.init()

_G._crud.storage_info_on_storage = utils.storage_info_on_storage
end

function crud.init_router()
Expand Down
9 changes: 4 additions & 5 deletions crud/common/call.lua
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ local dev_checks = require('crud.common.dev_checks')
local utils = require('crud.common.utils')
local sharding_utils = require('crud.common.sharding.utils')
local fiber_clock = require('fiber').clock
local const = require('crud.common.const')

local BaseIterator = require('crud.common.map_call_cases.base_iter')
local BasePostprocessor = require('crud.common.map_call_cases.base_postprocessor')
Expand All @@ -13,8 +14,6 @@ local CallError = errors.new_class('CallError')

local call = {}

call.DEFAULT_VSHARD_CALL_TIMEOUT = 2

function call.get_vshard_call_name(mode, prefer_replica, balance)
dev_checks('string', '?boolean', '?boolean')

Expand Down Expand Up @@ -84,7 +83,7 @@ function call.map(func_name, func_args, opts)
return nil, err
end

local timeout = opts.timeout or call.DEFAULT_VSHARD_CALL_TIMEOUT
local timeout = opts.timeout or const.DEFAULT_VSHARD_CALL_TIMEOUT

local iter = opts.iter
if iter == nil then
Expand Down Expand Up @@ -149,7 +148,7 @@ function call.single(bucket_id, func_name, func_args, opts)
return nil, err
end

local timeout = opts.timeout or call.DEFAULT_VSHARD_CALL_TIMEOUT
local timeout = opts.timeout or const.DEFAULT_VSHARD_CALL_TIMEOUT

local res, err = vshard.router[vshard_call_name](bucket_id, func_name, func_args, {
timeout = timeout,
Expand All @@ -171,7 +170,7 @@ function call.any(func_name, func_args, opts)
timeout = '?number',
})

local timeout = opts.timeout or call.DEFAULT_VSHARD_CALL_TIMEOUT
local timeout = opts.timeout or const.DEFAULT_VSHARD_CALL_TIMEOUT

local replicasets, err = vshard.router.routeall()
if replicasets == nil then
Expand Down
2 changes: 2 additions & 0 deletions crud/common/const.lua
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,6 @@ const.SHARDING_RELOAD_RETRIES_NUM = 1
const.NEED_SCHEMA_RELOAD = 0x0001000
const.NEED_SHARDING_RELOAD = 0x0001001

const.DEFAULT_VSHARD_CALL_TIMEOUT = 2

return const
86 changes: 86 additions & 0 deletions crud/common/utils.lua
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ local ffi = require('ffi')
local vshard = require('vshard')
local fun = require('fun')
local bit = require('bit')
local log = require('log')

local const = require('crud.common.const')
local schema = require('crud.common.schema')
Expand All @@ -15,6 +16,8 @@ local ShardingError = errors.new_class('ShardingError', {capture_stack = false})
local GetSpaceFormatError = errors.new_class('GetSpaceFormatError', {capture_stack = false})
local FilterFieldsError = errors.new_class('FilterFieldsError', {capture_stack = false})
local NotInitializedError = errors.new_class('NotInitialized')
local StorageInfoError = errors.new_class('StorageInfoError')
local fiber_clock = require('fiber').clock

local utils = {}

Expand Down Expand Up @@ -748,4 +751,87 @@ function utils.list_slice(list, start_index, end_index)
return slice
end

--- Polls replicas for storage state
--
-- @function storage_info
--
-- @tparam ?number opts.timeout
-- Function call timeout
--
-- @return a table of storage states by replica uuid.
function utils.storage_info(opts)
local replicasets, err = vshard.router.routeall()
if replicasets == nil then
return nil, StorageInfoError:new("Failed to get all replicasets: %s", err.err)
end

opts = opts or {}

local futures_by_replicas = {}
local replica_state_by_uuid = {}
local async_opts = {is_async = true}
local timeout = opts.timeout or const.DEFAULT_VSHARD_CALL_TIMEOUT

for _, replicaset in pairs(replicasets) do
for replica_uuid, replica in pairs(replicaset.replicas) do
replica_state_by_uuid[replica_uuid] = {
status = "error",
is_master = replicaset.master == replica
}
local ok, res = pcall(replica.conn.call, replica.conn, "_crud.storage_info_on_storage",
{}, async_opts)
if ok then
futures_by_replicas[replica_uuid] = res
else
local err_msg = string.format("Error getting storage info for %s", replica_uuid)
if res ~= nil then
log.error("%s: %s", err_msg, res)
replica_state_by_uuid[replica_uuid].message = tostring(res)
else
log.error(err_msg)
replica_state_by_uuid[replica_uuid].message = err_msg
end
end
end
end

local deadline = fiber_clock() + timeout
for replica_uuid, future in pairs(futures_by_replicas) do
local wait_timeout = deadline - fiber_clock()
if wait_timeout < 0 then
wait_timeout = 0
end

local result, err = future:wait_result(wait_timeout)
if result == nil then
future:discard()
local err_msg = string.format("Error getting storage info for %s", replica_uuid)
if err ~= nil then
if err.type == 'ClientError' and err.code == box.error.NO_SUCH_PROC then
replica_state_by_uuid[replica_uuid].status = "uninitialized"
else
log.error("%s: %s", err_msg, err)
replica_state_by_uuid[replica_uuid].message = tostring(err)
end
else
log.error(err_msg)
replica_state_by_uuid[replica_uuid].message = err_msg
end
else
replica_state_by_uuid[replica_uuid].status = result[1].status or "uninitialized"
end
end

return replica_state_by_uuid
end

--- Storage status information.
--
-- @function storage_info_on_storage
--
-- @return a table with storage status.
function utils.storage_info_on_storage()
return {status = "running"}
end

return utils

0 comments on commit 25c6594

Please sign in to comment.