Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,17 @@ opt
* `args` - passed to `is_tuple_expired()` and `process_expired_tuple()` as additional context
* `tuples_per_iteration` - number of tuples will be checked by one iteration
* `full_scan_time` - time required for full index scan (in seconds)
* `iteration_delay` - max sleep time between iterations (in seconds)
* `full_scan_delay` - sleep time between full scans (in seconds)
* `on_full_scan_start` - call function on starting full scan iteration
Receives `(task)` as arguments.
* `on_full_scan_complete` - call function on complete full scan iteration.
Called after `on_full_scan_success` or `on_full_scan_error`.
Receives `(task)` as arguments.
* `on_full_scan_success` - call function on success full scan iteration
Receives `(task)` as arguments.
* `on_full_scan_error` - call function on error full scan iteration
Receives `(task, error)` as arguments.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For me, it isn't obvious that on_full_scan_complete will be called after on_full_scan_success or on_full_scan_error. I propose to mention it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

* `force` - run, even on replica

### `expirationd.kill (name)`
Expand Down
77 changes: 72 additions & 5 deletions expirationd.lua
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ local constants = {
-- assumed size of vinyl space (in the first iteration)
default_vinyl_assumed_space_len = math.pow(10, 7),
-- factor for recalculation of vinyl space size
default_vinyl_assumed_space_len_factor = 2
default_vinyl_assumed_space_len_factor = 2,
-- default function on full scan
default_on_full_scan = function(...) end
}

-- ========================================================================= --
Expand Down Expand Up @@ -71,7 +73,7 @@ end

local function suspend_basic(scan_space, task, len)
local delay = (task.tuples_per_iteration * task.full_scan_time)
delay = math.min(delay / len, constants.max_delay)
delay = math.min(delay / len, task.iteration_delay)
fiber.sleep(delay)
end

Expand Down Expand Up @@ -160,11 +162,22 @@ local function worker_loop(task)

while true do
if (box.cfg.replication_source == nil and box.cfg.replication == nil) or task.force then
task.do_worker_iteration(task)
task.on_full_scan_start(task)
local state, err = pcall(task.do_worker_iteration, task)
if state then
task.on_full_scan_success(task)
else
task.on_full_scan_error(task, err)
end

task.on_full_scan_complete(task)
if not state then
error(err)
end
end

-- iteration is complete, yield
fiber.sleep(constants.max_delay)
-- Full scan iteration is complete, yield
fiber.sleep(task.full_scan_delay)
end
end

Expand Down Expand Up @@ -247,10 +260,16 @@ local function create_task(name)
is_tuple_expired = nil,
process_expired_tuple = nil,
args = nil,
iteration_delay = constants.max_delay,
full_scan_delay = constants.max_delay,
tuples_per_iteration = constants.default_tuples_per_iteration,
full_scan_time = constants.default_full_scan_time,
vinyl_assumed_space_len = constants.default_vinyl_assumed_space_len,
vinyl_assumed_space_len_factor = constants.default_vinyl_assumed_space_len_factor,
on_full_scan_error = constants.default_on_full_scan,
on_full_scan_success = constants.default_on_full_scan,
on_full_scan_start = constants.default_on_full_scan,
on_full_scan_complete = constants.default_on_full_scan
}, { __index = Task_methods })
return task
end
Expand Down Expand Up @@ -289,10 +308,16 @@ end
-- options = { -- (table with named options)
-- * process_expired_tuple -- applied to expired tuples, receives
-- (space_id, args, tuple) as arguments
-- * on_full_scan_start -- call function on starting full scan iteration
-- * on_full_scan_complete -- call function on complete full scan iteration
-- * on_full_scan_success -- call function on success full scan iteration
-- * on_full_scan_error -- call function on error full scan iteration
-- * args -- passed to is_tuple_expired and
-- process_expired_tuple() as additional context
-- * tuples_per_iteration -- number of tuples will be checked by one iteration
-- * full_scan_time -- time required for full index scan (in seconds)
-- * iteration_delay -- max sleep time between iterations (in seconds)
-- * full_scan_delay -- sleep time between full scans (in seconds)
-- * force -- run task even on replica
-- }
local function expirationd_run_task(name, space_id, is_tuple_expired, options)
Expand Down Expand Up @@ -380,6 +405,48 @@ local function expirationd_run_task(name, space_id, is_tuple_expired, options)
task.do_worker_iteration = default_do_worker_iteration
end

if options.iteration_delay ~= nil then
if type(options.iteration_delay) ~= 'number' then
error("invalid type of iteration_delay value")
end
task.iteration_delay = options.iteration_delay
end

if options.full_scan_delay ~= nil then
if type(options.full_scan_delay) ~= 'number' then
error("invalid type of full_scan_delay value")
end
task.full_scan_delay = options.full_scan_delay
end

if options.on_full_scan_start ~= nil then
if type(options.on_full_scan_start) ~= 'function' then
error("invalid type of on_full_scan_start is not function")
end
task.on_full_scan_start = options.on_full_scan_start
end

if options.on_full_scan_success ~= nil then
if type(options.on_full_scan_success) ~= 'function' then
error("invalid type of on_full_scan_success is not function")
end
task.on_full_scan_success = options.on_full_scan_success
end

if options.on_full_scan_complete ~= nil then
if type(options.on_full_scan_complete) ~= 'function' then
error("invalid type of on_full_scan_complete is not function")
end
task.on_full_scan_complete = options.on_full_scan_complete
end

if options.on_full_scan_error ~= nil then
if type(options.on_full_scan_error) ~= 'function' then
error("invalid type of on_full_scan_error is not function")
end
task.on_full_scan_error = options.on_full_scan_error
end

-- put the task to table
task_list[name] = task
-- run
Expand Down
149 changes: 147 additions & 2 deletions test.lua
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,28 @@ local function init_box()
if_not_exists = true
})
truncate(f.id)

local g = box.schema.create_space('delays_test', {
engine = space_type,
if_not_exists = true
})
g:create_index('first', {
type = index_type,
parts = {1, 'NUM'},
if_not_exists = true
})
truncate(g.id)

local h = box.schema.create_space('error_callback_test', {
engine = space_type,
if_not_exists = true
})
h:create_index('first', {
type = index_type,
parts = {1, 'NUM'},
if_not_exists = true
})
truncate(h.id)
end

local space_id = 'origin'
Expand All @@ -230,9 +252,14 @@ init_box()
-- 2. errors test,
-- 3. not expire test,
-- 4. kill zombie test
-- 5. default drop function test
-- 5. multiple expires test
-- 6. default drop function test
-- 7. restart test
-- 8. complex key test
-- 9. delays and scan callbacks test
-- 10. error callback test
-- ========================================================================= --
test:plan(8)
test:plan(10)

test:test('simple expires test', function(test)
test:plan(4)
Expand Down Expand Up @@ -537,4 +564,122 @@ test:test("complex key test", function(test)
expirationd.kill_task("test")
end)

test:test('delays and scan callbacks test', function(test)
test:plan(4)

-- Prepare the space.
local tuples_count = 10
local time = fiber.time()
local space_name = 'delays_test'
local expire_delta = 10

for i = 1, tuples_count do
box.space[space_name]:insert{i, time + expire_delta}
end

-- To check all delays (iteration and full scan), two full scan
-- iterations will be performed.
local first_iteration_done = false
local task_name = 'delays_task'
local cond = fiber.cond()
local start_time = 0
local complete_time = 0

local iteration_delay = 1
local full_scan_delay = 2

expirationd.start(
task_name,
space_name,
check_tuple_expire_by_timestamp,
{
args = {
field_no = 2
},
tuples_per_iteration = 10,
iteration_delay = iteration_delay,
full_scan_delay = full_scan_delay,
on_full_scan_start = function(task)
start_time = fiber.time()
if first_iteration_done then
-- Check the full scan delay with an accuracy
-- of 0.1 seconds.
test:ok(math.abs(start_time - complete_time -
full_scan_delay) < 0.1, 'test full scan delay')
end
end,
on_full_scan_success = function(task)
-- Must be called twice.
test:ok(true, 'test success callback invoke')
end,
on_full_scan_complete = function(task)
complete_time = fiber.time()
if first_iteration_done then
cond:signal()
else
-- Check the iteration delay with an accuracy
-- of 0.1 seconds.
test:ok(math.abs(complete_time - start_time -
iteration_delay) < 0.1, 'test iteration delay')
first_iteration_done = true
end
end
}
)

cond:wait()
expirationd.kill_task(task_name)
end)

test:test('error callback test', function(test)
test:plan(2)

-- Prepare the space.
local tuples_count = 1
local time = fiber.time()
local space_name = 'error_callback_test'
local expire_delta = 10

for i = 1, tuples_count do
box.space[space_name]:insert{i, time + expire_delta}
end

local task_name = 'error_callback_task'
local cond = fiber.cond()

local error_cb_called = false
local complete_cb_called = false
local err_msg = 'The error is occured'

expirationd.start(
task_name,
space_name,
function(args, tuple)
error(err_msg)
end,
{
args = {
field_no = 2
},
-- The callbacks can be called multiple times because guardian_loop
-- will restart the task.
on_full_scan_error = function(task, err)
if err:find(err_msg) then
error_cb_called = true
end
end,
on_full_scan_complete = function(task)
complete_cb_called = true
cond:signal()
end
}
)

cond:wait()
expirationd.kill_task(task_name)

test:ok(error_cb_called, 'the "error" callback has been invoked')
test:ok(complete_cb_called, 'the "complete" callback has been invoked')
end)

os.exit(test:check() and 0 or 1)