Skip to content

Commit

Permalink
Merge pull request #21963 from luckeverda/fix/TD-24473-new
Browse files Browse the repository at this point in the history
fix/TD-24473
  • Loading branch information
gccgdb1234 committed Jul 11, 2023
2 parents c20ac4f + 8de3121 commit 13bc1e7
Show file tree
Hide file tree
Showing 8 changed files with 136 additions and 75 deletions.
2 changes: 1 addition & 1 deletion include/libs/scalar/filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ typedef struct SFilterColumnParam {
} SFilterColumnParam;

extern int32_t filterInitFromNode(SNode *pNode, SFilterInfo **pinfo, uint32_t options);
extern bool filterExecute(SFilterInfo *info, SSDataBlock *pSrc, SColumnInfoData **p, SColumnDataAgg *statis,
extern int32_t filterExecute(SFilterInfo *info, SSDataBlock *pSrc, SColumnInfoData **p, SColumnDataAgg *statis,
int16_t numOfCols, int32_t *pFilterResStatus);
extern int32_t filterSetDataFromSlotId(SFilterInfo *info, void *param);
extern int32_t filterSetDataFromColId(SFilterInfo *info, void *param);
Expand Down
3 changes: 3 additions & 0 deletions include/util/taoserror.h
Original file line number Diff line number Diff line change
Expand Up @@ -765,6 +765,9 @@ int32_t* taosGetErrno();
#define TSDB_CODE_INDEX_REBUILDING TAOS_DEF_ERROR_CODE(0, 0x3200)
#define TSDB_CODE_INDEX_INVALID_FILE TAOS_DEF_ERROR_CODE(0, 0x3201)

//scalar
#define TSDB_CODE_SCALAR_CONVERT_ERROR TAOS_DEF_ERROR_CODE(0, 0x3250)

//tmq
#define TSDB_CODE_TMQ_INVALID_MSG TAOS_DEF_ERROR_CODE(0, 0x4000)
#define TSDB_CODE_TMQ_CONSUMER_MISMATCH TAOS_DEF_ERROR_CODE(0, 0x4001)
Expand Down
2 changes: 1 addition & 1 deletion source/libs/executor/inc/executorInt.h
Original file line number Diff line number Diff line change
Expand Up @@ -619,7 +619,7 @@ int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* de

extern void doDestroyExchangeOperatorInfo(void* param);

void doFilter(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SColMatchInfo* pColMatchInfo);
int32_t doFilter(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SColMatchInfo* pColMatchInfo);
int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int32_t numOfExpr, SSDataBlock* pBlock,
int32_t rows, const char* idStr, STableMetaCacheInfo* pCache);

Expand Down
42 changes: 24 additions & 18 deletions source/libs/executor/src/executorInt.c
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,7 @@ static void setBlockSMAInfo(SqlFunctionCtx* pCtx, SExprInfo* pExpr, SSDataBlock*
static void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size);
static void doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t order, int32_t scanFlag);

static void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoData* p, bool keep,
int32_t status);
static void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoData* p, int32_t status);
static int32_t doSetInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order, int32_t scanFlag,
bool createDummyCol);
static int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup, SDiskbasedBuf* pBuf,
Expand Down Expand Up @@ -501,20 +500,26 @@ void clearResultRowInitFlag(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
}
}

void doFilter(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SColMatchInfo* pColMatchInfo) {
int32_t doFilter(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SColMatchInfo* pColMatchInfo) {
if (pFilterInfo == NULL || pBlock->info.rows == 0) {
return;
return TSDB_CODE_SUCCESS;
}

SFilterColumnParam param1 = {.numOfCols = taosArrayGetSize(pBlock->pDataBlock), .pDataBlock = pBlock->pDataBlock};
int32_t code = filterSetDataFromSlotId(pFilterInfo, &param1);
SColumnInfoData* p = NULL;

SColumnInfoData* p = NULL;
int32_t status = 0;
int32_t code = filterSetDataFromSlotId(pFilterInfo, &param1);
if (code != TSDB_CODE_SUCCESS) {
goto _err;
}

// todo the keep seems never to be True??
bool keep = filterExecute(pFilterInfo, pBlock, &p, NULL, param1.numOfCols, &status);
extractQualifiedTupleByFilterResult(pBlock, p, keep, status);
int32_t status = 0;
code = filterExecute(pFilterInfo, pBlock, &p, NULL, param1.numOfCols, &status);
if (code != TSDB_CODE_SUCCESS) {
goto _err;
}

extractQualifiedTupleByFilterResult(pBlock, p, status);

if (pColMatchInfo != NULL) {
size_t size = taosArrayGetSize(pColMatchInfo->pList);
Expand All @@ -529,23 +534,24 @@ void doFilter(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SColMatchInfo* pCol
}
}
}
code = TSDB_CODE_SUCCESS;

_err:
colDataDestroy(p);
taosMemoryFree(p);
return code;
}

void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoData* p, bool keep, int32_t status) {
if (keep) {
return;
}

void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoData* p, int32_t status) {
int8_t* pIndicator = (int8_t*)p->pData;
if (status == FILTER_RESULT_ALL_QUALIFIED) {
// here nothing needs to be done
} else if (status == FILTER_RESULT_NONE_QUALIFIED) {
pBlock->info.rows = 0;
} else if (status == FILTER_RESULT_PARTIAL_QUALIFIED) {
trimDataBlock(pBlock, pBlock->info.rows, (bool*)pIndicator);
} else {
trimDataBlock(pBlock, pBlock->info.rows, (bool*) pIndicator);
qError("unknown filter result type: %d", status);
}
}

Expand Down Expand Up @@ -587,7 +593,7 @@ void copyResultrowToDataBlock(SExprInfo* pExprInfo, int32_t numOfExprs, SResultR
pCtx[j].resultInfo->numOfRes = pRow->numOfRows;
}
}

blockDataEnsureCapacity(pBlock, pBlock->info.rows + pCtx[j].resultInfo->numOfRes);
int32_t code = pCtx[j].fpSet.finalize(&pCtx[j], pBlock);
if (TAOS_FAILED(code)) {
Expand Down Expand Up @@ -1062,5 +1068,5 @@ void streamOpReloadState(SOperatorInfo* pOperator) {
SOperatorInfo* downstream = pOperator->pDownstream[0];
if (downstream->fpSet.reloadStreamStateFn) {
downstream->fpSet.reloadStreamStateFn(downstream);
}
}
}
7 changes: 4 additions & 3 deletions source/libs/executor/src/scanoperator.c
Original file line number Diff line number Diff line change
Expand Up @@ -401,9 +401,10 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableSca
pCost->totalRows -= pBlock->info.rows;

if (pOperator->exprSupp.pFilterInfo != NULL) {
int64_t st = taosGetTimestampUs();
doFilter(pBlock, pOperator->exprSupp.pFilterInfo, &pTableScanInfo->matchInfo);
int32_t code = doFilter(pBlock, pOperator->exprSupp.pFilterInfo, &pTableScanInfo->matchInfo);
if (code != TSDB_CODE_SUCCESS) return code;

int64_t st = taosGetTimestampUs();
double el = (taosGetTimestampUs() - st) / 1000.0;
pTableScanInfo->readRecorder.filterTime += el;

Expand Down Expand Up @@ -2880,7 +2881,7 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
} else if (kWay <= 2) {
kWay = 2;
} else {
int i = 2;
int i = 2;
while (i * 2 <= kWay) i = i * 2;
kWay = i;
}
Expand Down
39 changes: 22 additions & 17 deletions source/libs/scalar/src/filter.c
Original file line number Diff line number Diff line change
Expand Up @@ -1979,7 +1979,7 @@ int32_t fltInitValFieldData(SFilterInfo *info) {
int32_t code = sclConvertValueToSclParam(var, &out, NULL);
if (code != TSDB_CODE_SUCCESS) {
qError("convert value to type[%d] failed", type);
return TSDB_CODE_TSC_INVALID_OPERATION;
return code;
}

size_t bufBytes = IS_VAR_DATA_TYPE(type) ? varDataTLen(out.columnData->pData)
Expand Down Expand Up @@ -4644,19 +4644,19 @@ int32_t filterInitFromNode(SNode *pNode, SFilterInfo **pInfo, uint32_t options)
FLT_RET(code);
}

bool filterExecute(SFilterInfo *info, SSDataBlock *pSrc, SColumnInfoData **p, SColumnDataAgg *statis, int16_t numOfCols,
int32_t *pResultStatus) {
int32_t filterExecute(SFilterInfo *info, SSDataBlock *pSrc, SColumnInfoData **p, SColumnDataAgg *statis,
int16_t numOfCols, int32_t *pResultStatus) {
if (NULL == info) {
*pResultStatus = FILTER_RESULT_ALL_QUALIFIED;
return false;
return TSDB_CODE_SUCCESS;
}

SScalarParam output = {0};
SDataType type = {.type = TSDB_DATA_TYPE_BOOL, .bytes = sizeof(bool)};

int32_t code = sclCreateColumnInfoData(&type, pSrc->info.rows, &output);
if (code != TSDB_CODE_SUCCESS) {
return false;
return code;
}

if (info->scalarMode) {
Expand All @@ -4666,7 +4666,7 @@ bool filterExecute(SFilterInfo *info, SSDataBlock *pSrc, SColumnInfoData **p, SC
code = scalarCalculate(info->sclCtx.node, pList, &output);
taosArrayDestroy(pList);

FLT_ERR_RET(code); // TODO: current errcode returns as true
FLT_ERR_RET(code);

*p = output.columnData;

Expand All @@ -4677,18 +4677,23 @@ bool filterExecute(SFilterInfo *info, SSDataBlock *pSrc, SColumnInfoData **p, SC
} else {
*pResultStatus = FILTER_RESULT_PARTIAL_QUALIFIED;
}
return false;
} else {
*p = output.columnData;
output.numOfRows = pSrc->info.rows;
return TSDB_CODE_SUCCESS;
}

if (*p == NULL) {
return false;
}
ASSERT(false == info->scalarMode);
*p = output.columnData;
output.numOfRows = pSrc->info.rows;

bool keep = (*info->func)(info, pSrc->info.rows, *p, statis, numOfCols, &output.numOfQualified);
if (*p == NULL) {
return TSDB_CODE_APP_ERROR;
}

// todo this should be return during filter procedure
bool keepAll = (*info->func)(info, pSrc->info.rows, *p, statis, numOfCols, &output.numOfQualified);

// todo this should be return during filter procedure
if (keepAll) {
*pResultStatus = FILTER_RESULT_ALL_QUALIFIED;
} else {
int32_t num = 0;
for (int32_t i = 0; i < output.numOfRows; ++i) {
if (((int8_t *)((*p)->pData))[i] == 1) {
Expand All @@ -4703,9 +4708,9 @@ bool filterExecute(SFilterInfo *info, SSDataBlock *pSrc, SColumnInfoData **p, SC
} else {
*pResultStatus = FILTER_RESULT_PARTIAL_QUALIFIED;
}

return keep;
}

return TSDB_CODE_SUCCESS;
}

typedef struct SClassifyConditionCxt {
Expand Down
Loading

0 comments on commit 13bc1e7

Please sign in to comment.