Skip to content

Commit

Permalink
fix:after restarting taosd, stream does not work.
Browse files Browse the repository at this point in the history
  • Loading branch information
54liuyao committed May 9, 2023
1 parent f4f3b88 commit 1fe16bd
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 0 deletions.
9 changes: 9 additions & 0 deletions source/dnode/vnode/src/tq/tqRestore.c
Expand Up @@ -109,6 +109,15 @@ int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
// seek the stored version and extract data from WAL
int32_t code = walReadSeekVer(pTask->exec.pWalReader, pTask->chkInfo.currentVer);
if (code != TSDB_CODE_SUCCESS) { // no data in wal, quit
SWal *pWal = pTask->exec.pWalReader->pWal;
if (pTask->chkInfo.currentVer < pWal->vers.firstVer ) {
pTask->chkInfo.currentVer = pWal->vers.firstVer;
code = walReadSeekVer(pTask->exec.pWalReader, pTask->chkInfo.currentVer);
if (code != TSDB_CODE_SUCCESS) {
streamMetaReleaseTask(pStreamMeta, pTask);
continue;
}
}
streamMetaReleaseTask(pStreamMeta, pTask);
continue;
}
Expand Down
5 changes: 5 additions & 0 deletions source/libs/stream/src/streamMeta.c
Expand Up @@ -188,6 +188,11 @@ int32_t streamMetaAddDeployedTask(SStreamMeta* pMeta, int64_t ver, SStreamTask*
return -1;
}

if (streamMetaCommit(pMeta) < 0) {
tFreeStreamTask(pTask);
return -1;
}

taosHashPut(pMeta->pTasks, &pTask->id.taskId, sizeof(int32_t), &pTask, POINTER_BYTES);
taosArrayPush(pMeta->pTaskList, &pTask->id.taskId);
return 0;
Expand Down

0 comments on commit 1fe16bd

Please sign in to comment.