Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(stream): fix error in fill history and check quit status of vnode. #22818

Merged
merged 29 commits into from
Sep 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
c86eeb3
fix(stream): add task status check when waiting for the table create.
hjxilinx Sep 6, 2023
e607a0a
fix(stream): add some logs.
hjxilinx Sep 6, 2023
8783ebc
enh(stream): add some logs and record the submit package performance.
hjxilinx Sep 6, 2023
236b363
fix(stream): fix syntax error.
hjxilinx Sep 6, 2023
b382cba
fix(stream): fix syntax error.
hjxilinx Sep 6, 2023
926cc01
refactor: do some internal refactor.
hjxilinx Sep 6, 2023
a82027f
enh(stream): pack multi-datablock into one submit message.
hjxilinx Sep 6, 2023
5ba6a68
fix(stream): fix syntax error.
hjxilinx Sep 6, 2023
8edf86b
fix(stream): add some logs.
hjxilinx Sep 6, 2023
2b84e0b
refactor: check if the put data into queue is success or failed.
hjxilinx Sep 6, 2023
b240e91
enh(stream): add token bucket to limit the rate of sink operation.
hjxilinx Sep 7, 2023
956047a
fix(stream): count down the token in bucket.
hjxilinx Sep 7, 2023
e407d7d
fix(stream): update log.
hjxilinx Sep 7, 2023
dc82105
fix(stream): update the log.
hjxilinx Sep 7, 2023
82222f1
other: disable detail display.
hjxilinx Sep 7, 2023
55ac835
Merge branch 'main' into fix/3_liaohj
hjxilinx Sep 7, 2023
b317c8e
fix(stream): disable merge submit blocks.
hjxilinx Sep 8, 2023
328377c
refactor: add some logs.
hjxilinx Sep 8, 2023
35b97cb
fix(stream): set correct token in bucket.
hjxilinx Sep 8, 2023
bf79c2c
fix(stream): limit the sink rate
hjxilinx Sep 8, 2023
e4918e8
fix(stream): limit the sink rate
hjxilinx Sep 8, 2023
47fd144
other: add some logs.
hjxilinx Sep 9, 2023
6f5c855
fix(stream): fix msg lost bug.
hjxilinx Sep 9, 2023
aa8909b
other: add some logs.
hjxilinx Sep 9, 2023
9a30573
fix(stream): disable scan wal when halt is set.
hjxilinx Sep 9, 2023
85a5c45
fix(stream): limit the already processed data.
hjxilinx Sep 9, 2023
4ffb4a5
refactor: disable some logs.
hjxilinx Sep 10, 2023
10271ae
fix(stream): fix syntax error in unit test.
hjxilinx Sep 10, 2023
a932124
fix(stream): set the correct flag when there is one token existed.
hjxilinx Sep 10, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmake/cmake.define
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
cmake_minimum_required(VERSION 3.0)
set(CMAKE_VERBOSE_MAKEFILE ON)
set(CMAKE_VERBOSE_MAKEFILE FALSE)
set(TD_BUILD_TAOSA_INTERNAL FALSE)

#set output directory
Expand Down
20 changes: 18 additions & 2 deletions include/libs/stream/tstream.h
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ typedef struct SStreamTaskId {
typedef struct SCheckpointInfo {
int64_t checkpointId;
int64_t checkpointVer; // latest checkpointId version
int64_t currentVer; // current offset in WAL, not serialize it
int64_t nextProcessVer; // current offset in WAL, not serialize it
} SCheckpointInfo;

typedef struct SStreamStatus {
Expand Down Expand Up @@ -312,12 +312,27 @@ typedef struct STaskSchedInfo {
void* pTimer;
} STaskSchedInfo;

typedef struct SSinkTaskRecorder {
int64_t numOfSubmit;
int64_t numOfBlocks;
int64_t numOfRows;
} SSinkTaskRecorder;

typedef struct {
int64_t created;
int64_t init;
int64_t step1Start;
int64_t step2Start;
int64_t sinkStart;
} STaskTimestamp;

typedef struct STokenBucket {
int32_t capacity; // total capacity
int64_t fillTimestamp;// fill timestamp
int32_t numOfToken; // total available tokens
int32_t rate; // number of token per second
} STokenBucket;

struct SStreamTask {
int64_t ver;
SStreamTaskId id;
Expand Down Expand Up @@ -345,6 +360,8 @@ struct SStreamTask {
STaskSinkSma smaSink;
STaskSinkFetch fetchSink;
};
SSinkTaskRecorder sinkRecorder;
STokenBucket tokenBucket;

void* launchTaskTimer;
SMsgCb* pMsgCb; // msg handle
Expand Down Expand Up @@ -683,7 +700,6 @@ int32_t streamSourceScanHistoryData(SStreamTask* pTask);
int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask);

// agg level
int32_t streamTaskScanHistoryPrepare(SStreamTask* pTask);
int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistoryFinishReq* pReq,
SRpcHandleInfo* pRpcInfo);
int32_t streamProcessScanHistoryFinishRsp(SStreamTask* pTask);
Expand Down
2 changes: 1 addition & 1 deletion include/libs/wal/wal.h
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ int32_t walApplyVer(SWal *, int64_t ver);
// int32_t walDataCorrupted(SWal*);

// wal reader
SWalReader *walOpenReader(SWal *, SWalFilterCond *pCond);
SWalReader *walOpenReader(SWal *, SWalFilterCond *pCond, int64_t id);
void walCloseReader(SWalReader *pRead);
void walReadReset(SWalReader *pReader);
int32_t walReadVer(SWalReader *pRead, int64_t ver);
Expand Down
14 changes: 7 additions & 7 deletions source/dnode/snode/src/snode.c
Original file line number Diff line number Diff line change
Expand Up @@ -86,18 +86,18 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) {
SCheckpointInfo* pChkInfo = &pTask->chkInfo;
// checkpoint ver is the kept version, handled data should be the next version.
if (pTask->chkInfo.checkpointId != 0) {
pTask->chkInfo.currentVer = pTask->chkInfo.checkpointVer + 1;
qInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " currentVer:%" PRId64, pTask->id.idStr,
pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->currentVer);
pTask->chkInfo.nextProcessVer = pTask->chkInfo.checkpointVer + 1;
qInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " nextProcessVer:%" PRId64, pTask->id.idStr,
pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer);
} else {
if (pTask->chkInfo.currentVer == -1) {
pTask->chkInfo.currentVer = 0;
if (pTask->chkInfo.nextProcessVer == -1) {
pTask->chkInfo.nextProcessVer = 0;
}
}

qInfo("snode:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64 " currentVer:%" PRId64
qInfo("snode:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64 " nextProcessVer:%" PRId64
" child id:%d, level:%d, status:%s fill-history:%d, trigger:%" PRId64 " ms",
SNODE_HANDLE, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->currentVer,
SNODE_HANDLE, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer,
pTask->info.selfChildId, pTask->info.taskLevel, streamGetTaskStatusStr(pTask->status.taskStatus),
pTask->info.fillHistory, pTask->info.triggerParam);

Expand Down
2 changes: 1 addition & 1 deletion source/dnode/vnode/src/inc/vnodeInt.h
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ int32_t tqProcessTaskDropReq(STQ* pTq, int64_t version, char* msg, int32_t msgLe
int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t version, SRpcMsg* pMsg);
int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec);
int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg);
Expand Down
34 changes: 20 additions & 14 deletions source/dnode/vnode/src/tq/tq.c
Original file line number Diff line number Diff line change
Expand Up @@ -819,7 +819,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {

if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
SWalFilterCond cond = {.deleteMsg = 1}; // delete msg also extract from wal files
pTask->exec.pWalReader = walOpenReader(pTq->pVnode->pWal, &cond);
pTask->exec.pWalReader = walOpenReader(pTq->pVnode->pWal, &cond, pTask->id.taskId);
}

// reset the task status from unfinished transaction
Expand All @@ -834,14 +834,14 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {

// checkpoint ver is the kept version, handled data should be the next version.
if (pTask->chkInfo.checkpointId != 0) {
pTask->chkInfo.currentVer = pTask->chkInfo.checkpointVer + 1;
pTask->chkInfo.nextProcessVer = pTask->chkInfo.checkpointVer + 1;
tqInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " currentVer:%" PRId64, pTask->id.idStr,
pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->currentVer);
pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer);
}

tqInfo("vgId:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64 " currentVer:%" PRId64
" child id:%d, level:%d, status:%s fill-history:%d, trigger:%" PRId64 " ms",
vgId, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->currentVer,
vgId, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer,
pTask->info.selfChildId, pTask->info.taskLevel, streamGetTaskStatusStr(pTask->status.taskStatus),
pTask->info.fillHistory, pTask->info.triggerParam);

Expand Down Expand Up @@ -890,9 +890,10 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
return streamSendCheckRsp(pTq->pStreamMeta, &req, &rsp, &pMsg->info, taskId);
}

int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t sversion, SRpcMsg* pMsg) {
int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, SRpcMsg* pMsg) {
char* pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t len = pMsg->contLen - sizeof(SMsgHead);
int32_t vgId = pTq->pStreamMeta->vgId;

int32_t code;
SStreamTaskCheckRsp rsp;
Expand All @@ -901,7 +902,9 @@ int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t sversion, SRpcMsg* pMsg) {
tDecoderInit(&decoder, (uint8_t*)pReq, len);
code = tDecodeStreamTaskCheckRsp(&decoder, &rsp);
if (code < 0) {
terrno = TSDB_CODE_INVALID_MSG;
tDecoderClear(&decoder);
tqError("vgId:%d failed to parse check rsp msg, code:%s", vgId, tstrerror(terrno));
return -1;
}

Expand Down Expand Up @@ -1087,14 +1090,17 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
}

// now we can stop the stream task execution
streamTaskHalt(pStreamTask);

int64_t latestVer = 0;
taosThreadMutexLock(&pStreamTask->lock);
streamTaskHalt(pStreamTask);
tqDebug("s-task:%s level:%d sched-status:%d is halt by fill-history task:%s", pStreamTask->id.idStr,
pStreamTask->info.taskLevel, pStreamTask->status.schedStatus, id);
latestVer = walReaderGetCurrentVer(pStreamTask->exec.pWalReader);
taosThreadMutexUnlock(&pStreamTask->lock);

// if it's an source task, extract the last version in wal.
pRange = &pTask->dataRange.range;
int64_t latestVer = walReaderGetCurrentVer(pStreamTask->exec.pWalReader);
done = streamHistoryTaskSetVerRangeStep2(pTask, latestVer);

if (done) {
Expand All @@ -1115,15 +1121,15 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {

int64_t dstVer = pTask->dataRange.range.minVer - 1;

pTask->chkInfo.currentVer = dstVer;
pTask->chkInfo.nextProcessVer = dstVer;
walReaderSetSkipToVersion(pTask->exec.pWalReader, dstVer);
tqDebug("s-task:%s wal reader start scan WAL verRange:%" PRId64 "-%" PRId64 ", set sched-status:%d", id, dstVer,
pTask->dataRange.range.maxVer, TASK_SCHED_STATUS__INACTIVE);

atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);

// set the fill-history task to be normal
if (pTask->info.fillHistory == 1) {
if (pTask->info.fillHistory == 1 && !streamTaskShouldStop(&pTask->status)) {
streamSetStatusNormal(pTask);
}

Expand All @@ -1148,7 +1154,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
tqDebug(
"s-task:%s scan-history in stream time window completed, now start to handle data from WAL, start "
"ver:%" PRId64 ", window:%" PRId64 " - %" PRId64,
id, pTask->chkInfo.currentVer, pWindow->skey, pWindow->ekey);
id, pTask->chkInfo.nextProcessVer, pWindow->skey, pWindow->ekey);
}

code = streamTaskScanHistoryDataComplete(pTask);
Expand Down Expand Up @@ -1283,8 +1289,8 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
// even in halt status, the data in inputQ must be processed
int8_t st = pTask->status.taskStatus;
if (st == TASK_STATUS__NORMAL || st == TASK_STATUS__SCAN_HISTORY || st == TASK_STATUS__CK) {
tqDebug("vgId:%d s-task:%s start to process block from inputQ, last chk point:%" PRId64, vgId, pTask->id.idStr,
pTask->chkInfo.currentVer);
tqDebug("vgId:%d s-task:%s start to process block from inputQ, next checked ver:%" PRId64, vgId, pTask->id.idStr,
pTask->chkInfo.nextProcessVer);
streamProcessRunReq(pTask);
} else {
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
Expand Down Expand Up @@ -1423,10 +1429,10 @@ int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion,
walReaderSetSkipToVersion(pTask->exec.pWalReader, sversion);
tqDebug("vgId:%d s-task:%s resume to exec, prev paused version:%" PRId64 ", start from vnode ver:%" PRId64
", schedStatus:%d",
vgId, pTask->id.idStr, pTask->chkInfo.currentVer, sversion, pTask->status.schedStatus);
vgId, pTask->id.idStr, pTask->chkInfo.nextProcessVer, sversion, pTask->status.schedStatus);
} else { // from the previous paused version and go on
tqDebug("vgId:%d s-task:%s resume to exec, from paused ver:%" PRId64 ", vnode ver:%" PRId64 ", schedStatus:%d",
vgId, pTask->id.idStr, pTask->chkInfo.currentVer, sversion, pTask->status.schedStatus);
vgId, pTask->id.idStr, pTask->chkInfo.nextProcessVer, sversion, pTask->status.schedStatus);
}

if (level == TASK_LEVEL__SOURCE && pTask->info.fillHistory &&
Expand Down
4 changes: 2 additions & 2 deletions source/dnode/vnode/src/tq/tqMeta.c
Original file line number Diff line number Diff line change
Expand Up @@ -312,14 +312,14 @@ static int buildHandle(STQ* pTq, STqHandle* handle){
return -1;
}
} else if (handle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
handle->pWalReader = walOpenReader(pVnode->pWal, NULL);
handle->pWalReader = walOpenReader(pVnode->pWal, NULL, 0);
handle->execHandle.pTqReader = tqReaderOpen(pVnode);

buildSnapContext(reader.vnode, reader.version, 0, handle->execHandle.subType, handle->fetchMeta,
(SSnapContext**)(&reader.sContext));
handle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, vgId, NULL, handle->consumerId);
} else if (handle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
handle->pWalReader = walOpenReader(pVnode->pWal, NULL);
handle->pWalReader = walOpenReader(pVnode->pWal, NULL, 0);

if(handle->execHandle.execTb.qmsg != NULL && strcmp(handle->execHandle.execTb.qmsg, "") != 0) {
if (nodesStringToNode(handle->execHandle.execTb.qmsg, &handle->execHandle.execTb.node) != 0) {
Expand Down
21 changes: 11 additions & 10 deletions source/dnode/vnode/src/tq/tqRead.c
Original file line number Diff line number Diff line change
Expand Up @@ -187,25 +187,26 @@ bool isValValidForTable(STqHandle* pHandle, SWalCont* pHead) {
int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, uint64_t reqId) {
int32_t code = -1;
int32_t vgId = TD_VID(pTq->pVnode);
int64_t id = pHandle->pWalReader->readerId;

int64_t offset = *fetchOffset;
int64_t lastVer = walGetLastVer(pHandle->pWalReader->pWal);
int64_t committedVer = walGetCommittedVer(pHandle->pWalReader->pWal);
int64_t appliedVer = walGetAppliedVer(pHandle->pWalReader->pWal);

wDebug("vgId:%d, wal start to fetch, index:%" PRId64 ", last index:%" PRId64 " commit index:%" PRId64 ", applied index:%" PRId64,
vgId, offset, lastVer, committedVer, appliedVer);
wDebug("vgId:%d, wal start to fetch, index:%" PRId64 ", last index:%" PRId64 " commit index:%" PRId64 ", applied index:%" PRId64", 0x%"PRIx64,
vgId, offset, lastVer, committedVer, appliedVer, id);

while (offset <= appliedVer) {
if (walFetchHead(pHandle->pWalReader, offset) < 0) {
tqDebug("tmq poll: consumer:0x%" PRIx64 ", (epoch %d) vgId:%d offset %" PRId64
", no more log to return, reqId:0x%" PRIx64,
pHandle->consumerId, pHandle->epoch, vgId, offset, reqId);
", no more log to return, reqId:0x%" PRIx64 " 0x%" PRIx64,
pHandle->consumerId, pHandle->epoch, vgId, offset, reqId, id);
goto END;
}

tqDebug("vgId:%d, consumer:0x%" PRIx64 " taosx get msg ver %" PRId64 ", type: %s, reqId:0x%" PRIx64, vgId,
pHandle->consumerId, offset, TMSG_INFO(pHandle->pWalReader->pHead->head.msgType), reqId);
tqDebug("vgId:%d, consumer:0x%" PRIx64 " taosx get msg ver %" PRId64 ", type: %s, reqId:0x%" PRIx64" 0x%"PRIx64, vgId,
pHandle->consumerId, offset, TMSG_INFO(pHandle->pWalReader->pHead->head.msgType), reqId, id);

if (pHandle->pWalReader->pHead->head.msgType == TDMT_VND_SUBMIT) {
code = walFetchBody(pHandle->pWalReader);
Expand Down Expand Up @@ -250,7 +251,7 @@ STqReader* tqReaderOpen(SVnode* pVnode) {
return NULL;
}

pReader->pWalReader = walOpenReader(pVnode->pWal, NULL);
pReader->pWalReader = walOpenReader(pVnode->pWal, NULL, 0);
if (pReader->pWalReader == NULL) {
taosMemoryFree(pReader);
return NULL;
Expand Down Expand Up @@ -491,11 +492,11 @@ bool tqNextBlockImpl(STqReader* pReader, const char* idstr) {

void* ret = taosHashGet(pReader->tbIdHash, &pSubmitTbData->uid, sizeof(int64_t));
if (ret != NULL) {
tqDebug("block found, ver:%" PRId64 ", uid:%" PRId64", %s", pReader->msg.ver, pSubmitTbData->uid, idstr);
tqDebug("block found, ver:%" PRId64 ", uid:%" PRId64 ", %s", pReader->msg.ver, pSubmitTbData->uid, idstr);
return true;
} else {
tqInfo("discard submit block, uid:%" PRId64 ", total queried tables:%d continue %s", pSubmitTbData->uid,
taosHashGetSize(pReader->tbIdHash), idstr);
tqDebug("discard submit block, uid:%" PRId64 ", total queried tables:%d continue %s", pSubmitTbData->uid,
taosHashGetSize(pReader->tbIdHash), idstr);
}

pReader->nextBlk++;
Expand Down