Skip to content

Commit

Permalink
box: make box.schema DDL functions transactional
Browse files Browse the repository at this point in the history
Wraps multistatement DDL functions into begin/commit block if no
transaction is active.

The functions are:
- `box.schema.space.drop`
- `box.schema.index.create`
- `box.schema.index.drop`
- `box.schema.index.alter`
- `box.schema.sequence.drop`
- `box.schema.func.drop`
- `box.schema.user.create`
- `box.schema.user.drop`
- `box.schema.role.drop`

Added tests for atomicity of each transactioned function except
the `box.schema.role.drop`, which is implicitly tested with the
`box.schema.user.drop` test, and the `box.schema.index.drop`,
which is impossible to test in this flavor whthout using error
injection.

Updated the tests modified in #8947, because
the space drop is atomic now.

Closes #4348

NO_DOC=bugfix
  • Loading branch information
mkostoevr committed Sep 13, 2023
1 parent f713a30 commit 0b0a4a3
Show file tree
Hide file tree
Showing 14 changed files with 340 additions and 111 deletions.
4 changes: 4 additions & 0 deletions changelogs/unreleased/gh-4348-transactional-ddl.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
## bugfix/box

* All DDL functions from the `box.schema` module are now wrapped into a
transaction to avoid database inconsistency on failed operations (gh-4348).
100 changes: 40 additions & 60 deletions src/box/lua/schema.lua
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,18 @@ box.atomic = function(arg0, arg1, ...)
end
end

-- Wrap a function into transaction if none is active.
local function atomic_wrapper(func)
return function(...)
-- No reason to start a transaction if one is active already.
if box.is_in_txn() then
return func(...)
else
return box.atomic(func, ...)
end
end
end

-- box.commit yields, so it's defined as Lua/C binding
-- box.rollback and box.rollback_to_savepoint yields as well

Expand Down Expand Up @@ -820,7 +832,7 @@ end

box.schema.create_space = box.schema.space.create

box.schema.space.drop = function(space_id, space_name, opts)
box.schema.space.drop = atomic_wrapper(function(space_id, space_name, opts)
check_param(space_id, 'space_id', 'number')
opts = opts or {}
check_param_table(opts, { if_exists = 'boolean' })
Expand Down Expand Up @@ -872,7 +884,7 @@ box.schema.space.drop = function(space_id, space_name, opts)
end

feedback_save_event('drop_space')
end
end)

box.schema.space.rename = function(space_id, space_name)
check_param(space_id, 'space_id', 'number')
Expand Down Expand Up @@ -1252,15 +1264,13 @@ local function space_sequence_check(sequence, parts, space_name, index_name)
end

--
-- The first stage of a space sequence modification operation.
-- Called before altering the space definition. Checks sequence
-- options and detaches the old sequence from the space.
-- Returns a proxy object that is supposed to be passed to
-- space_sequence_alter_commit() to complete the operation.
-- Checks sequence options, detaches the old sequence from the space,
-- attaches the new sequence to the space and drops the old sequence
-- if required.
--
local function space_sequence_alter_prepare(format, parts, options,
space_id, index_id,
space_name, index_name)
local function space_sequence_alter(format, parts, options,
space_id, index_id,
space_name, index_name)
local _space_sequence = box.space[box.schema.SPACE_SEQUENCE_ID]

-- A sequence can only be attached to a primary index.
Expand Down Expand Up @@ -1362,35 +1372,9 @@ local function space_sequence_alter_prepare(format, parts, options,
end

if old_sequence ~= nil then
-- Detach the old sequence before altering the space.
_space_sequence:delete(space_id)
end

return {
space_id = space_id,
new_sequence = new_sequence,
old_sequence = old_sequence,
}
end

--
-- The second stage of a space sequence modification operation.
-- Called after altering the space definition. Attaches the sequence
-- to the space and drops the old sequence if required. 'proxy' is
-- an object returned by space_sequence_alter_prepare().
--
local function space_sequence_alter_commit(proxy)
local _space_sequence = box.space[box.schema.SPACE_SEQUENCE_ID]

if proxy == nil then
-- No sequence option, nothing to do.
return
end

local space_id = proxy.space_id
local old_sequence = proxy.old_sequence
local new_sequence = proxy.new_sequence

if new_sequence ~= nil then
-- Attach the new sequence.
_space_sequence:insert{space_id, new_sequence.id,
Expand Down Expand Up @@ -1476,7 +1460,7 @@ local function func_id_by_name(func_name)
end
box.internal.func_id_by_name = func_id_by_name -- for space.upgrade

box.schema.index.create = function(space_id, name, options)
box.schema.index.create = atomic_wrapper(function(space_id, name, options)
check_param(space_id, 'space_id', 'number')
check_param(name, 'name', 'string')
check_param_table(options, create_index_template)
Expand Down Expand Up @@ -1587,21 +1571,19 @@ box.schema.index.create = function(space_id, name, options)
if index_opts.func ~= nil and type(index_opts.func) == 'string' then
index_opts.func = func_id_by_name(index_opts.func)
end
local sequence_proxy = space_sequence_alter_prepare(format, parts, options,
space_id, iid,
space.name, name)
_index:insert{space_id, iid, name, options.type, index_opts, parts}
space_sequence_alter_commit(sequence_proxy)
space_sequence_alter(format, parts, options, space_id,
iid, space.name, name)
if index_opts.func ~= nil then
local _func_index = box.space[box.schema.FUNC_INDEX_ID]
_func_index:insert{space_id, iid, index_opts.func}
end

feedback_save_event('create_index')
return space.index[name]
end
end)

box.schema.index.drop = function(space_id, index_id)
box.schema.index.drop = atomic_wrapper(function(space_id, index_id)
check_param(space_id, 'space_id', 'number')
check_param(index_id, 'index_id', 'number')
if index_id == 0 then
Expand All @@ -1625,7 +1607,7 @@ box.schema.index.drop = function(space_id, index_id)
_index:delete{space_id, index_id}

feedback_save_event('drop_index')
end
end)

box.schema.index.rename = function(space_id, index_id, name)
check_param(space_id, 'space_id', 'number')
Expand All @@ -1636,7 +1618,7 @@ box.schema.index.rename = function(space_id, index_id, name)
_index:update({space_id, index_id}, {{"=", 3, name}})
end

box.schema.index.alter = function(space_id, index_id, options)
box.schema.index.alter = atomic_wrapper(function(space_id, index_id, options)
local space = box.space[space_id]
if space == nil then
box.error(box.error.NO_SUCH_SPACE, '#'..tostring(space_id))
Expand Down Expand Up @@ -1729,17 +1711,15 @@ box.schema.index.alter = function(space_id, index_id, options)
if index_opts.func ~= nil and type(index_opts.func) == 'string' then
index_opts.func = func_id_by_name(index_opts.func)
end
local sequence_proxy = space_sequence_alter_prepare(format, parts, options,
space_id, index_id,
space.name, options.name)
_index:replace{space_id, index_id, options.name, options.type,
index_opts, parts}
space_sequence_alter(format, parts, options, space_id,
index_id, space.name, options.name)
if index_opts.func ~= nil then
local _func_index = box.space[box.schema.FUNC_INDEX_ID]
_func_index:insert{space_id, index_id, index_opts.func}
end
space_sequence_alter_commit(sequence_proxy)
end
end)

-- a static box_tuple_t ** instance for calling box_index_* API
local ptuple = ffi.new('box_tuple_t *[1]')
Expand Down Expand Up @@ -2850,7 +2830,7 @@ box.schema.sequence.alter = function(name, opts)
opts.max, opts.start, opts.cache, opts.cycle}
end
box.schema.sequence.drop = function(name, opts)
box.schema.sequence.drop = atomic_wrapper(function(name, opts)
opts = opts or {}
check_param_table(opts, {if_exists = 'boolean'})
local id = sequence_resolve(name)
Expand All @@ -2865,7 +2845,7 @@ box.schema.sequence.drop = function(name, opts)
local _sequence_data = box.space[box.schema.SEQUENCE_DATA_ID]
_sequence_data:delete{id}
_sequence:delete{id}
end
end)
local function privilege_parse(privs)
-- TODO: introduce a global privilege -> bit mapping?
Expand Down Expand Up @@ -3142,7 +3122,7 @@ box.schema.func.create = function(name, opts)
opts.comment, opts.created, opts.last_altered}
end
box.schema.func.drop = function(name, opts)
box.schema.func.drop = atomic_wrapper(function(name, opts)
opts = opts or {}
check_param_table(opts, { if_exists = 'boolean' })
local _func = box.space[box.schema.FUNC_ID]
Expand All @@ -3165,7 +3145,7 @@ box.schema.func.drop = function(name, opts)
end
revoke_object_privs('function', fid)
_func:delete{fid}
end
end)
function box.schema.func.exists(name_or_id)
local _vfunc = box.space[box.schema.VFUNC_ID]
Expand Down Expand Up @@ -3326,7 +3306,7 @@ box.schema.user.passwd = function(name, new_password)
end
end
box.schema.user.create = function(name, opts)
box.schema.user.create = atomic_wrapper(function(name, opts)
local uid = user_or_role_resolve(name)
opts = opts or {}
check_param_table(opts, { password = 'string', if_not_exists = 'boolean' })
Expand Down Expand Up @@ -3356,7 +3336,7 @@ box.schema.user.create = function(name, opts)
-- grant option
box.session.su('admin', box.schema.user.grant, uid, 'session,usage', 'universe',
nil, {if_not_exists=true})
end
end)
box.schema.user.exists = function(name)
if user_resolve(name) then
Expand Down Expand Up @@ -3542,7 +3522,7 @@ box.schema.user.disable = function(user)
{if_exists = true})
end
box.schema.user.drop = function(name, opts)
box.schema.user.drop = atomic_wrapper(function(name, opts)
opts = opts or {}
check_param_table(opts, { if_exists = 'boolean' })
local uid = user_resolve(name)
Expand All @@ -3563,7 +3543,7 @@ box.schema.user.drop = function(name, opts)
box.error(box.error.NO_SUCH_USER, name)
end
return
end
end)
local function info(id)
local _priv = box.space._vpriv
Expand Down Expand Up @@ -3616,7 +3596,7 @@ box.schema.role.create = function(name, opts)
math.floor(fiber.time())}
end
box.schema.role.drop = function(name, opts)
box.schema.role.drop = atomic_wrapper(function(name, opts)
opts = opts or {}
check_param_table(opts, { if_exists = 'boolean' })
local uid = role_resolve(name)
Expand All @@ -3632,7 +3612,7 @@ box.schema.role.drop = function(name, opts)
box.error(box.error.DROP_USER, name, "the user or the role is a system")
end
return drop(uid)
end
end)
local function role_check_grant_revoke_of_sys_priv(priv)
priv = string.lower(priv)
Expand Down
2 changes: 1 addition & 1 deletion test/box-luatest/builtin_events_test.lua
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ g.test_box_schema = function(cg)

version_n = 0
cg.master:exec(function() box.space.p:drop() end)
t.helpers.retrying({}, function() t.assert_equals(version_n, 2) end)
t.helpers.retrying({}, function() t.assert_equals(version_n, 1) end)
-- there'll be 2 changes - index and space
t.assert_equals(version, init_version + 4)

Expand Down

0 comments on commit 0b0a4a3

Please sign in to comment.