Skip to content

Commit

Permalink
Merge pull request #23992 from taosdata/fix/TD-27778
Browse files Browse the repository at this point in the history
init delete mark
  • Loading branch information
dapan1121 committed Dec 11, 2023
2 parents 4b13f91 + 76a5560 commit 037d2ff
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 7 deletions.
3 changes: 3 additions & 0 deletions source/libs/executor/inc/executorInt.h
Original file line number Diff line number Diff line change
Expand Up @@ -636,6 +636,7 @@ typedef struct SStreamEventAggOperatorInfo {
bool isHistoryOp;
SArray* historyWins;
bool reCkBlock;
bool recvGetAll;
SSDataBlock* pCheckpointRes;
SFilterInfo* pStartCondInfo;
SFilterInfo* pEndCondInfo;
Expand Down Expand Up @@ -837,6 +838,8 @@ void compactTimeWindow(SExprSupp* pSup, SStreamAggSupporter* pAggSup, STimeW
int32_t releaseOutputBuf(void* pState, SRowBuffPos* pPos, SStateStore* pAPI);
void resetWinRange(STimeWindow* winRange);
bool checkExpiredData(SStateStore* pAPI, SUpdateInfo* pUpdateInfo, STimeWindowAggSupp* pTwSup, uint64_t tableId, TSKEY ts);
int64_t getDeleteMark(SWindowPhysiNode* pWinPhyNode, int64_t interval);
void resetUnCloseSessionWinInfo(SSHashObj* winMap);

int32_t encodeSSessionKey(void** buf, SSessionKey* key);
void* decodeSSessionKey(void* buf, SSessionKey* key);
Expand Down
8 changes: 8 additions & 0 deletions source/libs/executor/src/streameventwindowoperator.c
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,11 @@ static SSDataBlock* doStreamEventAgg(SOperatorInfo* pOperator) {
return pInfo->pCheckpointRes;
}

if (pInfo->recvGetAll) {
pInfo->recvGetAll = false;
resetUnCloseSessionWinInfo(pInfo->streamAggSup.pResultRows);
}

setOperatorCompleted(pOperator);
return NULL;
}
Expand All @@ -510,6 +515,7 @@ static SSDataBlock* doStreamEventAgg(SOperatorInfo* pOperator) {
deleteSessionWinState(&pInfo->streamAggSup, pBlock, pInfo->pSeUpdated, pInfo->pSeDeleted);
continue;
} else if (pBlock->info.type == STREAM_GET_ALL) {
pInfo->recvGetAll = true;
getAllSessionWindow(pInfo->streamAggSup.pResultRows, pInfo->pSeUpdated);
continue;
} else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) {
Expand Down Expand Up @@ -672,6 +678,7 @@ SOperatorInfo* createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhys
.calTrigger = pEventNode->window.triggerType,
.maxTs = INT64_MIN,
.minTs = INT64_MAX,
.deleteMark = getDeleteMark(&pEventNode->window, 0),
};

initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
Expand Down Expand Up @@ -720,6 +727,7 @@ SOperatorInfo* createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhys

pInfo->pCheckpointRes = createSpecialDataBlock(STREAM_CHECKPOINT);
pInfo->reCkBlock = false;
pInfo->recvGetAll = false;

// for stream
void* buff = NULL;
Expand Down
14 changes: 7 additions & 7 deletions source/libs/executor/src/streamtimewindowoperator.c
Original file line number Diff line number Diff line change
Expand Up @@ -1345,12 +1345,12 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
return buildIntervalResult(pOperator);
}

static int64_t getDeleteMark(SIntervalPhysiNode* pIntervalPhyNode) {
if (pIntervalPhyNode->window.deleteMark <= 0) {
int64_t getDeleteMark(SWindowPhysiNode* pWinPhyNode, int64_t interval) {
if (pWinPhyNode->deleteMark <= 0) {
return DEAULT_DELETE_MARK;
}
int64_t deleteMark = TMAX(pIntervalPhyNode->window.deleteMark, pIntervalPhyNode->window.watermark);
deleteMark = TMAX(deleteMark, pIntervalPhyNode->interval);
int64_t deleteMark = TMAX(pWinPhyNode->deleteMark, pWinPhyNode->watermark);
deleteMark = TMAX(deleteMark, interval);
return deleteMark;
}

Expand Down Expand Up @@ -1442,7 +1442,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
.calTrigger = pIntervalPhyNode->window.triggerType,
.maxTs = INT64_MIN,
.minTs = INT64_MAX,
.deleteMark = getDeleteMark(pIntervalPhyNode),
.deleteMark = getDeleteMark(&pIntervalPhyNode->window, pIntervalPhyNode->interval),
.deleteMarkSaved = 0,
.calTriggerSaved = 0,
};
Expand Down Expand Up @@ -2565,7 +2565,7 @@ void doStreamSessionSaveCheckpoint(SOperatorInfo* pOperator) {
taosMemoryFree(buf);
}

static void resetUnCloseSessionWinInfo(SSHashObj* winMap) {
void resetUnCloseSessionWinInfo(SSHashObj* winMap) {
void* pIte = NULL;
int32_t iter = 0;
while ((pIte = tSimpleHashIterate(winMap, pIte, &iter)) != NULL) {
Expand Down Expand Up @@ -3963,7 +3963,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
.calTrigger = pIntervalPhyNode->window.triggerType,
.maxTs = INT64_MIN,
.minTs = INT64_MAX,
.deleteMark = getDeleteMark(pIntervalPhyNode)};
.deleteMark = getDeleteMark(&pIntervalPhyNode->window, pIntervalPhyNode->interval)};

ASSERTS(pInfo->twAggSup.calTrigger != STREAM_TRIGGER_MAX_DELAY, "trigger type should not be max delay");

Expand Down

0 comments on commit 037d2ff

Please sign in to comment.