Skip to content

Commit

Permalink
Merge pull request #22438 from taosdata/feat/TD-23299
Browse files Browse the repository at this point in the history
feat: add irate distributed execution
  • Loading branch information
dapan1121 committed Aug 16, 2023
2 parents a6ecb72 + 3c6ddad commit b113d02
Show file tree
Hide file tree
Showing 4 changed files with 165 additions and 0 deletions.
2 changes: 2 additions & 0 deletions include/libs/function/functionMgt.h
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,8 @@ typedef enum EFunctionType {
FUNCTION_TYPE_AVG_MERGE,
FUNCTION_TYPE_STDDEV_PARTIAL,
FUNCTION_TYPE_STDDEV_MERGE,
FUNCTION_TYPE_IRATE_PARTIAL,
FUNCTION_TYPE_IRATE_MERGE,

// geometry functions
FUNCTION_TYPE_GEOM_FROM_TEXT = 4250,
Expand Down
3 changes: 3 additions & 0 deletions source/libs/function/inc/builtinsimpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,10 @@ int32_t derivativeFunction(SqlFunctionCtx* pCtx);
bool getIrateFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
bool irateFuncSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo);
int32_t irateFunction(SqlFunctionCtx* pCtx);
int32_t irateFunctionMerge(SqlFunctionCtx* pCtx);
int32_t irateFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
int32_t iratePartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
int32_t getIrateInfoSize();

int32_t cachedLastRowFunction(SqlFunctionCtx* pCtx);

Expand Down
64 changes: 64 additions & 0 deletions source/libs/function/src/builtins.c
Original file line number Diff line number Diff line change
Expand Up @@ -1567,6 +1567,45 @@ static int32_t translateIrate(SFunctionNode* pFunc, char* pErrBuf, int32_t len)
return TSDB_CODE_SUCCESS;
}

static int32_t translateIrateImpl(SFunctionNode* pFunc, char* pErrBuf, int32_t len, bool isPartial) {
uint8_t colType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type;
if (isPartial) {
if (3 != LIST_LENGTH(pFunc->pParameterList)) {
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
}
if (!IS_NUMERIC_TYPE(colType)) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
pFunc->node.resType = (SDataType){.bytes = getIrateInfoSize() + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY};
} else {
if (1 != LIST_LENGTH(pFunc->pParameterList)) {
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
}
if (TSDB_DATA_TYPE_BINARY != colType) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_DOUBLE].bytes, .type = TSDB_DATA_TYPE_DOUBLE};

// add database precision as param
uint8_t dbPrec = pFunc->node.resType.precision;
int32_t code = addDbPrecisonParam(&pFunc->pParameterList, dbPrec);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}


return TSDB_CODE_SUCCESS;
}

static int32_t translateIratePartial(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
return translateIrateImpl(pFunc, pErrBuf, len, true);
}

static int32_t translateIrateMerge(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
return translateIrateImpl(pFunc, pErrBuf, len, false);
}

static int32_t translateInterp(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
int32_t numOfParams = LIST_LENGTH(pFunc->pParameterList);
uint8_t dbPrec = pFunc->node.resType.precision;
Expand Down Expand Up @@ -2604,6 +2643,31 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.initFunc = irateFuncSetup,
.processFunc = irateFunction,
.sprocessFunc = irateScalarFunction,
.finalizeFunc = irateFinalize,
.pPartialFunc = "_irate_partial",
.pMergeFunc = "_irate_merge"
},
{
.name = "_irate_partial",
.type = FUNCTION_TYPE_IRATE_PARTIAL,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC | FUNC_MGT_FORBID_STREAM_FUNC |
FUNC_MGT_FORBID_SYSTABLE_FUNC,
.translateFunc = translateIratePartial,
.getEnvFunc = getIrateFuncEnv,
.initFunc = irateFuncSetup,
.processFunc = irateFunction,
.sprocessFunc = irateScalarFunction,
.finalizeFunc = iratePartialFinalize
},
{
.name = "_irate_merge",
.type = FUNCTION_TYPE_IRATE_MERGE,
.classification = FUNC_MGT_AGG_FUNC,
.translateFunc = translateIrateMerge,
.getEnvFunc = getIrateFuncEnv,
.initFunc = irateFuncSetup,
.processFunc = irateFunctionMerge,
.sprocessFunc = irateScalarFunction,
.finalizeFunc = irateFinalize
},
{
Expand Down
96 changes: 96 additions & 0 deletions source/libs/function/src/builtinsimpl.c
Original file line number Diff line number Diff line change
Expand Up @@ -5768,6 +5768,8 @@ int32_t derivativeFunction(SqlFunctionCtx* pCtx) {
return TSDB_CODE_SUCCESS;
}

int32_t getIrateInfoSize() { return (int32_t)sizeof(SRateInfo); }

bool getIrateFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
pEnv->calcMemSize = sizeof(SRateInfo);
return true;
Expand Down Expand Up @@ -5817,6 +5819,7 @@ int32_t irateFunction(SqlFunctionCtx* pCtx) {
if (INT64_MIN == pRateInfo->lastKey) {
pRateInfo->lastValue = v;
pRateInfo->lastKey = tsList[i];
pRateInfo->hasResult = 1;
continue;
}

Expand Down Expand Up @@ -5868,6 +5871,99 @@ static double doCalcRate(const SRateInfo* pRateInfo, double tickPerSec) {
return (duration > 0) ? ((double)diff) / (duration / tickPerSec) : 0.0;
}

static void irateTransferInfoImpl(TSKEY inputKey, SRateInfo* pInput, SRateInfo* pOutput, bool isFirstKey) {
if (inputKey > pOutput->lastKey) {
pOutput->firstKey = pOutput->lastKey;
pOutput->firstValue = pOutput->lastValue;

pOutput->lastKey = isFirstKey ? pInput->firstKey : pInput->lastKey;
pOutput->lastValue = isFirstKey ? pInput->firstValue : pInput->lastValue;
} else if ((inputKey < pOutput->lastKey) && (inputKey > pOutput->firstKey)) {
pOutput->firstKey = isFirstKey ? pInput->firstKey : pInput->lastKey;
pOutput->firstValue = isFirstKey ? pInput->firstValue : pInput->lastValue;
} else {
// inputKey < pOutput->firstKey
}
}

static void irateCopyInfo(SRateInfo* pInput, SRateInfo* pOutput) {
pOutput->firstKey = pInput->firstKey;
pOutput->lastKey = pInput->lastKey;

pOutput->firstValue = pInput->firstValue;
pOutput->lastValue = pInput->lastValue;
}

static int32_t irateTransferInfo(SRateInfo* pInput, SRateInfo* pOutput) {
if ((pInput->firstKey != INT64_MIN && (pInput->firstKey == pOutput->firstKey || pInput->firstKey == pOutput->lastKey)) ||
(pInput->lastKey != INT64_MIN && (pInput->lastKey == pOutput->firstKey || pInput->lastKey == pOutput->lastKey))) {
return TSDB_CODE_FUNC_DUP_TIMESTAMP;
}

if (pOutput->hasResult == 0) {
irateCopyInfo(pInput, pOutput);
pOutput->hasResult = pInput->hasResult;
return TSDB_CODE_SUCCESS;
}

if (pInput->firstKey != INT64_MIN) {
irateTransferInfoImpl(pInput->firstKey, pInput, pOutput, true);
}

if (pInput->lastKey != INT64_MIN) {
irateTransferInfoImpl(pInput->lastKey, pInput, pOutput, false);
}

pOutput->hasResult = pInput->hasResult;
return TSDB_CODE_SUCCESS;
}

int32_t irateFunctionMerge(SqlFunctionCtx* pCtx) {
SInputColumnInfoData* pInput = &pCtx->input;
SColumnInfoData* pCol = pInput->pData[0];
if (pCol->info.type != TSDB_DATA_TYPE_BINARY) {
return TSDB_CODE_FUNC_FUNTION_PARA_TYPE;
}

SRateInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));

int32_t start = pInput->startRowIndex;
for (int32_t i = start; i < start + pInput->numOfRows; ++i) {
char* data = colDataGetData(pCol, i);
SRateInfo* pInputInfo = (SRateInfo*)varDataVal(data);
if (pInputInfo->hasResult) {
int32_t code = irateTransferInfo(pInputInfo, pInfo);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
}

if (pInfo->hasResult) {
GET_RES_INFO(pCtx)->numOfRes = 1;
}

return TSDB_CODE_SUCCESS;
}

int32_t iratePartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
SRateInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
int32_t resultBytes = getIrateInfoSize();
char* res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char));

memcpy(varDataVal(res), pInfo, resultBytes);
varDataSetLen(res, resultBytes);

int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);

colDataSetVal(pCol, pBlock->info.rows, res, false);

taosMemoryFree(res);
return pResInfo->numOfRes;
}

int32_t irateFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
Expand Down

0 comments on commit b113d02

Please sign in to comment.