From 2cc32350e7a1910a36219c76fa6a09417dcea1b6 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Sat, 18 Mar 2023 12:29:40 +0800 Subject: [PATCH] fix:scan of fill history ended prematurely --- source/libs/executor/src/scanoperator.c | 3 ++- source/libs/stream/src/streamExec.c | 10 ++++++++-- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 29a23cd90bbb..f76fcd6f0980 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1798,6 +1798,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes); prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex); pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE; + printDataBlock(pInfo->pUpdateRes, "recover update"); return pInfo->pUpdateRes; } break; case STREAM_SCAN_FROM_DATAREADER_RANGE: { @@ -1808,7 +1809,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { updateInfoSetScanRange(pInfo->pUpdateInfo, &pTableScanInfo->base.cond.twindows, pInfo->groupId, version); pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA; checkUpdateData(pInfo, true, pSDB, false); - // printDataBlock(pSDB, "stream scan update"); + printDataBlock(pSDB, "scan recover update"); calBlockTbName(pInfo, pSDB); return pSDB; } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index cb610ad6b536..25b265636586 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -152,8 +152,14 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { if (batchCnt >= batchSz) break; } if (taosArrayGetSize(pRes) == 0) { - taosArrayDestroy(pRes); - break; + if (finished) { + taosArrayDestroy(pRes); + qDebug("task %d finish recover exec task ", pTask->taskId); + break; + } else { + qDebug("task %d continue recover exec task ", pTask->taskId); + continue; + } } SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0); if (qRes == NULL) {