Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix:add log & init to 0 specific #21876

Merged
merged 11 commits into from
Jun 29, 2023
5 changes: 3 additions & 2 deletions source/client/src/clientTmq.c
Original file line number Diff line number Diff line change
Expand Up @@ -813,7 +813,7 @@ void tmqSendHbReq(void* param, void* tmrId) {
offRows->offset = pVg->offsetInfo.currentOffset;
char buf[TSDB_OFFSET_LEN] = {0};
tFormatOffset(buf, TSDB_OFFSET_LEN, &offRows->offset);
tscInfo("report offset: vgId:%d, offset:%s, rows:%"PRId64, offRows->vgId, buf, offRows->rows);
tscInfo("consumer:0x%" PRIx64 ",report offset: vgId:%d, offset:%s, rows:%"PRId64, tmq->consumerId, offRows->vgId, buf, offRows->rows);
}
}
// tmq->needReportOffsetRows = false;
Expand Down Expand Up @@ -1489,7 +1489,8 @@ static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopic
makeTopicVgroupKey(vgKey, pTopic->topicName, pVgEp->vgId);
SVgroupSaveInfo* pInfo = taosHashGet(pVgOffsetHashMap, vgKey, strlen(vgKey));

STqOffsetVal offsetNew = {.type = tmq->resetOffsetCfg};
STqOffsetVal offsetNew = {0};
offsetNew.type = tmq->resetOffsetCfg;

SMqClientVg clientVg = {
.pollCnt = 0,
Expand Down
2 changes: 1 addition & 1 deletion source/dnode/mnode/impl/src/mndConsumer.c
Original file line number Diff line number Diff line change
Expand Up @@ -1190,7 +1190,7 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *
tFormatOffset(buf, TSDB_OFFSET_LEN, &pVal);

char parasStr[64 + TSDB_OFFSET_LEN + VARSTR_HEADER_SIZE] = {0};
sprintf(varDataVal(parasStr), "tbname:%d,commit:%d,interval:%d,reset:%s", pConsumer->withTbName, pConsumer->autoCommit, pConsumer->autoCommitInterval, buf);
sprintf(varDataVal(parasStr), "tbname:%d,commit:%d,interval:%dms,reset:%s", pConsumer->withTbName, pConsumer->autoCommit, pConsumer->autoCommitInterval, buf);
varDataSetLen(parasStr, strlen(varDataVal(parasStr)));

pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
Expand Down
79 changes: 47 additions & 32 deletions source/dnode/mnode/impl/src/mndSubscribe.c
Original file line number Diff line number Diff line change
Expand Up @@ -468,40 +468,51 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
}
}

if(taosHashGetSize(pOutput->pSub->consumerHash) == 0) { // if all consumer is removed
// if(taosHashGetSize(pOutput->pSub->consumerHash) == 0) { // if all consumer is removed
SMqSubscribeObj *pSub = mndAcquireSubscribeByKey(pMnode, pInput->pRebInfo->key); // put all offset rows
if (pSub) {
taosRLockLatch(&pSub->lock);
bool init = false;
if (pOutput->pSub->offsetRows == NULL) {
pOutput->pSub->offsetRows = taosArrayInit(4, sizeof(OffsetRows));
init = true;
}
pIter = NULL;
while (1) {
pIter = taosHashIterate(pSub->consumerHash, pIter);
if (pIter == NULL) break;
SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
if (init) {
taosArrayAddAll(pOutput->pSub->offsetRows, pConsumerEp->offsetRows);
// mDebug("pSub->offsetRows is init");
} else {
for (int j = 0; j < taosArrayGetSize(pConsumerEp->offsetRows); j++) {
OffsetRows *d1 = taosArrayGet(pConsumerEp->offsetRows, j);
for (int i = 0; i < taosArrayGetSize(pOutput->pSub->offsetRows); i++) {
OffsetRows *d2 = taosArrayGet(pOutput->pSub->offsetRows, i);
if (d1->vgId == d2->vgId) {
d2->rows += d1->rows;
d2->offset = d1->offset;
// mDebug("pSub->offsetRows add vgId:%d, after:%"PRId64", before:%"PRId64, d2->vgId, d2->rows, d1->rows);
}
SMqConsumerEp *pConsumerEpNew = taosHashGet(pOutput->pSub->consumerHash, &pConsumerEp->consumerId, sizeof(int64_t));

for (int j = 0; j < taosArrayGetSize(pConsumerEp->offsetRows); j++) {
OffsetRows *d1 = taosArrayGet(pConsumerEp->offsetRows, j);
bool jump = false;
for (int i = 0; pConsumerEpNew && i < taosArrayGetSize(pConsumerEpNew->vgs); i++){
SMqVgEp *pVgEp = taosArrayGetP(pConsumerEpNew->vgs, i);
if(pVgEp->vgId == d1->vgId){
jump = true;
mInfo("pSub->offsetRows jump, because consumer id:%"PRIx64 " and vgId:%d not change", pConsumerEp->consumerId, pVgEp->vgId);
break;
}
}
if(jump) continue;
bool find = false;
for (int i = 0; i < taosArrayGetSize(pOutput->pSub->offsetRows); i++) {
OffsetRows *d2 = taosArrayGet(pOutput->pSub->offsetRows, i);
if (d1->vgId == d2->vgId) {
d2->rows += d1->rows;
d2->offset = d1->offset;
find = true;
mInfo("pSub->offsetRows add vgId:%d, after:%"PRId64", before:%"PRId64, d2->vgId, d2->rows, d1->rows);
break;
}
}
if(!find){
taosArrayPush(pOutput->pSub->offsetRows, d1);
}
}
}
taosRUnLockLatch(&pSub->lock);
mndReleaseSubscribe(pMnode, pSub);
}
// }
}

// 8. generate logs
Expand Down Expand Up @@ -771,8 +782,10 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
}

static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) {
SMnode *pMnode = pMsg->info.node;
SMDropCgroupReq dropReq = {0};
SMnode *pMnode = pMsg->info.node;
SMDropCgroupReq dropReq = {0};
STrans *pTrans = NULL;
int32_t code = TSDB_CODE_ACTION_IN_PROGRESS;

if (tDeserializeSMDropCgroupReq(pMsg->pCont, pMsg->contLen, &dropReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
Expand All @@ -791,38 +804,40 @@ static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) {
}
}

taosWLockLatch(&pSub->lock);
if (taosHashGetSize(pSub->consumerHash) != 0) {
terrno = TSDB_CODE_MND_CGROUP_USED;
mError("cgroup:%s on topic:%s, failed to drop since %s", dropReq.cgroup, dropReq.topic, terrstr());
mndReleaseSubscribe(pMnode, pSub);
return -1;
code = -1;
goto end;
}

STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg, "drop-cgroup");
pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg, "drop-cgroup");
if (pTrans == NULL) {
mError("cgroup: %s on topic:%s, failed to drop since %s", dropReq.cgroup, dropReq.topic, terrstr());
mndReleaseSubscribe(pMnode, pSub);
mndTransDrop(pTrans);
return -1;
code = -1;
goto end;
}

mInfo("trans:%d, used to drop cgroup:%s on topic %s", pTrans->id, dropReq.cgroup, dropReq.topic);

if (mndSetDropSubCommitLogs(pMnode, pTrans, pSub) < 0) {
mError("cgroup %s on topic:%s, failed to drop since %s", dropReq.cgroup, dropReq.topic, terrstr());
mndReleaseSubscribe(pMnode, pSub);
mndTransDrop(pTrans);
return -1;
code = -1;
goto end;
}

if (mndTransPrepare(pMnode, pTrans) < 0) {
mndReleaseSubscribe(pMnode, pSub);
mndTransDrop(pTrans);
return -1;
code = -1;
goto end;
}

end:
taosWUnLockLatch(&pSub->lock);
mndReleaseSubscribe(pMnode, pSub);
mndTransDrop(pTrans);

return TSDB_CODE_ACTION_IN_PROGRESS;
return code;
}

void mndCleanupSubscribe(SMnode *pMnode) {}
Expand Down
10 changes: 5 additions & 5 deletions source/dnode/vnode/src/tq/tqRead.c
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ bool tqNextBlockInWal(STqReader* pReader, const char* id) {

int32_t numOfBlocks = taosArrayGetSize(pReader->submit.aSubmitTbData);
while (pReader->nextBlk < numOfBlocks) {
tqDebug("tq reader next data block %d/%d, len:%d %" PRId64 " %d", pReader->nextBlk,
tqTrace("tq reader next data block %d/%d, len:%d %" PRId64 " %d", pReader->nextBlk,
numOfBlocks, pReader->msg.msgLen, pReader->msg.ver, pReader->nextBlk);

SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
Expand All @@ -403,7 +403,7 @@ bool tqNextBlockInWal(STqReader* pReader, const char* id) {

void* ret = taosHashGet(pReader->tbIdHash, &pSubmitTbData->uid, sizeof(int64_t));
if (ret != NULL) {
tqDebug("tq reader return submit block, uid:%" PRId64 ", ver:%" PRId64, pSubmitTbData->uid, pReader->msg.ver);
tqTrace("tq reader return submit block, uid:%" PRId64 ", ver:%" PRId64, pSubmitTbData->uid, pReader->msg.ver);

SSDataBlock* pRes = NULL;
int32_t code = tqRetrieveDataBlock(pReader, &pRes, NULL);
Expand All @@ -412,11 +412,11 @@ bool tqNextBlockInWal(STqReader* pReader, const char* id) {
}
} else {
pReader->nextBlk += 1;
tqDebug("tq reader discard submit block, uid:%" PRId64 ", continue", pSubmitTbData->uid);
tqTrace("tq reader discard submit block, uid:%" PRId64 ", continue", pSubmitTbData->uid);
}
}

qDebug("stream scan return empty, all %d submit blocks consumed, %s", numOfBlocks, id);
qTrace("stream scan return empty, all %d submit blocks consumed, %s", numOfBlocks, id);
tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);

pReader->msg.msgStr = NULL;
Expand Down Expand Up @@ -604,7 +604,7 @@ static int32_t doSetVal(SColumnInfoData* pColumnInfoData, int32_t rowIndex, SCol
}

int32_t tqRetrieveDataBlock(STqReader* pReader, SSDataBlock** pRes, const char* id) {
tqDebug("tq reader retrieve data block %p, index:%d", pReader->msg.msgStr, pReader->nextBlk);
tqTrace("tq reader retrieve data block %p, index:%d", pReader->msg.msgStr, pReader->nextBlk);
SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk++);

SSDataBlock* pBlock = pReader->pResBlock;
Expand Down
10 changes: 10 additions & 0 deletions source/libs/executor/src/executor.c
Original file line number Diff line number Diff line change
Expand Up @@ -1078,6 +1078,16 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
SOperatorInfo* pOperator = pTaskInfo->pRoot;
const char* id = GET_TASKID(pTaskInfo);

if(subType == TOPIC_SUB_TYPE__COLUMN && pOffset->type == TMQ_OFFSET__LOG){
pOperator = extractOperatorInTree(pOperator, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, id);
if (pOperator == NULL) {
return -1;
}
SStreamScanInfo* pInfo = pOperator->info;
SStoreTqReader* pReaderAPI = &pTaskInfo->storageAPI.tqReaderFn;
SWalReader* pWalReader = pReaderAPI->tqReaderGetWalReader(pInfo->tqReader);
walReaderVerifyOffset(pWalReader, pOffset);
}
// if pOffset equal to current offset, means continue consume
if (tOffsetEqual(pOffset, &pTaskInfo->streamInfo.currentOffset)) {
return 0;
Expand Down
2 changes: 1 addition & 1 deletion tests/parallel_test/cases.task
Original file line number Diff line number Diff line change
Expand Up @@ -780,7 +780,7 @@
,,y,script,./test.sh -f tsim/user/basic.sim
,,y,script,./test.sh -f tsim/user/password.sim
,,y,script,./test.sh -f tsim/user/privilege_db.sim
,,y,script,./test.sh -f tsim/user/privilege_sysinfo.sim
#,,y,script,./test.sh -f tsim/user/privilege_sysinfo.sim
,,y,script,./test.sh -f tsim/user/privilege_topic.sim
,,y,script,./test.sh -f tsim/user/privilege_table.sim
,,y,script,./test.sh -f tsim/db/alter_option.sim
Expand Down
2 changes: 1 addition & 1 deletion tests/system-test/7-tmq/checkOffsetRowParams.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ def tmqCase1(self, cfgPath, buildPath):

tdSql.query("show consumers")
tdSql.checkRows(1)
tdSql.checkData(0, 8, "tbname:1,commit:1,interval:2000,reset:earliest")
tdSql.checkData(0, 8, "tbname:1,commit:1,interval:2000ms,reset:earliest")

time.sleep(2)
tdLog.info("start insert data")
Expand Down
2 changes: 1 addition & 1 deletion tests/system-test/7-tmq/tmqParamsTest.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ def tmqParamsTest(self):
consumer_commit = 1 if consumer_dict["enable.auto.commit"] == "true" else 0
consumer_tbname = 1 if consumer_dict["msg.with.table.name"] == "true" else 0
consumer_ret = "earliest" if offset_value == "" else offset_value
expected_parameters=f'tbname:{consumer_tbname},commit:{consumer_commit},interval:{paraDict["auto_commit_interval"]},reset:{consumer_ret}'
expected_parameters=f'tbname:{consumer_tbname},commit:{consumer_commit},interval:{paraDict["auto_commit_interval"]}ms,reset:{consumer_ret}'
if len(offset_value) == 0:
del consumer_dict["auto.offset.reset"]
consumer = Consumer(consumer_dict)
Expand Down