Skip to content

Commit

Permalink
fix:add combine function for groupKey
Browse files Browse the repository at this point in the history
  • Loading branch information
54liuyao committed Mar 23, 2023
1 parent 323d8ee commit fe20423
Show file tree
Hide file tree
Showing 7 changed files with 167 additions and 4 deletions.
1 change: 1 addition & 0 deletions include/libs/function/functionMgt.h
Expand Up @@ -241,6 +241,7 @@ int32_t fmGetUdafExecFuncs(int32_t funcId, SFuncExecFuncs* pFpSet);
int32_t fmSetInvertFunc(int32_t funcId, SFuncExecFuncs* pFpSet);
int32_t fmSetNormalFunc(int32_t funcId, SFuncExecFuncs* pFpSet);
bool fmIsInvertible(int32_t funcId);
char* fmGetFuncName(int32_t funcId);

#ifdef __cplusplus
}
Expand Down
8 changes: 5 additions & 3 deletions source/libs/executor/src/timewindowoperator.c
Expand Up @@ -2110,10 +2110,12 @@ void compactFunctions(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx, int3
} else if (functionNeedToExecute(&pDestCtx[k]) && pDestCtx[k].fpSet.combine != NULL) {
int32_t code = pDestCtx[k].fpSet.combine(&pDestCtx[k], &pSourceCtx[k]);
if (code != TSDB_CODE_SUCCESS) {
qError("%s apply functions error, code: %s", GET_TASKID(pTaskInfo), tstrerror(code));
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
qError("%s apply combine functions error, code: %s", GET_TASKID(pTaskInfo), tstrerror(code));
}
} else if (pDestCtx[k].fpSet.combine == NULL) {
char* funName = fmGetFuncName(pDestCtx[k].functionId);
qError("%s error, combine funcion for %s is not implemented", GET_TASKID(pTaskInfo), funName);
taosMemoryFreeClear(funName);
}
}
}
Expand Down
1 change: 1 addition & 0 deletions source/libs/function/inc/builtinsimpl.h
Expand Up @@ -235,6 +235,7 @@ int32_t blockDistFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
bool getGroupKeyFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv);
int32_t groupKeyFunction(SqlFunctionCtx* pCtx);
int32_t groupKeyFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
int32_t groupKeyCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);

#ifdef __cplusplus
}
Expand Down
3 changes: 2 additions & 1 deletion source/libs/function/src/builtins.c
Expand Up @@ -2480,7 +2480,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{
.name = "irate",
.type = FUNCTION_TYPE_IRATE,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC | FUNC_MGT_FORBID_STREAM_FUNC,
.translateFunc = translateIrate,
.getEnvFunc = getIrateFuncEnv,
.initFunc = irateFuncSetup,
Expand Down Expand Up @@ -3234,6 +3234,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.initFunc = functionSetup,
.processFunc = groupKeyFunction,
.finalizeFunc = groupKeyFinalize,
.combineFunc = groupKeyCombine,
.pPartialFunc = "_group_key",
.pMergeFunc = "_group_key"
},
Expand Down
33 changes: 33 additions & 0 deletions source/libs/function/src/builtinsimpl.c
Expand Up @@ -5900,6 +5900,39 @@ int32_t groupKeyFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
return pResInfo->numOfRes;
}

int32_t groupKeyCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
SResultRowEntryInfo* pDResInfo = GET_RES_INFO(pDestCtx);
SGroupKeyInfo* pDBuf = GET_ROWCELL_INTERBUF(pDResInfo);

SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx);
SGroupKeyInfo* pSBuf = GET_ROWCELL_INTERBUF(pSResInfo);

// escape rest of data blocks to avoid first entry to be overwritten.
if (pDBuf->hasResult) {
goto _group_key_over;
}

if (pSBuf->isNull) {
pDBuf->isNull = true;
pDBuf->hasResult = true;
goto _group_key_over;
}

if (IS_VAR_DATA_TYPE(pSourceCtx->resDataInfo.type)) {
memcpy(pDBuf->data, pSBuf->data,
(pSourceCtx->resDataInfo.type == TSDB_DATA_TYPE_JSON) ? getJsonValueLen(pSBuf->data) : varDataTLen(pSBuf->data));
} else {
memcpy(pDBuf->data, pSBuf->data, pSourceCtx->resDataInfo.bytes);
}

pDBuf->hasResult = true;

_group_key_over:

SET_VAL(pDResInfo, 1, 1);
return TSDB_CODE_SUCCESS;
}

int32_t cachedLastRowFunction(SqlFunctionCtx* pCtx) {
int32_t numOfElems = 0;

Expand Down
7 changes: 7 additions & 0 deletions source/libs/function/src/functionMgt.c
Expand Up @@ -447,3 +447,10 @@ int32_t fmGetDistMethod(const SFunctionNode* pFunc, SFunctionNode** pPartialFunc

return code;
}

char* fmGetFuncName(int32_t funcId) {
if (fmIsUserDefinedFunc(funcId) || funcId < 0 || funcId >= funcMgtBuiltinsNum) {
return taosStrdup("invalid function");
}
return taosStrdup(funcMgtBuiltins[funcId].name);
}
118 changes: 118 additions & 0 deletions tests/script/tsim/stream/distributeInterval0.sim
Expand Up @@ -272,4 +272,122 @@ if $data12 != 2 then
goto loop3
endi

print ===== step3

sql drop database if exists test4;
sql create database test4 vgroups 10;
sql use test4;
sql create stable st(ts timestamp,a int,b int,c varchar(250) ) tags(ta int,tb int,tc int);
sql create table aaa using st tags(1,1,1);
sql create table bbb using st tags(2,2,2);
sql create table ccc using st tags(3,2,2);
sql create table ddd using st tags(4,2,2);


sql create stream streams1 ignore expired 0 fill_history 0 watermark 3s into streamst subtable(c) as select _wstart, c , count(*) c1 from st partition by c interval(1s) ;

sql insert into aaa values(1648791221001,2,2,"/a1/aa/aa");
sql insert into bbb values(1648791221001,2,2,"/a1/aa/aa");
sql insert into ccc values(1648791221001,2,2,"/a1/aa/aa");
sql insert into ddd values(1648791221001,2,2,"/a1/aa/aa");

sql insert into aaa values(1648791222002,2,2,"/a2/aa/aa");
sql insert into bbb values(1648791222002,2,2,"/a2/aa/aa");
sql insert into ccc values(1648791222002,2,2,"/a2/aa/aa");
sql insert into ddd values(1648791222002,2,2,"/a2/aa/aa");

sql insert into aaa values(1648791223003,2,2,"/a3/aa/aa");
sql insert into bbb values(1648791223003,2,2,"/a3/aa/aa");
sql insert into ccc values(1648791223003,2,2,"/a3/aa/aa");
sql insert into ddd values(1648791223003,2,2,"/a3/aa/aa");

sql insert into aaa values(1648791224003,2,2,"/a4/aa/aa");
sql insert into bbb values(1648791224003,2,2,"/a4/aa/aa");
sql insert into ccc values(1648791224003,2,2,"/a4/aa/aa");
sql insert into ddd values(1648791224003,2,2,"/a4/aa/aa");


sql insert into aaa values(1648791225003,2,2,"/a5/aa/aa");
sql insert into bbb values(1648791225003,2,2,"/a5/aa/aa");
sql insert into ccc values(1648791225003,2,2,"/a5/aa/aa");
sql insert into ddd values(1648791225003,2,2,"/a5/aa/aa");

sql insert into aaa values(1648791226003,2,2,"/a6/aa/aa");
sql insert into bbb values(1648791226003,2,2,"/a6/aa/aa");
sql insert into ccc values(1648791226003,2,2,"/a6/aa/aa");
sql insert into ddd values(1648791226003,2,2,"/a6/aa/aa");

$loop_count = 0

loop4:
sleep 200

$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi

sql select * from streamst;

if $rows == 0 then
goto loop4
endi

sql delete from aaa where ts = 1648791223003 ;

$loop_count = 0

loop5:
sleep 200

$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi

sql select * from streamst;

if $rows == 0 then
goto loop5
endi


sql delete from ccc;

$loop_count = 0

loop6:
sleep 200

$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi

sql select * from streamst;

if $rows == 0 then
goto loop6
endi

sql delete from ddd;

$loop_count = 0

loop7:
sleep 200

$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi

sql select * from streamst;

if $rows == 0 then
goto loop7
endi

print ===== over

system sh/stop_dnodes.sh

0 comments on commit fe20423

Please sign in to comment.