Skip to content

Commit

Permalink
cartridge: add a role
Browse files Browse the repository at this point in the history
The patch adds a Tarantool Cartridge role with features:

* expirationd as a Tarantool Cartridge service for easy access to
  all API calls.
* The role stops all expirationd tasks on an instance on the role
  termination.
* It can automatically start and kill old tasks from the role
  congiguration.

Closes #107
  • Loading branch information
oleg-jukovec committed Aug 3, 2022
1 parent b0f9ae3 commit 10364cb
Show file tree
Hide file tree
Showing 8 changed files with 1,203 additions and 0 deletions.
62 changes: 62 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -145,3 +145,65 @@ $ make SEED=1334 test
luatest -v --coverage --shuffle all:1334
...
```
## Cartridge role
`cartridge.roles.expirationd` is a Tarantool Cartridge role for the expirationd
package with the features:
* It registers expirationd as a Tarantool Cartridge service for easy access to
all [API calls](https://tarantool.github.io/expirationd/#Module_functions):
```Lua
local task = cartridge.service_get('expirationd').start("task_name", id, is_expired)
task:kill()
```
* The role stops all expirationd tasks on an instance on the role termination.
* The role can automatically start or kill old tasks from the role
configuration:
```yaml
expirationd:
task_name1:
space_id: 579
is_expired: is_expired_func_name_in__G
is_master_only: true
options:
args:
- any
atomic_iteration: false
force: false
force_allow_functional_index: true
full_scan_delay: 1
full_scan_time: 1
index: 0
iterate_with: iterate_with_func_name_in__G
iteration_delay: 1
iterator_type: ALL
on_full_scan_complete: on_full_scan_complete_func_name_in__G
on_full_scan_error: on_full_scan_error_func_name_in__G
on_full_scan_start: on_full_scan_start_func_name_in__G
on_full_scan_success: on_full_scan_success_func_name_in__G
process_expired_tuple: process_expired_tuple_func_name_in__G
process_while: process_while_func_name_in__G
start_key:
- 1
tuples_per_iteration: 100
vinyl_assumed_space_len: 100
vinyl_assumed_space_len_factor: 1
task_name2:
...
```
[expirationd.start()](https://tarantool.github.io/expirationd/#start) has
the same parameters with the same meaning. Except for the additional optional
param `is_master_only`. It defines is a task should be started only on a
master instance. It is `false` by default.
Also you need to be careful with the parameters-functions. The string is
a key of in the global variable `_G` and the value must be a function. So
something like this should be done before initializing the role:
```Lua
rawset(_G, "is_expired_func_name_in__G", function(args, tuple)
--logic
end)
```
208 changes: 208 additions & 0 deletions cartridge/roles/expirationd.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
local expirationd = require("expirationd")
local role_name = "expirationd"
local started = require("cartridge.vars").new(role_name)

local function load_function(func_name)
if func_name == nil or type(func_name) ~= 'string' then
return nil
end

local func = rawget(_G, func_name)
if func == nil or type(func) ~= 'function' then
return nil
end
return func
end

local function get_param(param_name, value, types)
local types_map = {
b = {type = "boolean", err = "a boolean"},
n = {type = "number", err = "a number"},
s = {type = "string", err = "a string"},
f = {type = "string", transform = load_function, err = "a function name in _G"},
t = {type = "table", err = "a table"},
any = {err = "any type"},
}

local found = false
for _, t in ipairs(types) do
local type_opts = types_map[t]
if type_opts == nil then
error(role_name .. ": unsupported type option")
end
if not type_opts.type or type(value) == type_opts.type then
if type_opts.transform then
local tmp = type_opts.transform(value)
if tmp then
value = tmp
found = true
break
end
else
found = true
break
end
end
end

if not found then
local err = role_name .. ": " .. param_name .. " must be "
for i, t in ipairs(types) do
err = err .. types_map[t].err
if i ~= #types then
err = err .. " or "
end
end
return false, err
end
return true, value
end

local function get_task_options(opts)
local opts_map = {
args = {"any"},
atomic_iteration = {"b"},
force = {"b"},
force_allow_functional_index = {"b"},
full_scan_delay = {"n"},
full_scan_time = {"n"},
index = {"n", "s"},
iterate_with = {"f"},
iteration_delay = {"n"},
iterator_type = {"n", "s"},
on_full_scan_complete = {"f"},
on_full_scan_error = {"f"},
on_full_scan_start = {"f"},
on_full_scan_success = {"f"},
process_expired_tuple = {"f"},
process_while = {"f"},
start_key = {"f", "t"},
tuples_per_iteration = {"n"},
vinyl_assumed_space_len_factor = {"n"},
vinyl_assumed_space_len = {"n"},
}
if opts == nil then
return
end

for opt, val in pairs(opts) do
if type(opt) ~= "string" then
error(role_name .. ": an option must be a string")
end
if opts_map[opt] == nil then
error(role_name .. ": unsupported option '" .. opt .. "'")
end
local ok, res = get_param("options." .. opt, val, opts_map[opt])
if not ok then
error(res)
end
opts[opt] = res
end

return opts
end

local function get_task_config(task_conf)
-- setmetatable resets __newindex write protection on a copy
local conf = setmetatable(table.deepcopy(task_conf), {})
local params_map = {
space_id = {required = true, types = {"n", "s"}},
is_expired = {required = true, types = {"f"}},
is_master_only = {required = false, types = {"b"}},
options = {required = false, types = {"t"}},
}
for k, _ in pairs(conf) do
if type(k) ~= "string" then
error(role_name .. ": param must be a string")
end
if params_map[k] == nil then
error(role_name .. ": unsupported param " .. k)
end
end

for param, opts in pairs(params_map) do
if opts.required and conf[param] == nil then
error(role_name .. ": " .. param .. " is required")
end
if conf[param] ~= nil then
local ok, res = get_param(param, conf[param], opts.types)
if not ok then
error(res)
end
conf[param] = res
end
end

conf.options = get_task_options(conf.options)
return conf
end

local function init()

end

local function validate_config(conf_new)
local conf = conf_new[role_name] or {}

for task_name, task_conf in pairs(conf) do
local ok, res = get_param("task name", task_name, {"s"})
if not ok then
error(res)
end
local ok, res = get_param("task params", task_conf, {"t"})
if not ok then
error(res)
end
get_task_config(task_conf)
end

return true
end

local function apply_config(conf_new, opts)
local conf = conf_new[role_name] or {}

-- finishes tasks from an old configuration
for i=#started,1,-1 do
local task_name = started[i]
local ok, _ = pcall(expirationd.task, task_name)
if ok then
if conf[task_name] then
expirationd.task(task_name):stop()
else
expirationd.task(task_name):kill()
end
end
table.remove(started, i)
end

for task_name, task_conf in pairs(conf) do
task_conf = get_task_config(task_conf)

local skip = task_conf.is_master_only and not opts.is_master
if not skip then
local task = expirationd.start(task_name, task_conf.space_id,
task_conf.is_expired,
task_conf.options)
if task == nil then
error(role_name .. ": unable to start task " .. task_name)
end
table.insert(started, task_name)
end
end
end

local function stop()
for _, task_name in pairs(expirationd.tasks()) do
local task = expirationd.task(task_name)
task:stop()
end
end

return setmetatable({
role_name = role_name,
init = init,
validate_config = validate_config,
apply_config = apply_config,
stop = stop,
}, { __index = expirationd })
1 change: 1 addition & 0 deletions debian/tarantool-expirationd.install
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
expirationd.lua usr/share/tarantool/
cartridge/roles/expirationd.lua usr/share/tarantool/cartridge/roles/
1 change: 1 addition & 0 deletions expirationd-scm-1.rockspec
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ build = {
type = "builtin",
modules = {
["expirationd"] = "expirationd.lua",
["cartridge.roles.expirationd"] = "cartridge/roles/expirationd.lua",
}
}
-- vim: syntax=lua
3 changes: 3 additions & 0 deletions rpm/tarantool-expirationd.spec
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,12 @@ some other space.
%install
install -d %{buildroot}%{_datarootdir}/tarantool/
install -m 0644 expirationd.lua %{buildroot}%{_datarootdir}/tarantool/
install -d %{buildroot}%{_datarootdir}/tarantool/cartridge/roles/
install -m 0644 cartridge/roles/expirationd.lua %{buildroot}%{_datarootdir}/tarantool/cartridge/roles/expirationd.lua

%files
%{_datarootdir}/tarantool/expirationd.lua
%{_datarootdir}/tarantool/cartridge
%doc README.md
%{!?_licensedir:%global license %doc}
%license LICENSE
Expand Down
69 changes: 69 additions & 0 deletions test/entrypoint/srv_role.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
#!/usr/bin/env tarantool

require('strict').on()
_G.is_initialized = function() return false end

local log = require('log')
local errors = require('errors')
local fiber = require('fiber')
local cartridge = require('cartridge')
local hotreload = require('cartridge.hotreload')

package.preload['customers-storage'] = function()
return {
role_name = 'customers-storage',
init = function()
local customers_space = box.schema.space.create('customers', {
format = {
{name = 'id', type = 'unsigned'},
},
if_not_exists = true,
engine = 'memtx',
})

customers_space:create_index('id', {
parts = { {field = 'id'} },
unique = true,
type = 'TREE',
if_not_exists = true,
})
end,
}
end

local ok, err = errors.pcall('CartridgeCfgError', cartridge.cfg, {
advertise_uri = 'localhost:3301',
http_port = 8081,
bucket_count = 3000,
roles = {
'customers-storage',
'cartridge.roles.vshard-router',
'cartridge.roles.vshard-storage',
'cartridge.roles.expirationd'
},
roles_reload_allowed = true,
})

if not ok then
log.error('%s', err)
os.exit(1)
end

_G.is_initialized = cartridge.is_healthy
_G.always_true_test = function() return true end
_G.is_expired_test_continue = function(_, tuple)
if rawget(_G, "is_expired_test_first_tuple") == nil then
rawset(_G, "is_expired_test_first_tuple", tuple)
end

local cnt = rawget(_G, "is_expired_test_wait_cnt") or 0
cnt = cnt + 1
rawset(_G, "is_expired_test_wait_cnt", cnt)
if cnt == 5 then
fiber.sleep(60)
end
return true
end

hotreload.whitelist_globals({"always_true_test"})
hotreload.whitelist_globals({"is_expired_test_continue"})
Loading

0 comments on commit 10364cb

Please sign in to comment.