Skip to content

Commit

Permalink
fix:scan of fill history ended prematurely
Browse files Browse the repository at this point in the history
  • Loading branch information
54liuyao committed Mar 18, 2023
1 parent cd49862 commit 2cc3235
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 3 deletions.
3 changes: 2 additions & 1 deletion source/libs/executor/src/scanoperator.c
Expand Up @@ -1798,6 +1798,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes);
prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
printDataBlock(pInfo->pUpdateRes, "recover update");
return pInfo->pUpdateRes;
} break;
case STREAM_SCAN_FROM_DATAREADER_RANGE: {
Expand All @@ -1808,7 +1809,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
updateInfoSetScanRange(pInfo->pUpdateInfo, &pTableScanInfo->base.cond.twindows, pInfo->groupId, version);
pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA;
checkUpdateData(pInfo, true, pSDB, false);
// printDataBlock(pSDB, "stream scan update");
printDataBlock(pSDB, "scan recover update");
calBlockTbName(pInfo, pSDB);
return pSDB;
}
Expand Down
10 changes: 8 additions & 2 deletions source/libs/stream/src/streamExec.c
Expand Up @@ -152,8 +152,14 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
if (batchCnt >= batchSz) break;
}
if (taosArrayGetSize(pRes) == 0) {
taosArrayDestroy(pRes);
break;
if (finished) {
taosArrayDestroy(pRes);
qDebug("task %d finish recover exec task ", pTask->taskId);
break;
} else {
qDebug("task %d continue recover exec task ", pTask->taskId);
continue;
}
}
SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0);
if (qRes == NULL) {
Expand Down

0 comments on commit 2cc3235

Please sign in to comment.