Skip to content

Commit

Permalink
api: continue a task from a last tuple
Browse files Browse the repository at this point in the history
After the patch a task continues processing from a last tuple at
startup unless it was killed using `task:kill()` or
`expirationd.kill()`. We cannot provide a safe behavior for all user
cases, so it works only with default `start_key` and `iterate_with`
callbacks.

The behavior supports a cartridge hot-reload.

Closes #54
  • Loading branch information
oleg-jukovec committed Jul 22, 2022
1 parent cc8c16a commit 718bdf2
Show file tree
Hide file tree
Showing 3 changed files with 453 additions and 20 deletions.
130 changes: 112 additions & 18 deletions expirationd.lua
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ end
local stash_names = {
cfg = '__expirationd_cfg',
metrics_stats = '__expirationd_metrics_stats',
task_continue = '__expirationd_task_continue',
}

if is_hotreload_package then
Expand All @@ -41,6 +42,7 @@ local function stash_get(name)
end

local task_list = {}
local task_continue = stash_get(stash_names.task_continue)
local cfg = stash_get(stash_names.cfg)
if cfg.metrics == nil then
cfg.metrics = true
Expand Down Expand Up @@ -90,10 +92,9 @@ local function metrics_enable()
-- Workaround for a cartridge role reload:
--
-- Metrics package does not lose observation after a cartridge reload since
-- 0.13.0. expirationd does not yet support the cartridge reload (it
-- requires saving and restarting tasks at least). So, we are acting here
-- as if all expirationd tasks have been killed: reset all collectors and
-- the callback.
-- 0.13.0. expirationd does not automatically restart tasks after the
-- cartridge reload. So, we are acting here as if all expirationd tasks
-- have been killed: reset all collectors and the callback.
if metrics_stats.callback then
metrics.unregister_callback(metrics_stats.callback)
end
Expand Down Expand Up @@ -168,11 +169,11 @@ end
-- Task fibers
-- ------------------------------------------------------------------------- --

-- get all fields in primary key(composite possible) from tuple
local function construct_key(space_id, tuple)
-- get all fields in key(composite possible) from a tuple
local function construct_key(space_id, index_id, tuple)
return fun.map(
function(x) return tuple[x.fieldno] end,
box.space[space_id].index[0].parts
box.space[space_id].index[index_id].parts
):totable()
end

Expand Down Expand Up @@ -326,7 +327,9 @@ end
-- @section Methods
--
local Task_methods = {
--- Start a task.
--- Start a task. It continues processing from a last tuple if the task
-- was previously stopped by @{task.stop} and the task has default
-- `start_key` and `iterate_with` functions.
--
-- @param self
-- Task instance.
Expand Down Expand Up @@ -377,6 +380,7 @@ local Task_methods = {
-- @function task.restart
restart = function (self)
self:stop()
task_continue[self.name] = nil
self:start()
end,

Expand All @@ -399,6 +403,7 @@ local Task_methods = {
end
end
task_list[self.name] = nil
task_continue[self.name] = nil
end,

--- Get a statistics about a task.
Expand Down Expand Up @@ -492,20 +497,92 @@ end
-- default process_expired_tuple function
-- luacheck: ignore unused args
local function default_tuple_drop(space_id, args, tuple)
box.space[space_id]:delete(construct_key(space_id, tuple))
box.space[space_id]:delete(construct_key(space_id, 0, tuple))
end

local function create_continue_key(tuple, old_parts, task)
if tuple == nil or #old_parts ~= #task.index.parts then
return nil
end

for i, part in ipairs(old_parts) do
for k, v in pairs(part) do
if task.index.parts[i][k] == nil or task.index.parts[i][k] ~= v then
return nil
end
end
end

return construct_key(task.space_id, task.index.id, tuple)
end

local continue_iterators_map = {}
for _, ge in ipairs({box.index.ALL, box.index.GE, box.index.GT, box.index.EQ,
"ALL", "GE", "GT", "EQ"}) do
-- default start_key() == nil, so it's ok that EQ -> GE because the
-- continue logic does not work with non-default start_key callback
continue_iterators_map[ge] = box.index.GE
end
for _, le in ipairs({box.index.LE, box.index.LT, box.index.REQ,
"LE", "LT", "REQ"}) do
continue_iterators_map[le] = box.index.LE
end

local function create_continue_state(task)
if task.start_key ~= constants.start_key then
task_continue[task.name] = nil
return nil, nil
end

local key = nil
local it = nil
local c = task_continue[task.name]
if c and c.space_id == task.space_id and c.index_id == task.index.id and c.it == task.iterator_type then
key = create_continue_key(c.tuple, c.index_parts, task)
it = continue_iterators_map[task.iterator_type]
end

task_continue[task.name] = {
space_id = task.space_id,
index_id = task.index.id,
index_parts = table.deepcopy(task.index.parts),
it = task.iterator_type,
tuple = nil,
}

if key == nil or it == nil then
return nil, nil
end
return key, it
end

-- default iterate_with function
local function default_iterate_with(task)
return task.index:pairs(task.start_key(), { iterator = task.iterator_type })
local continue_key, continue_it = create_continue_state(task)
local iter, param, state = task.index:pairs(continue_key or task.start_key(),
{ iterator = continue_it or task.iterator_type })
:take_while(
function()
return task:process_while()
end
)

if task_continue[task.name] then
return function(p, s)
local i, tuple = iter(p, s)
if tuple then
task_continue[task.name].tuple = tuple
else
task_continue[task.name] = nil
end
return i, tuple
end, param, state
else
return iter, param, state
end
end


-- ========================================================================= --
-- Expiration daemon management functions
-- ========================================================================= --
Expand Down Expand Up @@ -594,6 +671,12 @@ end
-- NOTE: By default expirationd does not start tasks on an instance with
-- configured upstreams, see the `force` option.
--
-- NOTE: By default expirationd continues processing from a last processed
-- tuple if a task with same name has not been killed properly with
-- @{task.kill} or @{kill}. This behavior only works with default functions
-- `start_key` and `iterate_with`. You need to set at least one function if
-- another behavior is needed, see `start_key` and `iterate_with` options.
--
-- @string name
-- Task name.
-- @string space_id
Expand Down Expand Up @@ -656,14 +739,18 @@ end
-- [index_object:pairs()][1], where `index` is a primary index or index
-- that specified with argument `options.index`:
--
-- Example of function:
--
-- ```
-- index:pairs(option.start_key(), {
-- iterator = option.iterator_type
-- }):take_while(
-- function()
-- return option.process_while()
-- end
-- )
-- local function iterate_with()
-- index:pairs(option.start_key(), {
-- iterator = option.iterator_type
-- }):take_while(
-- function()
-- return option.process_while()
-- end
-- )
-- end
-- ```
--
-- [1]: https://www.tarantool.io/en/doc/latest/reference/reference_lua/box_index/pairs/.
Expand Down Expand Up @@ -810,7 +897,9 @@ local function expirationd_run_task(name, space_id, is_tuple_expired, options)
local prev = task_list[name]
if prev ~= nil then
log.info("restart task %q", name)
local tmp = task_continue[name]
prev:kill(name)
task_continue[name] = tmp
end
local task = create_task(name)
task.space_id = space_id
Expand Down Expand Up @@ -865,7 +954,7 @@ local function expirationd_run_task(name, space_id, is_tuple_expired, options)
end

-- check start_key
if options.start_key ~= nil or options.start_key == box.NULL then
if options.start_key ~= nil then
if type(options.start_key) == "function" then
task.start_key = function() return options.start_key() end
else
Expand Down Expand Up @@ -1080,8 +1169,13 @@ local function expirationd_update()
package.loaded["expirationd"] = nil
local expd_new = require("expirationd")
local tmp_task_list = task_list; task_list = {}
local tmp_task_continue = table.deepcopy(task_continue)
for _, task in pairs(tmp_task_list) do
task:kill()
-- kill() resets a continue state, we should restore the state
if tmp_task_continue[task.name] then
task_continue[task.name] = tmp_task_continue[task.name]
end
expd_new.start(
task.name, task.space_id,
task.is_tuple_expired, {
Expand Down
101 changes: 99 additions & 2 deletions test/integration/reload_test.lua
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,13 @@ end)

g.before_each(function()
t.skip_if(not is_cartridge_helpers, "cartridge is not installed")
t.skip_if(not helpers.is_metrics_supported(),
"metrics >= 0.11.0 is not installed")
end)

g.after_each(function()
g.cluster:server('s1-master').net_box:eval([[
box.space.customers:truncate()
]])
end)

local function reload_roles(srv)
local ok, err = srv.net_box:eval([[
Expand All @@ -59,7 +62,95 @@ local function reload_roles(srv)
t.assert_equals({ok, err}, {true, nil})
end

local walk_task_name = "walk_all"
local task_sleep_on_10_eval = string.format([[
local expirationd = require('expirationd')
local fiber = require("fiber")
local helpers = require("test.helper")
for i = 1,100 do
box.space.customers:insert({i})
end
local tuples_cnt = 0
local is_expired_sleep = function()
tuples_cnt = tuples_cnt + 1
if tuples_cnt == 10 then
fiber.sleep(60)
end
return true
end
task = expirationd.start("%s", box.space.customers.id, is_expired_sleep,
{process_expired_tuple = function() return true end,
force = true})
helpers.retrying({}, function()
if tuples_cnt < 10 then
error("the task do not reach a target tuple")
end
end)
]], walk_task_name)

local task_first_tuple_eval = string.format([[
local expirationd = require('expirationd')
local helpers = require("test.helper")
local tuple = nil
local is_expired_tuple = function(arg, t)
if tuple == nil then
tuple = t
end
return true
end
task = expirationd.start("%s", box.space.customers.id, is_expired_tuple,
{force = true})
helpers.retrying({}, function()
if tuple == nil then
error("the task is not started")
end
end)
task:kill()
return tuple or {}
]], walk_task_name)

function g.test_task_continue_after_reload(cg)
cg.cluster:server('s1-master').net_box:eval(task_sleep_on_10_eval)

reload_roles(cg.cluster:server('s1-master'))

local tuple = cg.cluster:server('s1-master').net_box:eval(task_first_tuple_eval)
t.assert_equals(tuple, {10})
end

function g.test_task_continue_after_stop_and_reload(cg)
cg.cluster:server('s1-master').net_box:eval(task_sleep_on_10_eval .. [[
task:stop()
]])

reload_roles(cg.cluster:server('s1-master'))

local tuple = cg.cluster:server('s1-master').net_box:eval(task_first_tuple_eval)
t.assert_equals(tuple, {10})
end

function g.test_task_not_continue_after_kill_and_reload(cg)
cg.cluster:server('s1-master').net_box:eval(task_sleep_on_10_eval .. [[
task:kill()
]])

reload_roles(cg.cluster:server('s1-master'))

local tuple = cg.cluster:server('s1-master').net_box:eval(task_first_tuple_eval)
t.assert_equals(tuple, {1})
end

function g.test_cfg_metrics_disable_after_reload(cg)
t.skip_if(not helpers.is_metrics_supported(),
"metrics >= 0.11.0 is not installed")

cg.cluster:server('router').net_box:eval([[
local expirationd = require('expirationd')
expirationd.cfg({metrics = false})
Expand All @@ -74,6 +165,9 @@ function g.test_cfg_metrics_disable_after_reload(cg)
end

function g.test_cfg_metrics_enable_after_reload(cg)
t.skip_if(not helpers.is_metrics_supported(),
"metrics >= 0.11.0 is not installed")

cg.cluster:server('router').net_box:eval([[
local expirationd = require('expirationd')
expirationd.cfg({metrics = true})
Expand All @@ -88,6 +182,9 @@ function g.test_cfg_metrics_enable_after_reload(cg)
end

function g.test_cfg_metrics_clean_after_reload(cg)
t.skip_if(not helpers.is_metrics_supported(),
"metrics >= 0.11.0 is not installed")

local metrics = cg.cluster:server('s1-master').net_box:eval([[
local metrics = require('metrics')
local expirationd = require('expirationd')
Expand Down
Loading

0 comments on commit 718bdf2

Please sign in to comment.