Skip to content

Commit

Permalink
api: wait until the a space or an index is created
Browse files Browse the repository at this point in the history
After the patch an user can to start expirationd task disregarding
whether a space (and a primary/requested index) is exists or not.
expirationd does not start a worker fiber until the space or index
appears.

Also, after the patch expirationd stops all task fibers if a space
or an index configuration error happens:

* the space or the index disappears (drop/rename);
* an unsupported type of the index;
* an invalid iterator or a start key for the index;

Closes #64
Closes #68
Closes #116
  • Loading branch information
oleg-jukovec committed Jul 29, 2022
1 parent bd20ded commit 36d9ab2
Show file tree
Hide file tree
Showing 3 changed files with 479 additions and 67 deletions.
159 changes: 104 additions & 55 deletions expirationd.lua
Original file line number Diff line number Diff line change
Expand Up @@ -168,12 +168,73 @@ end
-- ------------------------------------------------------------------------- --
-- Task fibers
-- ------------------------------------------------------------------------- --
local function check_space_and_index_exist(task)
local space = box.space[task.space_id]
if space == nil then
local prefix = "Space with " ..
(type(task.space_id) == "string" and "name " or "id ")
return false, prefix .. task.space_id .. " does not exist"
end

local index = space.index[task.index]
if index == nil then
local prefix = "Index with " ..
(type(task.index) == "string" and "name " or "id ")
return false, prefix .. task.index .. " does not exist"
end

return true
end

local function check_space_and_index(task)
local ok, err = check_space_and_index_exist(task)
if not ok then
return false, err
end

local space = box.space[task.space_id]
local index = space.index[task.index]
if index.type ~= "TREE" and index.type ~= "HASH" then
return false, "Not supported index type, expected TREE or HASH"
end

if space.engine == "memtx" and index.func ~= nil then
local supported = false
local version = rawget(_G, "_TARANTOOL"):split('-', 1)[1]
local major_minor_patch = version:split('.', 2)

local major = tonumber(major_minor_patch[1])
local minor = tonumber(major_minor_patch[2])
local patch = tonumber(major_minor_patch[3])
-- https://github.com/tarantool/expirationd/issues/101
-- fixed since 2.8.4 and 2.10
if (major > 2) or (major == 2 and minor == 8 and patch >= 4)
or (major == 2 and minor >= 10) then
supported = true
end
local force_allow = task.force_allow_functional_index or false
if not supported and not force_allow then
return false, "Functional indices are not supported for" ..
" Tarantool < 2.8.4, see" ..
" options.force_allow_functional_index"
end
end

local ok, err = pcall(function()
index:pairs(task.start_key(), {iterator = task.iterator_type})
end)
if not ok then
return false, err
end

return true, nil
end

-- get all fields in key(composite possible) from a tuple
local function construct_key(space_id, index_id, tuple)
local function construct_key(space_id, index, tuple)
return fun.map(
function(x) return tuple[x.fieldno] end,
box.space[space_id].index[index_id].parts
box.space[space_id].index[index].parts
):totable()
end

Expand All @@ -195,7 +256,8 @@ end

local function suspend(task)
-- Return the number of tuples in the space
local space_len = task.index:len()
local index = box.space[task.space_id].index[task.index]
local space_len = index:len()
if space_len > 0 then
suspend_basic(task, space_len)
end
Expand Down Expand Up @@ -298,9 +360,21 @@ local function guardian_loop(task)
-- detach the guardian from the creator and attach it to sched
fiber.name(string.format("guardian of %q", task.name), { truncate = true })

while true do
if check_space_and_index_exist(task) then
break
end
fiber.sleep(constants.check_interval)
end

while true do
-- if fiber doesn't exist
if get_fiber_id(task.worker_fiber) == 0 then
local ok, err = check_space_and_index(task)
if not ok then
log.info("expiration: stop task %q, reason: %s", task.name, err)
return
end
-- create worker fiber
task.worker_fiber = fiber.create(worker_loop, task)

Expand Down Expand Up @@ -501,19 +575,21 @@ local function default_tuple_drop(space_id, args, tuple)
end

local function create_continue_key(tuple, old_parts, task)
if tuple == nil or #old_parts ~= #task.index.parts then
local index = box.space[task.space_id].index[task.index]

if tuple == nil or #old_parts ~= #index.parts then
return nil
end

for i, part in ipairs(old_parts) do
for k, v in pairs(part) do
if task.index.parts[i][k] == nil or task.index.parts[i][k] ~= v then
if index.parts[i][k] == nil or index.parts[i][k] ~= v then
return nil
end
end
end

return construct_key(task.space_id, task.index.id, tuple)
return construct_key(task.space_id, task.index, tuple)
end

local continue_iterators_map = {}
Expand All @@ -537,15 +613,16 @@ local function create_continue_state(task)
local key = nil
local it = nil
local c = task_continue[task.name]
if c and c.space_id == task.space_id and c.index_id == task.index.id and c.it == task.iterator_type then
if c and c.space_id == task.space_id and c.index == task.index and c.it == task.iterator_type then
key = create_continue_key(c.tuple, c.index_parts, task)
it = continue_iterators_map[task.iterator_type]
end

local index = box.space[task.space_id].index[task.index]
task_continue[task.name] = {
space_id = task.space_id,
index_id = task.index.id,
index_parts = table.deepcopy(task.index.parts),
index = task.index,
index_parts = table.deepcopy(index.parts),
it = task.iterator_type,
tuple = nil,
}
Expand All @@ -559,8 +636,9 @@ end
-- default iterate_with function
local function default_iterate_with(task)
local continue_key, continue_it = create_continue_state(task)
local iter, param, state = task.index:pairs(continue_key or task.start_key(),
{ iterator = continue_it or task.iterator_type })
local index = box.space[task.space_id].index[task.index]
local iter, param, state = index:pairs(continue_key or task.start_key(),
{ iterator = continue_it or task.iterator_type })
:take_while(
function()
return task:process_while()
Expand Down Expand Up @@ -901,53 +979,17 @@ local function expirationd_run_task(name, space_id, is_tuple_expired, options)
prev:kill(name)
task_continue[name] = tmp
end

options = options or {}

local task = create_task(name)
task.space_id = space_id
task.is_tuple_expired = is_tuple_expired
task.index = options.index or 0
task.force_allow_functional_index = options.force_allow_functional_index

options = options or {}
task.is_tuple_expired = is_tuple_expired
task.process_expired_tuple = options.process_expired_tuple or default_tuple_drop

-- validate index
local expire_index = box.space[space_id].index[0]
if options.index then
if box.space[space_id].index[options.index] == nil then
if type(options.index) == "string" then
error("Index with name " .. options.index .. " does not exist")
elseif type(options.index) == "number" then
error("Index with id " .. options.index .. " does not exist")
else
error("Invalid type of index, expected string or number")
end
end
expire_index = box.space[space_id].index[options.index]
if expire_index.type ~= "TREE" and expire_index.type ~= "HASH" then
error("Not supported index type, expected TREE or HASH")
end
local engine = box.space[space_id].engine
if engine == "memtx" and expire_index.func ~= nil then
local supported = false
local version = rawget(_G, "_TARANTOOL"):split('-', 1)[1]
local major_minor_patch = version:split('.', 2)

local major = tonumber(major_minor_patch[1])
local minor = tonumber(major_minor_patch[2])
local patch = tonumber(major_minor_patch[3])
-- https://github.com/tarantool/expirationd/issues/101
-- fixed since 2.8.4 and 2.10
if (major > 2) or (major == 2 and minor == 8 and patch >= 4)
or (major == 2 and minor >= 10) then
supported = true
end
local force_allow = options.force_allow_functional_index or false
if not supported and not force_allow then
error("Functional indices are not supported for" ..
" Tarantool < 2.8.4, see options.force_allow_functional_index")
end
end
end
task.index = expire_index

-- check iterator_type
if options.iterator_type ~= nil then
task.iterator_type = options.iterator_type
Expand All @@ -962,8 +1004,15 @@ local function expirationd_run_task(name, space_id, is_tuple_expired, options)
end
end

-- check valid of iterator_type and start key
task.index:pairs( task.start_key(), { iterator = task.iterator_type })
local ok, err = check_space_and_index_exist(task)
if ok then
local ok, err = check_space_and_index(task)
if not ok then
error(err)
end
else
log.warn("expiration: postpone a task " .. name .. ", reason: " .. err)
end

-- check process_while
if options.process_while ~= nil then
Expand Down
18 changes: 6 additions & 12 deletions test/unit/custom_index_test.lua
Original file line number Diff line number Diff line change
Expand Up @@ -32,32 +32,25 @@ function g.test_passing(cg)

local task = expirationd.start("clean_all", cg.space.id, helpers.is_expired_true)
-- if we don't specify index, program should use primary index
t.assert_equals(task.index, cg.space.index[0])
t.assert_equals(task.index, cg.space.index[0].id)
task:kill()

-- index by name
task = expirationd.start("clean_all", cg.space.id, helpers.is_expired_true,
{index = "index_for_first_name"})
t.assert_equals(task.index, cg.space.index[1])
t.assert_equals(task.index, cg.space.index[1].name)
task:kill()

-- index by id
task = expirationd.start("clean_all", cg.space.id, helpers.is_expired_true,
{index = 1})
t.assert_equals(task.index, cg.space.index[1])
t.assert_equals(task.index, cg.space.index[1].id)
task:kill()
end

function g.test_tree_index_errors(cg)
t.skip_if(cg.params.index_type ~= 'TREE', 'Unsupported index type')

-- errors
t.assert_error_msg_content_equals("Index with name not_exists_index does not exist",
expirationd.start, "clean_all", cg.space.id, helpers.is_expired_true,
{index = "not_exists_index"})
t.assert_error_msg_content_equals("Index with id 10 does not exist",
expirationd.start, "clean_all", cg.space.id, helpers.is_expired_true,
{index = 10})
t.assert_error_msg_contains("bad argument options.index to nil (?number|string expected, got table)",
expirationd.start, "clean_all", cg.space.id, helpers.is_expired_true,
{index = { 10 }})
Expand Down Expand Up @@ -239,8 +232,9 @@ function g.test_memtx_tree_functional_index_force_broken(cg)
-- The problem occurs when we iterate through a functional index and delete
-- a current tuple. A possible solution is somehow to process tuples chunk
-- by chunk using select calls instead of iterating with index:pairs().
local select_with = function(task)
return pairs(task.index:select({}, {iterator = "ALL", limit = 100}))
local select_with = function()
local index = cg.space.index["functional_index"]
return pairs(index:select({}, {iterator = "ALL", limit = 100}))
end

local task = expirationd.start("clean_all", cg.space.id, helpers.is_expired_debug,
Expand Down
Loading

0 comments on commit 36d9ab2

Please sign in to comment.