From 36d9ab24ea4ea2ca1a15a05c32e335df7ae30245 Mon Sep 17 00:00:00 2001 From: Oleg Jukovec Date: Tue, 26 Jul 2022 12:48:51 +0300 Subject: [PATCH] api: wait until a space or an index is created After the patch a user can start an expirationd task regardless of whether a space (and a primary/requested index) exists or not. expirationd does not start a worker fiber until the space or index appears. Also, after the patch expirationd stops all task fibers if a space or an index configuration error happens: * the space or the index disappears (drop/rename); * an unsupported type of the index; * an invalid iterator or a start key for the index; Closes #64 Closes #68 Closes #116 --- expirationd.lua | 159 +++++++++----- test/unit/custom_index_test.lua | 18 +- test/unit/space_index_test.lua | 369 ++++++++++++++++++++++++++++++++ 3 files changed, 479 insertions(+), 67 deletions(-) create mode 100644 test/unit/space_index_test.lua diff --git a/expirationd.lua b/expirationd.lua index 9e2227a1..72510aae 100644 --- a/expirationd.lua +++ b/expirationd.lua @@ -168,12 +168,73 @@ end -- ------------------------------------------------------------------------- -- -- Task fibers -- ------------------------------------------------------------------------- -- +local function check_space_and_index_exist(task) + local space = box.space[task.space_id] + if space == nil then + local prefix = "Space with " .. + (type(task.space_id) == "string" and "name " or "id ") + return false, prefix .. task.space_id .. " does not exist" + end + + local index = space.index[task.index] + if index == nil then + local prefix = "Index with " .. + (type(task.index) == "string" and "name " or "id ") + return false, prefix .. task.index .. 
" does not exist" + end + + return true +end + +local function check_space_and_index(task) + local ok, err = check_space_and_index_exist(task) + if not ok then + return false, err + end + + local space = box.space[task.space_id] + local index = space.index[task.index] + if index.type ~= "TREE" and index.type ~= "HASH" then + return false, "Not supported index type, expected TREE or HASH" + end + + if space.engine == "memtx" and index.func ~= nil then + local supported = false + local version = rawget(_G, "_TARANTOOL"):split('-', 1)[1] + local major_minor_patch = version:split('.', 2) + + local major = tonumber(major_minor_patch[1]) + local minor = tonumber(major_minor_patch[2]) + local patch = tonumber(major_minor_patch[3]) + -- https://github.com/tarantool/expirationd/issues/101 + -- fixed since 2.8.4 and 2.10 + if (major > 2) or (major == 2 and minor == 8 and patch >= 4) + or (major == 2 and minor >= 10) then + supported = true + end + local force_allow = task.force_allow_functional_index or false + if not supported and not force_allow then + return false, "Functional indices are not supported for" .. + " Tarantool < 2.8.4, see" .. 
+ " options.force_allow_functional_index" + end + end + + local ok, err = pcall(function() + index:pairs(task.start_key(), {iterator = task.iterator_type}) + end) + if not ok then + return false, err + end + + return true, nil +end -- get all fields in key(composite possible) from a tuple -local function construct_key(space_id, index_id, tuple) +local function construct_key(space_id, index, tuple) return fun.map( function(x) return tuple[x.fieldno] end, - box.space[space_id].index[index_id].parts + box.space[space_id].index[index].parts ):totable() end @@ -195,7 +256,8 @@ end local function suspend(task) -- Return the number of tuples in the space - local space_len = task.index:len() + local index = box.space[task.space_id].index[task.index] + local space_len = index:len() if space_len > 0 then suspend_basic(task, space_len) end @@ -298,9 +360,21 @@ local function guardian_loop(task) -- detach the guardian from the creator and attach it to sched fiber.name(string.format("guardian of %q", task.name), { truncate = true }) + while true do + if check_space_and_index_exist(task) then + break + end + fiber.sleep(constants.check_interval) + end + while true do -- if fiber doesn't exist if get_fiber_id(task.worker_fiber) == 0 then + local ok, err = check_space_and_index(task) + if not ok then + log.info("expiration: stop task %q, reason: %s", task.name, err) + return + end -- create worker fiber task.worker_fiber = fiber.create(worker_loop, task) @@ -501,19 +575,21 @@ local function default_tuple_drop(space_id, args, tuple) end local function create_continue_key(tuple, old_parts, task) - if tuple == nil or #old_parts ~= #task.index.parts then + local index = box.space[task.space_id].index[task.index] + + if tuple == nil or #old_parts ~= #index.parts then return nil end for i, part in ipairs(old_parts) do for k, v in pairs(part) do - if task.index.parts[i][k] == nil or task.index.parts[i][k] ~= v then + if index.parts[i][k] == nil or index.parts[i][k] ~= v then return nil 
end end end - return construct_key(task.space_id, task.index.id, tuple) + return construct_key(task.space_id, task.index, tuple) end local continue_iterators_map = {} @@ -537,15 +613,16 @@ local function create_continue_state(task) local key = nil local it = nil local c = task_continue[task.name] - if c and c.space_id == task.space_id and c.index_id == task.index.id and c.it == task.iterator_type then + if c and c.space_id == task.space_id and c.index == task.index and c.it == task.iterator_type then key = create_continue_key(c.tuple, c.index_parts, task) it = continue_iterators_map[task.iterator_type] end + local index = box.space[task.space_id].index[task.index] task_continue[task.name] = { space_id = task.space_id, - index_id = task.index.id, - index_parts = table.deepcopy(task.index.parts), + index = task.index, + index_parts = table.deepcopy(index.parts), it = task.iterator_type, tuple = nil, } @@ -559,8 +636,9 @@ end -- default iterate_with function local function default_iterate_with(task) local continue_key, continue_it = create_continue_state(task) - local iter, param, state = task.index:pairs(continue_key or task.start_key(), - { iterator = continue_it or task.iterator_type }) + local index = box.space[task.space_id].index[task.index] + local iter, param, state = index:pairs(continue_key or task.start_key(), + { iterator = continue_it or task.iterator_type }) :take_while( function() return task:process_while() @@ -901,53 +979,17 @@ local function expirationd_run_task(name, space_id, is_tuple_expired, options) prev:kill(name) task_continue[name] = tmp end + + options = options or {} + local task = create_task(name) task.space_id = space_id - task.is_tuple_expired = is_tuple_expired + task.index = options.index or 0 + task.force_allow_functional_index = options.force_allow_functional_index - options = options or {} + task.is_tuple_expired = is_tuple_expired task.process_expired_tuple = options.process_expired_tuple or default_tuple_drop - -- validate index 
- local expire_index = box.space[space_id].index[0] - if options.index then - if box.space[space_id].index[options.index] == nil then - if type(options.index) == "string" then - error("Index with name " .. options.index .. " does not exist") - elseif type(options.index) == "number" then - error("Index with id " .. options.index .. " does not exist") - else - error("Invalid type of index, expected string or number") - end - end - expire_index = box.space[space_id].index[options.index] - if expire_index.type ~= "TREE" and expire_index.type ~= "HASH" then - error("Not supported index type, expected TREE or HASH") - end - local engine = box.space[space_id].engine - if engine == "memtx" and expire_index.func ~= nil then - local supported = false - local version = rawget(_G, "_TARANTOOL"):split('-', 1)[1] - local major_minor_patch = version:split('.', 2) - - local major = tonumber(major_minor_patch[1]) - local minor = tonumber(major_minor_patch[2]) - local patch = tonumber(major_minor_patch[3]) - -- https://github.com/tarantool/expirationd/issues/101 - -- fixed since 2.8.4 and 2.10 - if (major > 2) or (major == 2 and minor == 8 and patch >= 4) - or (major == 2 and minor >= 10) then - supported = true - end - local force_allow = options.force_allow_functional_index or false - if not supported and not force_allow then - error("Functional indices are not supported for" .. 
- " Tarantool < 2.8.4, see options.force_allow_functional_index") - end - end - end - task.index = expire_index - -- check iterator_type if options.iterator_type ~= nil then task.iterator_type = options.iterator_type @@ -962,8 +1004,15 @@ local function expirationd_run_task(name, space_id, is_tuple_expired, options) end end - -- check valid of iterator_type and start key - task.index:pairs( task.start_key(), { iterator = task.iterator_type }) + local ok, err = check_space_and_index_exist(task) + if ok then + local ok, err = check_space_and_index(task) + if not ok then + error(err) + end + else + log.warn("expiration: postpone a task " .. name .. ", reason: " .. err) + end -- check process_while if options.process_while ~= nil then diff --git a/test/unit/custom_index_test.lua b/test/unit/custom_index_test.lua index a316e7e9..9ddd7245 100644 --- a/test/unit/custom_index_test.lua +++ b/test/unit/custom_index_test.lua @@ -32,32 +32,25 @@ function g.test_passing(cg) local task = expirationd.start("clean_all", cg.space.id, helpers.is_expired_true) -- if we don't specify index, program should use primary index - t.assert_equals(task.index, cg.space.index[0]) + t.assert_equals(task.index, cg.space.index[0].id) task:kill() -- index by name task = expirationd.start("clean_all", cg.space.id, helpers.is_expired_true, {index = "index_for_first_name"}) - t.assert_equals(task.index, cg.space.index[1]) + t.assert_equals(task.index, cg.space.index[1].name) task:kill() -- index by id task = expirationd.start("clean_all", cg.space.id, helpers.is_expired_true, {index = 1}) - t.assert_equals(task.index, cg.space.index[1]) + t.assert_equals(task.index, cg.space.index[1].id) task:kill() end function g.test_tree_index_errors(cg) t.skip_if(cg.params.index_type ~= 'TREE', 'Unsupported index type') - -- errors - t.assert_error_msg_content_equals("Index with name not_exists_index does not exist", - expirationd.start, "clean_all", cg.space.id, helpers.is_expired_true, - {index = 
"not_exists_index"}) - t.assert_error_msg_content_equals("Index with id 10 does not exist", - expirationd.start, "clean_all", cg.space.id, helpers.is_expired_true, - {index = 10}) t.assert_error_msg_contains("bad argument options.index to nil (?number|string expected, got table)", expirationd.start, "clean_all", cg.space.id, helpers.is_expired_true, {index = { 10 }}) @@ -239,8 +232,9 @@ function g.test_memtx_tree_functional_index_force_broken(cg) -- The problem occurs when we iterate through a functional index and delete -- a current tuple. A possible solution is somehow to process tuples chunk -- by chunk using select calls instead of iterating with index:pairs(). - local select_with = function(task) - return pairs(task.index:select({}, {iterator = "ALL", limit = 100})) + local select_with = function() + local index = cg.space.index["functional_index"] + return pairs(index:select({}, {iterator = "ALL", limit = 100})) end local task = expirationd.start("clean_all", cg.space.id, helpers.is_expired_debug, diff --git a/test/unit/space_index_test.lua b/test/unit/space_index_test.lua new file mode 100644 index 00000000..ffa8634e --- /dev/null +++ b/test/unit/space_index_test.lua @@ -0,0 +1,369 @@ +local expirationd = require("expirationd") +local fiber = require("fiber") +local t = require("luatest") +local luatest_capture = require("luatest.capture") +local helpers = require("test.helper") +local g = t.group("expirationd_space_index") + +local double_worker_restart_time = 2 + +g.before_each(function(cg) + cg.space = helpers.create_space_with_tree_index("memtx") + cg.space:insert({1, "1"}) + -- kill live tasks (it can still live after failed tests) + for _, t in ipairs(expirationd.tasks()) do + expirationd.kill(t) + end + cg.case_space = nil +end) + +g.after_each(function(cg) + if box.space[cg.space.name] then + cg.space:drop() + end + if cg.case_space then + cg.case_space:drop() + cg.case_space = nil + end +end) + +-- in some cases we need to create an additional space 
+local function create_case_space(cg, space_name) + cg.case_space = box.schema.create_space(space_name) + cg.case_space:create_index("primary", {type = "TREE", parts = {{field = 1}}}) + cg.case_space:insert({2, "2"}) +end + +function g.test_start_on_existing_space_and_index(cg) + helpers.iteration_result = {} + local task = expirationd.start("clean_all", cg.space.id, helpers.is_expired_debug, + {index = "index_for_first_name"}) + helpers.retrying({}, function() + t.assert_equals({{1, "1"}}, helpers.iteration_result) + end) + t.assert_not_equals(task, nil) + t.assert_equals(task:statistics().restarts, 1) + task:kill() +end + +local non_existing_start_cases = { + non_existing_index_name = { + index = "non_existing_name", + msg = "expiration: postpone a task clean_all, reason: Index with name non_existing_name does not exist", + }, + non_existing_index_id = { + index = 67, + msg = "expiration: postpone a task clean_all, reason: Index with id 67 does not exist", + }, + non_existing_space_name = { + space = "non_existing_name", + index = 0, + msg = "expiration: postpone a task clean_all, reason: Space with name non_existing_name does not exist", + }, + non_existing_space_id = { + space = 337, + index = 0, + msg = "expiration: postpone a task clean_all, reason: Space with id 337 does not exist", + }, +} + +for name, case in pairs(non_existing_start_cases) do + g["test_start_" .. 
name] = function(cg) + local task + local capture = luatest_capture:new() + capture:wrap(true, function() + helpers.iteration_result = {} + local space = case.space or cg.space.id + task = expirationd.start("clean_all", space, helpers.is_expired_debug, + {index = case.index}) + end) + + t.assert_str_contains(capture:flush().stderr, case.msg) + + t.assert_not_equals(task, nil) + t.assert_not_equals(expirationd.task("clean_all"), nil) + t.assert_equals(#expirationd.tasks(), 1) + t.assert_equals(task:statistics().restarts, 0) + + task:kill() + + t.assert_equals(helpers.iteration_result, {}) + end +end + +function g.test_run_after_non_existing_index_created(cg) + local task_name = "clean_all" + local index_name = "non_existing_name" + + helpers.iteration_result = {} + local task = expirationd.start(task_name, cg.space.id, helpers.is_expired_debug, + {index = index_name}) + cg.space:create_index(index_name, {type = "TREE", parts = {{field = 2}}}) + + helpers.retrying({}, function() + t.assert_equals(helpers.iteration_result, {{1, "1"}}) + end) + t.assert_equals(#expirationd.tasks(), 1) + t.assert_not_equals(expirationd.task(task_name), nil) + t.assert_equals(task:statistics().restarts, 1) + + task:kill() +end + +function g.test_run_after_non_existing_space_created(cg) + local task_name = "clean_all" + local space_name = "tmp" + + helpers.iteration_result = {} + local task = expirationd.start(task_name, space_name, helpers.is_expired_debug) + + create_case_space(cg, space_name) + + helpers.retrying({}, function() + t.assert_equals(helpers.iteration_result, {{2, "2"}}) + end) + t.assert_equals(#expirationd.tasks(), 1) + t.assert_not_equals(expirationd.task(task_name), nil) + t.assert_equals(task:statistics().restarts, 1) + + task:kill() +end + +function g.test_run_after_non_existing_space_and_index_created(cg) + local task_name = "clean_all" + local space_name = "tmp" + local index_name = "non_primary" + + helpers.iteration_result = {} + local task = 
expirationd.start(task_name, space_name, helpers.is_expired_debug, + {index = index_name}) + + create_case_space(cg, space_name) + + fiber.sleep(double_worker_restart_time) + + t.assert_equals(helpers.iteration_result, {}) + t.assert_equals(#expirationd.tasks(), 1) + t.assert_not_equals(expirationd.task(task_name), nil) + t.assert_equals(task:statistics().restarts, 0) + + g.case_space:create_index(index_name, {type = "TREE", parts = {{field = 1}}}) + + helpers.retrying({}, function() + t.assert_equals(helpers.iteration_result, {{2, "2"}}) + end) + t.assert_equals(#expirationd.tasks(), 1) + t.assert_not_equals(expirationd.task(task_name), nil) + t.assert_equals(task:statistics().restarts, 1) + + task:kill() +end + +local rename_cases = { + index_rename = { + fun = function(space, _) + space:rename("XXX") + end + }, + space_rename = { + fun = function(_, index) + index:rename("XXX") + end + }, +} + +for name, case in pairs(rename_cases) do + g["test_stop_after_" .. name .. "_if_name_used"] = function(cg) + local task_name = "clean_all" + local space_name = cg.space.name + local index_name = "index_for_first_name" + + helpers.iteration_result = {} + local task = expirationd.start(task_name, space_name, helpers.is_expired_debug, + {index = index_name}) + helpers.retrying({}, function() + t.assert_equals(helpers.iteration_result, {{1, "1"}}) + end) + helpers.iteration_result = {} + + local capture = luatest_capture:new() + capture:wrap(true, function() + local space = box.space[space_name] + case.fun(space, space.index[index_name]) + space:insert({1, "1"}) + + local stderr = "" + helpers.retrying({}, function() + stderr = stderr .. 
capture:flush().stderr + t.assert_str_contains(stderr, "expiration: stop task") + end) + end) + + fiber.sleep(double_worker_restart_time) + + t.assert_equals(helpers.iteration_result, {}) + t.assert_equals(#expirationd.tasks(), 1) + t.assert_not_equals(expirationd.task(task_name), nil) + t.assert_equals(task:statistics().restarts, 1) + + task:kill() + end + g["test_not_stop_after_" .. name .. "_if_id_used"] = function(cg) + local task_name = "clean_all" + local space_id = cg.space.id + local index_id = 1 + + helpers.iteration_result = {} + local task = expirationd.start(task_name, space_id, helpers.is_expired_debug, + {index = index_id}) + helpers.retrying({}, function() + t.assert_equals(helpers.iteration_result, {{1, "1"}}) + end) + helpers.iteration_result = {} + + local space = box.space[space_id] + case.fun(space, space.index[index_id]) + space:insert({1, "1"}) + + helpers.retrying({}, function() + t.assert_equals(helpers.iteration_result, {{1, "1"}}) + end) + t.assert_equals(#expirationd.tasks(), 1) + t.assert_not_equals(expirationd.task(task_name), nil) + t.assert_equals(task:statistics().restarts, 1) + + task:kill() + end +end + +local drop_cases = { + index_drop = { + fun = function(_, index) + index:drop() + end, + }, + space_drop = { + fun = function(space, _) + space:drop() + end, + } +} + +for name, case in pairs(drop_cases) do + g["test_stop_after_" .. name] = function(cg) + local task_name = "clean_all" + local index_id = 1 + + helpers.iteration_result = {} + local task = expirationd.start(task_name, cg.space.id, helpers.is_expired_debug, + {index = index_id}) + helpers.retrying({}, function() + t.assert_equals(helpers.iteration_result, {{1, "1"}}) + end) + helpers.iteration_result = {} + + local capture = luatest_capture:new() + capture:wrap(true, function() + case.fun(cg.space, cg.space.index[index_id]) + + local stderr = "" + helpers.retrying({}, function() + stderr = stderr .. 
capture:flush().stderr + t.assert_str_contains(stderr, "expiration: stop task") + end) + end) + + fiber.sleep(double_worker_restart_time) + + t.assert_equals(helpers.iteration_result, {}) + t.assert_equals(#expirationd.tasks(), 1) + t.assert_not_equals(expirationd.task(task_name), nil) + t.assert_equals(task:statistics().restarts, 1) + + task:kill() + end +end + +function g.test_stop_after_drop_stop_and_recreate(cg) + local space_name = "tmp" + local task_name = "clean_all" + + create_case_space(cg, space_name) + + helpers.iteration_result = {} + local task = expirationd.start(task_name, space_name, helpers.is_expired_debug) + helpers.retrying({}, function() + t.assert_equals(helpers.iteration_result, {{2, "2"}}) + end) + + local capture = luatest_capture:new() + capture:wrap(true, function() + cg.case_space:drop() + local stderr = "" + helpers.retrying({}, function() + stderr = stderr .. capture:flush().stderr + t.assert_str_contains(stderr, "expiration: stop task") + end) + end) + + helpers.iteration_result = {} + create_case_space(cg, space_name) + + fiber.sleep(double_worker_restart_time) + + t.assert_equals(helpers.iteration_result, {}) + t.assert_equals(#expirationd.tasks(), 1) + t.assert_not_equals(expirationd.task(task_name), nil) + t.assert_equals(task:statistics().restarts, 1) + + task:kill() +end + +function g.test_not_stop_after_drop_and_recreate(cg) + local space_name = "tmp" + local task_name = "clean_all" + + create_case_space(cg, space_name) + + helpers.iteration_result = {} + local task = expirationd.start(task_name, space_name, helpers.is_expired_debug) + helpers.retrying({}, function() + t.assert_equals(helpers.iteration_result, {{2, "2"}}) + end) + + helpers.iteration_result = {} + cg.case_space:drop() + create_case_space(cg, space_name) + + helpers.retrying({}, function() + t.assert_equals(helpers.iteration_result, {{2, "2"}}) + end) + t.assert_equals(#expirationd.tasks(), 1) + t.assert_not_equals(expirationd.task(task_name), nil) + 
t.assert_equals(task:statistics().restarts, 1) + + task:kill() +end + +function g.test_not_stop_after_truncate(cg) + local task_name = "clean_all" + + helpers.iteration_result = {} + local task = expirationd.start(task_name, cg.space.id, helpers.is_expired_debug) + helpers.retrying({}, function() + t.assert_equals(helpers.iteration_result, {{1, "1"}}) + end) + + helpers.iteration_result = {} + cg.space:truncate() + cg.space:insert({1, "1"}) + + helpers.retrying({}, function() + t.assert_equals(helpers.iteration_result, {{1, "1"}}) + end) + t.assert_equals(#expirationd.tasks(), 1) + t.assert_not_equals(expirationd.task(task_name), nil) + t.assert_equals(task:statistics().restarts, 1) + + task:kill() +end