Skip to content

Commit

Permalink
Merge pull request #22188 from taosdata/fix/3_liaohj
Browse files Browse the repository at this point in the history
fix(stream): set the correct step2 scan time window range.
  • Loading branch information
hjxilinx committed Jul 26, 2023
2 parents 0666284 + cdffabc commit 610f9d1
Show file tree
Hide file tree
Showing 15 changed files with 211 additions and 102 deletions.
8 changes: 5 additions & 3 deletions include/libs/stream/tstream.h
Original file line number Diff line number Diff line change
Expand Up @@ -635,13 +635,15 @@ void streamMetaInit();
void streamMetaCleanup();
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId);
void streamMetaClose(SStreamMeta* streamMeta);

// save to b-tree meta store
int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask);
int32_t streamMetaAddDeployedTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask);
int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t checkpointVer, char* msg, int32_t msgLen);
int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId);
int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask);
int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int32_t taskId);
int32_t streamMetaGetNumOfTasks(const SStreamMeta* pMeta); // todo remove it
SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId);
void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask);
void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId);

int32_t streamMetaBegin(SStreamMeta* pMeta);
int32_t streamMetaCommit(SStreamMeta* pMeta);
Expand Down
11 changes: 9 additions & 2 deletions source/dnode/snode/src/snode.c
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ int32_t sndProcessTaskDeployReq(SSnode *pSnode, char *msg, int32_t msgLen) {

// 2.save task
taosWLockLatch(&pSnode->pMeta->lock);
code = streamMetaAddDeployedTask(pSnode->pMeta, -1, pTask);
code = streamMetaRegisterTask(pSnode->pMeta, -1, pTask);
if (code < 0) {
taosWUnLockLatch(&pSnode->pMeta->lock);
return -1;
Expand All @@ -179,7 +179,14 @@ int32_t sndProcessTaskDropReq(SSnode *pSnode, char *msg, int32_t msgLen) {
SVDropStreamTaskReq *pReq = (SVDropStreamTaskReq *)msg;
qDebug("snode:%d receive msg to drop stream task:0x%x", pSnode->pMeta->vgId, pReq->taskId);

streamMetaRemoveTask(pSnode->pMeta, pReq->taskId);
SStreamTask* pTask = streamMetaAcquireTask(pSnode->pMeta, pReq->taskId);
if (pTask == NULL) {
qError("vgId:%d failed to acquire s-task:0x%x when dropping it", pSnode->pMeta->vgId, pReq->taskId);
return 0;
}

streamMetaUnregisterTask(pSnode->pMeta, pReq->taskId);
streamMetaReleaseTask(pSnode->pMeta, pTask);
return 0;
}

Expand Down
1 change: 0 additions & 1 deletion source/dnode/vnode/src/inc/tq.h
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,6 @@ int32_t tqStreamTasksStatusCheck(STQ* pTq);

// tq util
int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, SStreamRefDataBlock** pRefBlock);
int32_t tqAddInputBlockNLaunchTask(SStreamTask* pTask, SStreamQueueItem* pQueueItem);
int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg);
int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch, int64_t consumerId,
int32_t type, int64_t sver, int64_t ever);
Expand Down
22 changes: 16 additions & 6 deletions source/dnode/vnode/src/tq/tq.c
Original file line number Diff line number Diff line change
Expand Up @@ -1041,12 +1041,13 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
// 2.save task, use the newest commit version as the initial start version of stream task.
int32_t taskId = 0;
taosWLockLatch(&pStreamMeta->lock);
code = streamMetaAddDeployedTask(pStreamMeta, sversion, pTask);
code = streamMetaRegisterTask(pStreamMeta, sversion, pTask);

taskId = pTask->id.taskId;
int32_t numOfTasks = streamMetaGetNumOfTasks(pStreamMeta);
if (code < 0) {
tqError("vgId:%d failed to add s-task:%s, total:%d", vgId, pTask->id.idStr, numOfTasks);
tFreeStreamTask(pTask);
taosWUnLockLatch(&pStreamMeta->lock);
return -1;
}
Expand Down Expand Up @@ -1136,7 +1137,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
pTask->streamTaskId.taskId, pTask->id.idStr);

pTask->status.taskStatus = TASK_STATUS__DROPPING;
tqDebug("s-task:%s scan-history-task set status to be dropping", id);
tqDebug("s-task:%s fill-history task set status to be dropping", id);

streamMetaSaveTask(pMeta, pTask);
streamMetaReleaseTask(pMeta, pTask);
Expand Down Expand Up @@ -1166,12 +1167,14 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
}

if (!streamTaskRecoverScanStep1Finished(pTask)) {
tqDebug("s-task:%s level:%d verRange:%" PRId64 " - %" PRId64 " do secondary scan-history-data after halt the related stream task:%s",
id, pTask->info.taskLevel, pRange->minVer, pRange->maxVer, id);
STimeWindow* pWindow = &pTask->dataRange.window;
tqDebug("s-task:%s level:%d verRange:%" PRId64 " - %" PRId64 " window:%" PRId64 "-%" PRId64
", do secondary scan-history data after halt the related stream task:%s",
id, pTask->info.taskLevel, pRange->minVer, pRange->maxVer, pWindow->skey, pWindow->ekey, id);
ASSERT(pTask->status.schedStatus == TASK_SCHED_STATUS__WAITING);

st = taosGetTimestampMs();
streamSetParamForStreamScannerStep2(pTask, pRange, &pTask->dataRange.window);
streamSetParamForStreamScannerStep2(pTask, pRange, pWindow);
}

if (!streamTaskRecoverScanStep2Finished(pTask)) {
Expand Down Expand Up @@ -1259,6 +1262,7 @@ int32_t tqProcessTaskTransferStateReq(STQ* pTq, SRpcMsg* pMsg) {
int32_t remain = streamAlignTransferState(pTask);
if (remain > 0) {
tqDebug("s-task:%s receive upstream transfer state msg, remain:%d", pTask->id.idStr, remain);
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
return 0;
}

Expand Down Expand Up @@ -1466,8 +1470,14 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
int32_t tqProcessTaskDropReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg;
tqDebug("vgId:%d receive msg to drop stream task:0x%x", TD_VID(pTq->pVnode), pReq->taskId);
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId);
if (pTask == NULL) {
tqError("vgId:%d failed to acquire s-task:0x%x when dropping it", pTq->pStreamMeta->vgId, pReq->taskId);
return 0;
}

streamMetaRemoveTask(pTq->pStreamMeta, pReq->taskId);
streamMetaUnregisterTask(pTq->pStreamMeta, pReq->taskId);
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
return 0;
}

Expand Down
1 change: 1 addition & 0 deletions source/dnode/vnode/src/tq/tqRead.c
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,7 @@ int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, const char* id) {
int32_t len = pReader->pHead->head.bodyLen - sizeof(SMsgHead);

extractDelDataBlock(pBody, len, ver, (SStreamRefDataBlock**)pItem);
tqDebug("s-task:%s delete msg extract from WAL, len:%d, ver:%"PRId64, id, len, ver);
} else {
ASSERT(0);
}
Expand Down
15 changes: 0 additions & 15 deletions source/dnode/vnode/src/tq/tqUtil.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,6 @@
static int32_t tqSendMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq,
const SMqMetaRsp* pRsp, int32_t vgId);

int32_t tqAddInputBlockNLaunchTask(SStreamTask* pTask, SStreamQueueItem* pQueueItem) {
int32_t code = tAppendDataToInputQueue(pTask, pQueueItem);
if (code < 0) {
tqError("s-task:%s failed to put into queue, too many", pTask->id.idStr);
return -1;
}

if (streamSchedExec(pTask) < 0) {
tqError("stream task:%d failed to be launched, code:%s", pTask->id.taskId, tstrerror(terrno));
return -1;
}

return TSDB_CODE_SUCCESS;
}

int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq) {
pRsp->reqOffset = pReq->reqOffset;

Expand Down
10 changes: 10 additions & 0 deletions source/dnode/vnode/src/tsdb/tsdbRead.c
Original file line number Diff line number Diff line change
Expand Up @@ -3179,6 +3179,16 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {

// load the last data block of current table
STableBlockScanInfo* pScanInfo = *(STableBlockScanInfo**)pStatus->pTableIter;
if (pScanInfo == NULL) {
tsdbError("table Iter is null, invalid pScanInfo, try next table %s", pReader->idStr);
bool hasNexTable = moveToNextTable(pUidList, pStatus);
if (!hasNexTable) {
return TSDB_CODE_SUCCESS;
}

continue;
}

if (pReader->pIgnoreTables && taosHashGet(*pReader->pIgnoreTables, &pScanInfo->uid, sizeof(pScanInfo->uid))) {
// reset the index in last block when handing a new file
doCleanupTableScanInfo(pScanInfo);
Expand Down
105 changes: 74 additions & 31 deletions source/libs/executor/src/scanoperator.c
Original file line number Diff line number Diff line change
Expand Up @@ -1774,6 +1774,67 @@ void streamScanOperatorDecode(void* pBuff, int32_t len, SStreamScanInfo* pInfo)
}
}

static void doBlockDataWindowFilter(SSDataBlock* pBlock, int32_t tsIndex, STimeWindow* pWindow, const char* id) {
if (pWindow->skey != INT64_MIN) {
qDebug("%s filter for additional history window, skey:%"PRId64, id, pWindow->skey);

bool* p = taosMemoryCalloc(pBlock->info.rows, sizeof(bool));
bool hasUnqualified = false;

SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, tsIndex);
for(int32_t i = 0; i < pBlock->info.rows; ++i) {
int64_t* ts = (int64_t*) colDataGetData(pCol, i);
p[i] = (*ts >= pWindow->skey);

if (!p[i]) {
hasUnqualified = true;
}
}

if (hasUnqualified) {
trimDataBlock(pBlock, pBlock->info.rows, p);
}

taosMemoryFree(p);
}
}

// re-build the delete block, ONLY according to the split timestamp
static void rebuildDeleteBlockData(SSDataBlock* pBlock, int64_t skey, const char* id) {
if (skey == INT64_MIN) {
return;
}

int32_t numOfRows = pBlock->info.rows;

bool* p = taosMemoryCalloc(numOfRows, sizeof(bool));
bool hasUnqualified = false;

SColumnInfoData* pSrcStartCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
uint64_t* tsStartCol = (uint64_t*)pSrcStartCol->pData;
SColumnInfoData* pSrcEndCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
uint64_t* tsEndCol = (uint64_t*)pSrcEndCol->pData;

for (int32_t i = 0; i < numOfRows; i++) {
if (tsStartCol[i] < skey) {
tsStartCol[i] = skey;
}

if (tsEndCol[i] >= skey) {
p[i] = true;
} else { // this row should be removed, since it is not in this query time window, which is [skey, INT64_MAX]
hasUnqualified = true;
}
}

if (hasUnqualified) {
trimDataBlock(pBlock, pBlock->info.rows, p);
}

qDebug("%s re-build delete datablock, start key revised to:%"PRId64", rows:%"PRId64, id, skey, pBlock->info.rows);
taosMemoryFree(p);
}

static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
// NOTE: this operator does never check if current status is done or not
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
Expand All @@ -1800,8 +1861,10 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
} else {
pTSInfo->base.cond.startVersion = pStreamInfo->fillHistoryVer.minVer;
pTSInfo->base.cond.endVersion = pStreamInfo->fillHistoryVer.maxVer;
qDebug("stream recover step2, verRange:%" PRId64 " - %" PRId64", %s", pTSInfo->base.cond.startVersion,
pTSInfo->base.cond.endVersion, id);
pTSInfo->base.cond.twindows = pStreamInfo->fillHistoryWindow;
qDebug("stream recover step2, verRange:%" PRId64 " - %" PRId64 ", window:%" PRId64 "-%" PRId64 ", %s",
pTSInfo->base.cond.startVersion, pTSInfo->base.cond.endVersion, pTSInfo->base.cond.twindows.skey,
pTSInfo->base.cond.twindows.ekey, id);
pStreamInfo->recoverStep = STREAM_RECOVER_STEP__SCAN2;
}

Expand Down Expand Up @@ -1920,6 +1983,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
if (pInfo->pUpdateInfo) {
pInfo->pUpdateInfo->maxDataVersion = TMAX(pInfo->pUpdateInfo->maxDataVersion, pBlock->info.version);
}

blockDataUpdateTsWindow(pBlock, 0);
switch (pBlock->info.type) {
case STREAM_NORMAL:
Expand All @@ -1942,14 +2006,17 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
} else {
pDelBlock = pBlock;
}

setBlockGroupIdByUid(pInfo, pDelBlock);
rebuildDeleteBlockData(pDelBlock, pStreamInfo->fillHistoryWindow.skey, id);
printDataBlock(pDelBlock, "stream scan delete recv filtered");
if (pDelBlock->info.rows == 0) {
if (pInfo->tqReader) {
blockDataDestroy(pDelBlock);
}
goto FETCH_NEXT_BLOCK;
}

if (!isIntervalWindow(pInfo) && !isSessionWindow(pInfo) && !isStateWindow(pInfo)) {
generateDeleteResultBlock(pInfo, pDelBlock, pInfo->pDeleteDataRes);
pInfo->pDeleteDataRes->info.type = STREAM_DELETE_RESULT;
Expand Down Expand Up @@ -2091,39 +2158,15 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
doCheckUpdate(pInfo, pBlockInfo->window.ekey, pBlock);
doFilter(pBlock, pOperator->exprSupp.pFilterInfo, NULL);

{ // do additional time window filter
STimeWindow* pWindow = &pStreamInfo->fillHistoryWindow;

if (pWindow->skey != INT64_MIN) {
qDebug("%s filter for additional history window, skey:%"PRId64, id, pWindow->skey);

bool* p = taosMemoryCalloc(pBlock->info.rows, sizeof(bool));
bool hasUnqualified = false;

SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
for(int32_t i = 0; i < pBlock->info.rows; ++i) {
int64_t* ts = (int64_t*) colDataGetData(pCol, i);
p[i] = (*ts >= pWindow->skey);

if (!p[i]) {
hasUnqualified = true;
}
}

if (hasUnqualified) {
trimDataBlock(pBlock, pBlock->info.rows, p);
}

taosMemoryFree(p);
}
}
// apply additional time window filter
doBlockDataWindowFilter(pBlock, pInfo->primaryTsIndex, &pStreamInfo->fillHistoryWindow, id);

pBlock->info.dataLoad = 1;
blockDataUpdateTsWindow(pBlock, pInfo->primaryTsIndex);

qDebug("%s %" PRId64 " rows in datablock, update res:%" PRId64, id, pBlockInfo->rows,
pInfo->pUpdateDataRes->info.rows);
if (pBlockInfo->rows > 0 || pInfo->pUpdateDataRes->info.rows > 0) {
int64_t numOfUpdateRes = pInfo->pUpdateDataRes->info.rows;
qDebug("%s %" PRId64 " rows in datablock, update res:%" PRId64, id, pBlockInfo->rows, numOfUpdateRes);
if (pBlockInfo->rows > 0 || numOfUpdateRes > 0) {
break;
}
}
Expand Down
24 changes: 12 additions & 12 deletions source/libs/stream/src/stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ static void streamSchedByTimer(void* param, void* tmrId) {
SStreamTask* pTask = (void*)param;

int8_t status = atomic_load_8(&pTask->triggerStatus);
qDebug("s-task:%s in scheduler timer, trigger status:%d", pTask->id.idStr, status);
qDebug("s-task:%s in scheduler, trigger status:%d, next:%dms", pTask->id.idStr, status, (int32_t)pTask->triggerParam);

if (streamTaskShouldStop(&pTask->status) || streamTaskShouldPause(&pTask->status)) {
streamMetaReleaseTask(NULL, pTask);
Expand All @@ -74,23 +74,22 @@ static void streamSchedByTimer(void* param, void* tmrId) {
}

if (status == TASK_TRIGGER_STATUS__ACTIVE) {
SStreamTrigger* trigger = taosAllocateQitem(sizeof(SStreamTrigger), DEF_QITEM, 0);
if (trigger == NULL) {
SStreamTrigger* pTrigger = taosAllocateQitem(sizeof(SStreamTrigger), DEF_QITEM, 0);
if (pTrigger == NULL) {
return;
}

trigger->type = STREAM_INPUT__GET_RES;
trigger->pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
if (trigger->pBlock == NULL) {
taosFreeQitem(trigger);
pTrigger->type = STREAM_INPUT__GET_RES;
pTrigger->pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
if (pTrigger->pBlock == NULL) {
taosFreeQitem(pTrigger);
return;
}

trigger->pBlock->info.type = STREAM_GET_ALL;
atomic_store_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__INACTIVE);

if (tAppendDataToInputQueue(pTask, (SStreamQueueItem*)trigger) < 0) {
taosFreeQitem(trigger);
pTrigger->pBlock->info.type = STREAM_GET_ALL;
if (tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pTrigger) < 0) {
taosFreeQitem(pTrigger);
taosTmrReset(streamSchedByTimer, (int32_t)pTask->triggerParam, pTask, streamEnv.timer, &pTask->schedTimer);
return;
}
Expand All @@ -102,7 +101,7 @@ static void streamSchedByTimer(void* param, void* tmrId) {
}

int32_t streamSetupScheduleTrigger(SStreamTask* pTask) {
if (pTask->triggerParam != 0) {
if (pTask->triggerParam != 0 && pTask->info.fillHistory == 0) {
int32_t ref = atomic_add_fetch_32(&pTask->refCnt, 1);
ASSERT(ref == 2 && pTask->schedTimer == NULL);

Expand Down Expand Up @@ -399,6 +398,7 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) {

if (type != STREAM_INPUT__GET_RES && type != STREAM_INPUT__CHECKPOINT && pTask->triggerParam != 0) {
atomic_val_compare_exchange_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__INACTIVE, TASK_TRIGGER_STATUS__ACTIVE);
qDebug("s-task:%s new data arrived, active the trigger, trigerStatus:%d", pTask->id.idStr, pTask->triggerStatus);
}

return 0;
Expand Down

0 comments on commit 610f9d1

Please sign in to comment.