Skip to content

Commit

Permalink
Make queue to work with vinyl engine
Browse files Browse the repository at this point in the history
  • Loading branch information
bigbes committed Aug 22, 2017
1 parent 39edec9 commit 3adda75
Show file tree
Hide file tree
Showing 14 changed files with 85 additions and 48 deletions.
8 changes: 6 additions & 2 deletions CMakeLists.txt
Expand Up @@ -20,8 +20,12 @@ file(GLOB TEST_LIST ${CMAKE_SOURCE_DIR}/t/*.t)

foreach(TEST ${TEST_LIST})
get_filename_component(TEST_NAME ${TEST} NAME)
add_test(${TEST_NAME} tarantool ${TEST})
set_tests_properties(${TEST_NAME} PROPERTIES ENVIRONMENT "${LUA_PATH}")
add_test (memtx-${TEST_NAME} tarantool ${TEST})
set_tests_properties(memtx-${TEST_NAME} PROPERTIES ENVIRONMENT "${LUA_PATH}")
set_tests_properties(memtx-${TEST_NAME} PROPERTIES ENVIRONMENT "ENGINE=memtx")
add_test (vinyl-${TEST_NAME} tarantool ${TEST})
set_tests_properties(vinyl-${TEST_NAME} PROPERTIES ENVIRONMENT "${LUA_PATH}")
set_tests_properties(vinyl-${TEST_NAME} PROPERTIES ENVIRONMENT "ENGINE=vinyl")
endforeach()

add_custom_target(check
Expand Down
4 changes: 3 additions & 1 deletion queue/abstract/driver/fifo.lua
Expand Up @@ -12,6 +12,7 @@ function tube.create_space(space_name, opts)
local if_not_exists = opts.if_not_exists or false
space_opts.temporary = opts.temporary or false
space_opts.if_not_exists = if_not_exists
space_opts.engine = opts.engine or 'memtx'
space_opts.format = {
[1] = {name = 'task_id', type = num_type()},
[2] = {name = 'status', type = str_type()},
Expand Down Expand Up @@ -73,7 +74,8 @@ end

-- delete task
function method.delete(self, id)
local task = self.space:delete(id)
local task = self.space:get(id)
self.space:delete(id)
if task ~= nil then
task = task:transform(2, 1, state.DONE)
end
Expand Down
4 changes: 3 additions & 1 deletion queue/abstract/driver/fifottl.lua
Expand Up @@ -53,6 +53,7 @@ function tube.create_space(space_name, opts)
local if_not_exists = opts.if_not_exists or false
space_opts.temporary = opts.temporary or false
space_opts.if_not_exists = if_not_exists
space_opts.engine = opts.engine or 'memtx'
space_opts.format = {
[1] = {name = 'task_id', type = num_type()},
[2] = {name = 'status', type = str_type()},
Expand Down Expand Up @@ -289,7 +290,8 @@ end

-- delete task
function method.delete(self, id)
local task = self.space:delete(id)
local task = self.space:get(id)
self.space:delete(id)
if task ~= nil then
task = task:transform(2, 1, state.DONE)
end
Expand Down
4 changes: 3 additions & 1 deletion queue/abstract/driver/utube.lua
Expand Up @@ -13,6 +13,7 @@ function tube.create_space(space_name, opts)
local if_not_exists = opts.if_not_exists or false
space_opts.temporary = opts.temporary or false
space_opts.if_not_exists = if_not_exists
space_opts.engine = opts.engine or 'memtx'
space_opts.format = {
[1] = {name = 'task_id', type = num_type()},
[2] = {name = 'status', type = str_type()},
Expand Down Expand Up @@ -88,7 +89,8 @@ end

-- delete task
function method.delete(self, id)
local task = self.space:delete(id)
local task = self.space:get(id)
self.space:delete(id)
if task ~= nil then
task = task:transform(2, 1, state.DONE)

Expand Down
4 changes: 3 additions & 1 deletion queue/abstract/driver/utubettl.lua
Expand Up @@ -55,6 +55,7 @@ function tube.create_space(space_name, opts)
local if_not_exists = opts.if_not_exists or false
space_opts.temporary = opts.temporary or false
space_opts.if_not_exists = if_not_exists
space_opts.engine = opts.engine or 'memtx'
space_opts.format = {
[1] = {name = 'task_id', type = num_type()},
[2] = {name = 'status', type = str_type()},
Expand Down Expand Up @@ -302,7 +303,8 @@ end

-- delete task
function method.delete(self, id)
local task = self.space:delete(id)
local task = self.space:get(id)
self.space:delete(id)
if task ~= nil then
task = task:transform(i_status, 1, state.DONE)
return process_neighbour(self, task, 'delete')
Expand Down
4 changes: 3 additions & 1 deletion t/000-init.t
Expand Up @@ -7,6 +7,8 @@ test:plan(2)

local queue = require('queue')

local engine = os.getenv('ENGINE') or 'memtx'

test:test('access to queue until box.cfg is started', function(test)
test:plan(3)
test:isnil(rawget(box, 'space'), 'box is not started yet')
Expand All @@ -26,7 +28,7 @@ test:test('access to queue after box.cfg{}', function(test)
test:istable(queue.tube, 'queue.tube is table')
test:is(#queue.tube, 0, 'queue.tube is empty')

local tube = queue.create_tube('test', 'fifo')
local tube = queue.create_tube('test', 'fifo', { engine = engine })
test:ok(queue.tube.test, 'tube "test" is created')

test:ok(queue.tube.test:put(123), 'put')
Expand Down
39 changes: 24 additions & 15 deletions t/010-fifo.t
Expand Up @@ -8,14 +8,16 @@ test:plan(14)
local queue = require('queue')
local state = require('queue.abstract.state')

local engine = os.getenv('ENGINE') or 'memtx'

local tnt = require('t.tnt')
tnt.cfg{}

test:ok(rawget(box, 'space'), 'box started')
test:ok(queue, 'queue is loaded')

local tube = queue.create_tube('test', 'fifo')
local tube2 = queue.create_tube('test_stat', 'fifo')
local tube = queue.create_tube('test', 'fifo', { engine = engine })
local tube2 = queue.create_tube('test_stat', 'fifo', { engine = engine })
test:ok(tube, 'test tube created')
test:is(tube.name, 'test', 'tube.name')
test:is(tube.type, 'fifo', 'tube.type')
Expand Down Expand Up @@ -217,17 +219,21 @@ end)

test:test('creating existing tube', function(test)
test:plan(2)
local s, e = pcall(function() queue.create_tube('test', 'fifo') end)
local s, e = pcall(function() queue.create_tube('test', 'fifo', { engine = engine }) end)
test:ok(not s, 'exception was thrown')
test:ok(e:match("Space 'test' already exists") ~= nil, 'text of exception')
end)

test:test('tempspace', function(test)
test:plan(2)
tube = queue.create_tube('test1', 'fifo', { temporary = true })
test:ok(tube, 'tube was created')
local space_r = box.space._space:get{queue.tube.test1.raw.space.id}
test:ok(space_r[6].temporary, 'really tempspace')
if engine ~= 'vinyl' then
test:plan(2)
tube = queue.create_tube('test1', 'fifo', { temporary = true })
test:ok(tube, 'tube was created')
local space_r = box.space._space:get{queue.tube.test1.raw.space.id}
test:ok(space_r[6].temporary, 'really tempspace')
else
test:plan(0)
end
end)

test:test('disconnect test', function(test)
Expand All @@ -244,7 +250,6 @@ test:test('disconnect test', function(test)
local task3 = tube:take(.1)
test:ok(task3, 'task3 was taken')


queue._on_consumer_disconnect()

test:is(tube:peek(task1[1])[2], state.READY, 'task1 was marked as READY')
Expand All @@ -253,9 +258,13 @@ test:test('disconnect test', function(test)
end)

test:test('if not exists tests', function(test)
test:plan(1)
local tube_dup = queue.create_tube('test1', 'fifo', { if_not_exists = true })
test:is(tube_dup, tube, '')
if engine ~= 'vinyl' then
test:plan(1)
local tube_dup = queue.create_tube('test1', 'fifo', { if_not_exists = true })
test:is(tube_dup, tube, '')
else
test:plan(0)
end
end)

test:test('truncate test', function(test)
Expand All @@ -269,16 +278,16 @@ end)
test:test('if_not_exists test', function(test)
test:plan(2)
local tube = queue.create_tube('test_ine', 'fifo', {
if_not_exists = true
if_not_exists = true, engine = engine
})
local tube_new = queue.create_tube('test_ine', 'fifo', {
if_not_exists = true
if_not_exists = true, engine = engine
})
test:is(tube, tube_new, "if_not_exists if tube exists")

queue.tube['test_ine'] = nil
local tube_new = queue.create_tube('test_ine', 'fifo', {
if_not_exists = true
if_not_exists = true, engine = engine
})
test:isnt(tube, tube_new, "if_not_exists if tube doesn't exists")
end)
Expand Down
12 changes: 7 additions & 5 deletions t/020-fifottl.t
Expand Up @@ -12,11 +12,13 @@ local qc = require('queue.compat')
local tnt = require('t.tnt')
tnt.cfg{}

local engine = os.getenv('ENGINE') or 'vinyl'

test:ok(rawget(box, 'space'), 'box started')
test:ok(queue, 'queue is loaded')

local tube = queue.create_tube('test', 'fifottl')
local tube2 = queue.create_tube('test_stat', 'fifottl')
local tube = queue.create_tube('test', 'fifottl', { engine = engine })
local tube2 = queue.create_tube('test_stat', 'fifottl', { engine = engine })
test:ok(tube, 'test tube created')
test:is(tube.name, 'test', 'tube.name')
test:is(tube.type, 'fifottl', 'tube.type')
Expand Down Expand Up @@ -180,16 +182,16 @@ end)
test:test('if_not_exists test', function(test)
test:plan(2)
local tube = queue.create_tube('test_ine', 'fifottl', {
if_not_exists = true
if_not_exists = true, engine = engine
})
local tube_new = queue.create_tube('test_ine', 'fifottl', {
if_not_exists = true
if_not_exists = true, engine = engine
})
test:is(tube, tube_new, "if_not_exists if tube exists")

queue.tube['test_ine'] = nil
local tube_new = queue.create_tube('test_ine', 'fifottl', {
if_not_exists = true
if_not_exists = true, engine = engine
})
test:isnt(tube, tube_new, "if_not_exists if tube doesn't exists")
end)
Expand Down
12 changes: 7 additions & 5 deletions t/030-utube.t
Expand Up @@ -11,11 +11,13 @@ local state = require('queue.abstract.state')
local tnt = require('t.tnt')
tnt.cfg{}

local engine = os.getenv('ENGINE') or 'memtx'

test:ok(rawget(box, 'space'), 'box started')
test:ok(queue, 'queue is loaded')

local tube = queue.create_tube('test', 'utube')
local tube2 = queue.create_tube('test_stat', 'utube')
local tube = queue.create_tube('test', 'utube', { engine = engine })
local tube2 = queue.create_tube('test_stat', 'utube', { engine = engine })
test:ok(tube, 'test tube created')
test:is(tube.name, 'test', 'tube.name')
test:is(tube.type, 'utube', 'tube.type')
Expand Down Expand Up @@ -140,16 +142,16 @@ end)
test:test('if_not_exists test', function(test)
test:plan(2)
local tube = queue.create_tube('test_ine', 'utube', {
if_not_exists = true
if_not_exists = true, engine = engine
})
local tube_new = queue.create_tube('test_ine', 'utube', {
if_not_exists = true
if_not_exists = true, engine = engine
})
test:is(tube, tube_new, "if_not_exists if tube exists")

queue.tube['test_ine'] = nil
local tube_new = queue.create_tube('test_ine', 'utube', {
if_not_exists = true
if_not_exists = true, engine = engine
})
test:isnt(tube, tube_new, "if_not_exists if tube doesn't exists")
end)
Expand Down
14 changes: 8 additions & 6 deletions t/040-utubettl.t
Expand Up @@ -13,11 +13,13 @@ local qc = require('queue.compat')
local tnt = require('t.tnt')
tnt.cfg{}

local engine = os.getenv('ENGINE') or 'memtx'

test:ok(rawget(box, 'space'), 'box started')
test:ok(queue, 'queue is loaded')

local tube = queue.create_tube('test', 'utubettl')
local tube2 = queue.create_tube('test_stat', 'utubettl')
local tube = queue.create_tube('test', 'utubettl', { engine = engine })
local tube2 = queue.create_tube('test_stat', 'utubettl', { engine = engine })
test:ok(tube, 'test tube created')
test:is(tube.name, 'test', 'tube.name')
test:is(tube.type, 'utubettl', 'tube.type')
Expand Down Expand Up @@ -85,7 +87,7 @@ end)

test:test('ttr put/take', function(test)
test:plan(3)
local my_queue = queue.create_tube('trr_test', 'utubettl')
local my_queue = queue.create_tube('trr_test', 'utubettl', { engine = engine })
test:ok(my_queue:put('ttr1', { ttr = 1 }), 'put ttr task')
test:ok(my_queue:take(0.1) ~= nil, 'take this task')
fiber.sleep(1)
Expand Down Expand Up @@ -204,16 +206,16 @@ end)
test:test('if_not_exists test', function(test)
test:plan(2)
local tube = queue.create_tube('test_ine', 'utubettl', {
if_not_exists = true
if_not_exists = true, engine = engine
})
local tube_new = queue.create_tube('test_ine', 'utubettl', {
if_not_exists = true
if_not_exists = true, engine = engine
})
test:is(tube, tube_new, "if_not_exists if tube exists")

queue.tube['test_ine'] = nil
local tube_new = queue.create_tube('test_ine', 'utubettl', {
if_not_exists = true
if_not_exists = true, engine = engine
})
test:isnt(tube, tube_new, "if_not_exists if tube doesn't exists")
end)
Expand Down
10 changes: 6 additions & 4 deletions t/050-ttl.t
Expand Up @@ -11,13 +11,15 @@ tnt.cfg{
wal_mode = 'none'
}

local engine = os.getenv('ENGINE') or 'memtx'

local ttl = 0.1

test:ok(queue, 'queue is loaded')

test:test('one message per queue ffttl', function (test)
test:plan(20)
local tube = queue.create_tube('ompq_ffttl', 'fifottl')
local tube = queue.create_tube('ompq_ffttl', 'fifottl', { engine = engine })
for i = 1, 20 do
tube:put('ompq_' .. i, {ttl=ttl})
fiber.sleep(ttl)
Expand All @@ -27,7 +29,7 @@ end)

test:test('one message per queue utttl', function (test)
test:plan(20)
local tube = queue.create_tube('ompq_utttl', 'utubettl')
local tube = queue.create_tube('ompq_utttl', 'utubettl', { engine = engine })
for i = 1, 20 do
tube:put('ompq_' .. i, {ttl=ttl})
fiber.sleep(ttl)
Expand All @@ -38,7 +40,7 @@ end)
test:test('many messages, one queue ffttl', function (test)
test:plan(20)
for i = 1, 20 do
local tube = queue.create_tube('mmpq_ffttl_' .. i, 'fifottl')
local tube = queue.create_tube('mmpq_ffttl_' .. i, 'fifottl', { engine = engine })
tube:put('mmpq_' .. i, {ttl=ttl})
fiber.sleep(ttl)
test:is(#{tube:take(.1)}, 0, 'no task is taken')
Expand All @@ -48,7 +50,7 @@ end)
test:test('many messages, one queue utttl', function (test)
test:plan(20)
for i = 1, 20 do
local tube = queue.create_tube('mmpq_utttl_' .. i, 'utubettl')
local tube = queue.create_tube('mmpq_utttl_' .. i, 'utubettl', { engine = engine })
tube:put('mmpq_' .. i, {ttl=ttl})
fiber.sleep(ttl)
test:is(#{tube:take(.1)}, 0, 'no task is taken')
Expand Down
4 changes: 3 additions & 1 deletion t/060-async.t
Expand Up @@ -11,10 +11,12 @@ local state = require('queue.abstract.state')
local tnt = require('t.tnt')
tnt.cfg{}

local engine = os.getenv('ENGINE') or 'memtx'

test:ok(rawget(box, 'space'), 'box started')
test:ok(queue, 'queue is loaded')

local tube = queue.create_tube('test', 'fifo')
local tube = queue.create_tube('test', 'fifo', { engine = engine })
test:ok(tube, 'test tube created')
test:is(tube.name, 'test', 'tube.name')
test:is(tube.type, 'fifo', 'tube.type')
Expand Down
4 changes: 3 additions & 1 deletion t/080-otc-cb.t
Expand Up @@ -5,6 +5,8 @@ test:plan(1)
local tnt = require('t.tnt')
tnt.cfg{}

local engine = os.getenv('ENGINE') or 'memtx'

local queue = require('queue')

local function tube_check_simple(tube)
Expand All @@ -23,7 +25,7 @@ test:test('on_task_change callback', function(test)

-- init with initial callback
local tube = queue.create_tube('test2', 'fifo', {
on_task_change = cb
on_task_change = cb, engine = engine
})

tube_check_simple(tube)
Expand Down

0 comments on commit 3adda75

Please sign in to comment.