diff --git a/queue/abstract/driver/fifo.lua b/queue/abstract/driver/fifo.lua index c996e6e8..87b95839 100644 --- a/queue/abstract/driver/fifo.lua +++ b/queue/abstract/driver/fifo.lua @@ -6,12 +6,23 @@ local str_type = require('queue.compat').str_type local tube = {} local method = {} +-- validate space of queue +local function validate_space(space) + -- check indexes + local indexes = {'task_id', 'status'} + for _, index in pairs(indexes) do + if space.index[index] == nil then + error(string.format('space "%s" does not have "%s" index', + space.name, index)) + end + end +end + -- create space function tube.create_space(space_name, opts) local space_opts = {} local if_not_exists = opts.if_not_exists or false space_opts.temporary = opts.temporary or false - space_opts.if_not_exists = if_not_exists space_opts.engine = opts.engine or 'memtx' space_opts.format = { {name = 'task_id', type = num_type()}, @@ -19,22 +30,29 @@ function tube.create_space(space_name, opts) {name = 'data', type = '*'} } - local space = box.schema.create_space(space_name, space_opts) + local space = box.space[space_name] + if if_not_exists and space then + -- Validate the existing space. + validate_space(box.space[space_name]) + return space + end + + space = box.schema.create_space(space_name, space_opts) space:create_index('task_id', { type = 'tree', - parts = {1, num_type()}, - if_not_exists = if_not_exists + parts = {1, num_type()} }) space:create_index('status', { type = 'tree', - parts = {2, str_type(), 1, num_type()}, - if_not_exists = if_not_exists + parts = {2, str_type(), 1, num_type()} }) return space end -- start tube on space function tube.new(space, on_task_change) + validate_space(space) + on_task_change = on_task_change or (function() end) local self = setmetatable({ space = space, diff --git a/queue/abstract/driver/fifottl.lua b/queue/abstract/driver/fifottl.lua index 1aa6f3f6..800c4fbe 100644 --- a/queue/abstract/driver/fifottl.lua +++ b/queue/abstract/driver/fifottl.lua @@ -43,6 +43,18 @@ local function is_expired(task) return (dead_event <= fiber.time64()) end +-- validate space of queue +local function validate_space(space) + -- check indexes + local indexes = {'task_id', 'status', 'watch'} + for _, index in pairs(indexes) do + if space.index[index] == nil then + error(string.format('space "%s" does not have "%s" index', + space.name, index)) + end + end +end + -- create space function tube.create_space(space_name, opts) opts.ttl = opts.ttl or TIMEOUT_INFINITY @@ -52,7 +64,6 @@ function tube.create_space(space_name, opts) local space_opts = {} local if_not_exists = opts.if_not_exists or false space_opts.temporary = opts.temporary or false - space_opts.if_not_exists = if_not_exists space_opts.engine = opts.engine or 'memtx' space_opts.format = { {name = 'task_id', type = num_type()}, @@ -67,23 +78,26 @@ function tube.create_space(space_name, opts) -- 1 2 3 4 5 6 7, 8 -- task_id, status, next_event, ttl, ttr, pri, created, data - local space = box.schema.create_space(space_name, space_opts) + local space = box.space[space_name] + if if_not_exists and space then + -- Validate the existing space. + validate_space(box.space[space_name]) + return space + end + space = box.schema.create_space(space_name, space_opts) space:create_index('task_id', { type = 'tree', - parts = {i_id, num_type()}, - if_not_exists = if_not_exists + parts = {i_id, num_type()} }) space:create_index('status', { type = 'tree', - parts = {i_status, str_type(), i_pri, num_type(), i_id, num_type()}, - if_not_exists = if_not_exists + parts = {i_status, str_type(), i_pri, num_type(), i_id, num_type()} }) space:create_index('watch', { type = 'tree', parts = {i_status, str_type(), i_next_event, num_type()}, - unique = false, - if_not_exists = if_not_exists + unique = false }) return space end @@ -179,6 +193,8 @@ end -- start tube on space function tube.new(space, on_task_change, opts) + validate_space(space) + on_task_change = on_task_change or (function() end) local self = setmetatable({ space = space, diff --git a/queue/abstract/driver/utube.lua b/queue/abstract/driver/utube.lua index 21554e84..2ede5467 100644 --- a/queue/abstract/driver/utube.lua +++ b/queue/abstract/driver/utube.lua @@ -7,12 +7,23 @@ local method = {} local i_status = 2 +-- validate space of queue +local function validate_space(space) + -- check indexes + local indexes = {'task_id', 'status', 'utube'} + for _, index in pairs(indexes) do + if space.index[index] == nil then + error(string.format('space "%s" does not have "%s" index', + space.name, index)) + end + end +end + -- create space function tube.create_space(space_name, opts) local space_opts = {} local if_not_exists = opts.if_not_exists or false space_opts.temporary = opts.temporary or false - space_opts.if_not_exists = if_not_exists space_opts.engine = opts.engine or 'memtx' space_opts.format = { {name = 'task_id', type = num_type()}, @@ -22,27 +33,33 @@ function tube.create_space(space_name, opts) } -- id, status, utube, data - local space = box.schema.create_space(space_name, space_opts) + local space = box.space[space_name] + if if_not_exists and space then + -- Validate the existing space. + validate_space(box.space[space_name]) + return space + end + + space = box.schema.create_space(space_name, space_opts) space:create_index('task_id', { type = 'tree', - parts = {1, num_type()}, - if_not_exists = if_not_exists + parts = {1, num_type()} }) space:create_index('status', { type = 'tree', - parts = {2, str_type(), 1, num_type()}, - if_not_exists = if_not_exists + parts = {2, str_type(), 1, num_type()} }) space:create_index('utube', { type = 'tree', - parts = {2, str_type(), 3, str_type(), 1, num_type()}, - if_not_exists = if_not_exists + parts = {2, str_type(), 3, str_type(), 1, num_type()} }) return space end -- start tube on space function tube.new(space, on_task_change) + validate_space(space) + on_task_change = on_task_change or (function() end) local self = setmetatable({ space = space, diff --git a/queue/abstract/driver/utubettl.lua b/queue/abstract/driver/utubettl.lua index 0c17d1be..b853c59b 100644 --- a/queue/abstract/driver/utubettl.lua +++ b/queue/abstract/driver/utubettl.lua @@ -45,6 +45,18 @@ local function is_expired(task) return (dead_event <= fiber.time64()) end +-- validate space of queue +local function validate_space(space) + -- check indexes + local indexes = {'task_id', 'status', 'utube', 'watch'} + for _, index in pairs(indexes) do + if space.index[index] == nil then + error(string.format('space "%s" does not have "%s" index', + space.name, index)) + end + end +end + -- create space function tube.create_space(space_name, opts) opts.ttl = opts.ttl or TIMEOUT_INFINITY @@ -54,7 +66,6 @@ function tube.create_space(space_name, opts) local space_opts = {} local if_not_exists = opts.if_not_exists or false space_opts.temporary = opts.temporary or false - space_opts.if_not_exists = if_not_exists space_opts.engine = opts.engine or 'memtx' space_opts.format = { {name = 'task_id', type = num_type()}, @@ -70,28 +81,30 @@ function tube.create_space(space_name, opts) -- 1 2 3 4 5 6 7, 8 9 -- task_id, status, next_event, ttl, ttr, pri, created, utube, data - local space = box.schema.create_space(space_name, space_opts) + local space = box.space[space_name] + if if_not_exists and space then + -- Validate the existing space. + validate_space(box.space[space_name]) + return space + end + space = box.schema.create_space(space_name, space_opts) space:create_index('task_id', { type = 'tree', - parts = {i_id, num_type()}, - if_not_exists = if_not_exists + parts = {i_id, num_type()} }) space:create_index('status', { type = 'tree', - parts = {i_status, str_type(), i_pri, num_type(), i_id, num_type()}, - if_not_exists = if_not_exists + parts = {i_status, str_type(), i_pri, num_type(), i_id, num_type()} }) space:create_index('watch', { type = 'tree', parts = {i_status, str_type(), i_next_event, num_type()}, - unique = false, - if_not_exists = if_not_exists + unique = false }) space:create_index('utube', { type = 'tree', - parts = {i_status, str_type(), i_utube, str_type(), i_id, num_type()}, - if_not_exists = if_not_exists + parts = {i_status, str_type(), i_utube, str_type(), i_id, num_type()} }) return space end @@ -188,6 +201,8 @@ end -- start tube on space function tube.new(space, on_task_change, opts) + validate_space(space) + on_task_change = on_task_change or (function() end) local self = setmetatable({ space = space, diff --git a/t/160-validate-space.t b/t/160-validate-space.t new file mode 100755 index 00000000..9dc25ea3 --- /dev/null +++ b/t/160-validate-space.t @@ -0,0 +1,107 @@ +#!/usr/bin/env tarantool + +local tap = require('tap') +local os = require('os') +local tnt = require('t.tnt') +local test = tap.test('test space validation') + +local fifo = require('queue.abstract.driver.fifo') +local fifottl = require('queue.abstract.driver.fifottl') +local utube = require('queue.abstract.driver.utube') +local utubettl = require('queue.abstract.driver.utubettl') + +local engine = os.getenv('ENGINE') or 'memtx' + +test:plan(8) +tnt.cfg{} + +local function test_corrupted_space(test, driver, indexes) + test:plan(table.getn(indexes)) + + -- Can't drop primary key in space while secondary keys exist. + -- So, drop other indexes previously. + local function remove_task_id_index(space, indexes) + for _, index in pairs(indexes) do + if index ~= 'task_id' then + space.index[index]:drop() + end + end + space.index.task_id:drop() + end + + for _, index in pairs(indexes) do + test:test(index .. ' index does not exist', function(test) + test:plan(2) + + local space = driver.create_space('corrupted_space', + {engine = engine}) + + if index == 'task_id' then + remove_task_id_index(space, indexes) + else + space.index[index]:drop() + end + + local res, err = pcall(driver.new, space) + local err_match_msg = string.format('space "corrupted_space"' .. + ' does not have "%s" index', index) + test:ok(not res, 'exception was thrown') + test:ok(err:match(err_match_msg) ~= nil, 'text of exception') + + space:drop() + end) + end +end + +local function test_name_conflict(test, driver) + test:plan(2) + + local conflict_space = box.schema.create_space('conflict_tube') + local res, err = pcall(driver.create_space,'conflict_tube', + {engine = engine, if_not_exists = true}) + + test:ok(not res, 'exception was thrown') + test:ok(err:match('space "conflict_tube" does not' .. + ' have "task_id" index') ~= nil, 'text of exception') + + conflict_space:drop() +end + +test:test('test corrupted space fifo', function(test) + test_corrupted_space(test, fifo, {'task_id', 'status'}) +end) + +test:test('test corrupted space fifottl', function(test) + test_corrupted_space(test, fifottl, {'task_id', 'status', 'watch'}) +end) + +test:test('test corrupted space utube', function(test) + test_corrupted_space(test, utube, {'task_id', 'status', 'utube'}) +end) + +test:test('test corrupted space utubettl', function(test) + test_corrupted_space(test, utubettl, + {'task_id', 'status', 'utube', 'watch'}) +end) + +test:test('Space name conflict fifo', function(test) + test_name_conflict(test, fifo) +end) + +test:test('Space name conflict fifo', function(test) + local fifo = require('queue.abstract.driver.fifo') + test_name_conflict(test, fifottl) +end) + +test:test('Space name conflict fifo', function(test) + local fifo = require('queue.abstract.driver.fifo') + test_name_conflict(test, utube) +end) + +test:test('Space name conflict fifo', function(test) + local fifo = require('queue.abstract.driver.fifo') + test_name_conflict(test, utubettl) +end) + +tnt.finish() +os.exit(test:check() and 0 or 1)