Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,42 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.

### Added

- Metric `tnt_sharded_queue_api_role_stats` is a [summary][metrics-summary]
with quantiles of `sharded_queue.api` role API calls (#71).
The metric includes a counter of API calls and errors.
The metric contains labels in the following format:
`{name = "tube_name", method = "api_call_method", status = "ok" or "error"}`
- Metric `tnt_sharded_queue_storage_role_stats` is a [summary][metrics-summary]
with quantiles of `sharded_queue.storage` role API calls (#71).
The metric includes a counter of API calls and errors.
The metric contains labels in the following format:
`{name = "tube_name", method = "api_call_method", status = "ok" or "error"}`
- Metric `tnt_sharded_queue_storage_statistics_calls_total` as an equivalent of
`tnt_sharded_queue_api_statistics_calls_total` for the
`sharded_queue.storage` role (#71).
Values have the same meaning as the [`queue` statistics][queue-statistics]
`calls` table.
The metric contains labels in the following format:
`{name = "tube_name", state = "call_type"}`
- Metric `tnt_sharded_queue_storage_statistics_tasks` as an equivalent of
`tnt_sharded_queue_api_statistics_tasks` for the `sharded_queue.storage`
role (#71).
Values have the same meaning as the [`queue` statistics][queue-statistics]
`tasks` table.
The metric contains labels in the following format:
`{name = "tube_name", state = "task_state"}`

### Changed

- Metric `sharded_queue_calls` renamed to
`tnt_sharded_queue_api_statistics_calls_total` (#71). The metric now has
labels in the format `{name = "tube_name", state = "call_type"}` instead of
`{name = "tube_name", status = "call_type"}`.
- Metric `sharded_queue_tasks` renamed to
`tnt_sharded_queue_api_statistics_tasks` (#71). The metric now has labels
in the format `{name = "tube_name", state = "task_state"}` instead of
`{name = "tube_name", status = "task_state"}`.

### Fixed

- Data race with fifo driver for put()/take() methods with vinyl
Expand Down Expand Up @@ -47,3 +81,6 @@ different shards (over the whole cluster).
- Testing CI (#53).
- Linter check on CI (#18).
- Publish CI (#54).

[metrics-summary]: https://www.tarantool.io/en/doc/latest/book/monitoring/api_reference/#summary
[queue-statistics]: https://github.com/tarantool/queue?tab=readme-ov-file#getting-statistics
70 changes: 70 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,73 @@ make deps
make test
```

## Metrics

The module exports several metrics if the module `metrics` >= 0.11 is
installed and the feature is not disabled by the configuration.

### Role sharded_queue.api

* Metric `tnt_sharded_queue_api_statistics_calls_total` is a counter with
the number of requests broken down by [the type of request][queue-statistics].
The metric has labels in the following format:

`{name = "tube_name", state = "request_type"}`

A list of possible request types: `done`, `take`, `kick`, `bury`, `put`,
`delete`, `touch`, `ack`, `release`. The metric on the `sharded_queue.api`
role accumulates values from all buckets.

* Metric `tnt_sharded_queue_api_statistics_tasks` is a gauge with
the number of tasks in a queue broken down by [a task state][queue-statistics].
The metric has labels in the following format:

`{name = "tube_name", state = "task_state"}`

A list of possible task states: `taken`, `buried`, `ready`, `done`,
`delayed`, `total`. The metric on the `sharded_queue.api` role accumulates
values from all buckets.

* Metric `tnt_sharded_queue_api_role_stats` is a [summary][metrics-summary]
with quantiles of `sharded_queue.api` role API calls. The metric includes a
counter of API calls and errors and has labels in the following format:

`{name = "tube_name", method = "api_call_method", status = "ok" or "error"}`

A list of possible call methods: `put`, `take`, `delete`, `release`, `touch`,
`ack`, `bury`, `kick`, `peek`, `drop`.

### Role sharded_queue.storage

* Metric `tnt_sharded_queue_storage_statistics_calls_total` is a counter with
the number of requests broken down by [the type of request][queue-statistics].
The metric has labels in the following format:

`{name = "tube_name", state = "request_type"}`

A list of possible request types: `done`, `take`, `kick`, `bury`, `put`,
`delete`, `touch`, `ack`, `release`. The metric on the `sharded_queue.storage`
role shows actual values on the instance.

* Metric `tnt_sharded_queue_storage_statistics_tasks` is a gauge with
the number of tasks in a queue broken down by [a task state][queue-statistics].
The metric has labels in the following format:

`{name = "tube_name", state = "task_state"}`

A list of possible task states: `taken`, `buried`, `ready`, `done`,
`delayed`, `total`. The metric on the `sharded_queue.storage` role shows
actual values on the instance.

* Metric `tnt_sharded_queue_storage_role_stats` is a [summary][metrics-summary]
with quantiles of `sharded_queue.api` role API calls. The metric includes a
counter of API calls and errors and has labels in the following format:

`{name = "tube_name", method = "api_call_method", status = "ok" or "error"}`

A list of possible call methods: `statistic`, `put`, `take`, `delete`,
`release`, `touch`, `ack`, `bury`, `kick`, `peek`, `drop`.

## API extensions (compared to tarantool/queue)

* ``tube:take`` method has additional table argument ``options``. It may be used to provide additional logic in some
Expand Down Expand Up @@ -198,3 +265,6 @@ make test
```

If you use **fifottl** driver (default), you can log driver's method calls with `log_request` (log router's and storage's operations).

[metrics-summary]: https://www.tarantool.io/en/doc/latest/book/monitoring/api_reference/#summary
[queue-statistics]: https://github.com/tarantool/queue?tab=readme-ov-file#getting-statistics
4 changes: 3 additions & 1 deletion sharded-queue-scm-1.rockspec
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@ build = {
['sharded_queue.drivers.fifottl'] = 'sharded_queue/drivers/fifottl.lua',
['sharded_queue.time'] = 'sharded_queue/time.lua',
['sharded_queue.utils'] = 'sharded_queue/utils.lua',
['sharded_queue.metrics'] = 'sharded_queue/metrics.lua',
['sharded_queue.stash'] = 'sharded_queue/stash.lua',
['sharded_queue.state'] = 'sharded_queue/state.lua',
['sharded_queue.statistics'] = 'sharded_queue/statistics.lua',
['sharded_queue.stats.storage'] = 'sharded_queue/stats/storage.lua',
['sharded_queue.version'] = 'sharded_queue/version.lua',
},
},
Expand Down
141 changes: 36 additions & 105 deletions sharded_queue/api.lua
Original file line number Diff line number Diff line change
Expand Up @@ -3,32 +3,20 @@ local vshard = require('vshard')
local fiber = require('fiber')
local log = require('log')

local metrics = require('sharded_queue.metrics')
local stash = require('sharded_queue.stash')
local state = require('sharded_queue.state')
local time = require('sharded_queue.time')
local utils = require('sharded_queue.utils')

local cartridge_pool = require('cartridge.pool')
local cartridge_rpc = require('cartridge.rpc')
local is_metrics_package, metrics = pcall(require, "metrics")
local is_hotreload_package, hotreload = pcall(require, "cartridge.hotreload")

local stash_names = {
cfg = '__sharded_queue_cfg',
metrics_stats = '__sharded_queue_metrics_stats',
cfg = '__sharded_queue_api_cfg',
metrics_stats = '__sharded_queue_api_metrics_stats',
}

if is_hotreload_package then
for _, name in pairs(stash_names) do
hotreload.whitelist_globals({ name })
end
end

-- get a stash instance, initialize if needed
local function stash_get(name)
local instance = rawget(_G, name) or {}
rawset(_G, name, instance)
return instance
end
stash.setup(stash_names)

local remote_call = function(method, instance_uri, args, timeout)
local conn = cartridge_pool.connect(instance_uri)
Expand Down Expand Up @@ -414,92 +402,50 @@ end

local sharded_queue = {
tube = {},
cfg = stash_get(stash_names.cfg),
metrics_stats = stash_get(stash_names.metrics_stats),
cfg = stash.get(stash_names.cfg),
metrics_stats = metrics.init(stash.get(stash_names.metrics_stats)),
}

if sharded_queue.cfg.metrics == nil then
sharded_queue.cfg.metrics = true
end

local function is_metrics_v_0_11_installed()
if not is_metrics_package or metrics.unregister_callback == nil then
return false
end
local counter = require('metrics.collectors.counter')
return counter.remove and true or false
if sharded_queue.cfg.metrics then
sharded_queue.cfg.metrics = metrics.is_supported()
end

local function metrics_create_collectors()
return {
calls = {
collector = metrics.counter(
"sharded_queue_calls",
"sharded_queue's number of calls"
),
values = {},
},
tasks = {
collector = metrics.gauge(
"sharded_queue_tasks",
"sharded_queue's number of tasks"
)
},
}
end
local function wrap_sharded_queue_call_counters(call, fun)
return function(self, ...)
local before = fiber.clock()
local ok, ret = pcall(fun, self, ...)
local after = fiber.clock()

local function metrics_disable()
if sharded_queue.metrics_stats.callback then
metrics.unregister_callback(sharded_queue.metrics_stats.callback)
end
sharded_queue.metrics_stats.callback = nil
if sharded_queue.cfg.metrics then
sharded_queue.metrics_stats:observe(after - before,
self.tube_name, call, ok)
end

if sharded_queue.metrics_stats.collectors then
for _, c in pairs(sharded_queue.metrics_stats.collectors) do
metrics.registry:unregister(c.collector)
if not ok then
error(ret)
end

return ret
end
sharded_queue.metrics_stats.collectors = nil
end

for call, fun in pairs(sharded_tube) do
sharded_tube[call] = wrap_sharded_queue_call_counters(call, fun)
end

local function metrics_enable()
-- Drop all collectors and a callback.
metrics_disable()

-- Set all collectors and the callback.
sharded_queue.metrics_stats.collectors = metrics_create_collectors()
local callback = function()
local metrics_stats = sharded_queue.metrics_stats
for tube_name, _ in pairs(sharded_queue.tube) do
local stat = sharded_queue.statistics(tube_name)
local collectors = metrics_stats.collectors
if collectors.calls.values[tube_name] == nil then
collectors.calls.values[tube_name] = {}
end
for k, v in pairs(stat.calls) do
local prev = metrics_stats.collectors.calls.values[tube_name][k] or 0
local inc = v - prev
metrics_stats.collectors.calls.collector:inc(inc, {
name = tube_name,
status = k,
})
metrics_stats.collectors.calls.values[tube_name][k] = v
end
for k, v in pairs(stat.tasks) do
metrics_stats.collectors.tasks.collector:set(v, {
name = tube_name,
status = k,
})
end
end
local get_statistic = function(tube)
return sharded_queue.statistics(tube)
end

metrics.register_callback(callback)
sharded_queue.metrics_stats.callback = callback
return true
sharded_queue.metrics_stats:enable('api', sharded_queue.tube, get_statistic)
end

if sharded_queue.cfg.metrics then
sharded_queue.cfg.metrics = is_metrics_v_0_11_installed()
local function metrics_disable()
sharded_queue.metrics_stats:disable()
end

function sharded_queue.cfg_call(_, options)
Expand Down Expand Up @@ -538,7 +484,6 @@ function sharded_queue.statistics(tube_name)

local stats_collection, err = cartridge_pool.map_call('tube_statistic',
{{ tube_name = tube_name }}, {uri_list=storages})

if err ~= nil then
return nil, err
end
Expand Down Expand Up @@ -597,22 +542,7 @@ local function init(opts)
end

local function validate_config(cfg)
if cfg['cfg'] == nil then
return
end

cfg = cfg['cfg']
if type(cfg) ~= 'table' then
error('"cfg" must be a table')
end
if cfg.metrics and type(cfg.metrics) ~= 'boolean' then
error('"cfg.metrics" must be a boolean')
end
if cfg.metrics and cfg.metrics == true then
if not is_metrics_v_0_11_installed() then
error("metrics >= 0.11.0 is required")
end
end
return utils.validate_config_cfg(cfg)
end

local function apply_config(cfg, opts)
Expand All @@ -630,7 +560,7 @@ local function apply_config(cfg, opts)
wait_factor = options.wait_factor or time.DEFAULT_WAIT_FACTOR,
log_request = utils.normalize.log_request(options.log_request),
}, {
__index = sharded_tube
__index = sharded_tube,
})
sharded_queue.tube[tube_name] = self
end
Expand All @@ -657,14 +587,15 @@ local function queue_action_wrapper(action)
if not sharded_queue.tube[name] then
return nil, string.format('No queue "%s" initialized yet', name)
end

return sharded_queue.tube[name][action](sharded_queue.tube[name], ...)
end
end

return {
init = init,
apply_config = apply_config,
validate_config = validate_config,

put = queue_action_wrapper('put'),
take = queue_action_wrapper('take'),
delete = queue_action_wrapper('delete'),
Expand Down
4 changes: 2 additions & 2 deletions sharded_queue/drivers/fifo.lua
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
local state = require('sharded_queue.state')
local utils = require('sharded_queue.utils')
local log = require('log') -- luacheck: ignore
local statistics = require('sharded_queue.statistics')
local stats = require('sharded_queue.stats.storage')

local function update_stat(tube_name, name)
statistics.update(tube_name, name, '+', 1)
stats.update(tube_name, name, '+', 1)
end

local method = {}
Expand Down
4 changes: 2 additions & 2 deletions sharded_queue/drivers/fifottl.lua
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
local fiber = require('fiber')
local state = require('sharded_queue.state')
local utils = require('sharded_queue.utils')
local statistics = require('sharded_queue.statistics')
local stats = require('sharded_queue.stats.storage')
local time = require('sharded_queue.time')
local log = require('log') -- luacheck: ignore

Expand All @@ -19,7 +19,7 @@ local index = {
}

local function update_stat(tube_name, name)
statistics.update(tube_name, name, '+', 1)
stats.update(tube_name, name, '+', 1)
end

local function is_expired(task)
Expand Down
Loading