From ddf5414dd14d180744af0274c2785322d43aaf89 Mon Sep 17 00:00:00 2001 From: Oleg Mikhalev Date: Fri, 23 Oct 2020 00:53:36 +0300 Subject: [PATCH 1/2] drivers: add a validation of the space indexes For all queue drivers, the "new" method now checks for the presence of indexes created by the "create_space" method. Co-authored-by: Leonid Vasiliev Part of #62 --- queue/abstract/driver/fifo.lua | 14 ++++++ queue/abstract/driver/fifottl.lua | 14 ++++++ queue/abstract/driver/utube.lua | 14 ++++++ queue/abstract/driver/utubettl.lua | 14 ++++++ t/160-validate-space.t | 74 ++++++++++++++++++++++++++++++ 5 files changed, 130 insertions(+) create mode 100755 t/160-validate-space.t diff --git a/queue/abstract/driver/fifo.lua b/queue/abstract/driver/fifo.lua index c996e6e8..2884e8f9 100644 --- a/queue/abstract/driver/fifo.lua +++ b/queue/abstract/driver/fifo.lua @@ -6,6 +6,18 @@ local str_type = require('queue.compat').str_type local tube = {} local method = {} +-- validate space of queue +local function validate_space(space) + -- check indexes + local indexes = {'task_id', 'status'} + for _, index in pairs(indexes) do + if space.index[index] == nil then + error(string.format('space "%s" does not have "%s" index', + space.name, index)) + end + end +end + -- create space function tube.create_space(space_name, opts) local space_opts = {} @@ -35,6 +47,8 @@ end -- start tube on space function tube.new(space, on_task_change) + validate_space(space) + on_task_change = on_task_change or (function() end) local self = setmetatable({ space = space, diff --git a/queue/abstract/driver/fifottl.lua b/queue/abstract/driver/fifottl.lua index 1aa6f3f6..0e9ab0b7 100644 --- a/queue/abstract/driver/fifottl.lua +++ b/queue/abstract/driver/fifottl.lua @@ -43,6 +43,18 @@ local function is_expired(task) return (dead_event <= fiber.time64()) end +-- validate space of queue +local function validate_space(space) + -- check indexes + local indexes = {'task_id', 'status', 'watch'} + for _, index in pairs(indexes) do + if space.index[index] == nil then + error(string.format('space "%s" does not have "%s" index', + space.name, index)) + end + end +end + -- create space function tube.create_space(space_name, opts) opts.ttl = opts.ttl or TIMEOUT_INFINITY @@ -179,6 +191,8 @@ end -- start tube on space function tube.new(space, on_task_change, opts) + validate_space(space) + on_task_change = on_task_change or (function() end) local self = setmetatable({ space = space, diff --git a/queue/abstract/driver/utube.lua b/queue/abstract/driver/utube.lua index 21554e84..c0284d29 100644 --- a/queue/abstract/driver/utube.lua +++ b/queue/abstract/driver/utube.lua @@ -7,6 +7,18 @@ local method = {} local i_status = 2 +-- validate space of queue +local function validate_space(space) + -- check indexes + local indexes = {'task_id', 'status', 'utube'} + for _, index in pairs(indexes) do + if space.index[index] == nil then + error(string.format('space "%s" does not have "%s" index', + space.name, index)) + end + end +end + -- create space function tube.create_space(space_name, opts) local space_opts = {} @@ -43,6 +55,8 @@ end -- start tube on space function tube.new(space, on_task_change) + validate_space(space) + on_task_change = on_task_change or (function() end) local self = setmetatable({ space = space, diff --git a/queue/abstract/driver/utubettl.lua b/queue/abstract/driver/utubettl.lua index 0c17d1be..fc3fdea0 100644 --- a/queue/abstract/driver/utubettl.lua +++ b/queue/abstract/driver/utubettl.lua @@ -45,6 +45,18 @@ local function is_expired(task) return (dead_event <= fiber.time64()) end +-- validate space of queue +local function validate_space(space) + -- check indexes + local indexes = {'task_id', 'status', 'utube', 'watch'} + for _, index in pairs(indexes) do + if space.index[index] == nil then + error(string.format('space "%s" does not have "%s" index', + space.name, index)) + end + end +end + -- create space function tube.create_space(space_name, opts) opts.ttl = opts.ttl or TIMEOUT_INFINITY @@ -188,6 +200,8 @@ end -- start tube on space function tube.new(space, on_task_change, opts) + validate_space(space) + on_task_change = on_task_change or (function() end) local self = setmetatable({ space = space, diff --git a/t/160-validate-space.t b/t/160-validate-space.t new file mode 100755 index 00000000..6320736e --- /dev/null +++ b/t/160-validate-space.t @@ -0,0 +1,74 @@ +#!/usr/bin/env tarantool + +local tap = require('tap') +local os = require('os') +local tnt = require('t.tnt') +local test = tap.test('test space validation') + +local fifo = require('queue.abstract.driver.fifo') +local fifottl = require('queue.abstract.driver.fifottl') +local utube = require('queue.abstract.driver.utube') +local utubettl = require('queue.abstract.driver.utubettl') + +local engine = os.getenv('ENGINE') or 'memtx' + +test:plan(4) +tnt.cfg{} + +local function test_corrupted_space(test, driver, indexes) + test:plan(table.getn(indexes)) + + -- Can't drop primary key in space while secondary keys exist. + -- So, drop other indexes previously. + local function remove_task_id_index(space, indexes) + for _, index in pairs(indexes) do + if index ~= 'task_id' then + space.index[index]:drop() + end + end + space.index.task_id:drop() + end + + for _, index in pairs(indexes) do + test:test(index .. ' index does not exist', function(test) + test:plan(2) + + local space = driver.create_space('corrupted_space', + {engine = engine}) + + if index == 'task_id' then + remove_task_id_index(space, indexes) + else + space.index[index]:drop() + end + + local res, err = pcall(driver.new, space) + local err_match_msg = string.format('space "corrupted_space"' .. + ' does not have "%s" index', index) + test:ok(not res, 'exception was thrown') + test:ok(err:match(err_match_msg) ~= nil, 'text of exception') + + space:drop() + end) + end +end + +test:test('test corrupted space fifo', function(test) + test_corrupted_space(test, fifo, {'task_id', 'status'}) +end) + +test:test('test corrupted space fifottl', function(test) + test_corrupted_space(test, fifottl, {'task_id', 'status', 'watch'}) +end) + +test:test('test corrupted space utube', function(test) + test_corrupted_space(test, utube, {'task_id', 'status', 'utube'}) +end) + +test:test('test corrupted space utubettl', function(test) + test_corrupted_space(test, utubettl, + {'task_id', 'status', 'utube', 'watch'}) +end) + +tnt.finish() +os.exit(test:check() and 0 or 1) From e21bb153da63bac83ada76fb76f84ce50ddba2a4 Mon Sep 17 00:00:00 2001 From: Leonid Vasiliev Date: Fri, 23 Oct 2020 01:10:02 +0300 Subject: [PATCH 2/2] drivers: add validation of the space to create_space If a space is creating with the "if_not_exist = true" option and a space with such name already exists (and this is not a tube of the required type), a conflict will be detected and an error will be thrown. This is necessary because we don't want to create additional indexes for the existing space and repurpose it into the "tube". Closes #62 --- queue/abstract/driver/fifo.lua | 16 +++++++++----- queue/abstract/driver/fifottl.lua | 18 ++++++++------- queue/abstract/driver/utube.lua | 19 +++++++++------- queue/abstract/driver/utubettl.lua | 21 +++++++++--------- t/160-validate-space.t | 35 +++++++++++++++++++++++++++++- 5 files changed, 76 insertions(+), 33 deletions(-) diff --git a/queue/abstract/driver/fifo.lua b/queue/abstract/driver/fifo.lua index 2884e8f9..87b95839 100644 --- a/queue/abstract/driver/fifo.lua +++ b/queue/abstract/driver/fifo.lua @@ -23,7 +23,6 @@ function tube.create_space(space_name, opts) local space_opts = {} local if_not_exists = opts.if_not_exists or false space_opts.temporary = opts.temporary or false - space_opts.if_not_exists = if_not_exists space_opts.engine = opts.engine or 'memtx' space_opts.format = { {name = 'task_id', type = num_type()}, @@ -31,16 +30,21 @@ function tube.create_space(space_name, opts) {name = 'data', type = '*'} } - local space = box.schema.create_space(space_name, space_opts) + local space = box.space[space_name] + if if_not_exists and space then + -- Validate the existing space. + validate_space(box.space[space_name]) + return space + end + + space = box.schema.create_space(space_name, space_opts) space:create_index('task_id', { type = 'tree', - parts = {1, num_type()}, - if_not_exists = if_not_exists + parts = {1, num_type()} }) space:create_index('status', { type = 'tree', - parts = {2, str_type(), 1, num_type()}, - if_not_exists = if_not_exists + parts = {2, str_type(), 1, num_type()} }) return space end diff --git a/queue/abstract/driver/fifottl.lua b/queue/abstract/driver/fifottl.lua index 0e9ab0b7..800c4fbe 100644 --- a/queue/abstract/driver/fifottl.lua +++ b/queue/abstract/driver/fifottl.lua @@ -64,7 +64,6 @@ function tube.create_space(space_name, opts) local space_opts = {} local if_not_exists = opts.if_not_exists or false space_opts.temporary = opts.temporary or false - space_opts.if_not_exists = if_not_exists space_opts.engine = opts.engine or 'memtx' space_opts.format = { {name = 'task_id', type = num_type()}, @@ -79,23 +78,26 @@ function tube.create_space(space_name, opts) -- 1 2 3 4 5 6 7, 8 -- task_id, status, next_event, ttl, ttr, pri, created, data - local space = box.schema.create_space(space_name, space_opts) + local space = box.space[space_name] + if if_not_exists and space then + -- Validate the existing space. + validate_space(box.space[space_name]) + return space + end + space = box.schema.create_space(space_name, space_opts) space:create_index('task_id', { type = 'tree', - parts = {i_id, num_type()}, - if_not_exists = if_not_exists + parts = {i_id, num_type()} }) space:create_index('status', { type = 'tree', - parts = {i_status, str_type(), i_pri, num_type(), i_id, num_type()}, - if_not_exists = if_not_exists + parts = {i_status, str_type(), i_pri, num_type(), i_id, num_type()} }) space:create_index('watch', { type = 'tree', parts = {i_status, str_type(), i_next_event, num_type()}, - unique = false, - if_not_exists = if_not_exists + unique = false }) return space end diff --git a/queue/abstract/driver/utube.lua b/queue/abstract/driver/utube.lua index c0284d29..2ede5467 100644 --- a/queue/abstract/driver/utube.lua +++ b/queue/abstract/driver/utube.lua @@ -24,7 +24,6 @@ function tube.create_space(space_name, opts) local space_opts = {} local if_not_exists = opts.if_not_exists or false space_opts.temporary = opts.temporary or false - space_opts.if_not_exists = if_not_exists space_opts.engine = opts.engine or 'memtx' space_opts.format = { {name = 'task_id', type = num_type()}, @@ -34,21 +33,25 @@ function tube.create_space(space_name, opts) } -- id, status, utube, data - local space = box.schema.create_space(space_name, space_opts) + local space = box.space[space_name] + if if_not_exists and space then + -- Validate the existing space. + validate_space(box.space[space_name]) + return space + end + + space = box.schema.create_space(space_name, space_opts) space:create_index('task_id', { type = 'tree', - parts = {1, num_type()}, - if_not_exists = if_not_exists + parts = {1, num_type()} }) space:create_index('status', { type = 'tree', - parts = {2, str_type(), 1, num_type()}, - if_not_exists = if_not_exists + parts = {2, str_type(), 1, num_type()} }) space:create_index('utube', { type = 'tree', - parts = {2, str_type(), 3, str_type(), 1, num_type()}, - if_not_exists = if_not_exists + parts = {2, str_type(), 3, str_type(), 1, num_type()} }) return space end diff --git a/queue/abstract/driver/utubettl.lua b/queue/abstract/driver/utubettl.lua index fc3fdea0..b853c59b 100644 --- a/queue/abstract/driver/utubettl.lua +++ b/queue/abstract/driver/utubettl.lua @@ -66,7 +66,6 @@ function tube.create_space(space_name, opts) local space_opts = {} local if_not_exists = opts.if_not_exists or false space_opts.temporary = opts.temporary or false - space_opts.if_not_exists = if_not_exists space_opts.engine = opts.engine or 'memtx' space_opts.format = { {name = 'task_id', type = num_type()}, @@ -82,28 +81,30 @@ function tube.create_space(space_name, opts) -- 1 2 3 4 5 6 7, 8 9 -- task_id, status, next_event, ttl, ttr, pri, created, utube, data - local space = box.schema.create_space(space_name, space_opts) + local space = box.space[space_name] + if if_not_exists and space then + -- Validate the existing space. + validate_space(box.space[space_name]) + return space + end + space = box.schema.create_space(space_name, space_opts) space:create_index('task_id', { type = 'tree', - parts = {i_id, num_type()}, - if_not_exists = if_not_exists + parts = {i_id, num_type()} }) space:create_index('status', { type = 'tree', - parts = {i_status, str_type(), i_pri, num_type(), i_id, num_type()}, - if_not_exists = if_not_exists + parts = {i_status, str_type(), i_pri, num_type(), i_id, num_type()} }) space:create_index('watch', { type = 'tree', parts = {i_status, str_type(), i_next_event, num_type()}, - unique = false, - if_not_exists = if_not_exists + unique = false }) space:create_index('utube', { type = 'tree', - parts = {i_status, str_type(), i_utube, str_type(), i_id, num_type()}, - if_not_exists = if_not_exists + parts = {i_status, str_type(), i_utube, str_type(), i_id, num_type()} }) return space end diff --git a/t/160-validate-space.t b/t/160-validate-space.t index 6320736e..9dc25ea3 100755 --- a/t/160-validate-space.t +++ b/t/160-validate-space.t @@ -12,7 +12,7 @@ local utubettl = require('queue.abstract.driver.utubettl') local engine = os.getenv('ENGINE') or 'memtx' -test:plan(4) +test:plan(8) tnt.cfg{} local function test_corrupted_space(test, driver, indexes) @@ -53,6 +53,20 @@ local function test_corrupted_space(test, driver, indexes) end end +local function test_name_conflict(test, driver) + test:plan(2) + + local conflict_space = box.schema.create_space('conflict_tube') + local res, err = pcall(driver.create_space,'conflict_tube', + {engine = engine, if_not_exists = true}) + + test:ok(not res, 'exception was thrown') + test:ok(err:match('space "conflict_tube" does not' .. + ' have "task_id" index') ~= nil, 'text of exception') + + conflict_space:drop() +end + test:test('test corrupted space fifo', function(test) test_corrupted_space(test, fifo, {'task_id', 'status'}) end) @@ -70,5 +84,24 @@ test:test('test corrupted space utubettl', function(test) {'task_id', 'status', 'utube', 'watch'}) end) +test:test('Space name conflict fifo', function(test) + test_name_conflict(test, fifo) +end) + +test:test('Space name conflict fifo', function(test) + local fifo = require('queue.abstract.driver.fifo') + test_name_conflict(test, fifottl) +end) + +test:test('Space name conflict fifo', function(test) + local fifo = require('queue.abstract.driver.fifo') + test_name_conflict(test, utube) +end) + +test:test('Space name conflict fifo', function(test) + local fifo = require('queue.abstract.driver.fifo') + test_name_conflict(test, utubettl) +end) + tnt.finish() os.exit(test:check() and 0 or 1)