Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 51 additions & 37 deletions crud/select.lua
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ local select_conditions = require('crud.select.conditions')
local select_plan = require('crud.select.plan')
local select_executor = require('crud.select.executor')
local select_comparators = require('crud.select.comparators')
local select_filters = require('crud.select.filters')

local Iterator = require('crud.select.iterator')

Expand All @@ -22,29 +23,42 @@ local select_module = {}

local SELECT_FUNC_NAME = '__select'

local function call_select_on_storage(space_name, conditions, opts)
checks('string', '?table', {
limit = '?number',
local DEFAULT_BATCH_SIZE = 100

local function call_select_on_storage(space_name, index_id, conditions, opts)
checks('string', 'number', '?table', {
scan_value = 'table',
after_tuple = '?table',
iter = 'number',
limit = 'number',
scan_condition_num = '?number',
})

local space = box.space[space_name]
if space == nil then
return nil, SelectError:new("Space %s doesn't exists", space_name)
end

-- plan select
local plan, err = select_plan.new(space, conditions, {
limit = opts.limit,
after_tuple = opts.after_tuple,
})
local index = space.index[index_id]
if index == nil then
return nil, SelectError:new("Index with ID %s doesn't exists", index_id)
end

local filter_func, err = select_filters.gen_func(space, conditions, {
iter = opts.iter,
scan_condition_num = opts.scan_condition_num,
})
if err ~= nil then
return nil, SelectError:new("Failed to plan select: %s", err)
return nil, SelectError:new("Failed to generate tuples filter: %s", err)
end

-- execute select
local tuples, err = select_executor.execute(plan)
local tuples, err = select_executor.execute(space, index, filter_func, {
scan_value = opts.scan_value,
after_tuple = opts.after_tuple,
iter = opts.iter,
limit = opts.limit,
})
if err ~= nil then
return nil, SelectError:new("Failed to execute select: %s", err)
end
Expand All @@ -58,21 +72,26 @@ function select_module.init()
})
end

local function select_iteration(space_name, conditions, opts)
local function select_iteration(space_name, plan, opts)
checks('string', '?table', {
after_tuple = '?table',
replicasets = 'table',
timeout = '?number',
batch_size = '?number',
limit = 'number',
})

-- call select on storages
local storage_select_opts = {
scan_value = plan.scan_value,
after_tuple = opts.after_tuple,
limit = opts.batch_size,
iter = plan.iter,
limit = opts.limit,
scan_condition_num = plan.scan_condition_num,
}

local storage_select_args = {space_name, conditions, storage_select_opts}
local storage_select_args = {
space_name, plan.index_id, plan.conditions, storage_select_opts,
}

local results, err = call.ro(SELECT_FUNC_NAME, storage_select_args, {
replicasets = opts.replicasets,
Expand All @@ -86,14 +105,8 @@ local function select_iteration(space_name, conditions, opts)
return results
end

local function get_replicasets_to_select_from(plan, all_replicasets)
if not plan.is_scan_by_full_sharding_key_eq then
return all_replicasets
end

plan.scanner.limit = 1

local bucket_id = vshard.router.bucket_id_strcrc32(plan.scanner.value)
local function get_replicasets_by_sharding_key(sharding_key)
local bucket_id = vshard.router.bucket_id_strcrc32(sharding_key)
local replicaset, err = vshard.router.route(bucket_id)
if replicaset == nil then
return nil, GetReplicasetsError:new("Failed to get replicaset for bucket_id %s: %s", bucket_id, err.err)
Expand All @@ -118,6 +131,8 @@ local function build_select_iterator(space_name, user_conditions, opts)
return nil, SelectError:new("batch_size should be > 0")
end

local batch_size = opts.batch_size or DEFAULT_BATCH_SIZE

if opts.limit ~= nil and opts.limit < 0 then
return nil, SelectError:new("limit should be >= 0")
end
Expand All @@ -137,32 +152,32 @@ local function build_select_iterator(space_name, user_conditions, opts)
if space == nil then
return nil, SelectError:new("Space %s doesn't exists", space_name)
end

local space_format = space:format()

local after_tuple = utils.flatten(opts.after, space_format)

-- plan select
local plan, err = select_plan.new(space, conditions, {
limit = opts.limit,
after_tuple = after_tuple
})

if err ~= nil then
return nil, SelectError:new("Failed to plan select: %s", err)
end

-- get replicasets to select from
local replicasets, err = get_replicasets_to_select_from(plan, replicasets)
if err ~= nil then
return nil, SelectError:new("Failed to get replicasets to select from: %s", err)
-- set limit and replicasets to select from
local replicasets_to_select = replicasets

if plan.sharding_key ~= nil then
replicasets_to_select = get_replicasets_by_sharding_key(plan.sharding_key)
end

local scan_index = space.index[plan.scanner.index_id]
local primary_index = space.index[0]
-- set after tuple
local after_tuple = utils.flatten(opts.after, space_format)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure it's a good place for unflatten...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe we should pass raw tuple to this function as option...

But ok, feel free to ignore this comment

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since crud returns map, user will use it as an after value for selecting next batch of data.


-- generate tuples comparator
local scan_index = space.index[plan.index_id]
local primary_index = space.index[0]
local cmp_key_parts = utils.merge_primary_key_parts(scan_index.parts, primary_index.parts)
local cmp_operator = select_comparators.get_cmp_operator(plan.scanner.iter)
local cmp_operator = select_comparators.get_cmp_operator(plan.iter)
local tuples_comparator, err = select_comparators.gen_tuples_comparator(
cmp_operator, cmp_key_parts
)
Expand All @@ -176,12 +191,11 @@ local function build_select_iterator(space_name, user_conditions, opts)
iteration_func = select_iteration,
comparator = tuples_comparator,

conditions = conditions,
plan = plan,
after_tuple = after_tuple,
limit = plan.scanner.limit,

batch_size = opts.batch_size,
replicasets = replicasets,
batch_size = batch_size,
replicasets = replicasets_to_select,

timeout = opts.timeout,
})
Expand Down
62 changes: 31 additions & 31 deletions crud/select/executor.lua
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
local checks = require('checks')
local errors = require('errors')
local log = require('log')

local select_filters = require('crud.select.filters')
local select_comparators = require('crud.select.comparators')

local utils = require('crud.common.utils')
Expand All @@ -11,72 +11,72 @@ local ExecuteSelectError = errors.new_class('ExecuteSelectError')

local executor = {}

local function scroll_to_after_tuple(gen, param, state, space, scanner)
local scan_index = space.index[scanner.index_id]
local function scroll_to_after_tuple(gen, space, scan_index, iter, after_tuple)
local primary_index = space.index[0]

local scroll_key_parts = utils.merge_primary_key_parts(scan_index.parts, primary_index.parts)

local cmp_operator = select_comparators.get_cmp_operator(scanner.iter)
local cmp_operator = select_comparators.get_cmp_operator(iter)
local scroll_comparator, err = select_comparators.gen_tuples_comparator(cmp_operator, scroll_key_parts)
if err ~= nil then
return nil, ScrollToAfterError:new("Failed to generate comparator to scroll: %s", err)
end

while true do
local tuple
state, tuple = gen(param, state)
gen.state, tuple = gen(gen.param, gen.state)

if tuple == nil then
return nil
end

if scroll_comparator(tuple, scanner.after_tuple) then
if scroll_comparator(tuple, after_tuple) then
return tuple
end
end
end

function executor.execute(plan)
local scanner = plan.scanner
function executor.execute(space, index, filter_func, opts)
checks('table', 'table', 'function', {
scan_value = 'table',
after_tuple = '?cdata|table',
iter = 'number',
limit = '?number',
})

if scanner.limit == 0 then
opts = opts or {}

if opts.limit == 0 then
return {}
end

local filter = select_filters.gen_code(plan.filter_conditions)
local filer_func = select_filters.compile(filter)

local tuples = {}
local tuples_count = 0

local space = box.space[scanner.space_name]
local index = space.index[scanner.index_id]

local scan_value = scanner.value
if scanner.after_tuple ~= nil then
if scan_value == nil then
scan_value = scanner.after_tuple
local value = opts.scan_value
if opts.after_tuple ~= nil then
if value == nil then
value = opts.after_tuple
else
local cmp_operator = select_comparators.get_cmp_operator(scanner.iter)
local cmp_operator = select_comparators.get_cmp_operator(opts.iter)
local scan_comparator, err = select_comparators.gen_tuples_comparator(cmp_operator, index.parts)
if err ~= nil then
log.warn("Failed to generate comparator for scan value: %s", err)
elseif scan_comparator(scanner.after_tuple, scan_value) then
local after_tuple_key = utils.extract_key(scanner.after_tuple, index.parts)
scan_value = after_tuple_key
elseif scan_comparator(opts.after_tuple, opts.scan_value) then
local after_tuple_key = utils.extract_key(opts.after_tuple, index.parts)
value = after_tuple_key
end
end
end

local tuple
local gen,param,state = index:pairs(scan_value, {iterator = scanner.iter})
local gen = index:pairs(value, {iterator = opts.iter})

if scanner.after_tuple ~= nil then
if opts.after_tuple ~= nil then
local err
tuple, err = scroll_to_after_tuple(gen, param, state, space, scanner)
tuple, err = scroll_to_after_tuple(gen, space, index, opts.iter, opts.after_tuple)
if err ~= nil then
return nil, ExecuteSelectError:new("Failed to scroll to the last tuple: %s", err)
return nil, ExecuteSelectError:new("Failed to scroll to the after_tuple: %s", err)
end

if tuple == nil then
Expand All @@ -85,28 +85,28 @@ function executor.execute(plan)
end

if tuple == nil then
state, tuple = gen(param, state)
gen.state, tuple = gen(gen.param, gen.state)
end

while true do
if tuple == nil then
break
end

local matched, early_exit = filer_func(tuple)
local matched, early_exit = filter_func(tuple)

if matched then
table.insert(tuples, tuple)
tuples_count = tuples_count + 1

if scanner.limit ~= nil and tuples_count >= scanner.limit then
if opts.limit ~= nil and tuples_count >= opts.limit then
break
end
elseif early_exit then
break
end

state, tuple = gen(param, state)
gen.state, tuple = gen(gen.param, gen.state)
end

return tuples
Expand Down
Loading