Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitlab-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ build:
- tar -zxf tarantool-enterprise-bundle-${BUNDLE_VERSION}.tar.gz --strip 1
- source env.sh
- tarantoolctl rocks install luacheck
- tarantoolctl rocks install luatest
- tarantoolctl rocks install luatest 0.4.0
script:
- make build
- make lint
Expand Down
4 changes: 2 additions & 2 deletions test/api_test.lua
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ local api = require('sharded_queue.api')
local config = require('test.helper.config')
local utils = require('test.helper.utils')

g.before_all = function()
g.before_all(function()
g.queue_conn = config.cluster:server('queue-router').net_box
end
end)

g.test_exported_api = function()
for method, _ in pairs(api.__private.sharded_tube) do
Expand Down
16 changes: 6 additions & 10 deletions test/drop_test.lua
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,17 @@ local g = t.group('drop_test')
local config = require('test.helper.config')
local utils = require('test.helper.utils')

g.before_all = function()
g.before_all(function()
g.queue_conn = config.cluster:server('queue-router').net_box
end

local function shape_cmd(tube_name, cmd)
return string.format('queue.tube.%s:%s', tube_name, cmd)
end
end)

function g.test_drop_empty()
local tube_name = 'drop_empty_test'

g.queue_conn:call('queue.create_tube', {
tube_name
})
g.queue_conn:call(shape_cmd(tube_name, 'drop'))
g.queue_conn:call(utils.shape_cmd(tube_name, 'drop'))

local cur_stat = g.queue_conn:call('queue.statistics', { tube_name })
t.assert_equals(cur_stat, nil)
Expand All @@ -30,16 +26,16 @@ function g.test_drop_and_recreate()
g.queue_conn:call('queue.create_tube', {
tube_name
})
g.queue_conn:call(shape_cmd(tube_name, 'put'), { '*' } )
g.queue_conn:call(utils.shape_cmd(tube_name, 'put'), { '*' } )

g.queue_conn:call(shape_cmd(tube_name, 'drop'))
g.queue_conn:call(utils.shape_cmd(tube_name, 'drop'))

-- recreate tube with same name
t.assert(g.queue_conn:call('queue.create_tube', {
tube_name
}))

local task = g.queue_conn:call(shape_cmd(tube_name, 'put'), { '*' } )
local task = g.queue_conn:call(utils.shape_cmd(tube_name, 'put'), { '*' } )
t.assert_equals(task[utils.index.data], '*')
t.assert_equals(task[utils.index.status], utils.state.READY)

Expand Down
4 changes: 4 additions & 0 deletions test/helper/utils.lua
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,8 @@ function utils.nano(tm)
return 0ULL + tm * 1e6
end

function utils.shape_cmd(tube_name, cmd)
return string.format('queue.tube.%s:%s', tube_name, cmd)
end

return utils
46 changes: 21 additions & 25 deletions test/simple_test.lua
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,9 @@ local g = t.group('simple_test')
local config = require('test.helper.config')
local utils = require('test.helper.utils')

g.before_all = function()
g.before_all(function()
g.queue_conn = config.cluster:server('queue-router').net_box
end

local function shape_cmd(tube_name, cmd)
return string.format('queue.tube.%s:%s', tube_name, cmd)
end
end)

for test_name, options in pairs({
fifottl = {},
Expand Down Expand Up @@ -39,9 +35,9 @@ for test_name, options in pairs({
-- returned tasks
local task_ids = {}
for _, data in pairs(tasks_data) do
local task = g.queue_conn:call(shape_cmd(tube_name, 'put'), { data })
local task = g.queue_conn:call(utils.shape_cmd(tube_name, 'put'), { data })

local peek_task = g.queue_conn:call(shape_cmd(tube_name, 'peek'),
local peek_task = g.queue_conn:call(utils.shape_cmd(tube_name, 'peek'),
{
task[utils.index.task_id]
})
Expand All @@ -52,8 +48,8 @@ for test_name, options in pairs({
-- try taken this tasks
local taken_task_ids = {}
for _, _ in pairs(task_ids) do
local task = g.queue_conn:call(shape_cmd(tube_name, 'take'))
local peek_task = g.queue_conn:call(shape_cmd(tube_name, 'peek'), {
local task = g.queue_conn:call(utils.shape_cmd(tube_name, 'take'))
local peek_task = g.queue_conn:call(utils.shape_cmd(tube_name, 'peek'), {
task[utils.index.task_id]
})
t.assert_equals(peek_task[utils.index.status], utils.state.TAKEN)
Expand All @@ -70,7 +66,7 @@ for test_name, options in pairs({
end

for task_id, _ in pairs(task_ids) do
g.queue_conn:call(shape_cmd(tube_name, 'ack'), {task_id})
g.queue_conn:call(utils.shape_cmd(tube_name, 'ack'), {task_id})
end

t.assert_equals(task_ids, taken_task_ids)
Expand All @@ -89,8 +85,8 @@ g.test_take_with_options = function()
local options, timeout, data = {}, 1, 'data'

for _, take_args in pairs({{}, {timeout}, {timeout, options}, {box.NULL, options}, {timeout, box.NULL}}) do
g.queue_conn:call(shape_cmd(tube_name, 'put'), { data })
local task = g.queue_conn:call(shape_cmd(tube_name, 'take'), take_args)
g.queue_conn:call(utils.shape_cmd(tube_name, 'put'), { data })
local task = g.queue_conn:call(utils.shape_cmd(tube_name, 'take'), take_args)
t.assert_equals(task[utils.index.data], data)
end
end
Expand Down Expand Up @@ -125,8 +121,8 @@ function g.test_delete()
-- returned tasks
local task_ids = {}
for _, data in pairs(tasks_data) do
local task = g.queue_conn:call(shape_cmd(tube_name, 'put'), { data })
local peek_task = g.queue_conn:call(shape_cmd(tube_name, 'peek'), {
local task = g.queue_conn:call(utils.shape_cmd(tube_name, 'put'), { data })
local peek_task = g.queue_conn:call(utils.shape_cmd(tube_name, 'peek'), {
task[utils.index.task_id]
})
t.assert_equals(peek_task[utils.index.status], utils.state.READY)
Expand All @@ -139,14 +135,14 @@ function g.test_delete()

for i = 1, deleted_tasks_count do
table.insert(deleted_tasks,
g.queue_conn:call(shape_cmd(tube_name, 'delete'), { task_ids[i] })[utils.index.task_id])
g.queue_conn:call(utils.shape_cmd(tube_name, 'delete'), { task_ids[i] })[utils.index.task_id])
end

-- taken tasks
local taken_task_ids = {}
for _ = 1, task_count - deleted_tasks_count do
local task = g.queue_conn:call(shape_cmd(tube_name, 'take'))
local peek_task = g.queue_conn:call(shape_cmd(tube_name, 'peek'), {
local task = g.queue_conn:call(utils.shape_cmd(tube_name, 'take'))
local peek_task = g.queue_conn:call(utils.shape_cmd(tube_name, 'peek'), {
task[utils.index.task_id]
})
t.assert_equals(peek_task[utils.index.status], utils.state.TAKEN)
Expand Down Expand Up @@ -190,7 +186,7 @@ function g.test_release()
-- returned tasks
local task_ids = {}
for _, data in pairs(tasks_data) do
local task = g.queue_conn:call(shape_cmd(tube_name, 'put'), { data })
local task = g.queue_conn:call(utils.shape_cmd(tube_name, 'put'), { data })
t.assert_equals(task[utils.index.status], utils.state.READY)
task_ids[task[utils.index.task_id]] = true
end
Expand All @@ -200,22 +196,22 @@ function g.test_release()
local taken_task_ids = {}

for _ = 1, taken_task_count do
local task = g.queue_conn:call(shape_cmd(tube_name, 'take'))
local task = g.queue_conn:call(utils.shape_cmd(tube_name, 'take'))
t.assert_equals(task[utils.index.status], utils.state.TAKEN)
taken_task_ids[task[utils.index.task_id]] = true
end

t.assert_covers(task_ids, taken_task_ids)

for task_id, _ in pairs(taken_task_ids) do
local task = g.queue_conn:call(shape_cmd(tube_name, 'release'), { task_id })
local task = g.queue_conn:call(utils.shape_cmd(tube_name, 'release'), { task_id })
t.assert_equals(task[utils.index.status], utils.state.READY)
end

local result_task_id = {}

for _ = 1, task_count do
local task_id = g.queue_conn:call(shape_cmd(tube_name, 'take'))[utils.index.task_id]
local task_id = g.queue_conn:call(utils.shape_cmd(tube_name, 'take'))[utils.index.task_id]
result_task_id[task_id] = true
end

Expand Down Expand Up @@ -244,14 +240,14 @@ function g.test_bury_kick()
-- returned tasks
local task_ids = {}
for i = 1, task_count do
local task = g.queue_conn:call(shape_cmd(tube_name, 'put'), { i })
local task = g.queue_conn:call(utils.shape_cmd(tube_name, 'put'), { i })
table.insert(task_ids, task[utils.index.task_id])
end

-- bury few task
local bury_task_count = 5
for i = 1, bury_task_count do
local task = g.queue_conn:call(shape_cmd(tube_name, 'bury'), { task_ids[i] })
local task = g.queue_conn:call(utils.shape_cmd(tube_name, 'bury'), { task_ids[i] })
t.assert_equals(task[utils.index.status], utils.state.BURIED)
end

Expand All @@ -260,7 +256,7 @@ function g.test_bury_kick()
t.assert_equals(cur_stat.tasks.ready, task_count - bury_task_count)

-- try unbury few task > bury_task_count
local kick_cmd = shape_cmd(tube_name, 'kick')
local kick_cmd = utils.shape_cmd(tube_name, 'kick')
t.assert_equals(g.queue_conn:call(kick_cmd, {bury_task_count + 3}), bury_task_count)

cur_stat = g.queue_conn:call('queue.statistics', { tube_name })
Expand Down
15 changes: 5 additions & 10 deletions test/statistics_test.lua
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,9 @@ local config = require('test.helper.config')
local utils = require('test.helper.utils')
local fiber = require('fiber')


g.before_all = function()
g.before_all(function()
g.queue_conn = config.cluster:server('queue-router').net_box
end

local function shape_cmd(tube_name, cmd)
return string.format('queue.tube.%s:%s', tube_name, cmd)
end
end)

function g.test_statistics()
local tube_name = 'statistics_test'
Expand All @@ -28,7 +23,7 @@ function g.test_statistics()

for i = 1, task_count do
table.insert(task_pack,
g.queue_conn:call(shape_cmd(tube_name, 'put'), {
g.queue_conn:call(utils.shape_cmd(tube_name, 'put'), {
i, { delay = 3 , ttl = 3, ttr = 1}
})[utils.index.task_id])

Expand Down Expand Up @@ -58,7 +53,7 @@ function g.test_statistics()
for _ = 1, taken_task_count do
table.insert(taken_task_pack,
g.queue_conn:call(
shape_cmd(tube_name, 'take'),
utils.shape_cmd(tube_name, 'take'),
{ 0.001 }
)[utils.index.task_id])
end
Expand All @@ -72,7 +67,7 @@ function g.test_statistics()
-- done few task with ack
local done_task_count = 10
for i = 1, done_task_count do
g.queue_conn:call(shape_cmd(tube_name, 'ack'), { taken_task_pack[i] })
g.queue_conn:call(utils.shape_cmd(tube_name, 'ack'), { taken_task_pack[i] })
end

-- after ack
Expand Down
12 changes: 4 additions & 8 deletions test/timeout_test.lua
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,18 @@ local config = require('test.helper.config')
local utils = require('test.helper.utils')
local fiber = require('fiber')

g.before_all = function()
g.before_all(function()
--- Workaround for https://github.com/tarantool/cartridge/issues/462
config.cluster:server('queue-router').net_box:close()
config.cluster:server('queue-router').net_box = nil
config.cluster:server('queue-router'):connect_net_box()
g.queue_conn = config.cluster:server('queue-router').net_box
end

local function shape_cmd(tube_name, cmd)
return string.format('queue.tube.%s:%s', tube_name, cmd)
end
end)

local function task_take(tube_name, timeout, channel)
-- fiber function for take task with timeout and calc duration time
local start = fiber.time64()
local task = g.queue_conn:call(shape_cmd(tube_name, 'take'), { timeout })
local task = g.queue_conn:call(utils.shape_cmd(tube_name, 'take'), { timeout })
local duration = fiber.time64() - start

channel:put(duration)
Expand Down Expand Up @@ -73,7 +69,7 @@ function g.test_wait_put_taking()
fiber.create(task_take, tube_name, timeout, channel)

fiber.sleep(timeout / 2)
t.assert(g.queue_conn:call(shape_cmd(tube_name, 'put'), { 'simple_task' }, {timeout=1}))
t.assert(g.queue_conn:call(utils.shape_cmd(tube_name, 'put'), { 'simple_task' }, {timeout=1}))

local waiting_time = tonumber(channel:get()) / 1e6
local task = channel:get()
Expand Down
Loading