Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .luacheckrc
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
redefined = false
globals = {'box', 'utf8'}
globals = {'box', 'utf8', 'checkers'}
include_files = {'**/*.lua', '*.luacheckrc', '*.rockspec'}
exclude_files = {'**/*.rocks/', 'tmp/', 'tarantool-enterprise/'}
max_line_length = 120
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.

## [Unreleased]

### Added

* `mode`, `prefer_replica` and `balance` options for read operations
(get, select, pairs). According to this parameters one of vshard
calls (`callrw`, `callro`, `callbro`, `callre`, `callbre`) is selected

## [0.5.0] - 2021-03-10

### Fixed
Expand Down
18 changes: 14 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,14 @@ where:
* `space_name` (`string`) - name of the space
* `key` (`any`) - primary key value
* `opts`:
* `timeout` (`?number`) - `vshard.call` timeout (in seconds)
* `bucket_id` (`?number|cdata`) - bucket ID
* `fields` (`?table`) - field names for getting only a subset of fields
* `bucket_id` (`?number|cdata`) - bucket ID
* `timeout` (`?number`) - `vshard.call` timeout (in seconds)
* `mode` (`?string`, `read` or `write`) - if `write` is specified then `get` is
performed on master
* `prefer_replica` (`?boolean`) - if `true` then the preferred target is one of
the replicas
* `balance` (`?boolean`) - use replica according to vshard load balancing policy

Returns metadata and array contains one row, error.

Expand Down Expand Up @@ -315,9 +320,14 @@ where:
(`after` option is required in this case).
* `after` (`?table`) - tuple after which objects should be selected
* `batch_size` (`?number`) - number of tuples to process per one request to storage
* `timeout` (`?number`) - `vshard.call` timeout (in seconds)
* `bucket_id` (`?number|cdata`) - bucket ID
(is used when select by full primary key is performed)
* `timeout` (`?number`) - `vshard.call` timeout (in seconds)
* `mode` (`?string`, `read` or `write`) - if `write` is specified then `select` is
performed on master
* `prefer_replica` (`?boolean`) - if `true` then the preferred target is one of
the replicas
* `balance` (`?boolean`) - use replica according to vshard load balancing policy


Returns metadata and array of rows, error.

Expand Down
164 changes: 76 additions & 88 deletions crud/common/call.lua
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,70 @@ local call = {}

local DEFAULT_VSHARD_CALL_TIMEOUT = 2

local function call_impl(vshard_call, func_name, func_args, opts)
dev_checks('string', 'string', '?table', {
local function get_vshard_call_name(mode, prefer_replica, balance)
dev_checks('string', '?boolean', '?boolean')

if mode == 'write' then
return 'callrw'
end

if not prefer_replica and not balance then
return 'callro'
end

if not prefer_replica and balance then
return 'callbro'
end

if prefer_replica and not balance then
return 'callre'
end

-- prefer_replica and balance
return 'callbre'
end

local function wrap_vshard_err(err, func_name, replicaset_uuid, bucket_id)
if err.type == 'ClientError' and type(err.message) == 'string' then
if err.message == string.format("Procedure '%s' is not defined", func_name) then
if func_name:startswith('_crud.') then
err = NotInitializedError:new("crud isn't initialized on replicaset: %q", replicaset_uuid)
else
err = NotInitializedError:new("Function %s is not registered", func_name)
end
end
end

if replicaset_uuid == nil then
local replicaset, _ = vshard.router.route(bucket_id)
if replicaset == nil then
return CallError:new(
"Function returned an error, but we couldn't figure out the replicaset: %s", err
)
end

replicaset_uuid = replicaset.uuid
end

err = errors.wrap(err)

return CallError:new(utils.format_replicaset_error(
replicaset_uuid, "Function returned an error: %s", err
))
end

function call.map(func_name, func_args, opts)
dev_checks('string', '?table', {
mode = 'string',
prefer_replica = '?boolean',
balance = '?boolean',
timeout = '?number',
replicasets = '?table',
})

opts = opts or {}

local vshard_call_name = get_vshard_call_name(opts.mode, opts.prefer_replica, opts.balance)

local timeout = opts.timeout or DEFAULT_VSHARD_CALL_TIMEOUT

local replicasets, err
Expand All @@ -35,7 +91,7 @@ local function call_impl(vshard_call, func_name, func_args, opts)
local futures_by_replicasets = {}
local call_opts = {is_async = true}
for _, replicaset in pairs(replicasets) do
local future = replicaset[vshard_call](replicaset, func_name, func_args, call_opts)
local future = replicaset[vshard_call_name](replicaset, func_name, func_args, call_opts)
futures_by_replicasets[replicaset.uuid] = future
end

Expand All @@ -53,101 +109,33 @@ local function call_impl(vshard_call, func_name, func_args, opts)
end

if err ~= nil then
if err.type == 'ClientError' and type(err.message) == 'string' then
if err.message == string.format("Procedure '%s' is not defined", func_name) then
if func_name:startswith('_crud.') then
err = NotInitializedError:new("crud isn't initialized on replicaset: %q", replicaset_uuid)
else
err = NotInitializedError:new("Function %s is not registered", func_name)
end
end
end
err = errors.wrap(err)
return nil, CallError:new(utils.format_replicaset_error(
replicaset_uuid, "Function returned an error: %s", err
))
return nil, wrap_vshard_err(err, func_name, replicaset_uuid)
end

results[replicaset_uuid] = result[1]
end

return results
end

--- Calls specified function on all cluster storages.
--
-- Allowed functions to call can be specified by `crud.register` call.
-- If function with specified `opts.func_name` isn't registered,
-- global function with this name is called.
--
-- Uses vshard `replicaset:callrw`
--
-- @function rw
--
-- @param string func_name
-- A function name
--
-- @param ?table func_args
-- Array of arguments to be passed to the function
--
-- @tparam table opts Available options are:
--
-- @tparam ?number opts.timeout
-- Function call timeout
--
-- @tparam ?table opts.replicasets
-- vshard replicasets to call the function.
-- By default, function is called on the all storages.
--
-- Returns map {replicaset_uuid: result} with all specified replicasets results
--
-- @return[1] table
-- @treturn[2] nil
-- @treturn[2] table Error description
--
function call.rw(func_name, func_args, opts)
return call_impl('callrw', func_name, func_args, opts)
end
function call.single(bucket_id, func_name, func_args, opts)
dev_checks('number', 'string', '?table', {
mode = 'string',
prefer_replica = '?boolean',
balance = '?boolean',
timeout = '?number',
})

--- Calls specified function on all cluster storages.
--
-- The same as `rw`, but uses vshard `replicaset:callro`
--
-- @function ro
--
function call.ro(func_name, func_args, opts)
return call_impl('callro', func_name, func_args, opts)
end
local vshard_call_name = get_vshard_call_name(opts.mode, opts.prefer_replica, opts.balance, opts.mode)

--- Calls specified function on a node according to bucket_id.
--
-- Exactly mimics the contract of vshard.router.callrw, but adds
-- better error hangling
--
-- @function rw_single
--
function call.rw_single(bucket_id, func_name, func_args, options)
local res, err = vshard.router.callrw(bucket_id, func_name, func_args, options)

-- This is a workaround, until vshard supports telling us where the error happened
if err ~= nil then
if type(err) == 'table' and err.type == 'ClientError' and type(err.message) == 'string' then
if err.message == string.format("Procedure '%s' is not defined", func_name) then
err = NotInitializedError:new("crud isn't initialized on replicaset")
end
end

local replicaset, _ = vshard.router.route(bucket_id)
if replicaset == nil then
return nil, CallError:new(
"Function returned an error, but we couldn't figure out the replicaset: %s", err
)
end
local timeout = opts.timeout or DEFAULT_VSHARD_CALL_TIMEOUT

err = errors.wrap(err)
local res, err = vshard.router[vshard_call_name](bucket_id, func_name, func_args, {
timeout = timeout,
})

return nil, CallError:new(utils.format_replicaset_error(
replicaset.uuid, "Function returned an error: %s", err
))
if err ~= nil then
return nil, wrap_vshard_err(err, func_name, nil, bucket_id)
end

if res == box.NULL then
Expand Down
8 changes: 6 additions & 2 deletions crud/delete.lua
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,14 @@ local function call_delete_on_router(space_name, key, opts)
end

local bucket_id = sharding.key_get_bucket_id(key, opts.bucket_id)
local storage_result, err = call.rw_single(
local call_opts = {
mode = 'write',
timeout = opts.timeout,
}
local storage_result, err = call.single(
bucket_id, DELETE_FUNC_NAME,
{space_name, key, opts.fields},
{timeout = opts.timeout}
call_opts
)

if err ~= nil then
Expand Down
26 changes: 20 additions & 6 deletions crud/get.lua
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ local function call_get_on_router(space_name, key, opts)
timeout = '?number',
bucket_id = '?number|cdata',
fields = '?table',
prefer_replica = '?boolean',
balance = '?boolean',
mode = '?string',
})

opts = opts or {}
Expand All @@ -56,14 +59,16 @@ local function call_get_on_router(space_name, key, opts)
end

local bucket_id = sharding.key_get_bucket_id(key, opts.bucket_id)
-- We don't use callro() here, because if the replication is
-- async, there could be a lag between master and replica, so a
-- connector which sequentially calls put() and then get() may get
-- a stale result.
local storage_result, err = call.rw_single(
local call_opts = {
mode = opts.mode or 'read',
prefer_replica = opts.prefer_replica,
balance = opts.balance,
timeout = opts.timeout,
}
local storage_result, err = call.single(
bucket_id, GET_FUNC_NAME,
{space_name, key, opts.fields},
{timeout = opts.timeout}
call_opts
)

if err ~= nil then
Expand Down Expand Up @@ -101,6 +106,12 @@ end
-- Bucket ID
-- (by default, it's vshard.router.bucket_id_strcrc32 of primary key)
--
-- @tparam ?boolean opts.prefer_replica
-- Call on replica if it's possible
--
-- @tparam ?boolean opts.balance
-- Use replica according to round-robin load balancing
--
-- @return[1] object
-- @treturn[2] nil
-- @treturn[2] table Error description
Expand All @@ -110,6 +121,9 @@ function get.call(space_name, key, opts)
timeout = '?number',
bucket_id = '?number|cdata',
fields = '?table',
prefer_replica = '?boolean',
balance = '?boolean',
mode = '?string',
})

return schema.wrap_func_reload(call_get_on_router, space_name, key, opts)
Expand Down
8 changes: 6 additions & 2 deletions crud/insert.lua
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,14 @@ local function call_insert_on_router(space_name, tuple, opts)
fields = opts.fields,
}

local storage_result, err = call.rw_single(
local call_opts = {
mode = 'write',
timeout = opts.timeout,
}
local storage_result, err = call.single(
bucket_id, INSERT_FUNC_NAME,
{space_name, tuple, insert_on_storage_opts},
{timeout = opts.timeout}
call_opts
)

if err ~= nil then
Expand Down
8 changes: 6 additions & 2 deletions crud/replace.lua
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,14 @@ local function call_replace_on_router(space_name, tuple, opts)
fields = opts.fields,
}

local storage_result, err = call.rw_single(
local call_opts = {
mode = 'write',
timeout = opts.timeout,
}
local storage_result, err = call.single(
bucket_id, REPLACE_FUNC_NAME,
{space_name, tuple, insert_on_storage_opts},
{timeout = opts.timeout}
call_opts
)

if err ~= nil then
Expand Down
Loading