Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(stream): set the correct start version for the step 2 stage in fill history. #22826

Merged
merged 7 commits into from
Sep 11, 2023
Merged
16 changes: 5 additions & 11 deletions source/common/src/tdatablock.c
Original file line number Diff line number Diff line change
Expand Up @@ -2139,22 +2139,16 @@ int32_t buildCtbNameByGroupIdImpl(const char* stbFullName, uint64_t groupId, cha
return TSDB_CODE_FAILED;
}

SSmlKv pTag = {.key = "group_id",
.keyLen = sizeof("group_id") - 1,
.type = TSDB_DATA_TYPE_UBIGINT,
.u = groupId,
.length = sizeof(uint64_t)};
int8_t type = TSDB_DATA_TYPE_UBIGINT;
const char* name = "group_id";
int32_t len = strlen(name);
SSmlKv pTag = { .key = name, .keyLen = len, .type = type, .u = groupId, .length = sizeof(uint64_t)};
taosArrayPush(tags, &pTag);

RandTableName rname = {
.tags = tags,
.stbFullName = stbFullName,
.stbFullNameLen = strlen(stbFullName),
.ctbShortName = cname,
};
.tags = tags, .stbFullName = stbFullName, .stbFullNameLen = strlen(stbFullName), .ctbShortName = cname};

buildChildTableName(&rname);

taosArrayDestroy(tags);

if ((rname.ctbShortName && rname.ctbShortName[0]) == 0) {
Expand Down
8 changes: 5 additions & 3 deletions source/dnode/vnode/src/tq/tq.c
Original file line number Diff line number Diff line change
Expand Up @@ -839,7 +839,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer);
}

tqInfo("vgId:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64 " currentVer:%" PRId64
tqInfo("vgId:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64 " nextProcessVer:%" PRId64
" child id:%d, level:%d, status:%s fill-history:%d, trigger:%" PRId64 " ms",
vgId, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer,
pTask->info.selfChildId, pTask->info.taskLevel, streamGetTaskStatusStr(pTask->status.taskStatus),
Expand Down Expand Up @@ -983,6 +983,9 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms

bool restored = pTq->pVnode->restored;
if (p != NULL && restored) {
p->tsInfo.init = taosGetTimestampMs();
tqDebug("s-task:%s set the init ts:%"PRId64, p->id.idStr, p->tsInfo.init);

streamTaskCheckDownstream(p);
} else if (!restored) {
tqWarn("s-task:%s not launched since vnode(vgId:%d) not ready", p->id.idStr, vgId);
Expand Down Expand Up @@ -1119,8 +1122,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
pTask->tsInfo.step2Start = taosGetTimestampMs();
streamSetParamForStreamScannerStep2(pTask, pRange, pWindow);

int64_t dstVer = pTask->dataRange.range.minVer - 1;

int64_t dstVer = pTask->dataRange.range.minVer;
pTask->chkInfo.nextProcessVer = dstVer;
walReaderSetSkipToVersion(pTask->exec.pWalReader, dstVer);
tqDebug("s-task:%s wal reader start scan WAL verRange:%" PRId64 "-%" PRId64 ", set sched-status:%d", id, dstVer,
Expand Down