Skip to content

Commit

Permalink
Merge pull request #22764 from taosdata/fix/TD-26132
Browse files Browse the repository at this point in the history
fix: fix coverity issues
  • Loading branch information
dapan1121 committed Sep 7, 2023
2 parents 5982c80 + 875a081 commit e4f52ba
Show file tree
Hide file tree
Showing 11 changed files with 137 additions and 26 deletions.
10 changes: 4 additions & 6 deletions source/client/src/clientEnv.c
Original file line number Diff line number Diff line change
Expand Up @@ -380,8 +380,7 @@ void destroySubRequests(SRequestObj *pRequest) {
pReqList[++reqIdx] = pTmp;
releaseRequest(tmpRefId);
} else {
tscError("0x%" PRIx64 ", prev req ref 0x%" PRIx64 " is not there, reqId:0x%" PRIx64, pTmp->self, tmpRefId,
pTmp->requestId);
tscError("prev req ref 0x%" PRIx64 " is not there", tmpRefId);
break;
}
}
Expand All @@ -398,7 +397,7 @@ void destroySubRequests(SRequestObj *pRequest) {
removeRequest(pTmp->self);
releaseRequest(pTmp->self);
} else {
tscError("0x%" PRIx64 " is not there", tmpRefId);
tscError("next req ref 0x%" PRIx64 " is not there", tmpRefId);
break;
}
}
Expand Down Expand Up @@ -492,8 +491,7 @@ void stopAllQueries(SRequestObj *pRequest) {
pReqList[++reqIdx] = pTmp;
releaseRequest(tmpRefId);
} else {
tscError("0x%" PRIx64 ", prev req ref 0x%" PRIx64 " is not there, reqId:0x%" PRIx64, pTmp->self, tmpRefId,
pTmp->requestId);
tscError("prev req ref 0x%" PRIx64 " is not there", tmpRefId);
break;
}
}
Expand All @@ -512,7 +510,7 @@ void stopAllQueries(SRequestObj *pRequest) {
taosStopQueryImpl(pTmp);
releaseRequest(pTmp->self);
} else {
tscError("0x%" PRIx64 " is not there", tmpRefId);
tscError("next req ref 0x%" PRIx64 " is not there", tmpRefId);
break;
}
}
Expand Down
14 changes: 9 additions & 5 deletions source/client/src/clientMain.c
Original file line number Diff line number Diff line change
Expand Up @@ -874,8 +874,13 @@ void handleSubQueryFromAnalyse(SSqlCallbackWrapper *pWrapper, SMetaData *pResult
if (TSDB_CODE_SUCCESS == code) {
code = cloneCatalogReq(&pNewWrapper->pCatalogReq, pWrapper->pCatalogReq);
}
doAsyncQueryFromAnalyse(pResultMeta, pNewWrapper, code);
nodesDestroyNode(pRoot);
if (TSDB_CODE_SUCCESS == code) {
doAsyncQueryFromAnalyse(pResultMeta, pNewWrapper, code);
nodesDestroyNode(pRoot);
} else {
handleQueryAnslyseRes(pWrapper, pResultMeta, code);
return;
}
}

void handleQueryAnslyseRes(SSqlCallbackWrapper *pWrapper, SMetaData *pResultMeta, int32_t code) {
Expand Down Expand Up @@ -1148,8 +1153,7 @@ void restartAsyncQuery(SRequestObj *pRequest, int32_t code) {
pReqList[++reqIdx] = pTmp;
releaseRequest(tmpRefId);
} else {
tscError("0x%" PRIx64 ", prev req ref 0x%" PRIx64 " is not there, reqId:0x%" PRIx64, pTmp->self, tmpRefId,
pTmp->requestId);
tscError("prev req ref 0x%" PRIx64 " is not there", tmpRefId);
break;
}
}
Expand All @@ -1162,7 +1166,7 @@ void restartAsyncQuery(SRequestObj *pRequest, int32_t code) {
removeRequest(pTmp->self);
releaseRequest(pTmp->self);
} else {
tscError("0x%" PRIx64 " is not there", tmpRefId);
tscError("next req ref 0x%" PRIx64 " is not there", tmpRefId);
break;
}
}
Expand Down
3 changes: 2 additions & 1 deletion source/libs/command/src/command.c
Original file line number Diff line number Diff line change
Expand Up @@ -308,10 +308,11 @@ static void setCreateDBResultIntoDataBlock(SSDataBlock* pBlock, char* dbName, ch

if (retentions) {
len += sprintf(buf2 + VARSTR_HEADER_SIZE + len, " RETENTIONS %s", retentions);
taosMemoryFree(retentions);
}
}

taosMemoryFree(retentions);

(varDataLen(buf2)) = len;

colDataSetVal(pCol2, 0, buf2, false);
Expand Down
2 changes: 1 addition & 1 deletion source/libs/executor/src/dataInserter.c
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,7 @@ int32_t createDataInserter(SDataSinkManager* pManager, const SDataSinkNode* pDat
taosThreadMutexInit(&inserter->mutex, NULL);
if (NULL == inserter->pDataBlocks) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return TSDB_CODE_OUT_OF_MEMORY;
goto _return;
}

inserter->fullOrderColList = pInserterNode->pCols->length == inserter->pSchema->numOfCols;
Expand Down
64 changes: 62 additions & 2 deletions source/libs/executor/src/dynqueryctrloperator.c
Original file line number Diff line number Diff line change
Expand Up @@ -151,14 +151,21 @@ static void updatePostJoinCurrTableInfo(SStbJoinDynCtrlInfo* pStbJoin)
static int32_t buildGroupCacheOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t vgId, int64_t tbUid, bool needCache, SOperatorParam* pChild) {
*ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
if (NULL == *ppRes) {
freeOperatorParam(pChild, OP_GET_PARAM);
return TSDB_CODE_OUT_OF_MEMORY;
}
if (pChild) {
(*ppRes)->pChildren = taosArrayInit(1, POINTER_BYTES);
if (NULL == *ppRes) {
if (NULL == (*ppRes)->pChildren) {
freeOperatorParam(pChild, OP_GET_PARAM);
freeOperatorParam(*ppRes, OP_GET_PARAM);
*ppRes = NULL;
return TSDB_CODE_OUT_OF_MEMORY;
}
if (NULL == taosArrayPush((*ppRes)->pChildren, &pChild)) {
freeOperatorParam(pChild, OP_GET_PARAM);
freeOperatorParam(*ppRes, OP_GET_PARAM);
*ppRes = NULL;
return TSDB_CODE_OUT_OF_MEMORY;
}
} else {
Expand All @@ -167,6 +174,8 @@ static int32_t buildGroupCacheOperatorParam(SOperatorParam** ppRes, int32_t down

SGcOperatorParam* pGc = taosMemoryMalloc(sizeof(SGcOperatorParam));
if (NULL == pGc) {
freeOperatorParam(*ppRes, OP_GET_PARAM);
*ppRes = NULL;
return TSDB_CODE_OUT_OF_MEMORY;
}

Expand All @@ -193,6 +202,7 @@ static int32_t buildGroupCacheNotifyOperatorParam(SOperatorParam** ppRes, int32_

SGcNotifyOperatorParam* pGc = taosMemoryMalloc(sizeof(SGcNotifyOperatorParam));
if (NULL == pGc) {
freeOperatorParam(*ppRes, OP_NOTIFY_PARAM);
return TSDB_CODE_OUT_OF_MEMORY;
}

Expand Down Expand Up @@ -248,13 +258,15 @@ static int32_t buildBatchExchangeOperatorParam(SOperatorParam** ppRes, int32_t d

SExchangeOperatorBatchParam* pExc = taosMemoryMalloc(sizeof(SExchangeOperatorBatchParam));
if (NULL == pExc) {
taosMemoryFreeClear(*ppRes);
return TSDB_CODE_OUT_OF_MEMORY;
}

pExc->multiParams = true;
pExc->pBatchs = tSimpleHashInit(tSimpleHashGetSize(pVg), taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
if (NULL == pExc->pBatchs) {
taosMemoryFree(pExc);
taosMemoryFreeClear(*ppRes);
return TSDB_CODE_OUT_OF_MEMORY;
}
tSimpleHashSetFreeFp(pExc->pBatchs, freeExchangeGetBasicOperatorParam);
Expand Down Expand Up @@ -288,21 +300,36 @@ static int32_t buildBatchExchangeOperatorParam(SOperatorParam** ppRes, int32_t d
static int32_t buildMergeJoinOperatorParam(SOperatorParam** ppRes, bool initParam, SOperatorParam* pChild0, SOperatorParam* pChild1) {
*ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
if (NULL == *ppRes) {
freeOperatorParam(pChild0, OP_GET_PARAM);
freeOperatorParam(pChild1, OP_GET_PARAM);
return TSDB_CODE_OUT_OF_MEMORY;
}
(*ppRes)->pChildren = taosArrayInit(2, POINTER_BYTES);
if (NULL == *ppRes) {
freeOperatorParam(pChild0, OP_GET_PARAM);
freeOperatorParam(pChild1, OP_GET_PARAM);
freeOperatorParam(*ppRes, OP_GET_PARAM);
*ppRes = NULL;
return TSDB_CODE_OUT_OF_MEMORY;
}
if (NULL == taosArrayPush((*ppRes)->pChildren, &pChild0)) {
freeOperatorParam(pChild0, OP_GET_PARAM);
freeOperatorParam(pChild1, OP_GET_PARAM);
freeOperatorParam(*ppRes, OP_GET_PARAM);
*ppRes = NULL;
return TSDB_CODE_OUT_OF_MEMORY;
}
if (NULL == taosArrayPush((*ppRes)->pChildren, &pChild1)) {
freeOperatorParam(pChild1, OP_GET_PARAM);
freeOperatorParam(*ppRes, OP_GET_PARAM);
*ppRes = NULL;
return TSDB_CODE_OUT_OF_MEMORY;
}

SSortMergeJoinOperatorParam* pJoin = taosMemoryMalloc(sizeof(SSortMergeJoinOperatorParam));
if (NULL == pJoin) {
freeOperatorParam(*ppRes, OP_GET_PARAM);
*ppRes = NULL;
return TSDB_CODE_OUT_OF_MEMORY;
}

Expand All @@ -318,16 +345,28 @@ static int32_t buildMergeJoinOperatorParam(SOperatorParam** ppRes, bool initPara
static int32_t buildMergeJoinNotifyOperatorParam(SOperatorParam** ppRes, SOperatorParam* pChild0, SOperatorParam* pChild1) {
*ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
if (NULL == *ppRes) {
freeOperatorParam(pChild0, OP_NOTIFY_PARAM);
freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
return TSDB_CODE_OUT_OF_MEMORY;
}
(*ppRes)->pChildren = taosArrayInit(2, POINTER_BYTES);
if (NULL == *ppRes) {
taosMemoryFreeClear(*ppRes);
freeOperatorParam(pChild0, OP_NOTIFY_PARAM);
freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
return TSDB_CODE_OUT_OF_MEMORY;
}
if (pChild0 && NULL == taosArrayPush((*ppRes)->pChildren, &pChild0)) {
freeOperatorParam(*ppRes, OP_NOTIFY_PARAM);
freeOperatorParam(pChild0, OP_NOTIFY_PARAM);
freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
*ppRes = NULL;
return TSDB_CODE_OUT_OF_MEMORY;
}
if (pChild1 && NULL == taosArrayPush((*ppRes)->pChildren, &pChild1)) {
freeOperatorParam(*ppRes, OP_NOTIFY_PARAM);
freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
*ppRes = NULL;
return TSDB_CODE_OUT_OF_MEMORY;
}

Expand Down Expand Up @@ -420,13 +459,34 @@ static int32_t buildSeqStbJoinOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SS

if (TSDB_CODE_SUCCESS == code) {
code = buildGroupCacheOperatorParam(&pGcParam0, 0, *leftVg, *leftUid, pPost->leftNeedCache, pSrcParam0);
pSrcParam0 = NULL;
}
if (TSDB_CODE_SUCCESS == code) {
code = buildGroupCacheOperatorParam(&pGcParam1, 1, *rightVg, *rightUid, pPost->rightNeedCache, pSrcParam1);
pSrcParam1 = NULL;
}
if (TSDB_CODE_SUCCESS == code) {
code = buildMergeJoinOperatorParam(ppParam, pSrcParam0 ? true : false, pGcParam0, pGcParam1);
}
if (TSDB_CODE_SUCCESS != code) {
if (pSrcParam0) {
freeOperatorParam(pSrcParam0, OP_GET_PARAM);
}
if (pSrcParam1) {
freeOperatorParam(pSrcParam1, OP_GET_PARAM);
}
if (pGcParam0) {
freeOperatorParam(pGcParam0, OP_GET_PARAM);
}
if (pGcParam1) {
freeOperatorParam(pGcParam1, OP_GET_PARAM);
}
if (*ppParam) {
freeOperatorParam(*ppParam, OP_GET_PARAM);
*ppParam = NULL;
}
}

return code;
}

Expand Down Expand Up @@ -488,7 +548,7 @@ static void handleSeqJoinCurrRetrieveEnd(SOperatorInfo* pOperator, SStbJoinDynCt

if (pPost->leftNeedCache) {
uint32_t* num = tSimpleHashGet(pStbJoin->ctx.prev.leftCache, &pPost->leftCurrUid, sizeof(pPost->leftCurrUid));
if (--(*num) <= 0) {
if (num && --(*num) <= 0) {
tSimpleHashRemove(pStbJoin->ctx.prev.leftCache, &pPost->leftCurrUid, sizeof(pPost->leftCurrUid));
notifySeqJoinTableCacheEnd(pOperator, pPost, true);
}
Expand Down
6 changes: 4 additions & 2 deletions source/libs/executor/src/eventwindowoperator.c
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ int32_t eventWindowAggImpl(SOperatorInfo* pOperator, SEventWindowOperatorInfo* p
SFilterColumnParam param2 = {.numOfCols = taosArrayGetSize(pBlock->pDataBlock), .pDataBlock = pBlock->pDataBlock};
code = filterSetDataFromSlotId(pInfo->pEndCondInfo, &param2);
if (code != TSDB_CODE_SUCCESS) {
return code;
goto _return;
}

int32_t status2 = 0;
Expand Down Expand Up @@ -331,10 +331,12 @@ int32_t eventWindowAggImpl(SOperatorInfo* pOperator, SEventWindowOperatorInfo* p
}
}

_return:

colDataDestroy(ps);
taosMemoryFree(ps);
colDataDestroy(pe);
taosMemoryFree(pe);

return TSDB_CODE_SUCCESS;
return code;
}
19 changes: 17 additions & 2 deletions source/libs/executor/src/groupcacheoperator.c
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,9 @@ static int32_t acquireFdFromFileCtx(SGcFileCacheCtx* pFileCtx, int32_t fileId, S
SGroupCacheFileInfo newFile = {0};
taosHashPut(pFileCtx->pCacheFile, &fileId, sizeof(fileId), &newFile, sizeof(newFile));
pTmp = taosHashGet(pFileCtx->pCacheFile, &fileId, sizeof(fileId));
if (NULL == pTmp) {
return TSDB_CODE_OUT_OF_MEMORY;
}
}

if (pTmp->deleted) {
Expand Down Expand Up @@ -287,7 +290,7 @@ static int32_t saveBlocksToDisk(SGroupCacheOperatorInfo* pGCache, SGcDownstreamC

if (deleted) {
qTrace("FileId:%d-%d-%d already be deleted, skip write",
pCtx->id, pGroup->vgId, pHead->basic.fileId);
pCtx->id, pGroup ? pGroup->vgId : GROUP_CACHE_DEFAULT_VGID, pHead->basic.fileId);

int64_t blkId = pHead->basic.blkId;
pHead = pHead->next;
Expand Down Expand Up @@ -337,7 +340,9 @@ static int32_t addBlkToDirtyBufList(SGroupCacheOperatorInfo* pGCache, SGcDownstr
return TSDB_CODE_OUT_OF_MEMORY;
}
pBufInfo = taosHashGet(pCache->pDirtyBlk, &pBufInfo->basic.blkId, sizeof(pBufInfo->basic.blkId));

if (NULL == pBufInfo) {
return TSDB_CODE_OUT_OF_MEMORY;
}
int32_t code = TSDB_CODE_SUCCESS;
SGcBlkBufInfo* pWriteHead = NULL;

Expand Down Expand Up @@ -378,6 +383,10 @@ static int32_t addBlkToDirtyBufList(SGroupCacheOperatorInfo* pGCache, SGcDownstr

static FORCE_INLINE void chkRemoveVgroupCurrFile(SGcFileCacheCtx* pFileCtx, int32_t downstreamIdx, int32_t vgId) {
SGroupCacheFileInfo* pFileInfo = taosHashGet(pFileCtx->pCacheFile, &pFileCtx->fileId, sizeof(pFileCtx->fileId));
if (NULL == pFileInfo) {
return;
}

if (0 == pFileInfo->groupNum) {
removeGroupCacheFile(pFileInfo);

Expand Down Expand Up @@ -711,6 +720,9 @@ static int32_t addFileRefTableNum(SGcFileCacheCtx* pFileCtx, int32_t fileId, int
newFile.groupNum = 1;
taosHashPut(pFileCtx->pCacheFile, &fileId, sizeof(fileId), &newFile, sizeof(newFile));
pTmp = taosHashGet(pFileCtx->pCacheFile, &fileId, sizeof(fileId));
if (NULL == pTmp) {
return TSDB_CODE_OUT_OF_MEMORY;
}
} else {
pTmp->groupNum++;
}
Expand Down Expand Up @@ -786,6 +798,9 @@ static int32_t addNewGroupData(struct SOperatorInfo* pOperator, SOperatorParam*
}

*ppGrp = taosHashGet(pGrpHash, &uid, sizeof(uid));
if (NULL == *ppGrp) {
return TSDB_CODE_OUT_OF_MEMORY;
}
initNewGroupData(pCtx, *ppGrp, pParam->downstreamIdx, vgId, pGCache->batchFetch, pGcParam->needCache);

qError("new group %" PRIu64 " initialized, downstreamIdx:%d, vgId:%d, needCache:%d", uid, pParam->downstreamIdx, vgId, pGcParam->needCache);
Expand Down
2 changes: 2 additions & 0 deletions source/libs/executor/src/hashjoinoperator.c
Original file line number Diff line number Diff line change
Expand Up @@ -636,12 +636,14 @@ static int32_t addRowToHashImpl(SHJoinOperatorInfo* pJoin, SGroupData* pGroup, S

int32_t code = getValBufFromPages(pJoin->pRowBufs, getHJoinValBufSize(pTable, rowIdx), &pTable->valData, pRow);
if (code) {
taosMemoryFree(pRow);
return code;
}

if (NULL == pGroup) {
pRow->next = NULL;
if (tSimpleHashPut(pJoin->pKeyHash, pTable->keyData, keyLen, &group, sizeof(group))) {
taosMemoryFree(pRow);
return TSDB_CODE_OUT_OF_MEMORY;
}
} else {
Expand Down
5 changes: 5 additions & 0 deletions source/libs/executor/src/mergejoinoperator.c
Original file line number Diff line number Diff line change
Expand Up @@ -711,6 +711,11 @@ static bool mergeJoinGetNextTimestamp(SOperatorInfo* pOperator, int64_t* pLeftTs
}
}
}

if (NULL == pJoinInfo->pLeft || NULL == pJoinInfo->pRight) {
setMergeJoinDone(pOperator);
return false;
}

// only the timestamp match support for ordinary table
SColumnInfoData* pLeftCol = taosArrayGet(pJoinInfo->pLeft->pDataBlock, pJoinInfo->leftCol.slotId);
Expand Down

0 comments on commit e4f52ba

Please sign in to comment.