Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(stream): continue check wal when meeting empty delete block msg. #22558

Merged
merged 2 commits into from
Aug 24, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
93 changes: 55 additions & 38 deletions source/dnode/vnode/src/tq/tqRead.c
Original file line number Diff line number Diff line change
Expand Up @@ -296,53 +296,70 @@ int32_t tqReaderSeek(STqReader* pReader, int64_t ver, const char* id) {
}

int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, int64_t maxVer, const char* id) {
int32_t code = walNextValidMsg(pReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
}

int64_t ver = pReader->pHead->head.version;
if (ver > maxVer) {
tqDebug("maxVer in WAL:%"PRId64" reached current:%"PRId64", do not scan wal anymore, %s", maxVer, ver, id);
return TSDB_CODE_SUCCESS;
}
int32_t code = 0;

if (pReader->pHead->head.msgType == TDMT_VND_SUBMIT) {
void* pBody = POINTER_SHIFT(pReader->pHead->head.body, sizeof(SSubmitReq2Msg));
int32_t len = pReader->pHead->head.bodyLen - sizeof(SSubmitReq2Msg);
while(1) {
code = walNextValidMsg(pReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
}

void* data = taosMemoryMalloc(len);
if (data == NULL) {
// todo: for all stream in this vnode, keep this offset in the offset files, and wait for a moment, and then retry
terrno = TSDB_CODE_OUT_OF_MEMORY;
tqError("vgId:%d, failed to copy submit data for stream processing, since out of memory", 0);
return -1;
SWalCont* pCont = &pReader->pHead->head;
int64_t ver = pCont->version;
if (ver > maxVer) {
tqDebug("maxVer in WAL:%" PRId64 " reached current:%" PRId64 ", do not scan wal anymore, %s", maxVer, ver, id);
return TSDB_CODE_SUCCESS;
}

memcpy(data, pBody, len);
SPackedData data1 = (SPackedData){.ver = ver, .msgLen = len, .msgStr = data};
if (pCont->msgType == TDMT_VND_SUBMIT) {
void* pBody = POINTER_SHIFT(pCont->body, sizeof(SSubmitReq2Msg));
int32_t len = pCont->bodyLen - sizeof(SSubmitReq2Msg);

*pItem = (SStreamQueueItem*)streamDataSubmitNew(&data1, STREAM_INPUT__DATA_SUBMIT);
if (*pItem == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tqError("%s failed to create data submit for stream since out of memory", id);
return terrno;
}
} else if (pReader->pHead->head.msgType == TDMT_VND_DELETE) {
void* pBody = POINTER_SHIFT(pReader->pHead->head.body, sizeof(SMsgHead));
int32_t len = pReader->pHead->head.bodyLen - sizeof(SMsgHead);
void* data = taosMemoryMalloc(len);
if (data == NULL) {
// todo: for all stream in this vnode, keep this offset in the offset files, and wait for a moment, and then retry
code = TSDB_CODE_OUT_OF_MEMORY;
terrno = code;

tqError("vgId:%d, failed to copy submit data for stream processing, since out of memory", 0);
return code;
}

memcpy(data, pBody, len);
SPackedData data1 = (SPackedData){.ver = ver, .msgLen = len, .msgStr = data};

*pItem = (SStreamQueueItem*)streamDataSubmitNew(&data1, STREAM_INPUT__DATA_SUBMIT);
if (*pItem == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
terrno = code;
tqError("%s failed to create data submit for stream since out of memory", id);
return code;
}
} else if (pCont->msgType == TDMT_VND_DELETE) {
void* pBody = POINTER_SHIFT(pCont->body, sizeof(SMsgHead));
int32_t len = pCont->bodyLen - sizeof(SMsgHead);

code = extractDelDataBlock(pBody, len, ver, (SStreamRefDataBlock**)pItem);
if (code == TSDB_CODE_SUCCESS) {
if (*pItem == NULL) {
tqDebug("s-task:%s empty delete msg, discard it, len:%d, ver:%" PRId64, id, len, ver);
// we need to continue check next data in the wal files.
continue;
} else {
tqDebug("s-task:%s delete msg extract from WAL, len:%d, ver:%" PRId64, id, len, ver);
}
} else {
terrno = code;
tqError("s-task:%s extract delete msg from WAL failed, code:%s", id, tstrerror(code));
return code;
}

code = extractDelDataBlock(pBody, len, ver, (SStreamRefDataBlock**)pItem);
if (code != TSDB_CODE_SUCCESS) {
tqError("s-task:%s extract delete msg from WAL failed, code:%s", id, tstrerror(code));
} else {
tqDebug("s-task:%s delete msg extract from WAL, len:%d, ver:%"PRId64, id, len, ver);
ASSERT(0);
}
} else {
ASSERT(0);
}

return 0;
return code;
}
}

// todo ignore the error in wal?
Expand Down