Skip to content

Commit

Permalink
sql: introduce mem_cmp_scalar()
Browse files Browse the repository at this point in the history
This patch introduces the mem_cmp_scalar() function that compares two
MEMs using SCALAR rules. MEMs must be scalars. Prior to this patch, there was a
function that used SCALAR rules to compare two MEMs, but its design became
overly complex as new types appeared.

Part of #6164
  • Loading branch information
ImeevMA authored and yanshtunder committed Oct 4, 2021
1 parent 14c24af commit aed3038
Show file tree
Hide file tree
Showing 6 changed files with 168 additions and 91 deletions.
30 changes: 23 additions & 7 deletions src/box/sql/func.c
Expand Up @@ -144,11 +144,16 @@ minmaxFunc(sql_context * context, int argc, sql_value ** argv)
for (i = 1; i < argc; i++) {
if (mem_is_null(argv[i]))
return;
if ((sqlMemCompare(argv[iBest], argv[i], pColl) ^ mask) >=
0) {
testcase(mask == 0);
iBest = i;
int res;
if (mem_cmp_scalar(argv[iBest], argv[i], &res, pColl) != 0) {
diag_set(ClientError, ER_SQL_TYPE_MISMATCH,
mem_str(argv[i]),
mem_type_to_str(argv[iBest]));
context->is_aborted = true;
return;
}
if ((res ^ mask) >= 0)
iBest = i;
}
sql_result_value(context, argv[iBest]);
}
Expand Down Expand Up @@ -1054,9 +1059,15 @@ nullifFunc(sql_context * context, int NotUsed, sql_value ** argv)
{
struct coll *pColl = sqlGetFuncCollSeq(context);
UNUSED_PARAMETER(NotUsed);
if (sqlMemCompare(argv[0], argv[1], pColl) != 0) {
sql_result_value(context, argv[0]);
int res;
if (mem_cmp_scalar(argv[0], argv[1], &res, pColl) != 0) {
diag_set(ClientError, ER_SQL_TYPE_MISMATCH,
mem_str(argv[1]), mem_type_to_str(argv[0]));
context->is_aborted = true;
return;
}
if (res != 0)
sql_result_value(context, argv[0]);
}

/**
Expand Down Expand Up @@ -1824,7 +1835,12 @@ minmaxStep(sql_context * context, int NotUsed, sql_value ** argv)
* comparison is inverted.
*/
bool is_max = (func->flags & SQL_FUNC_MAX) != 0;
cmp = sqlMemCompare(pBest, pArg, pColl);
if (mem_cmp_scalar(pBest, pArg, &cmp, pColl) != 0) {
diag_set(ClientError, ER_SQL_TYPE_MISMATCH,
mem_str(pArg), mem_type_to_str(pBest));
context->is_aborted = true;
return;
}
if ((is_max && cmp < 0) || (!is_max && cmp > 0)) {
mem_copy(pBest, pArg);
} else {
Expand Down
144 changes: 68 additions & 76 deletions src/box/sql/mem.c
Expand Up @@ -60,6 +60,44 @@ enum {
STR_VALUE_MAX_LEN = 128,
};

/**
* Analogue of enum mp_class for enum mp_type. The order of the classes must be
* the same as in the enum mp_class.
*/
enum mem_class {
MEM_CLASS_NULL,
MEM_CLASS_BOOL,
MEM_CLASS_NUMBER,
MEM_CLASS_STR,
MEM_CLASS_BIN,
MEM_CLASS_UUID,
mem_class_max,
};

static inline enum mem_class
mem_type_class(enum mem_type type)
{
switch (type) {
case MEM_TYPE_NULL:
return MEM_CLASS_NULL;
case MEM_TYPE_UINT:
case MEM_TYPE_INT:
case MEM_TYPE_DOUBLE:
return MEM_CLASS_NUMBER;
case MEM_TYPE_STR:
return MEM_CLASS_STR;
case MEM_TYPE_BIN:
return MEM_CLASS_BIN;
case MEM_TYPE_BOOL:
return MEM_CLASS_BOOL;
case MEM_TYPE_UUID:
return MEM_CLASS_UUID;
default:
break;
}
return mem_class_max;
}

bool
mem_is_field_compatible(const struct Mem *mem, enum field_type type)
{
Expand Down Expand Up @@ -2030,6 +2068,36 @@ mem_cmp_uuid(const struct Mem *a, const struct Mem *b, int *result)
return 0;
}

int
mem_cmp_scalar(const struct Mem *a, const struct Mem *b, int *result,
const struct coll *coll)
{
enum mem_class class_a = mem_type_class(a->type);
enum mem_class class_b = mem_type_class(b->type);
if (class_a != class_b) {
*result = class_a - class_b;
return 0;
}
switch (class_a) {
case MEM_CLASS_NULL:
*result = 0;
return 0;
case MEM_CLASS_BOOL:
return mem_cmp_bool(a, b, result);
case MEM_CLASS_NUMBER:
return mem_cmp_num(a, b, result);
case MEM_CLASS_STR:
return mem_cmp_str(a, b, result, coll);
case MEM_CLASS_BIN:
return mem_cmp_bin(a, b, result);
case MEM_CLASS_UUID:
return mem_cmp_uuid(a, b, result);
default:
unreachable();
}
return 0;
}

/*
* Both *pMem1 and *pMem2 contain string values. Compare the two values
* using the collation sequence pColl. As usual, return a negative , zero
Expand Down Expand Up @@ -2463,82 +2531,6 @@ sqlVdbeMemTooBig(Mem * p)
return 0;
}

/*
* Compare the values contained by the two memory cells, returning
* negative, zero or positive if pMem1 is less than, equal to, or greater
* than pMem2. Sorting order is NULL's first, followed by numbers (integers
* and reals) sorted numerically, followed by text ordered by the collating
* sequence pColl and finally blob's ordered by memcmp().
*
* Two NULL values are considered equal by this function.
*/
int
sqlMemCompare(const Mem * pMem1, const Mem * pMem2, const struct coll * pColl)
{
int res;

enum mem_type type1 = pMem1->type;
enum mem_type type2 = pMem2->type;

/* If one value is NULL, it is less than the other. If both values
* are NULL, return 0.
*/
if (((type1 | type2) & MEM_TYPE_NULL) != 0)
return (int)(type2 == MEM_TYPE_NULL) -
(int)(type1 == MEM_TYPE_NULL);

if (((type1 | type2) & MEM_TYPE_BOOL) != 0) {
if ((type1 & type2 & MEM_TYPE_BOOL) != 0) {
if (pMem1->u.b == pMem2->u.b)
return 0;
if (pMem1->u.b)
return 1;
return -1;
}
if (type2 == MEM_TYPE_BOOL)
return +1;
return -1;
}

if (((type1 | type2) & MEM_TYPE_UUID) != 0) {
if (mem_cmp_uuid(pMem1, pMem2, &res) == 0)
return res;
if (type1 != MEM_TYPE_UUID)
return +1;
return -1;
}

/* At least one of the two values is a number
*/
if (((type1 | type2) &
(MEM_TYPE_INT | MEM_TYPE_UINT | MEM_TYPE_DOUBLE)) != 0) {
if (!mem_is_num(pMem1))
return +1;
if (!mem_is_num(pMem2))
return -1;
mem_cmp_num(pMem1, pMem2, &res);
return res;
}

/* If one value is a string and the other is a blob, the string is less.
* If both are strings, compare using the collating functions.
*/
if (((type1 | type2) & MEM_TYPE_STR) != 0) {
if (type1 != MEM_TYPE_STR) {
return 1;
}
if (type2 != MEM_TYPE_STR) {
return -1;
}
mem_cmp_str(pMem1, pMem2, &res, pColl);
return res;
}

/* Both values must be blobs. Compare using memcmp(). */
mem_cmp_bin(pMem1, pMem2, &res);
return res;
}

int
sql_vdbemem_finalize(struct Mem *mem, struct func *func)
{
Expand Down
10 changes: 8 additions & 2 deletions src/box/sql/mem.h
Expand Up @@ -698,6 +698,14 @@ mem_cmp_num(const struct Mem *a, const struct Mem *b, int *result);
int
mem_cmp_uuid(const struct Mem *left, const struct Mem *right, int *result);

/**
* Compare two MEMs using SCALAR rules and return the result of comparison. MEMs
* should be scalars. Original MEMs are not changed.
*/
int
mem_cmp_scalar(const struct Mem *a, const struct Mem *b, int *result,
const struct coll *coll);

/**
* Convert the given MEM to INTEGER. This function and the function below define
* the rules that are used to convert values of all other types to INTEGER. In
Expand Down Expand Up @@ -963,8 +971,6 @@ int sqlVdbeMemTooBig(Mem *);
#define VdbeMemDynamic(X) (((X)->flags & MEM_Dyn) != 0 ||\
((X)->type & (MEM_TYPE_AGG | MEM_TYPE_FRAME)) != 0)

int sqlMemCompare(const Mem *, const Mem *, const struct coll *);

/** MEM manipulate functions. */

/**
Expand Down
8 changes: 7 additions & 1 deletion src/box/sql/vdbe.c
Expand Up @@ -1825,7 +1825,13 @@ case OP_Compare: {
assert(i < (int)def->part_count);
struct coll *coll = def->parts[i].coll;
bool is_rev = def->parts[i].sort_order == SORT_ORDER_DESC;
iCompare = sqlMemCompare(&aMem[p1+idx], &aMem[p2+idx], coll);
struct Mem *a = &aMem[p1+idx];
struct Mem *b = &aMem[p2+idx];
if (mem_cmp_scalar(a, b, &iCompare, coll) != 0) {
diag_set(ClientError, ER_SQL_TYPE_MISMATCH, mem_str(b),
mem_type_to_str(a));
goto abort_due_to_error;
}
if (iCompare) {
if (is_rev)
iCompare = -iCompare;
Expand Down
22 changes: 18 additions & 4 deletions src/box/sql/where.c
Expand Up @@ -1272,13 +1272,27 @@ whereRangeSkipScanEst(Parse * pParse, /* Parsing & code generating context */
rc = sql_stat4_column(db, samples[i].sample_key, nEq,
&pVal);
if (rc == 0 && p1 != NULL) {
int res = sqlMemCompare(p1, pVal, coll);
if (res >= 0)
int res;
rc = mem_cmp_scalar(p1, pVal, &res, coll);
if (rc != 0) {
diag_set(ClientError,
ER_SQL_TYPE_MISMATCH,
mem_str(pVal),
mem_type_to_str(p1));
}
if (rc == 0 && res >= 0)
nLower++;
}
if (rc == 0 && p2 != NULL) {
int res = sqlMemCompare(p2, pVal, coll);
if (res >= 0)
int res;
rc = mem_cmp_scalar(p2, pVal, &res, coll);
if (rc != 0) {
diag_set(ClientError,
ER_SQL_TYPE_MISMATCH,
mem_str(pVal),
mem_type_to_str(p2));
}
if (rc == 0 && res >= 0)
nUpper++;
}
}
Expand Down
45 changes: 44 additions & 1 deletion test/sql-tap/gh-6164-uuid-follow-ups.test.lua
@@ -1,6 +1,6 @@
#!/usr/bin/env tarantool
local test = require("sqltester")
test:plan(4)
test:plan(8)

-- Make sure that function quote() can work with uuid.
test:do_execsql_test(
Expand Down Expand Up @@ -35,4 +35,47 @@ test:do_test(
end,
uuid3)

--
-- Make sure a comparison that includes a UUID and follows the SCALAR rules is
-- working correctly.
--
box.execute([[CREATE TABLE t (i INTEGER PRIMARY KEY, s SCALAR);]])
box.execute([[INSERT INTO t VALUES (1, ?)]], {uuid1})

test:do_execsql_test(
"gh-6164-5",
[[
SELECT GREATEST(i, s, x'33', 'something') FROM t;
]], {
uuid1
})

test:do_execsql_test(
"gh-6164-6",
[[
SELECT LEAST(i, s, x'33', 'something') FROM t;
]], {
1
})

box.execute([[INSERT INTO t VALUES (2, 2);]])

test:do_execsql_test(
"gh-6164-7",
[[
SELECT MAX(s) FROM t;
]], {
uuid1
})

test:do_execsql_test(
"gh-6164-8",
[[
SELECT MIN(s) FROM t;
]], {
2
})

box.execute([[DROP TABLE t;]])

test:finish_test()

0 comments on commit aed3038

Please sign in to comment.