Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 24 additions & 6 deletions queue/abstract/driver/fifo.lua
Original file line number Diff line number Diff line change
Expand Up @@ -6,35 +6,53 @@ local str_type = require('queue.compat').str_type
local tube = {}
local method = {}

-- validate space of queue
local function validate_space(space)
-- check indexes
local indexes = {'task_id', 'status'}
for _, index in pairs(indexes) do
if space.index[index] == nil then
error(string.format('space "%s" does not have "%s" index',
space.name, index))
end
end
end

-- create space
function tube.create_space(space_name, opts)
local space_opts = {}
local if_not_exists = opts.if_not_exists or false
space_opts.temporary = opts.temporary or false
space_opts.if_not_exists = if_not_exists
space_opts.engine = opts.engine or 'memtx'
space_opts.format = {
{name = 'task_id', type = num_type()},
{name = 'status', type = str_type()},
{name = 'data', type = '*'}
}

local space = box.schema.create_space(space_name, space_opts)
local space = box.space[space_name]
if if_not_exists and space then
-- Validate the existing space.
validate_space(box.space[space_name])
return space
end

space = box.schema.create_space(space_name, space_opts)
space:create_index('task_id', {
type = 'tree',
parts = {1, num_type()},
if_not_exists = if_not_exists
parts = {1, num_type()}
})
space:create_index('status', {
type = 'tree',
parts = {2, str_type(), 1, num_type()},
if_not_exists = if_not_exists
parts = {2, str_type(), 1, num_type()}
})
return space
end

-- start tube on space
function tube.new(space, on_task_change)
validate_space(space)

on_task_change = on_task_change or (function() end)
local self = setmetatable({
space = space,
Expand Down
32 changes: 24 additions & 8 deletions queue/abstract/driver/fifottl.lua
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,18 @@ local function is_expired(task)
return (dead_event <= fiber.time64())
end

-- validate space of queue
local function validate_space(space)
-- check indexes
local indexes = {'task_id', 'status', 'watch'}
for _, index in pairs(indexes) do
if space.index[index] == nil then
error(string.format('space "%s" does not have "%s" index',
space.name, index))
end
end
end

-- create space
function tube.create_space(space_name, opts)
opts.ttl = opts.ttl or TIMEOUT_INFINITY
Expand All @@ -52,7 +64,6 @@ function tube.create_space(space_name, opts)
local space_opts = {}
local if_not_exists = opts.if_not_exists or false
space_opts.temporary = opts.temporary or false
space_opts.if_not_exists = if_not_exists
space_opts.engine = opts.engine or 'memtx'
space_opts.format = {
{name = 'task_id', type = num_type()},
Expand All @@ -67,23 +78,26 @@ function tube.create_space(space_name, opts)

-- 1 2 3 4 5 6 7, 8
-- task_id, status, next_event, ttl, ttr, pri, created, data
local space = box.schema.create_space(space_name, space_opts)
local space = box.space[space_name]
if if_not_exists and space then
-- Validate the existing space.
validate_space(box.space[space_name])
return space
end

space = box.schema.create_space(space_name, space_opts)
space:create_index('task_id', {
type = 'tree',
parts = {i_id, num_type()},
if_not_exists = if_not_exists
parts = {i_id, num_type()}
})
space:create_index('status', {
type = 'tree',
parts = {i_status, str_type(), i_pri, num_type(), i_id, num_type()},
if_not_exists = if_not_exists
parts = {i_status, str_type(), i_pri, num_type(), i_id, num_type()}
})
space:create_index('watch', {
type = 'tree',
parts = {i_status, str_type(), i_next_event, num_type()},
unique = false,
if_not_exists = if_not_exists
unique = false
})
return space
end
Expand Down Expand Up @@ -179,6 +193,8 @@ end

-- start tube on space
function tube.new(space, on_task_change, opts)
validate_space(space)

on_task_change = on_task_change or (function() end)
local self = setmetatable({
space = space,
Expand Down
33 changes: 25 additions & 8 deletions queue/abstract/driver/utube.lua
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,23 @@ local method = {}

local i_status = 2

-- validate space of queue
local function validate_space(space)
-- check indexes
local indexes = {'task_id', 'status', 'utube'}
for _, index in pairs(indexes) do
if space.index[index] == nil then
error(string.format('space "%s" does not have "%s" index',
space.name, index))
end
end
end

-- create space
function tube.create_space(space_name, opts)
local space_opts = {}
local if_not_exists = opts.if_not_exists or false
space_opts.temporary = opts.temporary or false
space_opts.if_not_exists = if_not_exists
space_opts.engine = opts.engine or 'memtx'
space_opts.format = {
{name = 'task_id', type = num_type()},
Expand All @@ -22,27 +33,33 @@ function tube.create_space(space_name, opts)
}

-- id, status, utube, data
local space = box.schema.create_space(space_name, space_opts)
local space = box.space[space_name]
if if_not_exists and space then
-- Validate the existing space.
validate_space(box.space[space_name])
return space
end

space = box.schema.create_space(space_name, space_opts)
space:create_index('task_id', {
type = 'tree',
parts = {1, num_type()},
if_not_exists = if_not_exists
parts = {1, num_type()}
})
space:create_index('status', {
type = 'tree',
parts = {2, str_type(), 1, num_type()},
if_not_exists = if_not_exists
parts = {2, str_type(), 1, num_type()}
})
space:create_index('utube', {
type = 'tree',
parts = {2, str_type(), 3, str_type(), 1, num_type()},
if_not_exists = if_not_exists
parts = {2, str_type(), 3, str_type(), 1, num_type()}
})
return space
end

-- start tube on space
function tube.new(space, on_task_change)
validate_space(space)

on_task_change = on_task_change or (function() end)
local self = setmetatable({
space = space,
Expand Down
35 changes: 25 additions & 10 deletions queue/abstract/driver/utubettl.lua
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,18 @@ local function is_expired(task)
return (dead_event <= fiber.time64())
end

-- validate space of queue
local function validate_space(space)
-- check indexes
local indexes = {'task_id', 'status', 'utube', 'watch'}
for _, index in pairs(indexes) do
if space.index[index] == nil then
error(string.format('space "%s" does not have "%s" index',
space.name, index))
end
end
end

-- create space
function tube.create_space(space_name, opts)
opts.ttl = opts.ttl or TIMEOUT_INFINITY
Expand All @@ -54,7 +66,6 @@ function tube.create_space(space_name, opts)
local space_opts = {}
local if_not_exists = opts.if_not_exists or false
space_opts.temporary = opts.temporary or false
space_opts.if_not_exists = if_not_exists
space_opts.engine = opts.engine or 'memtx'
space_opts.format = {
{name = 'task_id', type = num_type()},
Expand All @@ -70,28 +81,30 @@ function tube.create_space(space_name, opts)

-- 1 2 3 4 5 6 7, 8 9
-- task_id, status, next_event, ttl, ttr, pri, created, utube, data
local space = box.schema.create_space(space_name, space_opts)
local space = box.space[space_name]
if if_not_exists and space then
-- Validate the existing space.
validate_space(box.space[space_name])
return space
end

space = box.schema.create_space(space_name, space_opts)
space:create_index('task_id', {
type = 'tree',
parts = {i_id, num_type()},
if_not_exists = if_not_exists
parts = {i_id, num_type()}
})
space:create_index('status', {
type = 'tree',
parts = {i_status, str_type(), i_pri, num_type(), i_id, num_type()},
if_not_exists = if_not_exists
parts = {i_status, str_type(), i_pri, num_type(), i_id, num_type()}
})
space:create_index('watch', {
type = 'tree',
parts = {i_status, str_type(), i_next_event, num_type()},
unique = false,
if_not_exists = if_not_exists
unique = false
})
space:create_index('utube', {
type = 'tree',
parts = {i_status, str_type(), i_utube, str_type(), i_id, num_type()},
if_not_exists = if_not_exists
parts = {i_status, str_type(), i_utube, str_type(), i_id, num_type()}
})
return space
end
Expand Down Expand Up @@ -188,6 +201,8 @@ end

-- start tube on space
function tube.new(space, on_task_change, opts)
validate_space(space)

on_task_change = on_task_change or (function() end)
local self = setmetatable({
space = space,
Expand Down
107 changes: 107 additions & 0 deletions t/160-validate-space.t
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
#!/usr/bin/env tarantool

local tap = require('tap')
local os = require('os')
local tnt = require('t.tnt')
local test = tap.test('test space validation')

local fifo = require('queue.abstract.driver.fifo')
local fifottl = require('queue.abstract.driver.fifottl')
local utube = require('queue.abstract.driver.utube')
local utubettl = require('queue.abstract.driver.utubettl')

local engine = os.getenv('ENGINE') or 'memtx'

test:plan(8)
tnt.cfg{}

local function test_corrupted_space(test, driver, indexes)
test:plan(table.getn(indexes))

-- Can't drop primary key in space while secondary keys exist.
-- So, drop other indexes previously.
local function remove_task_id_index(space, indexes)
for _, index in pairs(indexes) do
if index ~= 'task_id' then
space.index[index]:drop()
end
end
space.index.task_id:drop()
end

for _, index in pairs(indexes) do
test:test(index .. ' index does not exist', function(test)
test:plan(2)

local space = driver.create_space('corrupted_space',
{engine = engine})

if index == 'task_id' then
remove_task_id_index(space, indexes)
else
space.index[index]:drop()
end

local res, err = pcall(driver.new, space)
local err_match_msg = string.format('space "corrupted_space"' ..
' does not have "%s" index', index)
test:ok(not res, 'exception was thrown')
test:ok(err:match(err_match_msg) ~= nil, 'text of exception')

space:drop()
end)
end
end

local function test_name_conflict(test, driver)
test:plan(2)

local conflict_space = box.schema.create_space('conflict_tube')
local res, err = pcall(driver.create_space,'conflict_tube',
{engine = engine, if_not_exists = true})

test:ok(not res, 'exception was thrown')
test:ok(err:match('space "conflict_tube" does not' ..
' have "task_id" index') ~= nil, 'text of exception')

conflict_space:drop()
end

test:test('test corrupted space fifo', function(test)
test_corrupted_space(test, fifo, {'task_id', 'status'})
end)

test:test('test corrupted space fifottl', function(test)
test_corrupted_space(test, fifottl, {'task_id', 'status', 'watch'})
end)

test:test('test corrupted space utube', function(test)
test_corrupted_space(test, utube, {'task_id', 'status', 'utube'})
end)

test:test('test corrupted space utubettl', function(test)
test_corrupted_space(test, utubettl,
{'task_id', 'status', 'utube', 'watch'})
end)

test:test('Space name conflict fifo', function(test)
test_name_conflict(test, fifo)
end)

test:test('Space name conflict fifo', function(test)
local fifo = require('queue.abstract.driver.fifo')
test_name_conflict(test, fifottl)
end)

test:test('Space name conflict fifo', function(test)
local fifo = require('queue.abstract.driver.fifo')
test_name_conflict(test, utube)
end)

test:test('Space name conflict fifo', function(test)
local fifo = require('queue.abstract.driver.fifo')
test_name_conflict(test, utubettl)
end)

tnt.finish()
os.exit(test:check() and 0 or 1)