diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index e34b27e9b87..3c171ca5109 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -635,13 +635,15 @@ void streamMetaInit(); void streamMetaCleanup(); SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId); void streamMetaClose(SStreamMeta* streamMeta); + +// save to b-tree meta store int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask); -int32_t streamMetaAddDeployedTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask); -int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t checkpointVer, char* msg, int32_t msgLen); +int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId); +int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask); +int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int32_t taskId); int32_t streamMetaGetNumOfTasks(const SStreamMeta* pMeta); // todo remove it SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId); void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask); -void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId); int32_t streamMetaBegin(SStreamMeta* pMeta); int32_t streamMetaCommit(SStreamMeta* pMeta); diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 8a7b61135bf..e8bdf97c70d 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -160,7 +160,7 @@ int32_t sndProcessTaskDeployReq(SSnode *pSnode, char *msg, int32_t msgLen) { // 2.save task taosWLockLatch(&pSnode->pMeta->lock); - code = streamMetaAddDeployedTask(pSnode->pMeta, -1, pTask); + code = streamMetaRegisterTask(pSnode->pMeta, -1, pTask); if (code < 0) { taosWUnLockLatch(&pSnode->pMeta->lock); return -1; @@ -179,7 +179,14 @@ int32_t sndProcessTaskDropReq(SSnode *pSnode, char *msg, int32_t msgLen) { SVDropStreamTaskReq *pReq = (SVDropStreamTaskReq *)msg; qDebug("snode:%d receive msg to drop stream task:0x%x", pSnode->pMeta->vgId, pReq->taskId); - streamMetaRemoveTask(pSnode->pMeta, pReq->taskId); + SStreamTask* pTask = streamMetaAcquireTask(pSnode->pMeta, pReq->taskId); + if (pTask == NULL) { + qError("vgId:%d failed to acquire s-task:0x%x when dropping it", pSnode->pMeta->vgId, pReq->taskId); + return 0; + } + + streamMetaUnregisterTask(pSnode->pMeta, pReq->taskId); + streamMetaReleaseTask(pSnode->pMeta, pTask); return 0; } diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index b77bb54714c..13b991e0386 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -172,7 +172,6 @@ int32_t tqStreamTasksStatusCheck(STQ* pTq); // tq util int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, SStreamRefDataBlock** pRefBlock); -int32_t tqAddInputBlockNLaunchTask(SStreamTask* pTask, SStreamQueueItem* pQueueItem); int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg); int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch, int64_t consumerId, int32_t type, int64_t sver, int64_t ever); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 1f6c162b9d1..a935eaf5f73 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1041,12 +1041,13 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms // 2.save task, use the newest commit version as the initial start version of stream task. int32_t taskId = 0; taosWLockLatch(&pStreamMeta->lock); - code = streamMetaAddDeployedTask(pStreamMeta, sversion, pTask); + code = streamMetaRegisterTask(pStreamMeta, sversion, pTask); taskId = pTask->id.taskId; int32_t numOfTasks = streamMetaGetNumOfTasks(pStreamMeta); if (code < 0) { tqError("vgId:%d failed to add s-task:%s, total:%d", vgId, pTask->id.idStr, numOfTasks); + tFreeStreamTask(pTask); taosWUnLockLatch(&pStreamMeta->lock); return -1; } @@ -1136,7 +1137,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { pTask->streamTaskId.taskId, pTask->id.idStr); pTask->status.taskStatus = TASK_STATUS__DROPPING; - tqDebug("s-task:%s scan-history-task set status to be dropping", id); + tqDebug("s-task:%s fill-history task set status to be dropping", id); streamMetaSaveTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask); @@ -1166,12 +1167,14 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { } if (!streamTaskRecoverScanStep1Finished(pTask)) { - tqDebug("s-task:%s level:%d verRange:%" PRId64 " - %" PRId64 " do secondary scan-history-data after halt the related stream task:%s", - id, pTask->info.taskLevel, pRange->minVer, pRange->maxVer, id); + STimeWindow* pWindow = &pTask->dataRange.window; + tqDebug("s-task:%s level:%d verRange:%" PRId64 " - %" PRId64 " window:%" PRId64 "-%" PRId64 + ", do secondary scan-history data after halt the related stream task:%s", + id, pTask->info.taskLevel, pRange->minVer, pRange->maxVer, pWindow->skey, pWindow->ekey, id); ASSERT(pTask->status.schedStatus == TASK_SCHED_STATUS__WAITING); st = taosGetTimestampMs(); - streamSetParamForStreamScannerStep2(pTask, pRange, &pTask->dataRange.window); + streamSetParamForStreamScannerStep2(pTask, pRange, pWindow); } if (!streamTaskRecoverScanStep2Finished(pTask)) { @@ -1259,6 +1262,7 @@ int32_t tqProcessTaskTransferStateReq(STQ* pTq, SRpcMsg* pMsg) { int32_t remain = streamAlignTransferState(pTask); if (remain > 0) { tqDebug("s-task:%s receive upstream transfer state msg, remain:%d", pTask->id.idStr, remain); + streamMetaReleaseTask(pTq->pStreamMeta, pTask); return 0; } @@ -1466,8 +1470,14 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessTaskDropReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg; tqDebug("vgId:%d receive msg to drop stream task:0x%x", TD_VID(pTq->pVnode), pReq->taskId); + SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId); + if (pTask == NULL) { + tqError("vgId:%d failed to acquire s-task:0x%x when dropping it", pTq->pStreamMeta->vgId, pReq->taskId); + return 0; + } - streamMetaRemoveTask(pTq->pStreamMeta, pReq->taskId); + streamMetaUnregisterTask(pTq->pStreamMeta, pReq->taskId); + streamMetaReleaseTask(pTq->pStreamMeta, pTask); return 0; } diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index ba983b1833f..675cbe4549d 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -336,6 +336,7 @@ int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, const char* id) { int32_t len = pReader->pHead->head.bodyLen - sizeof(SMsgHead); extractDelDataBlock(pBody, len, ver, (SStreamRefDataBlock**)pItem); + tqDebug("s-task:%s delete msg extract from WAL, len:%d, ver:%"PRId64, id, len, ver); } else { ASSERT(0); } diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 11bfcf7fc52..c2e4374505a 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -20,21 +20,6 @@ static int32_t tqSendMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqMetaRsp* pRsp, int32_t vgId); -int32_t tqAddInputBlockNLaunchTask(SStreamTask* pTask, SStreamQueueItem* pQueueItem) { - int32_t code = tAppendDataToInputQueue(pTask, pQueueItem); - if (code < 0) { - tqError("s-task:%s failed to put into queue, too many", pTask->id.idStr); - return -1; - } - - if (streamSchedExec(pTask) < 0) { - tqError("stream task:%d failed to be launched, code:%s", pTask->id.taskId, tstrerror(terrno)); - return -1; - } - - return TSDB_CODE_SUCCESS; -} - int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq) { pRsp->reqOffset = pReq->reqOffset; diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index f4109d5b991..2aa21bd86f3 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -3179,6 +3179,16 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) { // load the last data block of current table STableBlockScanInfo* pScanInfo = *(STableBlockScanInfo**)pStatus->pTableIter; + if (pScanInfo == NULL) { + tsdbError("table Iter is null, invalid pScanInfo, try next table %s", pReader->idStr); + bool hasNexTable = moveToNextTable(pUidList, pStatus); + if (!hasNexTable) { + return TSDB_CODE_SUCCESS; + } + + continue; + } + if (pReader->pIgnoreTables && taosHashGet(*pReader->pIgnoreTables, &pScanInfo->uid, sizeof(pScanInfo->uid))) { // reset the index in last block when handing a new file doCleanupTableScanInfo(pScanInfo); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 9e5d3a3ab69..6c8d9ed59f5 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1774,6 +1774,67 @@ void streamScanOperatorDecode(void* pBuff, int32_t len, SStreamScanInfo* pInfo) } } +static void doBlockDataWindowFilter(SSDataBlock* pBlock, int32_t tsIndex, STimeWindow* pWindow, const char* id) { + if (pWindow->skey != INT64_MIN) { + qDebug("%s filter for additional history window, skey:%"PRId64, id, pWindow->skey); + + bool* p = taosMemoryCalloc(pBlock->info.rows, sizeof(bool)); + bool hasUnqualified = false; + + SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, tsIndex); + for(int32_t i = 0; i < pBlock->info.rows; ++i) { + int64_t* ts = (int64_t*) colDataGetData(pCol, i); + p[i] = (*ts >= pWindow->skey); + + if (!p[i]) { + hasUnqualified = true; + } + } + + if (hasUnqualified) { + trimDataBlock(pBlock, pBlock->info.rows, p); + } + + taosMemoryFree(p); + } +} + +// re-build the delete block, ONLY according to the split timestamp +static void rebuildDeleteBlockData(SSDataBlock* pBlock, int64_t skey, const char* id) { + if (skey == INT64_MIN) { + return; + } + + int32_t numOfRows = pBlock->info.rows; + + bool* p = taosMemoryCalloc(numOfRows, sizeof(bool)); + bool hasUnqualified = false; + + SColumnInfoData* pSrcStartCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX); + uint64_t* tsStartCol = (uint64_t*)pSrcStartCol->pData; + SColumnInfoData* pSrcEndCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX); + uint64_t* tsEndCol = (uint64_t*)pSrcEndCol->pData; + + for (int32_t i = 0; i < numOfRows; i++) { + if (tsStartCol[i] < skey) { + tsStartCol[i] = skey; + } + + if (tsEndCol[i] >= skey) { + p[i] = true; + } else { // this row should be removed, since it is not in this query time window, which is [skey, INT64_MAX] + hasUnqualified = true; + } + } + + if (hasUnqualified) { + trimDataBlock(pBlock, pBlock->info.rows, p); + } + + qDebug("%s re-build delete datablock, start key revised to:%"PRId64", rows:%"PRId64, id, skey, pBlock->info.rows); + taosMemoryFree(p); +} + static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { // NOTE: this operator does never check if current status is done or not SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; @@ -1800,8 +1861,10 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { } else { pTSInfo->base.cond.startVersion = pStreamInfo->fillHistoryVer.minVer; pTSInfo->base.cond.endVersion = pStreamInfo->fillHistoryVer.maxVer; - qDebug("stream recover step2, verRange:%" PRId64 " - %" PRId64", %s", pTSInfo->base.cond.startVersion, - pTSInfo->base.cond.endVersion, id); + pTSInfo->base.cond.twindows = pStreamInfo->fillHistoryWindow; + qDebug("stream recover step2, verRange:%" PRId64 " - %" PRId64 ", window:%" PRId64 "-%" PRId64 ", %s", + pTSInfo->base.cond.startVersion, pTSInfo->base.cond.endVersion, pTSInfo->base.cond.twindows.skey, + pTSInfo->base.cond.twindows.ekey, id); pStreamInfo->recoverStep = STREAM_RECOVER_STEP__SCAN2; } @@ -1920,6 +1983,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { if (pInfo->pUpdateInfo) { pInfo->pUpdateInfo->maxDataVersion = TMAX(pInfo->pUpdateInfo->maxDataVersion, pBlock->info.version); } + blockDataUpdateTsWindow(pBlock, 0); switch (pBlock->info.type) { case STREAM_NORMAL: @@ -1942,7 +2006,9 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { } else { pDelBlock = pBlock; } + setBlockGroupIdByUid(pInfo, pDelBlock); + rebuildDeleteBlockData(pDelBlock, pStreamInfo->fillHistoryWindow.skey, id); printDataBlock(pDelBlock, "stream scan delete recv filtered"); if (pDelBlock->info.rows == 0) { if (pInfo->tqReader) { @@ -1950,6 +2016,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { } goto FETCH_NEXT_BLOCK; } + if (!isIntervalWindow(pInfo) && !isSessionWindow(pInfo) && !isStateWindow(pInfo)) { generateDeleteResultBlock(pInfo, pDelBlock, pInfo->pDeleteDataRes); pInfo->pDeleteDataRes->info.type = STREAM_DELETE_RESULT; @@ -2091,39 +2158,15 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { doCheckUpdate(pInfo, pBlockInfo->window.ekey, pBlock); doFilter(pBlock, pOperator->exprSupp.pFilterInfo, NULL); - { // do additional time window filter - STimeWindow* pWindow = &pStreamInfo->fillHistoryWindow; - - if (pWindow->skey != INT64_MIN) { - qDebug("%s filter for additional history window, skey:%"PRId64, id, pWindow->skey); - - bool* p = taosMemoryCalloc(pBlock->info.rows, sizeof(bool)); - bool hasUnqualified = false; - - SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex); - for(int32_t i = 0; i < pBlock->info.rows; ++i) { - int64_t* ts = (int64_t*) colDataGetData(pCol, i); - p[i] = (*ts >= pWindow->skey); - - if (!p[i]) { - hasUnqualified = true; - } - } - - if (hasUnqualified) { - trimDataBlock(pBlock, pBlock->info.rows, p); - } - - taosMemoryFree(p); - } - } + // apply additional time window filter + doBlockDataWindowFilter(pBlock, pInfo->primaryTsIndex, &pStreamInfo->fillHistoryWindow, id); pBlock->info.dataLoad = 1; blockDataUpdateTsWindow(pBlock, pInfo->primaryTsIndex); - qDebug("%s %" PRId64 " rows in datablock, update res:%" PRId64, id, pBlockInfo->rows, - pInfo->pUpdateDataRes->info.rows); - if (pBlockInfo->rows > 0 || pInfo->pUpdateDataRes->info.rows > 0) { + int64_t numOfUpdateRes = pInfo->pUpdateDataRes->info.rows; + qDebug("%s %" PRId64 " rows in datablock, update res:%" PRId64, id, pBlockInfo->rows, numOfUpdateRes); + if (pBlockInfo->rows > 0 || numOfUpdateRes > 0) { break; } } diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index cae7e10e3e9..fa0561a7221 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -65,7 +65,7 @@ static void streamSchedByTimer(void* param, void* tmrId) { SStreamTask* pTask = (void*)param; int8_t status = atomic_load_8(&pTask->triggerStatus); - qDebug("s-task:%s in scheduler timer, trigger status:%d", pTask->id.idStr, status); + qDebug("s-task:%s in scheduler, trigger status:%d, next:%dms", pTask->id.idStr, status, (int32_t)pTask->triggerParam); if (streamTaskShouldStop(&pTask->status) || streamTaskShouldPause(&pTask->status)) { streamMetaReleaseTask(NULL, pTask); @@ -74,23 +74,22 @@ static void streamSchedByTimer(void* param, void* tmrId) { } if (status == TASK_TRIGGER_STATUS__ACTIVE) { - SStreamTrigger* trigger = taosAllocateQitem(sizeof(SStreamTrigger), DEF_QITEM, 0); - if (trigger == NULL) { + SStreamTrigger* pTrigger = taosAllocateQitem(sizeof(SStreamTrigger), DEF_QITEM, 0); + if (pTrigger == NULL) { return; } - trigger->type = STREAM_INPUT__GET_RES; - trigger->pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock)); - if (trigger->pBlock == NULL) { - taosFreeQitem(trigger); + pTrigger->type = STREAM_INPUT__GET_RES; + pTrigger->pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock)); + if (pTrigger->pBlock == NULL) { + taosFreeQitem(pTrigger); return; } - trigger->pBlock->info.type = STREAM_GET_ALL; atomic_store_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__INACTIVE); - - if (tAppendDataToInputQueue(pTask, (SStreamQueueItem*)trigger) < 0) { - taosFreeQitem(trigger); + pTrigger->pBlock->info.type = STREAM_GET_ALL; + if (tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pTrigger) < 0) { + taosFreeQitem(pTrigger); taosTmrReset(streamSchedByTimer, (int32_t)pTask->triggerParam, pTask, streamEnv.timer, &pTask->schedTimer); return; } @@ -102,7 +101,7 @@ static void streamSchedByTimer(void* param, void* tmrId) { } int32_t streamSetupScheduleTrigger(SStreamTask* pTask) { - if (pTask->triggerParam != 0) { + if (pTask->triggerParam != 0 && pTask->info.fillHistory == 0) { int32_t ref = atomic_add_fetch_32(&pTask->refCnt, 1); ASSERT(ref == 2 && pTask->schedTimer == NULL); @@ -399,6 +398,7 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) { if (type != STREAM_INPUT__GET_RES && type != STREAM_INPUT__CHECKPOINT && pTask->triggerParam != 0) { atomic_val_compare_exchange_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__INACTIVE, TASK_TRIGGER_STATUS__ACTIVE); + qDebug("s-task:%s new data arrived, active the trigger, trigerStatus:%d", pTask->id.idStr, pTask->triggerStatus); } return 0; diff --git a/source/libs/stream/src/streamData.c b/source/libs/stream/src/streamData.c index 92f1fc47abb..bb4b842787a 100644 --- a/source/libs/stream/src/streamData.c +++ b/source/libs/stream/src/streamData.c @@ -166,6 +166,8 @@ int32_t streamMergeSubmit(SStreamMergedSubmit* pMerged, SStreamDataSubmit* pSubm } SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem) { + terrno = 0; + if (dst->type == STREAM_INPUT__DATA_BLOCK && pElem->type == STREAM_INPUT__DATA_BLOCK) { SStreamDataBlock* pBlock = (SStreamDataBlock*)dst; SStreamDataBlock* pBlockSrc = (SStreamDataBlock*)pElem; @@ -181,7 +183,10 @@ SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* return dst; } else if (dst->type == STREAM_INPUT__DATA_SUBMIT && pElem->type == STREAM_INPUT__DATA_SUBMIT) { SStreamMergedSubmit* pMerged = streamMergedSubmitNew(); - // todo handle error + if (pMerged == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } streamMergeSubmit(pMerged, (SStreamDataSubmit*)dst); streamMergeSubmit(pMerged, (SStreamDataSubmit*)pElem); @@ -189,6 +194,7 @@ SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* taosFreeQitem(pElem); return (SStreamQueueItem*)pMerged; } else { + qDebug("block type:%d not merged with existed blocks list, type:%d", pElem->type, dst->type); return NULL; } } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index e7adcf36dc5..af93d95a9fd 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -407,12 +407,14 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) { streamTaskResumeFromHalt(pStreamTask); - pTask->status.taskStatus = TASK_STATUS__DROPPING; qDebug("s-task:%s fill-history task set status to be dropping, save the state into disk", pTask->id.idStr); + int32_t taskId = pTask->id.taskId; + + // free it and remove it from disk meta-store + streamMetaUnregisterTask(pMeta, taskId); // save to disk taosWLockLatch(&pMeta->lock); - streamMetaSaveTask(pMeta, pTask); streamMetaSaveTask(pMeta, pStreamTask); if (streamMetaCommit(pMeta) < 0) { // persist to disk @@ -464,7 +466,12 @@ static int32_t extractMsgFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu // todo we need to sort the data block, instead of just appending into the array list. void* newRet = streamMergeQueueItem(*pInput, qItem); if (newRet == NULL) { - qError("s-task:%s failed to merge blocks from inputQ, numOfBlocks:%d", id, *numOfBlocks); + if (terrno == 0) { + qDebug("s-task:%s failed to merge blocks from inputQ, numOfBlocks:%d", id, *numOfBlocks); + } else { + qDebug("s-task:%s failed to merge blocks from inputQ, numOfBlocks:%d, code:%s", id, *numOfBlocks, + tstrerror(terrno)); + } streamQueueProcessFail(pTask->inputQueue); return TSDB_CODE_SUCCESS; } @@ -492,6 +499,10 @@ int32_t streamExecForAll(SStreamTask* pTask) { while (1) { int32_t batchSize = 0; SStreamQueueItem* pInput = NULL; + if (streamTaskShouldStop(&pTask->status)) { + qDebug("s-task:%s stream task stopped, abort", id); + break; + } // merge multiple input data if possible in the input queue. qDebug("s-task:%s start to extract data block from inputQ", id); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 0ef9807f8a1..758530f4fbc 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -217,6 +217,7 @@ int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask) { tEncoderClear(&encoder); if (tdbTbUpsert(pMeta->pTaskDb, &pTask->id.taskId, sizeof(int32_t), buf, len, pMeta->txn) < 0) { + qError("s-task:%s save to disk failed, code:%s", pTask->id.idStr, tstrerror(terrno)); return -1; } @@ -224,8 +225,19 @@ int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask) { return 0; } +int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) { + int32_t code = tdbTbDelete(pMeta->pTaskDb, &taskId, sizeof(taskId), pMeta->txn); + if (code != 0) { + qError("vgId:%d failed to remove task:0x%x from metastore, code:%s", pMeta->vgId, taskId, tstrerror(terrno)); + } else { + qDebug("vgId:%d remove task:0x%x from metastore", pMeta->vgId, taskId); + } + + return code; +} + // add to the ready tasks hash map, not the restored tasks hash map -int32_t streamMetaAddDeployedTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask) { +int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask) { void* p = taosHashGet(pMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId)); if (p == NULL) { if (pMeta->expandFunc(pMeta->ahandle, pTask, ver) < 0) { @@ -233,6 +245,8 @@ int32_t streamMetaAddDeployedTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* return -1; } + taosArrayPush(pMeta->pTaskList, &pTask->id.taskId); + if (streamMetaSaveTask(pMeta, pTask) < 0) { tFreeStreamTask(pTask); return -1; @@ -242,7 +256,6 @@ int32_t streamMetaAddDeployedTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* tFreeStreamTask(pTask); return -1; } - taosArrayPush(pMeta->pTaskList, &pTask->id.taskId); } else { return 0; } @@ -281,6 +294,7 @@ void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask) { qTrace("s-task:%s release task, ref:%d", pTask->id.idStr, ref); } else if (ref == 0) { ASSERT(streamTaskShouldStop(&pTask->status)); + qTrace("s-task:%s all refs are gone, free it", pTask->id.idStr); tFreeStreamTask(pTask); } else if (ref < 0) { qError("task ref is invalid, ref:%d, %s", ref, pTask->id.idStr); @@ -297,7 +311,7 @@ static void doRemoveIdFromList(SStreamMeta* pMeta, int32_t num, int32_t taskId) } } -void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) { +int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int32_t taskId) { SStreamTask* pTask = NULL; // pre-delete operation @@ -309,7 +323,7 @@ void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) { } else { qDebug("vgId:%d failed to find the task:0x%x, it may be dropped already", pMeta->vgId, taskId); taosWUnLockLatch(&pMeta->lock); - return; + return 0; } taosWUnLockLatch(&pMeta->lock); @@ -339,27 +353,26 @@ void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) { ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t)); if (ppTask) { taosHashRemove(pMeta->pTasks, &taskId, sizeof(int32_t)); - tdbTbDelete(pMeta->pTaskDb, &taskId, sizeof(int32_t), pMeta->txn); - atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__DROPPING); + ASSERT(pTask->status.timerActive == 0); int32_t num = taosArrayGetSize(pMeta->pTaskList); - qDebug("s-task:%s set the drop task flag, remain running s-task:%d", pTask->id.idStr, num - 1); doRemoveIdFromList(pMeta, num, pTask->id.taskId); // remove the ref by timer if (pTask->triggerParam != 0) { taosTmrStop(pTask->schedTimer); - streamMetaReleaseTask(pMeta, pTask); } + streamMetaRemoveTask(pMeta, taskId); streamMetaReleaseTask(pMeta, pTask); } else { qDebug("vgId:%d failed to find the task:0x%x, it may have been dropped already", pMeta->vgId, taskId); } taosWUnLockLatch(&pMeta->lock); + return 0; } int32_t streamMetaBegin(SStreamMeta* pMeta) { @@ -404,7 +417,9 @@ int32_t streamMetaAbort(SStreamMeta* pMeta) { int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) { TBC* pCur = NULL; + if (tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL) < 0) { + qError("vgId:%d failed to open stream meta, code:%s", pMeta->vgId, tstrerror(terrno)); return -1; } @@ -413,6 +428,7 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) { void* pVal = NULL; int32_t vLen = 0; SDecoder decoder; + SArray* pRecycleList = taosArrayInit(4, sizeof(int32_t)); tdbTbcMoveToFirst(pCur); @@ -422,6 +438,7 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) { tdbFree(pKey); tdbFree(pVal); tdbTbcClose(pCur); + taosArrayDestroy(pRecycleList); return -1; } @@ -429,16 +446,29 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) { tDecodeStreamTask(&decoder, pTask); tDecoderClear(&decoder); - // remove duplicate + if (pTask->status.taskStatus == TASK_STATUS__DROPPING) { + int32_t taskId = pTask->id.taskId; + tFreeStreamTask(pTask); + + taosArrayPush(pRecycleList, &taskId); + + int32_t total = taosArrayGetSize(pRecycleList); + qDebug("s-task:0x%x is already dropped, add into recycle list, total:%d", taskId, total); + continue; + } + + // do duplicate task check. void* p = taosHashGet(pMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId)); if (p == NULL) { if (pMeta->expandFunc(pMeta->ahandle, pTask, pTask->chkInfo.version) < 0) { tdbFree(pKey); tdbFree(pVal); tdbTbcClose(pCur); - taosMemoryFree(pTask); + tFreeStreamTask(pTask); + taosArrayDestroy(pRecycleList); return -1; } + taosArrayPush(pMeta->pTaskList, &pTask->id.taskId); } else { tdbFree(pKey); @@ -452,7 +482,8 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) { tdbFree(pKey); tdbFree(pVal); tdbTbcClose(pCur); - taosMemoryFree(pTask); + tFreeStreamTask(pTask); + taosArrayDestroy(pRecycleList); return -1; } @@ -462,8 +493,18 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) { tdbFree(pKey); tdbFree(pVal); if (tdbTbcClose(pCur) < 0) { + taosArrayDestroy(pRecycleList); return -1; } + if (taosArrayGetSize(pRecycleList) > 0) { + for(int32_t i = 0; i < taosArrayGetSize(pRecycleList); ++i) { + int32_t taskId = *(int32_t*) taosArrayGet(pRecycleList, i); + streamMetaRemoveTask(pMeta, taskId); + } + } + + qDebug("vgId:%d load %d task from disk", pMeta->vgId, (int32_t) taosArrayGetSize(pMeta->pTaskList)); + taosArrayDestroy(pRecycleList); return 0; } diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 1c9e2672d19..dffa28e769a 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -765,7 +765,13 @@ void streamTaskSetRangeStreamCalc(SStreamTask* pTask) { } else { SHistDataRange* pRange = &pTask->dataRange; - int64_t ekey = pRange->window.ekey + 1; + int64_t ekey = 0; + if (pRange->window.ekey < INT64_MAX) { + ekey = pRange->window.ekey + 1; + } else { + ekey = pRange->window.ekey; + } + int64_t ver = pRange->range.minVer; pRange->window.skey = ekey; diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 863c4ce0252..1eb8d119168 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -211,7 +211,7 @@ static void freeItem(void* p) { } void tFreeStreamTask(SStreamTask* pTask) { - qDebug("free s-task:%s", pTask->id.idStr); + qDebug("free s-task:%s, %p", pTask->id.idStr, pTask); int32_t status = atomic_load_8((int8_t*)&(pTask->status.taskStatus)); if (pTask->inputQueue) { diff --git a/tests/script/tsim/stream/basic3.sim b/tests/script/tsim/stream/basic3.sim index 2df33541b46..f18061a6df0 100644 --- a/tests/script/tsim/stream/basic3.sim +++ b/tests/script/tsim/stream/basic3.sim @@ -1,11 +1,9 @@ system sh/stop_dnodes.sh system sh/deploy.sh -n dnode1 -i 1 -system sh/cfg.sh -n dnode1 -c debugflag -v 131 system sh/cfg.sh -n dnode1 -c keepColumnName -v 1 system sh/exec.sh -n dnode1 -s start -sleep 5000 - +sleep 1000 sql connect print ========== interval\session\state window @@ -32,7 +30,6 @@ sql create stream streamd6 into streamt6 as select ca, _wstart,_wend, count(*), sql alter local 'keepColumnName' '1' - sql CREATE STABLE `meters_test_data` (`ts` TIMESTAMP, `close` FLOAT, `parttime` TIMESTAMP, `parttime_str` VARCHAR(32)) TAGS (`id` VARCHAR(32)); sql_error create stream realtime_meters fill_history 1 into realtime_meters as select last(parttime),first(close),last(close) from meters_test_data partition by tbname state_window(parttime_str); @@ -58,17 +55,13 @@ sql_error create stream streamd11 into streamd11 as select _wstart, _wend, count sql alter local 'keepColumnName' '0' sql create stream realtime_meters fill_history 1 into realtime_meters as select last(parttime),first(close),last(close) from meters_test_data partition by tbname state_window(parttime_str); - sql desc realtime_meters; - if $rows == 0 then return -1 endi -sql create stream streamd7 into streamt7 as select _wstart, _wend, count(*), first(ca), last(ca) from t1 interval(10s); - +sql create stream streamd7 into streamt7 as select _wstart t1, _wend t2, count(*), first(ca), last(ca) from t1 interval(10s); sql desc streamt7; - if $rows == 0 then return -1 endi @@ -76,12 +69,11 @@ endi sql create stream streamd71 into streamt71 as select _wstart, _wend, count(*) as ca, first(ca), last(ca) as c2 from t1 interval(10s); sql desc streamt71; - if $rows == 0 then return -1 endi -sleep 3000 +sleep 1000 sql drop stream if exists streamd1; sql drop stream if exists streamd2; @@ -93,23 +85,19 @@ sql drop stream if exists streamd6; sql create stream streamd10 into streamd10 as select _wstart, _wend, count(*), first(ca), last(cb) as c2 from t1 interval(10s); sql desc streamd10; - if $rows == 0 then return -1 endi sql_error create stream streamd11 into streamd11 as select _wstart, _wend, count(*), last(ca), last(ca) from t1 interval(10s); - sql create stream streamd12 into streamd12 as select _wstart, _wend, count(*), last(ca), last(cb) as c2 from t1 interval(10s); - sql desc streamd12; if $rows == 0 then return -1 endi - _OVER: system sh/exec.sh -n dnode1 -s stop -x SIGINT print =============== check