Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

check update data #22256

Merged
merged 1 commit into from Jul 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions include/libs/executor/storageapi.h
Expand Up @@ -368,6 +368,8 @@ typedef struct SStateStore {
bool (*updateInfoIsUpdated)(SUpdateInfo* pInfo, uint64_t tableId, TSKEY ts);
bool (*updateInfoIsTableInserted)(SUpdateInfo* pInfo, int64_t tbUid);
void (*updateInfoDestroy)(SUpdateInfo* pInfo);
void (*windowSBfDelete)(SUpdateInfo *pInfo, uint64_t count);
void (*windowSBfAdd)(SUpdateInfo *pInfo, uint64_t count);

SUpdateInfo* (*updateInfoInitP)(SInterval* pInterval, int64_t watermark);
void (*updateInfoAddCloseWindowSBF)(SUpdateInfo* pInfo);
Expand Down
2 changes: 2 additions & 0 deletions include/libs/stream/tstreamUpdate.h
Expand Up @@ -53,6 +53,8 @@ void updateInfoAddCloseWindowSBF(SUpdateInfo *pInfo);
void updateInfoDestoryColseWinSBF(SUpdateInfo *pInfo);
int32_t updateInfoSerialize(void *buf, int32_t bufLen, const SUpdateInfo *pInfo);
int32_t updateInfoDeserialize(void *buf, int32_t bufLen, SUpdateInfo *pInfo);
void windowSBfDelete(SUpdateInfo *pInfo, uint64_t count);
void windowSBfAdd(SUpdateInfo *pInfo, uint64_t count);

#ifdef __cplusplus
}
Expand Down
2 changes: 2 additions & 0 deletions source/dnode/snode/src/snodeInitApi.c
Expand Up @@ -78,6 +78,8 @@ void initStateStoreAPI(SStateStore* pStore) {
pStore->updateInfoIsUpdated = updateInfoIsUpdated;
pStore->updateInfoIsTableInserted = updateInfoIsTableInserted;
pStore->updateInfoDestroy = updateInfoDestroy;
pStore->windowSBfDelete = windowSBfDelete;
pStore->windowSBfAdd = windowSBfAdd;

pStore->updateInfoInitP = updateInfoInitP;
pStore->updateInfoAddCloseWindowSBF = updateInfoAddCloseWindowSBF;
Expand Down
2 changes: 2 additions & 0 deletions source/dnode/vnode/src/vnd/vnodeInitApi.c
Expand Up @@ -180,6 +180,8 @@ void initStateStoreAPI(SStateStore* pStore) {
pStore->updateInfoIsUpdated = updateInfoIsUpdated;
pStore->updateInfoIsTableInserted = updateInfoIsTableInserted;
pStore->updateInfoDestroy = updateInfoDestroy;
pStore->windowSBfDelete = windowSBfDelete;
pStore->windowSBfAdd = windowSBfAdd;

pStore->updateInfoInitP = updateInfoInitP;
pStore->updateInfoAddCloseWindowSBF = updateInfoAddCloseWindowSBF;
Expand Down
4 changes: 3 additions & 1 deletion source/libs/executor/src/scanoperator.c
Expand Up @@ -2411,7 +2411,9 @@ void streamScanReloadState(SOperatorInfo* pOperator) {
pInfo->stateStore.updateInfoDestroy(pInfo->pUpdateInfo);
pInfo->pUpdateInfo = pUpInfo;
} else {
pInfo->pUpdateInfo->minTS = TMAX(pInfo->pUpdateInfo->minTS, pUpInfo->minTS);
pInfo->stateStore.windowSBfDelete(pInfo->pUpdateInfo, 1);
pInfo->stateStore.windowSBfAdd(pInfo->pUpdateInfo, 1);
ASSERT(pInfo->pUpdateInfo->minTS > pUpInfo->minTS);
pInfo->pUpdateInfo->maxDataVersion = TMAX(pInfo->pUpdateInfo->maxDataVersion, pUpInfo->maxDataVersion);
SHashObj* curMap = pInfo->pUpdateInfo->pMap;
void *pIte = taosHashIterate(curMap, NULL);
Expand Down
4 changes: 2 additions & 2 deletions source/libs/stream/src/streamUpdate.c
Expand Up @@ -33,7 +33,7 @@

static int64_t adjustExpEntries(int64_t entries) { return TMIN(DEFAULT_EXPECTED_ENTRIES, entries); }

static void windowSBfAdd(SUpdateInfo *pInfo, uint64_t count) {
void windowSBfAdd(SUpdateInfo *pInfo, uint64_t count) {
if (pInfo->numSBFs < count) {
count = pInfo->numSBFs;
}
Expand All @@ -49,7 +49,7 @@ static void clearItemHelper(void *p) {
tScalableBfDestroy(*pBf);
}

static void windowSBfDelete(SUpdateInfo *pInfo, uint64_t count) {
void windowSBfDelete(SUpdateInfo *pInfo, uint64_t count) {
if (count < pInfo->numSBFs) {
for (uint64_t i = 0; i < count; ++i) {
SScalableBf *pTsSBFs = taosArrayGetP(pInfo->pTsSBFs, 0);
Expand Down