Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
### Fixed

* Damaging of opts table by CRUD methods.
* Ignoring of `bucket_id` option in `crud.select()`/`crud.pairs()` (#220).

### Added

Expand Down
34 changes: 33 additions & 1 deletion crud/select/compat/select.lua
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,40 @@ local function build_select_iterator(space_name, user_conditions, opts)
-- set replicasets to select from
local replicasets_to_select = replicasets

if plan.sharding_key ~= nil and opts.force_map_call ~= true then
-- Whether to call one storage replicaset or perform
-- map-reduce?
--
-- If map-reduce is requested explicitly, ignore provided
-- bucket_id and fetch data from all storage replicasets.
--
-- Otherwise:
--
-- 1. If particular replicaset is pointed by a caller (using
-- the bucket_id option[^1]), crud MUST fetch data only
-- from this storage replicaset: disregarding whether other
-- storages have tuples that fit given condition.
--
-- 2. If a replicaset may be deduced from conditions
-- (conditions -> sharding key -> bucket id -> replicaset),
-- fetch data only from the replicaset. It does not change
-- the result[^2], but significantly reduces network
-- pressure.
--
-- 3. Fallback to map-reduce otherwise.
--
-- [^1]: We can change meaning of this option in a future,
-- see gh-190. But now bucket_id points a storage
-- replicaset, not a virtual bucket.
--
-- [^2]: It is correct statement only if we'll turn a blind
-- eye to resharding. However, AFAIU, the optimization
-- does not make the result less consistent (sounds
-- weird, huh?).
local perform_map_reduce = opts.force_map_call == true or
(opts.bucket_id == nil and plan.sharding_key == nil)
if not perform_map_reduce then
local bucket_id = sharding.key_get_bucket_id(plan.sharding_key, opts.bucket_id)
assert(bucket_id ~= nil)

local err
replicasets_to_select, err = common.get_replicasets_by_sharding_key(bucket_id)
Expand Down
7 changes: 6 additions & 1 deletion crud/select/compat/select_old.lua
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,13 @@ local function build_select_iterator(space_name, user_conditions, opts)
-- set replicasets to select from
local replicasets_to_select = replicasets

if plan.sharding_key ~= nil and opts.force_map_call ~= true then
-- See explanation of this logic in
-- crud/select/compat/select.lua.
local perform_map_reduce = opts.force_map_call == true or
(opts.bucket_id == nil and plan.sharding_key == nil)
if not perform_map_reduce then
local bucket_id = sharding.key_get_bucket_id(plan.sharding_key, opts.bucket_id)
assert(bucket_id ~= nil)

local err
replicasets_to_select, err = common.get_replicasets_by_sharding_key(bucket_id)
Expand Down
70 changes: 70 additions & 0 deletions test/helper.lua
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,76 @@ function helpers.call_on_servers(cluster, aliases, func)
end
end

-- Invoke 'func' on every server that belongs to a replicaset
-- with the 'crud-storage' role.
--
-- 'func' receives a server object, the replicaset config of that
-- server and all extra arguments passed after 'func'.
--
-- Usage example:
--
-- | local res = {}
-- | helpers.call_on_storages(g.cluster, function(server, replicaset, res)
-- |     local instance_res = server.net_box:call(<...>)
-- |     res[replicaset.alias] = res[replicaset.alias] + instance_res
-- | end)
-- | t.assert_equals(res, {['s-1'] = 5, ['s-2'] = 6})
function helpers.call_on_storages(cluster, func, ...)
    -- Build a lookup table from a server alias to its replicaset
    -- config, keeping only replicasets that carry the
    -- 'crud-storage' role.
    --
    -- Example of the resulting mapping:
    --
    -- | {
    -- |     ['s1-master'] = {
    -- |         alias = 's-1',
    -- |         roles = <...>,
    -- |         servers = {
    -- |             {
    -- |                 alias = 's1-master',
    -- |                 env = <...>,
    -- |                 instance_uuid = <...>,
    -- |             },
    -- |             <...>
    -- |         },
    -- |         uuid = <...>,
    -- |     }
    -- |     ['s1-replica'] = <...>,
    -- |     ['s2-master'] = <...>,
    -- |     ['s2-replica'] = <...>,
    -- | }
    --
    -- NB: Entries of the 'servers' field are server configs, not
    -- server objects: say, they have no 'net_box' field.
    local replicaset_by_server_alias = {}
    for _, replicaset in ipairs(cluster.replicasets) do
        -- Whether this replicaset is a storage one?
        local is_storage = false
        for _, role in ipairs(replicaset.roles) do
            if role == 'crud-storage' then
                is_storage = true
                break
            end
        end

        -- If so, register all its servers in the mapping.
        if is_storage then
            for _, server in ipairs(replicaset.servers) do
                replicaset_by_server_alias[server.alias] = replicaset
            end
        end
    end

    -- Walk over server objects and call 'func' on the ones that
    -- were registered above (storage nodes only).
    for _, server in ipairs(cluster.servers) do
        local replicaset = replicaset_by_server_alias[server.alias]
        if replicaset ~= nil then
            func(server, replicaset, ...)
        end
    end
end

function helpers.assert_ge(actual, expected, message)
if not (actual >= expected) then
local err = string.format('expected: %s >= %s', actual, expected)
Expand Down
98 changes: 98 additions & 0 deletions test/helpers/storage_stat.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
local checks = require('checks')
local helpers = require('test.helper')

local storage_stat = {}

-- Wrap crud's select_on_storage() function to count select
-- requests and register a global storage_stat() accessor that
-- returns the collected statistics.
--
-- Call it after crud's initialization.
function storage_stat.init_on_storage()
    assert(_G._crud.select_on_storage ~= nil)

    -- Request counters live in this upvalue, shared by the
    -- wrapper and the accessor below.
    local stat = {
        select_requests = 0,
    }

    -- Replace select_on_storage() with a counting proxy that
    -- delegates to the original implementation.
    local original_select_on_storage = _G._crud.select_on_storage
    _G._crud.select_on_storage = function(...)
        stat.select_requests = stat.select_requests + 1
        return original_select_on_storage(...)
    end

    -- Expose the statistics through a global function.
    rawset(_G, 'storage_stat', function()
        return stat
    end)
end

-- Accumulate statistics from storages.
--
-- The statistics is grouped by replicasets.
--
-- Example of a return value:
--
-- | {
-- |     ['s-1'] = {
-- |         select_requests = 1,
-- |     },
-- |     ['s-2'] = {
-- |         select_requests = 0,
-- |     },
-- | }
function storage_stat.collect(cluster)
    checks('table')

    local res = {}

    helpers.call_on_storages(cluster, function(server, replicaset)
        checks('table', 'table')

        -- Collect the statistics from the instance.
        --
        -- Named 'instance_stat' (not 'storage_stat') to avoid
        -- shadowing the module table of the same name.
        local instance_stat = server.net_box:call('storage_stat')

        -- Initialize the per-replicaset accumulator if needed.
        if res[replicaset.alias] == nil then
            res[replicaset.alias] = {}
        end

        -- Accumulate the collected statistics.
        for key, val in pairs(instance_stat) do
            local old = res[replicaset.alias][key] or 0
            res[replicaset.alias][key] = old + val
        end
    end)

    return res
end

-- Compute the difference between 'a' and 'b' storage
-- statistics ('a' minus 'b', key by key).
--
-- The return value structure is the same as for
-- storage_stat.collect().
function storage_stat.diff(a, b)
    checks('table', 'table')

    local result = table.deepcopy(a)

    for replicaset_alias, stat_b in pairs(b) do
        -- Initialize the accumulator if the replicaset appears
        -- only in 'b'.
        local acc = result[replicaset_alias]
        if acc == nil then
            acc = {}
            result[replicaset_alias] = acc
        end

        -- Subtract 'b' values from the corresponding 'a' values.
        for key, val in pairs(stat_b) do
            acc[key] = (acc[key] or 0) - val
        end
    end

    return result
end

return storage_stat
90 changes: 89 additions & 1 deletion test/integration/pairs_test.lua
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ local t = require('luatest')
local crud_utils = require('crud.common.utils')

local helpers = require('test.helper')
local storage_stat = require('test.helpers.storage_stat')

local pgroup = t.group('pairs', {
{engine = 'memtx'},
Expand All @@ -25,6 +26,13 @@ pgroup.before_all(function(g)
g.cluster:start()

g.space_format = g.cluster.servers[2].net_box.space.customers:format()

helpers.call_on_storages(g.cluster, function(server)
server.net_box:eval([[
local storage_stat = require('test.helpers.storage_stat')
storage_stat.init_on_storage()
]])
end)
end)

-- Tear the test cluster down once the whole group has finished.
pgroup.after_all(function(g)
    helpers.stop_cluster(g.cluster)
end)
Expand Down Expand Up @@ -758,15 +766,19 @@ end
pgroup.test_opts_not_damaged = function(g)
local customers = helpers.insert_objects(g, 'customers', {
{
-- bucket_id is 477, storage is s-2
id = 1, name = "Elizabeth", last_name = "Jackson",
age = 12, city = "Los Angeles",
}, {
-- bucket_id is 401, storage is s-2
id = 2, name = "Mary", last_name = "Brown",
age = 46, city = "London",
}, {
-- bucket_id is 2804, storage is s-1
id = 3, name = "David", last_name = "Smith",
age = 33, city = "Los Angeles",
}, {
-- bucket_id is 1161, storage is s-2
id = 4, name = "William", last_name = "White",
age = 46, city = "Chicago",
},
Expand All @@ -775,7 +787,6 @@ pgroup.test_opts_not_damaged = function(g)
table.sort(customers, function(obj1, obj2) return obj1.id < obj2.id end)

local expected_customers = {
{id = 3, name = "David", age = 33},
{id = 4, name = "William", age = 46},
}

Expand Down Expand Up @@ -805,3 +816,80 @@ pgroup.test_opts_not_damaged = function(g)
t.assert_equals(objects, expected_customers)
t.assert_equals(new_pairs_opts, pairs_opts)
end

-- gh-220: bucket_id argument is ignored when it cannot be deduced
-- from provided select/pairs conditions.
pgroup.test_pairs_no_map_reduce = function(g)
    local customers = helpers.insert_objects(g, 'customers', {
        {
            -- bucket_id is 477, storage is s-2
            id = 1, name = 'Elizabeth', last_name = 'Jackson',
            age = 12, city = 'New York',
        }, {
            -- bucket_id is 401, storage is s-2
            id = 2, name = 'Mary', last_name = 'Brown',
            age = 46, city = 'Los Angeles',
        }, {
            -- bucket_id is 2804, storage is s-1
            id = 3, name = 'David', last_name = 'Smith',
            age = 33, city = 'Los Angeles',
        }, {
            -- bucket_id is 1161, storage is s-2
            id = 4, name = 'William', last_name = 'White',
            age = 81, city = 'Chicago',
        },
    })

    table.sort(customers, function(obj1, obj2) return obj1.id < obj2.id end)

    -- Run crud.pairs() on the router and materialize the result
    -- into a table of rows.
    local function fetch_pairs(conditions, opts)
        return g.cluster.main_server.net_box:eval([[
        local crud = require('crud')

        return crud.pairs(...):totable()
    ]], {'customers', conditions, opts})
    end

    local stat_before = storage_stat.collect(g.cluster)

    -- Case: no conditions, just bucket id.
    local rows = fetch_pairs(nil, {bucket_id = 2804, timeout = 1})
    t.assert_equals(rows, {
        {3, 2804, 'David', 'Smith', 33, 'Los Angeles'},
    })

    -- Only the storage holding bucket 2804 (s-1, per the fixture
    -- comments above) must receive a select request.
    local stat_after_first = storage_stat.collect(g.cluster)
    t.assert_equals(storage_stat.diff(stat_after_first, stat_before), {
        ['s-1'] = {
            select_requests = 1,
        },
        ['s-2'] = {
            select_requests = 0,
        },
    })

    -- Case: EQ on secondary index, which is not in the sharding
    -- index (primary index in the case).
    rows = fetch_pairs({{'==', 'age', 81}}, {bucket_id = 1161, timeout = 1})
    t.assert_equals(rows, {
        {4, 1161, 'William', 'White', 81, 'Chicago'},
    })

    -- Only the storage holding bucket 1161 (s-2, per the fixture
    -- comments above) must receive a select request.
    local stat_after_second = storage_stat.collect(g.cluster)
    t.assert_equals(storage_stat.diff(stat_after_second, stat_after_first), {
        ['s-1'] = {
            select_requests = 0,
        },
        ['s-2'] = {
            select_requests = 1,
        },
    })
end
Loading