Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(stream): set the correct end key of delete block. #22251

Merged
merged 3 commits into from Jul 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 1 addition & 5 deletions include/libs/executor/executor.h
Expand Up @@ -221,13 +221,9 @@ int32_t qStreamSourceScanParamForHistoryScanStep2(qTaskInfo_t tinfo, SVersionRan
int32_t qStreamRecoverFinish(qTaskInfo_t tinfo);
int32_t qRestoreStreamOperatorOption(qTaskInfo_t tinfo);
bool qStreamRecoverScanFinished(qTaskInfo_t tinfo);
bool qStreamRecoverScanStep1Finished(qTaskInfo_t tinfo);
bool qStreamRecoverScanStep2Finished(qTaskInfo_t tinfo);
int32_t qStreamRecoverSetAllStepFinished(qTaskInfo_t tinfo);
int32_t qStreamInfoResetTimewindowFilter(qTaskInfo_t tinfo);
void resetTaskInfo(qTaskInfo_t tinfo);

void qResetStreamInfoTimeWindow(qTaskInfo_t tinfo);

int32_t qStreamOperatorReleaseState(qTaskInfo_t tInfo);
int32_t qStreamOperatorReloadState(qTaskInfo_t tInfo);

Expand Down
2 changes: 0 additions & 2 deletions include/libs/stream/tstream.h
Expand Up @@ -607,8 +607,6 @@ int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask);
int32_t streamStartRecoverTask(SStreamTask* pTask, int8_t igUntreated);
bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer);

bool streamTaskRecoverScanStep1Finished(SStreamTask* pTask);
bool streamTaskRecoverScanStep2Finished(SStreamTask* pTask);
int32_t streamTaskRecoverSetAllStepFinished(SStreamTask* pTask);

// common
Expand Down
2 changes: 1 addition & 1 deletion source/dnode/vnode/src/tq/tq.c
Expand Up @@ -1296,7 +1296,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
"s-task:%s scan-history in stream time window completed, no related fill-history task, reset the time "
"window:%" PRId64 " - %" PRId64,
id, pWindow->skey, pWindow->ekey);
qResetStreamInfoTimeWindow(pTask->exec.pExecutor);
qStreamInfoResetTimewindowFilter(pTask->exec.pExecutor);
} else {
// when related fill-history task exists, update the fill-history time window only when the
// state transfer is completed.
Expand Down
4 changes: 2 additions & 2 deletions source/libs/executor/inc/querytask.h
Expand Up @@ -62,8 +62,8 @@ typedef struct {
SSchemaWrapper* schema;
char tbName[TSDB_TABLE_NAME_LEN]; // this is the current scan table: todo refactor
int8_t recoverStep;
bool recoverStep1Finished;
bool recoverStep2Finished;
// bool recoverStep1Finished;
// bool recoverStep2Finished;
int8_t recoverScanFinished;
SQueryTableDataCond tableCond;
SVersionRange fillHistoryVer;
Expand Down
37 changes: 7 additions & 30 deletions source/libs/executor/src/executor.c
Expand Up @@ -116,17 +116,6 @@ void resetTaskInfo(qTaskInfo_t tinfo) {
clearStreamBlock(pTaskInfo->pRoot);
}

/*
 * Widen the fill-history window stored in the task's stream info to the
 * full range [INT64_MIN, INT64_MAX] and log the new bounds at debug level.
 *
 * tinfo: opaque task handle, cast to SExecTaskInfo*. A NULL handle is
 *        accepted and leaves everything untouched (nothing is logged).
 */
void qResetStreamInfoTimeWindow(qTaskInfo_t tinfo) {
  SExecTaskInfo* pInfo = (SExecTaskInfo*)tinfo;

  if (pInfo != NULL) {
    qDebug("%s set stream fill-history window:%" PRId64"-%"PRId64, GET_TASKID(pInfo), INT64_MIN, INT64_MAX);

    // Both bounds are opened up; the two stores are independent of each other.
    pInfo->streamInfo.fillHistoryWindow.ekey = INT64_MAX;
    pInfo->streamInfo.fillHistoryWindow.skey = INT64_MIN;
  }
}

static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, const char* id) {
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
if (pOperator->numOfDownstream == 0) {
Expand Down Expand Up @@ -341,7 +330,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers, int32_t v
return NULL;
}

qResetStreamInfoTimeWindow(pTaskInfo);
qStreamInfoResetTimewindowFilter(pTaskInfo);
return pTaskInfo;
}

Expand Down Expand Up @@ -891,8 +880,6 @@ int32_t qStreamSourceScanParamForHistoryScanStep1(qTaskInfo_t tinfo, SVersionRan
pStreamInfo->fillHistoryVer = *pVerRange;
pStreamInfo->fillHistoryWindow = *pWindow;
pStreamInfo->recoverStep = STREAM_RECOVER_STEP__PREPARE1;
pStreamInfo->recoverStep1Finished = false;
pStreamInfo->recoverStep2Finished = false;

qDebug("%s step 1. set param for stream scanner for scan-history data, verRange:%" PRId64 " - %" PRId64 ", window:%" PRId64
" - %" PRId64,
Expand All @@ -910,8 +897,6 @@ int32_t qStreamSourceScanParamForHistoryScanStep2(qTaskInfo_t tinfo, SVersionRan
pStreamInfo->fillHistoryVer = *pVerRange;
pStreamInfo->fillHistoryWindow = *pWindow;
pStreamInfo->recoverStep = STREAM_RECOVER_STEP__PREPARE2;
pStreamInfo->recoverStep1Finished = true;
pStreamInfo->recoverStep2Finished = false;

qDebug("%s step 2. set param for stream scanner for scan-history data, verRange:%" PRId64 " - %" PRId64
", window:%" PRId64 " - %" PRId64,
Expand Down Expand Up @@ -1050,23 +1035,15 @@ bool qStreamRecoverScanFinished(qTaskInfo_t tinfo) {
return pTaskInfo->streamInfo.recoverScanFinished;
}

bool qStreamRecoverScanStep1Finished(qTaskInfo_t tinfo) {
int32_t qStreamInfoResetTimewindowFilter(qTaskInfo_t tinfo) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
return pTaskInfo->streamInfo.recoverStep1Finished;
}
STimeWindow* pWindow = &pTaskInfo->streamInfo.fillHistoryWindow;

bool qStreamRecoverScanStep2Finished(qTaskInfo_t tinfo) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
return pTaskInfo->streamInfo.recoverStep2Finished;
}

int32_t qStreamRecoverSetAllStepFinished(qTaskInfo_t tinfo) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
pTaskInfo->streamInfo.recoverStep1Finished = true;
pTaskInfo->streamInfo.recoverStep2Finished = true;
qDebug("%s set remove scan-history filter window:%" PRId64 "-%" PRId64 ", new window:%" PRId64 "-%" PRId64,
GET_TASKID(pTaskInfo), pWindow->skey, pWindow->ekey, INT64_MIN, INT64_MAX);

// reset the time window
pTaskInfo->streamInfo.fillHistoryWindow.skey = INT64_MIN;
pWindow->skey = INT64_MIN;
pWindow->ekey = INT64_MAX;
return 0;
}

Expand Down
47 changes: 30 additions & 17 deletions source/libs/executor/src/scanoperator.c
Expand Up @@ -1590,38 +1590,51 @@ static void doBlockDataWindowFilter(SSDataBlock* pBlock, int32_t tsIndex, STimeW
}

// re-build the delete block, ONLY according to the split timestamp
static void rebuildDeleteBlockData(SSDataBlock* pBlock, int64_t skey, const char* id) {
if (skey == INT64_MIN) {
return;
}

static void rebuildDeleteBlockData(SSDataBlock* pBlock, STimeWindow* pWindow, const char* id) {
int32_t numOfRows = pBlock->info.rows;

bool* p = taosMemoryCalloc(numOfRows, sizeof(bool));
bool hasUnqualified = false;
bool* p = taosMemoryCalloc(numOfRows, sizeof(bool));
bool hasUnqualified = false;
int64_t skey = pWindow->skey;
int64_t ekey = pWindow->ekey;

SColumnInfoData* pSrcStartCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
uint64_t* tsStartCol = (uint64_t*)pSrcStartCol->pData;
SColumnInfoData* pSrcEndCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
uint64_t* tsEndCol = (uint64_t*)pSrcEndCol->pData;

for (int32_t i = 0; i < numOfRows; i++) {
if (tsStartCol[i] < skey) {
tsStartCol[i] = skey;
if (pWindow->skey != INT64_MIN) {
for (int32_t i = 0; i < numOfRows; i++) {
if (tsStartCol[i] < skey) {
tsStartCol[i] = skey;
}

if (tsEndCol[i] >= skey) {
p[i] = true;
} else { // this row should be removed, since it is not in this query time window, which is [skey, INT64_MAX]
hasUnqualified = true;
}
}
} else if (pWindow->ekey != INT64_MAX) {
for(int32_t i = 0; i < numOfRows; ++i) {
if (tsEndCol[i] > ekey) {
tsEndCol[i] = ekey;
}

if (tsEndCol[i] >= skey) {
p[i] = true;
} else { // this row should be removed, since it is not in this query time window, which is [skey, INT64_MAX]
hasUnqualified = true;
if (tsStartCol[i] <= ekey) {
p[i] = true;
} else {
hasUnqualified = true;
}
}
}

if (hasUnqualified) {
trimDataBlock(pBlock, pBlock->info.rows, p);
qDebug("%s re-build delete datablock, start key revised to:%"PRId64", rows:%"PRId64, id, skey, pBlock->info.rows);
} else {
qDebug("%s not update the delete block", id);
}

qDebug("%s re-build delete datablock, start key revised to:%"PRId64", rows:%"PRId64, id, skey, pBlock->info.rows);
taosMemoryFree(p);
}

Expand Down Expand Up @@ -2030,7 +2043,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
}

setBlockGroupIdByUid(pInfo, pDelBlock);
rebuildDeleteBlockData(pDelBlock, pStreamInfo->fillHistoryWindow.skey, id);
rebuildDeleteBlockData(pDelBlock, &pStreamInfo->fillHistoryWindow, id);
printDataBlock(pDelBlock, "stream scan delete recv filtered");
if (pDelBlock->info.rows == 0) {
if (pInfo->tqReader) {
Expand Down
132 changes: 34 additions & 98 deletions source/libs/stream/src/streamExec.c
Expand Up @@ -163,15 +163,14 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i
}

int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
int32_t code = 0;

ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE);
void* exec = pTask->exec.pExecutor;
int32_t code = TSDB_CODE_SUCCESS;
void* exec = pTask->exec.pExecutor;
bool finished = false;

qSetStreamOpOpen(exec);
bool finished = false;

while (1) {
while (!finished) {
if (streamTaskShouldPause(&pTask->status)) {
double el = (taosGetTimestampMs() - pTask->tsInfo.step1Start) / 1000.0;
qDebug("s-task:%s paused from the scan-history task, elapsed time:%.2fsec", pTask->id.idStr, el);
Expand All @@ -184,131 +183,68 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
return -1;
}

int32_t batchCnt = 0;
int32_t numOfBlocks = 0;
while (1) {
if (streamTaskShouldStop(&pTask->status)) {
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
return 0;
}

if (streamTaskShouldPause(&pTask->status)) {
break;
}

SSDataBlock* output = NULL;
uint64_t ts = 0;
if (qExecTask(exec, &output, &ts) < 0) {
continue;
}

if (output == NULL) {
if (qStreamRecoverScanFinished(exec)) {
finished = true;
} else {
qSetStreamOpOpen(exec);
if (streamTaskShouldPause(&pTask->status)) {
SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0);
if (qRes == NULL) {
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}

qRes->type = STREAM_INPUT__DATA_BLOCK;
qRes->blocks = pRes;
code = streamTaskOutputResultBlock(pTask, qRes);
if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) {
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
taosFreeQitem(qRes);
return code;
}
return 0;
}
}
if (output == NULL && qStreamRecoverScanFinished(exec)) {
finished = true;
break;
} else {
if (output == NULL) {
ASSERT(0);
}
}

SSDataBlock block = {0};
assignOneDataBlock(&block, output);
block.info.childId = pTask->info.selfChildId;
taosArrayPush(pRes, &block);

batchCnt++;

qDebug("s-task:%s scan exec numOfBlocks:%d, limit:%d", pTask->id.idStr, batchCnt, batchSz);
if (batchCnt >= batchSz) {
numOfBlocks++;
qDebug("s-task:%s scan exec numOfBlocks:%d, limit:%d", pTask->id.idStr, numOfBlocks, batchSz);
if (numOfBlocks >= batchSz) {
break;
}
}

if (taosArrayGetSize(pRes) == 0) {
taosArrayDestroy(pRes);

if (finished) {
qDebug("s-task:%s finish recover exec task ", pTask->id.idStr);
break;
} else {
qDebug("s-task:%s continue recover exec task ", pTask->id.idStr);
continue;
if (taosArrayGetSize(pRes) > 0) {
SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0);
if (qRes == NULL) {
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
}

SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0);
if (qRes == NULL) {
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}

qRes->type = STREAM_INPUT__DATA_BLOCK;
qRes->blocks = pRes;
code = streamTaskOutputResultBlock(pTask, qRes);
if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) {
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
taosFreeQitem(qRes);
return code;
}
qRes->type = STREAM_INPUT__DATA_BLOCK;
qRes->blocks = pRes;

if (finished) {
break;
}
}
return 0;
}

#if 0
int32_t streamBatchExec(SStreamTask* pTask, int32_t batchLimit) {
// fetch all queue item, merge according to batchLimit
int32_t numOfItems = taosReadAllQitems(pTask->inputQueue1, pTask->inputQall);
if (numOfItems == 0) {
qDebug("task: %d, stream task exec over, queue empty", pTask->id.taskId);
return 0;
}
SStreamQueueItem* pMerged = NULL;
SStreamQueueItem* pItem = NULL;
taosGetQitem(pTask->inputQall, (void**)&pItem);
if (pItem == NULL) {
if (pMerged != NULL) {
// process merged item
code = streamTaskOutputResultBlock(pTask, qRes);
if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) {
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
taosFreeQitem(qRes);
return code;
}
} else {
return 0;
taosArrayDestroy(pRes);
}
}

// if drop
if (pItem->type == STREAM_INPUT__DESTROY) {
// set status drop
return -1;
}

if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
ASSERT(((SStreamQueueItem*)pItem)->type == STREAM_INPUT__DATA_BLOCK);
streamTaskOutputResultBlock(pTask, (SStreamDataBlock*)pItem);
}

// exec impl

// output
// try dispatch
return 0;
}
#endif

int32_t updateCheckPointInfo(SStreamTask* pTask) {
int64_t ckId = 0;
Expand Down Expand Up @@ -404,7 +340,7 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {

// expand the query time window for stream scanner
pTimeWindow->skey = INT64_MIN;
qResetStreamInfoTimeWindow(pStreamTask->exec.pExecutor);
qStreamInfoResetTimewindowFilter(pStreamTask->exec.pExecutor);

// transfer the ownership of executor state
streamTaskReleaseState(pTask);
Expand Down