Skip to content

Commit

Permalink
Merge pull request #23430 from taosdata/enh/TD-26641-3.0
Browse files Browse the repository at this point in the history
enh: only float/double allowed for sum/avg of rsma
  • Loading branch information
dapan1121 committed Oct 30, 2023
2 parents 8650c5d + eae1b4e commit 1b7a6bc
Show file tree
Hide file tree
Showing 4 changed files with 152 additions and 25 deletions.
6 changes: 5 additions & 1 deletion source/dnode/mnode/impl/src/mndSync.c
Expand Up @@ -95,7 +95,11 @@ static int32_t mndTransValidatePrepareAction(SMnode *pMnode, STrans *pTrans, STr
}

_OUT:
taosMemoryFreeClear(pRow);
if (pRow) {
SdbDeleteFp deleteFp = pSdb->deleteFps[pRaw->type];
if (deleteFp) (*deleteFp)(pSdb, pRow->pObj, false);
taosMemoryFreeClear(pRow);
}
return code;
}

Expand Down
3 changes: 2 additions & 1 deletion source/dnode/vnode/src/tsdb/tsdbSnapshot.c
Expand Up @@ -1521,7 +1521,8 @@ ETsdbFsState tsdbSnapGetFsState(SVnode* pVnode) {
return pVnode->pTsdb->pFS->fsstate;
}
for (int32_t lvl = 0; lvl < TSDB_RETENTION_MAX; ++lvl) {
if (SMA_RSMA_GET_TSDB(pVnode, lvl)->pFS->fsstate != TSDB_FS_STATE_NORMAL) {
STsdb* pTsdb = SMA_RSMA_GET_TSDB(pVnode, lvl);
if (pTsdb && pTsdb->pFS->fsstate != TSDB_FS_STATE_NORMAL) {
return TSDB_FS_STATE_INCOMPLETE;
}
}
Expand Down
97 changes: 85 additions & 12 deletions source/libs/parser/src/parTranslater.c
Expand Up @@ -4576,7 +4576,7 @@ static int32_t checkDbEnumOption(STranslateContext* pCxt, const char* pName, int
return TSDB_CODE_SUCCESS;
}

static int32_t checkDbRetentionsOption(STranslateContext* pCxt, SNodeList* pRetentions) {
static int32_t checkDbRetentionsOption(STranslateContext* pCxt, SNodeList* pRetentions, int8_t precision) {
if (NULL == pRetentions) {
return TSDB_CODE_SUCCESS;
}
Expand All @@ -4599,11 +4599,55 @@ static int32_t checkDbRetentionsOption(STranslateContext* pCxt, SNodeList* pRete

SValueNode* pFreq = (SValueNode*)nodesListGetNode(((SNodeListNode*)pRetention)->pNodeList, 0);
SValueNode* pKeep = (SValueNode*)nodesListGetNode(((SNodeListNode*)pRetention)->pNodeList, 1);
if (pFreq->datum.i <= 0 || 'n' == pFreq->unit || 'y' == pFreq->unit || pFreq->datum.i >= pKeep->datum.i ||
(NULL != pPrevFreq && pPrevFreq->datum.i >= pFreq->datum.i) ||
(NULL != pPrevKeep && pPrevKeep->datum.i > pKeep->datum.i)) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_DB_OPTION, "Invalid option retentions");

ASSERTS(pFreq->isDuration && pKeep->isDuration, "Retentions freq/keep should have unit");

// check unit
if (pFreq->isDuration && TIME_UNIT_SECOND != pFreq->unit && TIME_UNIT_MINUTE != pFreq->unit &&
TIME_UNIT_HOUR != pFreq->unit && TIME_UNIT_DAY != pFreq->unit && TIME_UNIT_WEEK != pFreq->unit) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_DB_OPTION,
"Invalid option retentions(freq): %s, only s, m, h, d, w allowed", pFreq->literal);
}

if (pKeep->isDuration && TIME_UNIT_MINUTE != pKeep->unit && TIME_UNIT_HOUR != pKeep->unit &&
TIME_UNIT_DAY != pKeep->unit) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_DB_OPTION,
"Invalid option retentions(keep): %s, only m, h, d allowed", pKeep->literal);
}

// check value range
if (pFreq->datum.i <= 0) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_DB_OPTION,
"Invalid option retentions(freq): %s should larger than 0", pFreq->literal);
}
int64_t keepMinute = pKeep->datum.i / getUnitPerMinute(pKeep->node.resType.precision);
int64_t tsdbMaxKeep = TSDB_TIME_PRECISION_NANO == precision ? TSDB_MAX_KEEP_NS : TSDB_MAX_KEEP;
if (keepMinute < TSDB_MIN_KEEP || keepMinute > tsdbMaxKeep) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_DB_OPTION,
"Invalid option retentions(keep): %" PRId64 "m, valid range: [%" PRIi64
"m, %" PRId64 "m]",
keepMinute, TSDB_MIN_KEEP, tsdbMaxKeep);
}

// check relationships
if (pFreq->datum.i >= pKeep->datum.i) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_DB_OPTION,
"Invalid option retentions(freq/keep): %s should larger than %s", pKeep->literal,
pFreq->literal);
}

if (NULL != pPrevFreq && pPrevFreq->datum.i >= pFreq->datum.i) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_DB_OPTION,
"Invalid option retentions(freq): %s should larger than %s", pFreq->literal,
pPrevFreq->literal);
}

if (NULL != pPrevKeep && pPrevKeep->datum.i > pKeep->datum.i) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_DB_OPTION,
"Invalid option retentions(keep): %s should not larger than %s",
pPrevKeep->literal, pKeep->literal);
}

pPrevFreq = pFreq;
pPrevKeep = pKeep;
}
Expand Down Expand Up @@ -4723,7 +4767,7 @@ static int32_t checkDatabaseOptions(STranslateContext* pCxt, const char* pDbName
TSDB_DB_SINGLE_STABLE_OFF);
}
if (TSDB_CODE_SUCCESS == code) {
code = checkDbRetentionsOption(pCxt, pOptions->pRetentions);
code = checkDbRetentionsOption(pCxt, pOptions->pRetentions, pOptions->precision);
}
if (TSDB_CODE_SUCCESS == code) {
code = checkDbEnumOption(pCxt, "schemaless", pOptions->schemaless, TSDB_DB_SCHEMALESS_ON, TSDB_DB_SCHEMALESS_OFF);
Expand Down Expand Up @@ -5021,7 +5065,7 @@ static int32_t checkTableSmaOption(STranslateContext* pCxt, SCreateTableStmt* pS
}

static bool validRollupFunc(const char* pFunc) {
static const char* rollupFuncs[] = {"avg", "sum", "min", "max", "last", "first"};
static const char* rollupFuncs[] = {"avg", "sum", "min", "max", "last", "first"};
static const int32_t numOfRollupFuncs = (sizeof(rollupFuncs) / sizeof(char*));
for (int i = 0; i < numOfRollupFuncs; ++i) {
if (0 == strcmp(rollupFuncs[i], pFunc)) {
Expand All @@ -5031,6 +5075,17 @@ static bool validRollupFunc(const char* pFunc) {
return false;
}

static bool aggrRollupFunc(const char* pFunc) {
static const char* aggrRollupFuncs[] = {"avg", "sum"};
static const int32_t numOfAggrRollupFuncs = (sizeof(aggrRollupFuncs) / sizeof(char*));
for (int i = 0; i < numOfAggrRollupFuncs; ++i) {
if (0 == strcmp(aggrRollupFuncs[i], pFunc)) {
return true;
}
}
return false;
}

static int32_t checkTableRollupOption(STranslateContext* pCxt, SNodeList* pFuncs, bool createStable,
SDbCfgInfo* pDbCfg) {
if (NULL == pFuncs) {
Expand Down Expand Up @@ -5104,7 +5159,8 @@ static int32_t checkTableTagsSchema(STranslateContext* pCxt, SHashObj* pHash, SN
return code;
}

static int32_t checkTableColsSchema(STranslateContext* pCxt, SHashObj* pHash, int32_t ntags, SNodeList* pCols) {
static int32_t checkTableColsSchema(STranslateContext* pCxt, SHashObj* pHash, int32_t ntags, SNodeList* pCols,
SNodeList* pRollupFuncs) {
int32_t ncols = LIST_LENGTH(pCols);
if (ncols < TSDB_MIN_COLUMNS) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_COLUMNS_NUM);
Expand All @@ -5114,13 +5170,19 @@ static int32_t checkTableColsSchema(STranslateContext* pCxt, SHashObj* pHash, in

int32_t code = TSDB_CODE_SUCCESS;

bool first = true;
int32_t colIndex = 0;
int32_t rowSize = 0;
SNode* pNode = NULL;
char* pFunc = NULL;
bool isAggrRollup = false;

if (pRollupFuncs) {
pFunc = ((SFunctionNode*)nodesListGetNode(pRollupFuncs, 0))->functionName;
isAggrRollup = aggrRollupFunc(pFunc);
}
FOREACH(pNode, pCols) {
SColumnDefNode* pCol = (SColumnDefNode*)pNode;
if (first) {
first = false;
if (0 == colIndex) {
if (TSDB_DATA_TYPE_TIMESTAMP != pCol->dataType.type) {
code = generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_FIRST_COLUMN);
}
Expand All @@ -5140,6 +5202,15 @@ static int32_t checkTableColsSchema(STranslateContext* pCxt, SHashObj* pHash, in
code = generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN);
}
}

if (TSDB_CODE_SUCCESS == code && isAggrRollup && 0 != colIndex) {
if (pCol->dataType.type != TSDB_DATA_TYPE_FLOAT && pCol->dataType.type != TSDB_DATA_TYPE_DOUBLE) {
code =
generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_COLUMN,
"Invalid column type: %s, only float/double allowed for %s", pCol->colName, pFunc);
}
}

if (TSDB_CODE_SUCCESS == code) {
code = taosHashPut(pHash, pCol->colName, len, &pCol, POINTER_BYTES);
}
Expand All @@ -5148,6 +5219,8 @@ static int32_t checkTableColsSchema(STranslateContext* pCxt, SHashObj* pHash, in
} else {
break;
}
// next column
++colIndex;
}

if (TSDB_CODE_SUCCESS == code && rowSize > TSDB_MAX_BYTES_PER_ROW) {
Expand All @@ -5166,7 +5239,7 @@ static int32_t checkTableSchema(STranslateContext* pCxt, SCreateTableStmt* pStmt

int32_t code = checkTableTagsSchema(pCxt, pHash, pStmt->pTags);
if (TSDB_CODE_SUCCESS == code) {
code = checkTableColsSchema(pCxt, pHash, LIST_LENGTH(pStmt->pTags), pStmt->pCols);
code = checkTableColsSchema(pCxt, pHash, LIST_LENGTH(pStmt->pTags), pStmt->pCols, pStmt->pOptions->pRollupFuncs);
}

taosHashCleanup(pHash);
Expand Down
71 changes: 60 additions & 11 deletions tests/system-test/1-insert/create_retentions.py
Expand Up @@ -52,13 +52,41 @@ def init(self, conn, logSql, replicaVar=1):
@property
def create_databases_sql_err(self):
return [
# check grammar
"create database db1 retentions",
"create database db1 retentions 1s:1d,2s:2d,3s:3d,4s:4d",
# check unit
"create database db1 retentions 1b:1d",
"create database db1 retentions 1u:1d",
"create database db1 retentions 1a:1d",
"create database db1 retentions 1n:1d",
"create database db1 retentions 1y:1d",
"create database db1 retentions 1s:86400s",
"create database db1 retentions 1s:86400000a",
"create database db1 retentions 1s:86400000000u",
"create database db1 retentions 1s:86400000000000b",
"create database db1 retentions 1s:1w",
"create database db1 retentions 1s:1n",
"create database db1 retentions 1s:1y",
# check value range
"create database db1 retentions -1s:1d",
"create database db1 retentions 0s:1d",
"create database db3 retentions 1s:-1d",
"create database db3 retentions 1s:0d",
"create database db1 retentions 1s:1y",
"create database db1 retentions 1s:1n",
"create database db2 retentions 1w:1d ;",
"create database db5 retentions 1s:1d,3s:3d,2s:2d",
"create database db1 retentions 1s:1n,2s:2d,3s:3d,4s:4d",
"create database db3 retentions 1s:1439m",
"create database db3 retentions 1s:365001d",
"create database db3 retentions 1s:8760001h",
"create database db3 retentions 1s:525600001m",
"create database db3 retentions 1s:106581d precision 'ns'",
"create database db3 retentions 1s:2557921h precision 'ns'",
"create database db3 retentions 1s:153475201m precision 'ns'",
# check relationships
"create database db5 retentions 1441m:1440m,2d:3d",
"create database db5 retentions 2m:1d,1s:2d",
"create database db5 retentions 1s:2880m,2s:2879m",
"create database db5 retentions 1s:1d,2s:2d,2s:3d",
"create database db5 retentions 1s:1d,3s:2d,2s:3d",
"create database db1 retentions 1s:1d,2s:3d,3s:2d",
]

@property
Expand Down Expand Up @@ -92,6 +120,16 @@ def create_stable_sql_err(self, dbname=DB2):
f"create stable {dbname}.stb24 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) " ,
f"create stable {dbname}.stb25 ({PRIMARY_COL} timestamp, {INT_COL} int) " ,
f"create stable {dbname}.stb26 ({PRIMARY_COL} timestamp, {INT_COL} int, {BINARY_COL} nchar(16)) " ,
# only float/double allowd for avg/sum
f"create stable {dbname}.stb11 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(avg)",
f"create stable {dbname}.stb11 ({PRIMARY_COL} timestamp, {BINT_COL} bigint) tags (tag1 int) rollup(avg)",
f"create stable {dbname}.stb11 ({PRIMARY_COL} timestamp, {BOOL_COL} bool) tags (tag1 int) rollup(avg)",
f"create stable {dbname}.stb11 ({PRIMARY_COL} timestamp, {BINARY_COL} binary(10)) tags (tag1 int) rollup(avg)",
f"create stable {dbname}.stb11 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(sum)",
f"create stable {dbname}.stb11 ({PRIMARY_COL} timestamp, {BINT_COL} bigint) tags (tag1 int) rollup(sum)",
f"create stable {dbname}.stb11 ({PRIMARY_COL} timestamp, {BOOL_COL} bool) tags (tag1 int) rollup(sum)",
f"create stable {dbname}.stb11 ({PRIMARY_COL} timestamp, {BINARY_COL} binary(10)) tags (tag1 int) rollup(sum)",


# watermark, max_delay: [0, 900000], [ms, s, m, ?]
f"create stable stb17 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(min) max_delay 1u",
Expand All @@ -108,10 +146,10 @@ def create_stable_sql_err(self, dbname=DB2):
@property
def create_stable_sql_current(self):
return [
f"create stable stb1 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(avg)",
f"create stable stb1 ({PRIMARY_COL} timestamp, {FLOAT_COL} float) tags (tag1 int) rollup(avg)",
f"create stable stb2 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(min) watermark 5s max_delay 1m",
f"create stable stb3 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(max) watermark 5s max_delay 1m",
f"create stable stb4 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(sum) watermark 5s max_delay 1m",
f"create stable stb4 ({PRIMARY_COL} timestamp, {DOUBLE_COL} double) tags (tag1 int) rollup(sum) watermark 5s max_delay 1m",
f"create stable stb5 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(last) watermark 5s max_delay 1m",
f"create stable stb6 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(first) watermark 5s max_delay 1m",
f"create stable stb7 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(first) watermark 5s max_delay 1m sma({INT_COL})",
Expand Down Expand Up @@ -154,6 +192,12 @@ def __create_tb(self, stb=STBNAME, ctb_num=20, ntbnum=1, rsma=False, dbname=DBNA
{INT_UN_COL} int unsigned, {BINT_UN_COL} bigint unsigned, {BINARY_COL} binary(16)
) tags ({INT_TAG} int) rollup({rsma_type}) watermark 5s,5s max_delay 5s,5s
'''
elif rsma_type.lower().strip() in ("sum", "avg"):
create_stb_sql = f'''create table {dbname}.{stb}(
ts timestamp, {DOUBLE_COL} double, {DOUBLE_COL}_1 double, {DOUBLE_COL}_2 double, {DOUBLE_COL}_3 double,
{FLOAT_COL} float, {DOUBLE_COL}_4 double, {FLOAT_COL}_1 float, {FLOAT_COL}_2 float, {FLOAT_COL}_3 float,
{DOUBLE_COL}_5 double) tags ({INT_TAG} int) rollup({rsma_type}) watermark 5s,5s max_delay 5s,5s
'''
else:
create_stb_sql = f'''create table {dbname}.{stb}(
ts timestamp, {INT_COL} int, {BINT_COL} bigint, {SINT_COL} smallint, {TINT_COL} tinyint,
Expand Down Expand Up @@ -200,11 +244,16 @@ def __insert_data(self, rows, ctb_num=20, dbname=DBNAME, rsma=False, rsma_type="
{data.int_data[i]}, {data.bint_data[i]}, {data.sint_data[i]}, {data.tint_data[i]}, {data.float_data[i]}, {data.double_data[i]},
{data.utint_data[i]}, {data.usint_data[i]}, {data.uint_data[i]}, {data.ubint_data[i]}, '{data.vchar_data[i]}'
'''
else:
elif rsma_type.lower().strip() in ("sum", "avg"):
row_data = f'''
{data.int_data[i]}, {data.bint_data[i]}, {data.sint_data[i]}, {data.tint_data[i]}, {data.float_data[i]}, {data.double_data[i]},
{data.utint_data[i]}, {data.usint_data[i]}, {data.uint_data[i]}, {data.ubint_data[i]}
'''
else:
row_data = f'''
{data.double_data[i]}, {data.double_data[i]}, {data.double_data[i]}, {data.double_data[i]}, {data.float_data[i]}, {data.double_data[i]},
{data.float_data[i]}, {data.float_data[i]}, {data.float_data[i]}, {data.double_data[i]}
'''
else:
row_data = f'''
{data.int_data[i]}, {data.bint_data[i]}, {data.sint_data[i]}, {data.tint_data[i]}, {data.float_data[i]}, {data.double_data[i]},
Expand Down Expand Up @@ -245,17 +294,17 @@ def run(self):
tdSql.query(f"select count(*) from {DB3}.{STBNAME} where ts > now()-5m")
tdSql.checkData(0, 0, self.rows * db3_ctb_num)
tdSql.checkRows(1)
tdSql.query(f"select {INT_COL} from {DB3}.{CTBNAME} where ts > now()-4d")
tdSql.query(f"select {FLOAT_COL} from {DB3}.{CTBNAME} where ts > now()-4d")
# not stable
#tdSql.checkData(0, 0, self.rows-1)
tdSql.query(f"select {INT_COL} from {DB3}.{CTBNAME} where ts > now()-6d")
tdSql.query(f"select {DOUBLE_COL} from {DB3}.{CTBNAME} where ts > now()-6d")
# not stable
# tdSql.checkData(0, 0, self.rows-1)

# from ...pytest.util.sql import tdSql

tdLog.printNoPrefix("==========step2.1.1 : alter stb schemaL drop column")
tdSql.query(f"select {BINT_COL} from {DB3}.{STBNAME}")
tdSql.query(f"select {FLOAT_COL} from {DB3}.{STBNAME}")
#tdSql.execute(f"alter stable {DB3}.stb1 drop column {BINT_COL}")
# not support alter stable schema anymore
tdSql.error(f"alter stable {DB3}.stb1 drop column {BINT_COL}")
Expand Down

0 comments on commit 1b7a6bc

Please sign in to comment.