Skip to content

Commit

Permalink
vinyl: fix index name in duplicate key error message
Browse files Browse the repository at this point in the history
The code setting ER_TUPLE_FOUND uses index_name_by_id() to find
the index name, but it passes an index in the dense index map to
it while the function expects an index in the sparse index map.
Apparently, this doesn't work as expected after an index is removed
from the middle of the index map. This bug was introduced by
commit fc3834c ("vinyl: check key uniqueness before modifying
tx write set").

Instead of just fixing the index passed to index_name_by_id(), we do
a bit of refactoring. We stop passing index_name and space_name to
vy_check_is_unique_*() functions and instead get them right before
raising ER_TUPLE_FOUND. Note, to get the space name, we need to call
space_by_id() but it should be fine because (a) the space is very likely
to be cached as the last accessed one and (b) this is an error path so
it isn't performance critical. We also drop index_name_by_id() and
extract the index name from the LSM tree object.

Closes #5975

NO_DOC=bug fix
  • Loading branch information
locker committed May 20, 2024
1 parent 5ac0d26 commit 2cfba5e
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 60 deletions.
3 changes: 3 additions & 0 deletions changelogs/unreleased/gh-5975-vy-duplicate-key-error-fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
## bugfix/vinyl

* Fixed a bug when a duplicate key error referred to a wrong index (gh-5975).
9 changes: 0 additions & 9 deletions src/box/space.c
Original file line number Diff line number Diff line change
Expand Up @@ -833,15 +833,6 @@ space_index_def(struct space *space, int n)
return space->index[n]->def;
}

const char *
index_name_by_id(struct space *space, uint32_t id)
{
struct index *index = space_index(space, id);
if (index != NULL)
return index->def->name;
return NULL;
}

/**
* Pushes arguments for replace triggers (on_replace, before_replace)
* to port_c. Transaction mustn't be aborted.
Expand Down
12 changes: 0 additions & 12 deletions src/box/space.h
Original file line number Diff line number Diff line change
Expand Up @@ -516,18 +516,6 @@ space_bsize(struct space *space);
struct index_def *
space_index_def(struct space *space, int n);

/**
* Get name of the index by its identifier and parent space.
*
* @param space Parent space.
* @param id Index identifier.
*
* @retval not NULL Index name.
* @retval NULL No index with the specified identifier.
*/
const char *
index_name_by_id(struct space *space, uint32_t id);

/**
* Check whether or not the current user can be granted
* the requested access to the space.
Expand Down
57 changes: 18 additions & 39 deletions src/box/vinyl.c
Original file line number Diff line number Diff line change
Expand Up @@ -1508,8 +1508,6 @@ vy_get_by_raw_key(struct vy_lsm *lsm, struct vy_tx *tx,
* of the primary index.
* @param tx Current transaction.
* @param rv Read view.
* @param space_name Space name.
* @param index_name Index name.
* @param lsm LSM tree corresponding to the index.
* @param stmt New tuple.
*
Expand All @@ -1518,7 +1516,6 @@ vy_get_by_raw_key(struct vy_lsm *lsm, struct vy_tx *tx,
*/
static inline int
vy_check_is_unique_primary(struct vy_tx *tx, const struct vy_read_view **rv,
const char *space_name, const char *index_name,
struct vy_lsm *lsm, struct tuple *stmt)
{
assert(lsm->index_id == 0);
Expand All @@ -1529,8 +1526,11 @@ vy_check_is_unique_primary(struct vy_tx *tx, const struct vy_read_view **rv,
if (vy_get(lsm, tx, rv, stmt, &found))
return -1;
if (found != NULL) {
diag_set(ClientError, ER_TUPLE_FOUND, index_name, space_name,
tuple_str(found), tuple_str(stmt), found, stmt);
struct index *index = &lsm->base;
struct space *space = space_by_id(index->def->space_id);
diag_set(ClientError, ER_TUPLE_FOUND, index->def->name,
space_name(space), tuple_str(found), tuple_str(stmt),
found, stmt);
tuple_unref(found);
return -1;
}
Expand All @@ -1539,7 +1539,6 @@ vy_check_is_unique_primary(struct vy_tx *tx, const struct vy_read_view **rv,

static int
vy_check_is_unique_secondary_one(struct vy_tx *tx, const struct vy_read_view **rv,
const char *space_name, const char *index_name,
struct vy_lsm *lsm, struct tuple *stmt,
int multikey_idx)
{
Expand Down Expand Up @@ -1577,8 +1576,11 @@ vy_check_is_unique_secondary_one(struct vy_tx *tx, const struct vy_read_view **r
return 0;
}
if (found != NULL) {
diag_set(ClientError, ER_TUPLE_FOUND, index_name, space_name,
tuple_str(found), tuple_str(stmt), found, stmt);
struct index *index = &lsm->base;
struct space *space = space_by_id(index->def->space_id);
diag_set(ClientError, ER_TUPLE_FOUND, index->def->name,
space_name(space), tuple_str(found), tuple_str(stmt),
found, stmt);
tuple_unref(found);
return -1;
}
Expand All @@ -1590,8 +1592,6 @@ vy_check_is_unique_secondary_one(struct vy_tx *tx, const struct vy_read_view **r
* of a secondary index.
* @param tx Current transaction.
* @param rv Read view.
* @param space_name Space name.
* @param index_name Index name.
* @param lsm LSM tree corresponding to the index.
* @param stmt New tuple.
*
Expand All @@ -1600,19 +1600,16 @@ vy_check_is_unique_secondary_one(struct vy_tx *tx, const struct vy_read_view **r
*/
static int
vy_check_is_unique_secondary(struct vy_tx *tx, const struct vy_read_view **rv,
const char *space_name, const char *index_name,
struct vy_lsm *lsm, struct tuple *stmt)
{
assert(lsm->opts.is_unique);
if (!lsm->cmp_def->is_multikey) {
return vy_check_is_unique_secondary_one(tx, rv,
space_name, index_name, lsm, stmt,
MULTIKEY_NONE);
return vy_check_is_unique_secondary_one(tx, rv, lsm, stmt,
MULTIKEY_NONE);
}
int count = tuple_multikey_count(stmt, lsm->cmp_def);
for (int i = 0; i < count; ++i) {
if (vy_check_is_unique_secondary_one(tx, rv,
space_name, index_name, lsm, stmt, i) != 0)
if (vy_check_is_unique_secondary_one(tx, rv, lsm, stmt, i) != 0)
return -1;
}
return 0;
Expand Down Expand Up @@ -1658,9 +1655,7 @@ vy_check_is_unique(struct vy_env *env, struct vy_tx *tx,
if (space_needs_check_unique_constraint(space, 0) &&
vy_stmt_type(stmt) == IPROTO_INSERT) {
struct vy_lsm *lsm = vy_lsm(space->index[0]);
if (vy_check_is_unique_primary(tx, rv, space_name(space),
index_name_by_id(space, 0),
lsm, stmt) != 0)
if (vy_check_is_unique_primary(tx, rv, lsm, stmt) != 0)
return -1;
}

Expand All @@ -1675,9 +1670,7 @@ vy_check_is_unique(struct vy_env *env, struct vy_tx *tx,
if (key_update_can_be_skipped(lsm->key_def->column_mask,
column_mask))
continue;
if (vy_check_is_unique_secondary(tx, rv, space_name(space),
index_name_by_id(space, i),
lsm, stmt) != 0)
if (vy_check_is_unique_secondary(tx, rv, lsm, stmt) != 0)
return -1;
}
return 0;
Expand Down Expand Up @@ -3903,12 +3896,6 @@ struct vy_build_ctx {
* if this flag is set.
*/
bool check_unique_constraint;
/**
* Names of the altered space and the new index.
* Used for error reporting.
*/
const char *space_name;
const char *index_name;
/** Set in case a build error occurred. */
bool is_failed;
/** Container for storing errors. */
Expand Down Expand Up @@ -3940,7 +3927,6 @@ vy_build_on_replace(struct trigger *trigger, void *event)
/* Check key uniqueness if necessary. */
if (ctx->check_unique_constraint && stmt->new_tuple != NULL &&
vy_check_is_unique_secondary(tx, vy_tx_read_view(tx),
ctx->space_name, ctx->index_name,
lsm, stmt->new_tuple) != 0)
goto err;

Expand Down Expand Up @@ -4013,7 +3999,6 @@ vy_build_insert_stmt(struct vy_lsm *lsm, struct vy_mem *mem,
*/
static int
vy_build_insert_tuple(struct vy_env *env, struct vy_lsm *lsm,
const char *space_name, const char *index_name,
struct tuple_format *new_format,
bool check_unique_constraint, struct tuple *tuple)
{
Expand Down Expand Up @@ -4054,9 +4039,8 @@ vy_build_insert_tuple(struct vy_env *env, struct vy_lsm *lsm,
*/
if (check_unique_constraint) {
vy_mem_pin(mem);
rc = vy_check_is_unique_secondary(NULL,
&env->xm->p_committed_read_view,
space_name, index_name, lsm, stmt);
rc = vy_check_is_unique_secondary(
NULL, &env->xm->p_committed_read_view, lsm, stmt);
vy_mem_unpin(mem);
if (rc != 0) {
tuple_unref(stmt);
Expand Down Expand Up @@ -4302,8 +4286,6 @@ vinyl_space_build_index(struct space *src_space, struct index *new_index,
ctx.lsm = new_lsm;
ctx.format = new_format;
ctx.check_unique_constraint = check_unique_constraint;
ctx.space_name = space_name(src_space);
ctx.index_name = new_index->def->name;
ctx.is_failed = false;
diag_create(&ctx.diag);
trigger_create(&on_replace, vy_build_on_replace, &ctx, NULL);
Expand Down Expand Up @@ -4345,10 +4327,7 @@ vinyl_space_build_index(struct space *src_space, struct index *new_index,
* in which case we would insert an outdated tuple.
*/
if (vy_stmt_lsn(tuple) <= build_lsn) {
rc = vy_build_insert_tuple(env, new_lsm,
space_name(src_space),
new_index->def->name,
new_format,
rc = vy_build_insert_tuple(env, new_lsm, new_format,
check_unique_constraint,
tuple);
if (rc != 0)
Expand Down
67 changes: 67 additions & 0 deletions test/engine-luatest/gh_5975_duplicate_key_error_test.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
local t = require('luatest')

local server = require('luatest.server')

local g = t.group(nil, t.helpers.matrix{engine = {'memtx', 'vinyl'}})

g.before_all(function(cg)
cg.server = server:new()
cg.server:start()
end)

g.after_all(function(cg)
cg.server:drop()
end)

g.after_each(function(cg)
cg.server:exec(function()
if box.space.test ~= nil then
box.space.test:drop()
end
end)
end)

g.test_duplicate_key_error = function(cg)
cg.server:exec(function(engine)
local function check_err(space_name, index_name, old_tuple, new_tuple)
local space = box.space[space_name]
t.assert_error_covers({
type = 'ClientError',
code = box.error.TUPLE_FOUND,
message = string.format(
'Duplicate key exists in unique index "%s" in space ' ..
'"%s" with old tuple - %s and new tuple - %s',
index_name, space_name,
box.tuple.new(old_tuple), box.tuple.new(new_tuple)),
space = space_name,
index = index_name,
old_tuple = old_tuple,
new_tuple = new_tuple,
}, space.insert, space, new_tuple)
end

local s = box.schema.space.create('test', {
engine = engine,
format = {
{name = 'a', type = 'unsigned'},
{name = 'b', type = 'unsigned'},
},
})
s:create_index('pk', {parts = {'a'}})
s:create_index('i1', {parts = {'b'}, unique = true})
s:create_index('i2', {parts = {'b'}, unique = true})
s:insert({1, 1})

check_err('test', 'pk', {1, 1}, {1, 2})
check_err('test', 'i1', {1, 1}, {2, 1})
t.assert_equals(s:select({}, {fullscan = true}), {{1, 1}})
s.index.i1:drop()
check_err('test', 'pk', {1, 1}, {1, 2})
check_err('test', 'i2', {1, 1}, {2, 1})
t.assert_equals(s:select({}, {fullscan = true}), {{1, 1}})
s.index.i2:drop()
check_err('test', 'pk', {1, 1}, {1, 2})
s:insert({2, 1})
t.assert_equals(s:select({}, {fullscan = true}), {{1, 1}, {2, 1}})
end, {cg.params.engine})
end

0 comments on commit 2cfba5e

Please sign in to comment.