Skip to content

Commit

Permalink
sql: clean-up in case constraint creation failed
Browse files Browse the repository at this point in the history
This patch makes VDBE to perform a clean-up if the creation of a
constraint fails because of the creation of two or more
constraints of the same type with the same name and in the same
CREATE TABLE statement.

For example:
CREATE TABLE t1(
	id INT PRIMARY KEY,
	CONSTRAINT ck1 CHECK(id > 1),
	CONSTRAINT ck1 CHECK(id < 10)
);

Part of #4183
  • Loading branch information
ImeevMA committed Jul 4, 2019
1 parent 0a0f232 commit 58bd81c
Show file tree
Hide file tree
Showing 7 changed files with 112 additions and 39 deletions.
94 changes: 60 additions & 34 deletions src/box/sql/build.c
Original file line number Diff line number Diff line change
Expand Up @@ -71,22 +71,33 @@ struct saved_record
int reg_key;
/** Number of registers the key consists of. */
int reg_key_count;
/** The number of the OP_SInsert operation. */
int insertion_opcode;
/** The address of the opcode. */
int op_addr;
/** Flag to show that operation is SInsert. */
bool is_insert;
};

/**
* Save inserted in system space record in list.
* Save inserted in system space record in list. This procedure is
* called after generation of either OP_SInsert or OP_NoColflict +
* OP_SetDiag. In the first case, record inserted to the system
* space is supposed to be deleted on error; in the latter - jump
* target specified in OP_SetDiag should be adjusted to the start
* of clean-up routines (current entry isn't inserted to the space
* yet, so there's no need to delete it).
*
* @param parser SQL Parser object.
* @param space_id Id of table in which record is inserted.
* @param reg_key Register that contains first field of the key.
* @param reg_key_count Exact number of fields of the key.
* @param insertion_opcode Number of OP_SInsert opcode.
* @param op_addr Address of opcode (OP_SetDiag or OP_SInsert).
* Used to fix jump target (see
* sql_finish_coding()).
* @param is_insert_op Whether opcode is OP_SInsert or not.
*/
static inline void
save_record(struct Parse *parser, uint32_t space_id, int reg_key,
int reg_key_count, int insertion_opcode)
int reg_key_count, int op_addr, bool is_insert_op)
{
struct saved_record *record =
region_alloc(&parser->region, sizeof(*record));
Expand All @@ -99,7 +110,8 @@ save_record(struct Parse *parser, uint32_t space_id, int reg_key,
record->space_id = space_id;
record->reg_key = reg_key;
record->reg_key_count = reg_key_count;
record->insertion_opcode = insertion_opcode;
record->op_addr = op_addr;
record->is_insert = is_insert_op;
rlist_add_entry(&parser->record_list, record, link);
}

Expand All @@ -118,28 +130,39 @@ sql_finish_coding(struct Parse *parse_context)
* it won't be created. This code works the same way for
* other "CREATE ..." statements but it won't delete
* anything as these statements create no more than one
* record.
* record. Hence for processed insertions we should remove
* entries from corresponding system spaces alongside
* with fixing jump address for OP_SInsert opcode in
* case it fails during VDBE runtime; for OP_SetDiag only
* adjust jump target to the start of clean-up program
* for already inserted entries.
*/
if (!rlist_empty(&parse_context->record_list)) {
struct saved_record *record =
rlist_shift_entry(&parse_context->record_list,
struct saved_record, link);
/* Set P2 of SInsert. */
sqlVdbeChangeP2(v, record->insertion_opcode, v->nOp);
/*
* Set jump target for OP_SetDiag and OP_SInsert.
*/
sqlVdbeChangeP2(v, record->op_addr, v->nOp);
MAYBE_UNUSED const char *comment =
"Delete entry from %s if CREATE TABLE fails";
rlist_foreach_entry(record, &parse_context->record_list, link) {
int record_reg = ++parse_context->nMem;
sqlVdbeAddOp3(v, OP_MakeRecord, record->reg_key,
record->reg_key_count, record_reg);
sqlVdbeAddOp2(v, OP_SDelete, record->space_id,
record_reg);
MAYBE_UNUSED struct space *space =
space_by_id(record->space_id);
VdbeComment((v, comment, space_name(space)));
/* Set P2 of SInsert. */
sqlVdbeChangeP2(v, record->insertion_opcode,
v->nOp);
if (record->is_insert) {
int rec_reg = ++parse_context->nMem;
sqlVdbeAddOp3(v, OP_MakeRecord, record->reg_key,
record->reg_key_count, rec_reg);
sqlVdbeAddOp2(v, OP_SDelete, record->space_id,
rec_reg);
MAYBE_UNUSED struct space *space =
space_by_id(record->space_id);
VdbeComment((v, comment, space_name(space)));
}
/*
* Set jump target for OP_SetDiag and
* OP_SInsert.
*/
sqlVdbeChangeP2(v, record->op_addr, v->nOp);
}
sqlVdbeAddOp1(v, OP_Halt, -1);
VdbeComment((v,
Expand Down Expand Up @@ -891,7 +914,7 @@ vdbe_emit_create_index(struct Parse *parse, struct space_def *def,
SQL_SUBTYPE_MSGPACK, index_parts, P4_STATIC);
sqlVdbeAddOp3(v, OP_MakeRecord, entry_reg, 6, tuple_reg);
sqlVdbeAddOp3(v, OP_SInsert, BOX_INDEX_ID, 0, tuple_reg);
save_record(parse, BOX_INDEX_ID, entry_reg, 2, v->nOp - 1);
save_record(parse, BOX_INDEX_ID, entry_reg, 2, v->nOp - 1, true);
return;
error:
parse->is_aborted = true;
Expand Down Expand Up @@ -950,7 +973,7 @@ vdbe_emit_space_create(struct Parse *pParse, int space_id_reg,
sqlVdbeAddOp3(v, OP_MakeRecord, iFirstCol, 7, tuple_reg);
sqlVdbeAddOp3(v, OP_SInsert, BOX_SPACE_ID, 0, tuple_reg);
sqlVdbeChangeP5(v, OPFLAG_NCHANGE);
save_record(pParse, BOX_SPACE_ID, iFirstCol, 1, v->nOp - 1);
save_record(pParse, BOX_SPACE_ID, iFirstCol, 1, v->nOp - 1, true);
return;
error:
pParse->is_aborted = true;
Expand Down Expand Up @@ -1072,12 +1095,12 @@ vdbe_emit_ck_constraint_create(struct Parse *parser,
if (vdbe_emit_halt_with_presence_test(parser, BOX_CK_CONSTRAINT_ID, 0,
ck_constraint_reg, 2,
ER_CONSTRAINT_EXISTS, error_msg,
false, OP_NoConflict) != 0)
false, OP_NoConflict, true) != 0)
return;
sqlVdbeAddOp3(v, OP_SInsert, BOX_CK_CONSTRAINT_ID, 0,
ck_constraint_reg + 5);
save_record(parser, BOX_CK_CONSTRAINT_ID, ck_constraint_reg, 2,
v->nOp - 1);
v->nOp - 1, true);
VdbeComment((v, "Create CK constraint %s", ck_def->name));
sqlReleaseTempRange(parser, ck_constraint_reg, 5);
}
Expand Down Expand Up @@ -1137,7 +1160,7 @@ vdbe_emit_fk_constraint_create(struct Parse *parse_context,
BOX_FK_CONSTRAINT_ID, 0,
constr_tuple_reg, 2,
ER_CONSTRAINT_EXISTS, error_msg,
false, OP_NoConflict) != 0)
false, OP_NoConflict, true) != 0)
return;
sqlVdbeAddOp2(vdbe, OP_Bool, fk->is_deferred, constr_tuple_reg + 3);
sqlVdbeAddOp4(vdbe, OP_String8, 0, constr_tuple_reg + 4, 0,
Expand Down Expand Up @@ -1187,7 +1210,7 @@ vdbe_emit_fk_constraint_create(struct Parse *parse_context,
sqlVdbeChangeP5(vdbe, OPFLAG_NCHANGE);
}
save_record(parse_context, BOX_FK_CONSTRAINT_ID, constr_tuple_reg, 2,
vdbe->nOp - 1);
vdbe->nOp - 1, true);
sqlReleaseTempRange(parse_context, constr_tuple_reg, 10);
return;
error:
Expand Down Expand Up @@ -1284,7 +1307,7 @@ sqlEndTable(struct Parse *pParse)
if (vdbe_emit_halt_with_presence_test(pParse, BOX_SPACE_ID, 2,
name_reg, 1, ER_SPACE_EXISTS,
error_msg, (no_err != 0),
OP_NoConflict) != 0)
OP_NoConflict, false) != 0)
return;

int reg_space_id = getNewSpaceId(pParse);
Expand All @@ -1310,15 +1333,15 @@ sqlEndTable(struct Parse *pParse)
sqlVdbeAddOp3(v, OP_SInsert, BOX_SEQUENCE_ID, 0,
reg_seq_record);
save_record(pParse, BOX_SEQUENCE_ID, reg_seq_record + 1, 1,
v->nOp - 1);
v->nOp - 1, true);
/* Do an insertion into _space_sequence. */
int reg_space_seq_record = emitNewSysSpaceSequenceRecord(pParse,
reg_space_id, reg_seq_id,
new_space->index[0]->def);
sqlVdbeAddOp3(v, OP_SInsert, BOX_SPACE_SEQUENCE_ID, 0,
reg_space_seq_record);
save_record(pParse, BOX_SPACE_SEQUENCE_ID,
reg_space_seq_record + 1, 1, v->nOp - 1);
reg_space_seq_record + 1, 1, v->nOp - 1, true);
}
/* Code creation of FK constraints, if any. */
struct fk_constraint_parse *fk_parse;
Expand Down Expand Up @@ -1445,7 +1468,7 @@ sql_create_view(struct Parse *parse_context)
if (vdbe_emit_halt_with_presence_test(parse_context, BOX_SPACE_ID, 2,
name_reg, 1, ER_SPACE_EXISTS,
error_msg, (no_err != 0),
OP_NoConflict) != 0)
OP_NoConflict, false) != 0)
goto create_view_fail;

vdbe_emit_space_create(parse_context, getNewSpaceId(parse_context),
Expand Down Expand Up @@ -1564,7 +1587,7 @@ vdbe_emit_fk_constraint_drop(struct Parse *parse_context, char *constraint_name,
BOX_FK_CONSTRAINT_ID, 0,
key_reg, 2, ER_NO_SUCH_CONSTRAINT,
error_msg, false,
OP_Found) != 0) {
OP_Found, false) != 0) {
sqlDbFree(parse_context->db, constraint_name);
return;
}
Expand Down Expand Up @@ -1597,7 +1620,7 @@ vdbe_emit_ck_constraint_drop(struct Parse *parser, const char *ck_name,
if (vdbe_emit_halt_with_presence_test(parser, BOX_CK_CONSTRAINT_ID, 0,
key_reg, 2, ER_NO_SUCH_CONSTRAINT,
error_msg, false,
OP_Found) != 0)
OP_Found, false) != 0)
return;
sqlVdbeAddOp3(v, OP_MakeRecord, key_reg, 2, key_reg + 2);
sqlVdbeAddOp2(v, OP_SDelete, BOX_CK_CONSTRAINT_ID, key_reg + 2);
Expand Down Expand Up @@ -3263,7 +3286,7 @@ vdbe_emit_halt_with_presence_test(struct Parse *parser, int space_id,
int index_id, int key_reg, uint32_t key_len,
int tarantool_error_code,
const char *error_src, bool no_error,
int cond_opcode)
int cond_opcode, bool is_clean_needed)
{
assert(cond_opcode == OP_NoConflict || cond_opcode == OP_Found);
struct Vdbe *v = sqlGetVdbe(parser);
Expand All @@ -3284,7 +3307,10 @@ vdbe_emit_halt_with_presence_test(struct Parse *parser, int space_id,
} else {
sqlVdbeAddOp4(v, OP_SetDiag, tarantool_error_code, 0, 0, error,
P4_DYNAMIC);
sqlVdbeAddOp1(v, OP_Halt, -1);
if (is_clean_needed)
save_record(parser, 0, 0, 0, v->nOp - 1, false);
else
sqlVdbeAddOp1(v, OP_Halt, -1);
}
sqlVdbeJumpHere(v, addr);
sqlVdbeAddOp1(v, OP_Close, cursor);
Expand Down
2 changes: 1 addition & 1 deletion src/box/sql/sqlInt.h
Original file line number Diff line number Diff line change
Expand Up @@ -4540,7 +4540,7 @@ vdbe_emit_halt_with_presence_test(struct Parse *parser, int space_id,
int index_id, int key_reg, uint32_t key_len,
int tarantool_error_code,
const char *error_src, bool no_error,
int cond_opcode);
int cond_opcode, bool is_clean_needed);

/**
* Generate VDBE code to delete records from system _sql_stat1 or
Expand Down
6 changes: 4 additions & 2 deletions src/box/sql/trigger.c
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ sql_trigger_begin(struct Parse *parse)
name_reg, 1,
ER_TRIGGER_EXISTS,
error_msg, (no_err != 0),
OP_NoConflict) != 0)
OP_NoConflict,
false) != 0)
goto trigger_cleanup;
}

Expand Down Expand Up @@ -420,7 +421,8 @@ sql_drop_trigger(struct Parse *parser)
sqlVdbeAddOp4(v, OP_String8, 0, name_reg, 0, name_copy, P4_DYNAMIC);
if (vdbe_emit_halt_with_presence_test(parser, BOX_TRIGGER_ID, 0,
name_reg, 1, ER_NO_SUCH_TRIGGER,
error_msg, no_err, OP_Found) != 0)
error_msg, no_err, OP_Found,
false) != 0)
goto drop_trigger_cleanup;

vdbe_code_drop_trigger(parser, trigger_name, true);
Expand Down
3 changes: 2 additions & 1 deletion test/sql/checks.result
Original file line number Diff line number Diff line change
Expand Up @@ -251,8 +251,9 @@ box.execute("CREATE TABLE T2(ID INT PRIMARY KEY, CONSTRAINT CK1 CHECK(ID > 0), C
---
- error: Constraint CK1 already exists
...
box.space.T2:drop()
box.space.T2
---
- null
...
box.space._ck_constraint:select()
---
Expand Down
2 changes: 1 addition & 1 deletion test/sql/checks.test.lua
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ box.execute("INSERT INTO \"test\" VALUES(11, 11);")
box.execute("INSERT INTO \"test\" VALUES(12, 11);")
s:drop()
box.execute("CREATE TABLE T2(ID INT PRIMARY KEY, CONSTRAINT CK1 CHECK(ID > 0), CONSTRAINT CK1 CHECK(ID < 0))")
box.space.T2:drop()
box.space.T2
box.space._ck_constraint:select()

--
Expand Down
30 changes: 30 additions & 0 deletions test/sql/clear.result
Original file line number Diff line number Diff line change
Expand Up @@ -168,3 +168,33 @@ box.execute("DROP TABLE zoobar")
...
-- Debug
-- require("console").start()
--
-- gh-4183: Check if there is a garbage in case of failure to
-- create a constraint, when more than one constraint of the same
-- type is created with the same name and in the same
-- CREATE TABLE statement.
--
box.execute("CREATE TABLE t1(id INT PRIMARY KEY, CONSTRAINT ck1 CHECK(id > 0), CONSTRAINT ck1 CHECK(id < 0));")
---
- error: Constraint CK1 already exists
...
box.space.t1
---
- null
...
box.space._ck_constraint:select()
---
- []
...
box.execute("CREATE TABLE t2(id INT PRIMARY KEY, CONSTRAINT fk1 FOREIGN KEY(id) REFERENCES t2, CONSTRAINT fk1 FOREIGN KEY(id) REFERENCES t2);")
---
- error: Constraint FK1 already exists
...
box.space.t2
---
- null
...
box.space._fk_constraint:select()
---
- []
...
14 changes: 14 additions & 0 deletions test/sql/clear.test.lua
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,17 @@ box.execute("DROP TABLE zoobar")

-- Debug
-- require("console").start()

--
-- gh-4183: Check if there is a garbage in case of failure to
-- create a constraint, when more than one constraint of the same
-- type is created with the same name and in the same
-- CREATE TABLE statement.
--
box.execute("CREATE TABLE t1(id INT PRIMARY KEY, CONSTRAINT ck1 CHECK(id > 0), CONSTRAINT ck1 CHECK(id < 0));")
box.space.t1
box.space._ck_constraint:select()

box.execute("CREATE TABLE t2(id INT PRIMARY KEY, CONSTRAINT fk1 FOREIGN KEY(id) REFERENCES t2, CONSTRAINT fk1 FOREIGN KEY(id) REFERENCES t2);")
box.space.t2
box.space._fk_constraint:select()

0 comments on commit 58bd81c

Please sign in to comment.